Skip to content

Commit

Permalink
Fixed the concurrency issue
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 18, 2022
1 parent 6d06b97 commit c08d964
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 132 deletions.
5 changes: 4 additions & 1 deletion src/Eventuous.Connector.Base/Diag/ExporterMappings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public ExporterMappings<T> Add(string name, Action<T> configure) {
public void RegisterExporters(T provider, string[]? exporters) {
var name = typeof(T).Name;
if (exporters == null) {
Log.Information("No exporters specified for {name}", name);
Log.Warning("No exporters for {name} available", name);
return;
}

Expand All @@ -22,6 +22,9 @@ public void RegisterExporters(T provider, string[]? exporters) {
Log.Information("Adding exporter {exporter} for {name}", exporter, name);
addExporter(provider);
}
else {
Log.Information("No exporters specified for {exporter}", exporter);
}
}
}
}
75 changes: 26 additions & 49 deletions src/Eventuous.Connector.Base/Grpc/GrpcFilter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System.Diagnostics;
using System.Text;
using Eventuous.Diagnostics;
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Filters;
using Google.Protobuf.WellKnownTypes;
Expand All @@ -9,58 +6,38 @@
namespace Eventuous.Connector.Base.Grpc;

public sealed class GrpcProjectionFilter : ConsumeFilter<DelayedAckConsumeContext>, IAsyncDisposable {
readonly Projector _projector;

public GrpcProjectionFilter(string host, ChannelCredentials credentials) {
_projector = new Projector(host, credentials, Handler);
_projector.Run(default);
}

async Task Handler(ProjectionResponse result, CancellationToken cancellationToken) {
var ctx = _contexts.Single(x => x.Context.MessageId == result.EventId);

using var activity = Start();
_contexts.Remove(ctx);
await ctx.Next!(ctx.Context.WithItem("projectionResult", result));

Activity? Start()
=> ctx.TraceId == null || ctx.SpanId == null ? null
: EventuousDiagnostics.ActivitySource.StartActivity(
ActivityKind.Producer,
new ActivityContext(
ctx.TraceId.Value,
ctx.SpanId.Value,
ActivityTraceFlags.Recorded
)
);
readonly Projector[] _projectors;
readonly GrpcResponseHandler[] _responseHandlers;

public GrpcProjectionFilter(string host, ChannelCredentials credentials, int partitionCount) {
_projectors = new Projector[partitionCount];
_responseHandlers = new GrpcResponseHandler[partitionCount];

for (var i = 0; i < partitionCount; i++) {
var responseHandler = new GrpcResponseHandler();
var projector = new Projector(host, credentials, responseHandler.Handler);
projector.Run(default);
_projectors[i] = projector;
_responseHandlers[i] = responseHandler;
}
}

public override async ValueTask Send(
DelayedAckConsumeContext context,
Func<DelayedAckConsumeContext, ValueTask>? next
) {
var activity = context.Items.TryGetItem<Activity>("activity");
_contexts.Add(new LocalContext(context, next, activity?.Context.TraceId, activity?.Context.SpanId));
var json = Encoding.UTF8.GetString((context.Message as byte[])!);

await _projector.Project(
new ProjectionRequest {
Stream = context.Stream,
EventType = context.MessageType,
EventId = context.MessageId,
EventPayload = Struct.Parser.ParseJson(json)
}
);
var json = _responseHandlers[context.PartitionId].Prepare(context, next);

await _projectors[context.PartitionId]
.Project(
new ProjectionRequest {
Stream = context.Stream,
EventType = context.MessageType,
EventId = context.MessageId,
EventPayload = Struct.Parser.ParseJson(json)
}
);
}

public ValueTask DisposeAsync() => _projector.DisposeAsync();

record LocalContext(
DelayedAckConsumeContext Context,
Func<DelayedAckConsumeContext, ValueTask>? Next,
ActivityTraceId? TraceId,
ActivitySpanId? SpanId
);

readonly List<LocalContext> _contexts = new();
public async ValueTask DisposeAsync() => await _projectors.Select(x => x.DisposeAsync()).WhenAll();
}
46 changes: 46 additions & 0 deletions src/Eventuous.Connector.Base/Grpc/GrpcResponseHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Diagnostics;
using System.Text;
using Eventuous.Diagnostics;
using Eventuous.Subscriptions.Context;

namespace Eventuous.Connector.Base.Grpc;

public class GrpcResponseHandler {

readonly List<LocalContext> _contexts = new();

public string Prepare(
DelayedAckConsumeContext context,
Func<DelayedAckConsumeContext, ValueTask>? next
) {
var activity = context.Items.TryGetItem<Activity>("activity");
_contexts.Add(new LocalContext(context, next, activity?.Context.TraceId, activity?.Context.SpanId));
return Encoding.UTF8.GetString((context.Message as byte[])!);
}

public async Task Handler(ProjectionResponse result, CancellationToken cancellationToken) {
var ctx = _contexts.Single(x => x.Context.MessageId == result.EventId);

using var activity = Start();
_contexts.Remove(ctx);
await ctx.Next!(ctx.Context.WithItem("projectionResult", result));

Activity? Start()
=> ctx.TraceId == null || ctx.SpanId == null ? null
: EventuousDiagnostics.ActivitySource.StartActivity(
ActivityKind.Producer,
new ActivityContext(
ctx.TraceId.Value,
ctx.SpanId.Value,
ActivityTraceFlags.Recorded
)
);
}

record LocalContext(
DelayedAckConsumeContext Context,
Func<DelayedAckConsumeContext, ValueTask>? Next,
ActivityTraceId? TraceId,
ActivitySpanId? SpanId
);
}
16 changes: 11 additions & 5 deletions src/Eventuous.Connector.Base/Grpc/Projector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Eventuous.Connector.Base.Grpc;

public sealed class Projector : IAsyncDisposable{
public sealed class Projector : IAsyncDisposable {
readonly MethodConfig _defaultMethodConfig = new() {
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy {
Expand All @@ -17,7 +17,7 @@ public sealed class Projector : IAsyncDisposable{
}
};

readonly Projection.ProjectionClient _client;
readonly Projection.ProjectionClient _client;
readonly Func<ProjectionResponse, CancellationToken, Task> _handler;

Task? _readTask;
Expand All @@ -26,7 +26,11 @@ public sealed class Projector : IAsyncDisposable{

AsyncDuplexStreamingCall<ProjectionRequest, ProjectionResponse>? _call;

public Projector(string host, ChannelCredentials credentials, Func<ProjectionResponse, CancellationToken, Task> handler) {
public Projector(
string host,
ChannelCredentials credentials,
Func<ProjectionResponse, CancellationToken, Task> handler
) {
_handler = handler;

var channel = GrpcChannel.ForAddress(
Expand All @@ -47,6 +51,8 @@ public void Run(CancellationToken cancellationToken) {

_call = _client.Project(cancellationToken: linked.Token);

_call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.BufferHint);

async Task HandleResponses() {
Log.Information("[Grpc] Subscribing...");

Expand All @@ -64,7 +70,7 @@ async Task Resubscribe() {
if (_disposing) {
return;
}

_cts?.Cancel();
_call?.Dispose();

Expand Down Expand Up @@ -103,7 +109,7 @@ async Task<ProjectResult> ProjectInternal() {
await _call!.RequestStream.WriteAsync(projectionContext);
return ProjectResult.Ok;
}
catch (InvalidOperationException e)
catch (InvalidOperationException e)
when (e.Message.Contains("previous write is in progress")) {
// TODO: this is a hack, it needs to open multiple streams for concurrent projectors
Log.Warning("[Grpc] Write already in progress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Eventuous.Connector.Base.Tools;

public static class SubscriptionBuilderExtensions {
public static void AddGrpcProjector(this SubscriptionBuilder builder, GrpcProjectorSettings? settings)
public static void AddGrpcProjector(this SubscriptionBuilder builder, GrpcProjectorSettings? settings, uint partitionCount)
=> builder.AddConsumeFilterLast(
new GrpcProjectionFilter(settings.GetHost(), settings.GetCredentials())
new GrpcProjectionFilter(settings.GetHost(), settings.GetCredentials(), (int)partitionCount)
);
}
2 changes: 1 addition & 1 deletion src/Eventuous.Connector.EsdbElastic/ConnectorStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static ConnectorBuilder<
b => {
b.UseCheckpointStore<ElasticCheckpointStore>();
b.WithPartitioningByStream(concurrencyLimit);
b.AddGrpcProjector(config.Grpc);
b.AddGrpcProjector(config.Grpc, concurrencyLimit);
}
);

Expand Down
2 changes: 1 addition & 1 deletion src/Eventuous.Connector.EsdbMongo/ConnectorStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static ConnectorBuilder<
b => {
b.UseCheckpointStore<MongoCheckpointStore>();
b.WithPartitioningByStream(concurrencyLimit);
b.AddGrpcProjector(config.Grpc);
b.AddGrpcProjector(config.Grpc, concurrencyLimit);
}
);

Expand Down
2 changes: 1 addition & 1 deletion src/Eventuous.Connector.EsdbSqlServer/ConnectorStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static ConnectorBuilder<
b => {
b.UseCheckpointStore<SqlCheckpointStore>();
b.WithPartitioningByStream(concurrencyLimit);
b.AddGrpcProjector(config.Grpc);
b.AddGrpcProjector(config.Grpc, concurrencyLimit);
}
);

Expand Down
4 changes: 1 addition & 3 deletions src/Eventuous.Connector/Eventuous.Connector.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
</ItemGroup>

<ItemGroup Condition="$(Configuration) == 'Debug'">
<None Update="config.yaml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="config.yaml" CopyToOutputDirectory="Always" CopyToPublishDirectory="Never" />
</ItemGroup>
</Project>
3 changes: 1 addition & 2 deletions src/Eventuous.Connector/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
.Add("prometheus", b => b.AddPrometheusExporter())
.Add("otlp", b => b.AddOtlpExporter());

using var app = new StartupBuilder()
.Configure("config.yaml", args)
var app = new StartupBuilder("config.yaml", args)
.BuildApplication(tracingExporters, metricsExporters);

return await app.Run();
49 changes: 21 additions & 28 deletions src/Eventuous.Connector/StartupBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,39 @@
using OpenTelemetry.Trace;
using Serilog;
using Serilog.Core;
using ILogger = Serilog.ILogger;

namespace Eventuous.Connector;

sealed class StartupBuilder : IDisposable {
ConnectorConfig? _config;
ConnectorApp? _app;
string? _configFile;
sealed class StartupBuilder {
readonly ConnectorConfig? _config;
ConnectorApp? _app;
readonly string? _configFile;

readonly Logger _log;

public StartupBuilder() {
var env = new StartupEnvironment();
_log = Logging.GetLogger(env);
}

public StartupBuilder Configure(string configFile, string[] args) {
_log.Information("Configuring connector using config file {configFile}", configFile);
static readonly ILogger Log = Serilog.Log.ForContext<StartupBuilder>();

public StartupBuilder(string configFile, string[] args) {
Serilog.Log.Logger = Logging.GetLogger(new StartupEnvironment());
Log.Information("Configuring connector using config file {configFile}", configFile);

_configFile = configFile;
var hostBuilder = Host.CreateDefaultBuilder(args);
hostBuilder.ConfigureHostConfiguration(c => c.AddYamlFile(configFile));
using var tempHost = hostBuilder.Build();
_config = tempHost.Services.GetService<IConfiguration>().Get<ConnectorConfig>();

if (string.IsNullOrWhiteSpace(_config.Connector.ConnectorAssembly)) {
_log.Fatal($"Connector assembly must be specified in {configFile}");
Log.Fatal($"Connector assembly must be specified in {configFile}");
throw new ApplicationException();
}

return this;
}

public StartupBuilder BuildApplication(
ExporterMappings<TracerProviderBuilder> tracingExporters,
ExporterMappings<MeterProviderBuilder> metricsExporters
) {
if (_config == null) {
_log.Fatal("Call Configure() first");
Log.Fatal("Call Configure() first");
throw new ApplicationException();
}

Expand All @@ -56,19 +51,19 @@ ExporterMappings<MeterProviderBuilder> metricsExporters
? _config.Connector.ConnectorAssembly
: _config.Connector.ConnectorAssembly + ".dll";

_log.Information("Loading connector assembly {assemblyFileName}", assemblyFileName);
Log.Information("Loading connector assembly {assemblyFileName}", assemblyFileName);

var assembly = Assembly.LoadFrom(Path.Join(path, assemblyFileName));
var startup = assembly.GetTypes().FirstOrDefault(x => x.IsAssignableTo(typeof(IConnectorStartup)));

if (startup == null) {
_log.Fatal("Connector assembly must have an implementation of IConnectorStartup");
Log.Fatal("Connector assembly must have an implementation of IConnectorStartup");
throw new ApplicationException();
}

var startupInstance = Activator.CreateInstance(startup) as IConnectorStartup;

_log.Information("Building connector application");
Log.Information("Building connector application");

_app = startupInstance!.BuildConnectorApp(_configFile!, tracingExporters, metricsExporters);

Expand All @@ -77,38 +72,36 @@ ExporterMappings<MeterProviderBuilder> metricsExporters

public async Task<int> Run() {
if (_config == null) {
_log.Fatal("Call Configure() first");
Log.Fatal("Call Configure() first");
throw new ApplicationException();
}

if (_app == null) {
_log.Fatal("Call BuildApplication() first");
Log.Fatal("Call BuildApplication() first");
throw new ApplicationException();
}

if (_config.Connector.Diagnostics.Enabled && _config.Connector.Diagnostics.Metrics?.Enabled == true
&& _config.Connector.Diagnostics.Metrics.Exporters?.Any(
x => x == "prometheus"
) == true) {
_log.Information("Adding Prometheus metrics exporter");
Log.Information("Adding Prometheus metrics exporter");
_app.Host.UseOpenTelemetryPrometheusScrapingEndpoint();
}

_app.Host.AddEventuousLogs();
_app.Host.MapGet("ping", ctx => ctx.Response.WriteAsync("pong"));
_app.Host.MapHealthChecks("/health");

_log.Information("Starting connector application");
Log.Information("Starting connector application");

try {
return await _app.Run();
}
catch (Exception e) {
_log.Fatal(e, "Connector application failed");
Log.CloseAndFlush();
Log.Fatal(e, "Connector application failed");
Serilog.Log.CloseAndFlush();
return -1;
}
}

public void Dispose() => _log.Dispose();
}
Loading

0 comments on commit c08d964

Please sign in to comment.