Nick updates. #7

Closed · wants to merge 1 commit into from
182 changes: 124 additions & 58 deletions src/jazayeri_lab_to_nwb/watters/watters_convert_session.py
@@ -1,28 +1,55 @@
"""Primary script to run to convert an entire session for of data using the NWBConverter."""
from pathlib import Path
from typing import Union

import datetime
import glob
import json
from zoneinfo import ZoneInfo
import logging
from pathlib import Path
from typing import Union
from uuid import uuid4
from zoneinfo import ZoneInfo

from neuroconv.utils import load_dict_from_file, dict_deep_update

from jazayeri_lab_to_nwb.watters import WattersNWBConverter
from wattersnwbconverter import WattersNWBConverter

# Set logger level so that info is displayed in the console
logging.getLogger().setLevel(logging.INFO)


def _get_single_file(directory, suffix=''):
"""Get path to a file in given directory with given suffix.

Raises an error if there is not exactly one matching file.
"""
files = list(glob.glob(str(directory / f'*{suffix}')))
if len(files) == 0:
raise ValueError(f'No {suffix} files found in {directory}')
if len(files) > 1:
raise ValueError(f'Multiple {suffix} files found in {directory}')
return files[0]
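# Usage sketch (hypothetical path, for illustration only): calling
#   _get_single_file(Path('/data/raw_data/v_probe_0'), suffix='.dat')
# returns the single matching path as a string; zero or multiple matches
# raise ValueError.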

def session_to_nwb(data_dir_path: Union[str, Path], output_dir_path: Union[str, Path], stub_test: bool = False):

data_dir_path = Path(data_dir_path)
def session_to_nwb(data_dir: Union[str, Path],
output_dir_path: Union[str, Path],
stub_test: bool = False):

logging.info('')
logging.info(f'data_dir = {data_dir}')
logging.info(f'output_dir_path = {output_dir_path}')
logging.info(f'stub_test = {stub_test}')

data_dir = Path(data_dir)
output_dir_path = Path(output_dir_path)
if stub_test:
output_dir_path = output_dir_path / "nwb_stub"
output_dir_path.mkdir(parents=True, exist_ok=True)

session_id = f"ses-{data_dir_path.name}"
session_id = f"ses-{data_dir.name}"
raw_nwbfile_path = output_dir_path / f"{session_id}_raw.nwb"
processed_nwbfile_path = output_dir_path / f"{session_id}_processed.nwb"
logging.info(f'raw_nwbfile_path = {raw_nwbfile_path}')
logging.info(f'processed_nwbfile_path = {processed_nwbfile_path}')

raw_source_data = dict()
raw_conversion_options = dict()
@@ -31,107 +58,134 @@ def session_to_nwb(data_dir_path: Union[str, Path], output_dir_path: Union[str,

for probe_num in range(2):
# Add V-Probe Recording
if not (data_dir_path / "raw_data" / f"v_probe_{probe_num}").exists():
probe_data_dir = (data_dir / "raw_data" / f"v_probe_{probe_num}")
if not probe_data_dir.exists():
continue
recording_files = list(glob.glob(str(data_dir_path / "raw_data" / f"v_probe_{probe_num}" / "*.dat")))
assert len(recording_files) > 0, f"No .dat files found in {data_dir_path}"
assert len(recording_files) == 1, f"Multiple .dat files found in {data_dir_path}"
logging.info(f'\nAdding V-probe {probe_num} recording')

logging.info(' Raw data')
recording_file = _get_single_file(probe_data_dir, suffix='.dat')
recording_source_data = {
f"RecordingVP{probe_num}": dict(
file_path=str(recording_files[0]),
probe_metadata_file=str(data_dir_path / "data_open_source" / "probes.metadata.json"),
probe_key=f"probe{(probe_num+1):02d}",
file_path=recording_file,
probe_metadata_file=str(
data_dir / "data_open_source" /
"probes.metadata.json"
),
probe_key=f"probe{(probe_num + 1):02d}",
probe_name=f"vprobe{probe_num}",
es_key=f"ElectricalSeriesVP{probe_num}",
)
}
raw_source_data.update(recording_source_data)
processed_source_data.update(recording_source_data)
raw_conversion_options.update({f"RecordingVP{probe_num}": dict(stub_test=stub_test)})
processed_conversion_options.update(
{f"RecordingVP{probe_num}": dict(stub_test=stub_test, write_electrical_series=False)}
)
raw_conversion_options.update(
{f"RecordingVP{probe_num}": dict(stub_test=stub_test)})
processed_conversion_options.update({
f"RecordingVP{probe_num}": dict(
stub_test=stub_test, write_electrical_series=False)
})

# Add V-Probe Sorting
logging.info(' Spike sorted data')
processed_source_data.update(
{
f"SortingVP{probe_num}": dict(
folder_path=str(data_dir_path / "spike_sorting_raw" / f"v_probe_{probe_num}"),
folder_path=str(
data_dir / "spike_sorting_raw" / f"v_probe_{probe_num}"
),
keep_good_only=False,
)
}
)
processed_conversion_options.update({f"SortingVP{probe_num}": dict(stub_test=stub_test, write_as="processing")})

# Add Recording
recording_files = list(glob.glob(str(data_dir_path / "raw_data" / "spikeglx" / "*" / "*" / "*.ap.bin")))
assert len(recording_files) > 0, f"No .ap.bin files found in {data_dir_path}"
assert len(recording_files) == 1, f"Multiple .ap.bin files found in {data_dir_path}"
raw_source_data.update(dict(RecordingNP=dict(file_path=str(recording_files[0]))))
processed_source_data.update(dict(RecordingNP=dict(file_path=str(recording_files[0]))))
processed_conversion_options.update({
f"SortingVP{probe_num}": dict(
stub_test=stub_test, write_as="processing")
})
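# Note: write_as="processing" stores the sorted units in a processing
# module of the NWB file rather than the primary units table (per the
# neuroconv sorting option; exact behavior may vary by version).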

# Add SpikeGLX Recording
logging.info('Adding SpikeGLX recordings')
logging.info(' AP data')
probe_data_dir = (data_dir / "raw_data" / "spikeglx" / "*" / "*")
ap_file = _get_single_file(probe_data_dir, suffix='.ap.bin')
raw_source_data.update(dict(RecordingNP=dict(file_path=ap_file)))
processed_source_data.update(dict(RecordingNP=dict(file_path=ap_file)))
raw_conversion_options.update(dict(RecordingNP=dict(stub_test=stub_test)))
processed_conversion_options.update(dict(RecordingNP=dict(stub_test=stub_test, write_electrical_series=False)))
processed_conversion_options.update(dict(
RecordingNP=dict(stub_test=stub_test, write_electrical_series=False)))

# Add LFP
lfp_files = list(glob.glob(str(data_dir_path / "raw_data" / "spikeglx" / "*" / "*" / "*.lf.bin")))
assert len(lfp_files) > 0, f"No .lf.bin files found in {data_dir_path}"
assert len(lfp_files) == 1, f"Multiple .lf.bin files found in {data_dir_path}"
raw_source_data.update(dict(LF=dict(file_path=str(lfp_files[0]))))
processed_source_data.update(dict(LF=dict(file_path=str(lfp_files[0]))))
logging.info(' LFP data')
lfp_file = _get_single_file(probe_data_dir, suffix='.lf.bin')
raw_source_data.update(dict(LF=dict(file_path=lfp_file)))
processed_source_data.update(dict(LF=dict(file_path=lfp_file)))
raw_conversion_options.update(dict(LF=dict(stub_test=stub_test)))
processed_conversion_options.update(dict(LF=dict(stub_test=stub_test, write_electrical_series=False)))
processed_conversion_options.update(
dict(LF=dict(stub_test=stub_test, write_electrical_series=False)))

# Add Sorting
logging.info(' Spike sorted data')
processed_source_data.update(
dict(
SortingNP=dict(
folder_path=str(data_dir_path / "spike_sorting_raw" / "np"),
folder_path=str(data_dir / "spike_sorting_raw" / "np"),
keep_good_only=False,
)
)
)
processed_conversion_options.update(dict(SortingNP=dict(stub_test=stub_test, write_as="processing")))
processed_conversion_options.update(
dict(SortingNP=dict(stub_test=stub_test, write_as="processing")))

# Add Behavior
logging.info('Adding behavior')
behavior_path = str(data_dir / "data_open_source" / "behavior")
processed_source_data.update(
dict(EyePosition=dict(folder_path=str(data_dir_path / "data_open_source" / "behavior")))
dict(EyePosition=dict(folder_path=behavior_path))
)
processed_conversion_options.update(dict(EyePosition=dict()))

processed_source_data.update(dict(PupilSize=dict(folder_path=str(data_dir_path / "data_open_source" / "behavior"))))
processed_source_data.update(
dict(PupilSize=dict(folder_path=behavior_path)))
processed_conversion_options.update(dict(PupilSize=dict()))

# Add Trials
processed_source_data.update(dict(Trials=dict(folder_path=str(data_dir_path / "data_open_source"))))
logging.info('Adding task data')
processed_source_data.update(
dict(Trials=dict(folder_path=str(data_dir / "data_open_source"))))
processed_conversion_options.update(dict(Trials=dict()))

processed_converter = WattersNWBConverter(
source_data=processed_source_data, sync_dir=str(data_dir_path / "sync_pulses")
source_data=processed_source_data,
sync_dir=str(data_dir / "sync_pulses")
)
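# Note: sync_dir is consumed by temporally_align_data_interfaces() (see
# wattersnwbconverter.py below), which aligns the interfaces using the
# recorded sync pulses.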

# Add datetime to conversion
metadata = processed_converter.get_metadata() # use processed b/c it has everything
metadata = processed_converter.get_metadata()
metadata["NWBFile"]["session_id"] = session_id

# Subject name
if "monkey0" in str(data_dir_path):
metadata["Subject"]["subject_id"] = "Perle"
elif "monkey1" in str(data_dir_path):
metadata["Subject"]["subject_id"] = "Elgar"
if "monkey0" in str(data_dir):
metadata["Subject"]["subject_id"] = "P"
Review comment (Member):
FYI this will be a value that shows up in the final filename on DANDI - I highly recommend making it the full name or some other easily identifiable/communicable string
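
For illustration (assumed DANDI naming convention, not taken from this PR): asset filenames follow a pattern like `sub-<subject_id>_ses-<session>_<modality>.nwb`, so `subject_id = "P"` would yield something like `sub-P_ses-2022-06-01_ecephys.nwb`, while the full name would give the more identifiable `sub-Perle_ses-2022-06-01_ecephys.nwb`.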

elif "monkey1" in str(data_dir):
metadata["Subject"]["subject_id"] = "E"

# Ecephys
probe_metadata_file = data_dir_path / "data_open_source" / "probes.metadata.json"
probe_metadata_file = (
data_dir / "data_open_source" / "probes.metadata.json")
with open(probe_metadata_file, "r") as f:
probe_metadata = json.load(f)
neuropixel_metadata = [entry for entry in probe_metadata if entry["label"] == "probe00"][0]
neuropixel_metadata = [
entry for entry in probe_metadata if entry["label"] == "probe00"
][0]
for entry in metadata["Ecephys"]["ElectrodeGroup"]:
if entry["device"] == "Neuropixel-Imec":
# TODO: uncomment when fixed in pynwb
# entry.update(dict(position=[(
# neuropixel_metadata["coordinates"][0],
# neuropixel_metadata["coordinates"][1],
# neuropixel_metadata["depth_from_surface"],
# )]))
pass # TODO: uncomment when fixed in pynwb
logging.warning('\n\n PROBE COORDINATES NOT IMPLEMENTED\n\n')

# Update default metadata with the editable in the corresponding yaml file
editable_metadata_path = Path(__file__).parent / "watters_metadata.yaml"
@@ -141,35 +195,47 @@ def session_to_nwb(data_dir_path: Union[str, Path], output_dir_path: Union[str,
# check if session_start_time was found/set
if "session_start_time" not in metadata["NWBFile"]:
try:
date = datetime.datetime.strptime(data_dir_path.name, "%Y-%m-%d").replace(tzinfo=ZoneInfo("US/Eastern"))
date = datetime.datetime.strptime(data_dir.name, "%Y-%m-%d")
date = date.replace(tzinfo=ZoneInfo("US/Eastern"))
except ValueError:
raise AssertionError(
"Session start time was not auto-detected. Please provide it in `watters_metadata.yaml`"
raise ValueError(
'Session start time was not auto-detected. Please provide it '
'in `watters_metadata.yaml`'
)
metadata["NWBFile"]["session_start_time"] = date

# Run conversion
logging.info('Running processed conversion')
processed_converter.run_conversion(
metadata=metadata, nwbfile_path=processed_nwbfile_path, conversion_options=processed_conversion_options
metadata=metadata, nwbfile_path=processed_nwbfile_path,
conversion_options=processed_conversion_options
)

logging.info('Running raw data conversion')
metadata["NWBFile"]["identifier"] = str(uuid4())
raw_converter = WattersNWBConverter(source_data=raw_source_data, sync_dir=str(data_dir_path / "sync_pulses"))
raw_converter = WattersNWBConverter(
source_data=raw_source_data, sync_dir=str(data_dir / "sync_pulses")
)
raw_converter.run_conversion(
metadata=metadata, nwbfile_path=raw_nwbfile_path, conversion_options=raw_conversion_options
metadata=metadata, nwbfile_path=raw_nwbfile_path,
conversion_options=raw_conversion_options,
)


if __name__ == "__main__":

# Parameters for conversion
data_dir_path = Path("/shared/catalystneuro/JazLab/monkey0/2022-06-01/")
# data_dir_path = Path("/shared/catalystneuro/JazLab/monkey1/2022-06-05/")
output_dir_path = Path("~/conversion_nwb/jazayeri-lab-to-nwb/watters_perle_combined/").expanduser()
data_dir = Path(
'/om2/user/nwatters/catalystneuro/initial_data_transfer/'
'monkey0/2022-06-01/'
)
output_dir_path = Path(
'/om/user/nwatters/nwb_data/watters_perle_combined/'
)
stub_test = True

session_to_nwb(
data_dir_path=data_dir_path,
data_dir=data_dir,
output_dir_path=output_dir_path,
stub_test=stub_test,
)
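
A minimal usage sketch for driving the conversion from another script; the import path and data locations are assumptions for illustration, not part of this PR:

from pathlib import Path

from watters_convert_session import session_to_nwb

# With stub_test=True, outputs land under <output_dir>/nwb_stub/ and are
# named after the session directory, e.g. ses-2022-06-01_raw.nwb and
# ses-2022-06-01_processed.nwb.
session_to_nwb(
    data_dir=Path('/path/to/monkey0/2022-06-01'),
    output_dir_path=Path('/path/to/nwb_output'),
    stub_test=True,
)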
1 change: 0 additions & 1 deletion src/jazayeri_lab_to_nwb/watters/watters_requirements.txt
@@ -1,3 +1,2 @@
nwb-conversion-tools==0.11.1 # Example of specific pinned dependency
some-extra-package==1.11.3 # Example of another extra package that's necessary for the current conversion
roiextractors @ git+https://github.com/catalystneuro/roiextractors.git@8db5f9cb3a7ee5efee49b7fd0b694c7a8105519a # Github pinned dependency
12 changes: 6 additions & 6 deletions src/jazayeri_lab_to_nwb/watters/wattersnwbconverter.py
@@ -1,5 +1,6 @@
"""Primary NWBConverter class for this dataset."""
import json
import logging
import numpy as np
from typing import Optional
from pathlib import Path
@@ -18,12 +19,9 @@
from spikeinterface.core.waveform_tools import has_exceeding_spikes
from spikeinterface.curation import remove_excess_spikes

from jazayeri_lab_to_nwb.watters import (
WattersDatRecordingInterface,
WattersEyePositionInterface,
WattersPupilSizeInterface,
WattersTrialsInterface,
)
from wattersbehaviorinterface import WattersEyePositionInterface, WattersPupilSizeInterface
from watterstrialsinterface import WattersTrialsInterface
from wattersrecordinginterface import WattersDatRecordingInterface


class WattersNWBConverter(NWBConverter):
@@ -62,6 +60,8 @@ def __init__(
unit_name_start += np.max(unit_ids) + 1

def temporally_align_data_interfaces(self):
logging.info('Temporally aligning data interfaces')

if self.sync_dir is None:
return
sync_dir = Path(self.sync_dir)