Skip to content

Commit

Permalink
cover test (#13)
Browse files Browse the repository at this point in the history
* cover test

* cover depth estimation

* cover

* enable progress
  • Loading branch information
chanwutk authored Aug 13, 2023
1 parent 77578f5 commit 2e6fa16
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 35 deletions.
1 change: 0 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ ignore:
- "spatialyze/video_processor/modules"
- "spatialyze/video_processor/stages/detection_estimation/*"
- "spatialyze/video_processor/stages/segment_trajectory/*"
- "spatialyze/video_processor/stages/depth_estimation.py"
- "spatialyze/video_processor/stages/strongsort_with_skip.py"
- "spatialyze/video_processor/utils/preprocess.py"
- "spatialyze/video_processor/utils/process_pipeline.py"
Expand Down
6 changes: 2 additions & 4 deletions spatialyze/video_processor/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,13 @@ def metadata_len(metadata: "dict[str, list[Any]]") -> "int | None":
for v in metadata.values():
if length is None:
length = len(v)
elif length != len(v):
raise Exception()
assert length == len(v), (length, len(v))
return length


def _default_keep(video: "Video", keep: "bitarray | None" = None):
if keep is None:
keep = bitarray(len(video))
keep.setall(1)
elif len(keep) != len(video):
raise Exception()
assert len(keep) == len(video), (len(keep), len(video))
return keep
31 changes: 5 additions & 26 deletions spatialyze/video_processor/stages/depth_estimation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING

import numpy.typing as npt
import PIL.Image as pil
Expand All @@ -21,10 +21,10 @@


class DepthEstimation(Stage["npt.NDArray | None"]):
def _run(self, payload: "Payload") -> "Tuple[Optional[bitarray], Optional[Dict[str, list]]]":
def _run(self, payload: "Payload") -> "tuple[bitarray | None, dict[str, list] | None]":
md = monodepth()
assert payload.metadata is not None
images: "List[npt.NDArray | None]" = []
images: "list[npt.NDArray | None]" = []

decoded_frames = DecodeFrame.get(payload)
assert decoded_frames is not None
Expand Down Expand Up @@ -102,29 +102,8 @@ def __init__(self, model_name=MODEL_NAMES[2], no_cuda=False, pred_metric_depth=T
self.depth_decoder.to(self.device)
self.depth_decoder.eval()

def eval(self, input_image_numpy):
with torch.no_grad():
# Load image and preprocess
input_image = pil.fromarray(input_image_numpy[:, :, [2, 1, 0]])
original_width, original_height = input_image.size
input_image = input_image.resize((self.feed_width, self.feed_height), pil.LANCZOS)
input_image = transforms.ToTensor()(input_image).unsqueeze(0)

# PREDICTION
input_image = input_image.to(self.device)
features = self.encoder(input_image)
outputs = self.depth_decoder(features)

disp = outputs[("disp", 0)]
disp_resized = torch.nn.functional.interpolate(
disp, (original_height, original_width), mode="bilinear", align_corners=False
)

disp_resized_np = disp_resized.squeeze().cpu().detach().numpy() * 5.4
return disp_resized_np

def eval_all(self, input_images: "List[npt.NDArray | None]"):
output: "List[npt.NDArray | None]" = []
def eval_all(self, input_images: "list[npt.NDArray | None]"):
output: "list[npt.NDArray | None]" = []
with torch.no_grad():
# for im in tqdm(input_images):
for im in input_images:
Expand Down
7 changes: 5 additions & 2 deletions tests/video_processor/steps/test_detection_2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def test_detection_2d():
with open(os.path.join(VIDEO_DIR, 'frames.pkl'), 'rb') as f:
videos = pickle.load(f)

pipeline = Pipeline([DecodeFrame(), YoloDetection()])
pipeline = Pipeline()
pipeline.add_filter(DecodeFrame())
pipeline.add_filter(YoloDetection())

for name, video in videos.items():
if video['filename'] not in files:
Expand All @@ -38,7 +40,8 @@ def test_detection_2d():
keep[(len(frames) * 3) // 4:] = 1

output = pipeline.run(Payload(frames, keep))
det_result = YoloDetection.get(output)
# det_result = YoloDetection.get(output)
det_result = output['Detection2D']
assert det_result is not None

# with open(os.path.join(OUTPUT_DIR, f'YoloDetection--{name}.json'), 'w') as f:
Expand Down
3 changes: 2 additions & 1 deletion tests/video_processor/steps/test_detection_3d.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def test_detection_3d():
keep[(len(frames) * 3) // 4:] = 1

output = pipeline.run(Payload(frames, keep))
det_result = FromDetection2DAndRoad.get(output)
# det_result = FromDetection2DAndRoad.get(output)
det_result = output[FromDetection2DAndRoad]
assert det_result is not None

# with open(os.path.join(OUTPUT_DIR, f'FromDetection2DAndRoad--{name}.json'), 'w') as f:
Expand Down
5 changes: 4 additions & 1 deletion tests/video_processor/steps/test_tracking_2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
from spatialyze.video_processor.stages.decode_frame.decode_frame import DecodeFrame
from spatialyze.video_processor.stages.detection_2d.yolo_detection import YoloDetection
from spatialyze.video_processor.stages.tracking_2d.strongsort import StrongSORT
from spatialyze.video_processor.stages.stage import Stage

OUTPUT_DIR = './data/pipeline/test-results'
VIDEO_DIR = './data/pipeline/videos'
disable_cache()
Stage.enable_progress()

def test_detection_3d():
files = os.listdir(VIDEO_DIR)
Expand All @@ -44,7 +46,8 @@ def test_detection_3d():
keep[(len(frames) * 3) // 4:] = 1

output = pipeline.run(Payload(frames, keep))
track_result = StrongSORT.get(output)
# track_result = StrongSORT.get(output)
track_result = output['Tracking2D.StrongSORT']
assert track_result is not None

# with open(os.path.join(OUTPUT_DIR, f'StrongSORT--{name}.json'), 'w') as f:
Expand Down

0 comments on commit 2e6fa16

Please sign in to comment.