Skip to content

Commit

Permalink
Fix outbound queue with webhook-url
Browse files Browse the repository at this point in the history
Signed-off-by: jamshale <jamiehalebc@gmail.com>
  • Loading branch information
jamshale committed Feb 2, 2024
1 parent 2224d96 commit d69c50f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 41 deletions.
2 changes: 2 additions & 0 deletions redis_events/docker/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ auto-accept-invites: true
auto-respond-messages: true

log-level: info

webhook-url: http://dummy-server:8080
10 changes: 10 additions & 0 deletions redis_events/docker/plugins-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ redis_queue:
acapy::revocation-notification::received: acapy-revocation-notification-received
acapy::revocation-notification-v2::received: acapy-revocation-notification-v2-received
acapy::forward::received: acapy-forward-received
event_webhook_topic_maps:
acapy::basicmessage::received: basicmessages
acapy::problem_report: problem_report
acapy::ping::received: ping
acapy::ping::response_received: ping
acapy::actionmenu::received: actionmenu
acapy::actionmenu::get-active-menu: get-active-menu
acapy::actionmenu::perform-menu-action: perform-menu-action
acapy::keylist::updated: keylist
acapy::record::connections::active: acapy-record-connections-active
deliver_webhook: true


Expand Down
10 changes: 10 additions & 0 deletions redis_events/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ services:
- "alice:host-gateway"
- "faber:host-gateway"

dummy-server:
image: python:3.9-slim
ports:
- 8080:8080
command: python -m http.server 8080
networks:
- acapy_default

faber:
image: redis-events-integration
build:
Expand Down Expand Up @@ -221,12 +229,14 @@ services:
- redis-cluster
- faber
- alice
- dummy-server
networks:
- acapy_default
extra_hosts:
- "faber:host-gateway"
- "relay:host-gateway"
- "alice:host-gateway"
- "dummy-server:host-gateway"

networks:
acapy_default:
Expand Down
10 changes: 0 additions & 10 deletions redis_events/integration/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,6 @@ async def test_base_redis_keys_are_set(redis):
async def test_outbound_queue_removes_messages_from_queue_and_deliver_sends_them(faber: Agent, established_connection: str, redis):
faber.send_message(established_connection, "Hello Alice")
faber.send_message(established_connection, "Another Alice")
msg_received = False
retry_pop_count = 0
while not msg_received:
msg = await redis.blpop("acapy_outbound", 2)
if not msg:
if retry_pop_count > 3:
raise Exception("blpop call failed to retrieve message")
retry_pop_count = retry_pop_count + 1
time.sleep(1)
msg_received = True
messages = faber.retrieve_basicmessages()['results']
assert "Hello Alice" in (msg['content'] for msg in messages)
assert "Another Alice" in (msg['content'] for msg in messages)
Expand Down
55 changes: 24 additions & 31 deletions redis_events/redis_events/v1_0/redis_queue/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
"""ACA-Py Event to Redis."""

import base64
import json
import logging
import base64
import re
from string import Template
from typing import Any, Optional, cast

from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.core.event_bus import Event, EventBus, EventWithMetadata
from aries_cloudagent.core.profile import Profile
from aries_cloudagent.core.util import SHUTDOWN_EVENT_PATTERN, STARTUP_EVENT_PATTERN
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.transport.error import TransportError
from redis.asyncio import RedisCluster
from redis.exceptions import RedisError, RedisClusterException
from redis.exceptions import RedisClusterException, RedisError

from ..config import OutboundConfig, get_config, EventConfig
from ..config import EventConfig, OutboundConfig, get_config

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,8 +99,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
try:
event_payload = process_event_payload(event.payload.payload)
except TypeError:
event_payload = process_event_payload(
event.payload.enc_payload)
event_payload = process_event_payload(event.payload.enc_payload)
payload = {
"wallet_id": wallet_id or "base",
"state": event_payload.get("state"),
Expand All @@ -110,8 +109,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
}
webhook_urls = profile.settings.get("admin.webhook_urls")
try:
config_events = get_config(
profile.settings).event or EventConfig.default()
config_events = get_config(profile.settings).event or EventConfig.default()
template = config_events.event_topic_maps[event.metadata.pattern.pattern]
redis_topic = Template(template).substitute(**payload)
LOGGER.info(f"Sending message {payload} with topic {redis_topic}")
Expand All @@ -130,35 +128,30 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
# Deliver/dispatch events to webhook_urls directly
if config_events.deliver_webhook and webhook_urls:
config_outbound = (
get_config(
profile.settings).outbound or OutboundConfig.default()
get_config(profile.settings).outbound or OutboundConfig.default()
)
for endpoint in webhook_urls:
api_key = None
if len(endpoint.split("#")) > 1:
endpoint_hash_split = endpoint.split("#")
endpoint = endpoint_hash_split[0]
api_key = endpoint_hash_split[1]
webhook_topic = config_events.event_webhook_topic_maps.get(
event.topic)
endpoint = f"{endpoint}/topic/{webhook_topic}/"
headers = {"x-wallet-id": wallet_id} if wallet_id else {}
if not api_key:
headers["x-api-key"] = api_key
outbound = str.encode(
json.dumps(
{
"service": {"url": endpoint},
"payload": base64.urlsafe_b64encode(
str.encode(json.dumps(payload))
).decode(),
"headers": headers,
}
),
)
await redis.rpush(
config_outbound.acapy_outbound_topic,
outbound,
)
webhook_topic = config_events.event_webhook_topic_maps.get(event.topic)
if webhook_topic:
endpoint = f"{endpoint}/topic/{webhook_topic}/"
headers = {"x-wallet-id": wallet_id} if wallet_id else {}
if api_key is not None:
headers["x-api-key"] = api_key
outbound_msg = {
"service": {"url": endpoint},
"payload": base64.urlsafe_b64encode(
str.encode(json.dumps(payload))
).decode(),
"headers": headers,
}
await redis.rpush(
config_outbound.acapy_outbound_topic,
str.encode(json.dumps(outbound_msg)),
)
except (RedisError, RedisClusterException, ValueError) as err:
LOGGER.exception(f"Failed to process and send webhook, {err}")

0 comments on commit d69c50f

Please sign in to comment.