Skip to content

Commit

Permalink
Merge pull request #296 from nsidc/wip-s3credentials
Browse files Browse the repository at this point in the history
Use CMR metadata to get S3FS sessions and authenticate queries to CMR using top level APIs
  • Loading branch information
betolink authored Sep 18, 2023
2 parents 4717c82 + c4321c0 commit 01a7da0
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 17 deletions.
41 changes: 34 additions & 7 deletions earthaccess/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ def search_datasets(
"Warning: a valid set of parameters is needed to search for datasets on CMR"
)
return []
query = DataCollections().parameters(**kwargs)
if earthaccess.__auth__.authenticated:
query = DataCollections(auth=earthaccess.__auth__).parameters(**kwargs)
else:
query = DataCollections().parameters(**kwargs)
datasets_found = query.hits()
print(f"Datasets found: {datasets_found}")
if count > 0:
Expand Down Expand Up @@ -100,7 +103,10 @@ def search_data(
)
```
"""
query = DataGranules().parameters(**kwargs)
if earthaccess.__auth__.authenticated:
query = DataGranules(earthaccess.__auth__).parameters(**kwargs)
else:
query = DataGranules().parameters(**kwargs)
granules_found = query.hits()
print(f"Granules found: {granules_found}")
if count > 0:
Expand Down Expand Up @@ -191,20 +197,29 @@ def open(


def get_s3_credentials(
    daac: Optional[str] = None,
    provider: Optional[str] = None,
    results: Optional[List[earthaccess.results.DataGranule]] = None,
) -> Dict[str, Any]:
    """Returns temporary (1 hour) credentials for direct access to NASA S3 buckets.

    We can use the DAAC name, the provider, or a list of results from
    earthaccess.search_data(). If we use results, earthaccess will use the
    metadata on the response to get the credentials, which is useful for
    missions that do not use the same endpoint as their DAACs, e.g. SWOT.

    Parameters:
        daac (String): a DAAC short_name like NSIDC or PODAAC, etc.
        provider (String): if we know the provider for the DAAC, e.g. POCLOUD, LPCLOUD etc.
        results (list[earthaccess.results.DataGranule]): list of results from search_data()

    Returns:
        a dictionary with S3 credentials for the DAAC or provider
    """
    if daac is not None:
        daac = daac.upper()
    if provider is not None:
        provider = provider.upper()
    if results is not None:
        endpoint = results[0].get_s3_credentials_endpoint()
        # Only use the granule's own endpoint when the metadata actually links
        # one; otherwise fall through to the daac/provider lookup below
        # (mirrors the behavior of get_s3fs_session).
        if endpoint is not None:
            return earthaccess.__auth__.get_s3_credentials(endpoint=endpoint)
    return earthaccess.__auth__.get_s3_credentials(daac=daac, provider=provider)


Expand Down Expand Up @@ -284,13 +299,25 @@ class requests.Session: an authenticated requests Session instance.


def get_s3fs_session(
    daac: Optional[str] = None,
    provider: Optional[str] = None,
    results: Optional[List[earthaccess.results.DataGranule]] = None,
) -> s3fs.S3FileSystem:
    """Returns a fsspec s3fs file session for direct access when we are in us-west-2.

    Parameters:
        daac (String): Any DAAC short name e.g. NSIDC, GES_DISC
        provider (String): Each DAAC can have a cloud provider; if the DAAC is
            specified, there is no need to use provider.
        results (list[earthaccess.results.DataGranule]): A list of results from
            search_data(); earthaccess will use the metadata from CMR to obtain
            the S3 endpoint.

    Returns:
        class s3fs.S3FileSystem: an authenticated s3fs session valid for 1 hour
    """
    if results is not None:
        # Prefer the collection's own credentials endpoint (e.g. SWOT) when the
        # first granule's metadata provides one.
        endpoint = results[0].get_s3_credentials_endpoint()
        if endpoint is not None:
            session = earthaccess.__store__.get_s3fs_session(endpoint=endpoint)
            return session
    session = earthaccess.__store__.get_s3fs_session(daac=daac, provider=provider)
    return session

Expand Down
13 changes: 11 additions & 2 deletions earthaccess/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,31 @@ def refresh_tokens(self) -> bool:
return False

def get_s3_credentials(
self, daac: Optional[str] = "", provider: Optional[str] = ""
self,
daac: Optional[str] = None,
provider: Optional[str] = None,
endpoint: Optional[str] = None,
) -> Dict[str, str]:
"""Gets AWS S3 credentials for a given NASA cloud provider, the
easier way is to use the DAAC short name. provider is optional if we know it.
Parameters:
provider: A valid cloud provider, each DAAC has a provider code for their cloud distributions
daac: the name of a NASA DAAC, i.e. NSIDC or PODAAC
endpoint: getting the credentials directly from the S3Credentials URL
Returns:
A Python dictionary with the temporary AWS S3 credentials
"""
if self.authenticated:
session = SessionWithHeaderRedirection(self.username, self.password)
auth_url = self._get_cloud_auth_url(daac_shortname=daac, provider=provider)
if endpoint is None:
auth_url = self._get_cloud_auth_url(
daac_shortname=daac, provider=provider
)
else:
auth_url = endpoint
if auth_url.startswith("https://"):
cumulus_resp = session.get(auth_url, timeout=15, allow_redirects=True)
auth_resp = session.get(
Expand Down
6 changes: 6 additions & 0 deletions earthaccess/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ def _repr_html_(self) -> str:
granule_html_repr = _repr_granule_html(self)
return granule_html_repr

def get_s3_credentials_endpoint(self) -> Union[str, None]:
for link in self["umm"]["RelatedUrls"]:
if "/s3credentials" in link["URL"]:
return link["URL"]
return None

def size(self) -> float:
"""
Returns:
Expand Down
39 changes: 32 additions & 7 deletions earthaccess/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union
from uuid import uuid4

import fsspec
Expand Down Expand Up @@ -128,6 +128,12 @@ def _is_cloud_collection(self, concept_id: List[str]) -> bool:
return True
return False

def _own_s3_credentials(self, links: List[Dict[str, Any]]) -> Union[str, None]:
for link in links:
if "/s3credentials" in link["URL"]:
return link["URL"]
return None

def _am_i_in_aws(self) -> bool:
session = self.auth.get_session()
try:
Expand Down Expand Up @@ -181,22 +187,27 @@ def get_s3fs_session(
daac: Optional[str] = None,
concept_id: Optional[str] = None,
provider: Optional[str] = None,
endpoint: Optional[str] = None,
) -> s3fs.S3FileSystem:
"""
Returns a s3fs instance for a given cloud provider / DAAC
Parameters:
daac: any of the DAACs e.g. NSIDC, PODAAC
provider: a data provider if we know them, e.g PODAAC -> POCLOUD
endpoint: pass the URL for the credentials directly
Returns:
a s3fs file instance
"""
if self.auth is not None:
if not any([concept_id, daac, provider]):
if not any([concept_id, daac, provider, endpoint]):
raise ValueError(
"At least one of the concept_id, daac, or provider "
"At least one of the concept_id, daac, provider or endpoint"
"parameters must be specified. "
)
if concept_id is not None:
if endpoint is not None:
s3_credentials = self.auth.get_s3_credentials(endpoint=endpoint)
elif concept_id is not None:
provider = self._derive_concept_provider(concept_id)
s3_credentials = self.auth.get_s3_credentials(provider=provider)
elif daac is not None:
Expand Down Expand Up @@ -309,7 +320,14 @@ def _open_granules(
if granules[0].cloud_hosted:
access_method = "direct"
provider = granules[0]["meta"]["provider-id"]
s3_fs = self.get_s3fs_session(provider=provider)
# if the data has its own S3 credentials endpoint we'll use it
endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"])
if endpoint is not None:
print(f"using endpoint: {endpoint}")
s3_fs = self.get_s3fs_session(endpoint=endpoint)
else:
print(f"using provider: {provider}")
s3_fs = self.get_s3fs_session(provider=provider)
else:
access_method = "on_prem"
s3_fs = None
Expand Down Expand Up @@ -512,6 +530,7 @@ def _get_granules(
data_links: List = []
downloaded_files: List = []
provider = granules[0]["meta"]["provider-id"]
endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"])
cloud_hosted = granules[0].cloud_hosted
access = "direc" if (cloud_hosted and self.running_in_aws) else "external"
data_links = list(
Expand All @@ -526,8 +545,14 @@ def _get_granules(
f" Getting {len(granules)} granules, approx download size: {total_size} GB"
)
if access == "direct":
print(f"Accessing cloud dataset using provider: {provider}")
s3_fs = self.get_s3fs_session(provider)
if endpoint is not None:
print(
f"Accessing cloud dataset using dataset endpoint credentials: {endpoint}"
)
s3_fs = self.get_s3fs_session(endpoint=endpoint)
else:
print(f"Accessing cloud dataset using provider: {provider}")
s3_fs = self.get_s3fs_session(provider=provider)
# TODO: make this async
for file in data_links:
s3_fs.get(file, local_path)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "earthaccess"
version = "0.5.3"
version = "0.5.4"
homepage = "https://github.com/nsidc/earthaccess"
description = "Client library for NASA Earthdata APIs"
authors = ["earthaccess contributors"]
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ def test_store_can_create_https_fsspec_session(self):
def test_store_can_create_s3_fsspec_session(self):
from earthaccess.daac import DAACS

custom_endpoints = [
"https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
"https://api.giovanni.earthdata.nasa.gov/s3credentials",
"https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials",
]

for endpoint in custom_endpoints:
responses.add(
responses.GET,
endpoint,
json={},
status=200,
)

for daac in DAACS:
if "s3-credentials" in daac:
responses.add(
Expand All @@ -80,6 +94,10 @@ def test_store_can_create_s3_fsspec_session(self):
s3_fs = store.get_s3fs_session(daac=daac)
self.assertEqual(type(s3_fs), type(fsspec.filesystem("s3")))

for endpoint in custom_endpoints:
s3_fs = store.get_s3fs_session(endpoint=endpoint)
self.assertEqual(type(s3_fs), type(fsspec.filesystem("s3")))

for provider in [
"NSIDC_CPRD",
"POCLOUD",
Expand Down

0 comments on commit 01a7da0

Please sign in to comment.