diff --git a/argopy/data_fetchers/gdac_data_processors.py b/argopy/data_fetchers/gdac_data_processors.py
index 763e9b96..fc77025b 100644
--- a/argopy/data_fetchers/gdac_data_processors.py
+++ b/argopy/data_fetchers/gdac_data_processors.py
@@ -1,5 +1,9 @@
 import numpy as np
 import xarray as xr
+import logging
+
+
+log = logging.getLogger("argopy.gdac.data")
 
 
 def pre_process_multiprof(
diff --git a/argopy/extensions/params_data_mode.py b/argopy/extensions/params_data_mode.py
index 9892b495..98ec9f54 100644
--- a/argopy/extensions/params_data_mode.py
+++ b/argopy/extensions/params_data_mode.py
@@ -6,7 +6,11 @@
 import copy
 
 from ..utils import to_list, list_core_parameters
-from ..utils.transform import split_data_mode, merge_param_with_param_adjusted, filter_param_by_data_mode
+from ..utils.transform import (
+    split_data_mode,
+    merge_param_with_param_adjusted,
+    filter_param_by_data_mode,
+)
 from ..stores import (
     indexstore_pd as ArgoIndex,
 )  # make sure we work with a Pandas index store
@@ -43,10 +47,12 @@ class ParamsDataMode(ArgoAccessorExtension):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
-    def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset:  # noqa: C901
-        """Compute and add <PARAM>_DATA_MODE variables to a xarray dataset
+    def _compute_from_ArgoIndex(
+        self, indexfs: Union[None, ArgoIndex]
+    ) -> xr.Dataset:  # noqa: C901
+        """Compute <PARAM>_DATA_MODE variables from ArgoIndex
 
-        This method consume a collection of points.
+        This method consumes a collection of points.
 
         Parameters
         ----------
@@ -55,9 +61,9 @@ def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset:  # noqa: C901
 
         Returns
         -------
-        :class:`xr.Dataset`
+        :class:`xarray.Dataset`
         """
-        idx = copy.copy(indexfs) if isinstance(indexfs, ArgoIndex) else ArgoIndex()
+        idx = indexfs.copy(deep=True) if isinstance(indexfs, ArgoIndex) else ArgoIndex()
 
         def complete_df(this_df, params):
             """Add 'wmo', 'cyc' and '<param>_data_mode' columns to this dataframe"""
@@ -103,6 +109,7 @@ def print_etime(txt, t0):
 
         profiles = self._argo.list_WMO_CYC
         idx.search_wmo(self._argo.list_WMO)
+
         params = [
             p
             for p in idx.read_params()
@@ -168,10 +175,30 @@ def print_etime(txt, t0):
         self._obj = self._obj[np.sort(self._obj.data_vars)]
         return self._obj
 
-    def split(self):
+    def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset:
+        """Compute <PARAM>_DATA_MODE variables"""
+        if "STATION_PARAMETERS" in self._obj and "PARAMETER_DATA_MODE" in self._obj:
+            return split_data_mode(self._obj)
+        else:
+            return self._compute_from_ArgoIndex(indexfs=indexfs)
+
+    def split(self) -> xr.Dataset:
+        """Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables
+
+        Using the list of *PARAM* found in ``STATION_PARAMETERS``, this method will create ``N_PARAM``
+        new variables in the dataset, ``<PARAM>_DATA_MODE(N_PROF)``.
+
+        The variable ``PARAMETER_DATA_MODE`` is dropped from the dataset at the end of the process.
+
+        Returns
+        -------
+        :class:`xarray.Dataset`
+        """
         return split_data_mode(self._obj)
 
-    def merge(self, params: Union[str, List[str]] = "all", errors: str = "raise") -> xr.Dataset:
+    def merge(
+        self, params: Union[str, List[str]] = "all", errors: str = "raise"
+    ) -> xr.Dataset:
         """Merge <PARAM> and <PARAM>_ADJUSTED variables according to DATA_MODE or <PARAM>_DATA_MODE
 
         Merging is done as follows:
@@ -251,7 +278,7 @@ def filter(
         logical: str = "and",
         mask: bool = False,
         errors: str = "raise",
-    ):
+    ) -> xr.Dataset:
         """Filter measurements according to parameters data mode
 
         Filter the dataset to keep points where all or some of the parameters are in any of the data mode specified.
diff --git a/argopy/fetchers.py b/argopy/fetchers.py
index b0e9306c..9c513d91 100755
--- a/argopy/fetchers.py
+++ b/argopy/fetchers.py
@@ -177,6 +177,8 @@ def __init__(self, mode: str = "", src: str = "", ds: str = "", **fetcher_kwargs
             raise OptionValueError(
                 "The 'argovis' data source fetching is only available in 'standard' user mode"
             )
+        if self._src == "gdac" and "bgc" in self._dataset_id:
+            warnings.warn("BGC data support with the 'gdac' data source is still work in progress")
 
     @property
     def _icon_user_mode(self):
diff --git a/argopy/stores/argo_index_proto.py b/argopy/stores/argo_index_proto.py
index f40df73c..ac9eee82 100644
--- a/argopy/stores/argo_index_proto.py
+++ b/argopy/stores/argo_index_proto.py
@@ -2,6 +2,8 @@
 Argo file index store prototype
 """
+
+import copy
 
 import numpy as np
 import pandas as pd
 import logging
@@ -11,6 +13,13 @@
 from urllib.parse import urlparse
 from typing import Union
 from pathlib import Path
+import sys
+
+
+if sys.version_info >= (3, 11):
+    from typing import Self
+else:
+    from typing_extensions import Self
 
 from ..options import OPTIONS
 from ..errors import GdacPathError, S3PathError, InvalidDataset, OptionValueError
@@ -126,7 +135,7 @@ def __init__(
         # Create a File Store to access index file:
         self.cache = cache
         self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
-        timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
+        self.timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
         self.fs = {}
         if split_protocol(host)[0] is None:
             self.fs["src"] = filestore(cache=cache, cachedir=cachedir)
@@ -153,7 +162,7 @@ def __init__(
                 port=0 if urlparse(host).port is None else urlparse(host).port,
                 cache=cache,
                 cachedir=cachedir,
-                timeout=timeout,
+                timeout=self.timeout,
                 block_size=1000 * (2**20),
             )
 
@@ -167,7 +176,8 @@ def __init__(
                 raise S3PathError("This host (%s) is not alive !" % host)
 
             self.fs["src"] = s3store(
-                cache=cache, cachedir=cachedir,
+                cache=cache,
+                cachedir=cachedir,
                 anon=not has_aws_credentials(),
             )
             self.skip_rows = 10
@@ -228,13 +238,15 @@ def __init__(
         if self.fs["src"].exists(self.index_path + ".gz"):
             self.index_file += ".gz"
 
-        if isinstance(self.fs['src'], s3store):
+        if isinstance(self.fs["src"], s3store):
            # If the index host is on a S3 store, we add another file system that will bypass some
            # search methods to improve performance.
self.fs["s3"] = get_a_s3index(self.convention) # Adjust S3 bucket name and key with host and index file names: self.fs["s3"].bucket_name = Path(split_protocol(self.host)[1]).parts[0] - self.fs["s3"].key = str(Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file) + self.fs["s3"].key = str( + Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file + ) # # CNAME internal manager to be able to chain search methods: # self._cname = None @@ -246,8 +258,10 @@ def __repr__(self): summary.append("Convention: %s (%s)" % (self.convention, self.convention_title)) if hasattr(self, "index"): summary.append("In memory: True (%i records)" % self.N_RECORDS) - elif 's3' in self.host: - summary.append("In memory: False [But there's no need to load the full index with a S3 host to make a search]") + elif "s3" in self.host: + summary.append( + "In memory: False [But there's no need to load the full index with a S3 host to make a search]" + ) else: summary.append("In memory: False") @@ -417,7 +431,7 @@ def N_RECORDS(self): # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`) if hasattr(self, "index"): return self.index.shape[0] - elif 's3' in self.host: + elif "s3" in self.host: return np.Inf else: raise InvalidDataset("Load the index first !") @@ -474,7 +488,9 @@ def _write(self, fs, path, obj, fmt="pq"): if fmt == "parquet": fmt = "pq" if isinstance(fs, memorystore): - fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345 + fs.fs.touch( + this_path + ) # Fix for https://github.com/euroargodev/argopy/issues/345 # fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345 # This is an f* mystery to me, why do we need 2 calls to trigger file creation FOR REAL ???? 
                 # log.debug("memorystore touched this path before open context: '%s'" % this_path)
@@ -606,7 +622,7 @@ def get_filename(s, index):
         from ..related import load_dict, mapp_dict
 
         if nrows is not None:
-            df = df.loc[0: nrows - 1].copy()
+            df = df.loc[0 : nrows - 1].copy()
 
         if "index" in df:
             df.drop("index", axis=1, inplace=True)
@@ -900,7 +916,9 @@ def search_params(self, PARAMs: Union[str, list], logical: str):
         raise NotImplementedError("Not implemented")
 
     @abstractmethod
-    def search_parameter_data_mode(self, PARAMs: dict, logical: bool = 'and', nrows=None):
+    def search_parameter_data_mode(
+        self, PARAMs: dict, logical: bool = "and", nrows=None
+    ):
         """Search index for profiles with a parameter in a specific data mode
 
         Parameters
         ----------
@@ -994,3 +1012,62 @@ def _insert_header(self, originalfile):
             f.write(data)
 
         return originalfile
+
+    def _copy(
+        self,
+        deep: bool = True,
+    ) -> Self:
+        cls = self.__class__
+
+        if deep:
+            # Ensure complete independence between the original and the copied index:
+            obj = cls.__new__(cls)
+            obj.__init__(
+                host=copy.deepcopy(self.host),
+                index_file=copy.deepcopy(self.index_file),
+                timeout=copy.deepcopy(self.timeout),
+                cache=copy.deepcopy(self.cache),
+                cachedir=copy.deepcopy(self.cachedir),
+            )
+            if hasattr(self, "index"):
+                obj._nrows_index = copy.deepcopy(self._nrows_index)
+                obj.index = copy.deepcopy(self.index)
+            if self.cache:
+                obj.index_path_cache = copy.deepcopy(self.index_path_cache)
+
+        else:
+            obj = cls.__new__(cls)
+            obj.__init__(
+                host=copy.copy(self.host),
+                index_file=copy.copy(self.index_file),
+                timeout=copy.copy(self.timeout),
+                cache=copy.copy(self.cache),
+                cachedir=copy.copy(self.cachedir),
+            )
+            if hasattr(self, "index"):
+                obj._nrows_index = copy.copy(self._nrows_index)
+                obj.index = copy.copy(self.index)
+            if self.cache:
+                obj.index_path_cache = copy.copy(self.index_path_cache)
+
+        if hasattr(self, "search"):
+            obj.search_type = copy.copy(self.search_type)
+            obj.search_filter = copy.copy(self.search_filter)
+            obj.search = copy.copy(self.search)
+            if obj.cache:
+                obj.search_path_cache = copy.copy(self.search_path_cache)
+
+        return obj
+
+    def __copy__(self) -> Self:
+        return self._copy(deep=False)
+
+    def __deepcopy__(self, memo=None) -> Self:
+        return self._copy(deep=True)
+
+    def copy(
+        self,
+        deep: bool = True,
+    ) -> Self:
+        """Returns a copy of this object."""
+        return self._copy(deep=deep)
diff --git a/argopy/utils/transform.py b/argopy/utils/transform.py
index 1224f523..ca300a52 100644
--- a/argopy/utils/transform.py
+++ b/argopy/utils/transform.py
@@ -4,6 +4,7 @@
 
 import numpy as np
 import xarray as xr
+import pandas as pd
 import logging
 from typing import List, Union
 
@@ -340,6 +341,7 @@ def filter_param_by_data_mode(
 
     return ds.loc[dict(N_POINTS=filter)] if len(filter) > 0 else ds
 
+
 def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
     """Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables
 
@@ -352,24 +354,48 @@ def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
     -------
     :class:`xr.Dataset`
     """
+    if ds.argo._type != "profile":
+        raise InvalidDatasetStructure(
+            "Method only available to a collection of profiles"
+        )
+
     if "STATION_PARAMETERS" in ds and "PARAMETER_DATA_MODE" in ds:
         u64 = lambda s: "%s%s" % (s, " " * (64 - len(s)))  # noqa: E731
 
         params = [p.strip() for p in np.unique(ds["STATION_PARAMETERS"])]
 
+        def read_data_mode_for(ds: xr.Dataset, param: str) -> xr.DataArray:
+            """Return data mode of a given parameter"""
+            da_masked = ds['PARAMETER_DATA_MODE'].where(ds['STATION_PARAMETERS'] == u64(param))
+
+            def _dropna(x):
+                # x('N_PARAM') is reduced to the first non-NaN value, a scalar with no dimension
+                y = pd.Series(x).dropna().tolist()
+                if len(y) == 0:
+                    return ""
+                else:
+                    return y[0]
+
+            kwargs = dict(
+                dask="parallelized",
+                input_core_dims=[["N_PARAM"]],  # Function takes N_PARAM as input
+                output_core_dims=[[]],  # Function reduces to a scalar (no dimension)
+                vectorize=True,  # Apply function element-wise along the other dimensions
+            )
+
+            dm = xr.apply_ufunc(_dropna, da_masked, **kwargs)
+            dm = dm.rename("%s_DATA_MODE" % param)
+            dm.attrs = ds['PARAMETER_DATA_MODE'].attrs
+            return dm
+
         for param in params:
             name = "%s_DATA_MODE" % param.replace("_PARAMETER", "").replace(
                 "PARAMETER_", ""
             )
-            mask = ds["STATION_PARAMETERS"] == xr.full_like(
-                ds["STATION_PARAMETERS"],
-                u64(param),
-                dtype=ds["STATION_PARAMETERS"].dtype,
-            )
-            da = ds["PARAMETER_DATA_MODE"].where(mask, drop=True).isel(N_PARAM=0)
-            da = da.rename(name)
-            da = da.astype(ds["PARAMETER_DATA_MODE"].dtype)
-            ds[name] = da
+            if name == "_DATA_MODE":
+                log.error("This dataset has an error in 'STATION_PARAMETERS': it contains an empty string")
+            else:
+                ds[name] = read_data_mode_for(ds, param)
 
         ds = ds.drop_vars("PARAMETER_DATA_MODE")
         ds.argo.add_history("Transformed with 'split_data_mode'")
diff --git a/argopy/xarray.py b/argopy/xarray.py
index a64f0b7c..8cabeb74 100755
--- a/argopy/xarray.py
+++ b/argopy/xarray.py
@@ -402,7 +402,7 @@ def point2profile(self, drop: bool = False) -> xr.Dataset:  # noqa: C901
 
         Returns
         -------
-        :class:`xr.dataset`
+        :class:`xr.Dataset`
 
         See Also
         --------
@@ -563,9 +563,11 @@ def profile2point(self) -> xr.Dataset:
         - A "point" is a location with unique (N_PROF, N_LEVELS) indexes
         - A "profile" is a collection of points with a unique UID based on WMO, CYCLE_NUMBER and DIRECTION
 
+        Note that this method systematically applies the :meth:`datamode.split` method.
+
         Returns
         -------
-        :class:`xr.dataset`
+        :class:`xr.Dataset`
 
         Warnings
         --------
diff --git a/docs/api.rst b/docs/api.rst
index 50c9f323..78496e79 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -159,6 +159,7 @@ Data Transformation
 
     Dataset.argo.interp_std_levels
     Dataset.argo.groupby_pressure_bins
     Dataset.argo.datamode.merge
+    Dataset.argo.datamode.split
 
 Data Filters
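
Usage note (not part of the diff): the copy API added to argo_index_proto.py is what lets `_compute_from_ArgoIndex` take an independent `indexfs.copy(deep=True)` instead of `copy.copy(indexfs)`. A minimal sketch of how it is meant to be used, assuming a reachable GDAC host; the host value and float numbers are illustrative only:

import copy
from argopy import ArgoIndex

idx = ArgoIndex(host="https://data-argo.ifremer.fr")  # illustrative host
idx.load()
idx.search_wmo(6902746)  # illustrative float, populates the search state

clone = idx.copy(deep=True)   # host, index table and cache paths are deep-copied
clone.search_wmo(6902755)     # searching the clone leaves the original untouched

shallow = copy.copy(idx)      # routes through __copy__, shares the index table
full = copy.deepcopy(idx)     # routes through __deepcopy__

In both the deep and shallow cases the search state (search_type, search_filter, search) is carried over to the copy, per the `_copy` body above.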
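Similarly, a hedged sketch of the new `Dataset.argo.datamode.split()` entry point registered in docs/api.rst above. The input file name is hypothetical; any multi-profile dataset carrying `STATION_PARAMETERS` and `PARAMETER_DATA_MODE` should behave the same way:

import xarray as xr
import argopy  # noqa: F401  (registers the .argo accessor)

ds = xr.open_dataset("6903091_Sprof.nc")  # hypothetical local BGC multi-profile file
ds = ds.argo.datamode.split()
# PARAMETER_DATA_MODE(N_PROF, N_PARAM) is dropped; per-parameter variables
# such as DOXY_DATA_MODE(N_PROF) are now available.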
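The core of the reworked `split_data_mode()` is the `xr.apply_ufunc` reduction over `N_PARAM`. A self-contained toy illustration of that pattern, with invented data and a `first_valid` helper mirroring the diff's `_dropna`:

import numpy as np
import pandas as pd
import xarray as xr

# Toy (N_PROF, N_PARAM) array: each profile holds one valid data mode, NaN elsewhere
da = xr.DataArray(
    np.array([["R", np.nan], [np.nan, "D"]], dtype=object),
    dims=("N_PROF", "N_PARAM"),
)

def first_valid(x):
    # Keep the first non-NaN value along N_PARAM, else an empty string
    y = pd.Series(x).dropna().tolist()
    return y[0] if len(y) else ""

dm = xr.apply_ufunc(
    first_valid,
    da,
    input_core_dims=[["N_PARAM"]],  # consume the N_PARAM dimension
    output_core_dims=[[]],          # emit a scalar per profile
    vectorize=True,                 # loop over the remaining N_PROF dimension
)
print(dm.values)  # -> ['R' 'D']

Because the reduction is expressed through apply_ufunc with `dask="parallelized"` in the diff, it also works lazily on chunked datasets, which the previous `full_like`/`isel(N_PARAM=0)` approach did not.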