Use CMR metadata to get S3FS sessions and authenticate queries to CMR using top level APIs #296

Merged
merged 9 commits on Sep 18, 2023
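At a high level, the change lets the top-level API reuse the CMR metadata carried by search results: authenticated searches pass the Earthdata Login session to CMR, and direct S3 access is driven by the granules' own /s3credentials endpoint when the metadata provides one. A minimal sketch of the intended workflow (the collection short name and count are illustrative, not taken from this PR):

```python
import earthaccess

earthaccess.login()  # search_datasets/search_data now pass this session to CMR

# illustrative query parameters
granules = earthaccess.search_data(short_name="SWOT_L2_LR_SSH_EXPERT", count=2)

# earthaccess prefers the granules' own /s3credentials endpoint from the
# CMR metadata (useful for missions like SWOT), falling back to the DAAC's
fs = earthaccess.get_s3fs_session(results=granules)
files = earthaccess.open(granules)
```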
41 changes: 34 additions & 7 deletions earthaccess/api.py
@@ -54,7 +54,10 @@ def search_datasets(
"Warning: a valid set of parameters is needed to search for datasets on CMR"
)
return []
query = DataCollections().parameters(**kwargs)
if earthaccess.__auth__.authenticated:
query = DataCollections(auth=earthaccess.__auth__).parameters(**kwargs)
else:
query = DataCollections().parameters(**kwargs)
datasets_found = query.hits()
print(f"Datasets found: {datasets_found}")
if count > 0:
@@ -100,7 +103,10 @@ def search_data(
)
```
"""
query = DataGranules().parameters(**kwargs)
if earthaccess.__auth__.authenticated:
query = DataGranules(earthaccess.__auth__).parameters(**kwargs)
else:
query = DataGranules().parameters(**kwargs)
granules_found = query.hits()
print(f"Granules found: {granules_found}")
if count > 0:
@@ -191,20 +197,29 @@ def open(


def get_s3_credentials(
daac: Optional[str] = None, provider: Optional[str] = None
daac: Optional[str] = None,
provider: Optional[str] = None,
results: Optional[List[earthaccess.results.DataGranule]] = None,
) -> Dict[str, Any]:
"""Returns temporary (1 hour) credentials for direct access to NASA S3 buckets
"""Returns temporary (1 hour) credentials for direct access to NASA S3 buckets, we can
use the daac name, the provider or a list of results from earthaccess.search_data()
if we use results earthaccess will use the metadata on the response to get the credentials,
this is useful for missions that do not use the same endpoint as their DAACs e.g. SWOT

Parameters:
daac: a DAAC short_name like NSIDC or PODAAC etc
provider: if we know the provider for the DAAC e.g. POCLOUD, LPCLOUD etc.
daac (String): a DAAC short_name like NSIDC or PODAAC, etc.
provider (String): if we know the provider for the DAAC, e.g. POCLOUD, LPCLOUD, etc.
results (list[earthaccess.results.DataGranule]): List of results from search_data()
Returns:
a dictionary with S3 credentials for the DAAC or provider
"""
if daac is not None:
daac = daac.upper()
if provider is not None:
provider = provider.upper()
if results is not None:
endpoint = results[0].get_s3_credentials_endpoint()
return earthaccess.__auth__.get_s3_credentials(endpoint=endpoint)
return earthaccess.__auth__.get_s3_credentials(daac=daac, provider=provider)
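A short sketch of how the two paths might be used side by side (the query parameters are illustrative, not taken from this PR):

```python
import earthaccess

earthaccess.login()

# existing behavior: resolve the credentials endpoint from a DAAC short name
nsidc_creds = earthaccess.get_s3_credentials(daac="NSIDC")

# new behavior: take the /s3credentials endpoint from the first granule's
# CMR metadata, e.g. for SWOT, whose endpoint differs from its DAAC's
granules = earthaccess.search_data(short_name="SWOT_L2_LR_SSH_EXPERT", count=1)  # illustrative
swot_creds = earthaccess.get_s3_credentials(results=granules)
```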


@@ -284,13 +299,25 @@ class requests.Session: an authenticated requests Session instance.


def get_s3fs_session(
daac: Optional[str] = None, provider: Optional[str] = None
daac: Optional[str] = None,
provider: Optional[str] = None,
results: Optional[List[earthaccess.results.DataGranule]] = None,
) -> s3fs.S3FileSystem:
"""Returns a fsspec s3fs file session for direct access when we are in us-west-2

Parameters:
daac (String): Any DAAC short name e.g. NSIDC, GES_DISC
provider (String): each DAAC can have a cloud provider; if the DAAC is specified, there is no need to pass a provider
results (list[earthaccess.results.DataGranule]): a list of results from search_data(); earthaccess will use the metadata from CMR to obtain the S3 endpoint

Returns:
class s3fs.S3FileSystem: an authenticated s3fs session valid for 1 hour
"""
if results is not None:
endpoint = results[0].get_s3_credentials_endpoint()
if endpoint is not None:
session = earthaccess.__store__.get_s3fs_session(endpoint=endpoint)
return session
session = earthaccess.__store__.get_s3fs_session(daac=daac, provider=provider)
return session
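As a usage sketch (assuming the function is exported at the package top level; the query and the S3 key below are placeholders): when results are passed and a granule carries an /s3credentials link, the session comes from that endpoint, otherwise the call falls through to the daac/provider path.

```python
import earthaccess

earthaccess.login()
granules = earthaccess.search_data(short_name="SWOT_L2_LR_SSH_EXPERT", count=1)  # illustrative

fs = earthaccess.get_s3fs_session(results=granules)

# read a granule's S3 object directly when running in us-west-2 (placeholder key)
with fs.open("s3://example-protected-bucket/path/to/granule.nc", "rb") as f:
    magic = f.read(8)
```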

13 changes: 11 additions & 2 deletions earthaccess/auth.py
@@ -138,22 +138,31 @@ def refresh_tokens(self) -> bool:
return False

def get_s3_credentials(
self, daac: Optional[str] = "", provider: Optional[str] = ""
self,
daac: Optional[str] = None,
provider: Optional[str] = None,
endpoint: Optional[str] = None,
) -> Dict[str, str]:
"""Gets AWS S3 credentials for a given NASA cloud provider, the
easier way is to use the DAAC short name. provider is optional if we know it.

Parameters:
provider: a valid cloud provider; each DAAC has a provider code for its cloud distributions
daac: the name of a NASA DAAC, e.g. NSIDC or PODAAC
endpoint: URL of an S3Credentials endpoint, to fetch the credentials from directly

Returns:
A Python dictionary with the temporary AWS S3 credentials

"""
if self.authenticated:
session = SessionWithHeaderRedirection(self.username, self.password)
auth_url = self._get_cloud_auth_url(daac_shortname=daac, provider=provider)
if endpoint is None:
auth_url = self._get_cloud_auth_url(
daac_shortname=daac, provider=provider
)
else:
auth_url = endpoint
if auth_url.startswith("https://"):
cumulus_resp = session.get(auth_url, timeout=15, allow_redirects=True)
auth_resp = session.get(
6 changes: 6 additions & 0 deletions earthaccess/results.py
@@ -241,6 +241,12 @@ def _repr_html_(self) -> str:
granule_html_repr = _repr_granule_html(self)
return granule_html_repr

def get_s3_credentials_endpoint(self) -> Union[str, None]:
for link in self["umm"]["RelatedUrls"]:
if "/s3credentials" in link["URL"]:
return link["URL"]
return None
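The new method scans the granule's UMM RelatedUrls for an /s3credentials link. A standalone sketch of that lookup, using an endpoint URL that also appears in the unit tests below and an otherwise made-up metadata snippet:

```python
# illustrative UMM snippet; get_s3_credentials_endpoint() returns the first
# RelatedUrl whose URL contains "/s3credentials", or None when none is present
granule_umm = {
    "RelatedUrls": [
        {"URL": "https://archive.swot.podaac.earthdata.nasa.gov/example/granule.nc"},
        {"URL": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials"},
    ]
}

endpoint = next(
    (link["URL"] for link in granule_umm["RelatedUrls"] if "/s3credentials" in link["URL"]),
    None,
)
print(endpoint)  # https://archive.swot.podaac.earthdata.nasa.gov/s3credentials
```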

def size(self) -> float:
"""
Returns:
39 changes: 32 additions & 7 deletions earthaccess/store.py
@@ -6,7 +6,7 @@
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union
from uuid import uuid4

import fsspec
@@ -128,6 +128,12 @@ def _is_cloud_collection(self, concept_id: List[str]) -> bool:
return True
return False

def _own_s3_credentials(self, links: List[Dict[str, Any]]) -> Union[str, None]:
for link in links:
if "/s3credentials" in link["URL"]:
return link["URL"]
return None

def _am_i_in_aws(self) -> bool:
session = self.auth.get_session()
try:
@@ -181,22 +187,27 @@ def get_s3fs_session(
daac: Optional[str] = None,
concept_id: Optional[str] = None,
provider: Optional[str] = None,
endpoint: Optional[str] = None,
) -> s3fs.S3FileSystem:
"""
Returns a s3fs instance for a given cloud provider / DAAC

Parameters:
daac: any of the DAACs, e.g. NSIDC, PODAAC
provider: a data provider, if we know it, e.g. PODAAC -> POCLOUD
endpoint: pass the URL of the credentials endpoint directly
Returns:
a s3fs file instance
"""
if self.auth is not None:
if not any([concept_id, daac, provider]):
if not any([concept_id, daac, provider, endpoint]):
raise ValueError(
"At least one of the concept_id, daac, or provider "
"At least one of the concept_id, daac, provider or endpoint"
"parameters must be specified. "
)
if concept_id is not None:
if endpoint is not None:
s3_credentials = self.auth.get_s3_credentials(endpoint=endpoint)
elif concept_id is not None:
provider = self._derive_concept_provider(concept_id)
s3_credentials = self.auth.get_s3_credentials(provider=provider)
elif daac is not None:
@@ -309,7 +320,14 @@ def _open_granules(
if granules[0].cloud_hosted:
access_method = "direct"
provider = granules[0]["meta"]["provider-id"]
s3_fs = self.get_s3fs_session(provider=provider)
# if the data has its own S3 credentials endpoint we'll use it
endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"])
if endpoint is not None:
print(f"using endpoint: {endpoint}")
s3_fs = self.get_s3fs_session(endpoint=endpoint)
else:
print(f"using provider: {provider}")
s3_fs = self.get_s3fs_session(provider=provider)
else:
access_method = "on_prem"
s3_fs = None
@@ -512,6 +530,7 @@ def _get_granules(
data_links: List = []
downloaded_files: List = []
provider = granules[0]["meta"]["provider-id"]
endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"])
cloud_hosted = granules[0].cloud_hosted
access = "direc" if (cloud_hosted and self.running_in_aws) else "external"
data_links = list(
@@ -526,8 +545,14 @@ def _get_granules(
f" Getting {len(granules)} granules, approx download size: {total_size} GB"
)
if access == "direct":
print(f"Accessing cloud dataset using provider: {provider}")
s3_fs = self.get_s3fs_session(provider)
if endpoint is not None:
print(
f"Accessing cloud dataset using dataset endpoint credentials: {endpoint}"
)
s3_fs = self.get_s3fs_session(endpoint=endpoint)
else:
print(f"Accessing cloud dataset using provider: {provider}")
s3_fs = self.get_s3fs_session(provider=provider)
# TODO: make this async
for file in data_links:
s3_fs.get(file, local_path)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "earthaccess"
version = "0.5.3"
version = "0.5.4"
homepage = "https://github.com/nsidc/earthaccess"
description = "Client library for NASA Earthdata APIs"
authors = ["earthaccess contributors"]
18 changes: 18 additions & 0 deletions tests/unit/test_store.py
@@ -55,6 +55,20 @@ def test_store_can_create_https_fsspec_session(self):
def test_store_can_create_s3_fsspec_session(self):
from earthaccess.daac import DAACS

custom_endpoints = [
"https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"https://api.giovanni.earthdata.nasa.gov/s3credentials",
"https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials",
]

for endpoint in custom_endpoints:
responses.add(
responses.GET,
endpoint,
json={},
status=200,
)

for daac in DAACS:
if "s3-credentials" in daac:
responses.add(
@@ -80,6 +94,10 @@ def test_store_can_create_s3_fsspec_session(self):
s3_fs = store.get_s3fs_session(daac=daac)
self.assertEqual(type(s3_fs), type(fsspec.filesystem("s3")))

for endpoint in custom_endpoints:
s3_fs = store.get_s3fs_session(endpoint=endpoint)
self.assertEqual(type(s3_fs), type(fsspec.filesystem("s3")))

for provider in [
"NSIDC_CPRD",
"POCLOUD",