Skip to content

Commit

Permalink
Add metadata consolidation utility
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau committed Aug 15, 2023
1 parent 95253c0 commit 0a6c7fa
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
1 change: 1 addition & 0 deletions earthaccess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .auth import Auth
from .search import DataCollections, DataGranules
from .store import Store
from .kerchunk import consolidate_metadata

__all__ = [
"login",
Expand Down
65 changes: 65 additions & 0 deletions earthaccess/kerchunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from dask.base import flatten
from dask.distributed import default_client, progress, Client, Worker, WorkerPlugin
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

import earthaccess
from .auth import Auth


def get_chunk_metadata(
granuale: earthaccess.results.DataGranule, access: str
) -> list[dict]:
if access == "direct":
fs_data = earthaccess.get_s3fs_session(provider=granuale["meta"]["provider-id"])
else:
fs_data = earthaccess.get_fsspec_https_session()

metadata = []
for url in granuale.data_links(access=access):
with fs_data.open(url) as inf:
h5chunks = SingleHdf5ToZarr(inf, url)
m = h5chunks.translate()
metadata.append(m)
return metadata


class EarthAccessAuth(WorkerPlugin):
name = "earthaccess-auth"

def __init__(self, auth: Auth):
self.auth = auth

def setup(self, worker: Worker) -> None:
if not earthaccess.__auth__.authenticated:
earthaccess.__auth__ = self.auth
earthaccess.login()


def consolidate_metadata(
granuales: list[earthaccess.results.DataGranule],
outfile: str,
storage_options: dict | None = None,
kerchunk_options: dict | None = None,
access: str = "direct",
client: Client | None = None,
) -> str:
if client is None:
client = default_client()

# Make sure cluster is authenticated
client.register_worker_plugin(EarthAccessAuth(earthaccess.__auth__))

# Write out metadata file for each granuale
futures = client.map(get_chunk_metadata, granuales, access=access)
progress(futures)
chunks = client.gather(futures)
chunks = list(flatten(chunks))

# Write combined metadata file
mzz = MultiZarrToZarr(chunks, **kerchunk_options)
mzz.translate(outfile, storage_options=storage_options or {})

return outfile
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ s3fs = ">=2021.11, <2024"
fsspec = ">=2022.1"
tinynetrc = "^1.3.1"
multimethod = ">=1.8"
kerchunk = ">=0.1.2"
dask = {extras = ["complete"], version = ">=2023.8.0"}


[tool.poetry.dev-dependencies]
Expand Down

0 comments on commit 0a6c7fa

Please sign in to comment.