fix(dataset): additional logic and consistency fixes
nfrasser committed Dec 13, 2024
1 parent 0246f8a commit 5f4bac3
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions cryosparc/dataset.py
@@ -681,16 +681,17 @@ def _load_stream(
         descr = filter_descr(header["dtype"], keep_prefixes=prefixes, keep_fields=fields)
         field_names = {field[0] for field in descr}

-        # Calling addrows separately to minimizes column-based
-        # allocations, improves performance by ~20%
+        # Calling addrows separately to minimize column-based allocations,
+        # improves performance by ~20%
         dset = cls.allocate(0, descr)
-        if header["length"] == 0:
-            return dset  # no more data to load
-
         data = dset._data
         data.addrows(header["length"])
+
+        # If a dataset is empty, it won't have anything in its data section.
+        # Just the string heap at the end.
+        dtype = [] if header["length"] == 0 else header["dtype"]
         loader = Stream(data)
-        for field in header["dtype"]:
+        for field in dtype:
             colsize = u32intle(f.read(4))
             if field[0] not in field_names:
                 # try to seek instead of read to reduce memory usage
@@ -699,10 +700,10 @@ def _load_stream(
             buffer = f.read(colsize)
             if field[0] in header["compressed_fields"]:
                 loader.decompress_col(field[0], buffer)
-            else:
-                mem = data.getbuf(field[0])
-                assert mem is not None, f"Failed to get {field[0]} memory"
-                mem[:] = buffer
+                continue
+            mem = data.getbuf(field[0])
+            assert mem is not None, f"Could not load stream (missing {field[0]} buffer)"
+            mem[:] = buffer

         # Read in the string heap (rest of stream)
         # NOTE: There will be a bug here for long column keys that are
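
The dtype guard added above encodes a property of the serialization format: a zero-length dataset has no per-column sections, only the header and the trailing string heap, so reading even one u32 column size would consume heap bytes. Below is a minimal standalone sketch of that framing; read_columns and its signature are hypothetical illustrations, with only the little-endian u32 size prefix (per u32intle) and the empty-dataset rule taken from the code above.

    import struct
    from typing import BinaryIO, Dict, Tuple

    def read_columns(f: BinaryIO, header: dict) -> Tuple[Dict[str, bytes], bytes]:
        """Sketch of the per-field framing that _load_stream consumes.

        Each field section is a little-endian u32 byte count followed by
        that many bytes of column data. An empty dataset (length == 0) has
        no column sections at all -- just the string heap -- so the field
        loop must be skipped, otherwise the first 4 bytes of the heap would
        be misread as a column size.
        """
        columns = {}
        dtype = [] if header["length"] == 0 else header["dtype"]
        for field in dtype:
            (colsize,) = struct.unpack("<I", f.read(4))  # u32intle equivalent
            columns[field[0]] = f.read(colsize)  # field[0] is the field name
        heap = f.read()  # rest of the stream is the string heap
        return columns, heap
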
@@ -726,16 +727,20 @@ async def from_async_stream(cls, stream: AsyncBinaryIO):
         dset = cls.allocate(0, header["dtype"])
         data = dset._data
         data.addrows(header["length"])
+
+        # If a dataset is empty, it won't have anything in its data section.
+        # Just the string heap at the end.
+        dtype = [] if header["length"] == 0 else header["dtype"]
         loader = Stream(data)
-        for field in header["dtype"]:
+        for field in dtype:
             colsize = u32intle(await stream.read(4))
             buffer = await stream.read(colsize)
             if field[0] in header["compressed_fields"]:
                 loader.decompress_col(field[0], buffer)
-            else:
-                mem = data.getbuf(field[0])
-                assert mem is not None, f"Failed to get {field[0]} memory"
-                mem[:] = buffer
+                continue
+            mem = data.getbuf(field[0])
+            assert mem is not None, f"Could not load stream (missing {field[0]} buffer)"
+            mem[:] = buffer

         heap = await stream.read()
         data.setstrheap(heap)
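
from_async_stream mirrors the synchronous loader for any source exposing an async read method. A hypothetical usage sketch follows, assuming the Dataset class defined in this module; AsyncFileReader and particles.cs are illustrative stand-ins, not part of cryosparc-tools.

    import asyncio
    from cryosparc.dataset import Dataset

    class AsyncFileReader:
        """Minimal AsyncBinaryIO-style adapter over a regular binary file."""

        def __init__(self, path: str):
            self._f = open(path, "rb")

        async def read(self, n: int = -1) -> bytes:
            # A real adapter would offload to a thread (or use aiofiles);
            # reading inline keeps the sketch self-contained.
            return self._f.read(n)

    async def main():
        dset = await Dataset.from_async_stream(AsyncFileReader("particles.cs"))
        print(len(dset), "rows loaded")

    asyncio.run(main())
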
@@ -805,16 +810,14 @@ def stream(self, compression: Literal["lz4", None] = None) -> Generator[bytes, None, None]:
         yield u32bytesle(len(header))
         yield header

-        if len(self) == 0:
-            return  # empty dataset, don't yield anything
-
-        for f in self:
+        fields = [] if len(self) == 0 else self.fields()
+        for f in fields:
             if f in compressed_fields:
                 # obj columns added to strheap and loaded as indexes
                 fielddata = stream.compress_col(f)
             else:
                 fielddata = stream.stralloc_col(f) or data.getbuf(f)
-            assert fielddata is not None, f"Could not stream {f}"
+            assert fielddata is not None, f"Could not stream dataset (missing {f} buffer)"
             yield u32bytesle(len(fielddata))
             yield bytes(fielddata.memview)

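Because stream() is a generator of framed byte chunks (the header and each field preceded by a u32 length), writing a dataset out is just a matter of draining it. A small sketch under that reading; save_stream and the output path are illustrative, though the compression parameter itself comes from the signature above.

    from cryosparc.dataset import Dataset

    def save_stream(dset: Dataset, path: str) -> int:
        """Write every chunk yielded by dset.stream(); return total bytes written."""
        total = 0
        with open(path, "wb") as f:
            for chunk in dset.stream(compression="lz4"):  # or compression=None
                f.write(chunk)
                total += len(chunk)
        return total
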