Add video preprocessing (denoising) feature #83

Draft: wants to merge 20 commits into base: main
6 changes: 5 additions & 1 deletion .gitignore
@@ -159,4 +159,8 @@ cython_debug/
#.idea/
wirefree_example.mp4
wirefree_example.avi
.pdm-python
user_dir/*
!user_dir/.gitkeep

~/.config/miniscope_io/logs/
2 changes: 2 additions & 0 deletions mio/cli/main.py
@@ -5,6 +5,7 @@
import click

from mio.cli.config import config
from mio.cli.process import process
from mio.cli.stream import stream
from mio.cli.update import device, update

@@ -23,3 +24,4 @@ def cli(ctx: click.Context) -> None:
cli.add_command(update)
cli.add_command(device)
cli.add_command(config)
cli.add_command(process)
42 changes: 42 additions & 0 deletions mio/cli/process.py
@@ -0,0 +1,42 @@
"""
Command line interface for offline video pre-processing.
"""

import click

from mio.models.process import DenoiseConfig
from mio.process.video import VideoProcessor


@click.group()
def process() -> None:
"""
Command group for video processing.
"""
pass


@process.command()
@click.option(
"-i",
"--input",
required=True,
type=click.Path(exists=True, dir_okay=False),
help="Path to the video file to process.",
)
@click.option(
"-c",
"--denoise_config",
required=True,
type=click.Path(exists=True, dir_okay=False),
help="Path to the YAML processing configuration file.",
)
def denoise(
input: str,
denoise_config: str,
) -> None:
"""
Denoise a video file.
"""
denoise_config_parsed = DenoiseConfig.from_yaml(denoise_config)
VideoProcessor.denoise(input, denoise_config_parsed)
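For reference, the same processing can be invoked programmatically. This is a minimal sketch assuming the example config shipped in this PR; the CLI entry point name and the input file path are assumptions:

```python
# Programmatic equivalent of `mio process denoise -i ... -c ...`
from mio.models.process import DenoiseConfig
from mio.process.video import VideoProcessor

config = DenoiseConfig.from_yaml("mio/data/config/process/denoise_example.yml")
VideoProcessor.denoise("user_dir/recording.avi", config)  # hypothetical input path
```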
29 changes: 29 additions & 0 deletions mio/data/config/process/denoise_example.yml
@@ -0,0 +1,29 @@
id: denoise_example
mio_model: tests.test_mixins.LoaderModel
mio_version: 0.6.1.dev16+g6436633.d20241211
interactive_display:
enable: false
start_frame: 40
end_frame: 140
noise_patch:
enable: true
method: mean_error
threshold: 30
buffer_size: 5032
buffer_split: 10
diff_multiply: 1
output_result: true
output_noise_patch: true
output_diff: false
frequency_masking:
enable: true
spatial_LPF_cutoff_radius: 15
vertical_BEF_cutoff: 2
horizontal_BEF_cutoff: 0
display_mask: false
output_mask: true
output_result: true
output_freq_domain: false
end_frame: -1 #-1 means all frames
output_result: true
output_dir: user_dir/output
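For intuition, the `frequency_masking` options above describe an FFT-domain filter: a circular spatial low-pass with radius `spatial_LPF_cutoff_radius` plus narrow vertical/horizontal band-exclusion strips. The sketch below illustrates how such a mask could be built with NumPy; it is an assumption about what the parameters mean, not the actual `VideoProcessor` implementation.

```python
import numpy as np

def build_frequency_mask(
    height: int,
    width: int,
    lpf_radius: int = 15,     # spatial_LPF_cutoff_radius
    vertical_bef: int = 2,    # vertical_BEF_cutoff
    horizontal_bef: int = 0,  # horizontal_BEF_cutoff
) -> np.ndarray:
    """Illustrative frequency-domain mask (1 = keep, 0 = suppress)."""
    mask = np.ones((height, width), dtype=np.float32)
    cy, cx = height // 2, width // 2

    # Band-exclusion strips through the center, commonly used to
    # suppress periodic stripe noise.
    if vertical_bef > 0:
        mask[:, cx - vertical_bef : cx + vertical_bef + 1] = 0
    if horizontal_bef > 0:
        mask[cy - horizontal_bef : cy + horizontal_bef + 1, :] = 0

    # Spatial low-pass: keep only frequencies inside a circle of lpf_radius.
    y, x = np.ogrid[:height, :width]
    mask[(y - cy) ** 2 + (x - cx) ** 2 > lpf_radius**2] = 0

    # Keep the DC component so overall brightness is preserved.
    mask[cy, cx] = 1
    return mask

# A frame would then be filtered via a centered FFT:
#   f = np.fft.fftshift(np.fft.fft2(frame))
#   filtered = np.abs(np.fft.ifft2(np.fft.ifftshift(f * mask)))
```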
103 changes: 102 additions & 1 deletion mio/io.py
@@ -6,7 +6,7 @@
import contextlib
import csv
from pathlib import Path
from typing import Any, BinaryIO, List, Literal, Optional, Union, overload
from typing import Any, BinaryIO, Iterator, List, Literal, Optional, Tuple, Union, overload

import cv2
import numpy as np
@@ -19,6 +19,107 @@
from mio.types import ConfigSource


class VideoWriter:
"""
Write data to a video file using OpenCV.
"""

@staticmethod
def init_video(
path: Union[Path, str],
width: int,
height: int,
fps: int,
fourcc: str = "Y800",
**kwargs: dict,
) -> cv2.VideoWriter:
"""
Create a parameterized video writer

Parameters
----------
path : Union[Path, str]
Video file to write to
width : int
Width of video
height : int
Height of video
fps : int
Frame rate of video
fourcc : str
Fourcc code to use
kwargs : dict
passed to :class:`cv2.VideoWriter`

Returns
-------
:class:`cv2.VideoWriter`
"""
if isinstance(path, str):
path = Path(path)

fourcc = cv2.VideoWriter_fourcc(*fourcc)
frame_rate = fps
frame_size = (width, height)
out = cv2.VideoWriter(str(path), fourcc, frame_rate, frame_size, **kwargs)
return out


class VideoReader:
"""
A class to read video files.
"""

def __init__(self, video_path: str):
"""
Initialize the VideoReader object.

Parameters:
video_path (str): The path to the video file.

Raises:
ValueError: If the video file cannot be opened.
"""
self.video_path = video_path
self.logger = init_logger("VideoReader")
self.cap = cv2.VideoCapture(str(video_path))

if not self.cap.isOpened():
raise ValueError(f"Could not open video at {video_path}")

self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.logger.info(f"Opened video at {video_path}")

def read_frames(self) -> Iterator[Tuple[int, np.ndarray]]:
"""
Read frames from the video file along with their index.

Yields:
Tuple[int, np.ndarray]: The index and the next frame in the video.
"""
while self.cap.isOpened():
# CAP_PROP_POS_FRAMES is the index of the frame about to be read
index = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
ret, frame = self.cap.read()

if not ret:
break

self.logger.debug(f"Reading frame {index}")
yield index, frame

def release(self) -> None:
"""
Release the video capture object.
"""
self.cap.release()

def __del__(self):
self.release()


class BufferedCSVWriter:
"""
Write data to a CSV file in buffered mode.
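As a quick sanity check of the two new I/O classes, a round-trip sketch (the input path and output name are hypothetical; the fps value mirrors the default used elsewhere in this PR):

```python
from mio.io import VideoReader, VideoWriter

reader = VideoReader("user_dir/recording.avi")  # hypothetical input
writer = VideoWriter.init_video(
    path="user_dir/output/copy.avi",
    width=reader.width,
    height=reader.height,
    fps=20,
)
try:
    for index, frame in reader.read_frames():
        writer.write(frame)  # OpenCV returns frames as BGR arrays
finally:
    reader.release()
    writer.release()
```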
136 changes: 136 additions & 0 deletions mio/models/frames.py
@@ -0,0 +1,136 @@
"""
Pydantic models for storing frames and videos.
"""

from pathlib import Path
from typing import List, Optional, TypeVar

import cv2
import numpy as np
from pydantic import BaseModel, Field, model_validator

from mio.io import VideoWriter
from mio.logging import init_logger

T = TypeVar("T", np.ndarray, List[np.ndarray], List[List[np.ndarray]])

logger = init_logger("model.frames")


class NamedFrame(BaseModel):
"""
Pydantic model to store an array (frame/video/video list) together with a name.
"""

name: str = Field(
...,
description="Name of the video.",
)
static_frame: Optional[np.ndarray] = Field(
None,
description="Frame data, if provided.",
)
video_frame: Optional[List[np.ndarray]] = Field(
None,
description="Video data, if provided.",
)
video_list_frame: Optional[List[List[np.ndarray]]] = Field(
None,
description="List of video data, if provided.",
)
frame_type: Optional[str] = Field(
None,
description="Type of frame data.",
)

@model_validator(mode="before")
def check_frame_type(cls, values: dict) -> dict:
"""
Ensure that exactly one of static_frame, video_frame, or video_list_frame is provided.
"""
static = values.get("static_frame")
video = values.get("video_frame")
video_list = values.get("video_list_frame")

# Identify which fields are present
present_fields = [
(field_name, field_value)
for field_name, field_value in zip(
("static_frame", "video_frame", "video_list_frame"), (static, video, video_list)
)
if field_value is not None
]

if len(present_fields) != 1:
raise ValueError(
"Exactly one of static_frame, video_frame, or video_list_frame must be provided."
)

# Record which frame type is present
values["frame_type"] = present_fields[0][0]

return values

@property
def data(self) -> T:
"""Return the content of the populated field."""
if self.frame_type == "static_frame":
return self.static_frame
elif self.frame_type == "video_frame":
return self.video_frame
elif self.frame_type == "video_list_frame":
return self.video_list_frame
else:
raise ValueError("Unknown frame type or no frame data provided.")

def export(self, output_path: Path, fps: int, suffix: bool) -> None:
"""
Export the frame data to a file.

Parameters
----------
output_path : Path
Path to the output file, without extension.
fps : int
Frames per second for the output video.
suffix : bool
If True, append the frame name to the output file stem.

Raises
------
NotImplementedError
If the frame type is video_list_frame.
"""
if suffix:
output_path = output_path.with_name(output_path.stem + f"_{self.name}")
if self.frame_type == "static_frame":
# write PNG out
cv2.imwrite(str(output_path.with_suffix(".png")), self.static_frame)
elif self.frame_type == "video_frame":
writer = VideoWriter.init_video(
path=output_path.with_suffix(".avi"),
width=self.video_frame[0].shape[1],
height=self.video_frame[0].shape[0],
fps=fps,
)
logger.info(
f"Writing video to {output_path.with_suffix('.avi')}: "
f"{self.video_frame[0].shape[1]}x{self.video_frame[0].shape[0]}"
)
try:
for frame in self.video_frame:
picture = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
writer.write(picture)
finally:
writer.release()

elif self.frame_type == "video_list_frame":
raise NotImplementedError("Exporting video list frames is not yet supported.")
else:
raise ValueError("Unknown frame type or no frame data provided.")

class Config:
"""
Pydantic config for allowing np.ndarray types.
Could be a numpydantic situation, so this may be revisited later.
"""

arbitrary_types_allowed = True
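A minimal usage sketch for `NamedFrame`, assuming 8-bit grayscale frames (the dummy data and output path below are illustrative):

```python
from pathlib import Path

import numpy as np

from mio.models.frames import NamedFrame

# Three dummy 200x200 grayscale frames standing in for a short video.
frames = [np.random.randint(0, 255, (200, 200), dtype=np.uint8) for _ in range(3)]

named = NamedFrame(name="denoised", video_frame=frames)
print(named.frame_type)  # "video_frame", set by the validator

# With suffix=True this writes user_dir/output/result_denoised.avi.
named.export(Path("user_dir/output/result"), fps=20, suffix=True)
```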