From 12bfb8b734312ca7d3ee30c732ed05df0cd83f6e Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 21 Sep 2021 16:45:04 +0100 Subject: [PATCH 01/18] Compute rms from samples --- ibllib/ephys/ephysqc.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index bc704d035..806d9125b 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -8,6 +8,7 @@ import numpy as np import pandas as pd from scipy import signal +from tqdm import tqdm import one.alf.io as alfio from iblutil.util import Bunch @@ -108,6 +109,34 @@ def extract_rmsmap(fbin, out_folder=None, overwrite=False): return out_time + out_freq +def extract_rms_samples(fbin, sample_length=1., sample_spacing=120, overwrite=False): + """ + Calculates RMS as a quality metric for samples of sample_length at sample_spacing from + a raw binary ephys file. Returns vector of median RMS per channel + :param fbin: + :param sample_length: + :param sample_spacing: + :param overwrite: + :return: rms_vector + """ + _logger.info(f"Computing QC for {fbin}") + sglx = spikeglx.Reader(fbin) + rl = sglx.ns / sglx.fs + t0s = np.arange(0, rl - sample_length, sample_spacing) + rms_matrix = np.zeros((sglx.nc - 1, t0s.shape[0])) + for i, t0 in enumerate(tqdm(t0s)): + # Get samples of sample_length every sample_spacing + sl = slice(int(t0 * sglx.fs), int((t0 + sample_length) * sglx.fs)) + raw = sglx[sl, :-1].T + # Run preprocessing on these samples + destripe = dsp.destripe(raw, fs=sglx.fs, neuropixel_version=1) + # Calculate rms for each channel + rms_matrix[:, i] = dsp.rms(destripe) + + rms_vector = np.nanmedian(rms_matrix, axis=1) + return rms_vector + + def raw_qc_session(session_path, overwrite=False): """ Wrapper that exectutes QC from a session folder and outputs the results whithin the same folder @@ -120,7 +149,7 @@ def raw_qc_session(session_path, overwrite=False): qc_files = [] for efile in efiles: if efile.get('ap') and efile.ap.exists(): - qc_files.extend(extract_rmsmap(efile.ap, out_folder=None, overwrite=overwrite)) + rms_channels = extract_rms_samples(efile.ap, overwrite=overwrite) if efile.get('lf') and efile.lf.exists(): qc_files.extend(extract_rmsmap(efile.lf, out_folder=None, overwrite=overwrite)) return qc_files From dbfcdb41cc49bf636e3162ce934c06980e5857a2 Mon Sep 17 00:00:00 2001 From: olivier Date: Tue, 21 Sep 2021 18:55:12 +0100 Subject: [PATCH 02/18] WIP stream ephys data loader --- brainbox/io/spikeglx.py | 58 +++++++++++++++++++++++++++++++++++++++-- ibllib/io/spikeglx.py | 11 ++++---- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index 287fb9d54..ee3f77765 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -1,9 +1,14 @@ +import logging +from pathlib import Path import time +import shutil + import numpy as np -# (Previously required `os.path` to get file info before memmapping) -# import os.path as op + from ibllib.io import spikeglx +_logger = logging.getLogger('ibllib') + def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=True): """ @@ -100,3 +105,52 @@ def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=T print('Done. ({})'.format(time.ctime())) return waveforms + + +def stream(pid, t0, nsecs=1, one=None, cache_folder=None, dsets=None, typ='ap'): + """ + NB: returned Reader object must be closed after use + :param pid: Probe UUID + :param t0: time of the first sample + :param nsecs: duration of the streamed data + :param one: An instance of ONE + :param cache_folder: + :param typ: 'ap' or 'lf' + :return: sr, dsets, t0 + """ + CHUNK_DURATION_SECS = 1 # the mtscomp chunk duration. Right now it's a constant + if nsecs > 10: + ValueError(f'Streamer works only with 10 or less seconds, set nsecs to lesss than {nsecs}') + assert one + assert typ in ['lf', 'ap'] + t0 = np.floor(t0 / CHUNK_DURATION_SECS) * CHUNK_DURATION_SECS + if cache_folder is None: + samples_folder = Path(one.alyx._par.CACHE_DIR).joinpath('cache', typ) + sample_file_name = Path(f"{pid}_{str(int(t0)).zfill(5)}.meta") + + if samples_folder.joinpath(sample_file_name).exists(): + _logger.info(f'loading {sample_file_name} from cache') + sr = spikeglx.Reader(samples_folder.joinpath(sample_file_name).with_suffix('.bin'), + open=True) + return sr, t0 + + eid, pname = one.pid2eid(pid) + cbin_rec = one.list_datasets(eid, collection=f"*{pname}", filename='*ap.*bin', details=True) + ch_rec = one.list_datasets(eid, collection=f"*{pname}", filename='*ap.ch', details=True) + meta_rec = one.list_datasets(eid, collection=f"*{pname}", filename='*ap.meta', details=True) + ch_file = one._download_datasets(ch_rec)[0] + meta_file = one._download_datasets(meta_rec)[0] + + first_chunk = int(t0 / CHUNK_DURATION_SECS) + last_chunk = int((t0 + nsecs) / CHUNK_DURATION_SECS) - 1 + + samples_folder.mkdir(exist_ok=True, parents=True) + sr = spikeglx.download_raw_partial( + one=one, + url_cbin=one.record2url(cbin_rec), + url_ch=ch_file, + first_chunk=first_chunk, + last_chunk=last_chunk, + cache_dir=samples_folder) + + return sr, t0 diff --git a/ibllib/io/spikeglx.py b/ibllib/io/spikeglx.py index bcd0bb827..5ebb1cf39 100644 --- a/ibllib/io/spikeglx.py +++ b/ibllib/io/spikeglx.py @@ -634,7 +634,7 @@ def get_sync_map(folder_ephys): return _sync_map_from_hardware_config(hc) -def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None): +def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None, cache_dir=None): """ TODO Document :param url_cbin: @@ -645,10 +645,11 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None """ assert str(url_cbin).endswith('.cbin') assert str(url_ch).endswith('.ch') - webclient = (one or ONE()).alyx - + one = one or ONE() + webclient = one.alyx + cache_dir = cache_dir or webclient.cache_dir relpath = Path(url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0] - target_dir = Path(webclient.cache_dir, relpath) + target_dir = Path(cache_dir, relpath) Path(target_dir).mkdir(parents=True, exist_ok=True) # First, download the .ch file if necessary @@ -658,7 +659,7 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None ch_file = Path(webclient.download_file( url_ch, cache_dir=target_dir, clobber=True, return_md5=False)) ch_file = remove_uuid_file(ch_file) - ch_file_stream = ch_file.with_suffix('.stream.ch') + ch_file_stream = target_dir.joinpath(ch_file.name).with_suffix('.stream.ch') # Load the .ch file. with open(ch_file, 'r') as f: From 6235418596eeef6042e5a36df3710b6b72993b1b Mon Sep 17 00:00:00 2001 From: olivier Date: Wed, 22 Sep 2021 13:29:47 +0100 Subject: [PATCH 03/18] streaming binary data with new ONE --- brainbox/io/spikeglx.py | 3 +-- ibllib/io/spikeglx.py | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index ee3f77765..99181fb3a 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -1,7 +1,6 @@ import logging from pathlib import Path import time -import shutil import numpy as np @@ -139,7 +138,7 @@ def stream(pid, t0, nsecs=1, one=None, cache_folder=None, dsets=None, typ='ap'): ch_rec = one.list_datasets(eid, collection=f"*{pname}", filename='*ap.ch', details=True) meta_rec = one.list_datasets(eid, collection=f"*{pname}", filename='*ap.meta', details=True) ch_file = one._download_datasets(ch_rec)[0] - meta_file = one._download_datasets(meta_rec)[0] + one._download_datasets(meta_rec)[0] first_chunk = int(t0 / CHUNK_DURATION_SECS) last_chunk = int((t0 + nsecs) / CHUNK_DURATION_SECS) - 1 diff --git a/ibllib/io/spikeglx.py b/ibllib/io/spikeglx.py index 5ebb1cf39..d1efe9a12 100644 --- a/ibllib/io/spikeglx.py +++ b/ibllib/io/spikeglx.py @@ -713,7 +713,11 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None cbin_local_path.replace(cbin_local_path_renamed) assert cbin_local_path_renamed.exists() - shutil.copy(cbin_local_path.with_suffix('.meta'), - cbin_local_path_renamed.with_suffix('.meta')) + # handles the meta file + meta_local_path = cbin_local_path.with_suffix('.meta') + if not meta_local_path.exists: + shutil.copy(cbin_local_path.with_suffix('.meta'), + cbin_local_path_renamed.with_suffix('.meta')) + reader = Reader(cbin_local_path_renamed) return reader From 7350e8255ac27377b9d2c631e408fc0b86ba2374 Mon Sep 17 00:00:00 2001 From: ibladmin Date: Thu, 23 Sep 2021 14:37:06 +0100 Subject: [PATCH 04/18] bugfixes stream ap files --- brainbox/io/spikeglx.py | 2 +- ibllib/io/spikeglx.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index 99181fb3a..3576210b1 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -146,7 +146,7 @@ def stream(pid, t0, nsecs=1, one=None, cache_folder=None, dsets=None, typ='ap'): samples_folder.mkdir(exist_ok=True, parents=True) sr = spikeglx.download_raw_partial( one=one, - url_cbin=one.record2url(cbin_rec), + url_cbin=one.record2url(cbin_rec)[0], url_ch=ch_file, first_chunk=first_chunk, last_chunk=last_chunk, diff --git a/ibllib/io/spikeglx.py b/ibllib/io/spikeglx.py index d1efe9a12..255c20493 100644 --- a/ibllib/io/spikeglx.py +++ b/ibllib/io/spikeglx.py @@ -660,7 +660,7 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None url_ch, cache_dir=target_dir, clobber=True, return_md5=False)) ch_file = remove_uuid_file(ch_file) ch_file_stream = target_dir.joinpath(ch_file.name).with_suffix('.stream.ch') - + # Load the .ch file. with open(ch_file, 'r') as f: cmeta = json.load(f) @@ -668,7 +668,12 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None # Get the first byte and number of bytes to download. i0 = cmeta['chunk_bounds'][first_chunk] ns_stream = cmeta['chunk_bounds'][last_chunk + 1] - i0 - + + # handles the meta file + meta_local_path = ch_file_stream.with_suffix('.meta') + if not meta_local_path.exists(): + shutil.copy(ch_file.with_suffix('.meta'), meta_local_path) + # if the cached version happens to be the same as the one on disk, just load it if ch_file_stream.exists(): with open(ch_file_stream, 'r') as f: @@ -713,11 +718,5 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None cbin_local_path.replace(cbin_local_path_renamed) assert cbin_local_path_renamed.exists() - # handles the meta file - meta_local_path = cbin_local_path.with_suffix('.meta') - if not meta_local_path.exists: - shutil.copy(cbin_local_path.with_suffix('.meta'), - cbin_local_path_renamed.with_suffix('.meta')) - reader = Reader(cbin_local_path_renamed) return reader From ffc683fc2fa8d479fda9ed70bc713089bd3e1eab Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Fri, 24 Sep 2021 12:10:22 +0100 Subject: [PATCH 05/18] WIP ephysqc --- ibllib/ephys/ephysqc.py | 72 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 806d9125b..908dd1737 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -13,9 +13,11 @@ from iblutil.util import Bunch from brainbox.metrics.single_units import spike_sorting_metrics +from brainbox.io.spikeglx import stream as sglx_streamer from ibllib.ephys import sync_probes from ibllib.io import spikeglx import ibllib.dsp as dsp +from ibllib.qc import base from ibllib.io.extractors import ephys_fpga, training_wheel from ibllib.misc import print_progress from phylib.io import model @@ -27,6 +29,72 @@ WELCH_WIN_LENGTH_SAMPLES = 1024 NCH_WAVEFORMS = 32 # number of channels to be saved in templates.waveforms and channels.waveforms +# class EphysQC(base.QC): +# """A class for computing Ephys QC metrics""" +# +# dstypes = ['ephysData.raw.lf', +# 'ephysData.raw.ap'] +# +# def __init__(self, probe_id, **kwargs): +# super().__init__(probe_id, endpoint='insertions,'**kwargs) +# self.pid = probe_id +# self.stream = kwargs.pop('stream', None) +# keys = ('sglx') +# self.data = Bunch.fromkeys(keys) +# self.metrics = None +# self.outcome = 'NOT_SET' +# +# +# def load_data(self, stream: bool = None) -> None: +# # If stream is explicitly given here, overwrite init +# if stream is not None: +# self.stream = stream +# self._ensure_required_data() +# _logger.info('Gathering data for QC') +# +# # Find fbin +# fbin = +# +# # Load spikeglx reader or streamer +# if self.stream is True: +# self.data['sglx'] = sglx_streamer(fbin) +# else: +# self.data['sglx'] = spikeglx.Reader(fbin) +# +# +# def _ensure_required_data(self): +# """ +# Ensures the datasets required for QC are available locally or remotely. +# """ +# assert self.one is not None, 'ONE instance is required to ensure required data' +# eid, pname = self.one.pid2eid(self.pid) +# session_path = self.one.eid2path(eid) +# # Check if data is available locally +# +# +# +# for dstype in self.dstypes: +# dataset = self.one.alyx.rest('datasets', 'list', session=eid, dataset_type=dstype, +# collection=f'raw_ephys_data/{pname}') +# # check that there is any dataset +# assert len(dataset) != 0, f'No dataset {dstype} in database' +# # check if dataset is local +# session_path.rglob(d), None) for d in datasets['rel_path']) +# +# # If not, and stream is not True, download. +# +# +# def run(self, update: bool = False, **kwargs) -> (str, dict): +# efiles = spikeglx.glob_ephys_files(session_path) +# qc_files = [] +# if efile.get('ap') and efile.ap.exists(): +# rms_channels = extract_rms_samples(efile.ap, overwrite=overwrite) +# if efile.get('lf') and efile.lf.exists(): +# qc_files.extend(extract_rmsmap(efile.lf, out_folder=None, overwrite=overwrite)) +# +# +# + def rmsmap(fbin): """ @@ -109,7 +177,7 @@ def extract_rmsmap(fbin, out_folder=None, overwrite=False): return out_time + out_freq -def extract_rms_samples(fbin, sample_length=1., sample_spacing=120, overwrite=False): +def extract_rms_samples(sglx, sample_length=1., sample_spacing=120, overwrite=False): """ Calculates RMS as a quality metric for samples of sample_length at sample_spacing from a raw binary ephys file. Returns vector of median RMS per channel @@ -119,8 +187,6 @@ def extract_rms_samples(fbin, sample_length=1., sample_spacing=120, overwrite=Fa :param overwrite: :return: rms_vector """ - _logger.info(f"Computing QC for {fbin}") - sglx = spikeglx.Reader(fbin) rl = sglx.ns / sglx.fs t0s = np.arange(0, rl - sample_length, sample_spacing) rms_matrix = np.zeros((sglx.nc - 1, t0s.shape[0])) From e60942273c1d46e17415fc0bccbc8fae33824bb1 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 28 Sep 2021 13:05:09 +0100 Subject: [PATCH 06/18] WIP, ensure required data --- ibllib/ephys/ephysqc.py | 168 +++++++++++++++++++++------------------- 1 file changed, 87 insertions(+), 81 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 908dd1737..d5ee92dfd 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -28,62 +28,64 @@ RMS_WIN_LENGTH_SECS = 3 WELCH_WIN_LENGTH_SAMPLES = 1024 NCH_WAVEFORMS = 32 # number of channels to be saved in templates.waveforms and channels.waveforms +BATCHES_SPACING = 300 +TMIN = 40 +SAMPLE_LENGTH = 1 + + +class EphysQC(base.QC): + """A class for computing Ephys QC metrics""" + + dstypes = ['ephysData.raw.lf', + 'ephysData.raw.ap'] + + def __init__(self, probe_id, **kwargs): + super().__init__(probe_id, endpoint='insertions', **kwargs) + self.pid = probe_id + self.stream = kwargs.pop('stream', None) + keys = () + self.data = Bunch.fromkeys(keys) + self.metrics = None + self.outcome = 'NOT_SET' + + def _ensure_required_data(self): + """ + Ensures the datasets required for QC are available locally or remotely. + """ + assert self.one is not None, 'ONE instance is required to ensure required data' + eid, pname = self.one.pid2eid(self.pid) + probe_path = self.one.eid2path(eid).joinpath('raw_ephys_data', pname) + # Check if there is at least one, but not more than one metafile of each type available locally + meta_files = list(probe_path.rglob(f'*.meta')) + assert len(meta_files) != 0, f'No meta files in raw_ephys_data/{pname} folder for session {eid}' + ap_meta = [meta for meta in meta_files if 'ap.meta' in meta.name] + assert not len(ap_meta) > 1, f'More than one ap.meta file in raw_ephys_data/{pname} folder for session {eid}. ' \ + f'Remove any redundant files to run QC' + lf_meta = [meta for meta in meta_files if 'lf.meta' in meta.name] + assert not len(lf_meta) > 1, f'More than one lf.meta file in raw_ephys_data/{pname} folder for session {eid}. ' \ + f'Remove any redundant files to run QC' + + # def load_data(self, stream: bool = None) -> None: + # # If stream is explicitly given here, overwrite init value + # if stream is not None: + # self.stream = stream + # self._ensure_required_data() + # for type in ['ap', 'lf']: + # # Metafile should always be present + # meta_files = probe_path.rglob(f'*{type}.meta') + # if len(meta_files) == 0: + # _logger.warning(f'No {type}.meta file in raw_ephys_data/{pname} folder for session {eid}. ' + # f'Skipping QC for this data.') + # elif len(meta_files) > 1: + # _logger.warning(f'More than one {type}.meta file in raw_ephys_data/{pname} folder for session {eid}. ' + # f'Skipping QC for this data. Please remove redundant {type}.meta files.') + # else: + # meta_file = meta_files[0] + # self.data[f'{type}_meta'] = spikeglx.read_meta_data(meta_file) + # bin_file = next(meta_file.parent.glob(f'*{type}.*bin'), None) + # + # _logger.info('Gathering data for QC') -# class EphysQC(base.QC): -# """A class for computing Ephys QC metrics""" -# -# dstypes = ['ephysData.raw.lf', -# 'ephysData.raw.ap'] -# -# def __init__(self, probe_id, **kwargs): -# super().__init__(probe_id, endpoint='insertions,'**kwargs) -# self.pid = probe_id -# self.stream = kwargs.pop('stream', None) -# keys = ('sglx') -# self.data = Bunch.fromkeys(keys) -# self.metrics = None -# self.outcome = 'NOT_SET' -# -# -# def load_data(self, stream: bool = None) -> None: -# # If stream is explicitly given here, overwrite init -# if stream is not None: -# self.stream = stream -# self._ensure_required_data() -# _logger.info('Gathering data for QC') -# -# # Find fbin -# fbin = -# -# # Load spikeglx reader or streamer -# if self.stream is True: -# self.data['sglx'] = sglx_streamer(fbin) -# else: -# self.data['sglx'] = spikeglx.Reader(fbin) -# -# -# def _ensure_required_data(self): -# """ -# Ensures the datasets required for QC are available locally or remotely. -# """ -# assert self.one is not None, 'ONE instance is required to ensure required data' -# eid, pname = self.one.pid2eid(self.pid) -# session_path = self.one.eid2path(eid) -# # Check if data is available locally -# -# -# -# for dstype in self.dstypes: -# dataset = self.one.alyx.rest('datasets', 'list', session=eid, dataset_type=dstype, -# collection=f'raw_ephys_data/{pname}') -# # check that there is any dataset -# assert len(dataset) != 0, f'No dataset {dstype} in database' -# # check if dataset is local -# session_path.rglob(d), None) for d in datasets['rel_path']) -# -# # If not, and stream is not True, download. -# -# # def run(self, update: bool = False, **kwargs) -> (str, dict): # efiles = spikeglx.glob_ephys_files(session_path) # qc_files = [] @@ -92,8 +94,38 @@ # if efile.get('lf') and efile.lf.exists(): # qc_files.extend(extract_rmsmap(efile.lf, out_folder=None, overwrite=overwrite)) # +# def extract_rms_samples(sglx, sample_length=SAMPLE_LENGTH, sample_spacing=BATCHES_SPACING, tmin=TMIN, overwrite=False): +# """ +# Calculates RMS as a quality metric for samples of sample_length at sample_spacing from +# a raw binary ephys file, before and after destriping. Returns median RMS per channel +# """ +# rl = metadata.fileTimeSecs +# nc = spikeglx._get_nchannels_from_meta(metadata) +# rl = sglx.ns / sglx.fs +# t0s = np.arange(tmin, rl - sample_length, sample_spacing) +# rms_full = np.zeros((2, sglx.nc - 1, t0s.shape[0])) +# if isinstance(sglx, sglx_streamer): +# _logger.warning("Streaming data for RMS samples") +# for i, t0 in enumerate(tqdm(t0s)): +# sr, _ = stream(pid, t0=t0, nsecs=1, one=one) +# raw = sr[:, :-1].T +# destripe = voltage.destripe(raw, fs=sr.fs, neuropixel_version=1) +# all_rms[0, :, i] = rms(raw) +# all_rms[1, :, i] = rms(destripe) # +# elif isinstance(sglx, spikeglx.Reader()): +# for i, t0 in enumerate(tqdm(t0s)): +# # Get samples of sample_length every sample_spacing +# sl = slice(int(t0 * sglx.fs), int((t0 + sample_length) * sglx.fs)) +# raw = sglx[sl, :-1].T +# # Run preprocessing on these samples +# destripe = dsp.destripe(raw, fs=sglx.fs, neuropixel_version=1) +# # Calculate rms for each channel for raw and destriped data +# rms_full[0, :, i] = dsp.rms(raw) +# rms_full[1, :, i] = dsp.rms(destripe) # +# rms_vector = np.nanmedian(rms_matrix, axis=1) +# return rms_vector def rmsmap(fbin): @@ -177,32 +209,6 @@ def extract_rmsmap(fbin, out_folder=None, overwrite=False): return out_time + out_freq -def extract_rms_samples(sglx, sample_length=1., sample_spacing=120, overwrite=False): - """ - Calculates RMS as a quality metric for samples of sample_length at sample_spacing from - a raw binary ephys file. Returns vector of median RMS per channel - :param fbin: - :param sample_length: - :param sample_spacing: - :param overwrite: - :return: rms_vector - """ - rl = sglx.ns / sglx.fs - t0s = np.arange(0, rl - sample_length, sample_spacing) - rms_matrix = np.zeros((sglx.nc - 1, t0s.shape[0])) - for i, t0 in enumerate(tqdm(t0s)): - # Get samples of sample_length every sample_spacing - sl = slice(int(t0 * sglx.fs), int((t0 + sample_length) * sglx.fs)) - raw = sglx[sl, :-1].T - # Run preprocessing on these samples - destripe = dsp.destripe(raw, fs=sglx.fs, neuropixel_version=1) - # Calculate rms for each channel - rms_matrix[:, i] = dsp.rms(destripe) - - rms_vector = np.nanmedian(rms_matrix, axis=1) - return rms_vector - - def raw_qc_session(session_path, overwrite=False): """ Wrapper that exectutes QC from a session folder and outputs the results whithin the same folder From f675451a859350034c8d1343f21a67bd5cad5fa2 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 28 Sep 2021 13:22:31 +0100 Subject: [PATCH 07/18] WIP: load data --- ibllib/ephys/ephysqc.py | 56 +++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index d5ee92dfd..f1f493496 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -43,7 +43,7 @@ def __init__(self, probe_id, **kwargs): super().__init__(probe_id, endpoint='insertions', **kwargs) self.pid = probe_id self.stream = kwargs.pop('stream', None) - keys = () + keys = ('ap', 'ap_meta', 'lf', 'lf_meta') self.data = Bunch.fromkeys(keys) self.metrics = None self.outcome = 'NOT_SET' @@ -54,37 +54,33 @@ def _ensure_required_data(self): """ assert self.one is not None, 'ONE instance is required to ensure required data' eid, pname = self.one.pid2eid(self.pid) - probe_path = self.one.eid2path(eid).joinpath('raw_ephys_data', pname) - # Check if there is at least one, but not more than one metafile of each type available locally - meta_files = list(probe_path.rglob(f'*.meta')) - assert len(meta_files) != 0, f'No meta files in raw_ephys_data/{pname} folder for session {eid}' + self.probe_path = self.one.eid2path(eid).joinpath('raw_ephys_data', pname) + # Check if there is at least one meta file available + meta_files = list(self.probe_path.rglob(f'*.meta')) + assert len(meta_files) != 0, f'No meta files in {self.probe_path}' + # Check if there is no more than one meta file per type ap_meta = [meta for meta in meta_files if 'ap.meta' in meta.name] - assert not len(ap_meta) > 1, f'More than one ap.meta file in raw_ephys_data/{pname} folder for session {eid}. ' \ - f'Remove any redundant files to run QC' + assert not len(ap_meta) > 1, f'More than one ap.meta file in {self.probe_path}. Remove redundant files to run QC' lf_meta = [meta for meta in meta_files if 'lf.meta' in meta.name] - assert not len(lf_meta) > 1, f'More than one lf.meta file in raw_ephys_data/{pname} folder for session {eid}. ' \ - f'Remove any redundant files to run QC' - - # def load_data(self, stream: bool = None) -> None: - # # If stream is explicitly given here, overwrite init value - # if stream is not None: - # self.stream = stream - # self._ensure_required_data() - # for type in ['ap', 'lf']: - # # Metafile should always be present - # meta_files = probe_path.rglob(f'*{type}.meta') - # if len(meta_files) == 0: - # _logger.warning(f'No {type}.meta file in raw_ephys_data/{pname} folder for session {eid}. ' - # f'Skipping QC for this data.') - # elif len(meta_files) > 1: - # _logger.warning(f'More than one {type}.meta file in raw_ephys_data/{pname} folder for session {eid}. ' - # f'Skipping QC for this data. Please remove redundant {type}.meta files.') - # else: - # meta_file = meta_files[0] - # self.data[f'{type}_meta'] = spikeglx.read_meta_data(meta_file) - # bin_file = next(meta_file.parent.glob(f'*{type}.*bin'), None) - # - # _logger.info('Gathering data for QC') + assert not len(lf_meta) > 1, f'More than one lf.meta file in {self.probe_path}. Remove redundant files to run QC' + + def load_data(self, stream: bool = None) -> None: + # If stream is explicitly given here, overwrite init value + if stream is not None: + self.stream = stream + self._ensure_required_data() + + _logger.info('Gathering data for QC') + # Load metadata and, if locally present, bin file + for dstype in ['ap', 'lf']: + # We already checked that there is not more than one meta file per type + meta_file = next(self.probe_path.rglob(f'*{dstype}.meta'), None) + if meta_file is None: + _logger.warning(f'No {dstype}.meta file in {self.probe_path}, skipping QC for {dstype} data.') + else: + self.data[f'{dstype}_meta'] = spikeglx.read_meta_data(meta_file) + bin_file = next(meta_file.parent.glob(f'*{type}.*bin'), None) + self.data[f'{dstype}'] = spikeglx.Reader(bin_file, open=True) if bin_file is not None else None # def run(self, update: bool = False, **kwargs) -> (str, dict): # efiles = spikeglx.glob_ephys_files(session_path) From 1bb8b13bdf727e43662231d8ca27f37c704b2c16 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 28 Sep 2021 16:29:20 +0100 Subject: [PATCH 08/18] WIP: run --- ibllib/ephys/ephysqc.py | 103 ++++++++++++++-------------- ibllib/pipes/ephys_preprocessing.py | 7 +- 2 files changed, 59 insertions(+), 51 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index f1f493496..5ae4d00c1 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -36,13 +36,10 @@ class EphysQC(base.QC): """A class for computing Ephys QC metrics""" - dstypes = ['ephysData.raw.lf', - 'ephysData.raw.ap'] - def __init__(self, probe_id, **kwargs): super().__init__(probe_id, endpoint='insertions', **kwargs) self.pid = probe_id - self.stream = kwargs.pop('stream', None) + self.stream = kwargs.pop('stream', True) keys = ('ap', 'ap_meta', 'lf', 'lf_meta') self.data = Bunch.fromkeys(keys) self.metrics = None @@ -64,10 +61,8 @@ def _ensure_required_data(self): lf_meta = [meta for meta in meta_files if 'lf.meta' in meta.name] assert not len(lf_meta) > 1, f'More than one lf.meta file in {self.probe_path}. Remove redundant files to run QC' - def load_data(self, stream: bool = None) -> None: - # If stream is explicitly given here, overwrite init value - if stream is not None: - self.stream = stream + def load_data(self) -> None: + # First sanity check self._ensure_required_data() _logger.info('Gathering data for QC') @@ -82,46 +77,51 @@ def load_data(self, stream: bool = None) -> None: bin_file = next(meta_file.parent.glob(f'*{type}.*bin'), None) self.data[f'{dstype}'] = spikeglx.Reader(bin_file, open=True) if bin_file is not None else None -# def run(self, update: bool = False, **kwargs) -> (str, dict): -# efiles = spikeglx.glob_ephys_files(session_path) -# qc_files = [] -# if efile.get('ap') and efile.ap.exists(): -# rms_channels = extract_rms_samples(efile.ap, overwrite=overwrite) -# if efile.get('lf') and efile.lf.exists(): -# qc_files.extend(extract_rmsmap(efile.lf, out_folder=None, overwrite=overwrite)) -# -# def extract_rms_samples(sglx, sample_length=SAMPLE_LENGTH, sample_spacing=BATCHES_SPACING, tmin=TMIN, overwrite=False): -# """ -# Calculates RMS as a quality metric for samples of sample_length at sample_spacing from -# a raw binary ephys file, before and after destriping. Returns median RMS per channel -# """ -# rl = metadata.fileTimeSecs -# nc = spikeglx._get_nchannels_from_meta(metadata) -# rl = sglx.ns / sglx.fs -# t0s = np.arange(tmin, rl - sample_length, sample_spacing) -# rms_full = np.zeros((2, sglx.nc - 1, t0s.shape[0])) -# if isinstance(sglx, sglx_streamer): -# _logger.warning("Streaming data for RMS samples") -# for i, t0 in enumerate(tqdm(t0s)): -# sr, _ = stream(pid, t0=t0, nsecs=1, one=one) -# raw = sr[:, :-1].T -# destripe = voltage.destripe(raw, fs=sr.fs, neuropixel_version=1) -# all_rms[0, :, i] = rms(raw) -# all_rms[1, :, i] = rms(destripe) -# -# elif isinstance(sglx, spikeglx.Reader()): -# for i, t0 in enumerate(tqdm(t0s)): -# # Get samples of sample_length every sample_spacing -# sl = slice(int(t0 * sglx.fs), int((t0 + sample_length) * sglx.fs)) -# raw = sglx[sl, :-1].T -# # Run preprocessing on these samples -# destripe = dsp.destripe(raw, fs=sglx.fs, neuropixel_version=1) -# # Calculate rms for each channel for raw and destriped data -# rms_full[0, :, i] = dsp.rms(raw) -# rms_full[1, :, i] = dsp.rms(destripe) -# -# rms_vector = np.nanmedian(rms_matrix, axis=1) -# return rms_vector + def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, **kwargs) -> (str, dict): + # If stream is explicitly given in run, overwrite value from init + if stream is not None: + self.stream = stream + # Load data + self.load_data() + qc_files = [] + # If ap meta file present, calculate median RMS per channel before and after destriping + # TODO: This should go a a separate function once we have a spikeglx.Streamer that behaves like the Reader + if self.data.ap_meta: + rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRms.npy") + if rms_file.exists() and not overwrite: + _logger.info(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.') + else: + rl = self.data.ap_meta.fileTimeSecs + nc = spikeglx._get_nchannels_from_meta(self.data.ap_meta) + t0s = np.arange(TMIN, rl - SAMPLE_LENGTH, BATCHES_SPACING) + all_rms = np.zeros((2, nc - 1, t0s.shape[0])) + if self.data.ap is None and self.stream is True: + _logger.warning(f'Streaming data to compute RMS for probe {self.pid}') + for i, t0 in enumerate(tqdm(t0s)): + sr, _ = sglx_streamer(self.pid, t0=t0, nsecs=1, one=self.one) + raw = sr[:, :-1].T + destripe = dsp.voltage.destripe(raw, fs=sr.fs, neuropixel_version=1) + all_rms[0, :, i] = dsp.rms(raw) + all_rms[1, :, i] = dsp.rms(destripe) + elif self.data.ap is None and self.stream is not True: + _logger.warning(f'Raw ap data is not available locally. Run with stream=True in order to stream ' + f'data for calculating RMS.') + else: + _logger.info(f'Computing RMS for ap data using local data in {self.probe_path}') + for i, t0 in enumerate(t0s): + sl = slice(int(t0 * self.data.ap.fs), int((t0 + SAMPLE_LENGTH) * self.data.ap.fs)) + raw = self.data.ap[sl, :-1].T + destripe = dsp.voltage.destripe(raw, fs=self.data.ap.fs, neuropixel_version=1) + all_rms[0, :, i] = dsp.rms(raw) + all_rms[1, :, i] = dsp.rms(destripe) + np.save(rms_file, np.median(all_rms, axis=-1)) + qc_files.extend(rms_file) + + # If lf meta and bin file present, run the old qc on LF data + if self.data.lf_meta and self.data.lf: + qc_files.extend(extract_rmsmap(self.data.lf, out_folder=None, overwrite=overwrite)) + + return qc_files def rmsmap(fbin): @@ -177,7 +177,10 @@ def extract_rmsmap(fbin, out_folder=None, overwrite=False): :return: None """ _logger.info(f"Computing QC for {fbin}") - sglx = spikeglx.Reader(fbin) + if isinstance(fbin, spikeglx.Reader): + sglx = fbin + else: + sglx = spikeglx.Reader(fbin) # check if output ALF files exist already: if out_folder is None: out_folder = Path(fbin).parent @@ -217,7 +220,7 @@ def raw_qc_session(session_path, overwrite=False): qc_files = [] for efile in efiles: if efile.get('ap') and efile.ap.exists(): - rms_channels = extract_rms_samples(efile.ap, overwrite=overwrite) + qc_files.extend(extract_rmsmap(efile.ap, out_folder=None, overwrite=overwrite)) if efile.get('lf') and efile.lf.exists(): qc_files.extend(extract_rmsmap(efile.lf, out_folder=None, overwrite=overwrite)) return qc_files diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index 63d629b86..1571a51c9 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -58,7 +58,12 @@ class RawEphysQC(tasks.Task): level = 0 # this job doesn't depend on anything def _run(self, overwrite=False): - qc_files = ephysqc.raw_qc_session(self.session_path, overwrite=overwrite) + eid = self.one.path2eid(self.session_path) + pids = [x['id'] for x in self.one.alyx.rest('insertions', 'list', session=eid)] + qc_files = [] + for pid in pids: + eqc = ephysqc.EphysQC(pid) + qc_files.extend(eqc.run()) return qc_files From b884614ccd2f3ea543b90d78a825d817abf1b4a9 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 28 Sep 2021 18:14:55 +0100 Subject: [PATCH 09/18] WIP: fixes in run --- ibllib/dsp/__init__.py | 1 + ibllib/ephys/ephysqc.py | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/ibllib/dsp/__init__.py b/ibllib/dsp/__init__.py index 9944e0b45..00c578627 100644 --- a/ibllib/dsp/__init__.py +++ b/ibllib/dsp/__init__.py @@ -1,2 +1,3 @@ from .fourier import fscale, freduce, fexpand, lp, hp, bp, fshift, dephas, fit_phase from .utils import rms, WindowGenerator, rises, falls, fronts, fcn_cosine +from .voltage import destripe diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 5ae4d00c1..6ccbbe650 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -74,7 +74,7 @@ def load_data(self) -> None: _logger.warning(f'No {dstype}.meta file in {self.probe_path}, skipping QC for {dstype} data.') else: self.data[f'{dstype}_meta'] = spikeglx.read_meta_data(meta_file) - bin_file = next(meta_file.parent.glob(f'*{type}.*bin'), None) + bin_file = next(meta_file.parent.glob(f'*{dstype}.*bin'), None) self.data[f'{dstype}'] = spikeglx.Reader(bin_file, open=True) if bin_file is not None else None def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, **kwargs) -> (str, dict): @@ -100,7 +100,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, for i, t0 in enumerate(tqdm(t0s)): sr, _ = sglx_streamer(self.pid, t0=t0, nsecs=1, one=self.one) raw = sr[:, :-1].T - destripe = dsp.voltage.destripe(raw, fs=sr.fs, neuropixel_version=1) + destripe = dsp.destripe(raw, fs=sr.fs, neuropixel_version=1) all_rms[0, :, i] = dsp.rms(raw) all_rms[1, :, i] = dsp.rms(destripe) elif self.data.ap is None and self.stream is not True: @@ -111,12 +111,26 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, for i, t0 in enumerate(t0s): sl = slice(int(t0 * self.data.ap.fs), int((t0 + SAMPLE_LENGTH) * self.data.ap.fs)) raw = self.data.ap[sl, :-1].T - destripe = dsp.voltage.destripe(raw, fs=self.data.ap.fs, neuropixel_version=1) + destripe = dsp.destripe(raw, fs=self.data.ap.fs, neuropixel_version=1) all_rms[0, :, i] = dsp.rms(raw) all_rms[1, :, i] = dsp.rms(destripe) - np.save(rms_file, np.median(all_rms, axis=-1)) + median_rms = np.median(all_rms, axis=-1) + np.save(rms_file, median_rms) qc_files.extend(rms_file) + # self.metrics['good_chan_raw'] = np.where(median_rms>30) + # self.metrics['good_chan_destriped'] = + # + # if update: + # extended = { + # k: None if v is None or v == 'NOT_SET' + # else base.CRITERIA[v] < 3 if isinstance(v, str) + # else (base.CRITERIA[v[0]] < 3, *v[1:]) # Convert first value to bool if array + # for k, v in self.metrics.items() + # } + # self.update_extended_qc(extended) + #self.update(outcome, namespace) + # If lf meta and bin file present, run the old qc on LF data if self.data.lf_meta and self.data.lf: qc_files.extend(extract_rmsmap(self.data.lf, out_folder=None, overwrite=overwrite)) From e391e2c3378fd7176604f2bac4fed5b22b4c8310 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Wed, 29 Sep 2021 10:46:56 +0100 Subject: [PATCH 10/18] Working version of EphysQC --- ibllib/ephys/ephysqc.py | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 6ccbbe650..e96ab8c3e 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -96,7 +96,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, t0s = np.arange(TMIN, rl - SAMPLE_LENGTH, BATCHES_SPACING) all_rms = np.zeros((2, nc - 1, t0s.shape[0])) if self.data.ap is None and self.stream is True: - _logger.warning(f'Streaming data to compute RMS for probe {self.pid}') + _logger.warning(f'Streaming .ap data to compute RMS samples for probe {self.pid}') for i, t0 in enumerate(tqdm(t0s)): sr, _ = sglx_streamer(self.pid, t0=t0, nsecs=1, one=self.one) raw = sr[:, :-1].T @@ -104,10 +104,10 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, all_rms[0, :, i] = dsp.rms(raw) all_rms[1, :, i] = dsp.rms(destripe) elif self.data.ap is None and self.stream is not True: - _logger.warning(f'Raw ap data is not available locally. Run with stream=True in order to stream ' - f'data for calculating RMS.') + _logger.warning(f'Raw .ap data is not available locally. Run with stream=True in order to stream ' + f'data for calculating RMS samples.') else: - _logger.info(f'Computing RMS for ap data using local data in {self.probe_path}') + _logger.info(f'Computing RMS samples for .ap data using local data in {self.probe_path}') for i, t0 in enumerate(t0s): sl = slice(int(t0 * self.data.ap.fs), int((t0 + SAMPLE_LENGTH) * self.data.ap.fs)) raw = self.data.ap[sl, :-1].T @@ -116,7 +116,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, all_rms[1, :, i] = dsp.rms(destripe) median_rms = np.median(all_rms, axis=-1) np.save(rms_file, median_rms) - qc_files.extend(rms_file) + qc_files.append(rms_file) # self.metrics['good_chan_raw'] = np.where(median_rms>30) # self.metrics['good_chan_destriped'] = @@ -133,21 +133,19 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, # If lf meta and bin file present, run the old qc on LF data if self.data.lf_meta and self.data.lf: - qc_files.extend(extract_rmsmap(self.data.lf, out_folder=None, overwrite=overwrite)) + qc_files.extend(extract_rmsmap(self.data.lf, out_folder=self.probe_path, overwrite=overwrite)) return qc_files -def rmsmap(fbin): +def rmsmap(sglx): """ Computes RMS map in time domain and spectra for each channel of Neuropixel probe - :param fbin: binary file in spike glx format (will look for attached metatdata) - :type fbin: str or pathlib.Path + :param sglx: Open spikeglx reader :return: a dictionary with amplitudes in channeltime space, channelfrequency space, time and frequency scales """ - sglx = spikeglx.Reader(fbin, open=True) rms_win_length_samples = 2 ** np.ceil(np.log2(sglx.fs * RMS_WIN_LENGTH_SECS)) # the window generator will generates window indices wingen = dsp.WindowGenerator(ns=sglx.ns, nswin=rms_win_length_samples, overlap=0) @@ -179,36 +177,31 @@ def rmsmap(fbin): return win -def extract_rmsmap(fbin, out_folder=None, overwrite=False): +def extract_rmsmap(sglx, out_folder=None, overwrite=False): """ Wrapper for rmsmap that outputs _ibl_ephysRmsMap and _ibl_ephysSpectra ALF files - :param fbin: binary file in spike glx format (will look for attached metatdata) + :param sglx: Open spikeglx Reader with data for which to compute rmsmap :param out_folder: folder in which to store output ALF files. Default uses the folder in which the `fbin` file lives. :param overwrite: do not re-extract if all ALF files already exist :param label: string or list of strings that will be appended to the filename before extension :return: None """ - _logger.info(f"Computing QC for {fbin}") - if isinstance(fbin, spikeglx.Reader): - sglx = fbin - else: - sglx = spikeglx.Reader(fbin) - # check if output ALF files exist already: if out_folder is None: - out_folder = Path(fbin).parent + out_folder = sglx.file_bin.parent else: out_folder = Path(out_folder) + _logger.info(f"Computing RMS map for .{sglx.type} data in {out_folder}") alf_object_time = f'ephysTimeRms{sglx.type.upper()}' alf_object_freq = f'ephysSpectralDensity{sglx.type.upper()}' files_time = list(out_folder.glob(f"_iblqc_{alf_object_time}*")) files_freq = list(out_folder.glob(f"_iblqc_{alf_object_freq}*")) if (len(files_time) == 2 == len(files_freq)) and not overwrite: - _logger.warning(f'{fbin.name} QC already exists, skipping. Use overwrite option.') + _logger.warning(f'RMS map already exists for .{sglx.type} data in {out_folder}, skipping. Use overwrite option.') return files_time + files_freq # crunch numbers - rms = rmsmap(fbin) + rms = rmsmap(sglx) # output ALF files, single precision with the optional label as suffix before extension if not out_folder.exists(): out_folder.mkdir() From 9b57521b3e090117e817e4cb697fa69ee23e901e Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Wed, 29 Sep 2021 14:59:55 +0100 Subject: [PATCH 11/18] Update extended qc --- ibllib/ephys/ephysqc.py | 23 +++++++++-------------- ibllib/pipes/ephys_preprocessing.py | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index e96ab8c3e..47ab6a373 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -42,7 +42,7 @@ def __init__(self, probe_id, **kwargs): self.stream = kwargs.pop('stream', True) keys = ('ap', 'ap_meta', 'lf', 'lf_meta') self.data = Bunch.fromkeys(keys) - self.metrics = None + self.metrics = {} self.outcome = 'NOT_SET' def _ensure_required_data(self): @@ -89,7 +89,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, if self.data.ap_meta: rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRms.npy") if rms_file.exists() and not overwrite: - _logger.info(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.') + _logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.') + median_rms = np.load(rms_file) else: rl = self.data.ap_meta.fileTimeSecs nc = spikeglx._get_nchannels_from_meta(self.data.ap_meta) @@ -118,18 +119,12 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, np.save(rms_file, median_rms) qc_files.append(rms_file) - # self.metrics['good_chan_raw'] = np.where(median_rms>30) - # self.metrics['good_chan_destriped'] = - # - # if update: - # extended = { - # k: None if v is None or v == 'NOT_SET' - # else base.CRITERIA[v] < 3 if isinstance(v, str) - # else (base.CRITERIA[v[0]] < 3, *v[1:]) # Convert first value to bool if array - # for k, v in self.metrics.items() - # } - # self.update_extended_qc(extended) - #self.update(outcome, namespace) + self.metrics['apRms_p10'] = np.format_float_scientific(np.percentile(median_rms, 90), precision=2) + self.metrics['apRms_p90'] = np.format_float_scientific(np.percentile(median_rms, 10), precision=2) + + if update: + self.update_extended_qc(self.metrics) + # self.update(outcome) # If lf meta and bin file present, run the old qc on LF data if self.data.lf_meta and self.data.lf: diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index 1571a51c9..d58be41f1 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -63,7 +63,7 @@ def _run(self, overwrite=False): qc_files = [] for pid in pids: eqc = ephysqc.EphysQC(pid) - qc_files.extend(eqc.run()) + qc_files.extend(eqc.run(update=True, overwrite=overwrite)) return qc_files From 1b45c7ba3b530a49840a359e0a497d7ae69a09af Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Thu, 30 Sep 2021 13:27:15 +0100 Subject: [PATCH 12/18] Adapt to final rms datatype --- ibllib/ephys/ephysqc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 47ab6a373..1d75c6643 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -87,7 +87,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, # If ap meta file present, calculate median RMS per channel before and after destriping # TODO: This should go a a separate function once we have a spikeglx.Streamer that behaves like the Reader if self.data.ap_meta: - rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRms.npy") + rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy") if rms_file.exists() and not overwrite: _logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.') median_rms = np.load(rms_file) @@ -117,7 +117,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, all_rms[1, :, i] = dsp.rms(destripe) median_rms = np.median(all_rms, axis=-1) np.save(rms_file, median_rms) - qc_files.append(rms_file) + qc_files.append(rms_file) self.metrics['apRms_p10'] = np.format_float_scientific(np.percentile(median_rms, 90), precision=2) self.metrics['apRms_p90'] = np.format_float_scientific(np.percentile(median_rms, 10), precision=2) From eb56526b211f734fcdbca2886673452e7d96ed77 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Thu, 30 Sep 2021 15:22:58 +0100 Subject: [PATCH 13/18] Unittests for ephysQC --- ibllib/tests/test_ephys.py | 52 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/ibllib/tests/test_ephys.py b/ibllib/tests/test_ephys.py index 9ce03b752..d895a21bf 100644 --- a/ibllib/tests/test_ephys.py +++ b/ibllib/tests/test_ephys.py @@ -1,9 +1,13 @@ # Mock dataset import unittest +from tempfile import TemporaryDirectory import numpy as np from ibllib.ephys import ephysqc, neuropixel +from one.tests import TEST_DB_1, OFFLINE_ONLY +from ibllib.tests.fixtures import utils +from one.api import ONE class TestNeuropixel(unittest.TestCase): @@ -56,5 +60,53 @@ def test_impeccable_dataset(self): self.assertTrue(np.all([np.all(qct[k]) for k in qct])) +@unittest.skipIf(OFFLINE_ONLY, 'online only test') +class TestEphysQC(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.tempdir = TemporaryDirectory() + cls.one = ONE(**TEST_DB_1, cache_dir=cls.tempdir.name) + + @classmethod + def tearDownClass(cls) -> None: + # Clear overwritten methods by destroying cached instance + ONE.cache_clear() + cls.tempdir.cleanup() + + def setUp(self) -> None: + + self.eid = 'b1c968ad-4874-468d-b2e4-5ffa9b9964e9' + # make a temp probe insertion + self.pname = 'probe02' + # Find any existing insertions with this name and delete + probe_insertions = self.one.alyx.rest('insertions', 'list', session=self.eid, name=self.pname, no_cache=True) + for pi in probe_insertions: + self.one.alyx.rest('insertions', 'delete', pi['id']) + # Create new insertion with this name and add teardown hook to delete it + probe_insertion = self.one.alyx.rest('insertions', 'create', data={'session': self.eid, 'name': self.pname}) + self.addCleanup(self.one.alyx.rest, 'insertions', 'delete', id=probe_insertion['id']) + self.pid = probe_insertion['id'] + self.qc = ephysqc.EphysQC(self.pid, one=self.one) + + def tearDown(self) -> None: + pass + + def test_ensure_data(self): + # Make sure raises an error when no data present + utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=False) + with self.assertRaises(AssertionError): + self.qc._ensure_required_data() + # Make sure it runs through fine when meta files are present + utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=True) + self.qc._ensure_required_data() + + def test_load_data(self): + # In case that hasn't been run + utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=True) + # Remove the fake bin files because they won't be able to load + for fbin in ['_spikeglx_ephysData_g0_t0.imec.lf.bin', '_spikeglx_ephysData_g0_t0.imec.ap.bin']: + self.one.eid2path(self.eid).joinpath('raw_ephys_data', self.pname, fbin).unlink() + self.qc.load_data() + if __name__ == "__main__": unittest.main(exit=False, verbosity=2) From 9b42341c829a69065cc98fccd0554e96fb38967a Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Mon, 4 Oct 2021 13:36:04 +0100 Subject: [PATCH 14/18] Create probe insertions on alyx if they don't exist --- ibllib/pipes/ephys_preprocessing.py | 13 +++++++++---- ibllib/pipes/misc.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index d58be41f1..301494ff4 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -17,6 +17,7 @@ from ibllib.io.extractors import ephys_fpga, ephys_passive, camera from ibllib.pipes import tasks from ibllib.pipes.training_preprocessing import TrainingRegisterRaw as EphysRegisterRaw +from ibllib.pipes.misc import create_alyx_probe_insertions from ibllib.qc.task_extractors import TaskQCExtractor from ibllib.qc.task_metrics import TaskQC from ibllib.qc.camera import run_all_qc as run_camera_qc @@ -53,16 +54,20 @@ class RawEphysQC(tasks.Task): """ cpu = 2 - io_charge = 30 # this jobs reads raw ap files - priority = 10 # a lot of jobs depend on this one - level = 0 # this job doesn't depend on anything + io_charge = 30 + priority = 10 + level = 0 def _run(self, overwrite=False): eid = self.one.path2eid(self.session_path) pids = [x['id'] for x in self.one.alyx.rest('insertions', 'list', session=eid)] + # Usually there should be two probes, if there are less, check if all probes are registered + if len(pids) < 2: + _logger.warning(f"{len(pids)} probes registered for session {eid}, trying to register from local data") + pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)] qc_files = [] for pid in pids: - eqc = ephysqc.EphysQC(pid) + eqc = ephysqc.EphysQC(pid, one=self.one) qc_files.extend(eqc.run(update=True, overwrite=overwrite)) return qc_files diff --git a/ibllib/pipes/misc.py b/ibllib/pipes/misc.py index 4456b079d..02a335d6f 100644 --- a/ibllib/pipes/misc.py +++ b/ibllib/pipes/misc.py @@ -355,7 +355,7 @@ def create_alyx_probe_insertions( ): if one is None: one = ONE(cache_rest=None) - eid = session_path if is_uuid_string(session_path) else one.eid_from_path(session_path) + eid = session_path if is_uuid_string(session_path) else one.path2eid(session_path) if eid is None: print("Session not found on Alyx: please create session before creating insertions") if model is None: From c70c5a17bdd899da172e62c8ff3afaf2b680d68b Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Mon, 4 Oct 2021 19:16:16 +0100 Subject: [PATCH 15/18] Docs and fix json fields --- ibllib/ephys/ephysqc.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 1d75c6643..943a34ee1 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -34,7 +34,13 @@ class EphysQC(base.QC): - """A class for computing Ephys QC metrics""" + """ + A class for computing Ephys QC metrics. + + :param probe_id: An existing and registered probe insertion ID. + :param one: An ONE instance pointing to the database the probe_id is registered with. Optional, will instantiate + default database if not given. + """ def __init__(self, probe_id, **kwargs): super().__init__(probe_id, endpoint='insertions', **kwargs) @@ -62,6 +68,9 @@ def _ensure_required_data(self): assert not len(lf_meta) > 1, f'More than one lf.meta file in {self.probe_path}. Remove redundant files to run QC' def load_data(self) -> None: + """ + Load any locally available data. + """ # First sanity check self._ensure_required_data() @@ -78,6 +87,15 @@ def load_data(self) -> None: self.data[f'{dstype}'] = spikeglx.Reader(bin_file, open=True) if bin_file is not None else None def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, **kwargs) -> (str, dict): + """ + Run QC on samples of the .ap file, and on the entire file for .lf data if it is present. + + :param update: bool, whether to update the qc json fields for this probe. Default is False. + :param overwrite: bool, whether to overwrite locally existing outputs of this function. Default is False. + :param stream: bool, whether to stream the samples of the .ap data if not locally available. Defaults to value + set in class init (True if none set). + :return: A list of QC output files. In case of a complete run that is one file for .ap and three files for .lf. + """ # If stream is explicitly given in run, overwrite value from init if stream is not None: self.stream = stream @@ -96,6 +114,7 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, nc = spikeglx._get_nchannels_from_meta(self.data.ap_meta) t0s = np.arange(TMIN, rl - SAMPLE_LENGTH, BATCHES_SPACING) all_rms = np.zeros((2, nc - 1, t0s.shape[0])) + # If the ap.bin file is not present locally, stream it if self.data.ap is None and self.stream is True: _logger.warning(f'Streaming .ap data to compute RMS samples for probe {self.pid}') for i, t0 in enumerate(tqdm(t0s)): @@ -115,13 +134,16 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, destripe = dsp.destripe(raw, fs=self.data.ap.fs, neuropixel_version=1) all_rms[0, :, i] = dsp.rms(raw) all_rms[1, :, i] = dsp.rms(destripe) + # Calculate the median RMS across all samples per channel median_rms = np.median(all_rms, axis=-1) np.save(rms_file, median_rms) qc_files.append(rms_file) - self.metrics['apRms_p10'] = np.format_float_scientific(np.percentile(median_rms, 90), precision=2) - self.metrics['apRms_p90'] = np.format_float_scientific(np.percentile(median_rms, 10), precision=2) - + for p in [10, 90]: + self.metrics[f'apRms_p{p}_raw'] = np.format_float_scientific(np.percentile(median_rms[0, :], p), + precision=2) + self.metrics[f'apRms_p{p}_proc'] = np.format_float_scientific(np.percentile(median_rms[1, :], p), + precision=2) if update: self.update_extended_qc(self.metrics) # self.update(outcome) From 9d1c40865e08a97e50d49ded02e0b296b708785d Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 5 Oct 2021 08:00:46 +0100 Subject: [PATCH 16/18] Flakify --- ibllib/ephys/ephysqc.py | 6 +++--- ibllib/io/spikeglx.py | 6 +++--- ibllib/tests/test_ephys.py | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ibllib/ephys/ephysqc.py b/ibllib/ephys/ephysqc.py index 943a34ee1..7582305d6 100644 --- a/ibllib/ephys/ephysqc.py +++ b/ibllib/ephys/ephysqc.py @@ -59,7 +59,7 @@ def _ensure_required_data(self): eid, pname = self.one.pid2eid(self.pid) self.probe_path = self.one.eid2path(eid).joinpath('raw_ephys_data', pname) # Check if there is at least one meta file available - meta_files = list(self.probe_path.rglob(f'*.meta')) + meta_files = list(self.probe_path.rglob('*.meta')) assert len(meta_files) != 0, f'No meta files in {self.probe_path}' # Check if there is no more than one meta file per type ap_meta = [meta for meta in meta_files if 'ap.meta' in meta.name] @@ -124,8 +124,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, all_rms[0, :, i] = dsp.rms(raw) all_rms[1, :, i] = dsp.rms(destripe) elif self.data.ap is None and self.stream is not True: - _logger.warning(f'Raw .ap data is not available locally. Run with stream=True in order to stream ' - f'data for calculating RMS samples.') + _logger.warning('Raw .ap data is not available locally. Run with stream=True in order to stream ' + 'data for calculating RMS samples.') else: _logger.info(f'Computing RMS samples for .ap data using local data in {self.probe_path}') for i, t0 in enumerate(t0s): diff --git a/ibllib/io/spikeglx.py b/ibllib/io/spikeglx.py index 255c20493..b02215664 100644 --- a/ibllib/io/spikeglx.py +++ b/ibllib/io/spikeglx.py @@ -660,7 +660,7 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None url_ch, cache_dir=target_dir, clobber=True, return_md5=False)) ch_file = remove_uuid_file(ch_file) ch_file_stream = target_dir.joinpath(ch_file.name).with_suffix('.stream.ch') - + # Load the .ch file. with open(ch_file, 'r') as f: cmeta = json.load(f) @@ -668,12 +668,12 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None # Get the first byte and number of bytes to download. i0 = cmeta['chunk_bounds'][first_chunk] ns_stream = cmeta['chunk_bounds'][last_chunk + 1] - i0 - + # handles the meta file meta_local_path = ch_file_stream.with_suffix('.meta') if not meta_local_path.exists(): shutil.copy(ch_file.with_suffix('.meta'), meta_local_path) - + # if the cached version happens to be the same as the one on disk, just load it if ch_file_stream.exists(): with open(ch_file_stream, 'r') as f: diff --git a/ibllib/tests/test_ephys.py b/ibllib/tests/test_ephys.py index d895a21bf..5372e6c62 100644 --- a/ibllib/tests/test_ephys.py +++ b/ibllib/tests/test_ephys.py @@ -108,5 +108,6 @@ def test_load_data(self): self.one.eid2path(self.eid).joinpath('raw_ephys_data', self.pname, fbin).unlink() self.qc.load_data() + if __name__ == "__main__": unittest.main(exit=False, verbosity=2) From 029d63a66dde9779b24d831cd901259acfd71924 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Tue, 5 Oct 2021 09:12:26 +0200 Subject: [PATCH 17/18] fix test_db import --- ibllib/tests/test_ephys.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ibllib/tests/test_ephys.py b/ibllib/tests/test_ephys.py index 5372e6c62..abe8cf3d7 100644 --- a/ibllib/tests/test_ephys.py +++ b/ibllib/tests/test_ephys.py @@ -5,7 +5,7 @@ import numpy as np from ibllib.ephys import ephysqc, neuropixel -from one.tests import TEST_DB_1, OFFLINE_ONLY +from ibllib.tests import TEST_DB from ibllib.tests.fixtures import utils from one.api import ONE @@ -60,12 +60,11 @@ def test_impeccable_dataset(self): self.assertTrue(np.all([np.all(qct[k]) for k in qct])) -@unittest.skipIf(OFFLINE_ONLY, 'online only test') class TestEphysQC(unittest.TestCase): @classmethod def setUpClass(cls) -> None: cls.tempdir = TemporaryDirectory() - cls.one = ONE(**TEST_DB_1, cache_dir=cls.tempdir.name) + cls.one = ONE(**TEST_DB, cache_dir=cls.tempdir.name) @classmethod def tearDownClass(cls) -> None: From f877feedade2b943a9bb5522eb17ccf4848258e8 Mon Sep 17 00:00:00 2001 From: juhuntenburg Date: Wed, 6 Oct 2021 16:14:01 +0100 Subject: [PATCH 18/18] Bump version and release notes --- release_notes.md | 5 ++++- setup.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/release_notes.md b/release_notes.md index 37a7b4b0a..b4ad1e039 100644 --- a/release_notes.md +++ b/release_notes.md @@ -3,8 +3,11 @@ - destriping as pykilosort internal pre-processing - NP2 probe framework for splitting shanks and LFP band - Extension of task module to rerun from different locations +### Release Notes 2.1.0 2021-10-06 +- RawEphysQC tasks computes median RMS from samples for .ap data (stored in _iblqc_ephysChannels.RMS) +- New EphysQC class -## Release Notes 2.0.0 +## Release Notes 2.0 ### Release Notes 2.0.1 2021-08-07 - pykilosort error handling ### Release Notes 2.0.2 2021-08-31 diff --git a/setup.py b/setup.py index 7a2887859..f0c91705b 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ setup( name='ibllib', - version='2.1.0', + version='2.1.1', python_requires='>={}.{}'.format(*REQUIRED_PYTHON), description='IBL libraries', license="MIT",