Skip to content

Commit

Permalink
added locks, added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bacherfl committed Jan 2, 2024
1 parent 8f783b3 commit 4b281a6
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 127 deletions.
5 changes: 3 additions & 2 deletions src/OpenFeature/Api.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,16 @@ public async Task Shutdown()
await this._repository.Shutdown().ConfigureAwait(false);
}

/// <inheritdoc />
public void AddHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
this.EventExecutor.AddApiLevelHandler(type, handler);
}

/// <inheritdoc />
public void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
// TODO
throw new System.NotImplementedException();
this.EventExecutor.RemoveApiLevelHandler(type, handler);
}
}
}
146 changes: 87 additions & 59 deletions src/OpenFeature/EventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,85 +11,109 @@ namespace OpenFeature
{
public class EventExecutor
{
private Mutex _mutex = new Mutex();
public readonly Channel<object> eventChannel = Channel.CreateBounded<object>(1);
private FeatureProviderReference _defaultProvider;
private readonly Dictionary<string, FeatureProviderReference> _namedProviderReferences = new Dictionary<string, FeatureProviderReference>();
private readonly List<FeatureProviderReference> _activeSubscriptions = new List<FeatureProviderReference>();
private readonly SemaphoreSlim _shutdownSemaphore = new SemaphoreSlim(0);

private readonly Dictionary<ProviderEventTypes, List<EventHandlerDelegate>> _apiLevelHandlers = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
private readonly Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>> _scopedApiHandlers = new Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>>();
private readonly Dictionary<ProviderEventTypes, List<EventHandlerDelegate>> _apiHandlers = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
private readonly Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>> _clientHandlers = new Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>>();

public EventExecutor()
{
this.ProcessEventAsync();

Check failure on line 26 in src/OpenFeature/EventExecutor.cs

View workflow job for this annotation

GitHub Actions / e2e-tests

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check failure on line 26 in src/OpenFeature/EventExecutor.cs

View workflow job for this annotation

GitHub Actions / unit-tests-linux

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check failure on line 26 in src/OpenFeature/EventExecutor.cs

View workflow job for this annotation

GitHub Actions / unit-tests-windows

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check failure on line 26 in src/OpenFeature/EventExecutor.cs

View workflow job for this annotation

GitHub Actions / unit-tests-windows

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.
}

internal void AddApiLevelHandler(ProviderEventTypes eventType, EventHandlerDelegate handler)
{
if (!this._apiLevelHandlers.TryGetValue(eventType, out var eventHandlers))
this._mutex.WaitOne();
if (!this._apiHandlers.TryGetValue(eventType, out var eventHandlers))
{
eventHandlers = new List<EventHandlerDelegate>();
this._apiLevelHandlers[eventType] = eventHandlers;
this._apiHandlers[eventType] = eventHandlers;
}

eventHandlers.Add(handler);

this.EmitOnRegistration(this._defaultProvider, eventType, handler);
this._mutex.ReleaseMutex();
}
internal void RemoveGlobalHandler(ProviderEventTypes type, EventHandlerDelegate handler)

internal void RemoveApiLevelHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
if (this._apiLevelHandlers.TryGetValue(type, out var eventHandlers))
this._mutex.WaitOne();
if (this._apiHandlers.TryGetValue(type, out var eventHandlers))
{
eventHandlers.Remove(handler);
}
this._mutex.ReleaseMutex();
}
internal void AddNamedHandler(string client, ProviderEventTypes eventType, EventHandlerDelegate handler)

internal void AddClientHandler(string client, ProviderEventTypes eventType, EventHandlerDelegate handler)
{
this._mutex.WaitOne();
// check if there is already a list of handlers for the given client and event type
if (!this._scopedApiHandlers.TryGetValue(client, out var registry))
if (!this._clientHandlers.TryGetValue(client, out var registry))
{
registry = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
this._scopedApiHandlers[client] = registry;
this._clientHandlers[client] = registry;
}

if (!this._scopedApiHandlers[client].TryGetValue(eventType, out var eventHandlers))
if (!this._clientHandlers[client].TryGetValue(eventType, out var eventHandlers))
{
eventHandlers = new List<EventHandlerDelegate>();
this._scopedApiHandlers[client][eventType] = eventHandlers;
this._clientHandlers[client][eventType] = eventHandlers;
}

this._scopedApiHandlers[client][eventType].Add(handler);
this._clientHandlers[client][eventType].Add(handler);

if (this._namedProviderReferences.TryGetValue(client, out var clientProviderReference))
{
this.EmitOnRegistration(clientProviderReference, eventType, handler);
}
this._mutex.ReleaseMutex();
}

internal void RemoveClientHandler(string client, ProviderEventTypes type, EventHandlerDelegate handler)
{
this._mutex.WaitOne();
if (this._clientHandlers.TryGetValue(client, out var clientEventHandlers))
{
if (clientEventHandlers.TryGetValue(type, out var eventHandlers))
{
eventHandlers.Remove(handler);
}
}
this._mutex.ReleaseMutex();
}

internal void RegisterDefaultFeatureProvider(FeatureProvider provider)
{
this._mutex.WaitOne();
var oldProvider = this._defaultProvider;

this._defaultProvider = new FeatureProviderReference(provider);

this.StartListeningAndShutdownOld(this._defaultProvider, oldProvider);
this._mutex.ReleaseMutex();
}

internal void RegisterClientFeatureProvider(string client, FeatureProvider provider)
{
this._mutex.WaitOne();
var newProvider = new FeatureProviderReference(provider);
FeatureProviderReference oldProvider = null;
if (this._namedProviderReferences.TryGetValue(client, out var foundOldProvider))
{
oldProvider = foundOldProvider;
}

this._namedProviderReferences.Add(client, newProvider);
this._namedProviderReferences[client] = newProvider;

this.StartListeningAndShutdownOld(newProvider, oldProvider);
this._mutex.ReleaseMutex();
}

private void StartListeningAndShutdownOld(FeatureProviderReference newProvider, FeatureProviderReference oldProvider)
Expand Down Expand Up @@ -141,19 +165,21 @@ private void EmitOnRegistration(FeatureProviderReference provider, ProviderEvent
if (status == ProviderStatus.Ready && eventType == ProviderEventTypes.PROVIDER_READY)
{
message = "Provider is ready";
} else if (status == ProviderStatus.Error && eventType == ProviderEventTypes.PROVIDER_ERROR)
}
else if (status == ProviderStatus.Error && eventType == ProviderEventTypes.PROVIDER_ERROR)
{
message = "Provider is in error state";
} else if (status == ProviderStatus.Stale && eventType == ProviderEventTypes.PROVIDER_STALE)
}
else if (status == ProviderStatus.Stale && eventType == ProviderEventTypes.PROVIDER_STALE)
{
message = "Provider is in stale state";
}

