Fix bug with GDAC data source and BGC dataset #418

Merged: 11 commits, Dec 12, 2024
4 changes: 4 additions & 0 deletions argopy/data_fetchers/gdac_data_processors.py
@@ -1,5 +1,9 @@
import numpy as np
import xarray as xr
import logging


log = logging.getLogger("argopy.gdac.data")


def pre_process_multiprof(
45 changes: 36 additions & 9 deletions argopy/extensions/params_data_mode.py
@@ -6,7 +6,11 @@
import copy

from ..utils import to_list, list_core_parameters
from ..utils.transform import split_data_mode, merge_param_with_param_adjusted, filter_param_by_data_mode
from ..utils.transform import (
split_data_mode,
merge_param_with_param_adjusted,
filter_param_by_data_mode,
)
from ..stores import (
indexstore_pd as ArgoIndex,
) # make sure we work with a Pandas index store
@@ -43,10 +47,12 @@ class ParamsDataMode(ArgoAccessorExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset: # noqa: C901
"""Compute and add <PARAM>_DATA_MODE variables to a xarray dataset
def _compute_from_ArgoIndex(
self, indexfs: Union[None, ArgoIndex]
) -> xr.Dataset: # noqa: C901
"""Compute <PARAM>_DATA_MODE variables from ArgoIndex

This method consume a collection of points.
This method consumes a collection of points.

Parameters
----------
@@ -55,9 +61,9 @@ def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset: # noqa: C901

Returns
-------
:class:`xr.Dataset`
:class:`xarray.Dataset`
"""
idx = copy.copy(indexfs) if isinstance(indexfs, ArgoIndex) else ArgoIndex()
idx = indexfs.copy(deep=True) if isinstance(indexfs, ArgoIndex) else ArgoIndex()
Member Author: Here we have a new ``copy`` method for the ArgoIndex.
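A minimal usage sketch of the new method (the index file name and float WMO are illustrative; behaviour follows the ``_copy`` implementation added to ``argo_index_proto.py`` below):

from argopy import ArgoIndex

idx = ArgoIndex(index_file="argo_synthetic-profile_index.txt")  # illustrative BGC index file
clone = idx.copy(deep=True)        # fully independent clone, new in this PR
clone.search_wmo([6902746])        # a search on the clone...
assert not hasattr(idx, "search")  # ...leaves the original untouched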


def complete_df(this_df, params):
"""Add 'wmo', 'cyc' and '<param>_data_mode' columns to this dataframe"""
@@ -103,6 +109,7 @@ def print_etime(txt, t0):

profiles = self._argo.list_WMO_CYC
idx.search_wmo(self._argo.list_WMO)

params = [
p
for p in idx.read_params()
@@ -168,10 +175,30 @@ def print_etime(txt, t0):
self._obj = self._obj[np.sort(self._obj.data_vars)]
return self._obj

def split(self):
def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset:
"""Compute <PARAM>_DATA_MODE variables"""
if "STATION_PARAMETERS" in self._obj and "PARAMETER_DATA_MODE" in self._obj:
Member Author: I did not spot this subtlety before.

return split_data_mode(self._obj)
else:
return self._compute_from_ArgoIndex(indexfs=indexfs)
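As a hedged usage sketch of this dispatch, a dataset that already carries ``STATION_PARAMETERS`` and ``PARAMETER_DATA_MODE`` (typical of a GDAC multiprofile file) no longer needs an index at this point:

ds = ds.argo.datamode.compute(None)  # dispatches to split_data_mode(), no ArgoIndex lookup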

def split(self) -> xr.Dataset:
"""Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables

Using the list of *PARAM* found in ``STATION_PARAMETERS``, this method will create ``N_PARAM``
new variables in the dataset ``<PARAM>_DATA_MODE(N_PROF)``.

The variable ``PARAMETER_DATA_MODE`` is dropped from the dataset at the end of the process.

Returns
-------
:class:`xarray.Dataset`
"""
return split_data_mode(self._obj)
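A short, hedged example of what ``split`` produces (the parameter names are illustrative of a BGC float):

# before: STATION_PARAMETERS(N_PROF, N_PARAM) and PARAMETER_DATA_MODE(N_PROF, N_PARAM)
ds = ds.argo.datamode.split()
# after: one <PARAM>_DATA_MODE(N_PROF) per parameter, e.g. DOXY_DATA_MODE or CHLA_DATA_MODE,
# while PARAMETER_DATA_MODE has been dropped
print([v for v in ds.data_vars if str(v).endswith("_DATA_MODE")])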

def merge(self, params: Union[str, List[str]] = "all", errors: str = "raise") -> xr.Dataset:
def merge(
self, params: Union[str, List[str]] = "all", errors: str = "raise"
) -> xr.Dataset:
"""Merge <PARAM> and <PARAM>_ADJUSTED variables according to DATA_MODE or <PARAM>_DATA_MODE

Merging is done as follows:
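The full rule list is collapsed in this diff view; as a hedged sketch of the core idea for a single parameter (standard Argo data-mode convention, not the exact argopy implementation):

import xarray as xr

def merge_one_param(ds: xr.Dataset, param: str) -> xr.DataArray:
    # keep <PARAM> where the data mode is 'R' (real-time),
    # keep <PARAM>_ADJUSTED where it is 'A' (adjusted) or 'D' (delayed mode)
    dm = ds[f"{param}_DATA_MODE"]
    return xr.where(dm.isin(["A", "D"]), ds[f"{param}_ADJUSTED"], ds[param])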
@@ -251,7 +278,7 @@ def filter(
logical: str = "and",
mask: bool = False,
errors: str = "raise",
):
) -> xr.Dataset:
"""Filter measurements according to parameters data mode

Filter the dataset to keep points where all or some of the parameters are in any of the specified data modes.
2 changes: 2 additions & 0 deletions argopy/fetchers.py
@@ -177,6 +177,8 @@ def __init__(self, mode: str = "", src: str = "", ds: str = "", **fetcher_kwargs
raise OptionValueError(
"The 'argovis' data source fetching is only available in 'standard' user mode"
)
if self._src == "gdac" and "bgc" in self._dataset_id:
Member Author: I don't remember why this was removed.
warnings.warn("BGC data support with the 'gdac' data source is still a work in progress")
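For context, a hedged example of the fetcher call this guard applies to (the float WMO is illustrative):

import argopy

fetcher = argopy.DataFetcher(src="gdac", ds="bgc", mode="expert")  # triggers the warning above
ds = fetcher.float(6904240).to_xarray()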

@property
def _icon_user_mode(self):
99 changes: 88 additions & 11 deletions argopy/stores/argo_index_proto.py
@@ -2,6 +2,8 @@
Argo file index store prototype

"""

import copy
import numpy as np
import pandas as pd
import logging
@@ -11,6 +13,13 @@
from urllib.parse import urlparse
from typing import Union
from pathlib import Path
import sys


if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

from ..options import OPTIONS
from ..errors import GdacPathError, S3PathError, InvalidDataset, OptionValueError
@@ -126,7 +135,7 @@ def __init__(
# Create a File Store to access index file:
self.cache = cache
self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
self.timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
self.fs = {}
if split_protocol(host)[0] is None:
self.fs["src"] = filestore(cache=cache, cachedir=cachedir)
@@ -153,7 +162,7 @@ def __init__(
port=0 if urlparse(host).port is None else urlparse(host).port,
cache=cache,
cachedir=cachedir,
timeout=timeout,
timeout=self.timeout,
block_size=1000 * (2**20),
)

@@ -167,7 +176,8 @@ def __init__(
raise S3PathError("This host (%s) is not alive !" % host)

self.fs["src"] = s3store(
cache=cache, cachedir=cachedir,
cache=cache,
cachedir=cachedir,
anon=not has_aws_credentials(),
)
self.skip_rows = 10
@@ -228,13 +238,15 @@ def __init__(
if self.fs["src"].exists(self.index_path + ".gz"):
self.index_file += ".gz"

if isinstance(self.fs['src'], s3store):
if isinstance(self.fs["src"], s3store):
# If the index host is on a S3 store, we add another file system that will bypass some
# search methods to improve performances.
self.fs["s3"] = get_a_s3index(self.convention)
# Adjust S3 bucket name and key with host and index file names:
self.fs["s3"].bucket_name = Path(split_protocol(self.host)[1]).parts[0]
self.fs["s3"].key = str(Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file)
self.fs["s3"].key = str(
Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file
)
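A hedged, standard-library illustration of the bucket/key split performed above (host and file names are made up):

from pathlib import Path

host_path = "argo-gdac-sandbox/pub/idx"                    # e.g. split_protocol("s3://argo-gdac-sandbox/pub/idx")[1]
index_file = "ar_index_global_prof.txt.gz"
bucket_name = Path(host_path).parts[0]                     # 'argo-gdac-sandbox'
key = str(Path(*Path(host_path).parts[1:]) / index_file)   # 'pub/idx/ar_index_global_prof.txt.gz'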

# # CNAME internal manager to be able to chain search methods:
# self._cname = None
@@ -246,8 +258,10 @@ def __repr__(self):
summary.append("Convention: %s (%s)" % (self.convention, self.convention_title))
if hasattr(self, "index"):
summary.append("In memory: True (%i records)" % self.N_RECORDS)
elif 's3' in self.host:
summary.append("In memory: False [But there's no need to load the full index with a S3 host to make a search]")
elif "s3" in self.host:
summary.append(
"In memory: False [But there's no need to load the full index with a S3 host to make a search]"
)
else:
summary.append("In memory: False")

@@ -417,7 +431,7 @@ def N_RECORDS(self):
# Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
if hasattr(self, "index"):
return self.index.shape[0]
elif 's3' in self.host:
elif "s3" in self.host:
return np.Inf
else:
raise InvalidDataset("Load the index first !")
@@ -474,7 +488,9 @@ def _write(self, fs, path, obj, fmt="pq"):
if fmt == "parquet":
fmt = "pq"
if isinstance(fs, memorystore):
fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345
fs.fs.touch(
this_path
) # Fix for https://github.com/euroargodev/argopy/issues/345
# fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345
# This is an f* mystery to me, why do we need 2 calls to trigger file creation FOR REAL ????
# log.debug("memorystore touched this path before open context: '%s'" % this_path)
@@ -606,7 +622,7 @@ def get_filename(s, index):
from ..related import load_dict, mapp_dict

if nrows is not None:
df = df.loc[0: nrows - 1].copy()
df = df.loc[0 : nrows - 1].copy()

if "index" in df:
df.drop("index", axis=1, inplace=True)
@@ -900,7 +916,9 @@ def search_params(self, PARAMs: Union[str, list], logical: str):
raise NotImplementedError("Not implemented")

@abstractmethod
def search_parameter_data_mode(self, PARAMs: dict, logical: bool = 'and', nrows=None):
def search_parameter_data_mode(
self, PARAMs: dict, logical: bool = "and", nrows=None
):
"""Search index for profiles with a parameter in a specific data mode

Parameters
@@ -994,3 +1012,62 @@ def _insert_header(self, originalfile):
f.write(data)

return originalfile

def _copy(
self,
deep: bool = True,
) -> Self:
cls = self.__class__

if deep:
# Ensure complete independence between the original and the copied index:
obj = cls.__new__(cls)
obj.__init__(
host=copy.deepcopy(self.host),
index_file=copy.deepcopy(self.index_file),
timeout=copy.deepcopy(self.timeout),
cache=copy.deepcopy(self.cache),
cachedir=copy.deepcopy(self.cachedir),
)
if hasattr(self, "index"):
obj._nrows_index = copy.deepcopy(self._nrows_index)
obj.index = copy.deepcopy(self.index)
if self.cache:
obj.index_path_cache = copy.deepcopy(self.index_path_cache)

else:
obj = cls.__new__(cls)
obj.__init__(
host=copy.copy(self.host),
index_file=copy.copy(self.index_file),
timeout=copy.copy(self.timeout),
cache=copy.copy(self.cache),
cachedir=copy.copy(self.cachedir),
)
if hasattr(self, "index"):
obj._nrows_index = copy.copy(self._nrows_index)
obj.index = copy.copy(self.index)
if self.cache:
obj.index_path_cache = copy.copy(self.index_path_cache)

if hasattr(self, "search"):
obj.search_type = copy.copy(self.search_type)
obj.search_filter = copy.copy(self.search_filter)
obj.search = copy.copy(self.search)
if obj.cache:
obj.search_path_cache = copy.copy(self.search_path_cache)

return obj

def __copy__(self) -> Self:
return self._copy(deep=False)

def __deepcopy__(self, memo=None) -> Self:
return self._copy(deep=True)

def copy(
self,
deep: bool = True,
) -> Self:
"""Returns a copy of this object."""
return self._copy(deep=deep)
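A hedged sketch of how the three entry points relate once an index ``idx`` exists (standard-library ``copy`` module plus the new public method):

import copy

shallow = copy.copy(idx)      # routed to __copy__, i.e. _copy(deep=False)
deep = copy.deepcopy(idx)     # routed to __deepcopy__, i.e. _copy(deep=True)
clone = idx.copy()            # explicit public API, deep=True by default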
44 changes: 35 additions & 9 deletions argopy/utils/transform.py
@@ -4,6 +4,7 @@

import numpy as np
import xarray as xr
import pandas as pd
import logging
from typing import List, Union

@@ -340,6 +341,7 @@ def filter_param_by_data_mode(
return ds.loc[dict(N_POINTS=filter)] if len(filter) > 0 else ds



def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
"""Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables

@@ -352,24 +354,48 @@
-------
:class:`xr.Dataset`
"""
if ds.argo._type != "profile":
Member Author: Raise an error to avoid silently passing through for the wrong reasons and not detecting it.

raise InvalidDatasetStructure(
"Method only available to a collection of profiles"
)

if "STATION_PARAMETERS" in ds and "PARAMETER_DATA_MODE" in ds:

u64 = lambda s: "%s%s" % (s, " " * (64 - len(s))) # noqa: E731
params = [p.strip() for p in np.unique(ds["STATION_PARAMETERS"])]

def read_data_mode_for(ds: xr.Dataset, param: str) -> xr.DataArray:
"""Return data mode of a given parameter"""
da_masked = ds['PARAMETER_DATA_MODE'].where(ds['STATION_PARAMETERS'] == u64(param))

def _dropna(x):
# x('N_PARAM') is reduced to the first non-NaN value, a scalar, no dimension
y = pd.Series(x).dropna().tolist()
if len(y) == 0:
return ""
else:
return y[0]

kwargs = dict(
dask="parallelized",
input_core_dims=[["N_PARAM"]], # Function takes N_PARAM as input
output_core_dims=[[]], # Function reduces to a scalar (no dimension)
vectorize=True # Apply function element-wise along the other dimensions
)

dm = xr.apply_ufunc(_dropna, da_masked, **kwargs)
dm = dm.rename("%s_DATA_MODE" % param)
dm.attrs = ds['PARAMETER_DATA_MODE'].attrs
return dm

for param in params:
name = "%s_DATA_MODE" % param.replace("_PARAMETER", "").replace(
"PARAMETER_", ""
)
mask = ds["STATION_PARAMETERS"] == xr.full_like(
ds["STATION_PARAMETERS"],
u64(param),
dtype=ds["STATION_PARAMETERS"].dtype,
)
da = ds["PARAMETER_DATA_MODE"].where(mask, drop=True).isel(N_PARAM=0)
da = da.rename(name)
da = da.astype(ds["PARAMETER_DATA_MODE"].dtype)
ds[name] = da
if name == "_DATA_MODE":
log.error("This dataset has an error in 'STATION_PARAMETERS': it contains an empty string")
else:
ds[name] = read_data_mode_for(ds, param)

ds = ds.drop_vars("PARAMETER_DATA_MODE")
ds.argo.add_history("Transformed with 'split_data_mode'")
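To make the ``apply_ufunc`` reduction above concrete, a self-contained toy version of the same pattern (synthetic two-profile, three-parameter arrays, not real Argo data):

import pandas as pd
import xarray as xr

params = xr.DataArray([["PRES", "TEMP", "DOXY"],
                       ["PRES", "TEMP", "DOXY"]], dims=("N_PROF", "N_PARAM"))
modes = xr.DataArray([["R", "A", "D"],
                      ["R", "R", "A"]], dims=("N_PROF", "N_PARAM"))

# mask the data modes so only the column matching one parameter survives,
# then reduce N_PARAM to the single remaining value per profile
masked = modes.where(params == "DOXY")

def first_valid(x):
    y = pd.Series(x).dropna().tolist()
    return y[0] if y else ""

doxy_dm = xr.apply_ufunc(first_valid, masked, input_core_dims=[["N_PARAM"]], vectorize=True)
print(doxy_dm.values)  # -> ['D' 'A'], one data mode per profile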
6 changes: 4 additions & 2 deletions argopy/xarray.py
@@ -402,7 +402,7 @@ def point2profile(self, drop: bool = False) -> xr.Dataset: # noqa: C901

Returns
-------
:class:`xr.dataset`
:class:`xr.Dataset`

See Also
--------
@@ -563,9 +563,11 @@ def profile2point(self) -> xr.Dataset:
- A "point" is a location with unique (N_PROF, N_LEVELS) indexes
- A "profile" is a collection of points with an unique UID based on WMO, CYCLE_NUMBER and DIRECTION

Note that this method will systematically apply the :meth:`datamode.split` method.

Returns
-------
:class:`xr.dataset`
:class:`xr.Dataset`

Warnings
--------
1 change: 1 addition & 0 deletions docs/api.rst
@@ -159,6 +159,7 @@ Data Transformation
Dataset.argo.interp_std_levels
Dataset.argo.groupby_pressure_bins
Dataset.argo.datamode.merge
Dataset.argo.datamode.split


Data Filters