Skip to content

Commit

Permalink
feat(french catalogue methods and tests): added in new methods for fr…
Browse files Browse the repository at this point in the history
…ench catalogue exploration and added tests for this [2024-12-03]
  • Loading branch information
CHRISCARLON committed Dec 3, 2024
1 parent 3d96544 commit d54b6d2
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 141 deletions.
2 changes: 1 addition & 1 deletion HerdingCats/endpoints/api_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class OpenDataSoftApiPaths:
SHOW_DATASET_EXPORTS = BASE_PATH.format("datasets/{}/exports")

# Alternative base paths...
# Sometimes these are needed - not sure why need to dig into this
# Sometimes these are needed - not sure why need to dig into this!
BASE_PATH_2 = "/api/explore/v2.0/catalog/{}"
SHOW_DATASETS_2 = BASE_PATH_2.format("datasets")
SHOW_DATASET_INFO_2 = BASE_PATH_2.format("datasets/{}")
Expand Down
31 changes: 28 additions & 3 deletions HerdingCats/errors/cats_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ class CatExplorerError(Exception):

class OpenDataSoftExplorerError(Exception):
"""
Custom exception class for OpenDataSoft Explorer errors with colored output using ANSI codes.
Custom exception class for OpenDataSoft Explorer errors.
"""
# ANSI escape codes for colors
RED = '\033[91m'
YELLOW = '\033[93m'
RESET = '\033[0m'
Expand All @@ -19,7 +18,6 @@ def __init__(self, message: str, original_error: Optional[Exception] = None) ->
self.message = message
self.original_error = original_error

# Build the error message with color
error_msg = (
f"{self.RED}OpenDataSoftExplorer Error 🐈‍⬛: {message}{self.RESET}"
)
Expand All @@ -34,3 +32,30 @@ def __init__(self, message: str, original_error: Optional[Exception] = None) ->

def __str__(self) -> str:
return self.args[0]

class WrongCatalogueError(CatExplorerError):
    """
    Raised when an explorer is handed a catalogue session of the wrong type.

    The rendered message is colourised with ANSI escape codes and always
    names the expected catalogue; the received catalogue is appended only
    when it is known.
    """

    RED = '\033[91m'
    YELLOW = '\033[93m'
    RESET = '\033[0m'

    def __init__(self, message: str, expected_catalogue: str, received_catalogue: Optional[str] = None) -> None:
        # Keep the raw pieces for programmatic inspection by callers.
        self.message = message
        self.expected_catalogue = expected_catalogue
        self.received_catalogue = received_catalogue

        parts = [
            f"{self.RED}[Wrong Catalogue Error]: {message}{self.RESET}\n",
            f"{self.YELLOW}Expected catalogue: {expected_catalogue}",
        ]
        if received_catalogue:
            parts.append(f"\nReceived catalogue: {received_catalogue}")
        parts.append(self.RESET)

        super().__init__("".join(parts))

    def __str__(self) -> str:
        # The fully formatted message is the sole positional argument.
        return self.args[0]
238 changes: 122 additions & 116 deletions HerdingCats/explorer/cat_explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
from loguru import logger
from urllib.parse import urlencode

from ..endpoints.api_endpoints import CkanApiPaths, OpenDataSoftApiPaths, FrenchGouvApiPaths
from ..errors.cats_errors import CatExplorerError
from ..session.cat_session import CatSession

from ..endpoints.api_endpoints import (
CkanApiPaths,
OpenDataSoftApiPaths,
FrenchGouvApiPaths
)
from ..errors.cats_errors import CatExplorerError, WrongCatalogueError
from ..session.cat_session import CatSession, CatalogueType

