Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New ingestion_schemas + MANY minor fixes and improvements #438

Merged
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
833f9e9
feat: add one-off logic to ingest fullpose data for social02
ttngu207 Aug 8, 2024
0558cbb
Merge branch 'datajoint_pipeline' into dev_fullpose_for_social02
ttngu207 Aug 20, 2024
e9c7fa2
Create reingest_fullpose_sleap_data.py
ttngu207 Aug 21, 2024
38b44e0
Allow reading model metadata from local folder
glopesdev Sep 26, 2024
028ffc5
Avoid iterating over None
glopesdev Sep 26, 2024
25b7195
Avoid iterating over the config file twice
glopesdev Sep 26, 2024
f77ac1d
Avoid mixing dtypes with conditional assignment
glopesdev Sep 26, 2024
ac2aa13
Remove whitespace on blank line
glopesdev Sep 26, 2024
caf3ce1
Use replace function instead of explicit loop
glopesdev Sep 27, 2024
93428c8
Improve error logic when model metadata not found
glopesdev Sep 27, 2024
00c1cca
Test loading poses with local model metadata
glopesdev Sep 27, 2024
6b32583
Use all components other than time and device name
glopesdev Sep 27, 2024
010fdb9
Add regression test for poses with register prefix
glopesdev Oct 2, 2024
0a88b79
Infer base prefix from stream search pattern
glopesdev Oct 2, 2024
f925d75
Use full identity likelihood vectors in test data
glopesdev Oct 2, 2024
83cd905
Merge pull request #421 from SainsburyWellcomeCentre/gl-issue-418
glopesdev Oct 3, 2024
36ee97a
Update worker.py
ttngu207 Oct 10, 2024
b54e1c3
new readers and schemas for reduced data storage in db
jkbhagatio Oct 10, 2024
d6cf52f
updated tests
jkbhagatio Oct 10, 2024
f12e359
cleaned up linting for ruff
jkbhagatio Oct 10, 2024
daf6224
updated pandas and changed S to s lmao
jkbhagatio Oct 10, 2024
6d798b8
chore: code cleanup
ttngu207 Oct 15, 2024
ea3c2ef
Merge remote-tracking branch 'upstream/ingestion_readers_schemas' int…
ttngu207 Oct 15, 2024
697c0a8
chore: delete the obsolete `dataset` (replaced by `schemas`)
ttngu207 Oct 15, 2024
2ef32c3
chore: clean up `load_metadata`
ttngu207 Oct 16, 2024
d5bd0fe
feat(ingestion): use new `ingestion_schemas`
ttngu207 Oct 16, 2024
8725e8f
feat(streams): update streams with new ingestion_schemas
ttngu207 Oct 16, 2024
0f210e1
fix(ingestion_schemas): downsampling Encoder
ttngu207 Oct 16, 2024
d365bcd
fix(ingestion_schemas): minor fix in `_Encoder`, calling `super()` init
ttngu207 Oct 18, 2024
cb90843
fix(harp reader): remove rows where the index is zero
ttngu207 Oct 18, 2024
9c7e9d9
fix(BlockForaging): bugfix in col rename
ttngu207 Oct 18, 2024
0a9c1e1
fix(block_analysis): bugfix in extracting `subject_in_patch` time
ttngu207 Oct 21, 2024
4020900
feat(fetch_stream): flag to round to microseconds
ttngu207 Oct 21, 2024
566c3ed
fix(block_analysis): bugfix `in_patch_timestamps`
ttngu207 Oct 21, 2024
28e39c1
Update reingest_fullpose_sleap_data.py
ttngu207 Oct 21, 2024
41a248d
Create reingest_fullpose_sleap_data.py
ttngu207 Aug 21, 2024
cda41fb
Update reingest_fullpose_sleap_data.py
ttngu207 Oct 21, 2024
6f7f541
Merge branch 'dev_fullpose_for_social02' into datajoint_pipeline
ttngu207 Oct 21, 2024
f783067
fix: `social_02.Pose03` in `ingestion_schemas` only
ttngu207 Oct 21, 2024
3e59db8
Update reingest_fullpose_sleap_data.py
ttngu207 Oct 21, 2024
64900ad
Update reingest_fullpose_sleap_data.py
ttngu207 Oct 21, 2024
538e4e5
feat(tracking): add `BlobPositionTracking`
ttngu207 Oct 22, 2024
290fe4e
fix(block_analysis): various fixes and code improvements
ttngu207 Oct 22, 2024
fb18016
fix: improve logic to search for chunks in a given block
ttngu207 Oct 22, 2024
8f2fffc
feat(script): add script `sync_ingested_and_raw_epochs`
ttngu207 Oct 23, 2024
8762fcf
fix(sync_ingested_and_raw_epochs): minor code cleanup
ttngu207 Oct 24, 2024
9078085
fix(BlockSubjectAnalysis): handle edge case where the encoder data ar…
ttngu207 Oct 24, 2024
ebecb00
chore: minor fixes to address PR review comments
ttngu207 Oct 30, 2024
b0952eb
fix: address PR comments
ttngu207 Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(script): add script sync_ingested_and_raw_epochs
  • Loading branch information
ttngu207 committed Oct 23, 2024
commit 8f2fffc8e7ea2ccde889d47a1e390e27da0376fc
11 changes: 8 additions & 3 deletions aeon/dj_pipeline/scripts/reingest_fullpose_sleap_data.py
Original file line number Diff line number Diff line change
@@ -2,12 +2,17 @@
from aeon.dj_pipeline import acquisition, tracking

