Skip to content

Commit

Permalink
Make migrations atomic
Browse files Browse the repository at this point in the history
Do not process other events between reading the previous state and writing the migrated state.
  • Loading branch information
Viir committed Apr 3, 2020
1 parent e526f44 commit 6d4a7db
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 54 deletions.
12 changes: 11 additions & 1 deletion implement/PersistentProcess/PersistentProcess.WebHost/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,14 @@ public void ConfigureServices(IServiceCollection services)
return ImmutableList.Create(directoryName, directoryName + "T" + time.ToString("HH") + ".composition.jsonl");
});

var persistentProcessMap =
serviceProvider.GetService<PersistentProcessMap>()?.mapPersistentProcess
??
new Func<IPersistentProcess, IPersistentProcess>(persistentProcess => persistentProcess);

services.AddSingleton<ProcessStore.IProcessStoreReader>(processStore);
services.AddSingleton<ProcessStore.IProcessStoreWriter>(processStore);
services.AddSingleton<IPersistentProcess>(BuildPersistentProcess);
services.AddSingleton<IPersistentProcess>(serviceProvider => persistentProcessMap(BuildPersistentProcess(serviceProvider)));

var letsEncryptOptions = webAppConfigObject?.JsonStructure?.letsEncryptOptions;
if (letsEncryptOptions == null)
Expand Down Expand Up @@ -420,4 +425,9 @@ static InterfaceToHost.HttpRequestEvent AsPersistentProcessInterfaceHttpRequestE
};
}
}

public class PersistentProcessMap
{
public Func<IPersistentProcess, IPersistentProcess> mapPersistentProcess;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Kalmit.ProcessStore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -47,6 +49,13 @@ public void ConfigureServices(IServiceCollection services)
}
}

class PublicHostConfiguration
{
public SyncPersistentProcess syncPersistentProcess;

public IWebHost webHost;
}

public void Configure(
IApplicationBuilder app,
IWebHostEnvironment env,
Expand All @@ -71,17 +80,17 @@ public void Configure(

object publicAppLock = new object();

IWebHost publicAppWebHost = null;
PublicHostConfiguration publicAppHost = null;

void stopPublicApp()
{
lock (publicAppLock)
{
if (publicAppWebHost != null)
if (publicAppHost != null)
{
publicAppWebHost.StopAsync(TimeSpan.FromSeconds(10)).Wait();
publicAppWebHost.Dispose();
publicAppWebHost = null;
publicAppHost?.webHost.StopAsync(TimeSpan.FromSeconds(10)).Wait();
publicAppHost?.webHost.Dispose();
publicAppHost = null;
}
}
}
Expand Down Expand Up @@ -113,16 +122,27 @@ void startPublicApp(byte[] webAppConfigZipArchive, bool elmAppInitState)
elmAppProcessStore.DeleteFile(filePath);
}

var newPublicAppConfig = new PublicHostConfiguration { };

var webHost =
Program.CreateWebHostBuilder(null, overrideDefaultUrls: publicWebHostUrls)
.WithSettingAdminRootPassword(rootPassword)
.WithSettingDateTimeOffsetDelegate(getDateTimeOffset)
.WithWebAppConfigurationZipArchive(webAppConfigZipArchive)
.WithProcessStoreFileStore(elmAppProcessStore)
.ConfigureServices(services => services.AddSingleton(new PersistentProcessMap
{
mapPersistentProcess = originalPersistentProcess =>
{
return newPublicAppConfig.syncPersistentProcess = new SyncPersistentProcess(originalPersistentProcess);
}
}))
.Build();

newPublicAppConfig.webHost = webHost;

webHost.StartAsync(appLifetime.ApplicationStopping).Wait();
publicAppWebHost = webHost;
publicAppHost = newPublicAppConfig;
}
}

Expand Down Expand Up @@ -267,58 +287,43 @@ byte[] getPublicAppConfigFileFromStore() =>

if (prepareMigrateResult?.Ok == null)
{
if (publicAppWebHost == null)
if (publicAppHost == null)
{
context.Response.StatusCode = 400;
await context.Response.WriteAsync("Failed to prepare migration with this Elm app:\n" + prepareMigrateResult?.Err);
return;
}
}

