
Commit

Improve perf and docstrings
gmaze committed Nov 6, 2024
1 parent 5afa8cd commit 4186506
Showing 9 changed files with 184 additions and 105 deletions.
30 changes: 10 additions & 20 deletions argopy/data_fetchers/erddap_data.py
@@ -96,7 +96,7 @@ def __init__( # noqa: C901
Parameters
----------
ds: str (optional)
ds: str, default = OPTIONS['ds']
Dataset to load: 'phy' or 'ref' or 'bgc-s'
cache: bool (optional)
Cache data or not (default: False)
@@ -128,6 +128,13 @@ def __init__( # noqa: C901
List of BGC essential variables that can't be NaN. If set to 'all', this is an easy way to reduce the size of the
:class:`xarray.Dataset` to points where all variables have been measured. Otherwise, provide a simple list of
variables.
Other parameters
----------------
server: str, default = OPTIONS['erddap']
URL to erddap server
mode: str, default = OPTIONS['mode']
"""
timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout
self.definition = "Ifremer erddap Argo data fetcher"
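
The options documented in this docstring are normally reached through argopy's top-level DataFetcher; the following is a minimal usage sketch, under the assumption that the ds, mode and measured keywords are forwarded unchanged to this erddap fetcher:

    import argopy

    # Hypothetical usage sketch: request BGC data and keep only points where
    # dissolved oxygen was actually measured (no NaN), per the 'measured' docstring.
    f = argopy.DataFetcher(src="erddap", ds="bgc", mode="expert", measured=["DOXY"])
    ds = f.region([-75, -45, 20, 30, 0, 100, "2021-01", "2021-06"]).to_xarray()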
@@ -707,11 +714,9 @@ def to_xarray( # noqa: C901
-------
:class:`xarray.Dataset`
"""

URI = self.uri # Call it once

# Should we compute (from the index) and add DATA_MODE for BGC variables:
add_dm = self.dataset_id in ["bgc", "bgc-s"] if add_dm is None else bool(add_dm)
# Pre-processor options:
preprocess_opts = {
"add_dm": False,
"URI": URI,
@@ -725,7 +730,7 @@
"indexfs": self.indexfs,
}

# Download data
# Download and pre-process data:
results = []
if not self.parallelize:
if len(URI) == 1:
@@ -840,21 +845,6 @@ def to_xarray( # noqa: C901
[filtered.append(self.filter_measured(r)) for r in results]
results = filtered

# empty = []
# for v in self._bgc_vlist_measured:
# if v in results and np.count_nonzero(results[v]) != len(results["N_POINTS"]):
# empty.append(v)
# if len(empty) > 0:
# msg = (
# "After processing, your BGC request returned final data with NaNs (%s). "
# "This may be due to the 'measured' argument ('%s') that imposes a no-NaN constraint "
# "impossible to fulfill for the access point defined (%s)]. "
# "\nUsing the 'measured' argument, you can try to minimize the list of variables to "
# "return without NaNs, or set it to 'None' to return all samples."
# % (",".join(to_list(v)), ",".join(self._bgc_measured), self.cname())
# )
# raise ValueError(msg)

if concat and results is not None:
results["N_POINTS"] = np.arange(0, len(results["N_POINTS"]))
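
The re-indexing of N_POINTS above avoids duplicate coordinate values once the per-URI results are concatenated. A minimal standalone sketch of the same idea, using toy datasets:

    import numpy as np
    import xarray as xr

    # Two chunks that both carry N_POINTS = [0, 1, 2]: concatenating them leaves
    # duplicate values along the point dimension.
    a = xr.Dataset({"TEMP": ("N_POINTS", [10.0, 11.0, 12.0])}, coords={"N_POINTS": np.arange(3)})
    b = xr.Dataset({"TEMP": ("N_POINTS", [13.0, 14.0, 15.0])}, coords={"N_POINTS": np.arange(3)})
    merged = xr.concat([a, b], dim="N_POINTS")
    # Re-index with a fresh monotonic range, as done above on the merged results:
    merged["N_POINTS"] = np.arange(0, len(merged["N_POINTS"]))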

172 changes: 110 additions & 62 deletions argopy/data_fetchers/gdac_data.py
@@ -12,12 +12,13 @@
import warnings
import getpass
import logging
from typing import Literal

from ..utils.format import argo_split_path
from ..utils.decorators import deprecated
from ..options import OPTIONS, check_gdac_path, PARALLEL_SETUP
from ..errors import DataNotFound
from ..stores import ArgoIndex
from ..stores import ArgoIndex, has_distributed, distributed
from .proto import ArgoDataFetcherProto
from .gdac_data_processors import pre_process_multiprof, filter_points
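
The names has_distributed and distributed imported above come from argopy.stores; their definition is not shown in this diff, but presumably follows the usual optional-dependency guard, sketched here as an assumption:

    # Assumed pattern for the optional dask.distributed dependency (not part of this diff):
    try:
        import distributed
        has_distributed = True
    except ImportError:
        distributed = None
        has_distributed = False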

@@ -56,24 +57,21 @@ def init(self, *args, **kwargs):
###
def __init__(
self,
gdac: str = "",
ds: str = "",
cache: bool = False,
cachedir: str = "",
dimension: str = "point",
errors: str = "raise",
parallel: bool = False,
progress: bool = False,
dimension: Literal['point', 'profile'] = "point",
errors: str = "raise",
api_timeout: int = 0,
**kwargs
):
"""Init fetcher
Parameters
----------
gdac: str (optional)
Path to the local or remote directory where the 'dac' folder is located
ds: str (optional)
ds: str, default = OPTIONS['ds']
Dataset to load: 'phy' or 'bgc'
cache: bool (optional)
Cache data or not (default: False)
@@ -97,12 +95,19 @@ def __init__(
Show a progress bar or not when fetching data.
api_timeout: int (optional)
Server request time out in seconds. Set to OPTIONS['api_timeout'] by default.
Other parameters
----------------
gdac: str, default = OPTIONS['gdac']
Path to the local or remote directory where the 'dac' folder is located
"""
self.timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout
self.dataset_id = OPTIONS["ds"] if ds == "" else ds
self.user_mode = kwargs["mode"] if "mode" in kwargs else OPTIONS["mode"]
self.server = OPTIONS["gdac"] if gdac == "" else gdac
self.server = kwargs["gdac"] if "gdac" in kwargs else OPTIONS["gdac"]

self.errors = errors
self.dimension = dimension

# Validate server, raise GdacPathError if not valid.
check_gdac_path(self.server, errors="raise")
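
The gdac and mode options are now read from **kwargs with a fallback to the global OPTIONS, so they are typically set once through argopy.set_options. A usage sketch, with a hypothetical local GDAC path and assuming the keywords are routed down from DataFetcher:

    import argopy

    # Hypothetical: point the GDAC fetcher at a local copy of the GDAC tree,
    # relying on the global options rather than fetcher keywords.
    argopy.set_options(gdac="/path/to/gdac", mode="expert")
    f = argopy.DataFetcher(src="gdac", ds="phy", progress=True)
    ds = f.float(6902746).to_xarray()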
@@ -111,7 +116,7 @@ def __init__(
if self.dataset_id in ["bgc-s", "bgc-b"]:
index_file = self.dataset_id

# Validation of self.server is done by the ArgoIndex:
# Validation of self.server is done by the ArgoIndex instance:
self.indexfs = ArgoIndex(
host=self.server,
index_file=index_file,
@@ -146,10 +151,14 @@ def __repr__(self):
else:
summary.append("📕 Index: %s (not loaded)" % self.indexfs.index_file)
if hasattr(self.indexfs, "search"):
match = "matches" if self.N_FILES > 1 else "match"
match = "matches" if self.indexfs.N_MATCH > 1 else "match"
summary.append(
"📸 Index searched: True (%i %s, %0.4f%%)"
% (self.N_FILES, match, self.N_FILES * 100 / self.N_RECORDS)
% (
self.indexfs.N_MATCH,
match,
self.indexfs.N_MATCH * 100 / self.N_RECORDS,
)
)
else:
summary.append("📷 Index searched: False")
@@ -307,7 +316,13 @@ def _preprocess_multiprof(self, ds):
def pre_process(self, ds, *args, **kwargs):
return pre_process_multiprof(ds, *args, **kwargs)

def to_xarray(self, errors: str = "ignore"):
def to_xarray(
self,
errors: str = "ignore",
concat: bool = True,
concat_method: Literal["drop", "fill"] = "fill",
dimension: Literal['point', 'profile'] = "",
):
"""Load Argo data and return a :class:`xarray.Dataset`
Parameters
@@ -323,18 +338,22 @@ def to_xarray(self, errors: str = "ignore"):
-------
:class:`xarray.Dataset`
"""
URI = self.uri # Call it once
dimension = self.dimension if dimension == "" else dimension

if (
len(self.uri) > 50
len(URI) > 50
and not self.parallelize
and self.parallel_method == "sequential"
):
warnings.warn(
"Found more than 50 files to load, this may take a while to process sequentially ! "
"Consider using another data source (eg: 'erddap') or the 'parallel=True' option to improve processing time."
)
elif len(self.uri) == 0:
elif len(URI) == 0:
raise DataNotFound("No data found for: %s" % self.indexfs.cname)

# Pre-processor options:
if hasattr(self, "BOX"):
access_point = "BOX"
access_point_opts = {"BOX": self.BOX}
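
For requests that trigger the "more than 50 files" warning above, parallel downloads are the suggested alternative; a hedged sketch of the user-side call, assuming the parallel and progress keywords are forwarded to this fetcher:

    import argopy

    # Hypothetical: enable parallel downloads for a large regional request instead
    # of the sequential default warned about above.
    f = argopy.DataFetcher(src="gdac", parallel=True, progress=True)
    ds = f.region([-60, -50, 30.0, 40.0, 0, 500.0, "2010-01", "2011-01"]).to_xarray()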
@@ -344,59 +363,88 @@ def to_xarray(self, errors: str = "ignore"):
elif hasattr(self, "WMO"):
access_point = "WMO"
access_point_opts = {"WMO": self.WMO}
preprocess_opts = {
"access_point": access_point,
"access_point_opts": access_point_opts,
"pre_filter_points": self._post_filter_points,
"dimension": dimension,
}

# Download and pre-process data:
ds = self.fs.open_mfdataset(
self.uri,
method=self.parallel_method,
concat_dim="N_POINTS",
concat=True,
preprocess=pre_process_multiprof,
preprocess_opts={
"access_point": access_point,
"access_point_opts": access_point_opts,
"pre_filter_points": self._post_filter_points,
},
progress=self.progress,
errors=errors,
open_dataset_opts={
opts = {
"progress": self.progress,
"errors": errors,
"concat": concat,
"concat_dim": "N_POINTS",
"preprocess": pre_process_multiprof,
"preprocess_opts": preprocess_opts,
}
if self.parallel_method in ["thread"]:
opts["method"] = "thread"
opts["open_dataset_opts"] = {
"xr_opts": {"decode_cf": 1, "use_cftime": 0, "mask_and_scale": 1}
},
)
}

# Meta-data processing:
ds["N_POINTS"] = np.arange(
0, len(ds["N_POINTS"])
) # Re-index to avoid duplicate values
ds = ds.set_coords("N_POINTS")
ds = ds.sortby("TIME")

# Remove netcdf file attributes and replace them with simplified argopy ones:
if "Fetched_from" not in ds.attrs:
raw_attrs = ds.attrs
ds.attrs = {}
ds.attrs.update({"raw_attrs": raw_attrs})
if self.dataset_id == "phy":
ds.attrs["DATA_ID"] = "ARGO"
if self.dataset_id in ["bgc", "bgc-s"]:
ds.attrs["DATA_ID"] = "ARGO-BGC"
ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
ds.attrs["Fetched_from"] = self.server
try:
ds.attrs["Fetched_by"] = getpass.getuser()
except: # noqa: E722
ds.attrs["Fetched_by"] = "anonymous"
ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime(
"%Y/%m/%d"
)
elif (self.parallel_method in ["process"]) | (
has_distributed
and isinstance(self.parallel_method, distributed.client.Client)
):
opts["method"] = self.parallel_method
opts["open_dataset_opts"] = {
"errors": "ignore",
"download_url_opts": {"errors": "ignore"},
}
opts["progress"] = False

results = self.fs.open_mfdataset(URI, **opts)

if concat and results is not None:
if self.progress:
print("Final post-processing of the merged dataset ...")
# results = pre_process_multiprof(results, **preprocess_opts)
results = results.argo.cast_types(overwrite=False)

# Meta-data processing for a single merged dataset:
results = results.assign_coords({'N_POINTS': np.arange(0, len(results['N_POINTS']))})
results = results.sortby("TIME")

# Remove netcdf file attributes and replace them with simplified argopy ones:
if "Fetched_from" not in results.attrs:
raw_attrs = results.attrs

results.attrs = {}
if "Processing_history" in raw_attrs:
results.attrs.update({"Processing_history": raw_attrs["Processing_history"]})
raw_attrs.pop("Processing_history")
results.argo.add_history("URI merged with '%s'" % concat_method)

results.attrs.update({"raw_attrs": raw_attrs})
if self.dataset_id == "phy":
results.attrs["DATA_ID"] = "ARGO"
if self.dataset_id in ["bgc", "bgc-s"]:
results.attrs["DATA_ID"] = "ARGO-BGC"
results.attrs["DOI"] = "http://doi.org/10.17882/42182"
results.attrs["Fetched_from"] = self.server
try:
results.attrs["Fetched_by"] = getpass.getuser()
except: # noqa: E722
results.attrs["Fetched_by"] = "anonymous"
results.attrs["Fetched_date"] = pd.to_datetime(
"now", utc=True
).strftime("%Y/%m/%d")

results.attrs["Fetched_constraints"] = self.cname()
if len(self.uri) == 1:
results.attrs["Fetched_uri"] = self.uri[0]
else:
results.attrs["Fetched_uri"] = ";".join(self.uri)

ds.attrs["Fetched_constraints"] = self.cname()
if len(self.uri) == 1:
ds.attrs["Fetched_uri"] = self.uri[0]
if concat:
results.attrs = dict(sorted(results.attrs.items()))
else:
ds.attrs["Fetched_uri"] = ";".join(self.uri)

return ds
for ds in results:
ds.attrs = dict(sorted(ds.attrs.items()))
return results
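
The new dispatch above also accepts a dask.distributed client as the parallelization method; a sketch under the assumption that such a client can be passed through DataFetcher's parallel argument:

    import argopy
    from distributed import Client

    # Hypothetical: hand a dask.distributed client to the fetcher so that
    # open_mfdataset dispatches to the cluster branch added above.
    client = Client(n_workers=2)
    f = argopy.DataFetcher(src="gdac", parallel=client)
    ds = f.float(6902746).to_xarray()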

@deprecated(
"Not serializable, please use 'gdac_data_processors.filter_points'",
@@ -563,4 +611,4 @@ def uri(self):
self._list_of_argo_files = URIs

self.N_FILES = len(self._list_of_argo_files)
return self._list_of_argo_files
return self._list_of_argo_files
(Diff for the remaining 7 changed files not shown.)
