Support digital channels on NIDQ interface and use TimeSeries instead of ElectricalSeries for analog channels #1152

Merged
merged 22 commits into from
Dec 11, 2024
Changes from 21 commits
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -9,6 +9,8 @@
* Make `NWBMetaDataEncoder` public again [PR #1142](https://github.com/catalystneuro/neuroconv/pull/1142)
* Fix a bug where data in `DeepLabCutInterface` failed to write when `ndx-pose` was not imported. [#1144](https://github.com/catalystneuro/neuroconv/pull/1144)
* `SpikeGLXConverterPipe` converter now accepts multi-probe structures with multi-trigger and does not assume a specific folder structure [#1150](https://github.com/catalystneuro/neuroconv/pull/1150)
* `SpikeGLXNIDQInterface` data is no longer written as an `ElectricalSeries` [#1152](https://github.com/catalystneuro/neuroconv/pull/1152)


## Features
* Propagate the `unit_electrode_indices` argument from the spikeinterface tools to `BaseSortingExtractorInterface`. This allows users to map units to the electrode table when adding sorting data [PR #1124](https://github.com/catalystneuro/neuroconv/pull/1124)
@@ -17,7 +19,10 @@
* `SpikeGLXRecordingInterface` now also accepts `folder_path` making its behavior equivalent to SpikeInterface [#1150](https://github.com/catalystneuro/neuroconv/pull/1150)
* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)
* Added the `deploy_neuroconv_batch_job` helper function for deploying NeuroConv AWS Batch jobs. [PR #1086](https://github.com/catalystneuro/neuroconv/pull/1086)
* YAML specification files now accept an outer keyword `upload_to_dandiset="< six-digit ID >"` to automatically upload the produced NWB files to the DANDI archive [PR #1089](https://github.com/catalystneuro/neuroconv/pull/1089)
* `SpikeGLXNIDQInterface` now handles digital demuxed channels (`XD0`) [#1152](https://github.com/catalystneuro/neuroconv/pull/1152)




## Improvements
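Taken together, the two #1152 entries above change what the NIDQ interface writes: analog channels (`XA`/`MA`) become a `TimeSeries` named `TimeSeriesNIDQ`, and digital channels (e.g. `XD0`) become `ndx-events` `LabeledEvents` objects. A minimal usage sketch, assuming a SpikeGLX folder that contains a `.nidq.bin` file (the folder name here is hypothetical):

from neuroconv.datainterfaces import SpikeGLXNIDQInterface

# Analog channels are written as a TimeSeries, digital channels as LabeledEvents
interface = SpikeGLXNIDQInterface(folder_path="Noise4Sam_g0")
interface.run_conversion(nwbfile_path="nidq.nwb")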
2 changes: 1 addition & 1 deletion docs/conversion_examples_gallery/recording/spikeglx.rst
@@ -24,7 +24,7 @@ We can easily convert all data stored in the native SpikeGLX folder structure to
>>>
>>> folder_path = f"{ECEPHY_DATA_PATH}/spikeglx/Noise4Sam_g0"
>>> converter = SpikeGLXConverterPipe(folder_path=folder_path)
>>>
Source data is valid!
>>> # Extract what metadata we can from the source files
>>> metadata = converter.get_metadata()
>>> # For data provenance we add the time zone information to the conversion
@@ -66,18 +66,22 @@ def __init__(
Folder path containing the binary files of the SpikeGLX recording.
stream_id: str, optional
Stream ID of the SpikeGLX recording.
Examples are 'nidq', 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc.
Examples are 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc.
file_path : FilePathType
Path to .bin file. Point to .ap.bin for SpikeGLXRecordingInterface and .lf.bin for SpikeGLXLFPInterface.
verbose : bool, default: True
Whether to output verbose text.
es_key : str, the key to access the metadata of the ElectricalSeries.
"""

if stream_id == "nidq":
raise ValueError(
"SpikeGLXRecordingInterface is not designed to handle nidq files. Use SpikeGLXNIDQInterface instead"
)

if file_path is not None and stream_id is None:
self.stream_id = fetch_stream_id_for_spikelgx_file(file_path)
self.folder_path = Path(file_path).parent

else:
self.stream_id = stream_id
self.folder_path = Path(folder_path)
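With the guard added above, pointing the recording interface at the nidq stream now fails fast instead of silently producing a misconfigured recording. A quick sketch of what a caller would see (folder path hypothetical):

from neuroconv.datainterfaces import SpikeGLXRecordingInterface

try:
    SpikeGLXRecordingInterface(folder_path="Noise4Sam_g0", stream_id="nidq")
except ValueError as error:
    # "SpikeGLXRecordingInterface is not designed to handle nidq files. ..."
    print(error)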
263 changes: 229 additions & 34 deletions src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py
@@ -1,39 +1,35 @@
from pathlib import Path
from typing import Optional
from typing import Literal, Optional

import numpy as np
from pydantic import ConfigDict, DirectoryPath, FilePath, validate_call
from pynwb import NWBFile
from pynwb.base import TimeSeries

from .spikeglx_utils import get_session_start_time
from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ....basedatainterface import BaseDataInterface
from ....tools.signal_processing import get_rising_frames_from_ttl
from ....utils import get_json_schema_from_method_signature
from ....tools.spikeinterface.spikeinterface import _recording_traces_to_hdmf_iterator
from ....utils import (
calculate_regular_series_rate,
get_json_schema_from_method_signature,
)


class SpikeGLXNIDQInterface(BaseRecordingExtractorInterface):
class SpikeGLXNIDQInterface(BaseDataInterface):
"""Primary data interface class for converting the high-pass (ap) SpikeGLX format."""

display_name = "NIDQ Recording"
keywords = BaseRecordingExtractorInterface.keywords + ("Neuropixels",)
keywords = ("Neuropixels", "nidq", "NIDQ", "SpikeGLX")
associated_suffixes = (".nidq", ".meta", ".bin")
info = "Interface for NIDQ board recording data."

ExtractorName = "SpikeGLXRecordingExtractor"
stream_id = "nidq"

@classmethod
def get_source_schema(cls) -> dict:
source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["x_pitch", "y_pitch"])
source_schema["properties"]["file_path"]["description"] = "Path to SpikeGLX .nidq file."
return source_schema

def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict:

extractor_kwargs = source_data.copy()
extractor_kwargs["folder_path"] = self.folder_path
extractor_kwargs["stream_id"] = self.stream_id
return extractor_kwargs

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def __init__(
self,
@@ -56,12 +52,18 @@ def __init__(
Path to .nidq.bin file.
verbose : bool, default: True
Whether to output verbose text.
load_sync_channel : bool, default: False
Whether to load the last channel in the stream, which is typically used for synchronization.
If True, then the probe is not loaded.
es_key : str, default: "ElectricalSeriesNIDQ"
"""

if load_sync_channel:
bendichter marked this conversation as resolved.
import warnings

warnings.warn(
"The load_sync_channel parameter is deprecated and will be removed in June 2025.",
DeprecationWarning,
stacklevel=2,
)
Contributor:

What is the behavior change? What should be done instead?

Collaborator Author:

The last channel of the ap and lf bands of SpikeGLX binaries is used to store sync data. This option does not make sense for the NIDQ interface because there is no such channel. To be honest, I don't know what role it was playing here before.

Collaborator Author:

Improved the warning to give more context.
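For context on this thread: the sync channel in question is the trailing SY0 channel on the ap/lf streams, which spikeinterface can expose directly. A sketch, assuming a spikeinterface version that still accepts `load_sync_channel` (folder path hypothetical):

from spikeinterface.extractors import SpikeGLXRecordingExtractor

recording = SpikeGLXRecordingExtractor(
    folder_path="Noise4Sam_g0", stream_id="imec0.ap", load_sync_channel=True
)
# The last channel id should be the sync channel (e.g. 'imec0.ap#SY0')
print(recording.get_channel_ids()[-1])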


if file_path is None and folder_path is None:
raise ValueError("Either 'file_path' or 'folder_path' must be provided.")

@@ -72,18 +74,36 @@
if folder_path is not None:
self.folder_path = Path(folder_path)

from spikeinterface.extractors import SpikeGLXRecordingExtractor

self.recording_extractor = SpikeGLXRecordingExtractor(
folder_path=self.folder_path,
stream_id="nidq",
all_annotations=True,
)

channel_ids = self.recording_extractor.get_channel_ids()
analog_channel_signatures = ["XA", "MA"]
self.analog_channel_ids = [ch for ch in channel_ids if any(signature in ch for signature in analog_channel_signatures)]
self.has_analog_channels = len(self.analog_channel_ids) > 0
self.has_digital_channels = len(self.analog_channel_ids) < len(channel_ids)
if self.has_digital_channels:
import ndx_events # noqa: F401
from spikeinterface.extractors import SpikeGLXEventExtractor

self.event_extractor = SpikeGLXEventExtractor(folder_path=self.folder_path)

super().__init__(
verbose=verbose,
load_sync_channel=load_sync_channel,
es_key=es_key,
Contributor:

Should es_key still be here? "es" stands for ElectricalSeries.

Collaborator Author:

Yes, I think you are right.

At the moment, it is not clear how to pass metadata to this object, but there is also no clear metadata to be passed. I discussed yesterday with a user who has this type of data about what could be useful for their use case, and the only thing that came up was a description of the channels.

I will open another issue to discuss this.

Related: I really like the proposal you had in #629.

Collaborator Author:

OK, I opened an issue here: #1154

I would like to hear your thoughts on it when you have some time.

folder_path=self.folder_path,
file_path=file_path,
)
self.source_data.update(file_path=str(file_path))

self.recording_extractor.set_property(
key="group_name", values=["NIDQChannelGroup"] * self.recording_extractor.get_num_channels()
)
self.subset_channels = None

signal_info_key = (0, self.stream_id) # Key format is (segment_index, stream_id)
signal_info_key = (0, "nidq") # Key format is (segment_index, stream_id)
self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key]
self.meta = self._signals_info_dict["meta"]

@@ -101,24 +121,199 @@ def get_metadata(self) -> dict:
manufacturer="National Instruments",
)

# Add groups metadata
metadata["Ecephys"]["Device"] = [device]
metadata["Devices"] = [device]

metadata["Ecephys"]["ElectrodeGroup"][0].update(
name="NIDQChannelGroup", description="A group representing the NIDQ channels.", device=device["name"]
)
metadata["Ecephys"]["Electrodes"] = [
dict(name="group_name", description="Name of the ElectrodeGroup this electrode is a part of."),
]
metadata["Ecephys"]["ElectricalSeriesNIDQ"][
"description"
] = "Raw acquisition traces from the NIDQ (.nidq.bin) channels."
return metadata

def get_channel_names(self) -> list[str]:
"""Return a list of channel names as set in the recording extractor."""
return list(self.recording_extractor.get_channel_ids())

def add_to_nwbfile(
self,
nwbfile: NWBFile,
metadata: Optional[dict] = None,
stub_test: bool = False,
starting_time: Optional[float] = None,
write_as: Literal["raw", "lfp", "processed"] = "raw",
write_electrical_series: bool = True,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
always_write_timestamps: bool = False,
):
"""
Add NIDQ board data to an NWB file, including both analog and digital channels if present.

Parameters
----------
nwbfile : NWBFile
The NWB file to which the NIDQ data will be added
metadata : Optional[dict], default: None
Metadata dictionary with device information. If None, uses default metadata
stub_test : bool, default: False
If True, only writes a small amount of data for testing
starting_time : Optional[float], default: None
DEPRECATED: Will be removed in June 2025. Starting time offset for the TimeSeries
write_as : Literal["raw", "lfp", "processed"], default: "raw"
DEPRECATED: Will be removed in June 2025. Specifies how to write the data
write_electrical_series : bool, default: True
DEPRECATED: Will be removed in June 2025. Whether to write electrical series data
iterator_type : Optional[str], default: "v2"
Type of iterator to use for data streaming
iterator_opts : Optional[dict], default: None
Additional options for the iterator
always_write_timestamps : bool, default: False
If True, always writes timestamps instead of using sampling rate
"""
import warnings

if starting_time is not None:
warnings.warn(
"The 'starting_time' parameter is deprecated and will be removed in June 2025.",
DeprecationWarning,
stacklevel=2,
)

if write_as != "raw":
warnings.warn(
"The 'write_as' parameter is deprecated and will be removed in June 2025.",
DeprecationWarning,
stacklevel=2,
)

if write_electrical_series is not True:
warnings.warn(
"The 'write_electrical_series' parameter is deprecated and will be removed in June 2025.",
DeprecationWarning,
stacklevel=2,
)
Contributor (@bendichter, Dec 11, 2024):

For each of these, it would be helpful to provide a bit more explanation about why it is being deprecated, and/or what the new behavior is, and/or what to do to get the same result instead of using this arg.

Collaborator Author (@h-mayorquin, Dec 11, 2024):

I think you are right; that's good practice.

To add more context: those are conversion options that only made sense for a general electrical series and did not make sense for the NIDQ interface. For example, `write_as` is a general argument for deciding whether the data is added to acquisition or to a processing module, but in this case we are sure that NIDQ data is acquisition, so the choice does not make sense.

In other words, if someone played with those parameters on the NIDQ interface, they were writing wrong data. I don't think people did, but just to be safe I ported the signature as it was so their code will not break immediately.

All of that said, I think this type of information should be included in the warning. It is useful and would have avoided this discussion at review; my bad.

Contributor:

Yeah, that makes sense. I just think we need a sentence or two alluding to this for users who are less familiar and may be copying/pasting old code.

Collaborator Author:

Yes, I updated all the warning strings to add context and information. Take a look and let me know what you think.
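The kind of contextual wording discussed above might look like the following (illustrative text only; the strings actually merged may differ):

import warnings

warnings.warn(
    "The 'write_as' parameter is deprecated and will be removed in June 2025. "
    "NIDQ data is always written to acquisition, so this option has no effect "
    "and can simply be dropped from the call.",
    DeprecationWarning,
    stacklevel=2,
)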


if stub_test or self.subset_channels is not None:
recording = self.subset_recording(stub_test=stub_test)
else:
recording = self.recording_extractor

if metadata is None:
metadata = self.get_metadata()

# Add devices
device_metadata = metadata.get("Devices", [])
for device in device_metadata:
if device["name"] not in nwbfile.devices:
nwbfile.create_device(**device)

# Add analog and digital channels
if self.has_analog_channels:
self._add_analog_channels(
nwbfile=nwbfile,
recording=recording,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
always_write_timestamps=always_write_timestamps,
)

if self.has_digital_channels:
self._add_digital_channels(nwbfile=nwbfile)

def _add_analog_channels(
self,
nwbfile: NWBFile,
recording,
iterator_type: Optional[str],
iterator_opts: Optional[dict],
always_write_timestamps: bool,
):
"""
Add analog channels from the NIDQ board to the NWB file.

Parameters
----------
nwbfile : NWBFile
The NWB file to add the analog channels to
recording : BaseRecording
The recording extractor containing the analog channels
iterator_type : Optional[str]
Type of iterator to use for data streaming
iterator_opts : Optional[dict]
Additional options for the iterator
always_write_timestamps : bool
If True, always writes timestamps instead of using sampling rate
"""
analog_recorder = recording.select_channels(channel_ids=self.analog_channel_ids)
channel_names = analog_recorder.get_property(key="channel_names")
segment_index = 0
analog_data_iterator = _recording_traces_to_hdmf_iterator(
recording=analog_recorder,
segment_index=segment_index,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)

name = "TimeSeriesNIDQ"
description = f"Analog data from the NIDQ board. Channels are {channel_names} in that order."
time_series_kwargs = dict(name=name, data=analog_data_iterator, unit="a.u.", description=description)

if always_write_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
shifted_timestamps = timestamps
time_series_kwargs.update(timestamps=shifted_timestamps)
else:
recording_has_timestamps = recording.has_time_vector(segment_index=segment_index)
if recording_has_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
rate = calculate_regular_series_rate(series=timestamps)
recording_t_start = timestamps[0]
else:
rate = recording.get_sampling_frequency()
recording_t_start = recording._recording_segments[segment_index].t_start or 0

if rate:
starting_time = float(recording_t_start)
time_series_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency())
else:
shifted_timestamps = timestamps
time_series_kwargs.update(timestamps=shifted_timestamps)

time_series = TimeSeries(**time_series_kwargs)
nwbfile.add_acquisition(time_series)

def _add_digital_channels(self, nwbfile: NWBFile):
"""
Add digital channels from the NIDQ board to the NWB file as events.

Parameters
----------
nwbfile : NWBFile
The NWB file to add the digital channels to
"""
from ndx_events import LabeledEvents

event_channels = self.event_extractor.channel_ids
for channel_id in event_channels:
events_structure = self.event_extractor.get_events(channel_id=channel_id)
timestamps = events_structure["time"]
labels = events_structure["label"]

# Some channels have no events
if timestamps.size > 0:

# Timestamps are not ordered; the 'off' event times come first, followed by the 'on' times
ordered_indices = np.argsort(timestamps)
ordered_timestamps = timestamps[ordered_indices]
ordered_labels = labels[ordered_indices]

unique_labels = np.unique(ordered_labels)
label_to_index = {label: index for index, label in enumerate(unique_labels)}
data = [label_to_index[label] for label in ordered_labels]

channel_name = channel_id.split("#")[-1]
description = f"On and Off Events from channel {channel_name}"
name = f"EventsNIDQDigitalChannel{channel_name}"
labeled_events = LabeledEvents(
name=name, description=description, timestamps=ordered_timestamps, data=data, labels=unique_labels
)
nwbfile.add_acquisition(labeled_events)

def get_event_times_from_ttl(self, channel_name: str) -> np.ndarray:
"""
Return the start of event times from the rising part of TTL pulses on one of the NIDQ channels.
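From the reading side, the objects written by this interface can be retrieved under the names used in the code above. A sketch, assuming a converted file session.nwb that had one digital channel XD0:

from pynwb import NWBHDF5IO

with NWBHDF5IO("session.nwb", mode="r") as io:
    nwbfile = io.read()
    analog = nwbfile.acquisition["TimeSeriesNIDQ"]  # analog XA/MA traces
    events = nwbfile.acquisition["EventsNIDQDigitalChannelXD0"]  # LabeledEvents
    print(analog.data.shape, events.labels)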
4 changes: 4 additions & 0 deletions src/neuroconv/tools/testing/mock_interfaces.py
@@ -119,6 +119,9 @@ def __init__(
"""
from spikeinterface.extractors import NumpyRecording

self.has_analog_channels = True
self.has_digital_channels = False

if ttl_times is None:
# Begin in 'off' state
number_of_periods = int(np.ceil((signal_duration - ttl_duration) / (ttl_duration * 2)))
@@ -127,6 +130,7 @@
number_of_channels = len(ttl_times)
channel_ids = [f"nidq#XA{channel_index}" for channel_index in range(number_of_channels)] # NIDQ channel IDs
channel_groups = ["NIDQChannelGroup"] * number_of_channels
self.analog_channel_ids = channel_ids

sampling_frequency = 25_000.0 # NIDQ sampling rate
number_of_frames = int(signal_duration * sampling_frequency)
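The mock now mirrors the new attributes, so downstream code can branch on them without real acquisition files. A minimal sketch, assuming the existing constructor signature with `signal_duration` and per-channel `ttl_times`:

from neuroconv.tools.testing.mock_interfaces import MockSpikeGLXNIDQInterface

interface = MockSpikeGLXNIDQInterface(signal_duration=7.0, ttl_times=[[1.0, 3.0, 5.0]])
assert interface.has_analog_channels and not interface.has_digital_channels
print(interface.analog_channel_ids)  # e.g. ['nidq#XA0']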