# FIND THE DATA YOU WANT / NEED / ISOLATE PACKAGES AND RESOURCES
# For Ckan Catalogues Only
Expand All @@ -23,7 +26,8 @@ def __init__(self, cat_session: CatSession):
Allows user to start exploring data catalogue programatically
Make sure you pass a valid CkanCatSession in
Make sure you pass a valid CkanCatSession in - checks if the right type.
Args:
CkanCatSession
Expand All @@ -38,6 +42,21 @@ def main():
if __name__ == "__main__":
main()
"""

if not hasattr(cat_session, 'catalogue_type'):
raise WrongCatalogueError(
"CatSession missing catalogue_type attribute",
expected_catalogue=str(CatalogueType.CKAN),
received_catalogue="Unknown"
)

if cat_session.catalogue_type != CatalogueType.CKAN:
raise WrongCatalogueError(
"Invalid catalogue type. CkanCatExplorer requires a Ckan catalogue session.",
expected_catalogue=str(CatalogueType.CKAN),
received_catalogue=str(cat_session.catalogue_type)
)

self.cat_session = cat_session

# ----------------------------
Expand Down Expand Up @@ -1008,7 +1027,8 @@ def __init__(self, cat_session: CatSession):
Allows user to start exploring data catalogue programatically
Make sure you pass a valid CkanCatSession in
Make sure you pass a valid CkanCatSession in - checks if the right type.
Args:
CkanCatSession
Expand All @@ -1018,6 +1038,22 @@ def __init__(self, cat_session: CatSession):
with CatSession("ukpowernetworks.opendatasoft.com") as session:
explore = CatExplorer(session)
"""


if not hasattr(cat_session, 'catalogue_type'):
raise WrongCatalogueError(
"CatSession missing catalogue_type attribute",
expected_catalogue=str(CatalogueType.OPENDATA_SOFT),
received_catalogue="Unknown"
)

if cat_session.catalogue_type != CatalogueType.OPENDATA_SOFT:
raise WrongCatalogueError(
"Invalid catalogue type. OpenDataSoft requires an OpenDataSoft catalogue session.",
expected_catalogue=str(CatalogueType.OPENDATA_SOFT),
received_catalogue=str(cat_session.catalogue_type)
)

self.cat_session = cat_session

# ----------------------------
Expand Down Expand Up @@ -1193,7 +1229,7 @@ def __init__(self, cat_session: CatSession):
Allows user to start exploring data catalogue programatically
Make sure you pass a valid CkanCatSession in
Make sure you pass a valid CkanCatSession in - checks if the right type.
Args:
CkanCatSession
Expand All @@ -1208,6 +1244,21 @@ def main():
if __name__ == "__main__":
main()
"""

if not hasattr(cat_session, 'catalogue_type'):
raise WrongCatalogueError(
"CatSession missing catalogue_type attribute",
expected_catalogue=str(CatalogueType.GOUV_FR),
received_catalogue="Unknown"
)

if cat_session.catalogue_type != CatalogueType.GOUV_FR:
raise WrongCatalogueError(
"Invalid catalogue type. FrenchGouvCatExplorer requires a French Government catalogue session.",
expected_catalogue=str(CatalogueType.GOUV_FR),
received_catalogue=str(cat_session.catalogue_type)
)

self.cat_session = cat_session

# ----------------------------
Expand Down Expand Up @@ -1239,121 +1290,34 @@ def check_health_check(self) -> None:
# ----------------------------
def get_all_datasets(self) -> dict:
"""
Paginates through all datasets in the French Government data catalogue
and creates a dictionary of acronyms and IDs using streaming.
Returns:
dict: Dictionary with dataset IDs as keys and acronyms as values
"""
datasets = {}
page = 1
page_size = 100
base_url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASETS

while True:
try:
params = {
'page': page,
'page_size': page_size
}
# Stream the response
response = self.cat_session.session.get(base_url, params=params, stream=True)
if response.status_code != 200:
logger.error(f"Failed to fetch page {page} with status code {response.status_code}")
break

# Process the streaming response
next_page_exists = False
for line in response.iter_lines():
if not line:
continue

decoded_line = line.decode('utf-8')

# Check for dataset entries
if '"id":' in decoded_line and '"acronym":' in decoded_line:
# Extract just what we need using string operations
id_start = decoded_line.find('"id": "') + 7
id_end = decoded_line.find('"', id_start)
dataset_id = decoded_line[id_start:id_end]

acronym_start = decoded_line.find('"acronym": "') + 11
if acronym_start > 10: # Found acronym
acronym_end = decoded_line.find('"', acronym_start)
acronym = decoded_line[acronym_start:acronym_end]
else:
acronym = ''

datasets[dataset_id] = acronym

# Check for next_page
elif '"next_page":' in decoded_line:
next_page_exists = 'null' not in decoded_line

if not next_page_exists:
break

page += 1
if page % 10 == 0:
logger.info(f"Processed {page} pages ({len(datasets)} datasets)")

except Exception as e:
logger.error(f"Error processing page {page}: {str(e)}")
break

logger.success(f"Finished processing {len(datasets)} datasets")
return datasets

def get_datasets_by_id_dict(self, id: str) -> dict:
"""
Paginates through all datasets in the French Government data catalogue
and creates a dictionary of acronyms and IDs.
Uses DuckDB to read a Parquet file of whole French Gouv data catalogue instead and create a dictionary of slugs and IDs.
Returns:
dict: Dictionary with dataset IDs as keys and acronyms as values
dict: Dictionary with slugs as keys and dataset IDs as values
"""
datasets = {}
page = 1
base_url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASETS

while True:
try:
# Make request with pagination
params = {'page': page}
response = self.cat_session.session.get(base_url, params=params)

if response.status_code != 200:
logger.error(f"Failed to fetch page {page} with status code {response.status_code}")
break

data = response.json()

# Process datasets on current page
for dataset in data['data']:
dataset_id = dataset.get('id', '')
# Handle null or empty acronyms by setting to empty string
acronym = dataset.get('acronym') if dataset.get('acronym') else ''
datasets[dataset_id] = acronym

# Check if we've reached the last page
if not data.get('next_page'):
break

page += 1

# Optional: Log progress every 10 pages
if page % 10 == 0:
logger.info(f"Processed {page} pages ({len(datasets)} datasets)")

except Exception as e:
logger.error(f"Error processing page {page}: {str(e)}")
break

logger.success(f"Finished processing {len(datasets)} datasets")
return datasets
try:
with duckdb.connect(':memory:') as con:
# Install and load httpfs extension
con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;")
# Query to select only id and slug, converting to dict format
query = """
SELECT DISTINCT slug, id
FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet')
WHERE slug IS NOT NULL AND id IS NOT NULL
"""
# Execute query and fetch results
result = con.execute(query).fetchall()
# Convert results to dictionary
datasets = {slug: id for slug, id in result}
return datasets
except Exception as e:
logger.error(f"Error processing parquet file: {str(e)}")
return {}

def get_dataset_by_identifier(self, identifier: str) -> dict:
"""
Fetches a specific dataset using either its ID or slug.
Fetches a metadata for a specific dataset using either its ID or slug.
Args:
identifier (str): Dataset ID or slug to fetch
Expand Down Expand Up @@ -1414,3 +1378,45 @@ def get_datasets_by_identifiers(self, identifiers: list) -> dict:

logger.success(f"Finished fetching {len(results)} datasets")
return results

# ----------------------------
# Show available resources for a particular dataset
# ----------------------------
def get_dataset_resources(self, dataset: dict) -> list:
    """
    Return the resources attached to a dataset's metadata.

    Args:
        dataset: Dataset metadata mapping (as returned by the catalogue
            API); must contain a "resources" key.

    Returns:
        list: A shallow copy of the dataset's resource list.

    Raises:
        KeyError: If the mapping has no "resources" entry.
        TypeError: If dataset is not a mapping.
    """
    # list() replaces the previous copying comprehension wrapped in a
    # no-op ``except Exception: raise`` — exceptions still propagate
    # unchanged to the caller.
    return list(dataset["resources"])

# ----------------------------
# Show all organisation available
# ----------------------------
def get_all_orgs(self) -> dict:
    """
    Build a mapping of organisation names to organisation IDs.

    Reads the whole French Gouv catalogue dump (a remote Parquet file)
    through DuckDB instead of paginating the HTTP API.

    Returns:
        dict: Organisation name -> organisation ID; empty dict on failure.
    """

    try:
        with duckdb.connect(':memory:') as connection:
            # httpfs lets DuckDB read the Parquet file straight over HTTPS.
            connection.execute("INSTALL httpfs;")
            connection.execute("LOAD httpfs;")
            query = """
                SELECT DISTINCT organization, organization_id
                FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet')
                WHERE organization IS NOT NULL AND organization_id IS NOT NULL
            """
            rows = connection.execute(query).fetchall()
            # Each row is an (organization, organization_id) pair.
            return dict(rows)
    except Exception as e:
        logger.error(f"Error processing parquet file: {str(e)}")
        return {}
Loading

0 comments on commit d54b6d2

Please sign in to comment.