Skip to content

Commit

Permalink
Added support for bulk consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
jarmatys committed Sep 3, 2024
1 parent 33feba5 commit 43cc720
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
36 changes: 30 additions & 6 deletions API/SOFTURE.MessageBroker.Rabbit/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using SOFTURE.Common.HealthCheck;
using SOFTURE.Contract.Common.Messaging;
using SOFTURE.MessageBroker.Rabbit.Filters;
using SOFTURE.MessageBroker.Rabbit.HealthChecks;
using SOFTURE.MessageBroker.Rabbit.Settings;
Expand Down Expand Up @@ -39,22 +40,30 @@ public static IServiceCollection AddCommonPublisher<TSettings>(this IServiceColl
return services;
}

public static IServiceCollection AddCommonConsumers<TSettings>(this IServiceCollection services, Assembly assembly)
public static IServiceCollection AddCommonConsumers<TSettings>(
this IServiceCollection services,
Assembly assembly,
int retryCount = 0)
where TSettings : IRabbitSettings
{
var consumerTypes = GetConsumers(assembly);
var consumerTypes = GetConsumers<IMessage>(assembly);
var bulkConsumerTypes = GetConsumers<IBulkMessage>(assembly);

var allConsumerTypes = consumerTypes
.Concat(bulkConsumerTypes)
.ToList();

services.AddMassTransit(config =>
{
foreach (var type in consumerTypes)
foreach (var type in allConsumerTypes)
{
config.AddConsumer(type);
}

config.UsingRabbitMq((ctx, cfg) =>
{
var consumerSettings = ctx.GetRequiredService<TSettings>().Rabbit;

cfg.UseInMemoryOutbox(ctx);

cfg.UseConsumeFilter(typeof(ContextConsumeLoggingFilter<>), ctx);
Expand All @@ -75,6 +84,19 @@ public static IServiceCollection AddCommonConsumers<TSettings>(this IServiceColl
foreach (var type in consumerTypes)
{
c.ConfigureConsumer(ctx, type);
c.PrefetchCount = 1;

if (retryCount != 0)
c.UseMessageRetry(r => r.Immediate(retryCount));
}

foreach (var type in bulkConsumerTypes)
{
c.ConfigureConsumer(ctx, type);
c.PrefetchCount = 50;

if (retryCount != 0)
c.UseMessageRetry(r => r.Immediate(retryCount));
}
});
});
Expand All @@ -85,12 +107,14 @@ public static IServiceCollection AddCommonConsumers<TSettings>(this IServiceColl
return services;
}

private static List<Type> GetConsumers(Assembly assembly)
private static List<Type> GetConsumers<T>(Assembly assembly) where T : class
{
var consumerTypes = assembly.GetTypes()
.Where(t =>
t.GetInterfaces().Any(i =>
i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumer<>)) &&
i.IsGenericType &&
i.GetGenericTypeDefinition() == typeof(IConsumer<>) &&
typeof(T).IsAssignableFrom(i.GetGenericArguments()[0])) &&
!t.IsAbstract)
.ToList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@
<None Include="..\..\LICENSE" Pack="true" PackagePath="\"/>
</ItemGroup>

<ItemGroup>
<PackageReference Include="SOFTURE.Contract.Common" Version="0.0.10" />
</ItemGroup>

</Project>

0 comments on commit 43cc720

Please sign in to comment.