Skip to content

Commit

Permalink
Merge pull request #32 from openearthplatforminitiative/fix/use-grib-caching
Browse files Browse the repository at this point in the history

fix: move grib data to S3 and read from it
  • Loading branch information
giltinde authored May 6, 2024
2 parents d0c87da + c87c511 commit 7ecd475
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
8 changes: 7 additions & 1 deletion data_pipelines/assets/flood/discharge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from data_pipelines.partitions import discharge_partitions
from data_pipelines.resources.dask_resource import DaskResource
from data_pipelines.resources.glofas_resource import CDSClient
from data_pipelines.resources.io_managers import get_path_in_asset
from data_pipelines.resources.io_managers import (
copy_local_file_to_s3,
get_path_in_asset,
)
from data_pipelines.settings import settings
from data_pipelines.utils.flood.config import *
from data_pipelines.utils.flood.filter_by_upstream import apply_upstream_threshold
Expand Down Expand Up @@ -71,6 +74,9 @@ def raw_discharge(context: AssetExecutionContext, cds_client: CDSClient) -> None
out_path = get_path_in_asset(context, settings.tmp_storage, ".grib")
out_path.parent.mkdir(parents=True, exist_ok=True)
cds_client.fetch_data(request_params, out_path)
target_path = get_path_in_asset(context, settings.base_data_upath, ".grib")
target_path.mkdir(parents=True, exist_ok=True)
copy_local_file_to_s3(out_path, target_path)


@asset(
Expand Down
2 changes: 1 addition & 1 deletion data_pipelines/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"cds_client": CDSClient(
user_id=EnvVar("CDS_USER_ID"), api_key=EnvVar("CDS_API_KEY")
),
"grib_io_manager": GribDischargeIOManager(base_path=settings.tmp_storage),
"grib_io_manager": GribDischargeIOManager(base_path=settings.base_data_upath),
"netcdf_io_manager": NetdCDFIOManager(base_path=settings.base_data_upath),
}
17 changes: 16 additions & 1 deletion data_pipelines/resources/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def get_path_in_io_manager(
return path.with_suffix(extension)


def copy_local_file_to_s3(local_file_path: str, target_upath: UPath):
# Use UPath to copy the local file to the S3 bucket
with open(local_file_path, "rb") as f:
with target_upath.open("wb") as s3_f:
s3_f.write(f.read())


class COGIOManager(UPathIOManager):
extension: str = ".tif"

Expand Down Expand Up @@ -139,7 +146,15 @@ def load_from_path(self, context: InputContext, path: UPath) -> xr.Dataset:
if isinstance(self.fs, LocalFileSystem):
ds_source = path
else:
ds_source = get_path_in_io_manager(context, self.base_path, self.extension)
# grib files can not be read directly from cloud storage.
# The file is instead cached and read locally
# ref: https://stackoverflow.com/questions/66229140/xarray-read-remote-grib-file-on-s3-using-cfgrib
ds_source = fsspec.open_local(
f"simplecache::{path}",
filecache={"cache_storage": "/tmp/files"},
**path.storage_options,
)
context.log.info(f"Reading raw discharge data from {ds_source}")

ds_cf = xr.open_dataset(
ds_source,
Expand Down

0 comments on commit 7ecd475

Please sign in to comment.