Commit f405296
Merge pull request #392 from euroargodev/dask-ok
Make data fetching scalable with Dask
gmaze authored Oct 15, 2024
2 parents ff63703 + b852a65 commit f405296
Showing 172 changed files with 10,095 additions and 3,500 deletions.
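For context, this is the kind of workflow the PR enables, as a minimal sketch: it assumes the top-level argopy.DataFetcher facade forwards the new parallel argument documented in the argovis fetcher diff below, and the region bounds and options are purely illustrative.

    from distributed import Client  # Dask's distributed scheduler client
    import argopy

    client = Client()  # start a local Dask cluster

    # 'parallel' now accepts False, True, a method name, or a Dask client:
    fetcher = argopy.DataFetcher(src="argovis", parallel=client, progress=True)
    ds = fetcher.region([-75, -45, 20, 30, 0, 100, "2020-01", "2020-06"]).to_xarray()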
9 changes: 5 additions & 4 deletions .github/workflows/pytests-upstream.yml
@@ -102,6 +102,7 @@ jobs:
run: |
echo "CONDA_ENV_FILE=ci/requirements/py${{matrix.python-version}}-core-free.yml" >> $GITHUB_ENV
echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV
echo "LOG_FILE=argopy-tests-Core-Free-Py${{matrix.python-version}}-${{matrix.os}}.log" >> $GITHUB_ENV
- name: Setup Micromamba ${{ matrix.python-version }}
uses: mamba-org/setup-micromamba@v1
@@ -110,8 +111,8 @@
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}
@@ -237,8 +238,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}
pytest-reportlog
26 changes: 19 additions & 7 deletions .github/workflows/pytests.yml
@@ -88,8 +88,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}
@@ -119,7 +119,7 @@ jobs:
- name: Test with pytest
run: |
pytest -ra -v -s -c argopy/tests/pytest.ini --durations=10 --cov=./ --cov-config=.coveragerc --cov-report xml:cov.xml --cov-report term-missing --log-file=${{env.LOG_FILE}}
pytest -ra -v -s -c argopy/tests/pytest.ini --durations=10 --cov=./ --cov-config=.coveragerc --cov-report xml:cov.xml --cov-report term-missing --log-file=${{env.LOG_FILE}} --junitxml=junit.xml -o junit_family=legacy
- name: Energy Estimation - Measure Tests Exec
if: ${{matrix.os == 'ubuntu-latest'}}
@@ -141,6 +141,12 @@ jobs:
fail_ci_if_error: false
env_vars: RUNNER_OS,PYTHON_VERSION

- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Save tests log as artifact
uses: actions/upload-artifact@v4
with:
@@ -211,8 +217,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}
@@ -242,7 +248,7 @@ jobs:
- name: Test with pytest
run: |
pytest -ra -v -s -c argopy/tests/pytest.ini --durations=10 --cov=./ --cov-config=.coveragerc --cov-report xml:cov.xml --cov-report term-missing --log-file=${{env.LOG_FILE}}
pytest -ra -v -s -c argopy/tests/pytest.ini --durations=10 --cov=./ --cov-config=.coveragerc --cov-report xml:cov.xml --cov-report term-missing --log-file=${{env.LOG_FILE}} --junitxml=junit.xml -o junit_family=legacy
- name: Energy Estimation - Measure Tests Exec
if: ${{matrix.os == 'ubuntu-latest'}}
@@ -263,7 +269,13 @@
name: codecov-github
fail_ci_if_error: false
env_vars: RUNNER_OS,PYTHON_VERSION


- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload tests log to GA
uses: actions/upload-artifact@v4
with:
4 changes: 2 additions & 2 deletions argopy/data_fetchers/__init__.py
@@ -13,7 +13,7 @@
"erddap_data",
"erddap_index",
"argovis_data",
"gdac_data.py",
"gdac_index.py",
"gdac_data",
"gdac_index",
"CTDRefDataFetcher",
)
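A side note on this fix: __all__ must list importable attribute names, not filenames. A short hypothetical session showing what the old entries would break:

    # With "gdac_data.py" listed in __all__, a star-import cannot resolve the name:
    from argopy.data_fetchers import *
    # AttributeError: module 'argopy.data_fetchers' has no attribute 'gdac_data.py'
    # With "gdac_data" listed instead, the same import exposes the submodule correctly.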
161 changes: 26 additions & 135 deletions argopy/data_fetchers/argovis_data.py
@@ -7,12 +7,13 @@
import warnings

from ..stores import httpstore
from ..options import OPTIONS, DEFAULT
from ..options import OPTIONS, DEFAULT, VALIDATE, PARALLEL_SETUP
from ..utils.format import format_oneline
from ..utils.chunking import Chunker
from ..utils.decorators import deprecated
from ..errors import DataNotFound
from .proto import ArgoDataFetcherProto

from .argovis_data_processors import pre_process, add_attributes

access_points = ["wmo", "box"]
exit_formats = ["xarray"]
@@ -49,7 +50,6 @@ def __init__(
cache: bool = False,
cachedir: str = "",
parallel: bool = False,
parallel_method: str = "thread",
progress: bool = False,
chunks: str = "auto",
chunks_maxsize: dict = {},
@@ -66,10 +66,13 @@
Cache data or not (default: False)
cachedir: str (optional)
Path to cache folder
parallel: bool (optional)
Chunk request to use parallel fetching (default: False)
parallel_method: str (optional)
Define the parallelization method: ``thread``, ``process`` or a :class:`dask.distributed.client.Client`.
parallel: bool, str, :class:`distributed.Client`, default: False
Set whether to use parallelization or not, and possibly which method to use.
Possible values:
- ``False``: no parallelization is used
- ``True``: use the default method specified by the ``parallel_default_method`` option
- any other value accepted by the ``parallel_default_method`` option
progress: bool (optional)
Show a progress bar or not when ``parallel`` is set to True.
chunks: 'auto' or dict of integers (optional)
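These documented values map directly onto the public fetcher facade; a hedged usage sketch (the src="argovis" routing is argopy's documented API, not part of this diff):

    import argopy

    argopy.DataFetcher(src="argovis", parallel=False)     # sequential fetching
    argopy.DataFetcher(src="argovis", parallel=True)      # option-defined default method
    argopy.DataFetcher(src="argovis", parallel="thread")  # explicitly named method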
@@ -96,16 +99,8 @@ def __init__(
}
self.fs = kwargs["fs"] if "fs" in kwargs else httpstore(**self.store_opts)

if not isinstance(parallel, bool):
parallel_method = parallel
parallel = True
if parallel_method not in ["thread"]:
raise ValueError(
"argovis only support multi-threading, use 'thread' instead of '%s'"
% parallel_method
)
self.parallel = parallel
self.parallel_method = parallel_method
self.parallelize, self.parallel_method = PARALLEL_SETUP(parallel)

self.progress = progress
self.chunks = chunks
self.chunks_maxsize = chunks_maxsize
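PARALLEL_SETUP is imported from argopy.options above, but its implementation is not part of this excerpt. A hypothetical sketch of the contract inferred from the call site, which expects a (parallelize, method) pair back:

    from typing import Tuple, Union

    def parallel_setup_sketch(parallel: Union[bool, str, object]) -> Tuple[bool, object]:
        """Normalize a user 'parallel' argument into a (parallelize, method) pair."""
        if parallel is False:
            return False, "sequential"  # mirrors the old sequential fallback in to_dataframe
        if parallel is True:
            return True, "thread"  # stand-in only: the real default comes from the 'parallel_default_method' option
        return True, parallel  # a method name such as 'thread', or a distributed.Client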
@@ -149,105 +144,6 @@ def _add_history(self, this, txt):
this.attrs["history"] = txt
return this

def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by argovis requests
#todo: This is hard coded, but should be retrieved from an API somewhere
"""
for v in this.data_vars:
if "TEMP" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
"standard_name": "sea_water_temperature",
"units": "degree_Celsius",
"valid_min": -2.0,
"valid_max": 40.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PSAL" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "PRACTICAL SALINITY",
"standard_name": "sea_water_salinity",
"units": "psu",
"valid_min": 0.0,
"valid_max": 43.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PRES" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Sea Pressure",
"standard_name": "sea_water_pressure",
"units": "decibar",
"valid_min": 0.0,
"valid_max": 12000.0,
"resolution": 0.1,
"axis": "Z",
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "DOXY" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Dissolved oxygen",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "micromole/kg",
"valid_min": -5.0,
"valid_max": 600.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "_QC" in v:
attrs = {
"long_name": "Global quality flag of %s profile" % v,
"convention": "Argo reference table 2a",
}
this[v].attrs = attrs

if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs = {
"long_name": "Float cycle number",
"convention": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle",
}

if "DATA_MODE" in this.data_vars:
this["DATA_MODE"].attrs = {
"long_name": "Delayed mode or real time data",
"convention": "R : real time; D : delayed mode; A : real time with adjustment",
}

if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs = {
"long_name": "Direction of the station profiles",
"convention": "A: ascending profiles, D: descending profiles",
}

if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs = {
"long_name": "Float unique identifier",
"convention": "WMO float identifier : A9IIIII",
}

return this

@property
def cachepath(self):
@@ -269,6 +165,7 @@ def safe_for_fsspec_cache(url):

return [safe_for_fsspec_cache(url) for url in urls]

@deprecated('Not serializable')
def json2dataframe(self, profiles):
"""convert json data to Pandas DataFrame"""
# Make sure we deal with a list
@@ -312,38 +209,34 @@ def json2dataframe(self, profiles):
df = pd.DataFrame(rows)
return df

def to_dataframe(self, errors: str = "ignore"):
def to_dataframe(self, errors: str = "ignore") -> pd.DataFrame:
"""Load Argo data and return a Pandas dataframe"""

# Download data:
if not self.parallel:
method = "sequential"
else:
method = self.parallel_method
preprocess_opts = {'key_map': self.key_map}
df_list = self.fs.open_mfjson(
self.uri,
method=method,
preprocess=self.json2dataframe,
method=self.parallel_method,
preprocess=pre_process,
preprocess_opts=preprocess_opts,
progress=self.progress,
errors=errors,
)

# Merge results (list of dataframe):
for i, df in enumerate(df_list):
df = df.reset_index()
df = df.rename(columns=self.key_map)
df = df[[value for value in self.key_map.values() if value in df.columns]]
df_list[i] = df
df = pd.concat(df_list, ignore_index=True)
if df.shape[0] == 0:
raise DataNotFound("No data found for: %s" % self.cname())

df.sort_values(by=["TIME", "PRES"], inplace=True)
df["N_POINTS"] = np.arange(0, len(df["N_POINTS"]))
df = df.set_index(["N_POINTS"])
return df
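The per-payload cleanup that to_dataframe previously performed in its merge loop (reset index, rename columns, keep mapped columns) now runs inside open_mfjson via the preprocess/preprocess_opts arguments shown above. A hedged sketch of what such a preprocess callable plausibly does; the real pre_process lives in argovis_data_processors, which is not shown in this excerpt:

    import pandas as pd

    def pre_process_sketch(profiles, key_map: dict) -> pd.DataFrame:
        """Plausible hook: one argovis JSON payload -> a DataFrame with argopy names."""
        profiles = profiles if isinstance(profiles, list) else [profiles]
        df = pd.DataFrame(profiles)
        df = df.rename(columns=key_map)  # map argovis keys to argopy variable names
        return df[[v for v in key_map.values() if v in df.columns]]  # keep mapped columns only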

def to_xarray(self, errors: str = "ignore"):
def to_xarray(self, errors: str = "ignore") -> xr.Dataset:
"""Download and return data as xarray Datasets"""
ds = self.to_dataframe(errors=errors).to_xarray()
# ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = ds.sortby(
["TIME", "PRES"]
) # should already be sorted by date in descending order
@@ -352,7 +245,6 @@ def to_xarray(self, errors: str = "ignore"):
) # Re-index to avoid duplicate values

# Set coordinates:
# ds = ds.set_coords('N_POINTS')
coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS")
ds = ds.reset_coords()
ds["N_POINTS"] = ds["N_POINTS"]
@@ -361,12 +253,11 @@ def to_xarray(self, errors: str = "ignore"):
ds = ds.rename({v: v.upper()})
ds = ds.set_coords(coords)

# Cast data types and add variable attributes (not available in the csv download):
ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = self._add_attributes(ds)
# Add variable attributes and cast data types:
ds = add_attributes(ds)
ds = ds.argo.cast_types()

# Remove argovis file attributes and replace them with argopy ones:
# Remove argovis dataset attributes and replace them with argopy ones:
ds.attrs = {}
if self.dataset_id == "phy":
ds.attrs["DATA_ID"] = "ARGO"
@@ -520,7 +411,7 @@ def uri(self):
MaxLen = np.timedelta64(MaxLenTime, "D")

urls = []
if not self.parallel:
if not self.parallelize:
# Check if the time range is not larger than allowed (MaxLenTime days):
if Lt > MaxLen:
self.Chunker = Chunker(
(Remaining changed files not shown.)
