Skip to content

Commit

Permalink
Merge pull request #97 from NeurodataWithoutBorders/remove-staging-area
Browse files Browse the repository at this point in the history
remove staging area functionality
  • Loading branch information
magland authored Sep 24, 2024
2 parents 9183af9 + 9e11c34 commit 5ee481b
Show file tree
Hide file tree
Showing 10 changed files with 11 additions and 610 deletions.
44 changes: 0 additions & 44 deletions examples/lindi_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -138,50 +138,6 @@
"# Save the changes to a new .nwb.lindi.json file\n",
"client.write_lindi_file('new.nwb.lindi.json')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fe19e0f6-1c62-42e9-9af0-4a57c8a61364",
"metadata": {},
"outputs": [],
"source": [
"# Now load that file\n",
"client2 = lindi.LindiH5pyFile.from_lindi_file('new.nwb.lindi.json')\n",
"print(client2.attrs['new_attribute'])"
]
},
{
"cell_type": "markdown",
"id": "4a2addfc-58ed-4e79-9c64-b7ec95cb12f5",
"metadata": {},
"source": [
"### Add datasets to a .nwb.lindi.json file using a local staging area"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6e87640d-1927-43c1-89c1-c1274a11f185",
"metadata": {},
"outputs": [],
"source": [
"import lindi\n",
"\n",
"# URL of the remote .nwb.lindi.json file\n",
"url = 'https://lindi.neurosift.org/dandi/dandisets/000939/assets/56d875d6-a705-48d3-944c-53394a389c85/nwb.lindi.json'\n",
"\n",
"# Load the h5py-like client for the reference file system\n",
"# in read-write mode with a staging area\n",
"with lindi.StagingArea.create(base_dir='lindi_staging') as staging_area:\n",
" client = lindi.LindiH5pyFile.from_lindi_file(\n",
" url,\n",
" mode=\"r+\",\n",
" staging_area=staging_area\n",
" )\n",
" # add datasets to client using pynwb or other tools\n",
" # upload the changes to the remote .nwb.lindi.json file"
]
}
],
"metadata": {
Expand Down
34 changes: 0 additions & 34 deletions lindi/File/File.py

This file was deleted.

Empty file removed lindi/File/__init__.py
Empty file.
124 changes: 11 additions & 113 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, Literal, Callable
from typing import Union, Literal
import os
import json
import tempfile
Expand All @@ -12,8 +12,6 @@
from .LindiH5pyReference import LindiH5pyReference
from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

from ..LocalCache.LocalCache import LocalCache
Expand All @@ -26,10 +24,6 @@

LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]

# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL
# (or local path)
UploadFileFunc = Callable[[str], str]


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
Expand All @@ -53,7 +47,7 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
self._is_open = True

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None):
"""
Create a LindiH5pyFile from a URL or path to a .lindi.json file.
Expand All @@ -62,7 +56,6 @@ def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area
return LindiH5pyFile.from_reference_file_system(
url_or_path,
mode=mode,
staging_area=staging_area,
local_cache=local_cache
)

Expand Down Expand Up @@ -108,7 +101,7 @@ def from_hdf5_file(
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -120,9 +113,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
be created.
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in, by default "r".
staging_area : Union[StagingArea, None], optional
The staging area to use for writing data, preparing for upload. This
is only used in write mode, by default None.
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data, by default None.
_source_url_or_path : Union[str, None], optional
Expand Down Expand Up @@ -152,7 +142,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_tar_file=tar_file,
_source_url_or_path=rfs,
Expand Down Expand Up @@ -193,7 +182,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_tar_file=tar_file,
Expand All @@ -208,11 +196,7 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
_source_tar_file=_source_tar_file
)
source_is_url = _source_url_or_path is not None and (_source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://"))
if staging_area:
if _source_tar_file and not source_is_url:
raise Exception("Cannot use staging area when source is a local tar file")
store = LindiStagingStore(base_store=store, staging_area=staging_area)
elif _source_url_or_path and _source_tar_file and not source_is_url:
if _source_url_or_path and _source_tar_file and not source_is_url:
store = LindiTarStore(base_store=store, tar_file=_source_tar_file)
return LindiH5pyFile.from_zarr_store(
store,
Expand Down Expand Up @@ -274,9 +258,6 @@ def to_reference_file_system(self):
if self._zarr_store is None:
raise Exception("Cannot convert to reference file system without zarr store")
zarr_store = self._zarr_store
if isinstance(zarr_store, LindiStagingStore):
zarr_store.consolidate_chunks()
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiTarStore):
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiH5ZarrStore):
Expand All @@ -289,58 +270,6 @@ def to_reference_file_system(self):
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy)
return rfs_copy

def upload(
self,
*,
on_upload_blob: UploadFileFunc,
on_upload_main: UploadFileFunc
):
"""
Consolidate the chunks in the staging area, upload them to a storage
system, updating the references in the base store, and then upload the
updated reference file system .json file.
Parameters
----------
on_upload_blob : StoreFileFunc
A function that takes a string path to a blob file, uploads or copies it
somewhere, and returns a string URL (or local path).
on_upload_main : StoreFileFunc
A function that takes a string path to the main .json file, stores
it somewhere, and returns a string URL (or local path).
Returns
-------
str
The URL (or local path) of the uploaded reference file system .json
file.
"""
rfs = self.to_reference_file_system()
blobs_to_upload = set()
# Get the set of all local URLs in rfs['refs']
for k, v in rfs['refs'].items():
if isinstance(v, list) and len(v) == 3:
url = _apply_templates(v[0], rfs.get('templates', {}))
if not url.startswith("http://") and not url.startswith("https://"):
local_path = url
blobs_to_upload.add(local_path)
# Upload each of the local blobs using the given upload function and get a mapping from
# the original file paths to the URLs of the uploaded files
blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob)
# Replace the local URLs in rfs['refs'] with URLs of the uploaded files
for k, v in rfs['refs'].items():
if isinstance(v, list) and len(v) == 3:
url1 = _apply_templates(v[0], rfs.get('templates', {}))
url2 = blob_mapping.get(url1, None)
if url2 is not None:
v[0] = url2
# Write the updated LINDI file to a temp directory and upload it
with tempfile.TemporaryDirectory() as tmpdir:
rfs_fname = f"{tmpdir}/rfs.lindi.json"
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
_write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname)
return on_upload_main(rfs_fname)

def write_lindi_file(self, filename: str, *, generation_metadata: Union[dict, None] = None):
"""
Write the reference file system to a lindi or .lindi.json file.
Expand Down Expand Up @@ -568,15 +497,6 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds):
raise Exception("Cannot require dataset in read-only mode")
return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds)

##############################
# staging store
@property
def staging_store(self):
store = self._zarr_store
if not isinstance(store, LindiStagingStore):
return None
return store


def _download_file(url: str, filename: str) -> None:
headers = {
Expand Down Expand Up @@ -650,35 +570,6 @@ def _deep_copy(obj):
return obj


def _upload_blobs(
blobs: set,
*,
on_upload_blob: UploadFileFunc
) -> dict:
"""
Upload all the blobs in a set to a storage system and return a mapping from
the original file paths to the URLs of the uploaded files.
"""
blob_mapping = {}
for i, blob in enumerate(blobs):
size = os.path.getsize(blob)
print(f'Uploading blob {i + 1} of {len(blobs)} {blob} ({_format_size_bytes(size)})')
blob_url = on_upload_blob(blob)
blob_mapping[blob] = blob_url
return blob_mapping


def _format_size_bytes(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"


def _load_rfs_from_url(url: str):
file_size = _get_file_size_of_remote_file(url)
if file_size < 1024 * 1024 * 2:
Expand Down Expand Up @@ -832,3 +723,10 @@ def _update_internal_references_to_remote_tar_file(rfs: dict, remote_url: str, r
raise Exception(f"Unexpected length for reference: {len(v)}")

LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)


def _apply_templates(x: str, templates: dict) -> str:
if '{{' in x and '}}' in x:
for key, val in templates.items():
x = x.replace('{{' + key + '}}', val)
return x
Loading

0 comments on commit 5ee481b

Please sign in to comment.