New ingestion_schemas + MANY minor fixes and improvements (#438)
@@ -53,6 +53,8 @@ class BlockDetection(dj.Computed):
     -> acquisition.Environment
     """

+    key_source = acquisition.Environment - {"experiment_name": "social0.1-aeon3"}
+
     def make(self, key):
         """On a per-chunk basis, check for the presence of new block, insert into Block table.
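For context, `key_source` defines the set of keys DataJoint auto-populates `make()` over, and `-` is an antijoin that drops rows matching the restriction. A minimal sketch emulating that semantics in plain Python (illustrative only; the non-excluded experiment name is made up):

import json

# Stand-ins for rows of acquisition.Environment
environments = [
    {"experiment_name": "exp0.2-aeon4"},      # hypothetical experiment name
    {"experiment_name": "social0.1-aeon3"},   # excluded by the PR
]
excluded = {"experiment_name": "social0.1-aeon3"}

# Table - restriction keeps rows that do NOT match the restriction
key_source = [row for row in environments
              if any(row[k] != v for k, v in excluded.items())]
print(json.dumps(key_source))  # [{"experiment_name": "exp0.2-aeon4"}]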
@@ -88,8 +90,7 @@ def make(self, key):
         blocks_df = block_state_df[block_state_df.pellet_ct == 0]
         # account for the double 0s - find any 0s that are within 1 second of each other, remove the 2nd one
         double_0s = blocks_df.index.to_series().diff().dt.total_seconds() < 1
-        # find the indices of the 2nd 0s and remove
-        double_0s = double_0s.shift(-1).fillna(False)
+        # keep the first 0s
         blocks_df = blocks_df[~double_0s]

         block_entries = []
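The `shift(-1)` is no longer needed because `diff() < 1s` already flags the second event of each close pair, so `~double_0s` keeps the first. A standalone sketch on toy timestamps (not project data):

import pandas as pd

# Three "0" events: the second lands within 1 s of the first (a double 0)
idx = pd.to_datetime(["2024-01-01 00:00:00.0",
                      "2024-01-01 00:00:00.4",
                      "2024-01-01 00:10:00.0"])
blocks_df = pd.DataFrame({"pellet_ct": 0}, index=idx)

# diff() marks each row's gap to the previous row; < 1 s flags the 2nd of a pair
double_0s = blocks_df.index.to_series().diff().dt.total_seconds() < 1
# dropping the flagged rows keeps the first 0 of each pair
blocks_df = blocks_df[~double_0s]
print(blocks_df.index)  # the 00:00:00.4 row is gone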
@@ -144,8 +145,8 @@ class Patch(dj.Part):
         wheel_timestamps: longblob
         patch_threshold: longblob
         patch_threshold_timestamps: longblob
-        patch_rate: float
-        patch_offset: float
+        patch_rate=null: float
+        patch_offset=null: float
         """

     class Subject(dj.Part):
@@ -181,17 +182,27 @@ def make(self, key):
             streams.UndergroundFeederDepletionState,
             streams.UndergroundFeederDeliverPellet,
             streams.UndergroundFeederEncoder,
-            tracking.SLEAPTracking,
         )
         for streams_table in streams_tables:
             if len(streams_table & chunk_keys) < len(streams_table.key_source & chunk_keys):
                 raise ValueError(
                     f"BlockAnalysis Not Ready - {streams_table.__name__} not yet fully ingested for block: {key}. Skipping (to retry later)..."
                 )

+        # Check if SLEAPTracking is ready, if not, see if BlobPosition can be used instead
+        use_blob_position = False
+        if len(tracking.SLEAPTracking & chunk_keys) < len(tracking.SLEAPTracking.key_source & chunk_keys):
+            if len(tracking.BlobPosition & chunk_keys) < len(tracking.BlobPosition.key_source & chunk_keys):
+                raise ValueError(
+                    f"BlockAnalysis Not Ready - SLEAPTracking (and BlobPosition) not yet fully ingested for block: {key}. Skipping (to retry later)..."
+                )
+            else:
+                use_blob_position = True
+
         # Patch data - TriggerPellet, DepletionState, Encoder (distancetravelled)
-        # For wheel data, downsample to 10Hz
-        final_encoder_fs = 10
+        # For wheel data, downsample to 50Hz
+        final_encoder_hz = 50
+        freq = 1 / final_encoder_hz * 1e3  # in ms

         maintenance_period = get_maintenance_periods(key["experiment_name"], block_start, block_end)
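The switch from a stride-based downsampling factor to `resample().first()` (applied further down in this diff) pins the wheel series to a fixed 50 Hz grid regardless of the raw acquisition rate. A minimal sketch with synthetic encoder data (rate and values are made up):

import numpy as np
import pandas as pd

final_encoder_hz = 50
freq = 1 / final_encoder_hz * 1e3  # bin width in ms -> 20.0

# Toy encoder stream at ~500 Hz for one second
t = pd.date_range("2024-01-01", periods=500, freq="2ms")
encoder_df = pd.DataFrame({"angle": np.linspace(0, 2 * np.pi, 500)}, index=t)

# Taking the first sample of each 20 ms bin yields a uniform 50 Hz series
downsampled = encoder_df.resample(f"{freq}ms").first()
print(len(downsampled))  # 50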
@@ -233,51 +244,52 @@ def make(self, key):
                 encoder_df, maintenance_period, block_end, dropna=True
             )

-            if depletion_state_df.empty:
-                raise ValueError(f"No depletion state data found for block {key} - patch: {patch_name}")
-
-            encoder_df["distance_travelled"] = -1 * analysis_utils.distancetravelled(encoder_df.angle)
-
-            if len(depletion_state_df.rate.unique()) > 1:
-                # multiple patch rates per block is unexpected, log a note and pick the first rate to move forward
-                AnalysisNote.insert1(
-                    {
-                        "note_timestamp": datetime.utcnow(),
-                        "note_type": "Multiple patch rates",
-                        "note": f"Found multiple patch rates for block {key} - patch: {patch_name} - rates: {depletion_state_df.rate.unique()}",
-                    }
-                )
-
-            patch_rate = depletion_state_df.rate.iloc[0]
-            patch_offset = depletion_state_df.offset.iloc[0]
-            # handles patch rate value being INF
-            patch_rate = 999999999 if np.isinf(patch_rate) else patch_rate
-
-            encoder_fs = (
-                1 / encoder_df.index.to_series().diff().dt.total_seconds().median()
-            )  # mean or median?
-            wheel_downsampling_factor = int(encoder_fs / final_encoder_fs)
+            # if all dataframes are empty, skip
+            if pellet_ts_threshold_df.empty and depletion_state_df.empty and encoder_df.empty:
+                continue
+
+            if encoder_df.empty:
+                encoder_df["distance_travelled"] = 0
+            else:
+                # -1 is for placement of magnetic encoder, where wheel movement actually decreases encoder
+                encoder_df["distance_travelled"] = -1 * analysis_utils.distancetravelled(encoder_df.angle)
+                encoder_df = encoder_df.resample(f"{freq}ms").first()
+
+            if not depletion_state_df.empty:
+                if len(depletion_state_df.rate.unique()) > 1:
+                    # multiple patch rates per block is unexpected, log a note and pick the first rate to move forward
+                    AnalysisNote.insert1(
+                        {
+                            "note_timestamp": datetime.utcnow(),
+                            "note_type": "Multiple patch rates",
+                            "note": f"Found multiple patch rates for block {key} - patch: {patch_name} - rates: {depletion_state_df.rate.unique()}",
+                        }
+                    )
+
+                patch_rate = depletion_state_df.rate.iloc[0]
+                patch_offset = depletion_state_df.offset.iloc[0]
+                # handles patch rate value being INF
+                patch_rate = 999999999 if np.isinf(patch_rate) else patch_rate
+            else:
+                logger.warning(f"No depletion state data found for block {key} - patch: {patch_name}")
+                patch_rate = None
+                patch_offset = None

Comment: is it actually an issue if patch rate is inf? Does it cause some downstream issue? We do this as default when no env is loaded.
Reply: Yes, it's due to MySQL.

             block_patch_entries.append(
                 {
                     **key,
                     "patch_name": patch_name,
                     "pellet_count": len(pellet_ts_threshold_df),
                     "pellet_timestamps": pellet_ts_threshold_df.pellet_timestamp.values,
-                    "wheel_cumsum_distance_travelled": encoder_df.distance_travelled.values[
-                        ::wheel_downsampling_factor
-                    ],
-                    "wheel_timestamps": encoder_df.index.values[::wheel_downsampling_factor],
+                    "wheel_cumsum_distance_travelled": encoder_df.distance_travelled.values,
+                    "wheel_timestamps": encoder_df.index.values,
                     "patch_threshold": pellet_ts_threshold_df.threshold.values,
                     "patch_threshold_timestamps": pellet_ts_threshold_df.index.values,
                     "patch_rate": patch_rate,
                     "patch_offset": patch_offset,
                 }
             )

             # update block_end if last timestamp of encoder_df is before the current block_end
             block_end = min(encoder_df.index[-1], block_end)

         # Subject data
         # Get all unique subjects that visited the environment over the entire exp;
         # For each subject, see 'type' of visit most recent to start of block
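The exchange above explains the sentinel: a MySQL FLOAT column cannot store IEEE infinity, so inserting an infinite rate fails at the database layer. A minimal sketch of the clamp (sentinel value copied from the diff, helper name is hypothetical):

import numpy as np

def clamp_inf(rate, sentinel=999999999):
    """Replace an infinite patch rate with a large finite sentinel,
    since MySQL FLOAT columns reject IEEE inf on insert."""
    return sentinel if np.isinf(rate) else rate

assert clamp_inf(float("inf")) == 999999999
assert clamp_inf(0.0033) == 0.0033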
@@ -288,27 +300,50 @@ def make(self, key):
             & f'chunk_start <= "{chunk_keys[-1]["chunk_start"]}"'
         )[:block_start]
         subject_visits_df = subject_visits_df[subject_visits_df.region == "Environment"]
+        subject_visits_df = subject_visits_df[~subject_visits_df.id.str.contains("Test", case=False)]

Comment: sometimes we use other, non "Test" subjects as test subjects. Maybe the check should be: does the subject begin with 'baa' (can use str.lower to check regardless of case)?
Reply: I see.
Reply: Yes, your suggestion would be a better check.
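A sketch of the reviewer's suggested check, hypothetical and not the code merged in this diff: treat only IDs that start with "baa" (case-insensitive) as experimental subjects, rather than excluding IDs containing "Test".

def is_experimental_subject(subject_id: str) -> bool:
    # Hypothetical helper per the review suggestion: real subjects are
    # assumed to begin with "baa"; everything else is a test ID.
    return subject_id.lower().startswith("baa")

print(is_experimental_subject("BAA-1104045"))  # True (made-up ID)
print(is_experimental_subject("TestSubject"))  # False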
         subject_names = []
         for subject_name in set(subject_visits_df.id):
             _df = subject_visits_df[subject_visits_df.id == subject_name]
             if _df.type.iloc[-1] != "Exit":
                 subject_names.append(subject_name)

+        if use_blob_position and len(subject_names) > 1:
+            raise ValueError(
+                f"Without SLEAPTracking, BlobPosition can only handle single-subject block. Found {len(subject_names)} subjects."
+            )
+
         block_subject_entries = []
         for subject_name in subject_names:
             # positions - query for CameraTop, identity_name matches subject_name,
-            pos_query = (
-                streams.SpinnakerVideoSource
-                * tracking.SLEAPTracking.PoseIdentity.proj("identity_name", part_name="anchor_part")
-                * tracking.SLEAPTracking.Part
-                & key
-                & {
-                    "spinnaker_video_source_name": "CameraTop",
-                    "identity_name": subject_name,
-                }
-                & chunk_restriction
-            )
-            pos_df = fetch_stream(pos_query)[block_start:block_end]
+            if use_blob_position:
+                pos_query = (
+                    streams.SpinnakerVideoSource
+                    * tracking.BlobPosition.Object
+                    & key
+                    & chunk_restriction
+                    & {
+                        "spinnaker_video_source_name": "CameraTop",
+                        "identity_name": subject_name
+                    }
+                )
+                pos_df = fetch_stream(pos_query)[block_start:block_end]
+                pos_df["likelihood"] = np.nan
+                # keep only rows with area between 0 and 1000 - likely artifacts otherwise
+                pos_df = pos_df[(pos_df.area > 0) & (pos_df.area < 1000)]
+            else:
+                pos_query = (
+                    streams.SpinnakerVideoSource
+                    * tracking.SLEAPTracking.PoseIdentity.proj("identity_name", part_name="anchor_part")
+                    * tracking.SLEAPTracking.Part
+                    & key
+                    & {
+                        "spinnaker_video_source_name": "CameraTop",
+                        "identity_name": subject_name,
+                    }
+                    & chunk_restriction
+                )
+                pos_df = fetch_stream(pos_query)[block_start:block_end]

             pos_df = filter_out_maintenance_periods(pos_df, maintenance_period, block_end)

             if pos_df.empty:
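A small sketch of the BlobPosition fallback's post-processing on a toy frame (the real `pos_df` comes from `fetch_stream` on the BlobPosition query; the area bounds are copied from the diff):

import numpy as np
import pandas as pd

pos_df = pd.DataFrame({"x": [10.0, 11.0, 12.0], "area": [250.0, -5.0, 4000.0]})

# Blob tracking has no pose-model confidence, so fill likelihood with NaN
# to keep the output schema consistent with SLEAPTracking
pos_df["likelihood"] = np.nan

# blobs with non-positive or very large areas are likely artifacts
pos_df = pos_df[(pos_df.area > 0) & (pos_df.area < 1000)]
print(len(pos_df))  # 1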
@@ -345,8 +380,8 @@ def make(self, key):
             {
                 **key,
                 "block_duration": (block_end - block_start).total_seconds() / 3600,
-                "patch_count": len(patch_keys),
-                "subject_count": len(subject_names),
+                "patch_count": len(block_patch_entries),
+                "subject_count": len(block_subject_entries),
             }
         )
         self.Patch.insert(block_patch_entries)
@@ -423,6 +458,17 @@ def make(self, key):
         )
         subjects_positions_df.set_index("position_timestamps", inplace=True)

+        # Ensure wheel_timestamps are of the same length across all patches
+        wheel_lens = [len(p["wheel_timestamps"]) for p in block_patches]
+        if len(set(wheel_lens)) > 1:
+            max_diff = max(wheel_lens) - min(wheel_lens)
+            if max_diff > 10:
+                # if diff is more than 10 samples, raise error, this is unexpected, some patches crash?
+                raise ValueError(f"Wheel data lengths are not consistent across patches ({max_diff} samples diff)")
+            for p in block_patches:
+                p["wheel_timestamps"] = p["wheel_timestamps"][: min(wheel_lens)]
+                p["wheel_cumsum_distance_travelled"] = p["wheel_cumsum_distance_travelled"][: min(wheel_lens)]
+
         self.insert1(key)

         in_patch_radius = 130  # pixels
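The alignment step truncates every patch's wheel arrays to the shortest length when they differ by at most 10 samples, and errors out otherwise. A self-contained sketch with synthetic arrays:

import numpy as np

# Toy per-patch entries whose wheel arrays differ by a few samples
block_patches = [
    {"wheel_timestamps": np.arange(1000), "wheel_cumsum_distance_travelled": np.arange(1000)},
    {"wheel_timestamps": np.arange(997), "wheel_cumsum_distance_travelled": np.arange(997)},
]

wheel_lens = [len(p["wheel_timestamps"]) for p in block_patches]
if len(set(wheel_lens)) > 1:
    if max(wheel_lens) - min(wheel_lens) > 10:
        raise ValueError("Wheel data lengths are not consistent across patches")
    for p in block_patches:
        # truncate to the shortest series so arrays align sample-for-sample
        p["wheel_timestamps"] = p["wheel_timestamps"][: min(wheel_lens)]
        p["wheel_cumsum_distance_travelled"] = p["wheel_cumsum_distance_travelled"][: min(wheel_lens)]

assert {len(p["wheel_timestamps"]) for p in block_patches} == {997}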
@@ -541,7 +587,7 @@ def make(self, key):
                 {
                     "patch_name": patch["patch_name"],
                     "subject_name": subject_name,
-                    "in_patch_timestamps": subject_in_patch.index.values,
+                    "in_patch_timestamps": subject_in_patch[in_patch[subject_name]].index.values,
                     "in_patch_time": subject_in_patch_cum_time[-1],
                     "pellet_count": len(subj_pellets),
                     "pellet_timestamps": subj_pellets.index.values,
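The fix applies the subject's in-patch boolean mask before taking index values; previously every timestamp in the window was stored. A toy illustration (variable names mirror the surrounding code; data is synthetic):

import pandas as pd

subject_in_patch = pd.DataFrame(
    {"x": [1, 2, 3]},
    index=pd.date_range("2024-01-01", periods=3, freq="100ms"),
)
# boolean mask: True when the subject is within in_patch_radius of the patch
mask = pd.Series([True, False, True], index=subject_in_patch.index)

# masking first keeps only timestamps where the subject was truly in the patch
in_patch_timestamps = subject_in_patch[mask].index.values
print(len(in_patch_timestamps))  # 2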
@@ -1521,10 +1567,10 @@ def make(self, key):
         foraging_bout_df = get_foraging_bouts(key)
         foraging_bout_df.rename(
             columns={
-                "subject_name": "subject",
-                "bout_start": "start",
-                "bout_end": "end",
-                "pellet_count": "n_pellets",
+                "subject": "subject_name",
+                "start": "bout_start",
+                "end": "bout_end",
+                "n_pellets": "pellet_count",
                 "cum_wheel_dist": "cum_wheel_dist",
             },
             inplace=True,
@@ -1540,7 +1586,7 @@
 @schema
 class AnalysisNote(dj.Manual):
     definition = """ # Generic table to catch all notes generated during analysis
-    note_timestamp: datetime
+    note_timestamp: datetime(6)
     ---
     note_type='': varchar(64)
     note: varchar(3000)
Comment: maybe add a comment saying something like: -1 is for placement of magnetic encoder, where wheel movement actually decreases encoder value?
Reply: done