diff --git a/acapy_agent/__main__.py b/acapy_agent/__main__.py index bd0c964705..9669588b83 100644 --- a/acapy_agent/__main__.py +++ b/acapy_agent/__main__.py @@ -1,8 +1,11 @@ """acapy_agent package entry point.""" +import logging import os import sys +LOGGER = logging.getLogger(__name__) + def init_debug(args): """Initialize debugging environment.""" @@ -26,16 +29,18 @@ def init_debug(args): import debugpy debugpy.listen((DAP_HOST, DAP_PORT)) - print(f"=== Waiting for debugger to attach to {DAP_HOST}:{DAP_PORT} ===") + LOGGER.info( + f"=== Waiting for debugger to attach to {DAP_HOST}:{DAP_PORT} ===" + ) debugpy.wait_for_client() except ImportError: - print("debugpy library was not found") + LOGGER.error("debugpy library was not found") if ENABLE_PYDEVD_PYCHARM or "--debug-pycharm" in args: try: import pydevd_pycharm - print( + LOGGER.info( "aca-py remote debugging to " f"{PYDEVD_PYCHARM_HOST}:{PYDEVD_PYCHARM_AGENT_PORT}" ) @@ -47,7 +52,7 @@ def init_debug(args): suspend=False, ) except ImportError: - print("pydevd_pycharm library was not found") + LOGGER.error("pydevd_pycharm library was not found") def run(args): diff --git a/acapy_agent/anoncreds/default/did_indy/registry.py b/acapy_agent/anoncreds/default/did_indy/registry.py index dcaafe4c06..388dfe5022 100644 --- a/acapy_agent/anoncreds/default/did_indy/registry.py +++ b/acapy_agent/anoncreds/default/did_indy/registry.py @@ -41,7 +41,7 @@ def supported_identifiers_regex(self) -> Pattern: async def setup(self, context: InjectionContext): """Setup.""" - print("Successfully registered DIDIndyRegistry") + LOGGER.info("Successfully registered DIDIndyRegistry") async def get_schema(self, profile: Profile, schema_id: str) -> GetSchemaResult: """Get a schema from the registry.""" diff --git a/acapy_agent/anoncreds/default/did_web/registry.py b/acapy_agent/anoncreds/default/did_web/registry.py index f97ba88fb8..9afa2ebb80 100644 --- a/acapy_agent/anoncreds/default/did_web/registry.py +++ b/acapy_agent/anoncreds/default/did_web/registry.py @@ -1,5 +1,6 @@ """DID Web Registry.""" +import logging import re from typing import Optional, Pattern, Sequence @@ -17,6 +18,8 @@ ) from ...models.schema import AnonCredsSchema, GetSchemaResult, SchemaResult +LOGGER = logging.getLogger(__name__) + class DIDWebRegistry(BaseAnonCredsResolver, BaseAnonCredsRegistrar): """DIDWebRegistry.""" @@ -40,7 +43,7 @@ def supported_identifiers_regex(self) -> Pattern: async def setup(self, context: InjectionContext): """Setup.""" - print("Successfully registered DIDWebRegistry") + LOGGER.info("Successfully registered DIDWebRegistry") async def get_schema(self, profile, schema_id: str) -> GetSchemaResult: """Get a schema from the registry.""" diff --git a/acapy_agent/anoncreds/default/legacy_indy/registry.py b/acapy_agent/anoncreds/default/legacy_indy/registry.py index aff1040616..2f91937580 100644 --- a/acapy_agent/anoncreds/default/legacy_indy/registry.py +++ b/acapy_agent/anoncreds/default/legacy_indy/registry.py @@ -144,7 +144,7 @@ def supported_identifiers_regex(self) -> Pattern: async def setup(self, context: InjectionContext): """Setup.""" - print("Successfully registered LegacyIndyRegistry") + LOGGER.info("Successfully registered LegacyIndyRegistry") @staticmethod def make_schema_id(schema: AnonCredsSchema) -> str: diff --git a/acapy_agent/commands/provision.py b/acapy_agent/commands/provision.py index 962b38dc8e..f0fb7cf561 100644 --- a/acapy_agent/commands/provision.py +++ b/acapy_agent/commands/provision.py @@ -1,6 +1,7 @@ """Provision command for setting up agent settings before starting.""" import asyncio +import logging from typing import Sequence from configargparse import ArgumentParser @@ -22,6 +23,8 @@ from ..storage.base import BaseStorage from . import PROG +LOGGER = logging.getLogger(__name__) + class ProvisionError(BaseError): """Base exception for provisioning errors.""" @@ -58,9 +61,9 @@ async def provision(settings: dict): ) if await ledger_config(root_profile, public_did and public_did.did, True): - print("Ledger configured") + LOGGER.info("Ledger configured") else: - print("Ledger not configured") + LOGGER.warning("Ledger not configured") await root_profile.close() except BaseError as e: diff --git a/acapy_agent/commands/start.py b/acapy_agent/commands/start.py index f30fc5c582..fb8652314c 100644 --- a/acapy_agent/commands/start.py +++ b/acapy_agent/commands/start.py @@ -31,7 +31,7 @@ async def start_app(conductor: Conductor): async def shutdown_app(conductor: Conductor): """Shut down.""" - print("\nShutting down") + LOGGER.info("Shutting down") await conductor.stop() @@ -59,7 +59,7 @@ def execute(argv: Sequence[str] = None): # Run the application if uvloop: uvloop.install() - print("uvloop installed") + LOGGER.info("uvloop installed") run_loop(start_app(conductor), shutdown_app(conductor)) diff --git a/acapy_agent/config/argparse.py b/acapy_agent/config/argparse.py index 87a67aeaf7..d56a218bb6 100644 --- a/acapy_agent/config/argparse.py +++ b/acapy_agent/config/argparse.py @@ -63,7 +63,7 @@ def get_registered(cls, category: Optional[str] = None): def create_argument_parser(*, prog: Optional[str] = None): - """Create am instance of an arg parser, force yaml format for external config.""" + """Create an instance of an arg parser, force yaml format for external config.""" return ArgumentParser(config_file_parser_class=YAMLConfigFileParser, prog=prog) diff --git a/acapy_agent/config/default_context.py b/acapy_agent/config/default_context.py index 136c79791d..8f28b0eaee 100644 --- a/acapy_agent/config/default_context.py +++ b/acapy_agent/config/default_context.py @@ -1,5 +1,7 @@ """Classes for configuring the default injection context.""" +import logging + from ..anoncreds.registry import AnonCredsRegistry from ..cache.base import BaseCache from ..cache.in_memory import InMemoryCache @@ -26,17 +28,22 @@ from .injection_context import InjectionContext from .provider import CachedProvider, ClassProvider +LOGGER = logging.getLogger(__name__) + class DefaultContextBuilder(ContextBuilder): """Default context builder.""" async def build_context(self) -> InjectionContext: """Build the base injection context; set DIDComm prefix to emit.""" + LOGGER.debug("Building new injection context") + context = InjectionContext(settings=self.settings) context.settings.set_default("default_label", "Aries Cloud Agent") if context.settings.get("timing.enabled"): timing_log = context.settings.get("timing.log_file") + LOGGER.debug("Enabling timing collector with log file: %s", timing_log) collector = Collector(log_path=timing_log) context.injector.bind_instance(Collector, collector) @@ -63,11 +70,8 @@ async def build_context(self) -> InjectionContext: # DIDComm Messaging if context.settings.get("experiment.didcomm_v2"): - from didcomm_messaging import ( - CryptoService, - PackagingService, - RoutingService, - ) + LOGGER.info("DIDComm v2 experimental mode enabled") + from didcomm_messaging import CryptoService, PackagingService, RoutingService from didcomm_messaging.crypto.backend.askar import AskarCryptoService context.injector.bind_instance(CryptoService, AskarCryptoService()) @@ -81,11 +85,13 @@ async def build_context(self) -> InjectionContext: async def bind_providers(self, context: InjectionContext): """Bind various class providers.""" + LOGGER.debug("Begin binding providers to context") context.injector.bind_provider(ProfileManager, ProfileManagerProvider()) wallet_type = self.settings.get("wallet.type") if wallet_type == "askar-anoncreds": + LOGGER.debug("Using AnonCreds tails server") context.injector.bind_provider( BaseTailsServer, ClassProvider( @@ -93,6 +99,7 @@ async def bind_providers(self, context: InjectionContext): ), ) else: + LOGGER.debug("Using Indy tails server") context.injector.bind_provider( BaseTailsServer, ClassProvider( @@ -104,12 +111,7 @@ async def bind_providers(self, context: InjectionContext): context.injector.bind_provider( BaseWireFormat, CachedProvider( - # StatsProvider( ClassProvider("acapy_agent.transport.pack_format.PackWireFormat"), - # ( - # "encode_message", "parse_message" - # ), - # ) ), ) @@ -120,6 +122,7 @@ async def bind_providers(self, context: InjectionContext): async def load_plugins(self, context: InjectionContext): """Set up plugin registry and load plugins.""" + LOGGER.debug("Initializing plugin registry") plugin_registry = PluginRegistry( blocklist=self.settings.get("blocked_plugins", []) ) @@ -130,18 +133,18 @@ async def load_plugins(self, context: InjectionContext): if not self.settings.get("transport.disabled"): plugin_registry.register_package("acapy_agent.protocols") - # Currently providing admin routes only - plugin_registry.register_plugin("acapy_agent.holder") - - plugin_registry.register_plugin("acapy_agent.ledger") - - plugin_registry.register_plugin("acapy_agent.messaging.jsonld") - plugin_registry.register_plugin("acapy_agent.resolver") - plugin_registry.register_plugin("acapy_agent.settings") - plugin_registry.register_plugin("acapy_agent.vc") - plugin_registry.register_plugin("acapy_agent.vc.data_integrity") - plugin_registry.register_plugin("acapy_agent.wallet") - plugin_registry.register_plugin("acapy_agent.wallet.keys") + # Define plugin groups + default_plugins = [ + "acapy_agent.holder", + "acapy_agent.ledger", + "acapy_agent.messaging.jsonld", + "acapy_agent.resolver", + "acapy_agent.settings", + "acapy_agent.vc", + "acapy_agent.vc.data_integrity", + "acapy_agent.wallet", + "acapy_agent.wallet.keys", + ] anoncreds_plugins = [ "acapy_agent.anoncreds", @@ -157,26 +160,34 @@ async def load_plugins(self, context: InjectionContext): "acapy_agent.revocation", ] - def register_askar_plugins(): - for plugin in askar_plugins: + def register_plugins(plugins: list[str], plugin_type: str): + """Register a group of plugins with logging.""" + LOGGER.debug("Registering %s plugins", plugin_type) + for plugin in plugins: plugin_registry.register_plugin(plugin) + def register_askar_plugins(): + register_plugins(askar_plugins, "askar") + def register_anoncreds_plugins(): - for plugin in anoncreds_plugins: - plugin_registry.register_plugin(plugin) + register_plugins(anoncreds_plugins, "anoncreds") - if wallet_type == "askar-anoncreds": - register_anoncreds_plugins() - else: - register_askar_plugins() + register_plugins(default_plugins, "default") if context.settings.get("multitenant.admin_enabled"): + LOGGER.debug("Multitenant admin enabled - registering additional plugins") plugin_registry.register_plugin("acapy_agent.multitenant.admin") register_askar_plugins() register_anoncreds_plugins() + else: + if wallet_type == "askar-anoncreds": + register_anoncreds_plugins() + else: + register_askar_plugins() # Register external plugins for plugin_path in self.settings.get("external_plugins", []): + LOGGER.debug("Registering external plugin: %s", plugin_path) plugin_registry.register_plugin(plugin_path) # Register message protocols diff --git a/acapy_agent/config/ledger.py b/acapy_agent/config/ledger.py index 7d7acb1e7e..ab335913b9 100644 --- a/acapy_agent/config/ledger.py +++ b/acapy_agent/config/ledger.py @@ -34,14 +34,18 @@ async def fetch_genesis_transactions(genesis_url: str) -> str: # https://github.com/openwallet-foundation/acapy/issues/1745 return await fetch(genesis_url, headers=headers, max_attempts=20) except FetchError as e: + LOGGER.error("Error retrieving genesis transactions from %s: %s", genesis_url, e) raise ConfigError("Error retrieving ledger genesis transactions") from e async def get_genesis_transactions(settings: Settings) -> str: """Fetch genesis transactions if necessary.""" + LOGGER.debug("Getting genesis transactions from settings") txns = settings.get("ledger.genesis_transactions") + LOGGER.debug("Genesis transactions from settings: %s", "found" if txns else "absent") if not txns: + LOGGER.debug("No genesis transactions found in settings") if settings.get("ledger.genesis_url"): txns = await fetch_genesis_transactions(settings["ledger.genesis_url"]) elif settings.get("ledger.genesis_file"): @@ -51,8 +55,10 @@ async def get_genesis_transactions(settings: Settings) -> str: with open(genesis_path, "r") as genesis_file: txns = genesis_file.read() except IOError as e: + LOGGER.error("Failed to read genesis file: %s", str(e)) raise ConfigError("Error reading ledger genesis transactions") from e if txns: + LOGGER.debug("Storing genesis transactions in settings") settings["ledger.genesis_transactions"] = txns return txns @@ -63,6 +69,8 @@ async def load_multiple_genesis_transactions_from_config(settings: Settings): ledger_config_list = settings.get("ledger.ledger_config_list") ledger_txns_list = [] write_ledger_set = False + LOGGER.debug("Processing %d ledger configs", len(ledger_config_list)) + for config in ledger_config_list: txns = None if "genesis_transactions" in config: @@ -74,11 +82,12 @@ async def load_multiple_genesis_transactions_from_config(settings: Settings): try: genesis_path = config.get("genesis_file") LOGGER.info( - "Reading ledger genesis transactions from: %s", genesis_path + "Reading ledger genesis transactions from file: %s", genesis_path ) with open(genesis_path, "r") as genesis_file: txns = genesis_file.read() except IOError as e: + LOGGER.error("Failed to read genesis file: %s", str(e)) raise ConfigError("Error reading ledger genesis transactions") from e is_write_ledger = ( False if config.get("is_write") is None else config.get("is_write") @@ -119,6 +128,7 @@ async def load_multiple_genesis_transactions_from_config(settings: Settings): " genesis_file and genesis_transactions provided." ) settings["ledger.ledger_config_list"] = ledger_txns_list + LOGGER.debug("Processed %d ledger configs successfully", len(ledger_txns_list)) async def ledger_config( @@ -126,6 +136,10 @@ async def ledger_config( ) -> bool: """Perform Indy ledger configuration.""" + LOGGER.debug( + "Configuring ledger for profile %s and public_did %s", profile.name, public_did + ) + session = await profile.session() ledger = session.inject_or(BaseLedger) @@ -136,32 +150,46 @@ async def ledger_config( async with ledger: # Check transaction author agreement acceptance if not ledger.read_only: + LOGGER.debug("Checking transaction author agreement") taa_info = await ledger.get_txn_author_agreement() if taa_info["taa_required"] and public_did: + LOGGER.debug("TAA acceptance required") taa_accepted = await ledger.get_latest_txn_author_acceptance() if ( not taa_accepted or taa_info["taa_record"]["digest"] != taa_accepted["digest"] ): + LOGGER.info("TAA acceptance needed - performing acceptance") if not await accept_taa(ledger, profile, taa_info, provision): + LOGGER.warning("TAA acceptance failed") return False + LOGGER.info("TAA acceptance completed") # Publish endpoints if necessary - skipped if TAA is required but not accepted endpoint = session.settings.get("default_endpoint") if public_did: wallet = session.inject(BaseWallet) try: + LOGGER.debug("Setting DID endpoint to: %s", endpoint) await wallet.set_did_endpoint(public_did, endpoint, ledger) except LedgerError as x_ledger: + LOGGER.error("Error setting DID endpoint: %s", x_ledger.message) raise ConfigError(x_ledger.message) from x_ledger # e.g., read-only # Publish profile endpoint if ledger is NOT read-only profile_endpoint = session.settings.get("profile_endpoint") if profile_endpoint and not ledger.read_only: + LOGGER.debug( + "Publishing profile endpoint: %s for DID: %s", + profile_endpoint, + public_did, + ) await ledger.update_endpoint_for_did( public_did, profile_endpoint, EndpointType.PROFILE ) + LOGGER.info("Profile endpoint published successfully") + LOGGER.info("Ledger configuration complete") return True diff --git a/acapy_agent/config/logging/configurator.py b/acapy_agent/config/logging/configurator.py index 11763dca08..128ccf5e49 100644 --- a/acapy_agent/config/logging/configurator.py +++ b/acapy_agent/config/logging/configurator.py @@ -32,6 +32,8 @@ TimedRotatingFileMultiProcessHandler, ) +LOGGER = logging.getLogger(__name__) + def load_resource(path: str, encoding: Optional[str] = None): """Open a resource file located in a python package or the local filesystem. @@ -57,7 +59,8 @@ def load_resource(path: str, encoding: Optional[str] = None): return io.TextIOWrapper(bstream, encoding=encoding) return bstream except IOError: - pass + LOGGER.warning("Resource not found: %s", path) + return None def dictConfig(config, new_file_path=None): @@ -95,18 +98,7 @@ def fileConfig( raise RuntimeError(f"{fname} is invalid: {e}") if new_file_path and cp.has_section("handler_timed_file_handler"): - cp.set( - "handler_timed_file_handler", - "args", - str( - ( - f"{new_file_path}", - "d", - 7, - 1, - ) - ), - ) + cp.set("handler_timed_file_handler", "args", str((new_file_path, "d", 7, 1))) formatters = _create_formatters(cp) with logging._lock: diff --git a/acapy_agent/config/wallet.py b/acapy_agent/config/wallet.py index ce5beb8fc3..c0a91741e3 100644 --- a/acapy_agent/config/wallet.py +++ b/acapy_agent/config/wallet.py @@ -62,11 +62,11 @@ async def wallet_config( if provision: if profile.created: - print("Created new profile") + LOGGER.info("Created new profile") else: - print("Opened existing profile") - print("Profile backend:", profile.backend) - print("Profile name:", profile.name) + LOGGER.info("Opened existing profile") + LOGGER.info("Profile backend: %s", profile.backend) + LOGGER.info("Profile name: %s", profile.name) wallet_seed = context.settings.get("wallet.seed") wallet_local_did = context.settings.get("wallet.local_did") @@ -85,8 +85,8 @@ async def wallet_config( ) public_did = replace_did_info.did await wallet.set_public_did(public_did) - print(f"Created new public DID: {public_did}") - print(f"Verkey: {replace_did_info.verkey}") + LOGGER.info(f"Created new public DID: {public_did}") + LOGGER.info(f"Verkey: {replace_did_info.verkey}") else: # If we already have a registered public did and it doesn't match # the one derived from `wallet_seed` then we error out. @@ -108,20 +108,20 @@ async def wallet_config( ) local_did = local_did_info.did if provision: - print(f"Created new local DID: {local_did}") - print(f"Verkey: {local_did_info.verkey}") + LOGGER.info(f"Created new local DID: {local_did}") + LOGGER.info(f"Verkey: {local_did_info.verkey}") else: public_did_info = await wallet.create_public_did( method=SOV, key_type=ED25519, seed=wallet_seed ) public_did = public_did_info.did if provision: - print(f"Created new public DID: {public_did}") - print(f"Verkey: {public_did_info.verkey}") + LOGGER.info(f"Created new public DID: {public_did}") + LOGGER.info(f"Verkey: {public_did_info.verkey}") # wait until ledger config to set public DID endpoint - wallet goes first if provision and not wallet_local_did and not public_did: - print("No public DID") + LOGGER.info("No public DID") # Debug settings test_seed = context.settings.get("debug.seed") diff --git a/acapy_agent/core/conductor.py b/acapy_agent/core/conductor.py index 9935b6f745..e44bbead93 100644 --- a/acapy_agent/core/conductor.py +++ b/acapy_agent/core/conductor.py @@ -42,10 +42,7 @@ from ..messaging.responder import BaseResponder from ..multitenant.base import BaseMultitenantManager from ..multitenant.manager_provider import MultitenantManagerProvider -from ..protocols.connections.v1_0.manager import ( - ConnectionManager, - ConnectionManagerError, -) +from ..protocols.connections.v1_0.manager import ConnectionManager, ConnectionManagerError from ..protocols.connections.v1_0.messages.connection_invitation import ( ConnectionInvitation, ) @@ -80,7 +77,7 @@ from ..wallet.anoncreds_upgrade import upgrade_wallet_to_anoncreds_if_requested from ..wallet.did_info import DIDInfo from .dispatcher import Dispatcher -from .error import StartupError +from .error import ProfileError, StartupError from .oob_processor import OobMessageProcessor from .util import SHUTDOWN_EVENT_TOPIC, STARTUP_EVENT_TOPIC @@ -124,41 +121,60 @@ def context(self) -> InjectionContext: async def setup(self): """Initialize the global request context.""" + LOGGER.debug("Starting setup of the Conductor") context = await self.context_builder.build_context() + LOGGER.debug("Context built successfully") if self.force_agent_anoncreds: + LOGGER.debug( + "Force agent anoncreds is enabled. " + "Setting wallet type to 'askar-anoncreds'." + ) context.settings.set_value("wallet.type", "askar-anoncreds") # Fetch genesis transactions if necessary if context.settings.get("ledger.ledger_config_list"): + LOGGER.debug( + "Ledger config list found. Loading multiple genesis transactions" + ) await load_multiple_genesis_transactions_from_config(context.settings) if ( context.settings.get("ledger.genesis_transactions") or context.settings.get("ledger.genesis_file") or context.settings.get("ledger.genesis_url") ): + LOGGER.debug( + "Genesis transactions/configurations found. Fetching genesis transactions" + ) await get_genesis_transactions(context.settings) # Configure the root profile + LOGGER.debug("Configuring the root profile and setting up public DID") self.root_profile, self.setup_public_did = await wallet_config(context) context = self.root_profile.context + LOGGER.debug("Root profile configured successfully") # Multiledger Setup - if ( - context.settings.get("ledger.ledger_config_list") - and len(context.settings.get("ledger.ledger_config_list")) > 0 - ): + ledger_config_list = context.settings.get("ledger.ledger_config_list") + if ledger_config_list and len(ledger_config_list) > 0: + LOGGER.debug("Setting up multiledger manager") context.injector.bind_provider( BaseMultipleLedgerManager, MultiIndyLedgerManagerProvider(self.root_profile), ) - if not (context.settings.get("ledger.genesis_transactions")): + if not context.settings.get("ledger.genesis_transactions"): ledger = context.injector.inject(BaseLedger) + LOGGER.debug( + "Ledger backend: %s, Profile backend: %s", + ledger.BACKEND_NAME, + self.root_profile.BACKEND_NAME, + ) if ( self.root_profile.BACKEND_NAME == "askar" and ledger.BACKEND_NAME == "indy-vdr" ): + LOGGER.debug("Binding IndyCredxVerifier for 'askar' backend.") context.injector.bind_provider( IndyVerifier, ClassProvider( @@ -170,6 +186,9 @@ async def setup(self): self.root_profile.BACKEND_NAME == "askar-anoncreds" and ledger.BACKEND_NAME == "indy-vdr" ): + LOGGER.debug( + "Binding IndyCredxVerifier for 'askar-anoncreds' backend." + ) context.injector.bind_provider( IndyVerifier, ClassProvider( @@ -178,6 +197,7 @@ async def setup(self): ), ) else: + LOGGER.error("Unsupported ledger backend for multiledger setup.") raise MultipleLedgerManagerError( "Multiledger is supported only for Indy SDK or Askar " "[Indy VDR] profile" @@ -187,13 +207,17 @@ async def setup(self): ) # Configure the ledger - if not await ledger_config( + ledger_configured = await ledger_config( self.root_profile, self.setup_public_did and self.setup_public_did.did - ): - LOGGER.warning("No ledger configured") + ) + if not ledger_configured: + LOGGER.warning("No ledger configured.") + else: + LOGGER.debug("Ledger configured successfully.") if not context.settings.get("transport.disabled"): # Register all inbound transports if enabled + LOGGER.debug("Transport not disabled. Setting up inbound transports.") self.inbound_transport_manager = InboundTransportManager( self.root_profile, self.inbound_message_router, self.handle_not_returned ) @@ -201,45 +225,54 @@ async def setup(self): context.injector.bind_instance( InboundTransportManager, self.inbound_transport_manager ) + LOGGER.debug("Inbound transports registered successfully.") - if not context.settings.get("transport.disabled"): # Register all outbound transports + LOGGER.debug("Setting up outbound transports.") self.outbound_transport_manager = OutboundTransportManager( self.root_profile, self.handle_not_delivered ) await self.outbound_transport_manager.setup() + LOGGER.debug("Outbound transports registered successfully.") # Initialize dispatcher + LOGGER.debug("Initializing dispatcher.") self.dispatcher = Dispatcher(self.root_profile) await self.dispatcher.setup() + LOGGER.debug("Dispatcher initialized successfully.") wire_format = context.inject_or(BaseWireFormat) if wire_format and hasattr(wire_format, "task_queue"): wire_format.task_queue = self.dispatcher.task_queue + LOGGER.debug("Wire format task queue bound to dispatcher.") # Bind manager for multitenancy related tasks if context.settings.get("multitenant.enabled"): + LOGGER.debug("Multitenant is enabled. Binding MultitenantManagerProvider.") context.injector.bind_provider( BaseMultitenantManager, MultitenantManagerProvider(self.root_profile) ) # Bind route manager provider + LOGGER.debug("Binding RouteManagerProvider.") context.injector.bind_provider( RouteManager, RouteManagerProvider(self.root_profile) ) - # Bind oob message processor to be able to receive and process un-encrypted - # messages + # Bind OobMessageProcessor to be able to receive and process unencrypted messages + LOGGER.debug("Binding OobMessageProcessor.") context.injector.bind_instance( OobMessageProcessor, OobMessageProcessor(inbound_message_router=self.inbound_message_router), ) # Bind default PyLD document loader + LOGGER.debug("Binding default DocumentLoader.") context.injector.bind_instance(DocumentLoader, DocumentLoader(self.root_profile)) # Admin API if context.settings.get("admin.enabled"): + LOGGER.debug("Admin API is enabled. Attempting to register admin server.") try: admin_host = context.settings.get("admin.host", "0.0.0.0") admin_port = context.settings.get("admin.port", "80") @@ -255,13 +288,15 @@ async def setup(self): self.get_stats, ) context.injector.bind_instance(BaseAdminServer, self.admin_server) + LOGGER.debug("Admin server registered on %s:%s", admin_host, admin_port) except Exception: - LOGGER.exception("Unable to register admin server") + LOGGER.exception("Unable to register admin server.") raise # Fetch stats collector, if any collector = context.inject_or(Collector) if collector: + LOGGER.debug("Stats collector found. Wrapping methods for collection.") # add stats to our own methods collector.wrap( self, @@ -280,32 +315,40 @@ async def setup(self): "find_inbound_connection", ), ) + LOGGER.debug("Methods wrapped with stats collector.") async def start(self) -> None: """Start the agent.""" - + LOGGER.debug("Starting the Conductor agent.") context = self.root_profile.context await self.check_for_valid_wallet_type(self.root_profile) + LOGGER.debug("Wallet type validated.") if not context.settings.get("transport.disabled"): # Start up transports if enabled try: + LOGGER.debug("Transport not disabled. Starting inbound transports.") await self.inbound_transport_manager.start() + LOGGER.debug("Inbound transports started successfully.") except Exception: - LOGGER.exception("Unable to start inbound transports") + LOGGER.exception("Unable to start inbound transports.") raise try: + LOGGER.debug("Starting outbound transports.") await self.outbound_transport_manager.start() + LOGGER.debug("Outbound transports started successfully.") except Exception: - LOGGER.exception("Unable to start outbound transports") + LOGGER.exception("Unable to start outbound transports.") raise # Start up Admin server if self.admin_server: + LOGGER.debug("Admin server present. Starting admin server.") try: await self.admin_server.start() + LOGGER.debug("Admin server started successfully.") except Exception: - LOGGER.exception("Unable to start administration API") + LOGGER.exception("Unable to start administration API.") # Make admin responder available during message parsing # This allows webhooks to be called when a connection is marked active, # for example @@ -314,9 +357,11 @@ async def start(self) -> None: self.admin_server.outbound_message_router, ) context.injector.bind_instance(BaseResponder, responder) + LOGGER.debug("Admin responder bound to injector.") # Get agent label default_label = context.settings.get("default_label") + LOGGER.debug("Agent label: %s", default_label) if context.settings.get("transport.disabled"): LoggingConfigurator.print_banner( @@ -341,6 +386,7 @@ async def start(self) -> None: from_version_storage = None from_version = None agent_version = f"v{__version__}" + LOGGER.debug("Recording ACA-Py version in wallet if needed.") async with self.root_profile.session() as session: storage: BaseStorage = session.context.inject(BaseStorage) try: @@ -355,10 +401,16 @@ async def start(self) -> None: ) except StorageNotFoundError: LOGGER.warning("Wallet version storage record not found.") + from_version_config = self.root_profile.settings.get("upgrade.from_version") force_upgrade_flag = ( self.root_profile.settings.get("upgrade.force_upgrade") or False ) + LOGGER.debug( + "Force upgrade flag: %s, From version config: %s", + force_upgrade_flag, + from_version_config, + ) if force_upgrade_flag and from_version_config: if from_version_storage: @@ -370,8 +422,13 @@ async def start(self) -> None: from_version = from_version_storage else: from_version = from_version_config + LOGGER.debug( + "Determined from_version based on force_upgrade: %s", from_version + ) else: from_version = from_version_storage or from_version_config + LOGGER.debug("Determined from_version: %s", from_version) + if not from_version: LOGGER.warning( ( @@ -382,17 +439,27 @@ async def start(self) -> None: ) from_version = DEFAULT_ACAPY_VERSION self.root_profile.settings.set_value("upgrade.from_version", from_version) + LOGGER.debug("Set upgrade.from_version to default: %s", from_version) + config_available_list = get_upgrade_version_list( config_path=self.root_profile.settings.get("upgrade.config_path"), from_version=from_version, ) + LOGGER.debug("Available upgrade versions: %s", config_available_list) + if len(config_available_list) >= 1: + LOGGER.info("Upgrade configurations available. Initiating upgrade.") await upgrade(profile=self.root_profile) elif not (from_version_storage and from_version_storage == agent_version): + LOGGER.debug("No upgrades needed. Adding version record.") await add_version_record(profile=self.root_profile, version=agent_version) # Create a static connection for use by the test-suite if context.settings.get("debug.test_suite_endpoint"): + LOGGER.debug( + "Test suite endpoint configured. " + "Creating static connection for test suite." + ) mgr = ConnectionManager(self.root_profile) their_endpoint = context.settings["debug.test_suite_endpoint"] test_conn = await mgr.create_static_connection( @@ -401,32 +468,38 @@ async def start(self) -> None: their_endpoint=their_endpoint, alias="test-suite", ) - print("Created static connection for test suite") - print(" - My DID:", test_conn.my_did) - print(" - Their DID:", test_conn.their_did) - print(" - Their endpoint:", their_endpoint) - print() + LOGGER.info( + "Created static connection for test suite\n" + f" - My DID: {test_conn.my_did}\n" + f" - Their DID: {test_conn.their_did}\n" + f" - Their endpoint: {their_endpoint}\n" + ) del mgr + LOGGER.debug("Static connection for test suite created and manager deleted.") # Clear default mediator if context.settings.get("mediation.clear"): + LOGGER.debug("Mediation clear flag set. Clearing default mediator.") mediation_mgr = MediationManager(self.root_profile) await mediation_mgr.clear_default_mediator() - print("Default mediator cleared.") + LOGGER.info("Default mediator cleared.") - # Clear default mediator # Set default mediator by id default_mediator_id = context.settings.get("mediation.default_id") if default_mediator_id: + LOGGER.debug("Setting default mediator to ID: %s", default_mediator_id) mediation_mgr = MediationManager(self.root_profile) try: await mediation_mgr.set_default_mediator_by_id(default_mediator_id) - print(f"Default mediator set to {default_mediator_id}") + LOGGER.info(f"Default mediator set to {default_mediator_id}") except Exception: - LOGGER.exception("Error updating default mediator") + LOGGER.exception("Error updating default mediator.") # Print an invitation to the terminal if context.settings.get("debug.print_invitation"): + LOGGER.debug( + "Debug flag for printing invitation is set. Creating invitation." + ) try: mgr = OutOfBandManager(self.root_profile) invi_rec = await mgr.create_invitation( @@ -440,17 +513,20 @@ async def start(self) -> None: ) base_url = context.settings.get("invite_base_url") invite_url = invi_rec.invitation.to_url(base_url) - print("Invitation URL:") - print(invite_url, flush=True) + LOGGER.info(f"Invitation URL:\n{invite_url}") qr = QRCode(border=1) qr.add_data(invite_url) qr.print_ascii(invert=True) del mgr except Exception: - LOGGER.exception("Error creating invitation") + LOGGER.exception("Error creating invitation.") # Print connections protocol invitation to the terminal if context.settings.get("debug.print_connections_invitation"): + LOGGER.debug( + "Debug flag for printing connections invitation is set. " + "Creating connections invitation." + ) try: mgr = ConnectionManager(self.root_profile) _record, invite = await mgr.create_invitation( @@ -463,17 +539,17 @@ async def start(self) -> None: ) base_url = context.settings.get("invite_base_url") invite_url = invite.to_url(base_url) - print("Invitation URL (Connections protocol):") - print(invite_url, flush=True) + LOGGER.info(f"Invitation URL (Connections protocol):\n{invite_url}") qr = QRCode(border=1) qr.add_data(invite_url) qr.print_ascii(invert=True) del mgr except Exception: - LOGGER.exception("Error creating invitation") + LOGGER.exception("Error creating connections protocol invitation.") # mediation connection establishment provided_invite: str = context.settings.get("mediation.invite") + LOGGER.debug("Mediation invite provided: %s", provided_invite) try: async with self.root_profile.session() as session: @@ -481,16 +557,24 @@ async def start(self) -> None: mediation_invite_record = await invite_store.get_mediation_invite_record( provided_invite ) + LOGGER.debug("Mediation invite record retrieved successfully.") except Exception: - LOGGER.exception("Error retrieving mediator invitation") + LOGGER.exception("Error retrieving mediator invitation.") mediation_invite_record = None # Accept mediation invitation if one was specified or stored if mediation_invite_record is not None: + LOGGER.debug( + "Mediation invite record found. " + "Attempting to accept mediation invitation." + ) try: mediation_connections_invite = context.settings.get( "mediation.connections_invite", False ) + LOGGER.debug( + "Mediation connections invite flag: %s", mediation_connections_invite + ) invitation_handler = ( ConnectionInvitation if mediation_connections_invite @@ -498,8 +582,11 @@ async def start(self) -> None: ) if not mediation_invite_record.used: - # clear previous mediator configuration before establishing a - # new one + # clear previous mediator configuration before establishing a new one + LOGGER.debug( + "Mediation invite not used. " + "Clearing default mediator before accepting new invite." + ) await MediationManager(self.root_profile).clear_default_mediator() mgr = ( @@ -507,6 +594,7 @@ async def start(self) -> None: if mediation_connections_invite else OutOfBandManager(self.root_profile) ) + LOGGER.debug("Receiving mediation invitation.") record = await mgr.receive_invitation( invitation=invitation_handler.from_url( mediation_invite_record.invite @@ -517,6 +605,7 @@ async def start(self) -> None: await MediationInviteStore( session.context.inject(BaseStorage) ).mark_default_invite_as_used() + LOGGER.debug("Marked mediation invite as used.") await record.metadata_set( session, MediationManager.SEND_REQ_AFTER_CONNECTION, True @@ -524,48 +613,65 @@ async def start(self) -> None: await record.metadata_set( session, MediationManager.SET_TO_DEFAULT_ON_GRANTED, True ) + LOGGER.debug("Set mediation metadata after connection.") - print("Attempting to connect to mediator...") + LOGGER.info("Attempting to connect to mediator...") del mgr + LOGGER.debug("Mediation manager deleted after setting up mediator.") except Exception: - LOGGER.exception("Error accepting mediation invitation") + LOGGER.exception("Error accepting mediation invitation.") try: + LOGGER.debug("Checking for wallet upgrades in progress.") await self.check_for_wallet_upgrades_in_progress() + LOGGER.debug("Wallet upgrades check completed.") except Exception: LOGGER.exception( - "An exception was caught while checking for wallet upgrades in progress" + "An exception was caught while checking for wallet upgrades in progress." ) # notify protocols of startup status + LOGGER.debug("Notifying protocols of startup status.") await self.root_profile.notify(STARTUP_EVENT_TOPIC, {}) + LOGGER.debug("Startup notification sent.") async def stop(self, timeout=1.0): """Stop the agent.""" + LOGGER.info("Stopping the Conductor agent.") # notify protocols that we are shutting down if self.root_profile: + LOGGER.debug("Notifying protocols of shutdown.") await self.root_profile.notify(SHUTDOWN_EVENT_TOPIC, {}) + LOGGER.debug("Shutdown notification sent.") shutdown = TaskQueue() if self.dispatcher: + LOGGER.debug("Initiating shutdown of dispatcher.") shutdown.run(self.dispatcher.complete()) if self.admin_server: + LOGGER.debug("Initiating shutdown of admin server.") shutdown.run(self.admin_server.stop()) if self.inbound_transport_manager: + LOGGER.debug("Initiating shutdown of inbound transport manager.") shutdown.run(self.inbound_transport_manager.stop()) if self.outbound_transport_manager: + LOGGER.debug("Initiating shutdown of outbound transport manager.") shutdown.run(self.outbound_transport_manager.stop()) if self.root_profile: # close multitenant profiles multitenant_mgr = self.context.inject_or(BaseMultitenantManager) if multitenant_mgr: + LOGGER.debug("Closing multitenant profiles.") for profile in multitenant_mgr.open_profiles: + LOGGER.debug("Closing profile: %s", profile.name) shutdown.run(profile.close()) - + LOGGER.debug("Closing root profile.") shutdown.run(self.root_profile.close()) + LOGGER.debug("Waiting for shutdown tasks to complete with timeout=%f.", timeout) await shutdown.complete(timeout) + LOGGER.info("Conductor agent stopped successfully.") def inbound_message_router( self, @@ -607,22 +713,26 @@ def inbound_message_router( def dispatch_complete(self, message: InboundMessage, completed: CompletedTask): """Handle completion of message dispatch.""" if completed.exc_info: - LOGGER.exception("Exception in message handler:", exc_info=completed.exc_info) - if isinstance(completed.exc_info[1], LedgerConfigError) or isinstance( - completed.exc_info[1], LedgerTransactionError - ): - LOGGER.error( + exc_class, exc, _ = completed.exc_info + if isinstance(exc, (LedgerConfigError, LedgerTransactionError)): + LOGGER.fatal( "%shutdown on ledger error %s", "S" if self.admin_server else "No admin server to s", - str(completed.exc_info[1]), + str(exc), + exc_info=completed.exc_info, ) if self.admin_server: self.admin_server.notify_fatal_error() + elif isinstance(exc, (ProfileError, StorageNotFoundError)): + LOGGER.warning( + "Storage error occurred in message handler: %s: %s", + exc_class.__name__, + str(exc), + exc_info=completed.exc_info, + ) else: - LOGGER.error( - "DON'T shutdown on %s %s", - completed.exc_info[0].__name__, - str(completed.exc_info[1]), + LOGGER.exception( + "Exception in message handler:", exc_info=completed.exc_info ) self.inbound_transport_manager.dispatch_complete(message, completed) diff --git a/acapy_agent/core/dispatcher.py b/acapy_agent/core/dispatcher.py index 962b69d97c..dfa476e444 100644 --- a/acapy_agent/core/dispatcher.py +++ b/acapy_agent/core/dispatcher.py @@ -176,9 +176,6 @@ async def handle_v1_message( inbound_message: The inbound message instance send_outbound: Async function to send outbound messages - # Raises: - # MessageParseError: If the message type version is not supported - Returns: The response from the handler @@ -193,7 +190,9 @@ async def handle_v1_message( except ProblemReportParseError: pass # avoid problem report recursion except MessageParseError as e: - self.logger.error(f"Message parsing failed: {str(e)}, sending problem report") + self.logger.error( + f"Message parsing failed: {str(e)}, sending problem report", exc_info=e + ) error_result = ProblemReport( description={ "en": str(e), diff --git a/acapy_agent/core/plugin_registry.py b/acapy_agent/core/plugin_registry.py index b3fa709386..55bf1d08a1 100644 --- a/acapy_agent/core/plugin_registry.py +++ b/acapy_agent/core/plugin_registry.py @@ -3,7 +3,7 @@ import logging from collections import OrderedDict from types import ModuleType -from typing import Iterable, Optional, Sequence +from typing import Optional, Sequence, Set from ..config.injection_context import InjectionContext from ..core.event_bus import EventBus @@ -18,10 +18,10 @@ class PluginRegistry: """Plugin registry for indexing application plugins.""" - def __init__(self, blocklist: Iterable[str] = []): + def __init__(self, blocklist: Optional[Set[str]] = None): """Initialize a `PluginRegistry` instance.""" - self._plugins = OrderedDict() - self._blocklist = set(blocklist) + self._plugins: OrderedDict[str, ModuleType] = OrderedDict() + self._blocklist: Set[str] = set(blocklist) if blocklist else set() @property def plugin_names(self) -> Sequence[str]: @@ -57,7 +57,6 @@ def validate_version(self, version_list, module_name): for version_dict in version_list: # Dicts must have correct format - try: if not ( isinstance(version_dict["major_version"], int) @@ -89,8 +88,8 @@ def validate_version(self, version_list, module_name): > version_dict["current_minor_version"] ): raise ProtocolDefinitionValidationError( - "Minimum supported minor version cannot" - + " be greater than current minor version" + "Minimum supported minor version cannot " + "be greater than current minor version" ) # There can only be one definition per major version @@ -102,7 +101,7 @@ def validate_version(self, version_list, module_name): if count > 1: raise ProtocolDefinitionValidationError( "There can only be one definition per major version. " - + f"Found {count} for major version {major_version}." + f"Found {count} for major version {major_version}." ) # Specified module must be loadable @@ -111,97 +110,126 @@ def validate_version(self, version_list, module_name): if not mod: raise ProtocolDefinitionValidationError( - "Version module path is not " - + f"loadable: {module_name}, {version_path}" + f"Version module path is not loadable: {module_name}, {version_path}" ) return True - def register_plugin(self, module_name: str) -> ModuleType: + def register_plugin(self, module_name: str) -> Optional[ModuleType]: """Register a plugin module.""" - if module_name in self._plugins: - mod = self._plugins[module_name] - elif module_name in self._blocklist: - LOGGER.debug(f"Blocked {module_name} from loading due to blocklist") + if self._is_already_registered(module_name): + return self._plugins.get(module_name) + + if self._is_blocked(module_name): return None - else: - try: - mod = ClassLoader.load_module(module_name) - LOGGER.debug(f"Loaded module: {module_name}") - except ModuleLoadError as e: - LOGGER.error(f"Error loading plugin module: {e}") - return None - # Module must exist - if not mod: - LOGGER.error(f"Module doesn't exist: {module_name}") - return None - - # Any plugin with a setup method is considered valid. - if hasattr(mod, "setup"): - self._plugins[module_name] = mod - return mod - - # Make an exception for non-protocol modules - # that contain admin routes and for old-style protocol - # modules without version support - routes = ClassLoader.load_module("routes", module_name) - message_types = ClassLoader.load_module("message_types", module_name) - if routes or message_types: - self._plugins[module_name] = mod - return mod - - definition = ClassLoader.load_module("definition", module_name) - - # definition.py must exist in protocol - if not definition: - LOGGER.error(f"Protocol does not include definition.py: {module_name}") - return None - - # definition.py must include versions attribute - if not hasattr(definition, "versions"): - LOGGER.error( - "Protocol definition does not include " - f"versions attribute: {module_name}" - ) - return None + mod = self._load_module(module_name) + if not mod: + LOGGER.error("Module doesn't exist: %s", module_name) + return None - # Definition list must not be malformed - try: - self.validate_version(definition.versions, module_name) - except ProtocolDefinitionValidationError as e: - LOGGER.error(f"Protocol versions definition is malformed. {e}") - return None + if self._is_valid_plugin(mod, module_name): + self._plugins[module_name] = mod + LOGGER.debug("Registered plugin: %s", module_name) + return mod - self._plugins[module_name] = mod - return mod + LOGGER.warning("Failed to register plugin: %s", module_name) + return None - # # Load each version as a separate plugin - # for version in definition.versions: - # mod = ClassLoader.load_module(f"{module_name}.{version['path']}") - # self._plugins[module_name] = mod - # return mod + def _is_already_registered(self, module_name: str) -> bool: + """Check if the plugin is already registered.""" + if module_name in self._plugins: + LOGGER.debug("Plugin %s is already registered.", module_name) + return True + return False + + def _is_blocked(self, module_name: str) -> bool: + """Check if the plugin is in the blocklist.""" + if module_name in self._blocklist: + LOGGER.debug("Blocked %s from loading due to blocklist.", module_name) + return True + return False + + def _load_module(self, module_name: str) -> Optional[ModuleType]: + """Load the plugin module using ClassLoader.""" + try: + mod = ClassLoader.load_module(module_name) + return mod + except ModuleLoadError as e: + LOGGER.error("Error loading plugin module '%s': %s", module_name, e) + return None + + def _is_valid_plugin(self, mod: ModuleType, module_name: str) -> bool: + """Validate the plugin based on various criteria.""" + # Check if the plugin has a 'setup' method + if hasattr(mod, "setup"): + return True + + # Check for 'routes' or 'message_types' modules + # This makes an exception for non-protocol modules that contain admin routes + # and for old-style protocol modules without version support + routes = ClassLoader.load_module("routes", module_name) + message_types = ClassLoader.load_module("message_types", module_name) + if routes or message_types: + return True + + # Check for 'definition' module with 'versions' attribute + definition = ClassLoader.load_module("definition", module_name) + if not definition: + LOGGER.error( + "Protocol does not include 'definition.py' for module: %s", + module_name, + ) + return False + + if not hasattr(definition, "versions"): + LOGGER.error( + "Protocol definition does not include versions attribute for module: %s", + module_name, + ) + return False + + # Validate the 'versions' attribute + try: + self.validate_version(definition.versions, module_name) + return True + except ProtocolDefinitionValidationError as e: + LOGGER.error( + "Protocol versions definition is malformed for module '%s': %s", + module_name, + e, + ) + return False def register_package(self, package_name: str) -> Sequence[ModuleType]: """Register all modules (sub-packages) under a given package name.""" + LOGGER.debug("Registering package: %s", package_name) try: module_names = ClassLoader.scan_subpackages(package_name) except ModuleLoadError: LOGGER.error("Plugin module package not found: %s", package_name) module_names = [] - return list( - filter( - None, - ( - self.register_plugin(module_name) - for module_name in module_names - if module_name.split(".")[-1] != "tests" - ), - ) - ) - async def init_context(self, context: InjectionContext): + registered_plugins = [] + for module_name in module_names: + # Skip any module whose last segment is 'tests' + if module_name.split(".")[-1] == "tests": + continue + + plugin = self.register_plugin(module_name) + if plugin: + registered_plugins.append(plugin) + else: + LOGGER.warning( + "Failed to register %s under %s", module_name, package_name + ) + + return registered_plugins + + async def init_context(self, context: InjectionContext) -> None: """Call plugin setup methods on the current context.""" + LOGGER.debug("Initializing plugin context for %d plugins", len(self._plugins)) + for plugin in self._plugins.values(): if hasattr(plugin, "setup"): await plugin.setup(context) @@ -216,25 +244,29 @@ async def load_protocol_version( context: InjectionContext, mod: ModuleType, version_definition: Optional[dict] = None, - ): + ) -> None: """Load a particular protocol version.""" protocol_registry = context.inject(ProtocolRegistry) goal_code_registry = context.inject(GoalCodeRegistry) + if hasattr(mod, "MESSAGE_TYPES"): protocol_registry.register_message_types( mod.MESSAGE_TYPES, version_definition=version_definition ) + if hasattr(mod, "CONTROLLERS"): protocol_registry.register_controllers(mod.CONTROLLERS) goal_code_registry.register_controllers(mod.CONTROLLERS) - async def load_protocols(self, context: InjectionContext, plugin: ModuleType): + async def load_protocols(self, context: InjectionContext, plugin: ModuleType) -> None: """For modules that don't implement setup, register protocols manually.""" + plugin_name = plugin.__name__ # If this module contains message_types, then assume that # this is a valid module of the old style (not versioned) try: - mod = ClassLoader.load_module(plugin.__name__ + ".message_types") + message_types_path = f"{plugin_name}.message_types" + mod = ClassLoader.load_module(message_types_path) except ModuleLoadError as e: LOGGER.error("Error loading plugin module message types: %s", e) return @@ -242,106 +274,134 @@ async def load_protocols(self, context: InjectionContext, plugin: ModuleType): if mod: await self.load_protocol_version(context, mod) else: - # Otherwise, try check for definition.py for versioned - # protocol packages + # Otherwise, try check for definition.py for versioned protocol packages try: - definition = ClassLoader.load_module(plugin.__name__ + ".definition") + definition_path = f"{plugin_name}.definition" + definition = ClassLoader.load_module(definition_path) except ModuleLoadError as e: LOGGER.error("Error loading plugin definition module: %s", e) return if definition: for protocol_version in definition.versions: + version_path = ( + f"{plugin_name}.{protocol_version['path']}.message_types" + ) try: - mod = ClassLoader.load_module( - f"{plugin.__name__}.{protocol_version['path']}" - + ".message_types" - ) - await self.load_protocol_version(context, mod, protocol_version) - + mod = ClassLoader.load_module(version_path) except ModuleLoadError as e: - LOGGER.error("Error loading plugin module message types: %s", e) + LOGGER.error( + "Error loading plugin module message types from %s: %s", + version_path, + e, + ) return - async def register_admin_routes(self, app): + if mod: + await self.load_protocol_version(context, mod, protocol_version) + else: + LOGGER.debug("Failed to load %s", version_path) + + async def register_admin_routes(self, app) -> None: """Call route registration methods on the current context.""" + LOGGER.debug("Registering admin routes for %d plugins", len(self._plugins)) + for plugin in self._plugins.values(): - definition = ClassLoader.load_module("definition", plugin.__name__) + plugin_name = plugin.__name__ + mod = None + definition = ClassLoader.load_module("definition", plugin_name) if definition: # Load plugin routes that are in a versioned package. for plugin_version in definition.versions: + version_path = f"{plugin_name}.{plugin_version['path']}.routes" try: - mod = ClassLoader.load_module( - f"{plugin.__name__}.{plugin_version['path']}.routes" - ) + mod = ClassLoader.load_module(version_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error( + "Error loading admin routes from %s: %s", version_path, e + ) continue + if mod and hasattr(mod, "register"): await mod.register(app) else: # Load plugin routes that aren't in a versioned package. + routes_path = f"{plugin_name}.routes" try: - mod = ClassLoader.load_module(f"{plugin.__name__}.routes") + mod = ClassLoader.load_module(routes_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error("Error loading admin routes from %s: %s", routes_path, e) continue + if mod and hasattr(mod, "register"): await mod.register(app) - def register_protocol_events(self, context: InjectionContext): + def register_protocol_events(self, context: InjectionContext) -> None: """Call route register_events methods on the current context.""" + LOGGER.debug("Registering protocol events for %d plugins", len(self._plugins)) + event_bus = context.inject_or(EventBus) if not event_bus: LOGGER.error("No event bus in context") return + for plugin in self._plugins.values(): - definition = ClassLoader.load_module("definition", plugin.__name__) + plugin_name = plugin.__name__ + mod = None + definition = ClassLoader.load_module("definition", plugin_name) if definition: # Load plugin routes that are in a versioned package. for plugin_version in definition.versions: + version_path = f"{plugin_name}.{plugin_version['path']}.routes" try: - mod = ClassLoader.load_module( - f"{plugin.__name__}.{plugin_version['path']}.routes" - ) + mod = ClassLoader.load_module(version_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error("Error loading events from %s: %s", version_path, e) continue + if mod and hasattr(mod, "register_events"): mod.register_events(event_bus) else: # Load plugin routes that aren't in a versioned package. + routes_path = f"{plugin_name}.routes" try: - mod = ClassLoader.load_module(f"{plugin.__name__}.routes") + mod = ClassLoader.load_module(routes_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error("Error loading events from %s: %s", routes_path, e) continue + if mod and hasattr(mod, "register_events"): mod.register_events(event_bus) - def post_process_routes(self, app): + def post_process_routes(self, app) -> None: """Call route binary file response OpenAPI fixups if applicable.""" + LOGGER.debug("Post-processing routes for %d plugins", len(self._plugins)) + for plugin in self._plugins.values(): - definition = ClassLoader.load_module("definition", plugin.__name__) + plugin_name = plugin.__name__ + mod = None + definition = ClassLoader.load_module("definition", plugin_name) if definition: # Set binary file responses for routes that are in a versioned package. for plugin_version in definition.versions: + version_path = f"{plugin_name}.{plugin_version['path']}.routes" try: - mod = ClassLoader.load_module( - f"{plugin.__name__}.{plugin_version['path']}.routes" - ) + mod = ClassLoader.load_module(version_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error("Error loading routes from %s: %s", version_path, e) continue + if mod and hasattr(mod, "post_process_routes"): mod.post_process_routes(app) else: # Set binary file responses for routes not in a versioned package. + routes_path = f"{plugin_name}.routes" try: - mod = ClassLoader.load_module(f"{plugin.__name__}.routes") + mod = ClassLoader.load_module(routes_path) except ModuleLoadError as e: - LOGGER.error("Error loading admin routes: %s", e) + LOGGER.error("Error loading routes from %s: %s", routes_path, e) continue + if mod and hasattr(mod, "post_process_routes"): mod.post_process_routes(app) diff --git a/acapy_agent/core/tests/test_conductor.py b/acapy_agent/core/tests/test_conductor.py index 6b66d1fa73..6d8530a4e5 100644 --- a/acapy_agent/core/tests/test_conductor.py +++ b/acapy_agent/core/tests/test_conductor.py @@ -1,5 +1,5 @@ -from io import StringIO from unittest import IsolatedAsyncioTestCase +from unittest.mock import call import pytest @@ -1314,6 +1314,18 @@ async def test_print_invite_connection(self): test_profile = await create_test_profile(None, await builder.build_context()) + # Define expected invitation URLs + expected_oob_url = "http://localhost?oob=test_oob_invite" + expected_ci_url = "http://localhost?c_i=test_ci_invite" + + # Mock the InvitationRecord returned by create_invitation for OOB + mock_oob_invitation = mock.MagicMock() + mock_oob_invitation.invitation.to_url.return_value = expected_oob_url + + # Mock the InvitationRecord returned by create_invitation for Connections Protocol + mock_ci_invitation = mock.MagicMock() + mock_ci_invitation.to_url.return_value = expected_ci_url + with ( mock.patch.object( test_module, @@ -1323,21 +1335,37 @@ async def test_print_invite_connection(self): DIDInfo("did", "verkey", metadata={}, method=SOV, key_type=ED25519), ), ), - mock.patch("sys.stdout", new=StringIO()) as captured, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, + mock.patch.object(test_module, "OutOfBandManager") as oob_mgr, + mock.patch.object(test_module, "ConnectionManager") as conn_mgr, + mock.patch.object(test_module.LOGGER, "info") as mock_logger_info, ): mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } + + # Configure create_invitation to return the mocked invitations + oob_mgr.return_value.create_invitation = mock.CoroutineMock( + return_value=mock_oob_invitation + ) + conn_mgr.return_value.create_invitation = mock.CoroutineMock( + return_value=(None, mock_ci_invitation) + ) + + # Execute the conductor lifecycle await conductor.setup() await conductor.start() await conductor.stop() - value = captured.getvalue() - assert "http://localhost?oob=" in value - assert "http://localhost?c_i=" in value + + # Assert that LOGGER.info was called twice with the expected URLs + expected_calls = [ + call(f"Invitation URL:\n{expected_oob_url}"), + call(f"Invitation URL (Connections protocol):\n{expected_ci_url}"), + ] + mock_logger_info.assert_has_calls(expected_calls, any_order=True) async def test_clear_default_mediator(self): builder: ContextBuilder = StubContextBuilder(self.test_settings) diff --git a/acapy_agent/core/tests/test_plugin_registry.py b/acapy_agent/core/tests/test_plugin_registry.py index b5727a4546..4e870fbdfb 100644 --- a/acapy_agent/core/tests/test_plugin_registry.py +++ b/acapy_agent/core/tests/test_plugin_registry.py @@ -537,6 +537,7 @@ async def test_load_protocols_load_mod(self): mock_mod = mock.MagicMock() mock_mod.MESSAGE_TYPES = mock.MagicMock() mock_mod.CONTROLLERS = mock.MagicMock() + mock_mod.__name__ = "test_mod" with mock.patch.object( ClassLoader, "load_module", mock.MagicMock() @@ -595,6 +596,7 @@ async def test_load_protocols_no_mod_def_message_types(self): mock_mod = mock.MagicMock() mock_mod.MESSAGE_TYPES = mock.MagicMock() mock_mod.CONTROLLERS = mock.MagicMock() + mock_mod.__name__ = "test_mod" with mock.patch.object( ClassLoader, "load_module", mock.MagicMock() diff --git a/acapy_agent/protocols/issue_credential/v2_0/handlers/cred_problem_report_handler.py b/acapy_agent/protocols/issue_credential/v2_0/handlers/cred_problem_report_handler.py index b8f56f119d..f513496c79 100644 --- a/acapy_agent/protocols/issue_credential/v2_0/handlers/cred_problem_report_handler.py +++ b/acapy_agent/protocols/issue_credential/v2_0/handlers/cred_problem_report_handler.py @@ -40,7 +40,11 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message, context.connection_record.connection_id, ) - except (StorageError, StorageNotFoundError): + except StorageNotFoundError: + self._logger.warning( + "Record not found while processing issue-credential v2.0 problem report" + ) + except StorageError: self._logger.exception( - "Error processing issue-credential v2.0 problem report message" + "Storage error while processing issue-credential v2.0 problem report" ) diff --git a/acapy_agent/protocols/issue_credential/v2_0/routes.py b/acapy_agent/protocols/issue_credential/v2_0/routes.py index 9128f7aeac..3993eadfca 100644 --- a/acapy_agent/protocols/issue_credential/v2_0/routes.py +++ b/acapy_agent/protocols/issue_credential/v2_0/routes.py @@ -846,7 +846,12 @@ async def credential_exchange_send(request: web.BaseRequest): V20CredManagerError, V20CredFormatError, ) as err: - LOGGER.exception("Error preparing credential offer") + # Only log full exception for unexpected errors + if isinstance(err, (V20CredFormatError, V20CredManagerError)): + LOGGER.warning(f"Error preparing credential offer: {err.roll_up}") + else: + LOGGER.exception("Error preparing credential offer") + if cred_ex_record: async with profile.session() as session: await cred_ex_record.save_error_state(session, reason=err.roll_up) diff --git a/acapy_agent/protocols/present_proof/v2_0/handlers/pres_problem_report_handler.py b/acapy_agent/protocols/present_proof/v2_0/handlers/pres_problem_report_handler.py index be4aab8620..e10df27e2e 100644 --- a/acapy_agent/protocols/present_proof/v2_0/handlers/pres_problem_report_handler.py +++ b/acapy_agent/protocols/present_proof/v2_0/handlers/pres_problem_report_handler.py @@ -34,7 +34,11 @@ async def handle(self, context: RequestContext, responder: BaseResponder): else None ), ) - except (StorageError, StorageNotFoundError): + except StorageNotFoundError: + self._logger.warning( + "Record not found while processing present-proof v2.0 problem report" + ) + except StorageError: self._logger.exception( - "Error processing present-proof v2.0 problem report message" + "Storage error while processing present-proof v2.0 problem report" ) diff --git a/acapy_agent/transport/v2_pack_format.py b/acapy_agent/transport/v2_pack_format.py index 266c0b8b08..92e26da698 100644 --- a/acapy_agent/transport/v2_pack_format.py +++ b/acapy_agent/transport/v2_pack_format.py @@ -63,8 +63,7 @@ async def parse_message( try: message_unpack = await messaging.unpack(message_json) except CryptoServiceError: - LOGGER.debug("Message unpack failed, falling back to JSON") - print("HIT CRTYPTO SER ERR EXCEPT BLOC") + LOGGER.info("Message unpack failed, falling back to JSON") else: # Set message_dict to be the dictionary that we unpacked message_dict = message_unpack.message diff --git a/acapy_agent/utils/classloader.py b/acapy_agent/utils/classloader.py index 09d4799aea..0889f50dc8 100644 --- a/acapy_agent/utils/classloader.py +++ b/acapy_agent/utils/classloader.py @@ -1,6 +1,7 @@ """The classloader provides utilities to dynamically load classes and modules.""" import inspect +import logging import sys from importlib import import_module, resources from importlib.util import find_spec, resolve_name @@ -9,6 +10,8 @@ from ..core.error import BaseError +LOGGER = logging.getLogger(__name__) + class ModuleLoadError(BaseError): """Module load error.""" @@ -22,7 +25,9 @@ class ClassLoader: """Class used to load classes from modules dynamically.""" @classmethod - def load_module(cls, mod_path: str, package: Optional[str] = None) -> ModuleType: + def load_module( + cls, mod_path: str, package: Optional[str] = None + ) -> Optional[ModuleType]: """Load a module by its absolute path. Args: @@ -36,6 +41,7 @@ def load_module(cls, mod_path: str, package: Optional[str] = None) -> ModuleType ModuleLoadError: If there was an error loading the module """ + if package: # preload parent package if not cls.load_module(package): @@ -45,6 +51,7 @@ def load_module(cls, mod_path: str, package: Optional[str] = None) -> ModuleType mod_path = f".{mod_path}" full_path = resolve_name(mod_path, package) + if full_path in sys.modules: return sys.modules[full_path] @@ -58,7 +65,6 @@ def load_module(cls, mod_path: str, package: Optional[str] = None) -> ModuleType mod_path = f".{mod_name}" # Load the module spec first - # this means that a later ModuleNotFoundError indicates a code issue spec = find_spec(mod_path, package) if not spec: return None @@ -66,6 +72,7 @@ def load_module(cls, mod_path: str, package: Optional[str] = None) -> ModuleType try: return import_module(mod_path, package) except ModuleNotFoundError as e: + LOGGER.warning("Module %s not found during import", full_path) raise ModuleLoadError(f"Unable to import module {full_path}: {str(e)}") from e @classmethod @@ -96,24 +103,36 @@ def load_class( mod_path, class_name = class_name.rsplit(".", 1) elif default_module: mod_path = default_module + LOGGER.debug("No module in class name, using default_module: %s", mod_path) else: + LOGGER.warning( + "Cannot resolve class name %s with no default module", class_name + ) raise ClassNotFoundError( f"Cannot resolve class name with no default module: {class_name}" ) mod = cls.load_module(mod_path, package) if not mod: - raise ClassNotFoundError(f"Module '{mod_path}' not found") + LOGGER.warning( + "Module %s not found when loading class %s", mod_path, class_name + ) + raise ClassNotFoundError(f"Module {mod_path} not found") resolved = getattr(mod, class_name, None) if not resolved: + LOGGER.warning("Class %s not found in module %s", class_name, mod_path) raise ClassNotFoundError( f"Class '{class_name}' not defined in module: {mod_path}" ) if not isinstance(resolved, type): + LOGGER.warning( + "Resolved attribute %s in module %s is not a class", class_name, mod_path + ) raise ClassNotFoundError( f"Resolved value is not a class: {mod_path}.{class_name}" ) + LOGGER.debug("Successfully loaded class %s from module %s", class_name, mod_path) return resolved @classmethod @@ -138,9 +157,14 @@ def load_subclass_of( mod = cls.load_module(mod_path, package) if not mod: + LOGGER.warning( + "Module %s not found when loading subclass of %s", + mod_path, + base_class.__name__, + ) raise ClassNotFoundError(f"Module '{mod_path}' not found") - # Find an the first declared class that inherits from + # Find the first declared class that inherits from the base_class try: imported_class = next( obj @@ -148,6 +172,11 @@ def load_subclass_of( if issubclass(obj, base_class) and obj is not base_class ) except StopIteration: + LOGGER.debug( + "No subclass of %s found in module %s", + base_class.__name__, + mod_path, + ) raise ClassNotFoundError( f"Could not resolve a class that inherits from {base_class}" ) from None @@ -156,17 +185,22 @@ def load_subclass_of( @classmethod def scan_subpackages(cls, package: str) -> Sequence[str]: """Return a list of sub-packages defined under a named package.""" + LOGGER.debug("Scanning subpackages under package %s", package) if "." in package: package, sub_pkg = package.split(".", 1) + LOGGER.debug("Extracted main package: %s, sub-package: %s", package, sub_pkg) else: sub_pkg = "." + LOGGER.debug("No sub-package provided, defaulting to %s", sub_pkg) try: package_path = resources.files(package) except FileNotFoundError: + LOGGER.warning("Package %s not found during subpackage scan", package) raise ModuleLoadError(f"Undefined package {package}") if not (package_path / sub_pkg).is_dir(): + LOGGER.warning("Sub-package %s is not a directory under %s", sub_pkg, package) raise ModuleLoadError(f"Undefined package {package}") found = [] @@ -174,7 +208,9 @@ def scan_subpackages(cls, package: str) -> Sequence[str]: sub_path = package_path / sub_pkg for item in sub_path.iterdir(): if (item / "__init__.py").exists(): - found.append(f"{package}.{joiner}{item.name}") + subpackage = f"{package}.{joiner}{item.name}" + found.append(subpackage) + LOGGER.debug("%d sub-packages found under %s: %s", len(found), package, found) return found diff --git a/acapy_agent/vc/vc_di/prove.py b/acapy_agent/vc/vc_di/prove.py index 33c94fa544..acba4359fe 100644 --- a/acapy_agent/vc/vc_di/prove.py +++ b/acapy_agent/vc/vc_di/prove.py @@ -1,6 +1,7 @@ """Verifiable Credential and Presentation proving methods.""" import asyncio +import logging import re from hashlib import sha256 from typing import Any, Optional, Tuple @@ -20,6 +21,8 @@ from ...core.profile import Profile from ..ld_proofs import LinkedDataProofException, ProofPurpose +LOGGER = logging.getLogger(__name__) + async def create_signed_anoncreds_presentation( *, @@ -311,7 +314,7 @@ async def prepare_data_for_presentation( # issuer_id = field["filter"]["const"] pass else: - print("... skipping:", path) + LOGGER.info("... skipping: %s", path) return anoncreds_proofrequest, w3c_creds_metadata diff --git a/acapy_agent/wallet/crypto.py b/acapy_agent/wallet/crypto.py index 0ceef63a91..37ddeb7a13 100644 --- a/acapy_agent/wallet/crypto.py +++ b/acapy_agent/wallet/crypto.py @@ -1,5 +1,6 @@ """Cryptography functions used by BasicWallet.""" +import logging import re from collections import OrderedDict from typing import Callable, List, Optional, Sequence, Tuple, Union @@ -20,6 +21,8 @@ from .key_type import BLS12381G2, ED25519, KeyType from .util import b58_to_bytes, b64_to_bytes, bytes_to_b58, random_seed +LOGGER = logging.getLogger(__name__) + def create_keypair( key_type: KeyType, seed: Optional[bytes] = None @@ -423,7 +426,7 @@ def decode_pack_message_outer(enc_message: bytes) -> Tuple[dict, dict, bool]: try: wrapper = JweEnvelope.from_json(enc_message) except ValidationError as err: - print(err) + LOGGER.error(err) raise ValueError("Invalid packed message") alg = wrapper.protected.get("alg")