Skip to content

Commit

Permalink
update tracking3d
Browse files Browse the repository at this point in the history
  • Loading branch information
chanwutk committed Aug 10, 2023
1 parent e9a7c56 commit 756086f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
2 changes: 1 addition & 1 deletion playground/run-ablation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@
" if geo_depth:\n",
" pipeline.add_filter(FromTracking2DAndRoad())\n",
" else:\n",
" pipeline.add_filter(FromTracking3DAndDepth())\n",
" pipeline.add_filter(FromTracking2DAndDepth())\n",
"\n",
" # Segment Trajectory\n",
" # pipeline.add_filter(FromTracking3D())\n",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from bitarray import bitarray

from ...payload import Payload
from ..detection_3d import Detection3D
from ..tracking_2d.tracking_2d import Tracking2D
from .tracking_3d import Metadatum, Tracking3D, Tracking3DResult


class FromTracking2DAndDetection2D(Tracking3D):
def _run(self, payload: "Payload") -> "tuple[bitarray | None, dict[str, list] | None]":
metadata: "list[Metadatum]" = []
trajectories: "dict[int, list[Tracking3DResult]]" = {}

trackings = Tracking2D.get(payload.metadata)
assert trackings is not None

detections = Detection3D.get(payload.metadata)
assert detections is not None

for k, detection, tracking, frame in zip(payload.keep, detections, trackings, payload.video):
dets, _, dids = detection
if not k or tracking is None or detection is None:
metadata.append(dict())
continue

points_left = dets[:, 6:9]
points_right = dets[:, 9:12]
points = (points_left + points_right) / 2

points_from_camera_left = dets[:, 12:15]
points_from_camera_right = dets[:, 15:18]
points_from_camera = (points_from_camera_left + points_from_camera_right) / 2

detection_map = {
did: (det, p, pfc)
for det, p, pfc, did
in zip(dets, points, points_from_camera, dids)
}
trackings3d: "dict[int, Tracking3DResult]" = {}
for object_id, t in tracking.items():
did = t.detection_id
det, p, pfc = detection_map[did]

xfc, yxc, zxc = pfc.tolist()
trackings3d[object_id] = Tracking3DResult(
t.frame_idx,
t.detection_id,
t.object_id,
(xfc, yxc, zxc),
p,
t.bbox_left,
t.bbox_top,
t.bbox_w,
t.bbox_h,
t.object_type,
frame.timestamp,
)
if object_id not in trajectories:
trajectories[object_id] = []
trajectories[object_id].append(trackings3d[object_id])
metadata.append(trackings3d)

for trajectory in trajectories.values():
last = len(trajectory) - 1
for i, traj in enumerate(trajectory):
if i > 0:
traj.prev = trajectory[i - 1]
if i < last:
traj.next = trajectory[i + 1]

return None, {self.classname(): metadata}
6 changes: 3 additions & 3 deletions spatialyze/video_processor/stages/tracking_3d/tracking_3d.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import datetime
from dataclasses import dataclass
from typing import Any, Dict, Tuple
from typing import Any, Dict

import numpy as np
import numpy.typing as npt

from ...types import DetectionId
from ...types import DetectionId, Float3
from ..stage import Stage


Expand All @@ -14,7 +14,7 @@ class Tracking3DResult:
frame_idx: int
detection_id: DetectionId
object_id: float
point_from_camera: Tuple[float, float, float]
point_from_camera: "Float3"
point: "npt.NDArray[np.floating]"
bbox_left: float
bbox_top: float
Expand Down

0 comments on commit 756086f

Please sign in to comment.