From f2889b7bccc97aa30e430dbeda3699ae5aa524b3 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 4 Oct 2024 07:59:44 -0500 Subject: [PATCH 1/3] fix(get_threshold_associated_pellets): more checks for empty data --- aeon/dj_pipeline/analysis/block_analysis.py | 53 ++++++++++++--------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 2e4e900e..f0d2d66e 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -1511,7 +1511,6 @@ class AnalysisNote(dj.Manual): # ---- Helper Functions ---- - def get_threshold_associated_pellets(patch_key, start, end): """Retrieve the pellet delivery timestamps associated with each patch threshold update within the specified start-end time. @@ -1538,35 +1537,53 @@ def get_threshold_associated_pellets(patch_key, start, end): """ chunk_restriction = acquisition.create_chunk_restriction(patch_key["experiment_name"], start, end) - # Get pellet delivery trigger data + # Step 1 - fetch data + # pellet delivery trigger delivered_pellet_df = fetch_stream( streams.UndergroundFeederDeliverPellet & patch_key & chunk_restriction )[start:end] - # Remove invalid rows where the time difference is less than 1.2 seconds - invalid_rows = delivered_pellet_df.index.to_series().diff().dt.total_seconds() < 1.2 - delivered_pellet_df = delivered_pellet_df[~invalid_rows] - - # Get beambreak data + # beambreak beambreak_df = fetch_stream(streams.UndergroundFeederBeamBreak & patch_key & chunk_restriction)[ start:end ] - # Remove invalid rows where the time difference is less than 1 second - invalid_rows = beambreak_df.index.to_series().diff().dt.total_seconds() < 1 - beambreak_df = beambreak_df[~invalid_rows] - # Exclude manual deliveries + # patch threshold + depletion_state_df = fetch_stream( + streams.UndergroundFeederDepletionState & patch_key & chunk_restriction + )[start:end] + # manual delivery manual_delivery_df = fetch_stream( streams.UndergroundFeederManualDelivery & patch_key & chunk_restriction )[start:end] + + # Return empty if no data + if delivered_pellet_df.empty or beambreak_df.empty or depletion_state_df.empty: + return acquisition.io_api._empty( + ["threshold", "offset", "rate", "pellet_timestamp", "beam_break_timestamp"] + ) + + # Step 2 - Remove invalid rows (back-to-back events) + # pellet delivery trigger - time difference is less than 1.2 seconds + invalid_rows = delivered_pellet_df.index.to_series().diff().dt.total_seconds() < 1.2 + delivered_pellet_df = delivered_pellet_df[~invalid_rows] + # exclude manual deliveries delivered_pellet_df = delivered_pellet_df.loc[ delivered_pellet_df.index.difference(manual_delivery_df.index) ] + # beambreak - time difference is less than 1 seconds + invalid_rows = beambreak_df.index.to_series().diff().dt.total_seconds() < 1 + beambreak_df = beambreak_df[~invalid_rows] + # patch threshold - time difference is less than 1 seconds + depletion_state_df = depletion_state_df.dropna(subset=["threshold"]) + invalid_rows = depletion_state_df.index.to_series().diff().dt.total_seconds() < 1 + depletion_state_df = depletion_state_df[~invalid_rows] - # Return empty if no pellets - if delivered_pellet_df.empty or beambreak_df.empty: + # Return empty if no data + if delivered_pellet_df.empty or beambreak_df.empty or depletion_state_df.empty: return acquisition.io_api._empty( ["threshold", "offset", "rate", "pellet_timestamp", "beam_break_timestamp"] ) + # Step 3 - event matching # Find pellet delivery triggers with matching beambreaks within 1.2s after each pellet delivery pellet_beam_break_df = ( pd.merge_asof( @@ -1582,16 +1599,6 @@ def get_threshold_associated_pellets(patch_key, start, end): ) pellet_beam_break_df.drop_duplicates(subset="beam_break_timestamp", keep="last", inplace=True) - # Get patch threshold data - depletion_state_df = fetch_stream( - streams.UndergroundFeederDepletionState & patch_key & chunk_restriction - )[start:end] - # Remove NaNs - depletion_state_df = depletion_state_df.dropna(subset=["threshold"]) - # Remove invalid rows where the time difference is less than 1 second - invalid_rows = depletion_state_df.index.to_series().diff().dt.total_seconds() < 1 - depletion_state_df = depletion_state_df[~invalid_rows] - # Find pellet delivery triggers that approximately coincide with each threshold update # i.e. nearest pellet delivery within 100ms before or after threshold update pellet_ts_threshold_df = ( From 747232bc21d3b9c88f11f59bcca42b15545c6796 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 4 Oct 2024 08:16:01 -0500 Subject: [PATCH 2/3] feat(block_analysis): add table for BlockForagingBout --- aeon/dj_pipeline/analysis/block_analysis.py | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index f0d2d66e..59a69bc3 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -1496,6 +1496,46 @@ def make(self, key): self.insert1(entry) +# ---- Foraging Bout Analysis ---- + +@schema +class BlockForaging(dj.Computed): + definition = """ + -> BlockSubjectAnalysis + --- + bout_count: int # number of foraging bouts in the block + """ + + class Bout(dj.Part): + definition = """ + -> master + -> BlockAnalysis.Subject + bout_start: datetime(6) + --- + bout_end: datetime(6) + bout_duration: float # (seconds) + pellet_count: int # number of pellets consumed during the bout + cum_wheel_dist: float # cumulative distance travelled during the bout + """ + + def make(self, key): + foraging_bout_df = get_foraging_bouts(key) + foraging_bout_df.rename( + columns={ + "subject_name": "subject", + "bout_start": "start", + "bout_end": "end", + "bout_duration": "duration", + "pellet_count": "n_pellets", + "cum_wheel_dist": "cum_wheel_dist", + }, + inplace=True, + ) + + self.insert1({**key, "bout_count": len(foraging_bout_df)}) + self.Bout.insert({**key, **row} for _, row in foraging_bout_df.iterrows()) + + # ---- AnalysisNote ---- @@ -1741,6 +1781,7 @@ def get_foraging_bouts( { "start": bout_starts_ends[:, 0], "end": bout_starts_ends[:, 1], + "duration": bout_durations, "n_pellets": bout_pellets, "cum_wheel_dist": bout_cum_wheel_dist, "subject": subject, From 24061c5d18e5576d519995a8b684b2ae0dbdf376 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Fri, 4 Oct 2024 09:13:16 -0500 Subject: [PATCH 3/3] fix(block_analysis): remove `bout_duration` --- aeon/dj_pipeline/analysis/block_analysis.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py index 59a69bc3..7e853a5b 100644 --- a/aeon/dj_pipeline/analysis/block_analysis.py +++ b/aeon/dj_pipeline/analysis/block_analysis.py @@ -1513,7 +1513,6 @@ class Bout(dj.Part): bout_start: datetime(6) --- bout_end: datetime(6) - bout_duration: float # (seconds) pellet_count: int # number of pellets consumed during the bout cum_wheel_dist: float # cumulative distance travelled during the bout """ @@ -1525,7 +1524,6 @@ def make(self, key): "subject_name": "subject", "bout_start": "start", "bout_end": "end", - "bout_duration": "duration", "pellet_count": "n_pellets", "cum_wheel_dist": "cum_wheel_dist", }, @@ -1781,7 +1779,6 @@ def get_foraging_bouts( { "start": bout_starts_ends[:, 0], "end": bout_starts_ends[:, 1], - "duration": bout_durations, "n_pellets": bout_pellets, "cum_wheel_dist": bout_cum_wheel_dist, "subject": subject,