if (message != "")
{
handler.Invoke(new ProviderEventPayload
handler(new ProviderEventPayload
{
ProviderName = provider.ToString(),
ProviderName = provider.Provider.GetMetadata().Name,
Type = eventType,
Message = message,
});
Expand All @@ -169,7 +195,7 @@ private async Task ProcessFeatureProviderEventsAsync(FeatureProviderReference pr
switch (item)
{
case ProviderEventPayload eventPayload:
this.eventChannel.Writer.TryWrite(new Event{ Provider = providerRef, EventPayload = eventPayload });
this.eventChannel.Writer.TryWrite(new Event { Provider = providerRef, EventPayload = eventPayload });
break;
case ShutdownSignal _:
providerRef.ShutdownSemaphore.Release();
Expand All @@ -183,45 +209,47 @@ private async Task ProcessEventAsync()
{
while (true)
{
var item = await this.eventChannel.Reader.ReadAsync().ConfigureAwait(false);

switch (item)
{
case Event e:
if (this._apiLevelHandlers.TryGetValue(e.EventPayload.Type, out var eventHandlers))
{
foreach (var eventHandler in eventHandlers)
{
eventHandler.Invoke(e.EventPayload);
}
}

// look for client handlers and call invoke method there
foreach (var keyAndValue in this._namedProviderReferences)
{
if (keyAndValue.Value == e.Provider)
{
if (this._scopedApiHandlers.TryGetValue(keyAndValue.Key, out var clientRegistry))
{
if (clientRegistry.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
eventHandler.Invoke(e.EventPayload);
}
}
}
}
}
break;
case ShutdownSignal _:
this._shutdownSemaphore.Release();
return;
}

var item = await this.eventChannel.Reader.ReadAsync().ConfigureAwait(false);

switch (item)
{
case Event e:
this._mutex.WaitOne();
if (this._apiHandlers.TryGetValue(e.EventPayload.Type, out var eventHandlers))
{
foreach (var eventHandler in eventHandlers)
{
eventHandler.Invoke(e.EventPayload);
}
}

// look for client handlers and call invoke method there
foreach (var keyAndValue in this._namedProviderReferences)
{
if (keyAndValue.Value == e.Provider)
{
if (this._clientHandlers.TryGetValue(keyAndValue.Key, out var clientRegistry))
{
if (clientRegistry.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
eventHandler.Invoke(e.EventPayload);
}
}
}
}
}
this._mutex.ReleaseMutex();
break;
case ShutdownSignal _:
this._shutdownSemaphore.Release();
return;
}

}
}

// Method to signal shutdown
public async Task SignalShutdownAsync()
{
Expand Down
2 changes: 1 addition & 1 deletion src/OpenFeature/FeatureProvider.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Collections.Immutable;
using System.Threading.Channels;
using System.Threading.Tasks;
using OpenFeature.Constant;
using OpenFeature.Model;
using System.Threading.Channels;

namespace OpenFeature
{
Expand Down
17 changes: 14 additions & 3 deletions src/OpenFeature/IEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@

namespace OpenFeature
{
public interface IEventBus {
public interface IEventBus
{
/// <summary>
/// Adds an Event Handler for the given event type.
/// </summary>
/// <param name="type">The type of the event</param>
/// <param name="handler">Implementation of the <see cref="EventHandlerDelegate"/></param>
void AddHandler(ProviderEventTypes type, EventHandlerDelegate handler);
/// <summary>
/// Removes an Event Handler for the given event type.
/// </summary>
/// <param name="type">The type of the event</param>
/// <param name="handler">Implementation of the <see cref="EventHandlerDelegate"/></param>
void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler);
}
}
}
}
12 changes: 1 addition & 11 deletions src/OpenFeature/IFeatureClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace OpenFeature
/// <summary>
/// Interface used to resolve flags of varying types.
/// </summary>
public interface IFeatureClient
public interface IFeatureClient : IEventBus
{
/// <summary>
/// Appends hooks to client
Expand All @@ -19,16 +19,6 @@ public interface IFeatureClient
/// <param name="hooks">A list of Hooks that implement the <see cref="Hook"/> interface</param>
void AddHooks(IEnumerable<Hook> hooks);

/// <summary>
/// Adds an Event Handler for the client
/// <para>
/// The appending operation will be atomic.
/// </para>
/// </summary>
/// <param name="eventType">The event type</param>
/// <param name="handler">An object that implements the <see cref="EventHandlerDelegate"/> interface</param>
void AddHandler(ProviderEventTypes eventType, EventHandlerDelegate handler);

/// <summary>
/// Enumerates the global hooks.
/// <para>
Expand Down
2 changes: 1 addition & 1 deletion src/OpenFeature/Model/ProviderEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ProviderEventPayload
/// Name of the provider
/// </summary>
public string ProviderName { get; set; }
public ProviderEventTypes Type {get; set; }
public ProviderEventTypes Type { get; set; }
public string Message { get; set; }
public List<string> FlagChanges { get; set; }
public Dictionary<string, object> EventMetadata { get; set; }
Expand Down
17 changes: 8 additions & 9 deletions src/OpenFeature/OpenFeatureClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,16 @@ public FeatureClient(string name, string version, ILogger logger = null, Evaluat
/// <param name="hook">Hook that implements the <see cref="Hook"/> interface</param>
public void AddHooks(Hook hook) => this._hooks.Push(hook);

/// <summary>
/// Adds an Event Handler for the client
/// <para>
/// The appending operation will be atomic.
/// </para>
/// </summary>
/// <param name="eventType">The event type</param>
/// <param name="handler">An object that implements the <see cref="EventHandlerDelegate"/> interface</param>
/// <inheritdoc />
public void AddHandler(ProviderEventTypes eventType, EventHandlerDelegate handler)
{
Api.Instance.EventExecutor.AddNamedHandler(this._metadata.Name, eventType, handler);
Api.Instance.EventExecutor.AddClientHandler(this._metadata.Name, eventType, handler);
}

/// <inheritdoc />
public void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
Api.Instance.EventExecutor.RemoveClientHandler(this._metadata.Name, type, handler);
}

/// <inheritdoc />
Expand Down
Loading

0 comments on commit 4b281a6

Please sign in to comment.