From 43cc720aa347b588f34154dd75844f96e3d8fefb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Armatys?= Date: Tue, 3 Sep 2024 17:29:41 +0200 Subject: [PATCH] Added support for bulk consumers --- .../DependencyInjection.cs | 36 +++++++++++++++---- .../SOFTURE.MessageBroker.Rabbit.csproj | 4 +++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/API/SOFTURE.MessageBroker.Rabbit/DependencyInjection.cs b/API/SOFTURE.MessageBroker.Rabbit/DependencyInjection.cs index 31783f2..08f6da5 100644 --- a/API/SOFTURE.MessageBroker.Rabbit/DependencyInjection.cs +++ b/API/SOFTURE.MessageBroker.Rabbit/DependencyInjection.cs @@ -2,6 +2,7 @@ using MassTransit; using Microsoft.Extensions.DependencyInjection; using SOFTURE.Common.HealthCheck; +using SOFTURE.Contract.Common.Messaging; using SOFTURE.MessageBroker.Rabbit.Filters; using SOFTURE.MessageBroker.Rabbit.HealthChecks; using SOFTURE.MessageBroker.Rabbit.Settings; @@ -39,14 +40,22 @@ public static IServiceCollection AddCommonPublisher(this IServiceColl return services; } - public static IServiceCollection AddCommonConsumers(this IServiceCollection services, Assembly assembly) + public static IServiceCollection AddCommonConsumers( + this IServiceCollection services, + Assembly assembly, + int retryCount = 0) where TSettings : IRabbitSettings { - var consumerTypes = GetConsumers(assembly); + var consumerTypes = GetConsumers(assembly); + var bulkConsumerTypes = GetConsumers(assembly); + + var allConsumerTypes = consumerTypes + .Concat(bulkConsumerTypes) + .ToList(); services.AddMassTransit(config => { - foreach (var type in consumerTypes) + foreach (var type in allConsumerTypes) { config.AddConsumer(type); } @@ -54,7 +63,7 @@ public static IServiceCollection AddCommonConsumers(this IServiceColl config.UsingRabbitMq((ctx, cfg) => { var consumerSettings = ctx.GetRequiredService().Rabbit; - + cfg.UseInMemoryOutbox(ctx); cfg.UseConsumeFilter(typeof(ContextConsumeLoggingFilter<>), ctx); @@ -75,6 +84,19 @@ public static IServiceCollection AddCommonConsumers(this IServiceColl foreach (var type in consumerTypes) { c.ConfigureConsumer(ctx, type); + c.PrefetchCount = 1; + + if (retryCount != 0) + c.UseMessageRetry(r => r.Immediate(retryCount)); + } + + foreach (var type in bulkConsumerTypes) + { + c.ConfigureConsumer(ctx, type); + c.PrefetchCount = 50; + + if (retryCount != 0) + c.UseMessageRetry(r => r.Immediate(retryCount)); } }); }); @@ -85,12 +107,14 @@ public static IServiceCollection AddCommonConsumers(this IServiceColl return services; } - private static List GetConsumers(Assembly assembly) + private static List GetConsumers(Assembly assembly) where T : class { var consumerTypes = assembly.GetTypes() .Where(t => t.GetInterfaces().Any(i => - i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumer<>)) && + i.IsGenericType && + i.GetGenericTypeDefinition() == typeof(IConsumer<>) && + typeof(T).IsAssignableFrom(i.GetGenericArguments()[0])) && !t.IsAbstract) .ToList(); diff --git a/API/SOFTURE.MessageBroker.Rabbit/SOFTURE.MessageBroker.Rabbit.csproj b/API/SOFTURE.MessageBroker.Rabbit/SOFTURE.MessageBroker.Rabbit.csproj index 597c823..5e43b18 100644 --- a/API/SOFTURE.MessageBroker.Rabbit/SOFTURE.MessageBroker.Rabbit.csproj +++ b/API/SOFTURE.MessageBroker.Rabbit/SOFTURE.MessageBroker.Rabbit.csproj @@ -51,4 +51,8 @@ + + + +