Skip to content

Commit

Permalink
fix autofac
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Oct 11, 2023
1 parent 3148525 commit 394593f
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 46 deletions.
4 changes: 3 additions & 1 deletion Arbitrer.RabbitMQ/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void InitConnection()
return JsonConvert.DeserializeObject<Messages.ResponseMessage<TResponse>>(result, options.SerializerSettings);
}

public async Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
public Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

Expand All @@ -118,6 +118,8 @@ public async Task Notify<TRequest>(TRequest request, CancellationToken cancellat
mandatory: false,
body: Encoding.UTF8.GetBytes(message)
);

return Task.CompletedTask;
}


Expand Down
4 changes: 3 additions & 1 deletion Arbitrer.standard20.RabbitMQ.Autofac/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void InitConnection()
return JsonConvert.DeserializeObject<Messages.ResponseMessage<TResponse>>(result, options.SerializerSettings);
}

public async Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
public Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

Expand All @@ -119,6 +119,8 @@ public async Task Notify<TRequest>(TRequest request, CancellationToken cancellat
mandatory: false,
body: Encoding.UTF8.GetBytes(message)
);

return Task.CompletedTask;
}


Expand Down
90 changes: 49 additions & 41 deletions Arbitrer.standard20.RabbitMQ.Autofac/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Autofac.Core.Lifetime;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -19,55 +20,55 @@ namespace Arbitrer.RabbitMQ
{
public class RequestsManager : IHostedService
{
private readonly ILogger<RequestsManager> logger;
private readonly IArbitrer arbitrer;
private readonly IContainer provider;
private readonly ILogger<RequestsManager> _logger;
private readonly IArbitrer _arbitrer;
private readonly ILifetimeScope _provider;

private IConnection _connection = null;
private IModel _channel = null;

private readonly HashSet<string> _deduplicationcache = new HashSet<string>();
private readonly HashSet<string> _deDuplicationCache = new HashSet<string>();
private readonly SHA256 _hasher = SHA256.Create();

private readonly MessageDispatcherOptions options;
private readonly MessageDispatcherOptions _options;

public RequestsManager(IOptions<MessageDispatcherOptions> options, ILogger<RequestsManager> logger, IArbitrer arbitrer, IContainer provider)
public RequestsManager(IOptions<MessageDispatcherOptions> options, ILogger<RequestsManager> logger, IArbitrer arbitrer, ILifetimeScope provider)
{
this.options = options.Value;
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.arbitrer = arbitrer;
this.provider = provider;
this._options = options.Value;
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
this._arbitrer = arbitrer;
this._provider = provider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
if (_connection == null)
{
logger.LogInformation($"ARBITRER: Creating RabbitMQ Conection to '{options.HostName}'...");
_logger.LogInformation($"ARBITRER: Creating RabbitMQ Connection to '{_options.HostName}'...");
var factory = new ConnectionFactory
{
HostName = options.HostName,
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
Port = options.Port,
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost,
Port = _options.Port,
DispatchConsumersAsync = true,
};

_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(Consts.ArbitrerExchangeName, ExchangeType.Topic);

logger.LogInformation($"ARBITRER: ready !");
_logger.LogInformation($"ARBITRER: ready !");
}


foreach (var t in arbitrer.GetLocalRequestsTypes())
foreach (var t in _arbitrer.GetLocalRequestsTypes())
{
var isNotification = typeof(INotification).IsAssignableFrom(t);
var queuename = $"{t.TypeQueueName()}${(isNotification ? Guid.NewGuid().ToString() : "")}";

_channel.QueueDeclare(queue: queuename, durable: options.Durable, exclusive: isNotification, autoDelete: options.AutoDelete, arguments: null);
_channel.QueueDeclare(queue: queuename, durable: _options.Durable, exclusive: isNotification, autoDelete: _options.AutoDelete, arguments: null);
_channel.QueueBind(queuename, Consts.ArbitrerExchangeName, t.TypeQueueName());


Expand All @@ -85,7 +86,7 @@ public Task StartAsync(CancellationToken cancellationToken)
}
catch (Exception e)
{
logger.LogError(e.Message, e);
_logger.LogError(e.Message, e);
throw;
}
};
Expand All @@ -100,47 +101,49 @@ private async Task ConsumeChannelNotification<T>(object sender, BasicDeliverEven
{
var msg = ea.Body.ToArray();

if (options.DeDuplicationEnabled)
if (_options.DeDuplicationEnabled)
{
var hash = msg.GetHash(_hasher);
lock (_deduplicationcache)
if (_deduplicationcache.Contains(hash))
lock (_deDuplicationCache)
if (_deDuplicationCache.Contains(hash))
{
logger.LogDebug($"duplicated message received : {ea.Exchange}/{ea.RoutingKey}");
_logger.LogDebug($"duplicated message received : {ea.Exchange}/{ea.RoutingKey}");
return;
}

lock (_deduplicationcache)
_deduplicationcache.Add(hash);
lock (_deDuplicationCache)
_deDuplicationCache.Add(hash);

// Do not await this task
#pragma warning disable CS4014
Task.Run(async () =>
{
await Task.Delay(options.DeDuplicationTTL);
lock (_deduplicationcache)
_deduplicationcache.Remove(hash);
await Task.Delay(_options.DeDuplicationTTL);
lock (_deDuplicationCache)
_deDuplicationCache.Remove(hash);
});
#pragma warning restore CS4014
}

logger.LogDebug("Elaborating notification : {0}", Encoding.UTF8.GetString(msg));
var message = JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(msg), options.SerializerSettings);
_logger.LogDebug("Elaborating notification : {0}", Encoding.UTF8.GetString(msg));
var message = JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(msg), _options.SerializerSettings);

var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;

var mediator = provider.Resolve<IMediator>();
try
{
IMediator mediator = null;
if (!_provider.BeginLifetimeScope(MatchingScopeLifetimeTags.RequestLifetimeScopeTag).TryResolve<IMediator>(out mediator))
mediator = _provider.BeginLifetimeScope().Resolve<IMediator>();

var arbitrer = mediator as ArbitredMediatr;
arbitrer?.StopPropagating();
await mediator.Publish(message);
arbitrer?.ResetPropagating();
}
catch (Exception ex)
{
logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service");
_logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service");
}
finally
{
Expand All @@ -150,25 +153,28 @@ private async Task ConsumeChannelNotification<T>(object sender, BasicDeliverEven
private async Task ConsumeChannelMessage<T>(object sender, BasicDeliverEventArgs ea)
{
var msg = ea.Body.ToArray();
logger.LogDebug("Elaborating message : {0}", Encoding.UTF8.GetString(msg));
var message = JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(msg), options.SerializerSettings);
_logger.LogDebug("Elaborating message : {0}", Encoding.UTF8.GetString(msg));
var message = JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(msg), _options.SerializerSettings);

var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
string responseMsg = null;
try
{
var mediator = provider.Resolve<IMediator>();
IMediator mediator = null;
if (!_provider.BeginLifetimeScope(MatchingScopeLifetimeTags.RequestLifetimeScopeTag).TryResolve<IMediator>(out mediator))
mediator = _provider.BeginLifetimeScope().Resolve<IMediator>();

var response = await mediator.Send(message);
responseMsg = JsonConvert.SerializeObject(new Messages.ResponseMessage { Content = response, Status = Messages.StatusEnum.Ok },
options.SerializerSettings);
logger.LogDebug("Elaborating sending response : {0}", responseMsg);
_options.SerializerSettings);
_logger.LogDebug("Elaborating sending response : {0}", responseMsg);
}
catch (Exception ex)
{
responseMsg = JsonConvert.SerializeObject(new Messages.ResponseMessage { Exception = ex, Status = Messages.StatusEnum.Exception },
options.SerializerSettings);
logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service");
_options.SerializerSettings);
_logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service");
}
finally
{
Expand All @@ -186,6 +192,7 @@ public Task StopAsync(CancellationToken cancellationToken)
}
catch
{
// ignored
}

try
Expand All @@ -194,6 +201,7 @@ public Task StopAsync(CancellationToken cancellationToken)
}
catch
{
// ignored
}

return Task.CompletedTask;
Expand Down
4 changes: 3 additions & 1 deletion Arbitrer.standard20.RabbitMQ/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void InitConnection()
return JsonConvert.DeserializeObject<Messages.ResponseMessage<TResponse>>(result, options.SerializerSettings);
}

public async Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
public Task Notify<TRequest>(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

Expand All @@ -119,6 +119,8 @@ public async Task Notify<TRequest>(TRequest request, CancellationToken cancellat
mandatory: false,
body: Encoding.UTF8.GetBytes(message)
);

return Task.CompletedTask;
}


Expand Down
4 changes: 2 additions & 2 deletions Arbitrer.standard20/pipelines/ArbitrerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Arbitrer.Pipelines
public class ArbitrerPipeline<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
private readonly IArbitrer _arbitrer;
private readonly ILogger<ArbitrerPipeline<TRequest, TResponse>> _logger;
private readonly ILogger<Arbitrer> _logger;

public ArbitrerPipeline(IArbitrer arbitrer, ILogger<ArbitrerPipeline<TRequest, TResponse>> logger)
public ArbitrerPipeline(IArbitrer arbitrer, ILogger<Arbitrer> logger)
{
this._arbitrer = arbitrer;
_logger = logger;
Expand Down
2 changes: 2 additions & 0 deletions arbitrer.sln.DotSettings.user
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=arbitrer/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

0 comments on commit 394593f

Please sign in to comment.