How can I create a durable consumer? #234
-
I am trying to
The code looks like this:
private async Task CreateProcessor(NatsConsumerMessageProcessorOptions<T> options, CancellationToken cancellationToken)
{
var msgSerializer = new NatsMessagePackContextSerializer<T>();
var consumerConfig = new ConsumerConfig
{
MaxDeliver = options.MaxDeliver,
AckWait = options.AckWaitMs,
AckPolicy = ConsumerConfigAckPolicy.Explicit,
DurableName = options.NatsDurableName,
DeliverSubject = options.NatsSubjectName,
MaxAckPending = options.MaxAckPending,
MaxBatch = options.BatchSize,
};
var consumer = await js.CreateConsumerAsync(options.JetStreamName, consumerConfig, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
try
{
await consumer.RefreshAsync(cancellationToken); // or try to recreate consumer
await foreach (var msg in consumer.ConsumeAsync(serializer: msgSerializer).WithCancellation(cancellationToken))
{
// Process the message, then ack it explicitly below
await options.ProcessMsgFunc(msg, cancellationToken).ConfigureAwait(false);
await msg.AckAsync(cancellationToken: cancellationToken);
}
}
catch (NatsJSProtocolException e)
{
// No need to nack any messages here because exceptions are already handled at this point
logger.Error(e, "Exception while processing NATS v2 message");
}
catch (NatsJSException e)
{
// log exception
logger.Error(e, "Exception while processing NATS NatsJSException v2 message");
await Task.Delay(1000, cancellationToken); // backoff
}
}
}
What is the proper way to create a durable consumer?
Replies: 4 comments 8 replies
-
Off the top of my head, I think the server is expecting a Name to be set as well (see the marked line):
var consumerConfig = new ConsumerConfig
{
MaxDeliver = options.MaxDeliver,
AckWait = options.AckWaitMs,
AckPolicy = ConsumerConfigAckPolicy.Explicit,
Name = options.NatsDurableName, // <--- here
DurableName = options.NatsDurableName,
// DeliverSubject = options.NatsSubjectName, <-- remove
MaxAckPending = options.MaxAckPending,
MaxBatch = options.BatchSize,
};
Or you can use the constructor, which takes care of the minimum defaults:
var consumerConfig = new ConsumerConfig(options.NatsDurableName)
{
MaxDeliver = options.MaxDeliver,
AckWait = options.AckWaitMs,
// Set by the .ctor
// AckPolicy = ConsumerConfigAckPolicy.Explicit,
// Name = options.NatsDurableName,
// DurableName = options.NatsDurableName,
// DeliverSubject = options.NatsSubjectName, <-- remove
MaxAckPending = options.MaxAckPending,
MaxBatch = options.BatchSize,
};
-
I've set the name in consumer config, but when trying to create the consumer, the following exception is thrown:
-
I think it is. I think it should throw an exception at that point because it is not recoverable. I created issue #236; feel free to chip in.
-
Something strange happens with the consumer code above: although I publish a single message to the stream, the consumer code is called multiple times immediately. The same message is delivered multiple times (NumDelivered increments), as if the previous delivery had not been acked (I am acking manually in ProcessMsgFunc later on). The culprit might be the AckWait value, which was not set correctly. It seems AckWait must be provided in nanoseconds, hence the conversion from milliseconds in my code:
var consumerConfig = new ConsumerConfig(options.NatsDurableName)
{
DurableName = options.NatsDurableName,
MaxDeliver = options.MaxDeliver,
AckWait = options.AckWaitMs * 1_000_000L, // ms -> ns; the long literal avoids 32-bit overflow if AckWaitMs is an int
//DeliverSubject = options.NatsSubjectName,
MaxAckPending = options.MaxAckPending,
MaxBatch = options.BatchSize,
FilterSubject = options.NatsSubjectName
};
var consumer = await js.CreateConsumerAsync(options.JetStreamName, consumerConfig, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
try
{
await consumer.RefreshAsync(cancellationToken); // or try to recreate consumer
var opts = new NatsJSConsumeOpts { MaxMsgs = options.BatchSize };
await foreach (var msg in consumer.ConsumeAsync(opts: opts, serializer: msgSerializer).WithCancellation(cancellationToken))
{
// Process the message; the processor is responsible for ack/nack
await options.ProcessMsgFunc(msg, cancellationToken).ConfigureAwait(false);
//await msg.AckAsync(cancellationToken: cancellationToken);
}
}
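The millisecond-to-nanosecond conversion above can be sketched on its own, without any NATS types. This is only an illustration, assuming (as the post suggests) that AckWait is a nanosecond count and that options.AckWaitMs is an int; the AckWaitConversion class and its MsToNanos and ToNanos helpers are hypothetical names, not part of the client library. It shows why the multiplication should happen in 64-bit arithmetic.

```csharp
using System;

public static class AckWaitConversion
{
    // 1 ms = 1,000,000 ns; the L suffix forces 64-bit arithmetic.
    private const long NanosPerMilli = 1_000_000L;

    // Hypothetical helper: milliseconds (held in an int option) to nanoseconds.
    public static long MsToNanos(int milliseconds) => milliseconds * NanosPerMilli;

    // Hypothetical helper: TimeSpan to nanoseconds. One tick is 100 ns.
    public static long ToNanos(TimeSpan wait) => wait.Ticks * 100L;

    public static void Main()
    {
        int ackWaitMs = 30_000; // assumed value: 30 seconds

        // int * int is evaluated in 32 bits and wraps around before widening to long.
        long wrapped = unchecked(ackWaitMs * 1_000_000);
        long correct = MsToNanos(ackWaitMs);

        Console.WriteLine(wrapped == correct);                           // False
        Console.WriteLine(correct);                                      // 30000000000
        Console.WriteLine(ToNanos(TimeSpan.FromSeconds(30)) == correct); // True
    }
}
```

With a helper like this, the assignment in the config could read AckWait = AckWaitConversion.MsToNanos(options.AckWaitMs), again assuming AckWaitMs is an int. If the option were already a long, the plain multiplication would not overflow for realistic wait times.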
I think you might be hitting a bug. When you turn on logging, do you see warnings of 409 Exceeded MaxRequestBatch? The consume batch size is 1000 by default. Assuming your options.BatchSize is less, can you try this and see if it helps: