Skip to content

Commit

Permalink
Improve robustness for notifications about arrived time
Browse files Browse the repository at this point in the history
+ Use fewer threads: Use a single timer instead of starting a new task for each request.
+ Also, improve some names in the implementation.
  • Loading branch information
Viir committed Jun 21, 2020
1 parent 2545325 commit ab39fbe
Showing 3 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -2,6 +2,6 @@ namespace Kalmit.PersistentProcess.WebHost
{
public class Program
{
static public string AppVersionId => "2020-06-20";
static public string AppVersionId => "2020-06-21";
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@ public class StartupPublicApp
{
private readonly ILogger<StartupPublicApp> _logger;

static TimeSpan notifyTimeHasArrivedMaximumDistance => TimeSpan.FromSeconds(10);

public StartupPublicApp(ILogger<StartupPublicApp> logger)
{
_logger = logger;
@@ -80,11 +82,11 @@ public void Configure(
throw new Exception("Missing reference to the web app config.");
}

var tasksCancellationTokenSource = new System.Threading.CancellationTokenSource();
var applicationStoppingCancellationTokenSource = new System.Threading.CancellationTokenSource();

appLifetime.ApplicationStopping.Register(() =>
{
tasksCancellationTokenSource.Cancel();
applicationStoppingCancellationTokenSource.Cancel();
_logger?.LogInformation("Public app noticed ApplicationStopping.");
});

@@ -97,6 +99,18 @@ public void Configure(

var volatileHosts = new ConcurrentDictionary<string, VolatileHost>();

var appTaskCompleteHttpResponse = new ConcurrentDictionary<string, InterfaceToHost.HttpResponse>();

System.Threading.Timer notifyTimeHasArrivedTimer = null;
DateTimeOffset? lastAppEventTimeHasArrived = null;
InterfaceToHost.NotifyWhenArrivedAtTimeRequestStructure nextTimeToNotify = null;
var nextTimeToNotifyLock = new object();

/*
2020-06-21 TODO: Remove temporary flag 'appAskedToBeNotifiedWhenTimeArrived' when apps have been migrated.
*/
bool appEverAskedToBeNotifiedWhenTimeArrived = false;

InterfaceToHost.Result<InterfaceToHost.TaskResult.RequestToVolatileHostError, InterfaceToHost.TaskResult.RequestToVolatileHostComplete>
performProcessTaskRequestToVolatileHost(
InterfaceToHost.Task.RequestToVolatileHostStructure requestToVolatileHost)
@@ -204,13 +218,9 @@ void performProcessTaskAndFeedbackEvent(InterfaceToHost.StartTask taskWithId)
processEventAndResultingRequests(interfaceEvent);
}

var processRequestCompleteHttpResponse = new ConcurrentDictionary<string, InterfaceToHost.HttpResponse>();

InterfaceToHost.NotifyWhenArrivedAtTimeRequestStructure nextTimeToNotify = null;

void processEventAndResultingRequests(InterfaceToHost.AppEventStructure interfaceEvent)
{
if (tasksCancellationTokenSource.IsCancellationRequested)
if (applicationStoppingCancellationTokenSource.IsCancellationRequested)
return;

var serializedInterfaceEvent = Newtonsoft.Json.JsonConvert.SerializeObject(interfaceEvent, jsonSerializerSettings);
@@ -257,35 +267,13 @@ void processEventAndResultingRequests(InterfaceToHost.AppEventStructure interfac

if (structuredResponse.DecodeEventSuccess.notifyWhenArrivedAtTime != null)
{
appEverAskedToBeNotifiedWhenTimeArrived = true;

System.Threading.Tasks.Task.Run(() =>
{
if (tasksCancellationTokenSource.IsCancellationRequested)
return;

nextTimeToNotify = structuredResponse.DecodeEventSuccess.notifyWhenArrivedAtTime;

while (!tasksCancellationTokenSource.IsCancellationRequested)
lock (nextTimeToNotifyLock)
{
if (nextTimeToNotify != structuredResponse.DecodeEventSuccess.notifyWhenArrivedAtTime)
return;

var remainingDelayMilliseconds =
structuredResponse.DecodeEventSuccess.notifyWhenArrivedAtTime.posixTimeMilli -
getDateTimeOffset().ToUnixTimeMilliseconds();

if (remainingDelayMilliseconds <= 0)
{
processEventAndResultingRequests(new InterfaceToHost.AppEventStructure
{
ArrivedAtTimeEvent = new InterfaceToHost.ArrivedAtTimeEventStructure
{
posixTimeMilli = getDateTimeOffset().ToUnixTimeMilliseconds()
}
});
return;
}

System.Threading.Thread.Sleep(10);
nextTimeToNotify = structuredResponse.DecodeEventSuccess.notifyWhenArrivedAtTime;
}
});
}
@@ -297,11 +285,69 @@ void processEventAndResultingRequests(InterfaceToHost.AppEventStructure interfac

foreach (var completeHttpResponse in structuredResponse.DecodeEventSuccess.completeHttpResponses)
{
processRequestCompleteHttpResponse[completeHttpResponse.httpRequestId] =
appTaskCompleteHttpResponse[completeHttpResponse.httpRequestId] =
completeHttpResponse.response;
}
}

void processEventTimeHasArrived()
{
var currentTime = getDateTimeOffset();

lastAppEventTimeHasArrived = currentTime;

processEventAndResultingRequests(new InterfaceToHost.AppEventStructure
{
ArrivedAtTimeEvent = new InterfaceToHost.ArrivedAtTimeEventStructure
{
posixTimeMilli = currentTime.ToUnixTimeMilliseconds()
}
});
}

notifyTimeHasArrivedTimer = new System.Threading.Timer(
callback: _ =>
{
if (applicationStoppingCancellationTokenSource.IsCancellationRequested)
{
notifyTimeHasArrivedTimer?.Dispose();
return;
}

lock (nextTimeToNotifyLock)
{
if (applicationStoppingCancellationTokenSource.IsCancellationRequested)
{
notifyTimeHasArrivedTimer?.Dispose();
return;
}

var localNextTimeToNotify = nextTimeToNotify;

if (localNextTimeToNotify != null && localNextTimeToNotify.posixTimeMilli <= getDateTimeOffset().ToUnixTimeMilliseconds())
{
nextTimeToNotify = null;
processEventTimeHasArrived();
return;
}
}

if (!appEverAskedToBeNotifiedWhenTimeArrived)
return;

if (lastAppEventTimeHasArrived.HasValue
?
notifyTimeHasArrivedMaximumDistance <= (getDateTimeOffset() - lastAppEventTimeHasArrived.Value)
:
true)
{
processEventTimeHasArrived();
}
},
state: null,
dueTime: TimeSpan.Zero,
period: TimeSpan.FromMilliseconds(10));

app
.Use(async (context, next) => await Asp.MiddlewareFromWebAppConfig(webAppAndElmAppConfig.WebAppConfiguration, context, next))
.Run(async (context) =>
@@ -328,7 +374,7 @@ void processEventAndResultingRequests(InterfaceToHost.AppEventStructure interfac

while (true)
{
if (processRequestCompleteHttpResponse.TryRemove(httpRequestId, out var httpResponse))
if (appTaskCompleteHttpResponse.TryRemove(httpRequestId, out var httpResponse))
{
var headerContentType =
httpResponse.headersToAdd
4 changes: 2 additions & 2 deletions implement/elm-fullstack/elm-fullstack.csproj
Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>elm_fullstack</RootNamespace>
<AssemblyName>elm-fullstack</AssemblyName>
<AssemblyVersion>2020.0620.0.0</AssemblyVersion>
<FileVersion>2020.0620.0.0</FileVersion>
<AssemblyVersion>2020.0621.0.0</AssemblyVersion>
<FileVersion>2020.0621.0.0</FileVersion>
</PropertyGroup>

<PropertyGroup>

0 comments on commit ab39fbe

Please sign in to comment.