Commit fa8f3d4: Merge branch 'release/2.1.1'
juhuntenburg committed Oct 6, 2021
2 parents c5887ba + f877fee
Showing 9 changed files with 272 additions and 25 deletions.
57 changes: 55 additions & 2 deletions brainbox/io/spikeglx.py
@@ -1,9 +1,13 @@
import logging
from pathlib import Path
import time

import numpy as np

from ibllib.io import spikeglx

_logger = logging.getLogger('ibllib')


def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=True):
"""
@@ -100,3 +104,52 @@ def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=T
print('Done. ({})'.format(time.ctime()))

return waveforms


def stream(pid, t0, nsecs=1, one=None, cache_folder=None, dsets=None, typ='ap'):
"""
NB: returned Reader object must be closed after use
:param pid: Probe UUID
:param t0: time of the first sample
:param nsecs: duration of the streamed data
:param one: An instance of ONE
:param cache_folder: optional folder used to cache the streamed chunks (defaults to the ONE cache directory)
:param dsets: optional pre-fetched dataset records (not used in the body below)
:param typ: 'ap' or 'lf'
:return: sr, t0
"""
CHUNK_DURATION_SECS = 1 # the mtscomp chunk duration. Right now it's a constant
if nsecs > 10:
raise ValueError(f'Streamer works only with 10 seconds or less; got nsecs={nsecs}')
assert one
assert typ in ['lf', 'ap']
t0 = np.floor(t0 / CHUNK_DURATION_SECS) * CHUNK_DURATION_SECS
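# e.g. with 1 s chunks, a requested t0 of 12.7 s snaps back to 12.0 s so streaming starts on a chunk boundary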
if cache_folder is None:
cache_folder = Path(one.alyx._par.CACHE_DIR).joinpath('cache', typ)
samples_folder = Path(cache_folder)
sample_file_name = Path(f"{pid}_{str(int(t0)).zfill(5)}.meta")

if samples_folder.joinpath(sample_file_name).exists():
_logger.info(f'loading {sample_file_name} from cache')
sr = spikeglx.Reader(samples_folder.joinpath(sample_file_name).with_suffix('.bin'),
open=True)
return sr, t0

eid, pname = one.pid2eid(pid)
cbin_rec = one.list_datasets(eid, collection=f"*{pname}", filename=f'*{typ}.*bin', details=True)
ch_rec = one.list_datasets(eid, collection=f"*{pname}", filename=f'*{typ}.ch', details=True)
meta_rec = one.list_datasets(eid, collection=f"*{pname}", filename=f'*{typ}.meta', details=True)
ch_file = one._download_datasets(ch_rec)[0]
one._download_datasets(meta_rec)  # also fetch the .meta file; only the download side effect is needed

first_chunk = int(t0 / CHUNK_DURATION_SECS)
last_chunk = int((t0 + nsecs) / CHUNK_DURATION_SECS) - 1
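# e.g. t0=12.0 and nsecs=1 with 1 s chunks give first_chunk=12, last_chunk=12 (a single chunk)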

samples_folder.mkdir(exist_ok=True, parents=True)
sr = spikeglx.download_raw_partial(
one=one,
url_cbin=one.record2url(cbin_rec)[0],
url_ch=ch_file,
first_chunk=first_chunk,
last_chunk=last_chunk,
cache_dir=samples_folder)

return sr, t0
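A minimal usage sketch for the streamer above (the probe UUID and times are hypothetical; the returned Reader must be closed after use):

    from one.api import ONE
    from brainbox.io.spikeglx import stream

    one = ONE()
    pid = 'da8dfec1-d265-44e8-84ce-6ae9c109b8bd'  # hypothetical probe insertion UUID
    sr, t0 = stream(pid, t0=100, nsecs=1, one=one)
    raw = sr[:, :-1].T  # channels x samples, dropping the sync channel
    sr.close()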
1 change: 1 addition & 0 deletions ibllib/dsp/__init__.py
@@ -1,2 +1,3 @@
from .fourier import fscale, freduce, fexpand, lp, hp, bp, fshift, dephas, fit_phase
from .utils import rms, WindowGenerator, rises, falls, fronts, fcn_cosine
from .voltage import destripe
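With destripe now exported at the package level, the QC code below can call it via `ibllib.dsp`. A minimal sketch, assuming a channels x samples float array (shapes illustrative):

    import numpy as np
    import ibllib.dsp as dsp

    raw = np.random.randn(384, 30000) * 1e-5  # 384 channels, ~1 s at 30 kHz (illustrative)
    clean = dsp.destripe(raw, fs=30000, neuropixel_version=1)
    rms_per_channel = dsp.rms(clean)  # same helpers used by EphysQC below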
148 changes: 136 additions & 12 deletions ibllib/ephys/ephysqc.py
@@ -8,13 +8,16 @@
import numpy as np
import pandas as pd
from scipy import signal
from tqdm import tqdm
import one.alf.io as alfio
from iblutil.util import Bunch

from brainbox.metrics.single_units import spike_sorting_metrics
from brainbox.io.spikeglx import stream as sglx_streamer
from ibllib.ephys import sync_probes
from ibllib.io import spikeglx
import ibllib.dsp as dsp
from ibllib.qc import base
from ibllib.io.extractors import ephys_fpga, training_wheel
from ibllib.misc import print_progress
from phylib.io import model
@@ -25,18 +28,141 @@
RMS_WIN_LENGTH_SECS = 3
WELCH_WIN_LENGTH_SAMPLES = 1024
NCH_WAVEFORMS = 32 # number of channels to be saved in templates.waveforms and channels.waveforms
BATCHES_SPACING = 300
TMIN = 40
SAMPLE_LENGTH = 1
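# With these defaults, a recording of e.g. 3600 s is sampled at t0 = 40, 340, 640, ..., 3340 s:
# one 1-second snippet every 300 s, starting 40 s in (see the np.arange call in EphysQC.run below)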


class EphysQC(base.QC):
"""
A class for computing Ephys QC metrics.
:param probe_id: An existing and registered probe insertion ID.
:param one: An ONE instance pointing to the database the probe_id is registered with. Optional, will instantiate
default database if not given.
"""

def __init__(self, probe_id, **kwargs):
self.stream = kwargs.pop('stream', True)  # pop before passing kwargs on to the base class
super().__init__(probe_id, endpoint='insertions', **kwargs)
self.pid = probe_id
keys = ('ap', 'ap_meta', 'lf', 'lf_meta')
self.data = Bunch.fromkeys(keys)
self.metrics = {}
self.outcome = 'NOT_SET'

def _ensure_required_data(self):
"""
Ensures the datasets required for QC are available locally or remotely.
"""
assert self.one is not None, 'ONE instance is required to ensure required data'
eid, pname = self.one.pid2eid(self.pid)
self.probe_path = self.one.eid2path(eid).joinpath('raw_ephys_data', pname)
# Check if there is at least one meta file available
meta_files = list(self.probe_path.rglob('*.meta'))
assert len(meta_files) != 0, f'No meta files in {self.probe_path}'
# Check if there is no more than one meta file per type
ap_meta = [meta for meta in meta_files if 'ap.meta' in meta.name]
assert not len(ap_meta) > 1, f'More than one ap.meta file in {self.probe_path}. Remove redundant files to run QC'
lf_meta = [meta for meta in meta_files if 'lf.meta' in meta.name]
assert not len(lf_meta) > 1, f'More than one lf.meta file in {self.probe_path}. Remove redundant files to run QC'

def load_data(self) -> None:
"""
Load any locally available data.
"""
# First sanity check
self._ensure_required_data()

_logger.info('Gathering data for QC')
# Load metadata and, if locally present, bin file
for dstype in ['ap', 'lf']:
# We already checked that there is not more than one meta file per type
meta_file = next(self.probe_path.rglob(f'*{dstype}.meta'), None)
if meta_file is None:
_logger.warning(f'No {dstype}.meta file in {self.probe_path}, skipping QC for {dstype} data.')
else:
self.data[f'{dstype}_meta'] = spikeglx.read_meta_data(meta_file)
bin_file = next(meta_file.parent.glob(f'*{dstype}.*bin'), None)
self.data[f'{dstype}'] = spikeglx.Reader(bin_file, open=True) if bin_file is not None else None

def run(self, update: bool = False, overwrite: bool = True, stream: bool = None, **kwargs) -> list:
"""
Run QC on samples of the .ap file, and on the entire file for .lf data if it is present.
:param update: bool, whether to update the qc json fields for this probe. Default is False.
:param overwrite: bool, whether to overwrite locally existing outputs of this function. Default is True.
:param stream: bool, whether to stream the samples of the .ap data if not locally available. Defaults to value
set in class init (True if none set).
:return: A list of QC output files. In case of a complete run that is one file for .ap and three files for .lf.
"""
# If stream is explicitly given in run, overwrite value from init
if stream is not None:
self.stream = stream
# Load data
self.load_data()
qc_files = []
# If ap meta file present, calculate median RMS per channel before and after destriping
# TODO: This should go in a separate function once we have a spikeglx.Streamer that behaves like the Reader
if self.data.ap_meta:
rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy")
if rms_file.exists() and not overwrite:
_logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS computation.')
median_rms = np.load(rms_file)
else:
rl = self.data.ap_meta.fileTimeSecs
nc = spikeglx._get_nchannels_from_meta(self.data.ap_meta)
t0s = np.arange(TMIN, rl - SAMPLE_LENGTH, BATCHES_SPACING)
all_rms = np.zeros((2, nc - 1, t0s.shape[0]))
# If the ap.bin file is not present locally, stream it
if self.data.ap is None and self.stream is True:
_logger.warning(f'Streaming .ap data to compute RMS samples for probe {self.pid}')
for i, t0 in enumerate(tqdm(t0s)):
sr, _ = sglx_streamer(self.pid, t0=t0, nsecs=1, one=self.one)
raw = sr[:, :-1].T
destripe = dsp.destripe(raw, fs=sr.fs, neuropixel_version=1)
all_rms[0, :, i] = dsp.rms(raw)
all_rms[1, :, i] = dsp.rms(destripe)
elif self.data.ap is None and self.stream is not True:
_logger.warning('Raw .ap data is not available locally. Run with stream=True in order to stream '
'data for calculating RMS samples.')
else:
_logger.info(f'Computing RMS samples for .ap data using local data in {self.probe_path}')
for i, t0 in enumerate(t0s):
sl = slice(int(t0 * self.data.ap.fs), int((t0 + SAMPLE_LENGTH) * self.data.ap.fs))
raw = self.data.ap[sl, :-1].T
destripe = dsp.destripe(raw, fs=self.data.ap.fs, neuropixel_version=1)
all_rms[0, :, i] = dsp.rms(raw)
all_rms[1, :, i] = dsp.rms(destripe)
# Calculate the median RMS across all samples per channel
median_rms = np.median(all_rms, axis=-1)
np.save(rms_file, median_rms)
qc_files.append(rms_file)

for p in [10, 90]:
self.metrics[f'apRms_p{p}_raw'] = np.format_float_scientific(np.percentile(median_rms[0, :], p),
precision=2)
self.metrics[f'apRms_p{p}_proc'] = np.format_float_scientific(np.percentile(median_rms[1, :], p),
precision=2)
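# e.g. np.format_float_scientific(1.234e-05, precision=2) -> '1.23e-05'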
if update:
self.update_extended_qc(self.metrics)
# self.update(outcome)

# If lf meta and bin file present, run the old qc on LF data
if self.data.lf_meta and self.data.lf:
qc_files.extend(extract_rmsmap(self.data.lf, out_folder=self.probe_path, overwrite=overwrite))

return qc_files
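# A minimal usage sketch for this class (pid is a registered probe insertion UUID, one an ONE instance):
#   qc = EphysQC(pid, one=one)
#   qc_files = qc.run(update=True, overwrite=False)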


def rmsmap(sglx):
"""
Computes the RMS map in the time domain and the spectra for each channel of a Neuropixels probe
:param sglx: Open spikeglx Reader
:return: a dictionary with amplitudes in channel-time space and channel-frequency space, plus the time
and frequency scales
"""
rms_win_length_samples = 2 ** np.ceil(np.log2(sglx.fs * RMS_WIN_LENGTH_SECS))
# the window generator will generate window indices
wingen = dsp.WindowGenerator(ns=sglx.ns, nswin=rms_win_length_samples, overlap=0)
@@ -68,33 +194,31 @@ def rmsmap(fbin):
return win
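# A sketch of the loop folded above (hedged: variable names are illustrative, not the exact code):
#   for first, last in wingen.firstlast:
#       d = sglx[first:last, :].T      # channels x samples for this window
#       trms.append(dsp.rms(d))        # RMS per channel over the window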


def extract_rmsmap(sglx, out_folder=None, overwrite=False):
"""
Wrapper for rmsmap that outputs _ibl_ephysRmsMap and _ibl_ephysSpectra ALF files
:param sglx: Open spikeglx Reader with data for which to compute rmsmap
:param out_folder: folder in which to store output ALF files. Default uses the folder in which
the raw data file lives.
:param overwrite: if False, do not re-extract when all ALF files already exist
:return: the list of output ALF file paths
"""
_logger.info(f"Computing QC for {fbin}")
sglx = spikeglx.Reader(fbin)
# check if output ALF files exist already:
if out_folder is None:
out_folder = sglx.file_bin.parent
else:
out_folder = Path(out_folder)
_logger.info(f"Computing RMS map for .{sglx.type} data in {out_folder}")
alf_object_time = f'ephysTimeRms{sglx.type.upper()}'
alf_object_freq = f'ephysSpectralDensity{sglx.type.upper()}'
files_time = list(out_folder.glob(f"_iblqc_{alf_object_time}*"))
files_freq = list(out_folder.glob(f"_iblqc_{alf_object_freq}*"))
if (len(files_time) == 2 == len(files_freq)) and not overwrite:
_logger.warning(f'RMS map already exists for .{sglx.type} data in {out_folder}, skipping. Use overwrite option.')
return files_time + files_freq
# crunch numbers
rms = rmsmap(sglx)
# output ALF files, single precision with the optional label as suffix before extension
if not out_folder.exists():
out_folder.mkdir()
18 changes: 11 additions & 7 deletions ibllib/io/spikeglx.py
@@ -691,7 +691,7 @@ def get_sync_map(folder_ephys):
return _sync_map_from_hardware_config(hc)


def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None, cache_dir=None):
"""
TODO Document
:param url_cbin:
@@ -702,10 +702,11 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None
"""
assert str(url_cbin).endswith('.cbin')
assert str(url_ch).endswith('.ch')

one = one or ONE()
webclient = one.alyx
cache_dir = cache_dir or webclient.cache_dir
relpath = Path(url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0]
target_dir = Path(cache_dir, relpath)
Path(target_dir).mkdir(parents=True, exist_ok=True)

# First, download the .ch file if necessary
@@ -715,7 +716,7 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None
ch_file = Path(webclient.download_file(
url_ch, cache_dir=target_dir, clobber=True, return_md5=False))
ch_file = remove_uuid_file(ch_file)
ch_file_stream = target_dir.joinpath(ch_file.name).with_suffix('.stream.ch')

# Load the .ch file.
with open(ch_file, 'r') as f:
@@ -725,6 +726,11 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None
i0 = cmeta['chunk_bounds'][first_chunk]
ns_stream = cmeta['chunk_bounds'][last_chunk + 1] - i0
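# e.g. with 1 s chunks at fs = 30000, chunk_bounds = [0, 30000, 60000, ...]; first_chunk=2 and
# last_chunk=3 give i0 = 60000 and ns_stream = 60000, i.e. two chunks' worth of samples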

# handles the meta file
meta_local_path = ch_file_stream.with_suffix('.meta')
if not meta_local_path.exists():
shutil.copy(ch_file.with_suffix('.meta'), meta_local_path)

# if the cached version happens to be the same as the one on disk, just load it
if ch_file_stream.exists():
with open(ch_file_stream, 'r') as f:
@@ -769,7 +775,5 @@ def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None
cbin_local_path.replace(cbin_local_path_renamed)
assert cbin_local_path_renamed.exists()

reader = Reader(cbin_local_path_renamed)
return reader
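A minimal usage sketch of the partial download (URLs are hypothetical; the returned Reader covers only the requested chunk range):

    from ibllib.io.spikeglx import download_raw_partial

    sr = download_raw_partial(
        url_cbin='https://example.org/probe00/_spikeglx_ephysData_g0_t0.imec0.ap.cbin',  # hypothetical
        url_ch='https://example.org/probe00/_spikeglx_ephysData_g0_t0.imec0.ap.ch',      # hypothetical
        first_chunk=10, last_chunk=11)
    sr.close()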
12 changes: 11 additions & 1 deletion ibllib/pipes/ephys_preprocessing.py
@@ -17,6 +17,7 @@
from ibllib.io.extractors import ephys_fpga, ephys_passive, camera
from ibllib.pipes import tasks
from ibllib.pipes.training_preprocessing import TrainingRegisterRaw as EphysRegisterRaw
from ibllib.pipes.misc import create_alyx_probe_insertions
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.qc.task_metrics import TaskQC
from ibllib.qc.camera import run_all_qc as run_camera_qc
@@ -60,7 +61,16 @@ class RawEphysQC(tasks.Task):
input_files = signatures.RAWEPHYSQC

def _run(self, overwrite=False):
eid = self.one.path2eid(self.session_path)
pids = [x['id'] for x in self.one.alyx.rest('insertions', 'list', session=eid)]
# Usually there should be two probes; if there are fewer, check whether all probes are registered
if len(pids) < 2:
_logger.warning(f"{len(pids)} probes registered for session {eid}, trying to register from local data")
pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
qc_files = []
for pid in pids:
eqc = ephysqc.EphysQC(pid, one=self.one)
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
return qc_files


2 changes: 1 addition & 1 deletion ibllib/pipes/misc.py
@@ -355,7 +355,7 @@ def create_alyx_probe_insertions(
):
if one is None:
one = ONE(cache_rest=None)
eid = session_path if is_uuid_string(session_path) else one.path2eid(session_path)
if eid is None:
print("Session not found on Alyx: please create session before creating insertions")
if model is None: