Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix outbound queue with webhook-url #86

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions redis_events/docker/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ auto-accept-invites: true
auto-respond-messages: true

log-level: info

webhook-url: http://dummy-server:8080
10 changes: 10 additions & 0 deletions redis_events/docker/plugins-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ redis_queue:
acapy::revocation-notification::received: acapy-revocation-notification-received
acapy::revocation-notification-v2::received: acapy-revocation-notification-v2-received
acapy::forward::received: acapy-forward-received
event_webhook_topic_maps:
acapy::basicmessage::received: basicmessages
acapy::problem_report: problem_report
acapy::ping::received: ping
acapy::ping::response_received: ping
acapy::actionmenu::received: actionmenu
acapy::actionmenu::get-active-menu: get-active-menu
acapy::actionmenu::perform-menu-action: perform-menu-action
acapy::keylist::updated: keylist
acapy::record::connections::active: acapy-record-connections-active
deliver_webhook: true


Expand Down
10 changes: 10 additions & 0 deletions redis_events/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ services:
- "alice:host-gateway"
- "faber:host-gateway"

dummy-server:
image: python:3.9-slim
ports:
- 8080:8080
command: python -m http.server 8080
networks:
- acapy_default

faber:
image: redis-events-integration
build:
Expand Down Expand Up @@ -221,12 +229,14 @@ services:
- redis-cluster
- faber
- alice
- dummy-server
networks:
- acapy_default
extra_hosts:
- "faber:host-gateway"
- "relay:host-gateway"
- "alice:host-gateway"
- "dummy-server:host-gateway"

networks:
acapy_default:
Expand Down
10 changes: 0 additions & 10 deletions redis_events/integration/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,6 @@ async def test_base_redis_keys_are_set(redis):
async def test_outbound_queue_removes_messages_from_queue_and_deliver_sends_them(faber: Agent, established_connection: str, redis):
faber.send_message(established_connection, "Hello Alice")
faber.send_message(established_connection, "Another Alice")
msg_received = False
retry_pop_count = 0
while not msg_received:
msg = await redis.blpop("acapy_outbound", 2)
if not msg:
if retry_pop_count > 3:
raise Exception("blpop call failed to retrieve message")
retry_pop_count = retry_pop_count + 1
time.sleep(1)
msg_received = True
messages = faber.retrieve_basicmessages()['results']
assert "Hello Alice" in (msg['content'] for msg in messages)
assert "Another Alice" in (msg['content'] for msg in messages)
Expand Down
55 changes: 24 additions & 31 deletions redis_events/redis_events/v1_0/redis_queue/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
"""ACA-Py Event to Redis."""

import base64
import json
import logging
import base64
import re
from string import Template
from typing import Any, Optional, cast

from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.core.event_bus import Event, EventBus, EventWithMetadata
from aries_cloudagent.core.profile import Profile
from aries_cloudagent.core.util import SHUTDOWN_EVENT_PATTERN, STARTUP_EVENT_PATTERN
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.transport.error import TransportError
from redis.asyncio import RedisCluster
from redis.exceptions import RedisError, RedisClusterException
from redis.exceptions import RedisClusterException, RedisError

from ..config import OutboundConfig, get_config, EventConfig
from ..config import EventConfig, OutboundConfig, get_config

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,8 +99,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
try:
event_payload = process_event_payload(event.payload.payload)
except TypeError:
event_payload = process_event_payload(
event.payload.enc_payload)
event_payload = process_event_payload(event.payload.enc_payload)
payload = {
"wallet_id": wallet_id or "base",
"state": event_payload.get("state"),
Expand All @@ -110,8 +109,7 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
}
webhook_urls = profile.settings.get("admin.webhook_urls")
try:
config_events = get_config(
profile.settings).event or EventConfig.default()
config_events = get_config(profile.settings).event or EventConfig.default()
template = config_events.event_topic_maps[event.metadata.pattern.pattern]
redis_topic = Template(template).substitute(**payload)
LOGGER.info(f"Sending message {payload} with topic {redis_topic}")
Expand All @@ -130,35 +128,30 @@ async def handle_event(profile: Profile, event: EventWithMetadata):
# Deliver/dispatch events to webhook_urls directly
if config_events.deliver_webhook and webhook_urls:
config_outbound = (
get_config(
profile.settings).outbound or OutboundConfig.default()
get_config(profile.settings).outbound or OutboundConfig.default()
)
for endpoint in webhook_urls:
api_key = None
if len(endpoint.split("#")) > 1:
endpoint_hash_split = endpoint.split("#")
endpoint = endpoint_hash_split[0]
api_key = endpoint_hash_split[1]
webhook_topic = config_events.event_webhook_topic_maps.get(
event.topic)
endpoint = f"{endpoint}/topic/{webhook_topic}/"
headers = {"x-wallet-id": wallet_id} if wallet_id else {}
if not api_key:
Copy link
Contributor Author

@jamshale jamshale Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the main mistake... Not sure how it got missed.

headers["x-api-key"] = api_key
outbound = str.encode(
json.dumps(
{
"service": {"url": endpoint},
"payload": base64.urlsafe_b64encode(
str.encode(json.dumps(payload))
).decode(),
"headers": headers,
}
),
)
await redis.rpush(
config_outbound.acapy_outbound_topic,
outbound,
)
webhook_topic = config_events.event_webhook_topic_maps.get(event.topic)
if endpoint and webhook_topic:
endpoint = f"{endpoint}/topic/{webhook_topic}/"
headers = {"x-wallet-id": wallet_id} if wallet_id else {}
if api_key is not None:
headers["x-api-key"] = api_key
outbound_msg = {
"service": {"url": endpoint},
"payload": base64.urlsafe_b64encode(
str.encode(json.dumps(payload))
).decode(),
"headers": headers,
}
await redis.rpush(
config_outbound.acapy_outbound_topic,
str.encode(json.dumps(outbound_msg)),
)
except (RedisError, RedisClusterException, ValueError) as err:
LOGGER.exception(f"Failed to process and send webhook, {err}")
Loading