diff --git a/pyproject.toml b/pyproject.toml index 151d7ac..0dfbeea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ ignore = [ "S101", "ICN001", "INP001", + "E501", ] fixable = ["ALL"] diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map.py index 7618fc6..0250f5f 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map.py @@ -1,228 +1,228 @@ -import os - -os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying - -import os -from pathlib import Path -from shutil import rmtree - -# from neuroconv.tools.data_transfers import automatic_dandi_upload as neuroconv_automatic_dandi_upload -from one.api import ONE - -from src.ibl_to_nwb.brainwide_map import BrainwideMapConverter -from src.ibl_to_nwb.brainwide_map.datainterfaces import ( - BrainwideMapTrialsInterface, -) -from src.ibl_to_nwb.datainterfaces import ( - IblPoseEstimationInterface, - IblSortingInterface, - IblStreamingApInterface, - IblStreamingLfInterface, - LickInterface, - PupilTrackingInterface, - RoiMotionEnergyInterface, - WheelInterface, -) - -# def automatic_dandi_upload( -# dandiset_id: str, -# nwb_folder_path: str, -# dandiset_folder_path: str = None, -# version: str = "draft", -# files_mode: str = "move", -# staging: bool = False, -# cleanup: bool = False, -# ): -# """ -# Fully automated upload of NWBFiles to a DANDISet. -# -# Requires an API token set as an envrinment variable named DANDI_API_KEY. -# -# To set this in your bash terminal in Linux or MacOS, run -# export DANDI_API_KEY=... -# or in Windows -# set DANDI_API_KEY=... -# -# DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. -# -# Parameters -# ---------- -# dandiset_id : str -# Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. -# nwb_folder_path : folder path -# Folder containing the NWBFiles to be uploaded. -# dandiset_folder_path : folder path, optional -# A separate folder location within which to download the dandiset. -# Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. -# Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. -# version : {None, "draft", "version"} -# The default is "draft". -# staging : bool, default: False -# Is the DANDISet hosted on the staging server? This is mostly for testing purposes. -# The default is False. -# cleanup : bool, default: False -# Whether to remove the dandiset folder path and nwb_folder_path. -# Defaults to False. -# """ -# nwb_folder_path = Path(nwb_folder_path) -# dandiset_folder_path = ( -# Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path +# import os +# +# os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying +# +# import os +# from pathlib import Path +# from shutil import rmtree +# +# # from neuroconv.tools.data_transfers import automatic_dandi_upload as neuroconv_automatic_dandi_upload +# from one.api import ONE +# +# from ibl_to_nwb.brainwide_map import BrainwideMapConverter +# from ibl_to_nwb.brainwide_map.datainterfaces import ( +# BrainwideMapTrialsInterface, +# ) +# from ibl_to_nwb.datainterfaces import ( +# IblPoseEstimationInterface, +# IblSortingInterface, +# IblStreamingApInterface, +# IblStreamingLfInterface, +# LickInterface, +# PupilTrackingInterface, +# RoiMotionEnergyInterface, +# WheelInterface, +# ) +# +# # def automatic_dandi_upload( +# # dandiset_id: str, +# # nwb_folder_path: str, +# # dandiset_folder_path: str = None, +# # version: str = "draft", +# # files_mode: str = "move", +# # staging: bool = False, +# # cleanup: bool = False, +# # ): +# # """ +# # Fully automated upload of NWBFiles to a DANDISet. +# # +# # Requires an API token set as an envrinment variable named DANDI_API_KEY. +# # +# # To set this in your bash terminal in Linux or MacOS, run +# # export DANDI_API_KEY=... +# # or in Windows +# # set DANDI_API_KEY=... +# # +# # DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. +# # +# # Parameters +# # ---------- +# # dandiset_id : str +# # Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. +# # nwb_folder_path : folder path +# # Folder containing the NWBFiles to be uploaded. +# # dandiset_folder_path : folder path, optional +# # A separate folder location within which to download the dandiset. +# # Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. +# # Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. +# # version : {None, "draft", "version"} +# # The default is "draft". +# # staging : bool, default: False +# # Is the DANDISet hosted on the staging server? This is mostly for testing purposes. +# # The default is False. +# # cleanup : bool, default: False +# # Whether to remove the dandiset folder path and nwb_folder_path. +# # Defaults to False. +# # """ +# # nwb_folder_path = Path(nwb_folder_path) +# # dandiset_folder_path = ( +# # Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path +# # ) +# # dandiset_path = dandiset_folder_path / dandiset_id +# # assert os.getenv("DANDI_API_KEY"), ( +# # "Unable to find environment variable 'DANDI_API_KEY'. " +# # "Please retrieve your token from DANDI and set this environment variable." +# # ) +# # +# # url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" +# # dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" +# # dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) +# # assert dandiset_path.exists(), "DANDI download failed!" +# # +# # dandi_organize( +# # paths=str(nwb_folder_path), +# # dandiset_path=str(dandiset_path), +# # update_external_file_paths=True, +# # files_mode=files_mode, +# # media_files_mode=files_mode, +# # ) +# # organized_nwbfiles = dandiset_path.rglob("*.nwb") +# # +# # # DANDI has yet to implement forcing of session_id inclusion in organize step +# # # This manually enforces it when only a single session per subject is organized +# # for organized_nwbfile in organized_nwbfiles: +# # if "ses" not in organized_nwbfile.stem: +# # with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: +# # nwbfile = io.read() +# # session_id = nwbfile.session_id +# # dandi_stem = organized_nwbfile.stem +# # dandi_stem_split = dandi_stem.split("_") +# # dandi_stem_split.insert(1, f"ses-{session_id}") +# # corrected_name = "_".join(dandi_stem_split) + ".nwb" +# # organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) +# # organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) +# # # The above block can be removed once they add the feature +# # +# # # If any external images +# # image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) +# # for image_folder in image_folders: +# # if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case +# # corrected_name = "_".join(dandi_stem_split) +# # image_folder = image_folder.rename(image_folder.parent / corrected_name) +# # +# # # For image in folder, rename +# # with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: +# # nwbfile = io.read() +# # for _, object in nwbfile.objects.items(): +# # if isinstance(object, ImageSeries): +# # this_external_file = image_folder / Path(str(object.external_file[0])).name +# # corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" +# # this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) +# # object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) +# # +# # assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" +# # +# # dandi_instance = "dandi-staging" if staging else "dandi" +# # dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) +# # +# # # Cleanup should be confirmed manually; Windows especially can complain +# # if cleanup: +# # try: +# # rmtree(path=dandiset_folder_path) +# # except PermissionError: # pragma: no cover +# # warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) +# +# +# base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now +# +# # session_retrieval_one = ONE( +# # base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +# # ) +# # brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") +# +# # session = session_info["id"] +# session = "3e7ae7c0-fe8b-487c-9354-036236fa1010" +# +# nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" +# nwbfile_path.parent.mkdir(exist_ok=True) +# +# stub_test = False +# cleanup = False +# files_mode = "move" +# +# assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" +# +# # Download behavior and spike sorted data for this session +# session_path = base_path / "ibl_conversion" / session +# cache_folder = base_path / "ibl_conversion" / session / "cache" +# session_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", +# password="international", +# silent=True, +# cache_dir=cache_folder, +# ) +# +# # Get stream names from SI +# ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) +# lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) +# +# # Initialize as many of each interface as we need across the streams +# data_interfaces = list() +# for stream_name in ap_stream_names: +# data_interfaces.append( +# IblStreamingApInterface(session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings") # ) -# dandiset_path = dandiset_folder_path / dandiset_id -# assert os.getenv("DANDI_API_KEY"), ( -# "Unable to find environment variable 'DANDI_API_KEY'. " -# "Please retrieve your token from DANDI and set this environment variable." +# for stream_name in lf_stream_names: +# data_interfaces.append( +# IblStreamingLfInterface(session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings") # ) # -# url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" -# dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" -# dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) -# assert dandiset_path.exists(), "DANDI download failed!" -# -# dandi_organize( -# paths=str(nwb_folder_path), -# dandiset_path=str(dandiset_path), -# update_external_file_paths=True, -# files_mode=files_mode, -# media_files_mode=files_mode, +# # These interfaces should always be present in source data +# data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) +# data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) +# data_interfaces.append(WheelInterface(one=session_one, session=session)) +# +# # These interfaces may not be present; check if they are before adding to list +# pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") +# for pose_estimation_file in pose_estimation_files: +# camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") +# data_interfaces.append( +# IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name, include_video=True) # ) -# organized_nwbfiles = dandiset_path.rglob("*.nwb") -# -# # DANDI has yet to implement forcing of session_id inclusion in organize step -# # This manually enforces it when only a single session per subject is organized -# for organized_nwbfile in organized_nwbfiles: -# if "ses" not in organized_nwbfile.stem: -# with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: -# nwbfile = io.read() -# session_id = nwbfile.session_id -# dandi_stem = organized_nwbfile.stem -# dandi_stem_split = dandi_stem.split("_") -# dandi_stem_split.insert(1, f"ses-{session_id}") -# corrected_name = "_".join(dandi_stem_split) + ".nwb" -# organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) -# organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) -# # The above block can be removed once they add the feature -# -# # If any external images -# image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) -# for image_folder in image_folders: -# if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case -# corrected_name = "_".join(dandi_stem_split) -# image_folder = image_folder.rename(image_folder.parent / corrected_name) -# -# # For image in folder, rename -# with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: -# nwbfile = io.read() -# for _, object in nwbfile.objects.items(): -# if isinstance(object, ImageSeries): -# this_external_file = image_folder / Path(str(object.external_file[0])).name -# corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" -# this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) -# object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) -# -# assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" -# -# dandi_instance = "dandi-staging" if staging else "dandi" -# dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) -# -# # Cleanup should be confirmed manually; Windows especially can complain -# if cleanup: -# try: -# rmtree(path=dandiset_folder_path) -# except PermissionError: # pragma: no cover -# warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) - - -base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now - -# session_retrieval_one = ONE( -# base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +# +# pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") +# for pupil_tracking_file in pupil_tracking_files: +# camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") +# data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) +# +# roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") +# for roi_motion_energy_file in roi_motion_energy_files: +# camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") +# data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) +# +# if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): +# data_interfaces.append(LickInterface(one=session_one, session=session)) +# +# # Run conversion +# session_converter = BrainwideMapConverter( +# one=session_one, session=session, data_interfaces=data_interfaces, verbose=False # ) -# brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") - -# session = session_info["id"] -session = "3e7ae7c0-fe8b-487c-9354-036236fa1010" - -nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" -nwbfile_path.parent.mkdir(exist_ok=True) - -stub_test = False -cleanup = False -files_mode = "move" - -assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" - -# Download behavior and spike sorted data for this session -session_path = base_path / "ibl_conversion" / session -cache_folder = base_path / "ibl_conversion" / session / "cache" -session_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", - password="international", - silent=True, - cache_dir=cache_folder, -) - -# Get stream names from SI -ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) -lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) - -# Initialize as many of each interface as we need across the streams -data_interfaces = list() -for stream_name in ap_stream_names: - data_interfaces.append( - IblStreamingApInterface(session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings") - ) -for stream_name in lf_stream_names: - data_interfaces.append( - IblStreamingLfInterface(session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings") - ) - -# These interfaces should always be present in source data -data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) -data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) -data_interfaces.append(WheelInterface(one=session_one, session=session)) - -# These interfaces may not be present; check if they are before adding to list -pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") -for pose_estimation_file in pose_estimation_files: - camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") - data_interfaces.append( - IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name, include_video=True) - ) - -pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") -for pupil_tracking_file in pupil_tracking_files: - camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") - data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) - -roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") -for roi_motion_energy_file in roi_motion_energy_files: - camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") - data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) - -if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): - data_interfaces.append(LickInterface(one=session_one, session=session)) - -# Run conversion -session_converter = BrainwideMapConverter( - one=session_one, session=session, data_interfaces=data_interfaces, verbose=False -) - -conversion_options = dict() -if stub_test: - for data_interface_name in session_converter.data_interface_objects: - if "Ap" in data_interface_name or "Lf" in data_interface_name: - conversion_options.update({data_interface_name: dict(stub_test=True)}) - -session_converter.run_conversion( - nwbfile_path=nwbfile_path, - metadata=session_converter.get_metadata(), - conversion_options=conversion_options, - overwrite=True, -) -# automatic_dandi_upload( -# dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode +# +# conversion_options = dict() +# if stub_test: +# for data_interface_name in session_converter.data_interface_objects: +# if "Ap" in data_interface_name or "Lf" in data_interface_name: +# conversion_options.update({data_interface_name: dict(stub_test=True)}) +# +# session_converter.run_conversion( +# nwbfile_path=nwbfile_path, +# metadata=session_converter.get_metadata(), +# conversion_options=conversion_options, +# overwrite=True, # ) -if cleanup: - rmtree(cache_folder) - rmtree(nwbfile_path.parent) +# # automatic_dandi_upload( +# # dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode +# # ) +# if cleanup: +# rmtree(cache_folder) +# rmtree(nwbfile_path.parent) diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map_parallel.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map_parallel.py index cd68a3f..20fdb47 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map_parallel.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map_parallel.py @@ -1,286 +1,286 @@ -import os - -os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying - -import os -import traceback -from concurrent.futures import ProcessPoolExecutor, as_completed -from pathlib import Path -from shutil import rmtree - -from one.api import ONE -from tqdm import tqdm - -from src.ibl_to_nwb.brainwide_map import BrainwideMapConverter -from src.ibl_to_nwb.brainwide_map.datainterfaces import ( - BrainwideMapTrialsInterface, -) -from src.ibl_to_nwb.datainterfaces import ( - IblPoseEstimationInterface, - IblSortingInterface, - IblStreamingApInterface, - IblStreamingLfInterface, - LickInterface, - PupilTrackingInterface, - RoiMotionEnergyInterface, - WheelInterface, -) - -# def automatic_dandi_upload( -# dandiset_id: str, -# nwb_folder_path: str, -# dandiset_folder_path: str = None, -# version: str = "draft", -# files_mode: str = "move", -# staging: bool = False, +# import os +# +# os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying +# +# import os +# import traceback +# from concurrent.futures import ProcessPoolExecutor, as_completed +# from pathlib import Path +# from shutil import rmtree +# +# from one.api import ONE +# from tqdm import tqdm +# +# from ibl_to_nwb.brainwide_map import BrainwideMapConverter +# from ibl_to_nwb.brainwide_map.datainterfaces import ( +# BrainwideMapTrialsInterface, +# ) +# from ibl_to_nwb.datainterfaces import ( +# IblPoseEstimationInterface, +# IblSortingInterface, +# IblStreamingApInterface, +# IblStreamingLfInterface, +# LickInterface, +# PupilTrackingInterface, +# RoiMotionEnergyInterface, +# WheelInterface, +# ) +# +# # def automatic_dandi_upload( +# # dandiset_id: str, +# # nwb_folder_path: str, +# # dandiset_folder_path: str = None, +# # version: str = "draft", +# # files_mode: str = "move", +# # staging: bool = False, +# # cleanup: bool = False, +# # ): +# # """ +# # Fully automated upload of NWBFiles to a DANDISet. +# # +# # Requires an API token set as an envrinment variable named DANDI_API_KEY. +# # +# # To set this in your bash terminal in Linux or MacOS, run +# # export DANDI_API_KEY=... +# # or in Windows +# # set DANDI_API_KEY=... +# # +# # DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. +# # +# # Parameters +# # ---------- +# # dandiset_id : str +# # Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. +# # nwb_folder_path : folder path +# # Folder containing the NWBFiles to be uploaded. +# # dandiset_folder_path : folder path, optional +# # A separate folder location within which to download the dandiset. +# # Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. +# # Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. +# # version : {None, "draft", "version"} +# # The default is "draft". +# # staging : bool, default: False +# # Is the DANDISet hosted on the staging server? This is mostly for testing purposes. +# # The default is False. +# # cleanup : bool, default: False +# # Whether to remove the dandiset folder path and nwb_folder_path. +# # Defaults to False. +# # """ +# # nwb_folder_path = Path(nwb_folder_path) +# # dandiset_folder_path = ( +# # Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path +# # ) +# # dandiset_path = dandiset_folder_path / dandiset_id +# # assert os.getenv("DANDI_API_KEY"), ( +# # "Unable to find environment variable 'DANDI_API_KEY'. " +# # "Please retrieve your token from DANDI and set this environment variable." +# # ) +# # +# # url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" +# # dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" +# # dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) +# # assert dandiset_path.exists(), "DANDI download failed!" +# # +# # dandi_organize( +# # paths=str(nwb_folder_path), +# # dandiset_path=str(dandiset_path), +# # update_external_file_paths=True, +# # files_mode=files_mode, +# # media_files_mode=files_mode, +# # ) +# # organized_nwbfiles = dandiset_path.rglob("*.nwb") +# # +# # # DANDI has yet to implement forcing of session_id inclusion in organize step +# # # This manually enforces it when only a single session per subject is organized +# # for organized_nwbfile in organized_nwbfiles: +# # if "ses" not in organized_nwbfile.stem: +# # with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: +# # nwbfile = io.read() +# # session_id = nwbfile.session_id +# # dandi_stem = organized_nwbfile.stem +# # dandi_stem_split = dandi_stem.split("_") +# # dandi_stem_split.insert(1, f"ses-{session_id}") +# # corrected_name = "_".join(dandi_stem_split) + ".nwb" +# # organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) +# # organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) +# # # The above block can be removed once they add the feature +# # +# # # If any external images +# # image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) +# # for image_folder in image_folders: +# # if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case +# # corrected_name = "_".join(dandi_stem_split) +# # image_folder = image_folder.rename(image_folder.parent / corrected_name) +# # +# # # For image in folder, rename +# # with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: +# # nwbfile = io.read() +# # for _, object in nwbfile.objects.items(): +# # if isinstance(object, ImageSeries): +# # this_external_file = image_folder / Path(str(object.external_file[0])).name +# # corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" +# # this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) +# # object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) +# # +# # assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" +# # +# # dandi_instance = "dandi-staging" if staging else "dandi" +# # dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) +# # +# # # Cleanup should be confirmed manually; Windows especially can complain +# # if cleanup: +# # try: +# # rmtree(path=dandiset_folder_path) +# # except PermissionError: # pragma: no cover +# # warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) +# +# +# def convert_and_upload_session( +# base_path: Path, +# session: str, +# nwbfile_path: str, +# stub_test: bool = False, +# progress_position: int = 0, # cleanup: bool = False, +# files_mode: str = "move", # ): -# """ -# Fully automated upload of NWBFiles to a DANDISet. +# try: +# assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" +# +# # Download behavior and spike sorted data for this session +# # session_path = base_path / "ibl_conversion" / session +# cache_folder = base_path / "ibl_conversion" / session / "cache" +# session_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", +# password="international", +# silent=True, +# cache_dir=cache_folder, +# ) +# +# # Get stream names from SI +# ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) +# lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) +# +# # Initialize as many of each interface as we need across the streams +# data_interfaces = list() +# for stream_name in ap_stream_names: +# data_interfaces.append( +# IblStreamingApInterface( +# session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings" +# ) +# ) +# for stream_name in lf_stream_names: +# data_interfaces.append( +# IblStreamingLfInterface( +# session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings" +# ) +# ) +# +# # These interfaces should always be present in source data +# data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) +# data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) +# data_interfaces.append(WheelInterface(one=session_one, session=session)) +# +# # These interfaces may not be present; check if they are before adding to list +# pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") +# for pose_estimation_file in pose_estimation_files: +# camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") +# data_interfaces.append( +# IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name) +# ) # -# Requires an API token set as an envrinment variable named DANDI_API_KEY. +# pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") +# for pupil_tracking_file in pupil_tracking_files: +# camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") +# data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) # -# To set this in your bash terminal in Linux or MacOS, run -# export DANDI_API_KEY=... -# or in Windows -# set DANDI_API_KEY=... +# roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") +# for roi_motion_energy_file in roi_motion_energy_files: +# camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") +# data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) # -# DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. +# if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): +# data_interfaces.append(LickInterface(one=session_one, session=session)) # -# Parameters -# ---------- -# dandiset_id : str -# Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. -# nwb_folder_path : folder path -# Folder containing the NWBFiles to be uploaded. -# dandiset_folder_path : folder path, optional -# A separate folder location within which to download the dandiset. -# Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. -# Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. -# version : {None, "draft", "version"} -# The default is "draft". -# staging : bool, default: False -# Is the DANDISet hosted on the staging server? This is mostly for testing purposes. -# The default is False. -# cleanup : bool, default: False -# Whether to remove the dandiset folder path and nwb_folder_path. -# Defaults to False. -# """ -# nwb_folder_path = Path(nwb_folder_path) -# dandiset_folder_path = ( -# Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path -# ) -# dandiset_path = dandiset_folder_path / dandiset_id -# assert os.getenv("DANDI_API_KEY"), ( -# "Unable to find environment variable 'DANDI_API_KEY'. " -# "Please retrieve your token from DANDI and set this environment variable." -# ) +# # Run conversion +# session_converter = BrainwideMapConverter( +# one=session_one, session=session, data_interfaces=data_interfaces, verbose=False +# ) # -# url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" -# dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" -# dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) -# assert dandiset_path.exists(), "DANDI download failed!" +# conversion_options = dict() +# if stub_test: +# for data_interface_name in session_converter.data_interface_objects: +# if "Ap" in data_interface_name or "Lf" in data_interface_name: +# conversion_options.update( +# { +# data_interface_name: dict( +# progress_position=progress_position, +# stub_test=True, +# ) +# } +# ) +# else: +# for data_interface_name in session_converter.data_interface_objects: +# if "Ap" in data_interface_name or "Lf" in data_interface_name: +# conversion_options.update( +# { +# data_interface_name: dict( +# progress_position=progress_position, +# ) +# } +# ) # -# dandi_organize( -# paths=str(nwb_folder_path), -# dandiset_path=str(dandiset_path), -# update_external_file_paths=True, -# files_mode=files_mode, -# media_files_mode=files_mode, -# ) -# organized_nwbfiles = dandiset_path.rglob("*.nwb") +# metadata = session_converter.get_metadata() # -# # DANDI has yet to implement forcing of session_id inclusion in organize step -# # This manually enforces it when only a single session per subject is organized -# for organized_nwbfile in organized_nwbfiles: -# if "ses" not in organized_nwbfile.stem: -# with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: -# nwbfile = io.read() -# session_id = nwbfile.session_id -# dandi_stem = organized_nwbfile.stem -# dandi_stem_split = dandi_stem.split("_") -# dandi_stem_split.insert(1, f"ses-{session_id}") -# corrected_name = "_".join(dandi_stem_split) + ".nwb" -# organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) -# organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) -# # The above block can be removed once they add the feature +# session_converter.run_conversion( +# nwbfile_path=nwbfile_path, +# metadata=metadata, +# conversion_options=conversion_options, +# overwrite=True, +# ) +# # automatic_dandi_upload( +# # dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode +# # ) +# if cleanup: +# rmtree(cache_folder) +# rmtree(nwbfile_path.parent) # -# # If any external images -# image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) -# for image_folder in image_folders: -# if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case -# corrected_name = "_".join(dandi_stem_split) -# image_folder = image_folder.rename(image_folder.parent / corrected_name) +# return 1 +# except Exception as exception: +# error_file_path = base_path / "errors" / "8-7-23" / f"{session}_error.txt" +# error_file_path.parent.mkdir(exist_ok=True) +# with open(file=error_file_path, mode="w") as file: +# file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") +# return 0 # -# # For image in folder, rename -# with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: -# nwbfile = io.read() -# for _, object in nwbfile.objects.items(): -# if isinstance(object, ImageSeries): -# this_external_file = image_folder / Path(str(object.external_file[0])).name -# corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" -# this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) -# object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) # -# assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" +# number_of_parallel_jobs = 8 +# base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now # -# dandi_instance = "dandi-staging" if staging else "dandi" -# dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) +# session_retrieval_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +# ) +# brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") # -# # Cleanup should be confirmed manually; Windows especially can complain -# if cleanup: -# try: -# rmtree(path=dandiset_folder_path) -# except PermissionError: # pragma: no cover -# warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) - - -def convert_and_upload_session( - base_path: Path, - session: str, - nwbfile_path: str, - stub_test: bool = False, - progress_position: int = 0, - cleanup: bool = False, - files_mode: str = "move", -): - try: - assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" - - # Download behavior and spike sorted data for this session - # session_path = base_path / "ibl_conversion" / session - cache_folder = base_path / "ibl_conversion" / session / "cache" - session_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", - password="international", - silent=True, - cache_dir=cache_folder, - ) - - # Get stream names from SI - ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) - lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) - - # Initialize as many of each interface as we need across the streams - data_interfaces = list() - for stream_name in ap_stream_names: - data_interfaces.append( - IblStreamingApInterface( - session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings" - ) - ) - for stream_name in lf_stream_names: - data_interfaces.append( - IblStreamingLfInterface( - session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings" - ) - ) - - # These interfaces should always be present in source data - data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) - data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) - data_interfaces.append(WheelInterface(one=session_one, session=session)) - - # These interfaces may not be present; check if they are before adding to list - pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") - for pose_estimation_file in pose_estimation_files: - camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") - data_interfaces.append( - IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name) - ) - - pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") - for pupil_tracking_file in pupil_tracking_files: - camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") - data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) - - roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") - for roi_motion_energy_file in roi_motion_energy_files: - camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") - data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) - - if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): - data_interfaces.append(LickInterface(one=session_one, session=session)) - - # Run conversion - session_converter = BrainwideMapConverter( - one=session_one, session=session, data_interfaces=data_interfaces, verbose=False - ) - - conversion_options = dict() - if stub_test: - for data_interface_name in session_converter.data_interface_objects: - if "Ap" in data_interface_name or "Lf" in data_interface_name: - conversion_options.update( - { - data_interface_name: dict( - progress_position=progress_position, - stub_test=True, - ) - } - ) - else: - for data_interface_name in session_converter.data_interface_objects: - if "Ap" in data_interface_name or "Lf" in data_interface_name: - conversion_options.update( - { - data_interface_name: dict( - progress_position=progress_position, - ) - } - ) - - metadata = session_converter.get_metadata() - - session_converter.run_conversion( - nwbfile_path=nwbfile_path, - metadata=metadata, - conversion_options=conversion_options, - overwrite=True, - ) - # automatic_dandi_upload( - # dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode - # ) - if cleanup: - rmtree(cache_folder) - rmtree(nwbfile_path.parent) - - return 1 - except Exception as exception: - error_file_path = base_path / "errors" / "8-7-23" / f"{session}_error.txt" - error_file_path.parent.mkdir(exist_ok=True) - with open(file=error_file_path, mode="w") as file: - file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") - return 0 - - -number_of_parallel_jobs = 8 -base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now - -session_retrieval_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True -) -brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") - -with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: - with tqdm(total=len(brain_wide_sessions), position=0, desc="Converting sessions...") as main_progress_bar: - futures = [] - for progress_position, session_info in enumerate(brain_wide_sessions): - session = session_info["id"] - nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" - nwbfile_path.parent.mkdir(exist_ok=True) - futures.append( - executor.submit( - convert_and_upload_session, - base_path=base_path, - session=session, - nwbfile_path=nwbfile_path, - progress_position=1 + progress_position, - # stub_test=True, - # files_mode="copy", # useful when debugging - # cleanup=False, - ) - ) - for future in as_completed(futures): - status = future.result() - main_progress_bar.update(1) +# with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: +# with tqdm(total=len(brain_wide_sessions), position=0, desc="Converting sessions...") as main_progress_bar: +# futures = [] +# for progress_position, session_info in enumerate(brain_wide_sessions): +# session = session_info["id"] +# nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" +# nwbfile_path.parent.mkdir(exist_ok=True) +# futures.append( +# executor.submit( +# convert_and_upload_session, +# base_path=base_path, +# session=session, +# nwbfile_path=nwbfile_path, +# progress_position=1 + progress_position, +# # stub_test=True, +# # files_mode="copy", # useful when debugging +# # cleanup=False, +# ) +# ) +# for future in as_completed(futures): +# status = future.result() +# main_progress_bar.update(1) diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only.py index 3341c7e..068fcd8 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only.py @@ -7,7 +7,7 @@ from one.api import ONE -from src.ibl_to_nwb import ( +from ibl_to_nwb import ( BrainwideMapConverter, BrainwideMapTrialsInterface, IblPoseEstimationInterface, @@ -18,63 +18,60 @@ WheelInterface, ) -stub_test = False -cleanup = False - base_path = Path("E:/IBL") -session = "d32876dd-8303-4720-8e7e-20678dc2fd71" - -nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" -nwbfile_path.parent.mkdir(parents=True, exist_ok=True) +nwbfiles_folder_path = base_path / "nwbfiles" +nwbfiles_folder_path.mkdir(exist_ok=True) -nwbfile_path.parent.mkdir(exist_ok=True) +session_id = "d32876dd-8303-4720-8e7e-20678dc2fd71" -# Download behavior and spike sorted data for this session -# session_path = base_path / "ibl_conversion" / session -cache_folder = base_path / "ibl_conversion" / session / "cache" -session_one = ONE( +# Initialize IBL (ONE) client to download processed data for this session +one_cache_folder_path = base_path / "cache" +ibl_client = ONE( base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True, - cache_dir=cache_folder, + cache_dir=one_cache_folder_path, ) # Initialize as many of each interface as we need across the streams data_interfaces = list() # These interfaces should always be present in source data -data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) -data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) -data_interfaces.append(WheelInterface(one=session_one, session=session)) +data_interfaces.append(IblSortingInterface(session=session_id, cache_folder=one_cache_folder_path / "sorting")) +data_interfaces.append(BrainwideMapTrialsInterface(one=ibl_client, session=session_id)) +data_interfaces.append(WheelInterface(one=ibl_client, session=session_id)) # These interfaces may not be present; check if they are before adding to list -pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") +pose_estimation_files = ibl_client.list_datasets(eid=session_id, filename="*.dlc*") for pose_estimation_file in pose_estimation_files: camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") data_interfaces.append( - IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name, include_video=False) + IblPoseEstimationInterface(one=ibl_client, session=session_id, camera_name=camera_name, include_video=False) ) -pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") +pupil_tracking_files = ibl_client.list_datasets(eid=session_id, filename="*features*") for pupil_tracking_file in pupil_tracking_files: camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") - data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) + data_interfaces.append(PupilTrackingInterface(one=ibl_client, session=session_id, camera_name=camera_name)) -roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") +roi_motion_energy_files = ibl_client.list_datasets(eid=session_id, filename="*ROIMotionEnergy.npy*") for roi_motion_energy_file in roi_motion_energy_files: camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") - data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) + data_interfaces.append(RoiMotionEnergyInterface(one=ibl_client, session=session_id, camera_name=camera_name)) -if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): - data_interfaces.append(LickInterface(one=session_one, session=session)) +if ibl_client.list_datasets(eid=session_id, collection="alf", filename="licks*"): + data_interfaces.append(LickInterface(one=ibl_client, session=session_id)) # Run conversion session_converter = BrainwideMapConverter( - one=session_one, session=session, data_interfaces=data_interfaces, verbose=False + one=ibl_client, session=session_id, data_interfaces=data_interfaces, verbose=False ) metadata = session_converter.get_metadata() -metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-processed-only" +subject_id = metadata["Subject"]["subject_id"] + +subject_folder_path = nwbfiles_folder_path / f"sub-{subject_id}" +nwbfile_path = nwbfiles_folder_path / f"sub-{subject_id}_ses-{session_id}_desc-processed_behavior+ecephys.nwb" session_converter.run_conversion( nwbfile_path=nwbfile_path, diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_local_testing.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_local_testing.py index 3310178..8200505 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_local_testing.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_local_testing.py @@ -20,11 +20,11 @@ # from pynwb import NWBHDF5IO # from pynwb.image import ImageSeries # from tqdm import tqdm -from src.ibl_to_nwb.brainwide_map import BrainwideMapConverter -from src.ibl_to_nwb.brainwide_map.datainterfaces import ( +from ibl_to_nwb.brainwide_map import BrainwideMapConverter +from ibl_to_nwb.brainwide_map.datainterfaces import ( BrainwideMapTrialsInterface, ) -from src.ibl_to_nwb.datainterfaces import ( +from ibl_to_nwb.datainterfaces import ( IblPoseEstimationInterface, IblSortingInterface, LickInterface, diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_parallel.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_parallel.py index b28ced0..308dd39 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_parallel.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map_processed_only_parallel.py @@ -1,156 +1,156 @@ -import os - -os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying - -import os -import traceback -from concurrent.futures import ProcessPoolExecutor, as_completed -from pathlib import Path -from shutil import rmtree - -from neuroconv.tools.data_transfers import automatic_dandi_upload -from nwbinspector.tools import get_s3_urls_and_dandi_paths -from one.api import ONE -from tqdm import tqdm - -from src.ibl_to_nwb.brainwide_map import BrainwideMapConverter -from src.ibl_to_nwb.brainwide_map.datainterfaces import ( - BrainwideMapTrialsInterface, -) -from src.ibl_to_nwb.datainterfaces import ( - IblPoseEstimationInterface, - IblSortingInterface, - LickInterface, - PupilTrackingInterface, - RoiMotionEnergyInterface, - WheelInterface, -) - - -def convert_and_upload_parallel_processed_only( - base_path: Path, - session: str, - nwbfile_path: str, - stub_test: bool = False, - progress_position: int = 0, - cleanup: bool = False, - files_mode: str = "move", -): - try: - assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" - - nwbfile_path.parent.mkdir(exist_ok=True) - - # Download behavior and spike sorted data for this session - # session_path = base_path / "ibl_conversion" / session - cache_folder = base_path / "ibl_conversion" / session / "cache" - session_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", - password="international", - silent=True, - cache_dir=cache_folder, - ) - - # Initialize as many of each interface as we need across the streams - data_interfaces = list() - - # These interfaces should always be present in source data - data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) - data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) - data_interfaces.append(WheelInterface(one=session_one, session=session)) - - # These interfaces may not be present; check if they are before adding to list - pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") - for pose_estimation_file in pose_estimation_files: - camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") - data_interfaces.append( - IblPoseEstimationInterface( - one=session_one, session=session, camera_name=camera_name, include_video=False - ) - ) - - pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") - for pupil_tracking_file in pupil_tracking_files: - camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") - data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) - - roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") - for roi_motion_energy_file in roi_motion_energy_files: - camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") - data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) - - if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): - data_interfaces.append(LickInterface(one=session_one, session=session)) - - # Run conversion - session_converter = BrainwideMapConverter( - one=session_one, session=session, data_interfaces=data_interfaces, verbose=False - ) - - metadata = session_converter.get_metadata() - metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-processed-only" - - session_converter.run_conversion( - nwbfile_path=nwbfile_path, - metadata=metadata, - overwrite=True, - ) - automatic_dandi_upload( - dandiset_id="000409", - nwb_folder_path=nwbfile_path.parent, - cleanup=cleanup, # files_mode=files_mode - ) - if cleanup: - rmtree(cache_folder) - rmtree(nwbfile_path.parent) - - return 1 - except Exception as exception: - error_file_path = base_path / "errors" / "7-30-23" / f"{session}_error.txt" - error_file_path.parent.mkdir(exist_ok=True) - with open(file=error_file_path, mode="w") as file: - file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") - return 0 - - -number_of_parallel_jobs = 8 -base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now - -session_retrieval_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True -) -brain_wide_sessions = [ - session_info["id"] - for session_info in session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") -] - -# Already written sessions -dandi_file_paths = list(get_s3_urls_and_dandi_paths(dandiset_id="000409").values()) -dandi_processed_file_paths = [dandi_file_path for dandi_file_path in dandi_file_paths if "processed" in dandi_file_path] -already_written_processed_sessions = [ - processed_dandi_file_path.split("ses-")[1].split("_")[0].strip("-processed-only") - for processed_dandi_file_path in dandi_processed_file_paths -] -sessions_to_run = list(set(brain_wide_sessions) - set(already_written_processed_sessions)) - -with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: - with tqdm(total=len(sessions_to_run), position=0, desc="Converting sessions...") as main_progress_bar: - futures = [] - for progress_position, session in enumerate(sessions_to_run): - nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" - nwbfile_path.parent.mkdir(exist_ok=True) - futures.append( - executor.submit( - convert_and_upload_parallel_processed_only, - base_path=base_path, - session=session, - nwbfile_path=nwbfile_path, - progress_position=1 + progress_position, - # stub_test=True, - # files_mode="copy", # useful when debugging - # cleanup=True, # causing shutil error ATM - ) - ) - for future in as_completed(futures): - status = future.result() - main_progress_bar.update(1) +# import os +# +# os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying +# +# import os +# import traceback +# from concurrent.futures import ProcessPoolExecutor, as_completed +# from pathlib import Path +# from shutil import rmtree +# +# from neuroconv.tools.data_transfers import automatic_dandi_upload +# from nwbinspector.tools import get_s3_urls_and_dandi_paths +# from one.api import ONE +# from tqdm import tqdm +# +# from ibl_to_nwb.brainwide_map import BrainwideMapConverter +# from ibl_to_nwb.brainwide_map.datainterfaces import ( +# BrainwideMapTrialsInterface, +# ) +# from ibl_to_nwb.datainterfaces import ( +# IblPoseEstimationInterface, +# IblSortingInterface, +# LickInterface, +# PupilTrackingInterface, +# RoiMotionEnergyInterface, +# WheelInterface, +# ) +# +# +# def convert_and_upload_parallel_processed_only( +# base_path: Path, +# session: str, +# nwbfile_path: str, +# stub_test: bool = False, +# progress_position: int = 0, +# cleanup: bool = False, +# files_mode: str = "move", +# ): +# try: +# assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" +# +# nwbfile_path.parent.mkdir(exist_ok=True) +# +# # Download behavior and spike sorted data for this session +# # session_path = base_path / "ibl_conversion" / session +# cache_folder = base_path / "ibl_conversion" / session / "cache" +# session_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", +# password="international", +# silent=True, +# cache_dir=cache_folder, +# ) +# +# # Initialize as many of each interface as we need across the streams +# data_interfaces = list() +# +# # These interfaces should always be present in source data +# data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) +# data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) +# data_interfaces.append(WheelInterface(one=session_one, session=session)) +# +# # These interfaces may not be present; check if they are before adding to list +# pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") +# for pose_estimation_file in pose_estimation_files: +# camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") +# data_interfaces.append( +# IblPoseEstimationInterface( +# one=session_one, session=session, camera_name=camera_name, include_video=False +# ) +# ) +# +# pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") +# for pupil_tracking_file in pupil_tracking_files: +# camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") +# data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) +# +# roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") +# for roi_motion_energy_file in roi_motion_energy_files: +# camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") +# data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) +# +# if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): +# data_interfaces.append(LickInterface(one=session_one, session=session)) +# +# # Run conversion +# session_converter = BrainwideMapConverter( +# one=session_one, session=session, data_interfaces=data_interfaces, verbose=False +# ) +# +# metadata = session_converter.get_metadata() +# metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-processed-only" +# +# session_converter.run_conversion( +# nwbfile_path=nwbfile_path, +# metadata=metadata, +# overwrite=True, +# ) +# automatic_dandi_upload( +# dandiset_id="000409", +# nwb_folder_path=nwbfile_path.parent, +# cleanup=cleanup, # files_mode=files_mode +# ) +# if cleanup: +# rmtree(cache_folder) +# rmtree(nwbfile_path.parent) +# +# return 1 +# except Exception as exception: +# error_file_path = base_path / "errors" / "7-30-23" / f"{session}_error.txt" +# error_file_path.parent.mkdir(exist_ok=True) +# with open(file=error_file_path, mode="w") as file: +# file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") +# return 0 +# +# +# number_of_parallel_jobs = 8 +# base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now +# +# session_retrieval_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +# ) +# brain_wide_sessions = [ +# session_info["id"] +# for session_info in session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") +# ] +# +# # Already written sessions +# dandi_file_paths = list(get_s3_urls_and_dandi_paths(dandiset_id="000409").values()) +# dandi_processed_file_paths = [dandi_file_path for dandi_file_path in dandi_file_paths if "processed" in dandi_file_path] +# already_written_processed_sessions = [ +# processed_dandi_file_path.split("ses-")[1].split("_")[0].strip("-processed-only") +# for processed_dandi_file_path in dandi_processed_file_paths +# ] +# sessions_to_run = list(set(brain_wide_sessions) - set(already_written_processed_sessions)) +# +# with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: +# with tqdm(total=len(sessions_to_run), position=0, desc="Converting sessions...") as main_progress_bar: +# futures = [] +# for progress_position, session in enumerate(sessions_to_run): +# nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" +# nwbfile_path.parent.mkdir(exist_ok=True) +# futures.append( +# executor.submit( +# convert_and_upload_parallel_processed_only, +# base_path=base_path, +# session=session, +# nwbfile_path=nwbfile_path, +# progress_position=1 + progress_position, +# # stub_test=True, +# # files_mode="copy", # useful when debugging +# # cleanup=True, # causing shutil error ATM +# ) +# ) +# for future in as_completed(futures): +# status = future.result() +# main_progress_bar.update(1) diff --git a/src/ibl_to_nwb/_scripts/convert_brainwide_map_raw_only_parallel.py b/src/ibl_to_nwb/_scripts/convert_brainwide_map_raw_only_parallel.py index c0d45d1..2caa5e2 100644 --- a/src/ibl_to_nwb/_scripts/convert_brainwide_map_raw_only_parallel.py +++ b/src/ibl_to_nwb/_scripts/convert_brainwide_map_raw_only_parallel.py @@ -1,270 +1,270 @@ -import os - -os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying - -import os -import traceback -from concurrent.futures import ProcessPoolExecutor, as_completed -from pathlib import Path -from shutil import rmtree - -from neuroconv.tools.data_transfers import automatic_dandi_upload -from one.api import ONE -from tqdm import tqdm - -from src.ibl_to_nwb.brainwide_map import BrainwideMapConverter -from src.ibl_to_nwb.datainterfaces import ( - IblPoseEstimationInterface, - IblStreamingApInterface, - IblStreamingLfInterface, -) - -# def automatic_dandi_upload( -# dandiset_id: str, -# nwb_folder_path: str, -# dandiset_folder_path: str = None, -# version: str = "draft", -# files_mode: str = "move", -# staging: bool = False, +# import os +# +# os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying +# +# import os +# import traceback +# from concurrent.futures import ProcessPoolExecutor, as_completed +# from pathlib import Path +# from shutil import rmtree +# +# from neuroconv.tools.data_transfers import automatic_dandi_upload +# from one.api import ONE +# from tqdm import tqdm +# +# from ibl_to_nwb.brainwide_map import BrainwideMapConverter +# from ibl_to_nwb.datainterfaces import ( +# IblPoseEstimationInterface, +# IblStreamingApInterface, +# IblStreamingLfInterface, +# ) +# +# # def automatic_dandi_upload( +# # dandiset_id: str, +# # nwb_folder_path: str, +# # dandiset_folder_path: str = None, +# # version: str = "draft", +# # files_mode: str = "move", +# # staging: bool = False, +# # cleanup: bool = False, +# # ): +# # """ +# # Fully automated upload of NWBFiles to a DANDISet. +# # +# # Requires an API token set as an envrinment variable named DANDI_API_KEY. +# # +# # To set this in your bash terminal in Linux or MacOS, run +# # export DANDI_API_KEY=... +# # or in Windows +# # set DANDI_API_KEY=... +# # +# # DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. +# # +# # Parameters +# # ---------- +# # dandiset_id : str +# # Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. +# # nwb_folder_path : folder path +# # Folder containing the NWBFiles to be uploaded. +# # dandiset_folder_path : folder path, optional +# # A separate folder location within which to download the dandiset. +# # Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. +# # Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. +# # version : {None, "draft", "version"} +# # The default is "draft". +# # staging : bool, default: False +# # Is the DANDISet hosted on the staging server? This is mostly for testing purposes. +# # The default is False. +# # cleanup : bool, default: False +# # Whether to remove the dandiset folder path and nwb_folder_path. +# # Defaults to False. +# # """ +# # nwb_folder_path = Path(nwb_folder_path) +# # dandiset_folder_path = ( +# # Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path +# # ) +# # dandiset_path = dandiset_folder_path / dandiset_id +# # assert os.getenv("DANDI_API_KEY"), ( +# # "Unable to find environment variable 'DANDI_API_KEY'. " +# # "Please retrieve your token from DANDI and set this environment variable." +# # ) +# # +# # url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" +# # dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" +# # dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) +# # assert dandiset_path.exists(), "DANDI download failed!" +# # +# # dandi_organize( +# # paths=str(nwb_folder_path), +# # dandiset_path=str(dandiset_path), +# # update_external_file_paths=True, +# # files_mode=files_mode, +# # media_files_mode=files_mode, +# # ) +# # organized_nwbfiles = dandiset_path.rglob("*.nwb") +# # +# # # DANDI has yet to implement forcing of session_id inclusion in organize step +# # # This manually enforces it when only a single session per subject is organized +# # for organized_nwbfile in organized_nwbfiles: +# # if "ses" not in organized_nwbfile.stem: +# # with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: +# # nwbfile = io.read() +# # session_id = nwbfile.session_id +# # dandi_stem = organized_nwbfile.stem +# # dandi_stem_split = dandi_stem.split("_") +# # dandi_stem_split.insert(1, f"ses-{session_id}") +# # corrected_name = "_".join(dandi_stem_split) + ".nwb" +# # organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) +# # organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) +# # # The above block can be removed once they add the feature +# # +# # # If any external images +# # image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) +# # for image_folder in image_folders: +# # if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case +# # corrected_name = "_".join(dandi_stem_split) +# # image_folder = image_folder.rename(image_folder.parent / corrected_name) +# # +# # # For image in folder, rename +# # with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: +# # nwbfile = io.read() +# # for _, object in nwbfile.objects.items(): +# # if isinstance(object, ImageSeries): +# # this_external_file = image_folder / Path(str(object.external_file[0])).name +# # corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" +# # this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) +# # object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) +# # +# # assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" +# # +# # dandi_instance = "dandi-staging" if staging else "dandi" +# # dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) +# # +# # # Cleanup should be confirmed manually; Windows especially can complain +# # if cleanup: +# # try: +# # rmtree(path=dandiset_folder_path) +# # except PermissionError: # pragma: no cover +# # warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) +# +# +# def convert_and_upload_session( +# base_path: Path, +# session: str, +# nwbfile_path: str, +# stub_test: bool = False, +# progress_position: int = 0, # cleanup: bool = False, +# files_mode: str = "move", # ): -# """ -# Fully automated upload of NWBFiles to a DANDISet. +# try: +# assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" +# +# # Download behavior and spike sorted data for this session +# # session_path = base_path / "ibl_conversion" / f"{session}_{progress_position}" +# cache_folder = base_path / "ibl_conversion" / f"{session}_{progress_position}" / "cache" +# session_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", +# password="international", +# silent=True, +# cache_dir=cache_folder, +# ) +# +# # Get stream names from SI +# ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) +# lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) # -# Requires an API token set as an envrinment variable named DANDI_API_KEY. +# # Initialize as many of each interface as we need across the streams +# data_interfaces = list() +# for stream_name in ap_stream_names: +# data_interfaces.append( +# IblStreamingApInterface( +# session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings" +# ) +# ) +# for stream_name in lf_stream_names: +# data_interfaces.append( +# IblStreamingLfInterface( +# session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings" +# ) +# ) # -# To set this in your bash terminal in Linux or MacOS, run -# export DANDI_API_KEY=... -# or in Windows -# set DANDI_API_KEY=... +# # These interfaces may not be present; check if they are before adding to list +# pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") +# for pose_estimation_file in pose_estimation_files: +# camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") +# data_interfaces.append( +# IblPoseEstimationInterface( +# one=session_one, session=session, camera_name=camera_name, include_video=True, include_pose=False +# ) +# ) # -# DO NOT STORE THIS IN ANY PUBLICLY SHARED CODE. +# # Run conversion +# session_converter = BrainwideMapConverter( +# one=session_one, session=session, data_interfaces=data_interfaces, verbose=False +# ) # -# Parameters -# ---------- -# dandiset_id : str -# Six-digit string identifier for the DANDISet the NWBFiles will be uploaded to. -# nwb_folder_path : folder path -# Folder containing the NWBFiles to be uploaded. -# dandiset_folder_path : folder path, optional -# A separate folder location within which to download the dandiset. -# Used in cases where you do not have write permissions for the parent of the 'nwb_folder_path' directory. -# Default behavior downloads the DANDISet to a folder adjacent to the 'nwb_folder_path'. -# version : {None, "draft", "version"} -# The default is "draft". -# staging : bool, default: False -# Is the DANDISet hosted on the staging server? This is mostly for testing purposes. -# The default is False. -# cleanup : bool, default: False -# Whether to remove the dandiset folder path and nwb_folder_path. -# Defaults to False. -# """ -# nwb_folder_path = Path(nwb_folder_path) -# dandiset_folder_path = ( -# Path(mkdtemp(dir=nwb_folder_path.parent)) if dandiset_folder_path is None else dandiset_folder_path -# ) -# dandiset_path = dandiset_folder_path / dandiset_id -# assert os.getenv("DANDI_API_KEY"), ( -# "Unable to find environment variable 'DANDI_API_KEY'. " -# "Please retrieve your token from DANDI and set this environment variable." -# ) +# chunk_shape = (81920, 64) # -# url_base = "https://gui-staging.dandiarchive.org" if staging else "https://dandiarchive.org" -# dandiset_url = f"{url_base}/dandiset/{dandiset_id}/{version}" -# dandi_download(urls=dandiset_url, output_dir=str(dandiset_folder_path), get_metadata=True, get_assets=False) -# assert dandiset_path.exists(), "DANDI download failed!" +# conversion_options = dict() +# if stub_test: +# for data_interface_name in session_converter.data_interface_objects: +# if "Ap" in data_interface_name or "Lf" in data_interface_name: +# conversion_options.update( +# { +# data_interface_name: dict( +# progress_position=progress_position, +# stub_test=True, +# iterator_opts=dict(chunk_shape=chunk_shape), +# ) +# } +# ) +# else: +# for data_interface_name in session_converter.data_interface_objects: +# if "Ap" in data_interface_name or "Lf" in data_interface_name: +# conversion_options.update( +# { +# data_interface_name: dict( +# progress_position=progress_position, +# iterator_opts=dict(chunk_shape=chunk_shape), +# ) +# } +# ) # -# dandi_organize( -# paths=str(nwb_folder_path), -# dandiset_path=str(dandiset_path), -# update_external_file_paths=True, -# files_mode=files_mode, -# media_files_mode=files_mode, -# ) -# organized_nwbfiles = dandiset_path.rglob("*.nwb") +# metadata = session_converter.get_metadata() +# metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-raw-only" # -# # DANDI has yet to implement forcing of session_id inclusion in organize step -# # This manually enforces it when only a single session per subject is organized -# for organized_nwbfile in organized_nwbfiles: -# if "ses" not in organized_nwbfile.stem: -# with NWBHDF5IO(path=organized_nwbfile, mode="r", load_namespaces=True) as io: -# nwbfile = io.read() -# session_id = nwbfile.session_id -# dandi_stem = organized_nwbfile.stem -# dandi_stem_split = dandi_stem.split("_") -# dandi_stem_split.insert(1, f"ses-{session_id}") -# corrected_name = "_".join(dandi_stem_split) + ".nwb" -# organized_nwbfile.rename(organized_nwbfile.parent / corrected_name) -# organized_nwbfiles = list(dandiset_path.rglob("*.nwb")) -# # The above block can be removed once they add the feature +# session_converter.run_conversion( +# nwbfile_path=nwbfile_path, +# metadata=metadata, +# conversion_options=conversion_options, +# overwrite=True, +# ) +# automatic_dandi_upload( +# dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode +# ) +# if cleanup: +# rmtree(cache_folder) +# rmtree(nwbfile_path.parent) # -# # If any external images -# image_folders = set(dandiset_path.rglob("*image*")) - set(organized_nwbfiles) -# for image_folder in image_folders: -# if "ses" not in image_folder.stem and len(organized_nwbfiles) == 1: # Think about multiple file case -# corrected_name = "_".join(dandi_stem_split) -# image_folder = image_folder.rename(image_folder.parent / corrected_name) +# return 1 +# except Exception as exception: +# error_file_path = base_path / "errors" / "9-7-23" / f"{session}_{progress_position}_error.txt" +# error_file_path.parent.mkdir(exist_ok=True) +# with open(file=error_file_path, mode="w") as file: +# file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") +# return 0 # -# # For image in folder, rename -# with NWBHDF5IO(path=organized_nwbfiles[0], mode="r+", load_namespaces=True) as io: -# nwbfile = io.read() -# for _, object in nwbfile.objects.items(): -# if isinstance(object, ImageSeries): -# this_external_file = image_folder / Path(str(object.external_file[0])).name -# corrected_name = "_".join(dandi_stem_split[:2]) + f"_{object.name}{this_external_file.suffix}" -# this_external_file = this_external_file.rename(this_external_file.parent / corrected_name) -# object.external_file[0] = "./" + str(this_external_file.relative_to(organized_nwbfile.parent)) # -# assert len(list(dandiset_path.iterdir())) > 1, "DANDI organize failed!" +# number_of_parallel_jobs = 6 +# base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now # -# dandi_instance = "dandi-staging" if staging else "dandi" -# dandi_upload(paths=[dandiset_folder_path / dandiset_id], dandi_instance=dandi_instance) +# session_retrieval_one = ONE( +# base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +# ) +# # brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") +# brain_wide_sessions = ["c51f34d8-42f6-4c9c-bb5b-669fd9c42cd9"] # -# # Cleanup should be confirmed manually; Windows especially can complain -# if cleanup: -# try: -# rmtree(path=dandiset_folder_path) -# except PermissionError: # pragma: no cover -# warn("Unable to clean up source files and dandiset! Please manually delete them.", stacklevel=2) - - -def convert_and_upload_session( - base_path: Path, - session: str, - nwbfile_path: str, - stub_test: bool = False, - progress_position: int = 0, - cleanup: bool = False, - files_mode: str = "move", -): - try: - assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" - - # Download behavior and spike sorted data for this session - # session_path = base_path / "ibl_conversion" / f"{session}_{progress_position}" - cache_folder = base_path / "ibl_conversion" / f"{session}_{progress_position}" / "cache" - session_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", - password="international", - silent=True, - cache_dir=cache_folder, - ) - - # Get stream names from SI - ap_stream_names = IblStreamingApInterface.get_stream_names(session=session) - lf_stream_names = IblStreamingLfInterface.get_stream_names(session=session) - - # Initialize as many of each interface as we need across the streams - data_interfaces = list() - for stream_name in ap_stream_names: - data_interfaces.append( - IblStreamingApInterface( - session=session, stream_name=stream_name, cache_folder=cache_folder / "ap_recordings" - ) - ) - for stream_name in lf_stream_names: - data_interfaces.append( - IblStreamingLfInterface( - session=session, stream_name=stream_name, cache_folder=cache_folder / "lf_recordings" - ) - ) - - # These interfaces may not be present; check if they are before adding to list - pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") - for pose_estimation_file in pose_estimation_files: - camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") - data_interfaces.append( - IblPoseEstimationInterface( - one=session_one, session=session, camera_name=camera_name, include_video=True, include_pose=False - ) - ) - - # Run conversion - session_converter = BrainwideMapConverter( - one=session_one, session=session, data_interfaces=data_interfaces, verbose=False - ) - - chunk_shape = (81920, 64) - - conversion_options = dict() - if stub_test: - for data_interface_name in session_converter.data_interface_objects: - if "Ap" in data_interface_name or "Lf" in data_interface_name: - conversion_options.update( - { - data_interface_name: dict( - progress_position=progress_position, - stub_test=True, - iterator_opts=dict(chunk_shape=chunk_shape), - ) - } - ) - else: - for data_interface_name in session_converter.data_interface_objects: - if "Ap" in data_interface_name or "Lf" in data_interface_name: - conversion_options.update( - { - data_interface_name: dict( - progress_position=progress_position, - iterator_opts=dict(chunk_shape=chunk_shape), - ) - } - ) - - metadata = session_converter.get_metadata() - metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-raw-only" - - session_converter.run_conversion( - nwbfile_path=nwbfile_path, - metadata=metadata, - conversion_options=conversion_options, - overwrite=True, - ) - automatic_dandi_upload( - dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, files_mode=files_mode - ) - if cleanup: - rmtree(cache_folder) - rmtree(nwbfile_path.parent) - - return 1 - except Exception as exception: - error_file_path = base_path / "errors" / "9-7-23" / f"{session}_{progress_position}_error.txt" - error_file_path.parent.mkdir(exist_ok=True) - with open(file=error_file_path, mode="w") as file: - file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") - return 0 - - -number_of_parallel_jobs = 6 -base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now - -session_retrieval_one = ONE( - base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True -) -# brain_wide_sessions = session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") -brain_wide_sessions = ["c51f34d8-42f6-4c9c-bb5b-669fd9c42cd9"] - -with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: - with tqdm(total=len(brain_wide_sessions), position=0, desc="Converting sessions...") as main_progress_bar: - futures = [] - for progress_position, session in enumerate(brain_wide_sessions): - nwbfile_path = ( - base_path / "nwbfiles" / f"{session}_{progress_position}" / f"{session}_{progress_position}.nwb" - ) - nwbfile_path.parent.mkdir(exist_ok=True) - futures.append( - executor.submit( - convert_and_upload_session, - base_path=base_path, - session=session, - nwbfile_path=nwbfile_path, - progress_position=progress_position, - # stub_test=True, - # files_mode="copy", # useful when debugging - # cleanup=False, - ) - ) - for future in as_completed(futures): - status = future.result() - main_progress_bar.update(1) +# with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: +# with tqdm(total=len(brain_wide_sessions), position=0, desc="Converting sessions...") as main_progress_bar: +# futures = [] +# for progress_position, session in enumerate(brain_wide_sessions): +# nwbfile_path = ( +# base_path / "nwbfiles" / f"{session}_{progress_position}" / f"{session}_{progress_position}.nwb" +# ) +# nwbfile_path.parent.mkdir(exist_ok=True) +# futures.append( +# executor.submit( +# convert_and_upload_session, +# base_path=base_path, +# session=session, +# nwbfile_path=nwbfile_path, +# progress_position=progress_position, +# # stub_test=True, +# # files_mode="copy", # useful when debugging +# # cleanup=False, +# ) +# ) +# for future in as_completed(futures): +# status = future.result() +# main_progress_bar.update(1) diff --git a/src/ibl_to_nwb/converters/_iblconverter.py b/src/ibl_to_nwb/converters/_iblconverter.py index bf1b7f8..454252c 100644 --- a/src/ibl_to_nwb/converters/_iblconverter.py +++ b/src/ibl_to_nwb/converters/_iblconverter.py @@ -31,6 +31,7 @@ def get_metadata(self) -> dict: session_metadata_list = self.one.alyx.rest(url="sessions", action="list", id=self.session) assert len(session_metadata_list) == 1, "More than one session metadata returned by query." session_metadata = session_metadata_list[0] + assert session_metadata["id"] == self.session, "Session metadata ID does not match the requested session ID." lab_metadata_list = self.one.alyx.rest("labs", "list", name=session_metadata["lab"]) assert len(lab_metadata_list) == 1, "More than one lab metadata returned by query." @@ -135,4 +136,6 @@ def run_conversion( nwbfile=nwbfile_out, metadata=metadata, **conversion_options.get(interface_name, dict()) ) + super().run_conversion() + return nwbfile_out