Skip to content

Commit

Permalink
Add options for SqlStreamStore
Browse files Browse the repository at this point in the history
UseSqlStreamStore extension now takes options maxReadCount
and useSubscriptionBasedReader

Added method to initialize postgresql schema with compatible a json type
  • Loading branch information
rofr committed Jun 30, 2020
1 parent c68ca1b commit 0a2700d
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 48 deletions.
14 changes: 13 additions & 1 deletion src/Memstate.SqlStreamStore/ConfigExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Text;
using Memstate.Configuration;
using Npgsql;
using SqlStreamStore;

namespace Memstate.SqlStreamStore
Expand All @@ -10,9 +12,19 @@ public static class ConfigExtensions
/// </summary>
/// <param name="config"></param>
/// <param name="streamStore"></param>
/// <param name="useSubscriptionBasedReader">
/// There are two reader implementations. The subscriptionBasedReader
/// will in some cases have a better performance, the default is false</param>
/// <param name="maxRecordsPerRead">The number of to fetch</param>
/// <returns></returns>
public static Config UseSqlStreamStore(this Config config, IStreamStore streamStore = null)
public static Config UseSqlStreamStore(this Config config,
IStreamStore streamStore = null,
bool useSubscriptionBasedReader = false,
int maxRecordsPerRead = 100)
{
config.Data["SqlStreamStore.UseSubscriptionBasedReader"] = useSubscriptionBasedReader.ToString();
config.Data["SqlStreamStore.MaxRecordsPerRead"] = maxRecordsPerRead.ToString();

config.StorageProviderName = StorageProviders.SqlStreamStore;
config.Container.Register(streamStore);
return config;
Expand Down
1 change: 1 addition & 0 deletions src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Devrex Labs</Owners>

<ItemGroup>
<PackageReference Include="SqlStreamStore" Version="1.2.0-beta.8" />
<PackageReference Include="SqlStreamStore.Postgres" Version="1.2.0-beta.8" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ public IEnumerable<JournalRecord> GetRecords(long fromRecord = 0)
async Task MessageReceived(IStreamSubscription subscription, StreamMessage message,
CancellationToken cancellationToken)
{
if (message.StreamVersion < fromRecord)
return;
var command = (Command)_serializer.FromString(await message.GetJsonData());
var json = await message.GetJsonData(cancellationToken);
var command = (Command)_serializer.FromString(json);
var journalRecord = new JournalRecord(message.StreamVersion, message.CreatedUtc, command);
queue.Add(journalRecord);
}
Expand All @@ -63,11 +62,9 @@ async Task MessageReceived(IStreamSubscription subscription, StreamMessage messa
{
sub.MaxCountPerRead = 100;

JournalRecord journalRecord;
while (!caughtUp || queue.Any())
{

if (queue.TryTake(out journalRecord))
if (queue.TryTake(out var journalRecord))
yield return journalRecord;
else if (!caughtUp)
Thread.Sleep(100);
Expand Down
2 changes: 1 addition & 1 deletion src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Task DisposeAsync()

public IEnumerable<JournalRecord> GetRecords(long fromRecord = 0)
{
var pageSize = 20;
var pageSize = 200;

while (true)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using SqlStreamStore;
Expand All @@ -21,7 +22,7 @@ public SqlStreamStoreJournalWriter(IStreamStore streamStore, StreamId streamId,
protected override void OnCommandBatch(IEnumerable<Command> commands)
{
var messages = commands.Select(ToNewStreamMessage).ToArray();
var result = _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, messages )
var result = _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, messages )
.GetAwaiter()
.GetResult();
}
Expand Down
54 changes: 44 additions & 10 deletions src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Memstate.Configuration;
using System.Text;
using Memstate.Configuration;
using Npgsql;
using SqlStreamStore;
using SqlStreamStore.Streams;

Expand All @@ -10,6 +12,8 @@ public class SqlStreamStoreProvider: StorageProvider
private readonly ISerializer _serializer;
private readonly IStreamStore _streamStore;

private readonly bool UseSubscriptionBasedReader = false;

public SqlStreamStoreProvider() : this(null)
{

Expand All @@ -30,27 +34,57 @@ public SqlStreamStoreProvider(IStreamStore streamStore)
_streamStore = streamStore;
}

/// <inheritdoc/>
public override IJournalReader CreateJournalReader()
{
// return new SqlSteamStoreSubscriptionJournalReader(
// _streamStore,
// _streamId,
// _serializer);

return new SqlStreamStoreJournalReader(
_streamStore,
_streamId,
_serializer);
return UseSubscriptionBasedReader ? (IJournalReader)
new SqlSteamStoreSubscriptionJournalReader(
_streamStore,
_streamId,
_serializer) :
new SqlStreamStoreJournalReader(
_streamStore,
_streamId,
_serializer);
}

/// <inheritdoc/>
public override IJournalWriter CreateJournalWriter(long nextRecordNumber)
{
return new SqlStreamStoreJournalWriter(_streamStore, _streamId, _serializer);
}

/// <inheritdoc/>
public override IJournalSubscriptionSource CreateJournalSubscriptionSource()
{
return new SqlStreamStoreSubscriptionSource(_streamStore, _streamId, _serializer);
}

/// <summary>
/// Initialize a postgres database for use as a Memstate SqlStreamStore backend
/// <remarks>You must use this method to initialize the database objects.
/// The normal SqlStreamStore schema uses the JSONB datatype</remarks>
/// </summary>
/// <param name="settings">A settings object with a valid connection string</param>
public static void InitializePgSqlStreamStore(PostgresStreamStoreSettings settings)
{
var store = new PostgresStreamStore(settings);
var originalScript = store.GetSchemaCreationScript();

var sb = new StringBuilder("CREATE SCHEMA IF NOT EXISTS ");
sb.AppendLine(settings.Schema + ";");

sb.Append(originalScript);
var script = sb.ToString().Replace("JSONB", "JSON");

using (var connection = new NpgsqlConnection(settings.ConnectionString))
{
connection.Open();
var cmd = connection.CreateCommand();
cmd.CommandText = script;
cmd.ExecuteNonQuery();
connection.Close();
}
}
}
}
136 changes: 107 additions & 29 deletions src/System.Test/SqlStreamStorePerformanceRepro.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System.Linq;
using System.Data.Common;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Memstate;
using Memstate.Configuration;
Expand All @@ -13,71 +17,144 @@ namespace System.Test
public class SqlStreamStorePerformanceRepro
{

private const int MessageSize = 1024;
private const int NumMessages = 200000;
private const int MessageSize = 400;
private const int NumMessages = 10000;
private const string PgsqlConnectionString
= "Host=localhost; Password=postgres; User ID=postgres; Database=postgres;";

private IJournalWriter _writer;
private SqlStreamStoreProvider _provider;
private String _streamName;

private DbConnection _connection;
private IStreamStore _streamStore;

private readonly Stopwatch _stopWatch = new Stopwatch();

[TearDown]
public async Task TearDown()
{
await _connection.DisposeAsync();
_streamStore.Dispose();
}

[SetUp]
public void Setup()
{
SqlStreamStore.Logging.LogProvider.IsDisabled = true;
var config = Config.Current;
_streamName = "test-" + DateTime.Now.ToFileTimeUtc();
config.GetSettings<EngineSettings>().StreamName = _streamName;


config.SerializerName = Serializers.NewtonsoftJson;
var streamStore = Postgres();
_provider = new SqlStreamStoreProvider(streamStore);

//ConfigurePostgres();

ConfigureMssql2019();
_provider = new SqlStreamStoreProvider(_streamStore);
_writer = _provider.CreateJournalWriter(0);
}

private void ConfigurePostgres()
{
_connection = new NpgsqlConnection(PgsqlConnectionString);
var settings = new PostgresStreamStoreSettings(PgsqlConnectionString);
settings.Schema = "randy";
SqlStreamStoreProvider.InitializePgSqlStreamStore(settings);
_streamStore = new PostgresStreamStore(settings);
}


private void ConfigureMssql2019()
{
CreateMsSqlDatabaseUnlessExists();
var connectionString = "Server=localhost;Database=memstate;User Id=sa;Password=abc123ABC;";
_connection = new SqlConnection(connectionString);
var settings = new MsSqlStreamStoreV3Settings(connectionString);
settings.Schema = "memstate";
var store = new MsSqlStreamStoreV3(settings);
store.CreateSchemaIfNotExists().GetAwaiter().GetResult();
_streamStore = store;
}

private void CreateMsSqlDatabaseUnlessExists()
{
try
{
var connectionString = "Server=localhost;Database=master;User Id=sa;Password=abc123ABC;";
_connection = new SqlConnection(connectionString);
_connection.Open();
var cmd = _connection.CreateCommand();
cmd.CommandText = "IF db_id('memstate') IS NULL CREATE DATABASE memstate";
cmd.ExecuteNonQuery();
}
finally
{
_connection.Close();
}
}

[Test]
public void DeserializationFailsWhenFirstJsonAttributeIsNotType()
{
Assert.Catch(() =>
{
var json =
"{\"Id\": \"8d7693f9-9bfc-4c23-ac52-31fc22bd67f3\", \"$type\": \"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\", \"Payload\": {\"$type\": \"System.Byte[], System.Private.CoreLib\", \"$value\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"}}";
var serializer = Config.Current.CreateSerializer();
var command = (Command) serializer.FromString(json);
});
}

[Test]
public void CanDeserializeWithTypeFirst()
{
var json = "{\"$type\":\"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\",\"Payload\":{\"$type\":\"System.Byte[], System.Private.CoreLib\",\"$value\":\"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"},\"Id\":\"e12c9b93-025d-4378-b370-24704acd134d\"}";;
var serializer = Config.Current.CreateSerializer();
var command = (Command) serializer.FromString(json);
}
[Test]
public async Task WriteTruncateAndLoadUsingReader()
{
Console.WriteLine(nameof(WriteTruncateAndLoadUsingReader) + " " + _streamName);
await AppendMessages(NumMessages, MessageSize);
await DeleteMessages();
await WriteAndTruncateMessages();
_stopWatch.Restart();
var reader = _provider.CreateJournalReader();
var records = reader.GetRecords().ToList();
Console.WriteLine("Records read: " + records.Count);
Console.WriteLine(DateTime.Now);
Console.WriteLine("Read duration: " + _stopWatch.Elapsed);
await reader.DisposeAsync();
}

