-
Is there a .NET example of how to get the Dead Letter Queue for Max Deliveries ($JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES)? |
Beta Was this translation helpful? Give feedback.
Replies: 4 comments 11 replies
-
I don't know of a C# example, but Jeremy discussed it on YouTube a few weeks back. Have you seen that? Edit: here it is: https://www.youtube.com/watch?v=7Jp3tyCGMZs&t=1510s |
Beta Was this translation helpful? Give feedback.
-
Thank you for the sample code |
Beta Was this translation helpful? Give feedback.
-
I have started a new instance of the Docker container. Now I cannot get the DLQ. What am I missing?
CODE using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using System.Text;
namespace ConsoleNatsDeadLetterQueue
{
internal class Program
{
    /// <summary>
    /// Captures JetStream "max deliveries" advisories for the EVENTS stream into a
    /// DLQ stream, consumes them, and prints both the advisory and the original
    /// message that exhausted its delivery attempts.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello, World DLQ!");
        var opt = new NatsOpts
        {
            Url = "nats://127.0.0.1:4222",
            Name = "Nats_Connect",
        };

        // Single source of truth for the advisory subject: the DLQ stream captures it
        // and the consumer filters on it. The previous filter ("data.>") never matched
        // any subject stored in the DLQ stream, so no advisory was ever delivered.
        const string advisorySubject = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.EVENTS.*";
        var consumerName = "processor";
        var ackWait = TimeSpan.FromSeconds(10);
        var ackPolicy = ConsumerConfigAckPolicy.Explicit;

        // Dispose the connection when Main exits so the socket is released.
        await using var connect = new NatsConnection(opt);
        var jsContext = new NatsJSContext(connect);

        // NOTE(review): this requires the EVENTS stream to exist already; on a fresh
        // server instance it must be created before running this program.
        var eventsStream = await jsContext.GetStreamAsync("EVENTS");
        var dlqStream = await jsContext.CreateStreamAsync(
            new StreamConfig("DLQ", new[] { advisorySubject }));
        var consumer = await jsContext.CreateOrUpdateConsumerAsync("DLQ", new ConsumerConfig(consumerName)
        {
            DeliverPolicy = ConsumerConfigDeliverPolicy.All,
            FilterSubject = advisorySubject,
            AckPolicy = ackPolicy,
            AckWait = ackWait,
        });

        await foreach (NatsJSMsg<byte[]> msg in consumer.ConsumeAsync<byte[]>())
        {
            // The payload may be absent; treat that as an empty advisory instead of throwing.
            byte[] advisory = msg.Data ?? Array.Empty<byte>();

            Console.WriteLine($"DLQ Subject: {msg.Subject} Size: {msg.Size} Subject: {msg.Subject}");
            Console.WriteLine("Data: " + Encoding.UTF8.GetString(advisory));

            var md = msg.Metadata;
            if (md.HasValue)
            {
                Console.WriteLine($"DLQ Num Delivered: {md.Value.NumDelivered} TimeStamp: {md.Value.Timestamp.ToLocalTime()}");
                Console.WriteLine($"DLQ Consumer Seq: {md.Value.Sequence.Consumer} Stream Seq: {md.Value.Sequence.Stream}");

                // This sequence identifies the advisory inside the DLQ stream only.
                var advisoryRequest = new StreamMsgGetRequest { Seq = md.Value.Sequence.Stream };
                var sget = await dlqStream.GetAsync(advisoryRequest);
                Console.WriteLine("DLQ Message: " + Encoding.UTF8.GetString(Convert.FromBase64String(sget.Message.Data)));
            }

            // The sequence of the message that failed lives in the advisory payload,
            // not in this message's metadata; use it to read the original from EVENTS.
            if (TryGetOriginalSequence(advisory, out ulong originalSeq))
            {
                var originalRequest = new StreamMsgGetRequest { Seq = originalSeq };
                var resmsg = await eventsStream.GetAsync(originalRequest);
                Console.WriteLine("DLQ Data: " + Encoding.UTF8.GetString(Convert.FromBase64String(resmsg.Message.Data)));
            }

            // AckPolicy is Explicit: without an ack the advisory is redelivered after AckWait.
            await msg.AckAsync();
        }

        Console.WriteLine("Done");
    }

    /// <summary>
    /// Reads the "stream_seq" field (per the JetStream max-deliveries advisory schema)
    /// from an advisory JSON payload.
    /// </summary>
    /// <param name="advisory">Raw advisory payload bytes.</param>
    /// <param name="streamSeq">The parsed sequence, or 0 when the method returns false.</param>
    /// <returns>True when the payload is a JSON object with a numeric "stream_seq".</returns>
    private static bool TryGetOriginalSequence(byte[] advisory, out ulong streamSeq)
    {
        streamSeq = 0;
        if (advisory.Length == 0)
        {
            return false;
        }

        try
        {
            using var doc = System.Text.Json.JsonDocument.Parse(new ReadOnlyMemory<byte>(advisory));
            var root = doc.RootElement;
            return root.ValueKind == System.Text.Json.JsonValueKind.Object
                && root.TryGetProperty("stream_seq", out var seq)
                && seq.ValueKind == System.Text.Json.JsonValueKind.Number
                && seq.TryGetUInt64(out streamSeq);
        }
        catch (System.Text.Json.JsonException)
        {
            // Not valid JSON: report "no sequence" rather than aborting the consume loop.
            return false;
        }
    }

    /// <summary>Sample payload type; not referenced by <see cref="Main"/>.</summary>
    public class Person
    {
        public string Name { get; set; } = string.Empty;
        public string Address { get; set; } = string.Empty;
        public string Serial { get; set; } = "SerialNumber";
    }
}
} |
Beta Was this translation helpful? Give feedback.
-
should you use |
Beta Was this translation helpful? Give feedback.
You can get it directly from the stream. Note that the message data is base64-encoded: