Skip to content

Commit

Permalink
Make base class for video processing, change noise masking to subclass
Browse files Browse the repository at this point in the history
  • Loading branch information
t-sasatani committed Jan 15, 2025
1 parent 77da3b8 commit 80f3a4f
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 34 deletions.
2 changes: 1 addition & 1 deletion mio/models/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from pathlib import Path
from typing import List, Optional, TypeVar
from typing import List, Optional, TypeVar, Union

import cv2
import numpy as np
Expand Down
216 changes: 183 additions & 33 deletions mio/process/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,171 @@ def gen_freq_mask(
cv2.destroyAllWindows()
return mask

class BaseVideoProcessor:
"""
Base class for defining an abstract video processor.
Attributes:
name (str): The name of the video processor.
output_frames (list): A list of output frames.
named_frame (NamedFrame): A NamedFrame object.
"""
def __init__(self, name: str, width: int, height: int, output_dir: Path):
"""
Initialize the BaseVideoProcessor object.
Parameters:
name (str): The name of the video processor.
"""
self.name: str = name
self.output_dir: Path = output_dir
self.output_frames: list = []
self.output_named_frame = None
self.processor = FrameProcessor(
height=height,
width=width,
)

def append_frame(self, frame: np.ndarray)-> None:
"""
Append a frame to the output_frames list.
Parameters:
frame (np.ndarray): The frame to append.
"""
self.output_frames.append(frame)

def process_frame(self, frame: np.ndarray)-> None:
"""
Process a single frame. This method should be implemented in the subclass.
"""
pass

def _set_output_named_frame(self)-> None:
"""
Set the named frame object.
"""
self.output_named_frame = NamedFrame(name=self.name, video_frame=self.output_frames)

def export_video(self)-> None:
"""
Export the video to a file.
"""
self._set_output_named_frame()
self.output_named_frame.export(
output_path=self.output_dir / "output",
fps=20,
suffix=True,
)

def get_output_named_frames(self)-> NamedFrame:
"""
Get a NamedFrame object from the output frames.
"""
self._set_output_named_frame()
return self.output_named_frame

class NoisePatchProcessor(BaseVideoProcessor):
"""
A class to apply noise patching to a video.
"""
def __init__(
self, name: str,
noise_patch_config: NoisePatchConfig,
width: int,
height: int,
output_dir: Path)-> None:
"""
Initialize the NoisePatchProcessor object.
Parameters:
name (str): The name of the video processor.
noise_patch_config (NoisePatchConfig): The noise patch configuration.
"""
super().__init__(name, width, height, output_dir)
self.noise_patch_config: NoisePatchConfig = noise_patch_config
self.noise_patchs = []
self.diff_frames = []
self.noise_patch_named_frame: NamedFrame = None
self.diff_frames_named_frame: NamedFrame = None

def process_frame(
self,
raw_frame: np.ndarray,
previous_frame: np.ndarray)-> Tuple[np.ndarray, np.ndarray]:
"""
Process a single frame.
Parameters:
raw_frame (np.ndarray): The raw frame to process.
previous_frame (np.ndarray): The previous frame to compare against.
"""

if self.noise_patch_config.enable:
patched_frame, noise_patch = self.processor.patch_noisy_buffer(
raw_frame,
previous_frame,
self.noise_patch_config,
)
self.append_frame(patched_frame)
self.noise_patchs.append(noise_patch * np.iinfo(np.uint8).max)
self.diff_frames.append(
cv2.absdiff(raw_frame, previous_frame) * self.noise_patch_config.diff_multiply)

return patched_frame, noise_patch
else:
return raw_frame, None

def _set_noise_patch_named_frame(self)-> None:
"""
Set the NamedFrame object for the noise patch.
"""
self.noise_patch_named_frame = NamedFrame(name="patched_area", video_frame=self.noise_patchs)

def _set_diff_frames_named_frame(self)-> None:
"""
Set the NamedFrame object for the difference frames.
"""
self.diff_frames_named_frame = NamedFrame(
name=f"diff_{self.noise_patch_config.diff_multiply}x",
video_frame=self.diff_frames,
)

def get_noise_patch_named_frame(self)-> NamedFrame:
"""
Get a NamedFrame object for the noise patch.
"""
self._set_noise_patch_named_frame()
return self.noise_patch_named_frame

def get_diff_frames_named_frame(self)-> NamedFrame:
"""
Get a NamedFrame object for the difference frames.
"""
self._set_diff_frames_named_frame()
return self.diff_frames_named_frame

def export_noise_patch(self)-> None:
"""
Export the noise patch to a file.
"""
self._set_noise_patch_named_frame()
self.noise_patch_named_frame.export(
output_path=self.output_dir / f"{self.name}",
fps=20,
suffix=True,
)

def export_diff_frames(self)-> None:
"""
Export the difference frames to a file.
"""
self._set_diff_frames_named_frame()
self.diff_frames_named_frame.export(
output_path=self.output_dir / f"{self.name}",
fps=20,
suffix=True,
)

class VideoProcessor:
"""
Expand All @@ -229,19 +394,24 @@ def denoise(
)

reader = VideoReader(video_path)

pathstem = Path(video_path).stem

output_dir = Path.cwd() / config.output_dir
if not output_dir.exists():
output_dir.mkdir(parents=True)

# Initialize lists to store frames
raw_frames = []
output_frames = []
if config.noise_patch.enable:
patched_frames = []
noise_patchs = []
diff_frames = []

noise_patch_processor = NoisePatchProcessor(
output_dir=output_dir,
name="patch",
noise_patch_config=config.noise_patch,
width=reader.width,
height=reader.height,
)

if config.frequency_masking.enable:
freq_domain_frames = []
freq_filtered_frames = []
Expand All @@ -252,7 +422,7 @@ def denoise(
width=reader.width,
)

if config.noise_patch.enable:
if config.frequency_masking.enable:
freq_mask = processor.gen_freq_mask(
freq_mask_config=config.frequency_masking,
)
Expand All @@ -273,19 +443,11 @@ def denoise(
previous_frame = raw_frame.copy()
logger.debug(f"Processing frame {index}")

patched_frame, noise_patch = processor.patch_noisy_buffer(
patched_frame, noise_patch = noise_patch_processor.process_frame(
raw_frame,
previous_frame,
config.noise_patch,
)
previous_frame = patched_frame
patched_frames.append(patched_frame)
noise_patchs.append(noise_patch * np.iinfo(np.uint8).max)

if config.noise_patch.output_diff:
diff_frame = cv2.absdiff(raw_frame, previous_frame)
diff_frames.append(diff_frame * config.noise_patch.diff_multiply)

output_frame = patched_frame

if config.frequency_masking.enable:
Expand All @@ -308,24 +470,12 @@ def denoise(
raw_video = NamedFrame(name="RAW", video_frame=raw_frames)

if config.noise_patch.enable:
patched_video = NamedFrame(name="patched", video_frame=patched_frames)
if config.noise_patch.output_result:
patched_video.export(
output_dir / f"{pathstem}",
suffix=True,
fps=20,
)
patched_video = noise_patch_processor.get_output_named_frames()
noise_patch_processor.export_video()
if config.noise_patch.output_diff:
diff_video = NamedFrame(
name=f"diff_{config.noise_patch.diff_multiply}x", video_frame=diff_frames
)
diff_video.export(
output_dir / f"{pathstem}",
suffix=True,
fps=20,
)
noise_patch_processor.export_diff_frames()
if config.noise_patch.output_noise_patch:
noise_patch = NamedFrame(name="noise_patch", video_frame=noise_patchs)
noise_patch_processor.export_noise_patch()
if config.frequency_masking.output_mask:
freq_mask_frame = NamedFrame(
name="freq_mask", static_frame=freq_mask * np.iinfo(np.uint8).max
Expand Down Expand Up @@ -359,8 +509,8 @@ def denoise(
if config.interactive_display.enable:
videos = [
raw_video,
noise_patch,
patched_video,
noise_patch_processor.get_noise_patch_named_frame(),
noise_patch_processor.get_output_named_frames(),
freq_filtered_video,
freq_domain_video,
min_proj_frame,
Expand Down

0 comments on commit 80f3a4f

Please sign in to comment.