Skip to content

Commit

Permalink
add camera synch
Browse files Browse the repository at this point in the history
  • Loading branch information
h-mayorquin committed Nov 6, 2024
1 parent cc2d467 commit ad28275
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 54 deletions.
29 changes: 1 addition & 28 deletions src/fox_lab_to_nwb/assets/trialanalysis_photron.m
Original file line number Diff line number Diff line change
Expand Up @@ -62,34 +62,7 @@
ptrig = ddata.daq.data(:,9); %photron trigger
wtrig = ddata.daq.data(:,10); %wind trigger

%NOT CURRENTLY USING
%including in struct though so it's there if needed?
%otrig = ddata.daq.data(:,3); %opto trigger
dwbf = ddata.daq.data(:,6);
%wbf data from wingbeat amplifier needs to be multiplied by 100:
% 1V = 100Hz
dwbf = dwbf*100; %daq wingbeat freq, added 7/17
dwbaL = ddata.daq.data(:,4); %temp
% twbaL = twbaL - mean(twbaL(1:2499)); %set to 0 for denoising
dwbaR = ddata.daq.data(:,5);
% twbaR = twbaR - mean(twbaR(1:2499));
% hutchenL = ddata.daq.data(:,7);
% hutchenR = ddata.daq.data(:,8);

%ALIGN CAMERAS TO DAQ - CHECK WITH MIKE AGAIN?
%find trigger times
ctrigtime = find(ctrig>3,1)/ddata.daq.fs;
ptrigtime = find(ptrig>3,1)/ddata.daq.fs;

%topcam (currently skipping sidecam!)
meta_t = fastecMetaReader(fullfile(trial_path, 'TOPCAM_000000.txt'));
ts_t = linspace(1/meta_t.fs, meta_t.numframes/meta_t.fs, meta_t.numframes);
ts_t = ts_t-(ts_t(end)-ctrigtime);
%photron/phantom camera
mp = dir([trial_path filesep 'HALTCAM*.mii']); %since the number changes every time
meta_p = photronMetaReader(fullfile(mp.folder, mp.name));
ts_p = linspace(1/meta_p.fs, meta_p.numframes/meta_p.fs, meta_p.numframes);
ts_p = ts_p-(ts_p(end)-ptrigtime);
q

%% ANTENNAE
%subtract height from y data because y = 0 is at top of image
Expand Down
31 changes: 24 additions & 7 deletions src/fox_lab_to_nwb/behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ def add_to_nwbfile(
"PTrigger",
]

# TODO: figure how how to store this
# Synchronization signals
cam_sync = daq_struct["data"][:, 0]
cam_trigger = daq_struct["data"][:, 1]
opto_trigger = daq_struct["data"][:, 2]
ptrigger = daq_struct["data"][:, 8]

