Hi! I'm just starting my journey with event sourcing and, as a practical exercise, I'm migrating data feed processing from Celery to NATS JetStream in my Django monolith. I love how FastStream is structured; it is so cool to experiment with this library! However, I have a few design questions. In my previous Celery-based approach I had separate queues for CPU-bound and I/O-bound tasks. The CPU queue was processed by the prefork pool (one process per CPU core), while the I/O-bound tasks were consumed by a gevent pool of size 500-1000.

Q1: Will it be correct if I:
Will I benefit from having multiple workers for my I/O app consumer, given that all handlers use asyncio to process events?

Q2: How do I throttle the maximum number of processed events of some type? I guess this is not something people usually do when processing events, but here is my real-world case: a data processing handler emits thousands of events that require external HTTP requests. How can I make sure not to DDoS an endpoint?

Q3: How do I bulk read from the broker? I see there is an option of

Q4: How do I keep handlers in separate locations of the app? I guess I need to set up my

```python
# serve.py  (the io-consumer entrypoint)
from faststream import FastStream
from faststream.nats import JStream, NatsBroker

broker = NatsBroker("nats://localhost:4222")
stream = JStream(name="product_feeds")
app = FastStream(broker)
```

```python
# project/app1/handlers.py
from faststream import Logger

from serve import broker, stream

@broker.subscriber(
    "product_feeds",
    stream=stream,
    durable="my_app2",
)
async def handler(msg: str, logger: Logger):
    logger.info(msg)
```

Is this the correct approach?
I've read the docs a little bit further. So I guess:

Am I right?
@pySilver sorry for answering so late, I just missed the question. Next time feel free to tag me directly or ask for help on Discord.
Q1: it is OK to create two different FastStream instances to process CPU-bound and I/O-bound tasks separately. In fact it is required, so that CPU-bound handlers don't block the broker connection and you can still process I/O tasks. Workers always give you scaling benefits, but there is no reason to create a lot of workers if your incoming stream isn't that heavy.
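Each app can then be scaled independently with the FastStream CLI. A sketch, assuming the two entrypoint modules are named `cpu_consumer.py` and `io_consumer.py` (names are illustrative):

```shell
# CPU-bound app: roughly one worker process per core (4 assumed here)
faststream run cpu_consumer:app --workers 4

# I/O-bound app: a single asyncio process is often enough;
# add workers only if one event loop cannot keep up
faststream run io_consumer:app
```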
Q2: max_workers is an option that allows you to consume multiple messages from the same subject at the same time without separate worker processes. It doesn't help you throttle messages, and we have no such mechanism out of the box, sorry.
Q3: sure, PullSubscr…
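The truncated reply above refers to pull subscriptions; for reference, a minimal sketch assuming the `PullSub` helper from `faststream.nats` (the batch size and durable name are illustrative):

```python
from faststream import FastStream, Logger
from faststream.nats import JStream, NatsBroker, PullSub

broker = NatsBroker("nats://localhost:4222")
stream = JStream(name="product_feeds")
app = FastStream(broker)

@broker.subscriber(
    "product_feeds",
    stream=stream,
    durable="bulk_reader",
    # fetch up to 50 messages per pull; batch=True delivers them as one list
    pull_sub=PullSub(batch_size=50, batch=True),
)
async def handler(msgs: list[str], logger: Logger):
    # with batch=True the handler receives the whole fetched batch at once
    logger.info("got %d messages", len(msgs))
```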