Skip to content

Commit

Permalink
Remove duplicate CSLCs with different PGE versions (#41)
Browse files Browse the repository at this point in the history
* Remove duplicate CSLCs with different PGE versions

* remove `_search` in favor of using `asf.search`

* add download filter test

* remove unused _search
  • Loading branch information
scottstanie authored Jun 18, 2024
1 parent f9d1ff3 commit 156c55b
Show file tree
Hide file tree
Showing 4 changed files with 2,673 additions and 15 deletions.
62 changes: 47 additions & 15 deletions src/opera_utils/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import netrc
import warnings
from enum import Enum
from functools import cache
from itertools import groupby
from pathlib import Path
from typing import Literal, Sequence, Union

from packaging.version import parse

try:
import asf_search as asf
from asf_search.ASFSearchResults import ASFSearchResults
except ImportError:
warnings.warn("Can't import `asf_search`. Unable to search/download data. ")

Expand Down Expand Up @@ -165,9 +168,14 @@ def _download_for_burst_ids(
logger.info(
f"Searching {len(burst_ids)} for {product} (Dates:{start} to {end})"
)
results = _search(
burst_ids=tuple(burst_ids), product=product, start=start, end=end
results = asf.search(
operaBurstID=list(burst_ids),
processingLevel=product.value,
start=start,
end=end,
)
logger.debug(f"Found {len(results)} total results before deduping pgeVersion")
results = filter_results_by_date_and_version(results)
logger.info(f"Found {len(results)} results")
session = _get_auth_session()
urls = _get_urls(results)
Expand All @@ -177,20 +185,44 @@ def _download_for_burst_ids(
return [Path(output_dir) / r.properties["fileName"] for r in results]


@cache
def _search(
burst_ids: Sequence[str],
product: L2Product,
start: DatetimeInput,
end: DatetimeInput,
) -> asf.ASFSearchResults:
return asf.search(
operaBurstID=list(burst_ids),
processingLevel=product.value,
start=start,
end=end,
def filter_results_by_date_and_version(results: ASFSearchResults) -> ASFSearchResults:
"""Filter ASF search results to retain only one result per unique 'startTime'.
Function selects the result with the latest 'pgeVersion' if multiple results
exist for the same 'startTime'.
Parameters
----------
results : asf_search.ASFSearchResults.ASFSearchResults
List of ASF search results to filter.
Returns
-------
asf_search.ASFSearchResults.ASFSearchResults
Filtered list of ASF search results with unique 'startTime',
each having the latest 'pgeVersion'.
"""
# First, sort the results primarily by 'startTime' and secondarily by 'pgeVersion' in descending order
sorted_results = sorted(
results,
key=lambda r: (r.properties["startTime"], parse(r.properties["pgeVersion"])),
reverse=True,
)

# It is important to sort by startTime before using groupby,
# as groupby only works correctly if the input data is sorted by the key
grouped_by_start_time = groupby(
sorted_results, key=lambda r: r.properties["startTime"]
)

# Extract the result with the highest pgeVersion for each group
filtered_results = [
max(group, key=lambda r: parse(r.properties["pgeVersion"]))
for _, group in grouped_by_start_time
]

return filtered_results


def _get_urls(
results: asf.ASFSearchResults,
Expand Down
Loading

0 comments on commit 156c55b

Please sign in to comment.