Make data fetching scalable with Dask #392

Merged — gmaze merged 60 commits into master from dask-ok on Oct 15, 2024

Conversation

gmaze
Member

@gmaze gmaze commented Sep 18, 2024

Motivation

This has been under the hood for a very long time and somehow hidden in our file stores, waiting for prime time.

But with the advance of Argo data availability in the cloud, it's now time to support a new parallelization method with Dask.

Existing methods are documented here: https://argopy.readthedocs.io/en/latest/performances.html#parallel-data-fetching

Support for Dask already works, but only at the very low level of fetching the raw data.
Fully making the high-level argopy API work with a Dask client requires more work.

Existing feature

Today, we can fetch data in parallel with a Dask client only at the low level.

First, let's create a fetcher without downloading data, just to get the list of resources to load and process:

import argopy

f = argopy.DataFetcher(ds='bgc', src='erddap',
                       chunks={'lon': 1, 'lat': 1, 'dpt': 'auto', 'time': 1},
                       chunks_maxsize={'dpt': 100})
f = f.region([-78, -50, 34, 80, 0, 5000, '2020-01-01', '2021-01-01'])
len(f.uri)  # Returns 50 erddap urls

With this fetcher, we need to load and process 50 chunks of data: the 0-5000 m depth range is split into chunks of at most 100 m (hence 50 depth chunks), while longitude, latitude and time are kept whole.

If we now get a Dask client:

from dask.distributed import Client
client = Client(processes=True)

we can fetch data with our file store method open_mfdataset:

ds = f.fetcher.fs.open_mfdataset(
    f.uri,
    method=client,
    open_dataset_opts={'errors': 'ignore', 'download_url_opts': {'errors': 'ignore'}},
)

The trick is to pass the Dask client to the method argument and to ignore all errors (because if one URL throws an error, the entire process fails).
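For comparison, here is a minimal sketch of the same call without a Dask client, simply omitting the method argument so that the file store falls back on its default (non-Dask) behaviour; this is only an illustration based on the snippet above:

# Same call as above, but without handing over the Dask client:
# the file store processes the 50 URIs with its default method.
ds_sequential = f.fetcher.fs.open_mfdataset(
    f.uri,
    open_dataset_opts={'errors': 'ignore', 'download_url_opts': {'errors': 'ignore'}},
)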

What needs to be done

Although our open_mfdataset is not fundamentally different from using a catalog or xarray's ability to open multiple files, using our low-level argopy file store method open_mfdataset allows for more Argo-related control of the data processing.

That's why, in practice, the high-level API calls f.load() or f.to_xarray() do not yet work with a Dask client: under the hood, the data fetcher performs complex processing of the data, and this processing is not yet serialisable by Dask.

This is illustrated by the following snippet, where we now try to apply the data fetcher processing function to each chunk of data:

ds = f.fetcher.fs.open_mfdataset(
    f.uri,
    method=client,
    open_dataset_opts={'errors': 'ignore', 'download_url_opts': {'errors': 'ignore'}},
    preprocess=f.fetcher.post_process,
    preprocess_opts={"add_dm": False, "URI": f.uri},
)

This raises a pickle error like Could not serialize object ...

So that's where most of the work in this PR should go: make the argopy data processing chain serialisable.
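A quick way to reproduce this kind of failure outside of a full fetch is to check whether the preprocessing callable can be serialised the way Dask ships tasks to its workers. The following minimal sketch reuses the fetcher f from the snippets above and cloudpickle (which Dask relies on); it is an illustration, not part of the argopy API:

import cloudpickle

try:
    # If this succeeds, the callable can be shipped to Dask workers
    cloudpickle.dumps(f.fetcher.post_process)
    print("post_process can be shipped to Dask workers")
except Exception as err:
    print("Could not serialize object:", err)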

What has been done

Note: this PR is based on the upcoming v1.0.0 release

  • Clarify what we call pre- and post-processing within argopy (see schematic below)
  • Refactor the pre-processing chain out of the GDAC data fetcher: argopy.data_fetchers.gdac_data.GDACArgoDataFetcher.post_process refactored to argopy.data_fetchers.gdac_data_processors.pre_process
  • Refactor the pre-processing chain out of the Erddap data fetcher: argopy.data_fetchers.erddap_data.ErddapArgoDataFetcher.post_process refactored to argopy.data_fetchers.erddap_data_processors.pre_process
  • Refactor the pre-processing chain out of the Argovis data fetcher: argopy.data_fetchers.argovis_data.ArgovisArgoDataFetcher.post_process refactored to argopy.data_fetchers.argovis_data_processors.pre_process
  • Refactor any methods related to data modes into a new DataSet.argo.datamode extension
  • Allow low-level data download errors (raised by httpstore.download_url) to be ignored from higher-level calls (httpstore.open_dataset, httpstore.open_mfdataset, httpstore.open_json, httpstore.open_mfjson)
  • Add new options parallel and parallel_default_method
  • Make it easier to use the option parallel: it can take False for no parallelization, the name of the method to use, or True to fall back on the default method (see the code sketch below)
  • Update the documentation:
  • Refactor methods to adhere to the pre- and post-processing steps described in the schematic below
  • TBC

[Schematic: argopy data processing chain]
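As a sketch of how the new parallel option described above could be used (values are illustrative and based on this PR description; 'thread' stands for one of the existing parallelization methods documented in the performances page linked above):

import argopy
from dask.distributed import Client

argopy.set_options(parallel=False)      # no parallelization
argopy.set_options(parallel=True)       # fall back on the default method
argopy.set_options(parallel='thread')   # name of a method to use (assumed value)
argopy.set_options(parallel=Client())   # or hand over a Dask client directly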


@gmaze gmaze added enhancement New feature or request backends performance labels Sep 18, 2024
@gmaze gmaze marked this pull request as draft September 18, 2024 13:04
@gmaze gmaze self-assigned this Sep 30, 2024
@gmaze gmaze marked this pull request as ready for review September 30, 2024 14:03
- also update option validation
- expose new options.VALIDATE function
@gmaze gmaze requested a review from quai20 October 3, 2024 13:01
@quai20
Member

quai20 commented Oct 8, 2024

Some first testing on my side:

%%time
with argopy.set_options(parallel=client):
    f = DataFetcher(src='argovis').region(box)
    print("%i chunks to process" % len(f.uri))
    print(f)
    ds = f.load().data
    print(ds)
117 chunks to process
<datafetcher.argovis>
👁 Name: Argovis Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/0.00; y=20.00/60.08; z=0.0/500.0; t=2007-01-01/2009-01-01]
🔗 API: https://argovis-api.colorado.edu/
🗝 API KEY: 'guest' (get a free key at https://argovis-keygen.colorado.edu/)
🏊 User mode: standard
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [<Client: 'tcp://127.0.0.1:44031' processes=4 threads=4, memory=15.53 GiB>]
<xarray.Dataset>
    [...]
CPU times: user 3min 21s, sys: 12.9 s, total: 3min 34s
Wall time: 10min 49s

@quai20
Member

quai20 commented Oct 8, 2024

%%time
with argopy.set_options(parallel=client):
    f = DataFetcher(src='erddap').region(box)
    print("%i chunks to process" % len(f.uri))
    print(f)
    ds = f.load().data
    print(ds)
81 chunks to process
<datafetcher.erddap>
⭐ Name: Ifremer erddap Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/0.00; y=20.00/60.09; z=0.0/500.0; t=2007-01-01/2009-01-01]
🔗 API: https://erddap.ifremer.fr/erddap
🏊 User mode: standard
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [<Client: 'tcp://127.0.0.1:44031' processes=4 threads=4, memory=15.53 GiB>]
<xarray.Dataset>
[...]
CPU times: user 1min 45s, sys: 6.01 s, total: 1min 51s
Wall time: 4min 15s

@gmaze
Member Author

gmaze commented Oct 8, 2024

Glad this works !

Note @quai20 that performances with argovis are not as good as expected and could be improved, see argovis/argovis_api#345

@quai20
Member

quai20 commented Oct 8, 2024

Test on datarmor with dask-hpcconfig 'datarmor-local' cluster, and dataref gdac

%%time
with argopy.set_options(parallel=client):
    f = DataFetcher(src='gdac',gdac='/home/ref-argo/gdac').region(box)
    print("%i chunks to process" % len(f.uri))
    print(f)
    ds = f.load().data
    print(ds)
410 chunks to process
<datafetcher.gdac>
🌐 Name: Ifremer GDAC Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/0.00; y=20.00/60.01; z=0.0/500.0; t=2007-01-01/2009-01-01]
🔗 API: /home/ref-argo/gdac
📗 Index: ar_index_global_prof.txt.gz (3042123 records)
📸 Index searched: True (410 matches, 0.0135%)
🏊 User mode: standard
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [<Client: 'tcp://127.0.0.1:45831' processes=7 threads=14, memory=100.00 GiB>]
Oops! <class 'UnicodeDecodeError'> occurred.
Fail to cast SCIENTIFIC_CALIB_COEFFICIENT[('N_PROF', 'N_CALIB', 'N_PARAM')] from 'object' to <class 'str'>
<xarray.Dataset> Size: 73MB
Dimensions:          (N_POINTS: 611401)
[...]
CPU times: user 1min 25s, sys: 5.73 s, total: 1min 31s
Wall time: 4min 48s

@gmaze
Member Author

gmaze commented Oct 8, 2024

Test on datarmor with dask-hpcconfig 'datarmor-local' cluster, and dataref gdac

as fast as the erddap !

@gmaze
Member Author

gmaze commented Oct 9, 2024

Hi @quai20
Thanks for the review.
I'll double check the doc and CI tests and merge this
g


codecov bot commented Oct 11, 2024

❌ 1 Tests Failed:

Tests completed: 1 | Failed: 1 | Passed: 0 | Skipped: 0
View the top 1 failed tests by shortest run time
 test_fetchers_dask_cluster
Stack Traces | 0s run time
No failure message available

To view individual test run time comparison to the main branch, go to the Test Analytics Dashboard

@gmaze gmaze merged commit f405296 into master Oct 15, 2024
43 checks passed
@gmaze gmaze deleted the dask-ok branch October 15, 2024 09:33

Successfully merging this pull request may close these issues.

Asynchronous, parallel/concurrent data fetching from a single Argo data server ?