Serializable erddap processor
gmaze committed Sep 26, 2024
1 parent f285e05 commit 944f56e
Showing 2 changed files with 295 additions and 16 deletions.
60 changes: 44 additions & 16 deletions argopy/data_fetchers/erddap_data.py
@@ -27,13 +27,15 @@
from ..options import OPTIONS
from ..utils.format import format_oneline
from ..utils.lists import list_bgc_s_variables, list_core_parameters
from ..utils.decorators import deprecated
from ..errors import ErddapServerError, DataNotFound
from ..stores import httpstore, has_distributed, distributed
from ..stores import (
indexstore_pd as ArgoIndex,
) # make sure we work with the Pandas index store
from ..utils import is_list_of_strings, to_list, Chunker
from .proto import ArgoDataFetcherProto
from .erddap_data_proprocessor import post_process
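# post_process is now a plain module-level function, so it can be pickled and shipped to parallel workers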


log = logging.getLogger("argopy.erddap.data")
@@ -148,11 +150,11 @@ def __init__( # noqa: C901
if parallel:
if has_distributed and isinstance(parallel_method, distributed.client.Client):
warnings.warn("Using experimental Dask client as a parallelization method")
elif parallel_method not in ["thread", "seq", "erddap"]:
raise ValueError(
"erddap only support multi-threading, use 'thread' or 'erddap' instead of '%s'"
% parallel_method
)
# elif parallel_method not in ["thread", "seq", "erddap"]:
# raise ValueError(
# "erddap only support multi-threading, use 'thread' or 'erddap' instead of '%s'"
# % parallel_method
# )
self.parallel = parallel
self.parallel_method = parallel_method
self.progress = progress
@@ -258,6 +260,7 @@ def server(self, value):
log.debug("The erddap server has been modified, updating internal data")
self._init_erddapy()

@deprecated('Not serializable')
def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by erddap requests (csv)
Expand Down Expand Up @@ -666,6 +669,7 @@ def getNfromncHeader(url):
N += getNfromncHeader(url)
return N

@deprecated("Not serializable !")
def post_process(
self, this_ds, add_dm: bool = True, URI: list = None
): # noqa: C901
Expand Down Expand Up @@ -788,12 +792,13 @@ def to_xarray( # noqa: C901
add_dm = self.dataset_id in ["bgc", "bgc-s"] if add_dm is None else bool(add_dm)

# Download data
results = []
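# Ensure 'results' is defined whichever download branch runs below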
if not self.parallel:
if len(URI) == 1:
try:
# log_argopy_callerstack()
results = self.fs.open_dataset(URI[0])
results = self.post_process(results, add_dm=add_dm, URI=URI)
results = post_process(results, add_dm=add_dm, URI=URI)
except ClientResponseError as e:
if "Proxy Error" in e.message:
raise ErddapServerError(
@@ -820,14 +825,14 @@
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=self.post_process,
preprocess=post_process,
preprocess_opts={"add_dm": False, "URI": URI},
final_opts={"data_vars": "all"},
)
if results is not None:
if self.progress:
print("Final post-processing of the merged dataset () ...")
results = self.post_process(
results = post_process(
results, **{"add_dm": add_dm, "URI": URI}
)
except ClientResponseError as e:
@@ -843,30 +848,53 @@
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=self.post_process,
preprocess=post_process,
preprocess_opts={"add_dm": False, "URI": URI},
final_opts={"data_vars": "all"},
)

elif has_distributed and isinstance(self.parallel_method, distributed.client.Client):
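# Build picklable options for the module-level post_process (no bound-method reference to self)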
preprocess_opts = {'add_dm': False, 'dataset_id': self.dataset_id}
results = self.fs.open_mfdataset(
URI,
method=self.parallel_method,
progress=self.progress,
max_workers=max_workers,
open_dataset_opts={'errors': 'ignore', 'download_url_opts': {'errors': 'ignore'}},
progress=False,
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=self.post_process,
preprocess_opts={"add_dm": False, "URI": URI},
preprocess=post_process,
preprocess_opts=preprocess_opts,
final_opts={"data_vars": "all"},
)

else:
preprocess_opts = {'add_dm': False, 'dataset_id': self.dataset_id}
results = self.fs.open_mfdataset(
URI,
method=self.parallel_method,
open_dataset_opts={'errors': 'ignore', 'download_url_opts': {'errors': 'ignore'}},
progress=False,
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=post_process,
preprocess_opts=preprocess_opts,
final_opts={"data_vars": "all"},
)


if concat:
if results is not None:
if self.progress:
print("Final post-processing of the merged dataset () ...")
results = self.post_process(
results, **{"add_dm": add_dm, "URI": URI}
print("Final post-processing of the merged dataset ...")
preprocess_opts = {'add_dm': False, 'dataset_id': self.dataset_id}
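# For BGC datasets, reconstructing data modes requires the ArgoIndex store ('indexfs')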
if self.dataset_id in ['bgc', 'bgc-s']:
preprocess_opts = {'add_dm': True,
'dataset_id': self.dataset_id,
'indexfs': self.indexfs}
results = post_process(
results, **preprocess_opts
)
except DataNotFound:
if self.dataset_id in ["bgc", "bgc-s"] and len(self._bgc_vlist_measured) > 0:
251 changes: 251 additions & 0 deletions argopy/data_fetchers/erddap_data_proprocessor.py
@@ -0,0 +1,251 @@
import numpy as np
import pandas as pd
import xarray as xr
import getpass
from urllib.parse import urlparse, parse_qs
from ..utils import to_list, UriCName
from ..utils import list_bgc_s_parameters


def post_process(
this_ds, add_dm: bool = True, dataset_id: str = 'phy', **kwargs
): # noqa: C901
"""Post-processor of a xarray.DataSet created from a netcdf erddap response
This method can also be applied on a regular dataset to re-enforce format compliance
This methdd must be seriablizable:
>>> from distributed.protocol import serialize
>>> from distributed.protocol.serialize import ToPickle
>>> serialize(ToPickle(post_process))
"""
if this_ds is None:
return None

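# erddap netcdf responses expose a "row" dimension: rename it to the Argo standard "N_POINTS"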
if "row" in this_ds.dims:
this_ds = this_ds.rename({"row": "N_POINTS"})
elif "N_POINTS" in this_ds.dims:
this_ds["N_POINTS"] = np.arange(0, len(this_ds["N_POINTS"]))

# Set coordinates:
coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS")
this_ds = this_ds.reset_coords()
this_ds["N_POINTS"] = this_ds["N_POINTS"]

# Convert all coordinate variable names to upper case
for v in this_ds.data_vars:
this_ds = this_ds.rename({v: v.upper()})
this_ds = this_ds.set_coords(coords)

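# The reference dataset has no DIRECTION variable: fill it with "A" (ascending profiles)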
if dataset_id == "ref":
this_ds["DIRECTION"] = xr.full_like(this_ds["CYCLE_NUMBER"], "A", dtype=str)

# Cast data types:
this_ds = this_ds.argo.cast_types()
# if '999' in to_list(np.unique(this_ds['PLATFORM_NUMBER'].values)):
# log.error(this_ds.attrs)

# With BGC, some points may not have a PLATFORM_NUMBER !
# So, we remove these
if dataset_id in ["bgc", "bgc-s"] and "999" in to_list(
np.unique(this_ds["PLATFORM_NUMBER"].values)
):
this_ds = this_ds.where(this_ds["PLATFORM_NUMBER"] != "999", drop=True)
this_ds = this_ds.argo.cast_types(overwrite=True)

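# Data modes are reconstructed from the Argo index store, passed in as the 'indexfs' keyword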
if dataset_id in ["bgc", "bgc-s"] and add_dm:
this_ds = this_ds.argo.datamode.compute(indexfs=kwargs['indexfs'])
this_ds = this_ds.argo.cast_types(overwrite=False)

# Overwrite Erddap attributes with those from Argo standards:
this_ds = _add_attributes(this_ds)

# In the case of a parallel download, this is a trick to preserve each chunk's uri in the chunk dataset
# (otherwise all chunks would carry the same list of uri).
# "history" is an attribute returned by the erddap server
if 'Fetched_uri' in this_ds.attrs:
Fetched_url = this_ds.attrs.get("Fetched_uri")
else:
Fetched_url = this_ds.attrs.get("history", "").split('\n')[-1].split(' ')[-1]
Fetched_constraints = UriCName(Fetched_url)

# Finally overwrite erddap attributes with those from argopy:
raw_attrs = this_ds.attrs.copy()
this_ds.attrs = {}
if dataset_id == "phy":
this_ds.attrs["DATA_ID"] = "ARGO"
this_ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
elif dataset_id == "ref":
this_ds.attrs["DATA_ID"] = "ARGO_Reference"
this_ds.attrs["DOI"] = "-"
this_ds.attrs["Fetched_version"] = raw_attrs.get('version', '?')
elif dataset_id == "ref-ctd":
this_ds.attrs["DATA_ID"] = "ARGO_Reference_CTD"
this_ds.attrs["DOI"] = "-"
this_ds.attrs["Fetched_version"] = raw_attrs.get('version', '?')
elif dataset_id in ["bgc", "bgc-s"]:
this_ds.attrs["DATA_ID"] = "ARGO-BGC"
this_ds.attrs["DOI"] = "http://doi.org/10.17882/42182"

this_ds.attrs["Fetched_from"] = urlparse(Fetched_url).netloc
try:
this_ds.attrs["Fetched_by"] = getpass.getuser()
except: # noqa: E722
this_ds.attrs["Fetched_by"] = "anonymous"
this_ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime(
"%Y/%m/%d"
)
this_ds.attrs["Fetched_constraints"] = Fetched_constraints
this_ds.attrs["Fetched_uri"] = Fetched_url
this_ds = this_ds[np.sort(this_ds.data_vars)]

# if dataset_id in ["bgc", "bgc-s"]:
# n_zero = np.count_nonzero(np.isnan(np.unique(this_ds["PLATFORM_NUMBER"])))
# if n_zero > 0:
# log.error("Some points (%i) have no PLATFORM_NUMBER !" % n_zero)

return this_ds


def _add_attributes(this): # noqa: C901
"""Add variables attributes not return by erddap requests (csv)
This is hard coded, but should be retrieved from an API somewhere
"""

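# Each attribute dict below preserves the 'casted' flag possibly set by argo.cast_types()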
for v in this.data_vars:
param = "PRES"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "Sea Pressure",
"standard_name": "sea_water_pressure",
"units": "decibar",
"valid_min": 0.0,
"valid_max": 12000.0,
"resolution": 0.1,
"axis": "Z",
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
param = "TEMP"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
"standard_name": "sea_water_temperature",
"units": "degree_Celsius",
"valid_min": -2.0,
"valid_max": 40.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
param = "PSAL"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "PRACTICAL SALINITY",
"standard_name": "sea_water_salinity",
"units": "psu",
"valid_min": 0.0,
"valid_max": 43.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
param = "DOXY"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "Dissolved oxygen",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "micromole/kg",
"valid_min": -5.0,
"valid_max": 600.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "_QC" in v:
attrs = {
"long_name": "Global quality flag of %s profile" % v,
"conventions": "Argo reference table 2a",
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
this[v].attrs = attrs

if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs = {
"long_name": "Float cycle number",
"conventions": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle",
"casted": this["CYCLE_NUMBER"].attrs["casted"]
if "casted" in this["CYCLE_NUMBER"].attrs
else 0,
}
if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs = {
"long_name": "Direction of the station profiles",
"conventions": "A: ascending profiles, D: descending profiles",
"casted": this["DIRECTION"].attrs["casted"]
if "casted" in this["DIRECTION"].attrs
else 0,
}

if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs = {
"long_name": "Float unique identifier",
"conventions": "WMO float identifier : A9IIIII",
"casted": this["PLATFORM_NUMBER"].attrs["casted"]
if "casted" in this["PLATFORM_NUMBER"].attrs
else 0,
}

if "DATA_MODE" in this.data_vars:
this["DATA_MODE"].attrs = {
"long_name": "Delayed mode or real time data",
"conventions": "R : real time; D : delayed mode; A : real time with adjustment",
"casted": this["DATA_MODE"].attrs["casted"]
if "casted" in this["DATA_MODE"].attrs
else 0,
}

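# BGC datasets carry one <PARAM>_DATA_MODE variable per parameter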
for param in list_bgc_s_parameters():
if "%s_DATA_MODE" % param in this.data_vars:
this["%s_DATA_MODE" % param].attrs = {
"long_name": "Delayed mode or real time data",
"conventions": "R : real time; D : delayed mode; A : real time with adjustment",
"casted": this["%s_DATA_MODE" % param].attrs["casted"]
if "casted" in this["%s_DATA_MODE" % param].attrs
else 0,
}

return this