aeon_schemas = acquisition.aeon_schemas
logger = acquisition.logger


exp_key = {"experiment_name": "social0.3-aeon3"}
exp_key = {"experiment_name": "social0.2-aeon4"}


def find_chunks_to_reingest(exp_key, delete_not_fullpose=False):
"""
Find chunks with newly available full pose data to reingest.
If available, fullpose data can be found in `processed` folder
"""

device_name = "CameraTop"

devices_schema = getattr(
@@ -45,7 +50,7 @@ def find_chunks_to_reingest(exp_key, delete_not_fullpose=False):
else:
fullpose.append(key)

print(f"Fullpose: {len(fullpose)}\nNot fullpose: {len(not_fullpose)}")
logger.info(f"Fullpose: {len(fullpose)} | Not fullpose: {len(not_fullpose)}")

if delete_not_fullpose:
(tracking.SLEAPTracking & not_fullpose).delete()
73 changes: 73 additions & 0 deletions aeon/dj_pipeline/scripts/sync_ingested_and_raw_epochs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import datajoint as dj
from datetime import datetime

from aeon.dj_pipeline import acquisition, streams
from aeon.dj_pipeline.analysis import block_analysis

# Local aliases into the acquisition module (schema registry and shared logger).
aeon_schemas = acquisition.aeon_schemas
logger = acquisition.logger

# Experiment this script operates on; edit before running.
exp_key = {"experiment_name": "social0.2-aeon4"}


def find_orphaned_ingested_epochs(exp_key, delete_invalid_epochs=False):
    """Find ingested epochs whose raw data directories no longer exist on disk.

    Ingested epochs become invalid when the raw epoch/chunk files/directories
    are deleted for whatever reason (e.g. bad data, testing, etc.).
    Optionally delete the orphaned entries (blocks, device installations,
    epochs) inside a single transaction, committing only after interactive
    confirmation.

    Args:
        exp_key: Restriction identifying the experiment,
            e.g. ``{"experiment_name": "social0.2-aeon4"}``.
        delete_invalid_epochs: If True, delete invalid entries after
            confirmation; if False (default), only report the counts.
    """
    raw_dir = acquisition.Experiment.get_data_directory(exp_key, "raw")
    epoch_dirs = [d.name for d in raw_dir.glob("*T*") if d.is_dir()]

    epoch_query = acquisition.Epoch.join(acquisition.EpochEnd, left=True) & exp_key

    # Restrict with a list of dicts rather than an SQL `IN (...)` string:
    # f"epoch_dir in {tuple(epoch_dirs)}" generates invalid SQL for an empty
    # list (`in ()`) or a single directory (`in ('x',)` - trailing comma).
    dir_restriction = [{"epoch_dir": d} for d in epoch_dirs]
    valid_epochs = epoch_query & dir_restriction
    invalid_epochs = epoch_query - dir_restriction

    logger.info(f"Valid Epochs: {len(valid_epochs)} | Invalid Epochs: {len(invalid_epochs)}")

    if not invalid_epochs or not delete_invalid_epochs:
        return

    # Deletion order: blocks -> streams device installations -> epochs.
    invalid_blocks = []
    for key in invalid_epochs.fetch("KEY"):
        epoch_start, epoch_end = (invalid_epochs & key).fetch1("epoch_start", "epoch_end")
        # `epoch_end` can be NULL (LEFT join with EpochEnd); avoid rendering
        # the literal string 'None' into the SQL restriction.
        if epoch_end is None:
            block_restriction = f"block_start >= '{epoch_start}'"
        else:
            block_restriction = f"block_start BETWEEN '{epoch_start}' AND '{epoch_end}'"
        invalid_blocks.extend(
            (block_analysis.Block & exp_key & block_restriction).fetch("KEY")
        )

    # Device installations tied to invalid epochs cascade widely on delete, so
    # stop here and ask for a manual confirmation run instead of proceeding.
    invalid_devices_query = acquisition.EpochConfig.DeviceType & invalid_epochs
    if invalid_devices_query:
        logger.warning("Invalid devices found - please run the rest manually to confirm deletion")
        logger.warning(invalid_devices_query)
        return

    # NOTE(review): past the guard above this query is empty, so the loop
    # below is a no-op in automated runs; it is kept for the manual path.
    device_types = set(invalid_devices_query.fetch("device_type"))
    device_table_invalid_query = []
    for device_type in device_types:
        device_table = getattr(streams, device_type)
        # Each streams device table keys on `<device>_install_time`, which
        # corresponds to the installing epoch's `epoch_start`.
        install_time_attr_name = next(n for n in device_table.primary_key if n.endswith("_install_time"))
        invalid_device_query = device_table & invalid_epochs.proj(**{install_time_attr_name: "epoch_start"})
        logger.debug(invalid_device_query)
        device_table_invalid_query.append((device_table, invalid_device_query))

    # Delete everything in one transaction; commit only on user confirmation.
    dj.conn().start_transaction()

    with dj.config(safemode=False):
        (block_analysis.Block & invalid_blocks).delete()
        for device_table, invalid_query in device_table_invalid_query:
            (device_table & invalid_query.fetch("KEY")).delete()
        (acquisition.Epoch & invalid_epochs.fetch("KEY")).delete()

    if dj.utils.user_choice("Commit deletes?", default="no") == "yes":
        dj.conn().commit_transaction()
        logger.info("Deletes committed.")
    else:
        dj.conn().cancel_transaction()
        logger.info("Deletes cancelled")