From d54b6d25e7857aad68d5132e79d01127c3ee2f6c Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Tue, 3 Dec 2024 16:05:58 +0000 Subject: [PATCH] feat(french catalogue methods and tests): added in new methods for french catalogue exploration and added tests for this [2024-12-03] --- HerdingCats/endpoints/api_endpoints.py | 2 +- HerdingCats/errors/cats_errors.py | 31 ++- HerdingCats/explorer/cat_explore.py | 238 +++++++++--------- HerdingCats/session/cat_session.py | 49 ++-- .../test_frgouv_endpoint_health.py | 3 +- tests/french_gouv/test_frgouv_get_datasets.py | 44 ++++ 6 files changed, 226 insertions(+), 141 deletions(-) create mode 100644 tests/french_gouv/test_frgouv_get_datasets.py diff --git a/HerdingCats/endpoints/api_endpoints.py b/HerdingCats/endpoints/api_endpoints.py index 0071ac9..44adfd3 100644 --- a/HerdingCats/endpoints/api_endpoints.py +++ b/HerdingCats/endpoints/api_endpoints.py @@ -47,7 +47,7 @@ class OpenDataSoftApiPaths: SHOW_DATASET_EXPORTS = BASE_PATH.format("datasets/{}/exports") # Alternative base paths... - # Sometimes these are needed - not sure why need to dig into this + # Sometimes these are needed - not sure why need to dig into this! BASE_PATH_2 = "/api/explore/v2.0/catalog/{}" SHOW_DATASETS_2 = BASE_PATH_2.format("datasets") SHOW_DATASET_INFO_2 = BASE_PATH_2.format("datasets/{}") diff --git a/HerdingCats/errors/cats_errors.py b/HerdingCats/errors/cats_errors.py index 709fe8b..d4e83c6 100644 --- a/HerdingCats/errors/cats_errors.py +++ b/HerdingCats/errors/cats_errors.py @@ -8,9 +8,8 @@ class CatExplorerError(Exception): class OpenDataSoftExplorerError(Exception): """ - Custom exception class for OpenDataSoft Explorer errors with colored output using ANSI codes. + Custom exception class for OpenDataSoft Explorer errors. 
""" - # ANSI escape codes for colors RED = '\033[91m' YELLOW = '\033[93m' RESET = '\033[0m' @@ -19,7 +18,6 @@ def __init__(self, message: str, original_error: Optional[Exception] = None) -> self.message = message self.original_error = original_error - # Build the error message with color error_msg = ( f"{self.RED}OpenDataSoftExplorer Error 🐈‍⬛: {message}{self.RESET}" ) @@ -34,3 +32,30 @@ def __init__(self, message: str, original_error: Optional[Exception] = None) -> def __str__(self) -> str: return self.args[0] + +class WrongCatalogueError(CatExplorerError): + """ + Custom exception class for when the wrong catalogue type is used with an explorer. + """ + + RED = '\033[91m' + YELLOW = '\033[93m' + RESET = '\033[0m' + + def __init__(self, message: str, expected_catalogue: str, received_catalogue: Optional[str] = None) -> None: + self.message = message + self.expected_catalogue = expected_catalogue + self.received_catalogue = received_catalogue + + error_msg = ( + f"{self.RED}[Wrong Catalogue Error]: {message}{self.RESET}\n" + f"{self.YELLOW}Expected catalogue: {expected_catalogue}" + ) + if received_catalogue: + error_msg += f"\nReceived catalogue: {received_catalogue}" + error_msg += f"{self.RESET}" + + super().__init__(error_msg) + + def __str__(self) -> str: + return self.args[0] diff --git a/HerdingCats/explorer/cat_explore.py b/HerdingCats/explorer/cat_explore.py index d4abad9..929df4e 100644 --- a/HerdingCats/explorer/cat_explore.py +++ b/HerdingCats/explorer/cat_explore.py @@ -9,10 +9,13 @@ from loguru import logger from urllib.parse import urlencode -from ..endpoints.api_endpoints import CkanApiPaths, OpenDataSoftApiPaths, FrenchGouvApiPaths -from ..errors.cats_errors import CatExplorerError -from ..session.cat_session import CatSession - +from ..endpoints.api_endpoints import ( + CkanApiPaths, + OpenDataSoftApiPaths, + FrenchGouvApiPaths +) +from ..errors.cats_errors import CatExplorerError, WrongCatalogueError +from ..session.cat_session import CatSession, 
CatalogueType # FIND THE DATA YOU WANT / NEED / ISOLATE PACKAGES AND RESOURCES # For Ckan Catalogues Only @@ -23,7 +26,8 @@ def __init__(self, cat_session: CatSession): Allows user to start exploring data catalogue programatically - Make sure you pass a valid CkanCatSession in + Make sure you pass a valid CkanCatSession in - checks if the right type. + Args: CkanCatSession @@ -38,6 +42,21 @@ def main(): if __name__ == "__main__": main() """ + + if not hasattr(cat_session, 'catalogue_type'): + raise WrongCatalogueError( + "CatSession missing catalogue_type attribute", + expected_catalogue=str(CatalogueType.CKAN), + received_catalogue="Unknown" + ) + + if cat_session.catalogue_type != CatalogueType.CKAN: + raise WrongCatalogueError( + "Invalid catalogue type. CkanCatExplorer requires a Ckan catalogue session.", + expected_catalogue=str(CatalogueType.CKAN), + received_catalogue=str(cat_session.catalogue_type) + ) + self.cat_session = cat_session # ---------------------------- @@ -1008,7 +1027,8 @@ def __init__(self, cat_session: CatSession): Allows user to start exploring data catalogue programatically - Make sure you pass a valid CkanCatSession in + Make sure you pass a valid CkanCatSession in - checks if the right type. + Args: CkanCatSession @@ -1018,6 +1038,22 @@ def __init__(self, cat_session: CatSession): with CatSession("ukpowernetworks.opendatasoft.com") as session: explore = CatExplorer(session) """ + + + if not hasattr(cat_session, 'catalogue_type'): + raise WrongCatalogueError( + "CatSession missing catalogue_type attribute", + expected_catalogue=str(CatalogueType.OPENDATA_SOFT), + received_catalogue="Unknown" + ) + + if cat_session.catalogue_type != CatalogueType.OPENDATA_SOFT: + raise WrongCatalogueError( + "Invalid catalogue type. 
OpenDataSoft requires an OpenDataSoft catalogue session.", + expected_catalogue=str(CatalogueType.OPENDATA_SOFT), + received_catalogue=str(cat_session.catalogue_type) + ) + self.cat_session = cat_session # ---------------------------- @@ -1193,7 +1229,7 @@ def __init__(self, cat_session: CatSession): Allows user to start exploring data catalogue programatically - Make sure you pass a valid CkanCatSession in + Make sure you pass a valid CkanCatSession in - checks if the right type. Args: CkanCatSession @@ -1208,6 +1244,21 @@ def main(): if __name__ == "__main__": main() """ + + if not hasattr(cat_session, 'catalogue_type'): + raise WrongCatalogueError( + "CatSession missing catalogue_type attribute", + expected_catalogue=str(CatalogueType.GOUV_FR), + received_catalogue="Unknown" + ) + + if cat_session.catalogue_type != CatalogueType.GOUV_FR: + raise WrongCatalogueError( + "Invalid catalogue type. FrenchGouvCatExplorer requires a French Government catalogue session.", + expected_catalogue=str(CatalogueType.GOUV_FR), + received_catalogue=str(cat_session.catalogue_type) + ) + self.cat_session = cat_session # ---------------------------- @@ -1239,121 +1290,34 @@ def check_health_check(self) -> None: # ---------------------------- def get_all_datasets(self) -> dict: """ - Paginates through all datasets in the French Government data catalogue - and creates a dictionary of acronyms and IDs using streaming. 
- Returns: - dict: Dictionary with dataset IDs as keys and acronyms as values - """ - datasets = {} - page = 1 - page_size = 100 - base_url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASETS - - while True: - try: - params = { - 'page': page, - 'page_size': page_size - } - # Stream the response - response = self.cat_session.session.get(base_url, params=params, stream=True) - if response.status_code != 200: - logger.error(f"Failed to fetch page {page} with status code {response.status_code}") - break - - # Process the streaming response - next_page_exists = False - for line in response.iter_lines(): - if not line: - continue - - decoded_line = line.decode('utf-8') - - # Check for dataset entries - if '"id":' in decoded_line and '"acronym":' in decoded_line: - # Extract just what we need using string operations - id_start = decoded_line.find('"id": "') + 7 - id_end = decoded_line.find('"', id_start) - dataset_id = decoded_line[id_start:id_end] - - acronym_start = decoded_line.find('"acronym": "') + 11 - if acronym_start > 10: # Found acronym - acronym_end = decoded_line.find('"', acronym_start) - acronym = decoded_line[acronym_start:acronym_end] - else: - acronym = '' - - datasets[dataset_id] = acronym - - # Check for next_page - elif '"next_page":' in decoded_line: - next_page_exists = 'null' not in decoded_line - - if not next_page_exists: - break - - page += 1 - if page % 10 == 0: - logger.info(f"Processed {page} pages ({len(datasets)} datasets)") - - except Exception as e: - logger.error(f"Error processing page {page}: {str(e)}") - break - - logger.success(f"Finished processing {len(datasets)} datasets") - return datasets - - def get_datasets_by_id_dict(self, id: str) -> dict: - """ - Paginates through all datasets in the French Government data catalogue - and creates a dictionary of acronyms and IDs. + Uses DuckDB to read a Parquet file of whole French Gouv data catalogue instead and create a dictionary of slugs and IDs. 
Returns: - dict: Dictionary with dataset IDs as keys and acronyms as values + dict: Dictionary with slugs as keys and dataset IDs as values """ - datasets = {} - page = 1 - base_url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASETS - - while True: - try: - # Make request with pagination - params = {'page': page} - response = self.cat_session.session.get(base_url, params=params) - - if response.status_code != 200: - logger.error(f"Failed to fetch page {page} with status code {response.status_code}") - break - - data = response.json() - - # Process datasets on current page - for dataset in data['data']: - dataset_id = dataset.get('id', '') - # Handle null or empty acronyms by setting to empty string - acronym = dataset.get('acronym') if dataset.get('acronym') else '' - datasets[dataset_id] = acronym - - # Check if we've reached the last page - if not data.get('next_page'): - break - - page += 1 - - # Optional: Log progress every 10 pages - if page % 10 == 0: - logger.info(f"Processed {page} pages ({len(datasets)} datasets)") - - except Exception as e: - logger.error(f"Error processing page {page}: {str(e)}") - break - - logger.success(f"Finished processing {len(datasets)} datasets") - return datasets + try: + with duckdb.connect(':memory:') as con: + # Install and load httpfs extension + con.execute("INSTALL httpfs;") + con.execute("LOAD httpfs;") + # Query to select only id and slug, converting to dict format + query = """ + SELECT DISTINCT slug, id + FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet') + WHERE slug IS NOT NULL AND id IS NOT NULL + """ + # Execute query and fetch results + result = con.execute(query).fetchall() + # Convert results to dictionary + datasets = {slug: id for slug, id in result} + return datasets + except Exception as e: + logger.error(f"Error processing parquet file: {str(e)}") + return {} def get_dataset_by_identifier(self, identifier: str) -> dict: """ 
- Fetches a specific dataset using either its ID or slug. + Fetches a metadata for a specific dataset using either its ID or slug. Args: identifier (str): Dataset ID or slug to fetch @@ -1414,3 +1378,45 @@ def get_datasets_by_identifiers(self, identifiers: list) -> dict: logger.success(f"Finished fetching {len(results)} datasets") return results + + # ---------------------------- + # Show available resources for a particular dataset + # ---------------------------- + def get_dataset_resources(self, dataset) -> list: + + try: + resources = [resource for resource in dataset["resources"]] + return resources + except Exception as e: + raise + + # ---------------------------- + # Show all organisation available + # ---------------------------- + def get_all_orgs(self) -> dict: + """ + Uses DuckDB to read a Parquet file of whole French Gouv data catalogue instead and create a dictionary of orgs and org ids. + + Returns: + dict: Dictionary with orgs as keys and org IDs as values + """ + + try: + with duckdb.connect(':memory:') as con: + # Install and load httpfs extension + con.execute("INSTALL httpfs;") + con.execute("LOAD httpfs;") + # Query to select only id and slug, converting to dict format + query = """ + SELECT DISTINCT organization, organization_id + FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet') + WHERE organization IS NOT NULL AND organization_id IS NOT NULL + """ + # Execute query and fetch results + result = con.execute(query).fetchall() + # Convert results to dictionary + organisations = {organization: organization_id for organization, organization_id in result} + return organisations + except Exception as e: + logger.error(f"Error processing parquet file: {str(e)}") + return {} diff --git a/HerdingCats/session/cat_session.py b/HerdingCats/session/cat_session.py index fb21d09..77761ef 100644 --- a/HerdingCats/session/cat_session.py +++ b/HerdingCats/session/cat_session.py @@ -7,21 
+7,24 @@ from ..endpoints.api_endpoints import CkanDataCatalogues, OpenDataSoftDataCatalogues, FrenchGouvCatalogue from ..errors.cats_errors import CatSessionError -class CatalogType(Enum): +# Current Supported Catalogue Types +class CatalogueType(Enum): CKAN = "ckan" OPENDATA_SOFT = "opendatasoft" GOUV_FR = "french_gov" +# START A SESSION WITH A DATA CATALOGUE class CatSession: def __init__( self, domain: Union[str, CkanDataCatalogues, OpenDataSoftDataCatalogues, FrenchGouvCatalogue] ) -> None: """ - Initialise a session with a valid domain or predefined catalog. + Initialise a session with a valid domain or a predefined catalog. + Args: domain (url or catalogue item): str or catalog enum """ - self.domain, self.catalog_type = self._process_domain(domain) + self.domain, self.catalogue_type = self._process_domain(domain) self.session = requests.Session() self.base_url = ( f"https://{self.domain}" @@ -33,56 +36,61 @@ def __init__( @staticmethod def _process_domain( domain: Union[str, CkanDataCatalogues, OpenDataSoftDataCatalogues, FrenchGouvCatalogue] - ) -> tuple[str, CatalogType]: + ) -> tuple[str, CatalogueType]: """ - Process the domain to ensure it's in the correct format. + Process the domain to ensure that it's in the correct format. This iterates through the CkanDataCatalogues, OpenDataSoftDataCatalogues, and FrenchGouvCatalogue - Enums and checks for a match. Otherwise it processes the url as normal. + Enums and checks for a match. + + Otherwise it processes the url as normal. 
Args: domain (url or catalogue item): str or catalog enum Returns: a tuple of (url in the correct format, catalog type) """ + # Check predefined catalogs first if isinstance(domain, (CkanDataCatalogues, OpenDataSoftDataCatalogues, FrenchGouvCatalogue)): if isinstance(domain, FrenchGouvCatalogue): - catalog_type = CatalogType.GOUV_FR + catalog_type = CatalogueType.GOUV_FR else: catalog_type = ( - CatalogType.CKAN + CatalogueType.CKAN if isinstance(domain, CkanDataCatalogues) - else CatalogType.OPENDATA_SOFT + else CatalogueType.OPENDATA_SOFT ) parsed_url = urlparse(domain.value) return parsed_url.netloc if parsed_url.netloc else parsed_url.path, catalog_type + # Process as normal site url + # Check if site url is in the catalogue already elif isinstance(domain, str): - # Check predefined catalogs first for catalog_enum in (CkanDataCatalogues, OpenDataSoftDataCatalogues, FrenchGouvCatalogue): for catalog in catalog_enum: if domain.lower() == catalog.name.lower().replace("_", " "): parsed_url = urlparse(catalog.value) url = parsed_url.netloc if parsed_url.netloc else parsed_url.path if catalog_enum == FrenchGouvCatalogue: - catalog_type = CatalogType.GOUV_FR + catalog_type = CatalogueType.GOUV_FR else: catalog_type = ( - CatalogType.CKAN + CatalogueType.CKAN if catalog_enum == CkanDataCatalogues - else CatalogType.OPENDATA_SOFT + else CatalogueType.OPENDATA_SOFT ) return url, catalog_type - # If not a predefined catalog, process as a regular domain or URL + # If not a predefined catalogue item, process as a regular domain or URL parsed = urlparse(domain) domain_str = parsed.netloc if parsed.netloc else parsed.path # Check if it's a French government domain + # Otherwise default to CKAN if domain_str.endswith('.gouv.fr'): - return domain_str, CatalogType.GOUV_FR + return domain_str, CatalogueType.GOUV_FR else: - return domain_str, CatalogType.CKAN + return domain_str, CatalogueType.CKAN else: raise ValueError( "Domain must be a string, CkanDataCatalogues enum, 
OpenDataSoftDataCatalogues enum, or FrenchGouvCatalogue enum" @@ -90,8 +98,9 @@ def _process_domain( def _validate_url(self) -> None: """ - Validate the URL to catch any errors - Will raise status code error if there is a problem + Validate the URL to catch any errors. + + Will raise status code error if there is a problem with url. """ try: response = self.session.get(self.base_url, timeout=10) @@ -126,6 +135,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): """Allows use with the context manager with""" self.close_session() - def get_catalog_type(self) -> CatalogType: + def get_catalogue_type(self) -> CatalogueType: """Return the catalog type (CKAN, OpenDataSoft, or French Government)""" - return self.catalog_type + return self.catalogue_type diff --git a/tests/french_gouv/test_frgouv_endpoint_health.py b/tests/french_gouv/test_frgouv_endpoint_health.py index 88f5b41..4b0b250 100644 --- a/tests/french_gouv/test_frgouv_endpoint_health.py +++ b/tests/french_gouv/test_frgouv_endpoint_health.py @@ -1,7 +1,8 @@ import pytest +import requests + from HerdingCats.session.cat_session import CatSession from HerdingCats.endpoints.api_endpoints import FrenchGouvApiPaths -import requests from loguru import logger CATALOGUES = [ diff --git a/tests/french_gouv/test_frgouv_get_datasets.py b/tests/french_gouv/test_frgouv_get_datasets.py new file mode 100644 index 0000000..6bef109 --- /dev/null +++ b/tests/french_gouv/test_frgouv_get_datasets.py @@ -0,0 +1,44 @@ +import pytest +import requests + +from HerdingCats.session.cat_session import CatSession +from HerdingCats.endpoints.api_endpoints import FrenchGouvApiPaths +from HerdingCats.explorer.cat_explore import FrenchGouvCatExplorer +from loguru import logger + +CATALOGUES = [ + "https://www.data.gouv.fr" +] + +@pytest.mark.parametrize("catalogue_url", CATALOGUES) +def test_ckan_health_check(catalogue_url): + """ + Check that predefined data catalogue fetches all data catalogs available. 
+    The French government catalog should have >10000 datasets.
+    """
+    with CatSession(catalogue_url) as cat_session:
+        explore = FrenchGouvCatExplorer(cat_session)
+        try:
+            data = explore.get_all_datasets()
+            data_length = len(data)
+            print(data_length)
+            assert data_length > 10000, (
+                f"Insufficient datasets from {catalogue_url}. "
+                f"Found {data_length} datasets, expected >10000. "
+                "This might indicate an API issue or data availability problem."
+            )
+            logger.info(
+                f"Health check passed for {catalogue_url}: "
+                f"retrieved {data_length} datasets"
+            )
+        except requests.RequestException as e:
+            pytest.fail(
+                f"Connection failed to French Gov endpoint {catalogue_url}: "
+                f"Error: {str(e)}"
+            )
+        except AssertionError as e:
+            pytest.fail(str(e))
+        except Exception as e:
+            pytest.fail(
+                f"Unexpected error while fetching data from {catalogue_url}: {str(e)}"
+            )