# Behavior signals
left_wing_beat_amplitude = daq_struct["data"][:, 3]
right_wing_beat_amplitude = daq_struct["data"][:, 4]
Expand Down Expand Up @@ -105,3 +98,27 @@ def add_to_nwbfile(

nwbfile.add_acquisition(lhutchen_time_series)
nwbfile.add_acquisition(rhutchen_time_series)

def extract_synchronization_signals_info(self):

mat = read_mat(self.file_path)
recording_structure = mat["rec"]
daq_struct = recording_structure["daq"]

daq_sampling_rate = daq_struct["fs"]

cam_sync = daq_struct["data"][:, 0]
cam_trigger = daq_struct["data"][:, 1]
opto_trigger = daq_struct["data"][:, 2]
ptrigger = daq_struct["data"][:, 8]

return_dict = {
"daq_sampling_rate": daq_sampling_rate,
"cam_sync": cam_sync,
"cam_trigger": cam_trigger,
"opto_trigger": opto_trigger,
"ptrigger": ptrigger,
}

return return_dict

207 changes: 207 additions & 0 deletions src/fox_lab_to_nwb/camera_utilites.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from lxml import etree
import os

def extract_phantom_metadata(xml_path):
# Parse XML using lxml
tree = etree.parse(xml_path)
root = tree.getroot()

# Create metadata dictionary
metadata = {}

# Define paths and their corresponding keys/types
paths = {
# Camera settings
"frame_rate": (".//FrameRateDouble", float),
"total_frames": (".//TotalImageCount", int),
"first_frame": (".//FirstMovieImage", int),
"image_count": (".//ImageCount", int),
# Image properties
"width": (".//biWidth", int),
"height": (".//biHeight", int),
"bit_depth": (".//biBitCount", int),
"bit_depth_recording": (".//RecBPP", int),
# Camera info
"camera_model": (".//CameraModel", str),
"camera_version": (".//CameraVersion", int),
"firmware_version": (".//FirmwareVersion", int),
"software_version": (".//SoftwareVersion", int),
"serial": (".//Serial", int),
# Timing
"shutter_ns": (".//ShutterNs", int),
"frame_delay_ns": (".//FrameDelayNs", int),
# Image settings
"compression": (".//Compression", int),
"saturation": (".//Saturation", float),
"brightness": (".//Bright", int),
"contrast": (".//Contrast", int),
"gamma": (".//Gamma", float),
# Trigger settings
"trigger_frame": (".//TrigFrame", int),
"post_trigger": (".//PostTrigger", int),
# Auto exposure
"auto_exposure": (".//AutoExposure", bool),
"auto_exp_level": (".//AutoExpLevel", int),
"auto_exp_speed": (".//AutoExpSpeed", int),
}

# Extract all metadata based on paths
for key, (xpath, type_conv) in paths.items():
element = root.find(xpath)
if element is not None and element.text:
try:
metadata[key] = type_conv(element.text)
except (ValueError, TypeError):
metadata[key] = None
else:
metadata[key] = None

# Special handling for trigger time
trigger_date = root.find(".//TriggerTime/Date")
trigger_time = root.find(".//TriggerTime/Time")
if trigger_date is not None and trigger_time is not None:
metadata["trigger_time"] = f"{trigger_date.text} {trigger_time.text}"

# Get image acquisition position and size
metadata["acquisition"] = {
"pos_x": int(root.find(".//ImPosXAcq").text) if root.find(".//ImPosXAcq") is not None else None,
"pos_y": int(root.find(".//ImPosYAcq").text) if root.find(".//ImPosYAcq") is not None else None,
"width": int(root.find(".//ImWidthAcq").text) if root.find(".//ImWidthAcq") is not None else None,
"height": int(root.find(".//ImHeightAcq").text) if root.find(".//ImHeightAcq") is not None else None,
}

# Get white balance gains
wb_element = root.find(".//WBGain")
if wb_element is not None:
metadata["white_balance"] = {
"red": float(wb_element.find("Red").text) if wb_element.find("Red") is not None else None,
"blue": float(wb_element.find("Blue").text) if wb_element.find("Blue") is not None else None,
}

return metadata


from typing import Any


def extract_fastec_metadata(file_path: str) -> dict[str, dict[str, Any]]:
"""
Extract metadata from a Fastec camera metadata file.
Parameters
----------
file_path : str
Path to the Fastec metadata file.
Returns
-------
Dict[str, Dict[str, Any]]
Nested dictionary containing the parsed metadata.
The top level dictionary has sections as keys ('image', 'camera', 'record', 'normalization').
Each section contains a dictionary of key-value pairs with automatically converted data types.
Notes
-----
The function automatically converts values to appropriate types:
- Integers for numeric values
- Floats for decimal numbers
- Lists for matrix values [x,y,z]
- Tuples for bit modes (e.g., "10:3")
- Strings for text and other values
Examples
--------
>>> metadata = extract_fastec_metadata('metadata.txt')
>>> frame_rate = metadata['record']['fps']
>>> resolution = (metadata['image']['width'], metadata['image']['height'])
Raises
------
FileNotFoundError
If the metadata file is not found.
PermissionError
If there are insufficient permissions to read the file.
"""

def parse_value(value: str) -> int | float | list[int] | tuple[int, int] | str:
"""
Parse a string value into its appropriate type.
Parameters
----------
value : str
The string value to parse
Returns
-------
int | float | list[int] | tuple[int, int] | str
Parsed value in its appropriate type
"""
# Try to convert to int
try:
return int(value)
except ValueError:
pass

# Try to convert to float
try:
return float(value)
except ValueError:
pass

# Handle matrix values [x,y,z]
if value.startswith("[") and value.endswith("]"):
try:
return [int(x) for x in value[1:-1].split(",")]
except ValueError:
return value

# Handle bit mode (e.g., "10:3")
if ":" in value and len(value.split(":")) == 2:
try:
return tuple(int(x) for x in value.split(":"))
except ValueError:
return value

# Return as string if no other type matches
return value

# Check if file exists
if not os.path.exists(file_path):
raise FileNotFoundError(f"Metadata file not found: {file_path}")

metadata: Dict[str, Dict[str, Any]] = {}
current_section: Union[str, None] = None

try:
with open(file_path, "r") as file:
for line in file:
line = line.strip()

# Skip empty lines
if not line:
continue

# Check if line is a section header
if line.startswith("[") and line.endswith("]"):
current_section = line[1:-1].lower()
metadata[current_section] = {}
continue

# Parse key-value pairs
if "=" in line and current_section is not None:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()

# Parse value to appropriate type
parsed_value = parse_value(value)

metadata[current_section][key] = parsed_value

return metadata

except PermissionError:
raise PermissionError(f"Insufficient permissions to read file: {file_path}")
except Exception as e:
raise Exception(f"Error parsing metadata file: {str(e)}")
Loading

0 comments on commit ad28275

Please sign in to comment.