import os
import time
from typing import Tuple, List
import cv2
import numpy as np
import torch
from src.runtime.modules.output.common import get_filename_date_string, map_x_to_image, evaluate_predictions
from src.common.config.global_config import cfg, adv_cfg
def get_lane_color(i: int) -> Tuple[int, int, int]:
    """Get a predefined BGR color depending on i. Colors repeat if i gets too big.

    Args:
        i: any non-negative number; same number -> same color

    Returns:
        Tuple containing 3 channel values, e.g. (255, 0, 0)
    """
    lane_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]
    # Modulo by the palette length (not a hard-coded 5) so the palette can be
    # extended without touching this line.
    return lane_colors[i % len(lane_colors)]
class VisualOut:
    """
    Provides different visual output types:

    * live video
    * record video
    * save images

    Visualization can be points or lines.
    """

    def __init__(
            self,
            enable_live_video=cfg.video_out_enable_live_video,
            enable_video_export=cfg.video_out_enable_video_export,
            enable_image_export=cfg.video_out_enable_image_export,
            enable_line_mode=cfg.video_out_enable_line_mode,
    ):
        """
        used non-basic-cfg values:

        - cfg.video_out_enable_live_video
        - cfg.video_out_enable_video_export
        - cfg.video_out_enable_image_export
        - cfg.video_out_enable_line_mode

        Args:
            enable_live_video: show video
            enable_video_export: save as video to disk
            enable_image_export: save as image files to disk
            enable_line_mode: visualization as lines instead of dots
        """
        self.enable_live_video = enable_live_video
        self.enable_video_export = enable_video_export
        self.enable_image_export = enable_image_export
        self.enable_line_mode = enable_line_mode
        if enable_video_export:
            # init video out (30 fps MJPG AVI named after dataset/model/timestamp)
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            out_filename = f'{cfg.dataset}{cfg.note}_{os.path.basename(cfg.trained_model).split(".")[0]}_{get_filename_date_string()}.avi'
            out_full_path = os.path.join(cfg.work_dir, out_filename)
            print(out_full_path)
            self.vout = cv2.VideoWriter(out_full_path, fourcc, 30.0, (cfg.img_width, cfg.img_height))

    def out(self, y: torch.Tensor, names: List[str], frames: List[np.ndarray]):
        """ Generate visual output

        Args:
            y: network result (list of samples containing probabilities per sample)
            names: filenames for y, if empty: frames have to be provided
            frames: source frames, if empty: names have to be provided

        Raises:
            Exception: if neither names nor frames are provided, or a frame fails to load
        """
        if not names and not frames:
            raise Exception('at least frames or names have to be provided')
        # iterate over samples
        # NOTE: the sample index and the lane index must be distinct variables;
        # reusing `i` for both (as a naive nested loop would) makes the image
        # export below index `names` by lane instead of by sample.
        for sample_idx in range(len(y)):
            # get x coordinates based on probabilities
            lanes = np.array(map_x_to_image(evaluate_predictions(y[sample_idx])))
            if frames:
                vis = frames[sample_idx]
            else:
                vis = cv2.imread(os.path.join(cfg.data_root, names[sample_idx]))
            if vis is None:
                raise Exception('failed to load frame')
            for lane_idx in range(lanes.shape[0]):  # iterate over lanes
                lane = lanes[lane_idx, :]
                if np.sum(lane != -2) > 2:  # if more than two points found for this lane
                    color = get_lane_color(lane_idx)
                    for j in range(lanes.shape[1]):
                        # cast to plain int: modern OpenCV rejects some NumPy
                        # scalar types inside point tuples
                        img_x = int(lane[j])
                        img_y = int(adv_cfg.scaled_h_samples[j])
                        if img_x != -2:  # -2 marks "no point at this h_sample"
                            if self.enable_line_mode:
                                # find all previous points for current lane (in reverse order)
                                # that are not -2 and store indexes of these points in prev_points
                                prev_points = [x for x in range(j - 1, -1, -1) if lane[x] != -2]
                                if prev_points:
                                    cv2.line(
                                        vis,
                                        (int(lane[prev_points[0]]), int(adv_cfg.scaled_h_samples[prev_points[0]])),
                                        (img_x, img_y), color,
                                        5
                                    )
                            else:
                                cv2.circle(vis, (img_x, img_y), 5, color, -1)
            if self.enable_live_video:
                cv2.imshow('video', vis)
                cv2.waitKey(1)
            if self.enable_video_export:
                self.vout.write(vis)
            if self.enable_image_export:
                # fall back to the current timestamp (microseconds) as filename;
                # must be a str with an extension so os.path.join and cv2.imwrite work
                out_path = os.path.join(
                    cfg.work_dir,
                    f'{get_filename_date_string()}_out',
                    names[sample_idx] if names else f'{int(time.time() * 1000000)}.jpg'
                )
                # cv2.imwrite fails silently if the target directory is missing
                os.makedirs(os.path.dirname(out_path), exist_ok=True)
                cv2.imwrite(out_path, vis)