Qluent is a Fluent Queue Client
Qluent provides a very simple Fluent API and wrapper classes around the Microsoft Azure Storage SDK, allowing you to interact with storage queues using strongly typed objects like this.
var q = await Builder
.CreateAQueueOf<Task>()
.UsingStorageQueue("my-test-queue")
.BuildAsync();
await q.PushAsync(new Task());
var consumer = Builder
.CreateAConsumerFor<Task>()
.UsingQueue(q)
.ThatHandlesMessagesUsing((msg) => { Console.WriteLine($"Processing {msg.Value.TaskId}"); return true; })
.Build();
await consumer.Start()
By default the builder will create a queue connected to development storage.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.BuildAsync();
Or you can explicitly provide a connection string to a specific storage account.
var q = await Builder
.CreateAQueueOf<Person>()
.ConnectedToAccount("UseDevelopmentStorage=true")
.UsingStorageQueue("my-test-queue")
.BuildAsync();
You can clear all messages from a queue.
await q.PurgeAsync();
You can check the approximate message count on a queue.
var count = await q.CountAsync()
Queues are created for a specific type. You can push an object of that type directly to the queue.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.BuildAsync();
var person = new Person("John");
await q.PushAsync(person);
You can also push an IEnumerable<T>
of messages to the queue.
List<Person> people = new List<Person>();
await q.PushAsync(people);
You can directly Pop an object off the queue. This will dequeue the
CloudQueueMessage
, attempt to deserialize it and if deserialization succeeds
remove it from the queue and return it to you.
If deserialization fails, the default behavior is to throw an exception. This will result in the message's dequeue count increasing, and it reappearing on the queue after it's visibility timeout expires.
See: [Handling Poison Messages] for more info.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.BuildAsync();
var person = await q.PopAsync();
If you don't want to remove the object from the queue, you can peek at it instead.
var person = await q.PeekAsync();
You can also Peek or Pop multiple messages at a time by passing a message count.
IEnumerable<Person> peekedPeople = await q.PeekAsync(5);
IEnumerable<Person> poppedPeople = await q.PopAsync(5);
The Azure SDK supports a two phase dequeue process. First, the message is received from the queue. Second, the message is deleted from the queue. This allows a consumer to attempt processing in between these two steps, and if processing fails, the client can abort the operation and the message will appear on the queue again after it's visibilty timeout expires.
The previous PopAsync
methods perform both Get & Delete operations in one.
If more control is required, by the consumer, Qluent also provides:
GetAsync
, which returns aIMessage<T>
wrapper object including the underlying message Id & PopReceiptDeleteAsync
, which accepts aIMessage<T>
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.BuildAsync();
var wrappedPerson = await q.GetAsync();
try
{
//attempt to process wrappedPerson.Value;
await q.DeleteAsync(wrappedPerson);
}
catch(Exception ex){ ... }
Qluent provides a simple message consumer which you can provide with a queue and configure to handle your messages. This allows you to focus on the message processing without worrying about the dispatcher/polling logic.
To create a consumer, you first build a queue, and then build a consumer using that queue.
//Create Queue
var consumerQueue = await Builder
.CreateAQueueOf<Job>()
.UsingStorageQueue("my-job-queue")
.BuildAsync();
//Create Consumer
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
...
.Build();
To run the consumer, you call the Start()
method. The polling loop can be terminated by triggering a cancellation token.
var cancellationTokenSource = new CancellationTokenSource();
await consumer.Start(cancellationTokenSource.Token);
//later
cancellationTokenSource.Cancel();
The message consumer supports 2 handlers for processing your messages.
- A message handler which describes how to process your messages
- A failed message handler which describes what to do when the message handler fails
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
.ThatHandlesMessagesUsing(HandleMessage)
.AndHandlesFailedMessagesUsing(HandleFailure)
.Build();
The message handlers can be passed to the fluent api as either
- a
Func<IMessage<T>, bool>
- an implementation of
Qluent.Consumers.Handlers.IMessageHandler<T>
In the event the main message handler throws an exception, a special handler is available for post-processing messages.
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
...
.AndHandlesExceptionsUsing(HandleException)
.Build();
The message exception handler can be passed to the fluent api as either
- a
Func<IMessage<T>, Exception, bool>
- a implementation of
Qluent.Consumers.Handlers.IMessageExceptionHandler<T>
The consumer supports a number of other settings as well.
You can specify a unique Id for your consumer. This is useful for logging if you intend to host multiple consumers in the same app.
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
.WithAnIdOf("my-custom-id")
...
.Build();
You can specify whether or not an unhandled exception should kill or cause your app to continue. Can be used in conjunction with the exception handling behavior of a queue to bubble and trap serialization exceptions.
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
.AndHandlesExceptions(By.Continuing)
...
.Build();
While polling an empty queue the Consumer can be configured how often to re-poll while waiting. By default the consumer will requery the queue every 5 seconds.
You can override this behavior by providing a custom Qluent.Consumers.Policies.IMessageConsumerQueuePollingPolicy
The library provides 2 built in policies
SetIntervalQueuePollingPolicy
which specifies a set number of seconds between pollsBackOffQueuePolingPolicy
which backs off the poling frequency by doubling the interval from 1 second to 60 seconds
var consumer = Builder
.CreateAMessageConsumerFor<Job>()
.UsingQueue(consumerQueue)
...
.WithAQueuePolingPolicyOf(new SetIntervalQueuePolingPolicy(5))
.Build();
The consumer supports very basic logging using NLog.
Simply configure NLog as you typically would (specifying targets, formats and filter rules) and the consumer will output messages on progress including startup, shutdown, handler execution & exception messages.
The IAzureStorageQueue
Builder supports a number of other advanced features that let
you control the more common aspects of StorageQueues without having to dive into the SDK.
You can provide a number of settings to override the various message visbility and time to live settings.
You can set a delay time before the message appears to consumers on the queue.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.ThatDelaysMessageVisibilityAfterEnqueuingFor(TimeSpan.FromMinutes(1))
.BuildAsync();
You can specify the duration that a message remains invisible for after it's been dequeued, useful in combination with handling poison messages.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.ThatKeepsMessagesInvisibleAfterDequeuingFor(TimeSpan.FromMinutes(1))
.BuildAsync();
You can specify the duration that message will remain alive on the queue if no consumers dequeue them.
var q = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.ThatSetsAMessageTTLOf(TimeSpan.FromDays(1))
.BuildAsync();
When a message is removed from a Storage Queue, Qluent will attempt
to deserialize it into the <T>
you specified.
It is possible that deserialization might fail for a number of reasons. e.g. An unexpected/corrupted message may have been added to the queue which you cannot parse.
You can control how many times the library will attempt to dequeue and deserialize for you before it considers the message poison. Once considered poisonly, you can optionally choose to route it to another queue for analysis/later processing.
var jobQueue = await Builder
.CreateAQueueOf<Person>()
.UsingStorageQueue("my-test-queue")
.ThatConsidersMessagesPoisonAfter(3)
.AndSendsPoisonMessagesTo("my-poison-queue")
.BuildAsync();
You can specify what should happen when a poison message is detected. The default behavior
is to throw an exception each time the message fails to deserialize.
You can override that behavior by specifying that exceptions should be swallowed.
In this case, the Pop/Peek method will return null (or will remove the null result
from an IEnumerable<T>
).
var jobQueue = await Builder
.CreateAQueueOf<Job>()
.UsingStorageQueue("my-test-queue")
.ThatConsidersMessagesPoisonAfter(3)
.AndSendsPoisonMessagesTo("my-poison-queue")
.AndHandlesExceptionsOnPoisonMessages(By.SwallowingExceptions)
.BuildAsync();
The library is built against the .NET Standard 2.0 to target both .NET Framework & .NET Core. All Operations are asynchronous and support a method overload to pass a cancellation token.
var person = await q.PopAsync();
CancellationToken ct = new CancellationToken(false);
var person = await q.PopAsync(ct);
Within the library all asynchronous calls are postpended with a call to .ConfigureAwait(false)
.
During queue creation the library will perform an async operation to create the queue
if it doesn't exist. Therefore the builder (and previous examples) provide an awaitable
BuildAsync()
method. However if you need to create your queues in a non async manner
e.g. in a DI Container/Bootstrapper you can use the non async Build
method.
var q = Builder
.CreateAQueueOf<Person>()
.ConnectedToAccount("UseDevelopmentStorage=true")
.UsingStorageQueue("my-test-queue")
.Build();
By default Qluent will serialize your entities to Json Strings using
Json.NET.
Serialization is performed using the default JsonConvert
utility.
For scenarios, where your client does not control both ends of the queue, you may have to deal with messages that have been serialized differently.
To support this, Qluent allows your to pass your own custom binary or string serializer.
To create a custom binary serializer, implement the interface Qluent.Serialization.IBinaryMessageSerializer<T>
This will serialize/deserialize your message to a byte[]
and push/pop it to the queue as bytes.
var q = Builder
.CreateAQueueOf<Person>()
.ConnectedToAccount("UseDevelopmentStorage=true")
.UsingStorageQueue("my-test-queue")
.WithACustomSerializer(new CustomBinarySerializer())
.Build();
To create a custom binary serializer, implement the interface Qluent.Serialization.IStringMessageSerializer<T>
This will serialize/deserialize your message to a string
and push/pop it to the queue as string content.
var q = Builder
.CreateAQueueOf<Person>()
.ConnectedToAccount("UseDevelopmentStorage=true")
.UsingStorageQueue("my-test-queue")
.WithACustomSerializer(new CustomStringSerializer())
.Build();
There's a lot of ceremony involved when using the SDK for Azure Storage Queues. Create an account, create a client, create a queue reference, make sure it exists, object serialization/deserialization etc...
var storageAccount = CloudStorageAccount.Parse("UseDevelopmentStorage=true;");
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("myqueue");
queue.CreateIfNotExists();
var person = new Person("John");
var serializedPerson = JsonConvert.Serialize(person)
var message = new CloudQueueMessage(serializedPerson);
queue.AddMessage(message);
var result = queue.GetMessage();
var deserializedPerson = JsonConvert.Deserialize<Person>(result.AsString);
queue.DeleteMessage(result);
I'm also not a fan of the architectural decision in the SDK to leave settings like message visbility up to the developer to decide on at the call site. If you're going to create your queues and access them via a DI framework, I'd prefer to centralize/standardize these settings at queue creation.
This project was borne out frustration with a several different things.
Firstly, my team and I have recently been working with Azure Durable Functions. They are still quite immature and given their static nature and the lack of support for DI frameworks like Autofac I found we were having to either write a lot of boiler plate code or rely on the built in binding/trigger capabilities which were lacking in certain amounts of control/capability. After seeing yet-another-queue-wrapper being written, I wanted to consolidate our approach.
Secondly, we have been wrestling with a legacy bug for a while now, which was the result of a predecessors decision to take a dependency on a nuget package for which support had long since waned and for which the source code was no longer available. After eventually decompiling and picking through some enterprise-fizz-buzz queue code, the problem was discovered.
Thirdly, I needed to scratch an itch 😏. I wanted to test out a number of things including building a fluent builder pattern, building a net standard 2.0 library that could be consumed by both NetFramework and NetCore20.
This is not an Enterprise Service Bus. It is a simple wrapper around Azure Storage Queues to make working with them a little easier.
There are lots of complicated things you may find yourself doing in a distributed environment. Complex Retry Policies; complicated routing paths; Pub/Sub models involving topics and queues; the list goes on.
If you find yourself needing to do something complex like this, then perhaps you should be looking at a different technology stack (Azure Service Bus, Event Hubs, Event Grid, Kafka, NService Bus, Mulesoft etc...)
Interface based RefactoringDocument calls properlySupport Cancellation Tokens so that they can be passed through.Support Pop Receipts so that the consumer can decide how to handle messagesWrite up the docs around message visibility when the above is done.NET Core TestsRemove the TransactionScope stuff & stick it in an experimental branchBuild a Sample Message ConsumerTest Harness- Unit Tests
DocumentationXMLDoc Comment the Public API
- Logging
NLog- Serilog
Big Documentation Tidy upNuget PackagesAppVeyor Setup