
Commit ea54e8b

update logs
dontseyit committed Aug 5, 2024
1 parent 2ffaf6f commit ea54e8b
Showing 4 changed files with 34 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/metldata/artifacts_rest/load_resources.py
@@ -16,6 +16,7 @@

"""Logic for loading artifacts."""

import logging
from typing import cast

from ghga_event_schemas.pydantic_ import (
@@ -41,6 +42,8 @@
lookup_slot_in_resource,
)

log = logging.getLogger(__name__)


def extract_class_resources_from_artifact(
*, artifact_content: Json, resource_class: ArtifactResourceClass
@@ -136,6 +139,7 @@ async def process_removed_resources(
dao_collection: ArtifactDaoCollection,
):
"""Delete no longer needed artifact resources from DB and send corresponding events"""
log.info("Starting to process removed resources: %s", len(resource_tags))
for resource_tag in resource_tags:
artifact_name, class_name, resource_id = resource_tag
# resource tag was obtained from querying the db, so resource with given ID
@@ -164,6 +168,8 @@ async def process_new_or_changed_resources(
dao_collection: ArtifactDaoCollection,
):
"""Insert newly received artifact resources into DB and send corresponding events"""
log.info("Starting to process new or changed resources: %s", len(resources))

for resource_tag, resource in resources.items():
artifact_name = resource_tag[0]
# no resource tag was obtained from querying the db, so the resource with the
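
The new module-level logger follows the standard `logging.getLogger(__name__)` pattern, so the INFO records added above only become visible once the running application installs a handler and level for them. A minimal sketch of such a configuration (an assumption for illustration; metldata's actual logging setup is not shown in this diff):

```python
# Minimal sketch, assuming no specific metldata logging config: wire up the
# root logger so that module-level loggers created via
# logging.getLogger(__name__) emit the new INFO records.
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
)

log = logging.getLogger("metldata.artifacts_rest.load_resources")
log.info("Starting to process removed resources: %s", 3)
# e.g. "2024-08-05 12:00:00,000 metldata.artifacts_rest.load_resources INFO:
#       Starting to process removed resources: 3"
```
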
3 changes: 0 additions & 3 deletions src/metldata/load/api.py
@@ -109,7 +109,6 @@ async def load_artifacts(
):
"""Load artifacts into services for querying via user-accessible API."""
try:
log.debug("Checking artifact resources...")
check_artifact_resources(
artifact_resources=artifact_resources,
artifact_infos=artifact_info_dict,
@@ -123,15 +122,13 @@
config=config, provider=event_pub_provider
)

log.debug("Loading artifacts...")
await load_artifacts_using_dao(
artifact_resources=artifact_resources,
artifact_info_dict=artifact_info_dict,
event_publisher=event_publisher,
dao_collection=dao_collection,
)

log.debug("Creating stats for artifacts...")
await create_stats_using_aggregator(
artifact_infos=artifact_info_dict,
primary_artifact_name=primary_artifact_name,
13 changes: 9 additions & 4 deletions src/metldata/load/load.py
@@ -55,7 +55,7 @@ async def load_artifacts_using_dao(
dao_collection: ArtifactDaoCollection,
) -> None:
"""Load artifact resources from multiple submissions using the given dao collection."""
log.debug("Loading artifact resources...")
log.info("Starting to load artifact resources.")
(
removed_resource_tags,
new_resources,
@@ -65,30 +65,35 @@
artifact_info_dict=artifact_info_dict,
dao_collection=dao_collection,
)
log.info(
"Fetched changed resources: %d removed, %d new, %d changed.",
len(removed_resource_tags),
len(new_resources),
len(changed_resources),
)

log.debug("Processing removed resources... Count: %s", len(removed_resource_tags))
await process_removed_resources(
event_publisher=event_publisher,
resource_tags=removed_resource_tags,
dao_collection=dao_collection,
)

log.debug("Processing new resources... Count: %s", len(new_resources))
await process_new_or_changed_resources(
artifact_info_dict=artifact_info_dict,
event_publisher=event_publisher,
resources=new_resources,
dao_collection=dao_collection,
)

log.debug("Processing changed resources... Count: %s", len(changed_resources))
await process_new_or_changed_resources(
artifact_info_dict=artifact_info_dict,
event_publisher=event_publisher,
resources=changed_resources,
dao_collection=dao_collection,
)

log.info("Finished loading artifact resources.")


async def _get_changed_resources(
artifact_resources: ArtifactResourceDict,
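
The new `log.info` call above passes the counts as separate arguments instead of pre-formatting the string; the `%d` interpolation is deferred until a handler actually emits the record (the `len()` calls themselves are still evaluated). A small sketch of the difference, using hypothetical data:

```python
# Sketch of the lazy %-formatting pattern used above (hypothetical lists).
import logging

logging.basicConfig(level=logging.WARNING)  # INFO is below the threshold
log = logging.getLogger(__name__)

removed, new, changed = [], [1], [2, 3]

# The len() calls run, but the "%d" interpolation is skipped because the
# record is filtered out before formatting:
log.info(
    "Fetched changed resources: %d removed, %d new, %d changed.",
    len(removed),
    len(new),
    len(changed),
)

# An f-string would always build the full message, even when it is discarded:
log.info(f"Fetched changed resources: {len(removed)} removed.")
```
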
20 changes: 19 additions & 1 deletion src/metldata/load/stats.py
@@ -58,32 +58,49 @@ async def create_stats_using_aggregator(
db_aggregator: DbAggregator,
) -> None:
"""Create summary by running an aggregation pipeline on the database."""
log.debug("Creating global summary statistics...")
log.info(
"Creating global summary statistics for artifact: %s",
primary_artifact_name,
)
resource_stats: dict[str, ResourceStats] = {}
artifact_name = primary_artifact_name
artifact_info = artifact_infos[artifact_name]
for resource_class in artifact_info.resource_classes:
collection_name = f"art_{artifact_name}_class_{resource_class}"
log.debug("Aggregating data for %s", collection_name)

pipeline: list[dict[str, Any]] = [{"$count": "count"}]
result = await db_aggregator.aggregate(
collection_name=collection_name, pipeline=pipeline
)
if not result:
# Initialize with default values if not present
log.debug(
"No data found for resource class %s, initializing with default values",
resource_class,
)
resource_stats[resource_class] = ResourceStats(count=0)
continue
resource_stats[resource_class] = cast(ResourceStats, result[0])

stat_slot = get_stat_slot(resource_class)
if not stat_slot:
log.debug(
"No stat slot found for resource class %s, skipping detailed stats",
resource_class,
)
continue

pipeline = [{"$group": {"_id": f"$content.{stat_slot}", "count": {"$sum": 1}}}]
result = await db_aggregator.aggregate(
collection_name=collection_name, pipeline=pipeline
)
if not result:
log.debug(
"No detailed stats found for resource class %s, stat slot %s",
resource_class,
stat_slot,
)
continue

stats: list[ValueCount] = sorted(
@@ -96,6 +113,7 @@
resource_stats[resource_class]["stats"] = {stat_slot: stats}

if resource_stats:
log.debug("Updating global statistics summary in the database")
global_stats = GlobalStats(
id="global", created=now_as_utc(), resource_stats=resource_stats
)
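
The two pipelines above are plain MongoDB aggregations: `[{"$count": "count"}]` yields a single document like `{"count": N}` (or nothing for an empty collection), and the `$group` stage yields one document per distinct value of the stat slot. A rough sketch of the same queries run directly with pymongo — the connection string, collection name, and slot name below are assumptions for illustration, and `DbAggregator` in metldata wraps this access rather than using pymongo directly like this:

```python
# Rough sketch using pymongo directly; connection string, collection name,
# and stat slot are hypothetical placeholders, not taken from this diff.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
# Collection name follows the f"art_{artifact_name}_class_{resource_class}"
# pattern from the code above, with made-up artifact and class names:
collection = client["metldata"]["art_example_artifact_class_ExampleClass"]

# First pipeline: returns [{"count": 42}] for a non-empty collection, [] otherwise.
count_result = list(collection.aggregate([{"$count": "count"}]))

# Second pipeline: one document per distinct slot value,
# e.g. [{"_id": "FASTQ", "count": 30}, {"_id": "BAM", "count": 12}].
stat_slot = "format"
group_result = list(
    collection.aggregate(
        [{"$group": {"_id": f"$content.{stat_slot}", "count": {"$sum": 1}}}]
    )
)
```
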
