Skip to content

Commit

Permalink
✨ Re-add NATS Plugin after reset to upstream/main (#813)
Browse files Browse the repository at this point in the history
* 🎉 NATS Plugin 🚧 (#666)

* ⬆️ Update lock files

Signed-off-by: ff137 <ff137@proton.me>

* 🚀 take on NATS plugin

Copied from redis_events plugin, and replaced redis with nats-py (and json with orjson)

Signed-off-by: ff137 <ff137@proton.me>

* ✅ basic unit tests for events

Signed-off-by: ff137 <ff137@proton.me>

* 👷 skip nats_events integration testing

Signed-off-by: ff137 <ff137@proton.me>

* ✨ read `NATS_CREDS_FILE` from env vars

Signed-off-by: ff137 <ff137@proton.me>

* Update nats_events/README.md

Co-authored-by: Henry Msiska <henrymsi+dev@gmail.com>
Signed-off-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>

* 🎨 rename nats topic to subject

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 replace f-string interpolation in logs

Signed-off-by: ff137 <ff137@proton.me>

* ✨ use JetStream. Define streams on startup

Signed-off-by: ff137 <ff137@proton.me>

* ✨ bind and re-use JetStream context

Signed-off-by: ff137 <ff137@proton.me>

* ✨ handle jetstream publish ack

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 remove use of `cast`

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 more clear setup of jetstream context, instead of just nats

Signed-off-by: ff137 <ff137@proton.me>

* ✅ fix tests

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 update NATS subject map to use `.`-pattern

Signed-off-by: ff137 <ff137@proton.me>

* Update nats_events/README.md

Co-authored-by: Henry Msiska <henrymsi+dev@gmail.com>
Signed-off-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>

* 🙈 don't ignore vscode settings

Signed-off-by: ff137 <ff137@proton.me>

* 🔧 vscode cspell settings

Signed-off-by: ff137 <ff137@proton.me>

* 🎨

Signed-off-by: ff137 <ff137@proton.me>

* 🚧 add some verbose logging for testing

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 update method for nats connection args

Signed-off-by: ff137 <ff137@proton.me>

* 🚧 debug timeout error for defining stream, change stream name

Signed-off-by: ff137 <ff137@proton.me>

* 🚧 debug on_startup connecting to jetstream

Signed-off-by: ff137 <ff137@proton.me>

* 🚧 test replacing . in name with _

Signed-off-by: ff137 <ff137@proton.me>

* ✨ publish with retry -- correctly handle PubAck

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 log payload and no longer only write `with-state` records

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 fix payload log type

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 log levels

Signed-off-by: ff137 <ff137@proton.me>

* ✅ fix tests

Signed-off-by: ff137 <ff137@proton.me>

* ✨ handle OutboundMessage type (make it serialisable)

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 retry should log error instead of raising exception

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 handle optional target types

Signed-off-by: ff137 <ff137@proton.me>

* ✅ fix tests

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 add words to cspell list

Signed-off-by: ff137 <ff137@proton.me>

* 🚧 remove define stream

Signed-off-by: ff137 <ff137@proton.me>

* 🎨 define one stream with many subjects

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update lock files

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update lock files

Signed-off-by: ff137 <ff137@proton.me>

---------

Signed-off-by: ff137 <ff137@proton.me>
Signed-off-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>
Co-authored-by: Henry Msiska <henrymsi+dev@gmail.com>
Signed-off-by: ff137 <ff137@proton.me>

* 🎨 nats_events: update log levels (#733)

* 🎨 Update log levels

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Upgrade uvicorn

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Upgrade fastapi-slim

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Upgrade pytest-asyncio

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update lock files

Signed-off-by: ff137 <ff137@proton.me>

* 🐛 trace -> debug

Signed-off-by: ff137 <ff137@proton.me>

---------

Signed-off-by: ff137 <ff137@proton.me>

* Skip-nats-outbound-messages (#734)

* ⚡ skip outbound message types, temporarily for performance reasons

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update lock files

Signed-off-by: ff137 <ff137@proton.me>

---------

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update dependencies

Signed-off-by: ff137 <ff137@proton.me>

* ⬆️ Update lock file

Signed-off-by: ff137 <ff137@proton.me>

* ⏪ Revert changes to other lock files

Signed-off-by: ff137 <ff137@proton.me>

---------

Signed-off-by: ff137 <ff137@proton.me>
Signed-off-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>
Co-authored-by: Henry Msiska <henrymsi+dev@gmail.com>
  • Loading branch information
ff137 and henrymsiska committed Nov 12, 2024
1 parent 63cd990 commit c2b0267
Show file tree
Hide file tree
Showing 12 changed files with 4,007 additions and 1 deletion.
12 changes: 12 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ updates:
- dependency-name: "*"
update-types: ["version-update:semver-major"]

# Maintain dependencies for Python Packages
- package-ecosystem: "pip"
directory: "/nats_events"
schedule:
interval: "weekly"
day: "monday"
time: "04:00"
timezone: "Canada/Pacific"
ignore:
- dependency-name: "*"
update-types: ["version-update:semver-major"]

# Maintain dependencies for Python Packages
- package-ecosystem: "pip"
directory: "/oid4vci/integration/afj_runner"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
declare -a changed_dirs=()
for dir in ./*/; do
current_folder=$(basename "$dir")
if [[ $current_folder == "plugin_globals" ]]; then
if [[ $current_folder == "plugin_globals" || [[ $current_folder == "nats_events" ]]; then
continue
fi
for changed_file in ${{ steps.changed-files.outputs.all_changed_files }}; do
Expand Down
20 changes: 20 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"cSpell.words": [
"acapy",
"actionmenu",
"aiohttp",
"basicmessage",
"basicmessages",
"cloudagent",
"CREDS",
"crids",
"devcontainer",
"didcomm",
"jetstream",
"keylist",
"linedata",
"Pydantic",
"resp",
"verkey"
]
}
22 changes: 22 additions & 0 deletions nats_events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# NATS Events Plugin

## Overview

Welcome to the NATS Events Plugin! This plugin is designed to facilitate event handling using NATS as the messaging system. It is currently under active development, and we are working hard to implement all the necessary features and ensure robust functionality.

## Status

**Under Construction** 🚧

Please note that this plugin is still in the early stages of development. The following components are yet to be implemented:

- **Inbound Transporters**: Mechanisms to receive and process incoming messages.
- **Outbound Transporters**: Mechanisms to send messages to external systems.
- **Documentation**: Comprehensive documentation to guide users on setup, configuration, and usage.
- **Testing**: Thorough testing to ensure reliability and performance.

## Features

- **Event Handling**: Push events to NATS subjects.
- **Dynamic Topic Mapping**: Map event patterns to NATS subjects (analogous to topics in other systems systems like Kafka) using templates.
- **Metadata Enrichment**: Add metadata to events before publishing.
10 changes: 10 additions & 0 deletions nats_events/nats_events/definition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Version definitions for this plugin."""

versions = [
{
"major_version": 1,
"minimum_minor_version": 0,
"current_minor_version": 0,
"path": "v1_0",
}
]
9 changes: 9 additions & 0 deletions nats_events/nats_events/v1_0/nats_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""ACA-Py Over NATS."""

import logging

from .config import get_config

LOGGER = logging.getLogger(__name__)

__all__ = ["get_config"]
164 changes: 164 additions & 0 deletions nats_events/nats_events/v1_0/nats_queue/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""NATS Queue configuration."""

import logging
from typing import Any, Mapping, Optional

from pydantic import BaseModel

LOGGER = logging.getLogger(__name__)

EVENT_TOPIC_MAP = {
"^acapy::webhook::(.*)$": "acapy.webhook.$wallet_id",
"^acapy::record::([^:]*)::([^:]*)$": "acapy.record-with-state.$wallet_id",
"^acapy::record::([^:])?": "acapy.record.$wallet_id",
"acapy::basicmessage::received": "acapy.basicmessage.received",
"acapy::problem_report": "acapy.problem_report",
"acapy::ping::received": "acapy.ping.received",
"acapy::ping::response_received": "acapy.ping.response_received",
"acapy::actionmenu::received": "acapy.actionmenu.received",
"acapy::actionmenu::get-active-menu": "acapy.actionmenu.get-active-menu",
"acapy::actionmenu::perform-menu-action": "acapy.actionmenu.perform-menu-action",
"acapy::keylist::updated": "acapy.keylist.updated",
"acapy::revocation-notification::received": "acapy.revocation-notification.received",
"acapy::revocation-notification-v2::received": "acapy.revocation-notification-v2.received", # noqa: E501
"acapy::forward::received": "acapy.forward.received",
"acapy::outbound-message::queued_for_delivery": "acapy.outbound-message.queued-for-delivery", # noqa: E501
}

EVENT_WEBHOOK_TOPIC_MAP = {
"acapy::basicmessage::received": "basicmessages",
"acapy::problem_report": "problem_report",
"acapy::ping::received": "ping",
"acapy::ping::response_received": "ping",
"acapy::actionmenu::received": "actionmenu",
"acapy::actionmenu::get-active-menu": "get-active-menu",
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
"acapy::keylist::updated": "keylist",
}


def _alias_generator(key: str) -> str:
return key.replace("_", "-")


class ConnectionConfig(BaseModel):
"""Connection configuration model."""

connection_url: str

class Config:
"""Pydantic config."""

alias_generator = _alias_generator
populate_by_name = True

@classmethod
def default(cls):
"""Default connection configuration."""
return cls(connection_url="nats://default:test1234@localhost:4222")


class EventConfig(BaseModel):
"""Event configuration model."""

event_topic_maps: Mapping[str, str] = EVENT_TOPIC_MAP
event_webhook_topic_maps: Mapping[str, str] = EVENT_WEBHOOK_TOPIC_MAP
deliver_webhook: bool = True

class Config:
"""Pydantic config."""

alias_generator = _alias_generator
populate_by_name = True

@classmethod
def default(cls):
"""Default event configuration."""
return cls(
event_topic_maps=EVENT_TOPIC_MAP,
event_webhook_topic_maps=EVENT_WEBHOOK_TOPIC_MAP,
deliver_webhook=True,
)


class InboundConfig(BaseModel):
"""Inbound configuration model."""

acapy_inbound_topic: str = "acapy_inbound"
acapy_direct_resp_topic: str = "acapy_inbound_direct_resp"

class Config:
"""Pydantic config."""

alias_generator = _alias_generator
populate_by_name = True

@classmethod
def default(cls):
"""Default inbound configuration."""
return cls(
acapy_inbound_topic="acapy_inbound",
acapy_direct_resp_topic="acapy_inbound_direct_resp",
)


class OutboundConfig(BaseModel):
"""Outbound configuration model."""

acapy_outbound_topic: str = "acapy_outbound"
mediator_mode: bool = False

@classmethod
def default(cls):
"""Default outbound configuration."""
return cls(
acapy_outbound_topic="acapy_outbound",
mediator_mode=False,
)


class NATSConfig(BaseModel):
"""NATS configuration model."""

event: Optional[EventConfig] = EventConfig.default()
inbound: Optional[InboundConfig] = InboundConfig.default()
outbound: Optional[OutboundConfig] = OutboundConfig.default()
connection: ConnectionConfig

@classmethod
def default(cls):
"""Default NATS configuration."""
return cls(
event=EventConfig.default(),
inbound=InboundConfig.default(),
outbound=OutboundConfig.default(),
connection=ConnectionConfig.default(),
)


def process_config_dict(config_dict: dict) -> dict:
"""Add connection to inbound, outbound, event and return updated config."""
filter = ["inbound", "event", "outbound", "connection"]
for key, value in config_dict.items():
if key in filter:
config_dict[key] = value
return config_dict


def get_config(settings: Mapping[str, Any]) -> NATSConfig:
"""Retrieve producer configuration from settings."""
try:
LOGGER.debug("Constructing config from: %s", settings.get("plugin_config"))
config_dict = settings["plugin_config"].get("nats_queue", {})
LOGGER.debug("Retrieved: %s", config_dict)
config_dict = process_config_dict(config_dict)
config = NATSConfig(**config_dict)
except KeyError:
LOGGER.warning("Using default configuration")
config = NATSConfig.default()

LOGGER.debug("Returning config: %s", config.model_dump_json(indent=2))
LOGGER.debug(
"Returning config(aliases): %s", config.model_dump_json(by_alias=True, indent=2)
)
return config
Loading

0 comments on commit c2b0267

Please sign in to comment.