diff --git a/playground/run-ablation.ipynb b/playground/run-ablation.ipynb
index 6e1b17b..8ec0c55 100644
--- a/playground/run-ablation.ipynb
+++ b/playground/run-ablation.ipynb
@@ -595,7 +595,7 @@
     "    if geo_depth:\n",
     "        pipeline.add_filter(FromTracking2DAndRoad())\n",
     "    else:\n",
-    "        pipeline.add_filter(FromTracking3DAndDepth())\n",
+    "        pipeline.add_filter(FromTracking2DAndDepth())\n",
     "\n",
     "    # Segment Trajectory\n",
     "    # pipeline.add_filter(FromTracking3D())\n",
diff --git a/spatialyze/video_processor/stages/tracking_3d/from_tracking_2d_and_detection_3d.py b/spatialyze/video_processor/stages/tracking_3d/from_tracking_2d_and_detection_3d.py
new file mode 100644
index 0000000..b3bfbc7
--- /dev/null
+++ b/spatialyze/video_processor/stages/tracking_3d/from_tracking_2d_and_detection_3d.py
@@ -0,0 +1,100 @@
+from bitarray import bitarray
+
+from ...payload import Payload
+from ..detection_3d import Detection3D
+from ..tracking_2d.tracking_2d import Tracking2D
+from .tracking_3d import Metadatum, Tracking3D, Tracking3DResult
+
+
+# NOTE(review): the class name says "Detection2D", but this file is named
+# from_tracking_2d_and_detection_3d.py and the stage reads Detection3D results.
+# The name is kept as-is so existing references keep working.
+class FromTracking2DAndDetection2D(Tracking3D):
+    """Build 3D tracking results by joining 2D tracks with 3D detections.
+
+    Each 2D track entry is matched to its detection through ``detection_id``.
+    """
+
+    def _run(self, payload: "Payload") -> "tuple[bitarray | None, dict[str, list] | None]":
+        """Produce one ``{object_id: Tracking3DResult}`` mapping per frame.
+
+        Frames that are not kept, or that have no tracking or detection
+        result, contribute an empty dict. Results sharing an ``object_id``
+        are linked through ``prev`` / ``next`` in the order the frames were
+        processed.
+
+        Returns:
+            ``(None, {self.classname(): metadata})``; the first element is
+            always ``None``.
+
+        Raises:
+            KeyError: if a track refers to a detection id that is not among
+                the frame's detection ids.
+        """
+        metadata: "list[Metadatum]" = []
+        trajectories: "dict[int, list[Tracking3DResult]]" = {}
+
+        trackings = Tracking2D.get(payload.metadata)
+        assert trackings is not None
+
+        detections = Detection3D.get(payload.metadata)
+        assert detections is not None
+
+        for k, detection, tracking, frame in zip(payload.keep, detections, trackings, payload.video):
+            if not k or tracking is None or detection is None:
+                metadata.append(dict())
+                continue
+
+            # Unpack only after the None guard: unpacking a None detection
+            # would raise TypeError before the frame could be skipped.
+            dets, _, dids = detection
+
+            # Average columns 6:9 and 9:12 into one point per detection.
+            # NOTE(review): presumably left/right reference points of the
+            # detection, as the names suggest -- confirm against Detection3D.
+            points_left = dets[:, 6:9]
+            points_right = dets[:, 9:12]
+            points = (points_left + points_right) / 2
+
+            # Same averaging for columns 12:15 and 15:18.
+            # NOTE(review): presumably camera-relative coordinates -- confirm.
+            points_from_camera_left = dets[:, 12:15]
+            points_from_camera_right = dets[:, 15:18]
+            points_from_camera = (points_from_camera_left + points_from_camera_right) / 2
+
+            # Index the frame's detections by id so each track can find its own.
+            detection_map = {
+                did: (det, p, pfc)
+                for det, p, pfc, did in zip(dets, points, points_from_camera, dids)
+            }
+            trackings3d: "dict[int, Tracking3DResult]" = {}
+            for object_id, t in tracking.items():
+                _, p, pfc = detection_map[t.detection_id]
+
+                xfc, yfc, zfc = pfc.tolist()
+                trackings3d[object_id] = Tracking3DResult(
+                    t.frame_idx,
+                    t.detection_id,
+                    t.object_id,
+                    (xfc, yfc, zfc),
+                    p,
+                    t.bbox_left,
+                    t.bbox_top,
+                    t.bbox_w,
+                    t.bbox_h,
+                    t.object_type,
+                    frame.timestamp,
+                )
+                trajectories.setdefault(object_id, []).append(trackings3d[object_id])
+            metadata.append(trackings3d)
+
+        # Link each trajectory's results to their neighbours.
+        for trajectory in trajectories.values():
+            last = len(trajectory) - 1
+            for i, traj in enumerate(trajectory):
+                if i > 0:
+                    traj.prev = trajectory[i - 1]
+                if i < last:
+                    traj.next = trajectory[i + 1]
+
+        return None, {self.classname(): metadata}
diff --git a/spatialyze/video_processor/stages/tracking_3d/tracking_3d.py b/spatialyze/video_processor/stages/tracking_3d/tracking_3d.py
index f6873c6..e98154c 100644
--- a/spatialyze/video_processor/stages/tracking_3d/tracking_3d.py
+++ b/spatialyze/video_processor/stages/tracking_3d/tracking_3d.py
@@ -1,11 +1,11 @@
 import datetime
 from dataclasses import dataclass
-from typing import Any, Dict, Tuple
+from typing import Any, Dict
 
 import numpy as np
 import numpy.typing as npt
 
-from ...types import DetectionId
+from ...types import DetectionId, Float3
 from ..stage import Stage
 
 
@@ -14,7 +14,7 @@ class Tracking3DResult:
     frame_idx: int
     detection_id: DetectionId
     object_id: float
-    point_from_camera: Tuple[float, float, float]
+    point_from_camera: "Float3"
     point: "npt.NDArray[np.floating]"
     bbox_left: float
     bbox_top: float