Skip to content

Commit

Permalink
re-auth on file link timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarczy committed Nov 12, 2024
1 parent a02fdea commit c1449e4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 21 deletions.
51 changes: 34 additions & 17 deletions src/cubic_loader/dmap/api_copy_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def schema_compare(local_path: str, destination_table: Any) -> None:
schema_compare_log.log_complete()


def insert_update_last_updated(url: str, result: ApiResult, db_manager: DatabaseManager) -> None:
def update_api_metadata_table(url: str, result: ApiResult, db_manager: DatabaseManager) -> None:
"""
Set 'last_updated' value for 'url' in ApiMetadata table
Attempts INSERT, if url already in table will conflict on url UNIQUE
Expand Down Expand Up @@ -126,9 +126,18 @@ def run_api_copy(url: str, destination_table: Any) -> None:
:param url: CUBIC API Endpoint URL for dataset
:param destination_table: SQLAlchemy target table object
"""
# File links received from CUBIC API Endpoints are valid for ~1 hour after initial API call.
# In case of API failure, or extremely long running ETL process, we limit the maximum number of times
# that we will re-attempt to retrieve file links from a CUBIC API Endpoint
max_api_auths = 5
api_retry_count = 0

db_manager = DatabaseManager()

for result in get_api_results(url, db_manager):
api_results = get_api_results(url, db_manager)

while len(api_results) > 0:
result = api_results.pop(0)
api_result_log = ProcessLogger(
"load_api_result",
api_url=url,
Expand All @@ -138,15 +147,25 @@ def run_api_copy(url: str, destination_table: Any) -> None:
last_updated=result["last_updated"],
start_date=result["start_date"],
end_date=result["end_date"],
api_result_count=len(api_results),
)

try:
# temporary local file path for API Result file download
temp_file = result["url"].split("?", 1)[0].split("/")[-1]
with TemporaryDirectory(ignore_cleanup_errors=True) as tempdir:
temp_file_path = os.path.join(tempdir, temp_file)

download_from_url(result["url"], temp_file_path)
temp_file_name = result["url"].split("?", 1)[0].split("/")[-1]
temp_file_path = os.path.join(tempdir, temp_file_name)

if download_from_url(result["url"], temp_file_path) is False and api_retry_count < max_api_auths:
# api_results have been invalidated from Authentication timeout
# get new set of results and continue processing
api_results = get_api_results(url, db_manager)
api_result_log.log_complete(auth_failure=True)
api_retry_count += 1
continue
if api_retry_count == max_api_auths:
raise TimeoutError(
f"Maximum number of CUBIC API re-authentication attempts ({max_api_auths}) reached"
)

# compare schema of API file download to destination_table
# throw if additional columns found in file download
Expand All @@ -161,27 +180,25 @@ def run_api_copy(url: str, destination_table: Any) -> None:

db_manager.vaccuum_analyze(destination_table)

delete_dataset_id = sa.delete(destination_table).where(destination_table.dataset_id == result["dataset_id"])
delete_result = db_manager.execute(delete_dataset_id)
# delete records from table that have been over-written by API downloaded file
delete_result = db_manager.execute(
sa.delete(destination_table).where(destination_table.dataset_id == result["dataset_id"])
)

# update dataset_id for all records just loaded into DB from
# API downloaded file
update_dataset_id = (
# update dataset_id for all records just loaded into DB from API downloaded file
update_result = db_manager.execute(
sa.update(destination_table)
.where(destination_table.dataset_id.is_(None))
.values(dataset_id=result["dataset_id"])
)
update_result = db_manager.execute(update_dataset_id)

insert_update_last_updated(url, result, db_manager)
update_api_metadata_table(url, result, db_manager)

api_result_log.add_metadata(
api_result_log.log_complete(
db_records_deleted=delete_result.rowcount,
db_records_added=update_result.rowcount,
)

api_result_log.log_complete()

except Exception as exception:
api_result_log.log_failure(exception)
raise exception
Expand Down
12 changes: 8 additions & 4 deletions src/cubic_loader/dmap/dmap_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ def apikey_from_environment(url: str) -> Optional[str]:
return default


def download_from_url(url: str, local_path: str) -> Optional[str]:
def download_from_url(url: str, local_path: str) -> bool:
"""
Download file from url to local_path.
will throw for HTTP error
will throw for HTTP error (except 403 Authentication Error, which returns False)
:param url: CUBIC API URL string of file
:param local_path: local file path to save downloaded file to
:return local file path of successfully downloaded file
:return: True if download success, False if Authentication Error
"""
download_log = ProcessLogger(
"download_from_url",
Expand All @@ -73,6 +73,10 @@ def download_from_url(url: str, local_path: str) -> Optional[str]:
break

except Exception as _:
if response.status_code == 403:
# handle Authentication Error
download_log.log_complete(status_code=response.status_code)
return False
if retry_count < max_retries:
# wait and try again
time.sleep(15)
Expand All @@ -90,7 +94,7 @@ def download_from_url(url: str, local_path: str) -> Optional[str]:
download_log.add_metadata(file_size_mb=f"{file_size_mb:.4f}")
download_log.log_complete()

return local_path
return True


def get_api_results(url: str, db_manager: DatabaseManager) -> List[ApiResult]:
Expand Down

0 comments on commit c1449e4

Please sign in to comment.