From c1449e447e1a09e5c42b928ede15c178a239f57b Mon Sep 17 00:00:00 2001
From: Ryan Rymarczyk
Date: Sun, 10 Nov 2024 06:59:36 -0500
Subject: [PATCH] re-auth on file link timeout

---
 src/cubic_loader/dmap/api_copy_job.py | 51 ++++++++++++++++++---------
 src/cubic_loader/dmap/dmap_api.py     | 12 ++++---
 2 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/src/cubic_loader/dmap/api_copy_job.py b/src/cubic_loader/dmap/api_copy_job.py
index c17634f..fb94fe1 100644
--- a/src/cubic_loader/dmap/api_copy_job.py
+++ b/src/cubic_loader/dmap/api_copy_job.py
@@ -74,7 +74,7 @@ def schema_compare(local_path: str, destination_table: Any) -> None:
     schema_compare_log.log_complete()
 
 
-def insert_update_last_updated(url: str, result: ApiResult, db_manager: DatabaseManager) -> None:
+def update_api_metadata_table(url: str, result: ApiResult, db_manager: DatabaseManager) -> None:
     """
     Set 'last_updated' value for 'url' in ApiMetadata table
     Attempts INSERT, if url already in table will conflict on url UNIQUE
@@ -126,9 +126,18 @@ def run_api_copy(url: str, destination_table: Any) -> None:
     :param url: CUBIC API Endpoint URL for dataset
     :param destination_table: SQLAlchemy target table object
     """
+    # File links received from CUBIC API Endpoints are valid for ~1 hour after the initial API call.
+    # In case of API failure, or an extremely long-running ETL process, we limit the maximum number of
+    # times that we will re-attempt to retrieve file links from a CUBIC API Endpoint.
+    max_api_auths = 5
+    api_retry_count = 0
+
     db_manager = DatabaseManager()
 
-    for result in get_api_results(url, db_manager):
+    api_results = get_api_results(url, db_manager)
+
+    while len(api_results) > 0:
+        result = api_results.pop(0)
         api_result_log = ProcessLogger(
             "load_api_result",
             api_url=url,
@@ -138,15 +147,25 @@
             last_updated=result["last_updated"],
             start_date=result["start_date"],
             end_date=result["end_date"],
+            api_result_count=len(api_results),
         )
         try:
-            # temporary local file path for API Result file download
-            temp_file = result["url"].split("?", 1)[0].split("/")[-1]
             with TemporaryDirectory(ignore_cleanup_errors=True) as tempdir:
-                temp_file_path = os.path.join(tempdir, temp_file)
-
-                download_from_url(result["url"], temp_file_path)
+                temp_file_name = result["url"].split("?", 1)[0].split("/")[-1]
+                temp_file_path = os.path.join(tempdir, temp_file_name)
+
+                if download_from_url(result["url"], temp_file_path) is False:
+                    if api_retry_count >= max_api_auths:
+                        raise TimeoutError(
+                            f"Maximum number of CUBIC API re-authentication attempts ({max_api_auths}) reached"
+                        )
+                    # api_results have been invalidated by an authentication timeout;
+                    # get a new set of results and continue processing
+                    api_results = get_api_results(url, db_manager)
+                    api_result_log.log_complete(auth_failure=True)
+                    api_retry_count += 1
+                    continue
 
                 # compare schema of API file download to destination_table
                 # throw if additional columns found in file download
@@ -161,27 +180,25 @@
 
                 db_manager.vaccuum_analyze(destination_table)
 
-                delete_dataset_id = sa.delete(destination_table).where(destination_table.dataset_id == result["dataset_id"])
-                delete_result = db_manager.execute(delete_dataset_id)
+                # delete records from table that have been overwritten by the API downloaded file
+                delete_result = db_manager.execute(
+                    sa.delete(destination_table).where(destination_table.dataset_id == result["dataset_id"])
+                )
 
-                # update dataset_id for all records just loaded into DB from
-                # API downloaded file
-                update_dataset_id = (
+                # update dataset_id for all records just loaded into DB from the API downloaded file
+                update_result = db_manager.execute(
                     sa.update(destination_table)
                     .where(destination_table.dataset_id.is_(None))
                     .values(dataset_id=result["dataset_id"])
                 )
-                update_result = db_manager.execute(update_dataset_id)
 
-                insert_update_last_updated(url, result, db_manager)
+                update_api_metadata_table(url, result, db_manager)
 
-                api_result_log.add_metadata(
+                api_result_log.log_complete(
                     db_records_deleted=delete_result.rowcount,
                     db_records_added=update_result.rowcount,
                 )
 
-                api_result_log.log_complete()
-
         except Exception as exception:
             api_result_log.log_failure(exception)
             raise exception
diff --git a/src/cubic_loader/dmap/dmap_api.py b/src/cubic_loader/dmap/dmap_api.py
index e2977f6..eab4077 100644
--- a/src/cubic_loader/dmap/dmap_api.py
+++ b/src/cubic_loader/dmap/dmap_api.py
@@ -41,15 +41,15 @@ def apikey_from_environment(url: str) -> Optional[str]:
     return default
 
 
-def download_from_url(url: str, local_path: str) -> Optional[str]:
+def download_from_url(url: str, local_path: str) -> bool:
     """
     Download file from url to local_path.
 
-    will throw for HTTP error
+    will throw for HTTP errors (except Authentication Error 403)
 
     :param url: CUBIC API URL string of file
     :param local_path: local file path to save downloaded file to
-    :return local file path of successfully downloaded file
+    :return: True if download succeeded, False on Authentication Error
     """
     download_log = ProcessLogger(
         "download_from_url",
@@ -73,6 +73,10 @@
             break
 
         except Exception as _:
+            if response.status_code == 403:
+                # handle Authentication Error
+                download_log.log_complete(status_code=response.status_code)
+                return False
             if retry_count < max_retries:
                 # wait and try again
                 time.sleep(15)
@@ -90,7 +94,7 @@
     download_log.add_metadata(file_size_mb=f"{file_size_mb:.4f}")
     download_log.log_complete()
 
-    return local_path
+    return True
 
 
 def get_api_results(url: str, db_manager: DatabaseManager) -> List[ApiResult]:
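
Reviewer note: the bounded re-authentication loop is easier to read outside of
the diff. The sketch below is a minimal, self-contained illustration of the
control flow this patch adds to run_api_copy(); the fetch_links and download
callables (and the drain_links name) are hypothetical stand-ins for
get_api_results() and download_from_url(), not the real dmap_api interface.

    from typing import Callable, List

    MAX_REAUTHS = 5  # mirrors max_api_auths in run_api_copy()

    def drain_links(
        fetch_links: Callable[[], List[str]],  # stand-in for get_api_results()
        download: Callable[[str], bool],  # stand-in for download_from_url()
    ) -> None:
        links = fetch_links()
        reauth_count = 0
        while links:
            link = links.pop(0)
            if download(link) is False:  # False signals HTTP 403: links expired
                if reauth_count >= MAX_REAUTHS:
                    raise TimeoutError(f"re-authentication limit ({MAX_REAUTHS}) reached")
                # every outstanding link was invalidated; fetch a fresh set and retry
                links = fetch_links()
                reauth_count += 1
                continue
            # ... download succeeded; load the file into the database ...

Raising only from inside the failure branch means a download that succeeds
after the final re-authentication is still processed instead of being aborted.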