if (publicAppWebHost == null)
if (publicAppHost == null)
{
context.Response.StatusCode = 400;
await context.Response.WriteAsync("Migration not possible because there is no app (state).");
return;
}

var publicAppWebHostAddresses =
publicAppWebHost.ServerFeatures.Get<Microsoft.AspNetCore.Hosting.Server.Features.IServerAddressesFeature>().Addresses;

string elmAppStateSerializedBefore = null;

var httpPathToProcessState = Configuration.AdminPath + Configuration.ApiPersistentProcessStatePath;

System.Net.Http.HttpClient createPublicHostClientWithAuthorizationHeader()
if (publicAppHost.syncPersistentProcess == null)
{
var adminClient = new System.Net.Http.HttpClient()
{
BaseAddress = new Uri(publicAppWebHostAddresses.First())
};

adminClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
"Basic",
Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(
WebHost.Configuration.BasicAuthenticationForAdminRoot(rootPassword))));

return adminClient;
context.Response.StatusCode = 500;
await context.Response.WriteAsync("syncPersistentProcess == null");
return;
}

using (var publicAppClient = createPublicHostClientWithAuthorizationHeader())
Result<string, string> attemptMigrateResult = null;

publicAppHost.syncPersistentProcess.RunInLock(persistentProcess =>
{
var getStateResponse = publicAppClient.GetAsync(httpPathToProcessState).Result;
var elmAppStateSerializedBefore = persistentProcess.ReductionRecordForCurrentState().ReducedValueLiteralString;

elmAppStateSerializedBefore = getStateResponse.Content.ReadAsStringAsync().Result;
}
attemptMigrateResult = prepareMigrateResult.Ok(elmAppStateSerializedBefore);

if (!(0 < elmAppStateSerializedBefore?.Length))
{
context.Response.StatusCode = 500;
await context.Response.WriteAsync("Failed to read the current app state.");
return;
}
if (attemptMigrateResult?.Ok == null)
return;

// TODO: Add write event to history.

var attemptMigrateResult = prepareMigrateResult.Ok(elmAppStateSerializedBefore);
persistentProcess.SetState(attemptMigrateResult.Ok);
});

if (attemptMigrateResult?.Ok == null)
{
Expand All @@ -327,21 +332,6 @@ System.Net.Http.HttpClient createPublicHostClientWithAuthorizationHeader()
return;
}

using (var publicAppClient = createPublicHostClientWithAuthorizationHeader())
{
var setStateResponse = publicAppClient.PostAsync(
httpPathToProcessState, new System.Net.Http.StringContent(attemptMigrateResult.Ok)).Result;

if (!setStateResponse.IsSuccessStatusCode)
{
context.Response.StatusCode = 500;
await context.Response.WriteAsync(
"Failed to set the migrated state:\nstatus code: " + setStateResponse.StatusCode +
"\nContent:\n" + setStateResponse.Content?.ReadAsStringAsync().Result);
return;
}
}

context.Response.StatusCode = 200;
await context.Response.WriteAsync("Completed migration.");
return;
Expand Down Expand Up @@ -564,4 +554,48 @@ Ok valueToEncodeOk ->
}
}
}

public class SyncPersistentProcess : IPersistentProcess
{
readonly object @lock = new object();

readonly IPersistentProcess persistentProcess;

public SyncPersistentProcess(IPersistentProcess persistentProcess)
{
this.persistentProcess = persistentProcess;
}

public void RunInLock(Action<IPersistentProcess> action)
{
lock (@lock)
{
action(persistentProcess);
}
}

public (IReadOnlyList<string> responses, (byte[] serializedCompositionRecord, byte[] serializedCompositionRecordHash)) ProcessEvents(IReadOnlyList<string> serializedEvents)
{
lock (@lock)
{
return persistentProcess.ProcessEvents(serializedEvents);
}
}

public ReductionRecord ReductionRecordForCurrentState()
{
lock (@lock)
{
return persistentProcess.ReductionRecordForCurrentState();
}
}

public (byte[] serializedCompositionRecord, byte[] serializedCompositionRecordHash) SetState(string state)
{
lock (@lock)
{
return persistentProcess.SetState(state);
}
}
}
}

0 comments on commit 6d4a7db

Please sign in to comment.