Skip to content

Commit

Permalink
feat: bulk publish (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
RaRhAeu authored Aug 8, 2024
1 parent e55266b commit 28da806
Show file tree
Hide file tree
Showing 26 changed files with 464 additions and 290 deletions.
7 changes: 1 addition & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,4 @@ repos:
rev: v1.11.1
hooks:
- id: mypy
args:
[
"--install-types",
"--non-interactive",
"--enable-incomplete-feature=Unpack",
]
args: ["--install-types", "--non-interactive"]
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pip install 'eventiq[broker]'

```Python
import asyncio
from eventiq import Service, Middleware, CloudEvent
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

class SendMessageMiddleware(Middleware):
Expand All @@ -100,6 +100,13 @@ service = Service(
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")

@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))

```

Run with
Expand Down
2 changes: 1 addition & 1 deletion eventiq/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.3.0rc1"
__version__ = "0.3.0rc2"
15 changes: 10 additions & 5 deletions eventiq/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_topic_parameters(

def generate_receive_operation(
consumer: Consumer,
broker: Broker,
service: Service,
channels_params: dict[str, Any],
spec: AsyncAPI,
tags: dict[str, Tag],
Expand All @@ -106,11 +106,16 @@ def generate_receive_operation(
params = get_topic_parameters(consumer.topic, consumer.parameters)
for k, v in params.items():
channels_params[channel_id].setdefault(k, v)
content_type = (
consumer.decoder.CONTENT_TYPE
if consumer.decoder
else service.decoder.CONTENT_TYPE
)
message = Message(
name=event_type,
title=event_type,
description=consumer.event_type.__doc__,
contentType=broker.encoder.CONTENT_TYPE,
contentType=content_type,
payload=Reference(ref=f"#/components/schemas/{event_type}"),
**consumer.asyncapi_extra.get("message", {}),
)
Expand All @@ -122,7 +127,7 @@ def generate_receive_operation(

channel = Channel(
address=consumer.topic,
servers=[Reference(ref=f"#/servers/{broker.name}")],
servers=[Reference(ref=f"#/servers/{service.broker.name}")],
messages={
event_type: Reference(ref=f"#/channels/{channel_id}/messages/{event_type}"),
},
Expand Down Expand Up @@ -204,7 +209,7 @@ def populate_spec(service: Service, spec: AsyncAPI) -> None:
)
generate_receive_operation(
consumer,
service.broker,
service,
channels_params,
spec,
tags,
Expand All @@ -229,7 +234,7 @@ def get_async_api_spec(service: Service) -> AsyncAPI:
version=service.version,
**service.async_api_extra,
),
defaultContentType=service.broker.encoder.CONTENT_TYPE,
defaultContentType=service.encoder.CONTENT_TYPE,
servers={
service.broker.name: Server(
protocol=service.broker.protocol,
Expand Down
61 changes: 29 additions & 32 deletions eventiq/backends/kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from datetime import timedelta
from datetime import datetime, timedelta
from itertools import chain
from typing import TYPE_CHECKING, Annotated, Any

Expand All @@ -9,16 +9,14 @@
from pydantic import AnyUrl, Field, UrlConstraints

from eventiq.broker import UrlBroker
from eventiq.exceptions import BrokerError
from eventiq.settings import UrlBrokerSettings
from eventiq.utils import utc_now

if TYPE_CHECKING:
from anyio.streams.memory import MemoryObjectSendStream

from eventiq import CloudEvent, Consumer
from eventiq.types import DecodedMessage, Encoder

from eventiq import Consumer
from eventiq.types import ID, DecodedMessage

KafkaUrl = Annotated[AnyUrl, UrlConstraints(allowed_schemes=["kafka"])]

Expand All @@ -28,9 +26,10 @@ class KafkaSettings(UrlBrokerSettings[KafkaUrl]):


class KafkaBroker(UrlBroker[ConsumerRecord, None]):
"""Kafka backend
"""
Kafka backend
:param consumer_options: extra options (defaults) for AIOKafkaConsumer
:param kwargs: Broker base class parameters.
:param kwargs: Broker base class parameters
"""

WILDCARD_MANY = "*"
Expand Down Expand Up @@ -114,10 +113,9 @@ async def ack(self, raw_message: ConsumerRecord) -> None:
await subscriber.commit(
{
TopicPartition(
raw_message.topic,
raw_message.partition,
): raw_message.offset + 1,
},
raw_message.topic, raw_message.partition
): raw_message.offset + 1
}
)

async def nack(self, raw_message: ConsumerRecord, delay: int | None = None) -> None:
Expand All @@ -130,39 +128,38 @@ async def disconnect(self) -> None:
@property
def publisher(self) -> AIOKafkaProducer:
if self._publisher is None:
msg = "Broker not connected"
raise BrokerError(msg)
raise self.connection_error
return self._publisher

async def connect(self) -> None:
if self._publisher is None:
publisher = AIOKafkaProducer(
bootstrap_servers=self.url,
**self.connection_options,
_publisher = AIOKafkaProducer(
bootstrap_servers=self.url, **self.connection_options
)
await publisher.start()
self._publisher = publisher
self._publisher = _publisher
await _publisher.start()

async def publish(
self,
message: CloudEvent,
encoder: Encoder | None = None,
key: Any | None = None,
partition: Any | None = None,
headers: dict[str, str] | None = None,
topic: str,
body: bytes,
*,
headers: dict[str, str],
message_id: ID | None = None,
message_time: datetime | None = None,
timestamp_ms: int | None = None,
partition: int | None = None,
**kwargs: Any,
) -> None:
data = self._encode_message(message, encoder)
timestamp_ms = timestamp_ms or int(message.time.timestamp() * 1000)
key = key or getattr(message, "key", str(message.id))
headers = headers or {}
headers.setdefault("Content-Type", self.encoder.CONTENT_TYPE)
if message_id:
message_id = str(message_id)
if message_time and not timestamp_ms:
timestamp_ms = int(message_time.timestamp() * 1000)
await self.publisher.send(
topic=message.topic,
value=data,
key=key,
topic=topic,
value=body,
key=message_id,
partition=partition,
headers=headers,
timestamp_ms=timestamp_ms,
headers=headers,
)
53 changes: 22 additions & 31 deletions eventiq/backends/nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from eventiq.broker import R, UrlBroker
from eventiq.exceptions import BrokerError
from eventiq.results import AnyModel, ResultBackend
from eventiq.results import ResultBackend
from eventiq.settings import UrlBrokerSettings
from eventiq.utils import to_float, utc_now

Expand All @@ -40,7 +40,6 @@ class JetStreamSettings(NatsSettings):
from nats.js.kv import KeyValue

from eventiq import CloudEvent, Consumer
from eventiq.types import Encoder


class AbstractNatsBroker(UrlBroker[NatsMsg, R], ABC):
Expand Down Expand Up @@ -79,10 +78,6 @@ async def wrapped(error: Exception | None = None) -> None:
def decode_message(raw_message: NatsMsg) -> DecodedMessage:
return raw_message.data, raw_message.headers

@staticmethod
def get_message_data(raw_message: NatsMsg) -> bytes:
return raw_message.data

@staticmethod
def get_message_metadata(raw_message: NatsMsg) -> dict[str, str]:
try:
Expand Down Expand Up @@ -148,15 +143,14 @@ async def sender(

async def publish(
self,
message: CloudEvent,
encoder: Encoder | None = None,
topic: str,
body: bytes,
*,
headers: dict[str, str],
**kwargs: Any,
) -> None:
data = self._encode_message(message, encoder)
reply = kwargs.get("reply", "")
headers = message.headers
headers.setdefault("Content-Type", message.content_type)
await self.client.publish(message.topic, data, headers=headers, reply=reply)
await self.client.publish(topic, body, headers=headers, reply=reply)
if self._auto_flush or kwargs.get("flush"):
await self.flush()

Expand All @@ -173,6 +167,7 @@ class JetStreamBroker(
"""

Settings = JetStreamSettings
kv_error = "KeyVal not initialized"

def __init__(
self,
Expand All @@ -193,38 +188,34 @@ async def init_storage(self) -> None:
@property
def kv(self) -> KeyValue:
if self._kv is None:
err = "KeyVal not initialized"
raise BrokerError(err)
raise BrokerError(self.kv_error)
return self._kv

async def get_result(self, key: str) -> Any:
async def get_result(self, key: str) -> bytes | None:
try:
data = await self.kv.get(key)
if data.value:
return self.decoder.decode(data.value)
return data.value # noqa: TRY300
except KeyNotFoundError:
self.logger.warning("Key %s not found", key)

async def store_result(self, key: str, result: AnyModel) -> None:
await self.kv.put(key, self.encoder.encode(result))
async def store_result(self, key: str, result: bytes) -> None:
await self.kv.put(key, result)

async def publish(
self,
message: CloudEvent,
encoder: Encoder | None = None,
topic: str,
body: bytes,
*,
headers: dict[str, str],
message_id: str | None = None,
timeout: float | None = None,
stream: str | None = None,
**kwargs: Any,
) -> api.PubAck:
encoder = encoder or self.encoder
data = encoder.encode(message)
headers = message.headers
headers.setdefault("Content-Type", message.content_type)
headers.setdefault("Nats-Msg-Id", str(message.id))
if "Nats-Msg-Id" not in headers and message_id:
headers["Nats-Msg-Id"] = message_id
response = await self.js.publish(
subject=message.topic,
payload=data,
timeout=kwargs.get("timeout"),
stream=kwargs.get("stream"),
headers=headers,
topic, payload=body, timeout=timeout, stream=stream, headers=headers
)
if self._auto_flush:
await self.flush()
Expand Down
Loading

0 comments on commit 28da806

Please sign in to comment.