[Test]
public async Task WriteTruncateAndLoadUsingSubscription()
{
Console.WriteLine(nameof(WriteTruncateAndLoadUsingSubscription) + " " + _streamName);
await AppendMessages(NumMessages, MessageSize);
await DeleteMessages();
var messagesReceived = 0;

await WriteAndTruncateMessages();
_stopWatch.Restart();
var sub = _provider.CreateJournalSubscriptionSource()
.Subscribe(0, jr => Console.WriteLine(jr.RecordNumber));
.Subscribe(0, jr => messagesReceived++);
while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(20));
Console.WriteLine("Ready! " + DateTime.Now);
Console.WriteLine("Messages received: " + messagesReceived);
Console.WriteLine("Read with sub duration: " + _stopWatch.Elapsed);
sub.Dispose();
}

private async Task WriteAndTruncateMessages()
{
_stopWatch.Restart();
await AppendMessages(NumMessages, MessageSize);
Console.WriteLine("Append duration:" + _stopWatch.Elapsed);
_stopWatch.Restart();
//await DeleteMessages();
}
private async Task DeleteMessages()
{
var connection = new NpgsqlConnection(PgsqlConnectionString);
await connection.OpenAsync();
var cmd = connection.CreateCommand();
cmd.CommandText = "DELETE FROM messages";
await _connection.OpenAsync();
var cmd = _connection.CreateCommand();
cmd.CommandText = "TRUNCATE TABLE messages";
cmd.CommandTimeout = 1000 * 60 * 5; //no idea if this is 5 minutes or 5000 minutes
var result = await cmd.ExecuteNonQueryAsync();
Console.WriteLine("Messages deleted: " + result);

}
private IStreamStore Postgres()
{
var settings = new PostgresStreamStoreSettings(PgsqlConnectionString);
var store = new PostgresStreamStore(settings);
store.CreateSchemaIfNotExists().GetAwaiter().GetResult();
return store;
}

private async Task AppendMessages(int numMessages, int messageSize)
Expand All @@ -90,7 +167,7 @@ private async Task AppendMessages(int numMessages, int messageSize)
Console.WriteLine("Append complete: " + DateTime.Now);
}

private class MyCommand : Command
public class MyCommand : Command
{
public byte[] Payload { get; set; }

Expand All @@ -101,6 +178,7 @@ public MyCommand(int size)

internal override object ExecuteImpl(object model)
{
//never called
throw new NotImplementedException();
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/System.Test/System.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
<PackageReference Include="NUnit.Runners" Version="3.11.1" />
<PackageReference Include="Appveyor.TestLogger" Version="2.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="SqlStreamStore.MsSql" Version="1.2.0-beta.8" />
<PackageReference Include="SqlStreamStore.Postgres" Version="1.2.0-beta.8" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Memstate.EventStore\Memstate.EventStore.csproj" />
Expand Down

0 comments on commit 0a2700d

Please sign in to comment.