From d69c50fc5aa92f40d5a4b3dc1c55be297cf652e8 Mon Sep 17 00:00:00 2001 From: jamshale Date: Fri, 2 Feb 2024 20:19:46 +0000 Subject: [PATCH] Fix outbound queue with webhook-url Signed-off-by: jamshale --- redis_events/docker/integration.yml | 2 + redis_events/docker/plugins-config.yml | 10 ++++ redis_events/integration/docker-compose.yml | 10 ++++ redis_events/integration/tests/test_events.py | 10 ---- .../v1_0/redis_queue/events/__init__.py | 55 ++++++++----------- 5 files changed, 46 insertions(+), 41 deletions(-) diff --git a/redis_events/docker/integration.yml b/redis_events/docker/integration.yml index 5156f9969..6e0a84fbb 100644 --- a/redis_events/docker/integration.yml +++ b/redis_events/docker/integration.yml @@ -26,3 +26,5 @@ auto-accept-invites: true auto-respond-messages: true log-level: info + +webhook-url: http://dummy-server:8080 \ No newline at end of file diff --git a/redis_events/docker/plugins-config.yml b/redis_events/docker/plugins-config.yml index 55bafea7d..b0def31e8 100644 --- a/redis_events/docker/plugins-config.yml +++ b/redis_events/docker/plugins-config.yml @@ -29,6 +29,16 @@ redis_queue: acapy::revocation-notification::received: acapy-revocation-notification-received acapy::revocation-notification-v2::received: acapy-revocation-notification-v2-received acapy::forward::received: acapy-forward-received + event_webhook_topic_maps: + acapy::basicmessage::received: basicmessages + acapy::problem_report: problem_report + acapy::ping::received: ping + acapy::ping::response_received: ping + acapy::actionmenu::received: actionmenu + acapy::actionmenu::get-active-menu: get-active-menu + acapy::actionmenu::perform-menu-action: perform-menu-action + acapy::keylist::updated: keylist + acapy::record::connections::active: acapy-record-connections-active deliver_webhook: true diff --git a/redis_events/integration/docker-compose.yml b/redis_events/integration/docker-compose.yml index 54b3de8b8..5335f21f9 100644 --- a/redis_events/integration/docker-compose.yml +++ b/redis_events/integration/docker-compose.yml @@ -164,6 +164,14 @@ services: - "alice:host-gateway" - "faber:host-gateway" + dummy-server: + image: python:3.9-slim + ports: + - 8080:8080 + command: python -m http.server 8080 + networks: + - acapy_default + faber: image: redis-events-integration build: @@ -221,12 +229,14 @@ services: - redis-cluster - faber - alice + - dummy-server networks: - acapy_default extra_hosts: - "faber:host-gateway" - "relay:host-gateway" - "alice:host-gateway" + - "dummy-server:host-gateway" networks: acapy_default: diff --git a/redis_events/integration/tests/test_events.py b/redis_events/integration/tests/test_events.py index 739f572f9..f136d557b 100644 --- a/redis_events/integration/tests/test_events.py +++ b/redis_events/integration/tests/test_events.py @@ -65,16 +65,6 @@ async def test_base_redis_keys_are_set(redis): async def test_outbound_queue_removes_messages_from_queue_and_deliver_sends_them(faber: Agent, established_connection: str, redis): faber.send_message(established_connection, "Hello Alice") faber.send_message(established_connection, "Another Alice") - msg_received = False - retry_pop_count = 0 - while not msg_received: - msg = await redis.blpop("acapy_outbound", 2) - if not msg: - if retry_pop_count > 3: - raise Exception("blpop call failed to retrieve message") - retry_pop_count = retry_pop_count + 1 - time.sleep(1) - msg_received = True messages = faber.retrieve_basicmessages()['results'] assert "Hello Alice" in (msg['content'] for msg in messages) assert "Another Alice" in (msg['content'] for msg in messages) diff --git a/redis_events/redis_events/v1_0/redis_queue/events/__init__.py b/redis_events/redis_events/v1_0/redis_queue/events/__init__.py index 90f857404..622603981 100755 --- a/redis_events/redis_events/v1_0/redis_queue/events/__init__.py +++ b/redis_events/redis_events/v1_0/redis_queue/events/__init__.py @@ -1,21 +1,21 @@ """ACA-Py Event to Redis.""" +import base64 import json import logging -import base64 import re from string import Template from typing import Any, Optional, cast +from aries_cloudagent.config.injection_context import InjectionContext from aries_cloudagent.core.event_bus import Event, EventBus, EventWithMetadata from aries_cloudagent.core.profile import Profile from aries_cloudagent.core.util import SHUTDOWN_EVENT_PATTERN, STARTUP_EVENT_PATTERN -from aries_cloudagent.config.injection_context import InjectionContext from aries_cloudagent.transport.error import TransportError from redis.asyncio import RedisCluster -from redis.exceptions import RedisError, RedisClusterException +from redis.exceptions import RedisClusterException, RedisError -from ..config import OutboundConfig, get_config, EventConfig +from ..config import EventConfig, OutboundConfig, get_config LOGGER = logging.getLogger(__name__) @@ -99,8 +99,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata): try: event_payload = process_event_payload(event.payload.payload) except TypeError: - event_payload = process_event_payload( - event.payload.enc_payload) + event_payload = process_event_payload(event.payload.enc_payload) payload = { "wallet_id": wallet_id or "base", "state": event_payload.get("state"), @@ -110,8 +109,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata): } webhook_urls = profile.settings.get("admin.webhook_urls") try: - config_events = get_config( - profile.settings).event or EventConfig.default() + config_events = get_config(profile.settings).event or EventConfig.default() template = config_events.event_topic_maps[event.metadata.pattern.pattern] redis_topic = Template(template).substitute(**payload) LOGGER.info(f"Sending message {payload} with topic {redis_topic}") @@ -130,8 +128,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata): # Deliver/dispatch events to webhook_urls directly if config_events.deliver_webhook and webhook_urls: config_outbound = ( - get_config( - profile.settings).outbound or OutboundConfig.default() + get_config(profile.settings).outbound or OutboundConfig.default() ) for endpoint in webhook_urls: api_key = None @@ -139,26 +136,22 @@ async def handle_event(profile: Profile, event: EventWithMetadata): endpoint_hash_split = endpoint.split("#") endpoint = endpoint_hash_split[0] api_key = endpoint_hash_split[1] - webhook_topic = config_events.event_webhook_topic_maps.get( - event.topic) - endpoint = f"{endpoint}/topic/{webhook_topic}/" - headers = {"x-wallet-id": wallet_id} if wallet_id else {} - if not api_key: - headers["x-api-key"] = api_key - outbound = str.encode( - json.dumps( - { - "service": {"url": endpoint}, - "payload": base64.urlsafe_b64encode( - str.encode(json.dumps(payload)) - ).decode(), - "headers": headers, - } - ), - ) - await redis.rpush( - config_outbound.acapy_outbound_topic, - outbound, - ) + webhook_topic = config_events.event_webhook_topic_maps.get(event.topic) + if webhook_topic: + endpoint = f"{endpoint}/topic/{webhook_topic}/" + headers = {"x-wallet-id": wallet_id} if wallet_id else {} + if api_key is not None: + headers["x-api-key"] = api_key + outbound_msg = { + "service": {"url": endpoint}, + "payload": base64.urlsafe_b64encode( + str.encode(json.dumps(payload)) + ).decode(), + "headers": headers, + } + await redis.rpush( + config_outbound.acapy_outbound_topic, + str.encode(json.dumps(outbound_msg)), + ) except (RedisError, RedisClusterException, ValueError) as err: LOGGER.exception(f"Failed to process and send webhook, {err}")