diff --git a/src/Memstate.SqlStreamStore/ConfigExtensions.cs b/src/Memstate.SqlStreamStore/ConfigExtensions.cs
index 587aab4..dea9286 100644
--- a/src/Memstate.SqlStreamStore/ConfigExtensions.cs
+++ b/src/Memstate.SqlStreamStore/ConfigExtensions.cs
@@ -1,4 +1,6 @@
+using System.Text;
using Memstate.Configuration;
+using Npgsql;
using SqlStreamStore;
namespace Memstate.SqlStreamStore
@@ -10,9 +12,19 @@ public static class ConfigExtensions
///
///
///
+ ///
+ /// There are two reader implementations. The subscriptionBasedReader
+ /// will in some cases have a better performance, the default is false
+ /// The number of to fetch
///
- public static Config UseSqlStreamStore(this Config config, IStreamStore streamStore = null)
+ public static Config UseSqlStreamStore(this Config config,
+ IStreamStore streamStore = null,
+ bool useSubscriptionBasedReader = false,
+ int maxRecordsPerRead = 100)
{
+ config.Data["SqlStreamStore.UseSubscriptionBasedReader"] = useSubscriptionBasedReader.ToString();
+ config.Data["SqlStreamStore.MaxRecordsPerRead"] = maxRecordsPerRead.ToString();
+
config.StorageProviderName = StorageProviders.SqlStreamStore;
config.Container.Register(streamStore);
return config;
diff --git a/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj b/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj
index 14b8da4..8f17c8c 100644
--- a/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj
+++ b/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj
@@ -20,6 +20,7 @@ Devrex Labs
+
diff --git a/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs b/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs
index bc4f17f..b47fa07 100644
--- a/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs
+++ b/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs
@@ -39,9 +39,8 @@ public IEnumerable GetRecords(long fromRecord = 0)
async Task MessageReceived(IStreamSubscription subscription, StreamMessage message,
CancellationToken cancellationToken)
{
- if (message.StreamVersion < fromRecord)
- return;
- var command = (Command)_serializer.FromString(await message.GetJsonData());
+ var json = await message.GetJsonData(cancellationToken);
+ var command = (Command)_serializer.FromString(json);
var journalRecord = new JournalRecord(message.StreamVersion, message.CreatedUtc, command);
queue.Add(journalRecord);
}
@@ -63,11 +62,9 @@ async Task MessageReceived(IStreamSubscription subscription, StreamMessage messa
{
sub.MaxCountPerRead = 100;
- JournalRecord journalRecord;
while (!caughtUp || queue.Any())
{
-
- if (queue.TryTake(out journalRecord))
+ if (queue.TryTake(out var journalRecord))
yield return journalRecord;
else if (!caughtUp)
Thread.Sleep(100);
diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs
index d1cc01c..24551e4 100644
--- a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs
+++ b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs
@@ -29,7 +29,7 @@ public Task DisposeAsync()
public IEnumerable GetRecords(long fromRecord = 0)
{
- var pageSize = 20;
+ var pageSize = 200;
while (true)
{
diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs
index cf95373..6653d74 100644
--- a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs
+++ b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs
@@ -1,3 +1,4 @@
+using System;
using System.Collections.Generic;
using System.Linq;
using SqlStreamStore;
@@ -21,7 +22,7 @@ public SqlStreamStoreJournalWriter(IStreamStore streamStore, StreamId streamId,
protected override void OnCommandBatch(IEnumerable commands)
{
var messages = commands.Select(ToNewStreamMessage).ToArray();
- var result = _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, messages )
+ var result = _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, messages )
.GetAwaiter()
.GetResult();
}
diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs
index 914f3d2..a6d3dbb 100644
--- a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs
+++ b/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs
@@ -1,4 +1,6 @@
-using Memstate.Configuration;
+using System.Text;
+using Memstate.Configuration;
+using Npgsql;
using SqlStreamStore;
using SqlStreamStore.Streams;
@@ -10,6 +12,8 @@ public class SqlStreamStoreProvider: StorageProvider
private readonly ISerializer _serializer;
private readonly IStreamStore _streamStore;
+ private readonly bool UseSubscriptionBasedReader = false;
+
public SqlStreamStoreProvider() : this(null)
{
@@ -30,27 +34,57 @@ public SqlStreamStoreProvider(IStreamStore streamStore)
_streamStore = streamStore;
}
+ ///
public override IJournalReader CreateJournalReader()
{
- // return new SqlSteamStoreSubscriptionJournalReader(
- // _streamStore,
- // _streamId,
- // _serializer);
-
- return new SqlStreamStoreJournalReader(
- _streamStore,
- _streamId,
- _serializer);
+ return UseSubscriptionBasedReader ? (IJournalReader)
+ new SqlSteamStoreSubscriptionJournalReader(
+ _streamStore,
+ _streamId,
+ _serializer) :
+ new SqlStreamStoreJournalReader(
+ _streamStore,
+ _streamId,
+ _serializer);
}
+ ///
public override IJournalWriter CreateJournalWriter(long nextRecordNumber)
{
return new SqlStreamStoreJournalWriter(_streamStore, _streamId, _serializer);
}
+ ///
public override IJournalSubscriptionSource CreateJournalSubscriptionSource()
{
return new SqlStreamStoreSubscriptionSource(_streamStore, _streamId, _serializer);
}
+
+ ///
+ /// Initialize a postgres database for use as a Memstate SqlStreamStore backend
+ /// You must use this method to initialize the database objects.
+ /// The normal SqlStreamStore schema uses the JSONB datatype
+ ///
+ /// A settings object with a valid connection string
+ public static void InitializePgSqlStreamStore(PostgresStreamStoreSettings settings)
+ {
+ var store = new PostgresStreamStore(settings);
+ var originalScript = store.GetSchemaCreationScript();
+
+ var sb = new StringBuilder("CREATE SCHEMA IF NOT EXISTS ");
+ sb.AppendLine(settings.Schema + ";");
+
+ sb.Append(originalScript);
+ var script = sb.ToString().Replace("JSONB", "JSON");
+
+ using (var connection = new NpgsqlConnection(settings.ConnectionString))
+ {
+ connection.Open();
+ var cmd = connection.CreateCommand();
+ cmd.CommandText = script;
+ cmd.ExecuteNonQuery();
+ connection.Close();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/System.Test/SqlStreamStorePerformanceRepro.cs b/src/System.Test/SqlStreamStorePerformanceRepro.cs
index 7873bc9..409b4a0 100644
--- a/src/System.Test/SqlStreamStorePerformanceRepro.cs
+++ b/src/System.Test/SqlStreamStorePerformanceRepro.cs
@@ -1,4 +1,8 @@
-using System.Linq;
+using System.Data.Common;
+using System.Data.SqlClient;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
using System.Threading.Tasks;
using Memstate;
using Memstate.Configuration;
@@ -13,8 +17,8 @@ namespace System.Test
public class SqlStreamStorePerformanceRepro
{
- private const int MessageSize = 1024;
- private const int NumMessages = 200000;
+ private const int MessageSize = 400;
+ private const int NumMessages = 10000;
private const string PgsqlConnectionString
= "Host=localhost; Password=postgres; User ID=postgres; Database=postgres;";
@@ -22,62 +26,135 @@ private const string PgsqlConnectionString
private SqlStreamStoreProvider _provider;
private String _streamName;
+ private DbConnection _connection;
+ private IStreamStore _streamStore;
+
+ private readonly Stopwatch _stopWatch = new Stopwatch();
+
+ [TearDown]
+ public async Task TearDown()
+ {
+ await _connection.DisposeAsync();
+ _streamStore.Dispose();
+ }
[SetUp]
public void Setup()
{
+ SqlStreamStore.Logging.LogProvider.IsDisabled = true;
var config = Config.Current;
_streamName = "test-" + DateTime.Now.ToFileTimeUtc();
config.GetSettings().StreamName = _streamName;
-
-
config.SerializerName = Serializers.NewtonsoftJson;
- var streamStore = Postgres();
- _provider = new SqlStreamStoreProvider(streamStore);
+
+ //ConfigurePostgres();
+
+ ConfigureMssql2019();
+ _provider = new SqlStreamStoreProvider(_streamStore);
_writer = _provider.CreateJournalWriter(0);
+ }
+
+ private void ConfigurePostgres()
+ {
+ _connection = new NpgsqlConnection(PgsqlConnectionString);
+ var settings = new PostgresStreamStoreSettings(PgsqlConnectionString);
+ settings.Schema = "randy";
+ SqlStreamStoreProvider.InitializePgSqlStreamStore(settings);
+ _streamStore = new PostgresStreamStore(settings);
+ }
+
+
+ private void ConfigureMssql2019()
+ {
+ CreateMsSqlDatabaseUnlessExists();
+ var connectionString = "Server=localhost;Database=memstate;User Id=sa;Password=abc123ABC;";
+ _connection = new SqlConnection(connectionString);
+ var settings = new MsSqlStreamStoreV3Settings(connectionString);
+ settings.Schema = "memstate";
+ var store = new MsSqlStreamStoreV3(settings);
+ store.CreateSchemaIfNotExists().GetAwaiter().GetResult();
+ _streamStore = store;
+ }
+
+ private void CreateMsSqlDatabaseUnlessExists()
+ {
+ try
+ {
+ var connectionString = "Server=localhost;Database=master;User Id=sa;Password=abc123ABC;";
+ _connection = new SqlConnection(connectionString);
+ _connection.Open();
+ var cmd = _connection.CreateCommand();
+ cmd.CommandText = "IF db_id('memstate') IS NULL CREATE DATABASE memstate";
+ cmd.ExecuteNonQuery();
+ }
+ finally
+ {
+ _connection.Close();
+ }
+ }
+ [Test]
+ public void DeserializationFailsWhenFirstJsonAttributeIsNotType()
+ {
+ Assert.Catch(() =>
+ {
+ var json =
+ "{\"Id\": \"8d7693f9-9bfc-4c23-ac52-31fc22bd67f3\", \"$type\": \"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\", \"Payload\": {\"$type\": \"System.Byte[], System.Private.CoreLib\", \"$value\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"}}";
+ var serializer = Config.Current.CreateSerializer();
+ var command = (Command) serializer.FromString(json);
+ });
}
+ [Test]
+ public void CanDeserializeWithTypeFirst()
+ {
+ var json = "{\"$type\":\"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\",\"Payload\":{\"$type\":\"System.Byte[], System.Private.CoreLib\",\"$value\":\"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"},\"Id\":\"e12c9b93-025d-4378-b370-24704acd134d\"}";;
+ var serializer = Config.Current.CreateSerializer();
+ var command = (Command) serializer.FromString(json);
+ }
[Test]
public async Task WriteTruncateAndLoadUsingReader()
{
- Console.WriteLine(nameof(WriteTruncateAndLoadUsingReader) + " " + _streamName);
- await AppendMessages(NumMessages, MessageSize);
- await DeleteMessages();
+ await WriteAndTruncateMessages();
+ _stopWatch.Restart();
var reader = _provider.CreateJournalReader();
var records = reader.GetRecords().ToList();
Console.WriteLine("Records read: " + records.Count);
- Console.WriteLine(DateTime.Now);
+ Console.WriteLine("Read duration: " + _stopWatch.Elapsed);
+ await reader.DisposeAsync();
}
[Test]
public async Task WriteTruncateAndLoadUsingSubscription()
{
- Console.WriteLine(nameof(WriteTruncateAndLoadUsingSubscription) + " " + _streamName);
- await AppendMessages(NumMessages, MessageSize);
- await DeleteMessages();
+ var messagesReceived = 0;
+
+ await WriteAndTruncateMessages();
+ _stopWatch.Restart();
var sub = _provider.CreateJournalSubscriptionSource()
- .Subscribe(0, jr => Console.WriteLine(jr.RecordNumber));
+ .Subscribe(0, jr => messagesReceived++);
while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(20));
- Console.WriteLine("Ready! " + DateTime.Now);
+ Console.WriteLine("Messages received: " + messagesReceived);
+ Console.WriteLine("Read with sub duration: " + _stopWatch.Elapsed);
+ sub.Dispose();
}
+ private async Task WriteAndTruncateMessages()
+ {
+ _stopWatch.Restart();
+ await AppendMessages(NumMessages, MessageSize);
+ Console.WriteLine("Append duration:" + _stopWatch.Elapsed);
+ _stopWatch.Restart();
+ //await DeleteMessages();
+ }
private async Task DeleteMessages()
{
- var connection = new NpgsqlConnection(PgsqlConnectionString);
- await connection.OpenAsync();
- var cmd = connection.CreateCommand();
- cmd.CommandText = "DELETE FROM messages";
+ await _connection.OpenAsync();
+ var cmd = _connection.CreateCommand();
+ cmd.CommandText = "TRUNCATE TABLE messages";
+ cmd.CommandTimeout = 1000 * 60 * 5; //no idea if this is 5 minutes or 5000 minutes
var result = await cmd.ExecuteNonQueryAsync();
Console.WriteLine("Messages deleted: " + result);
-
- }
- private IStreamStore Postgres()
- {
- var settings = new PostgresStreamStoreSettings(PgsqlConnectionString);
- var store = new PostgresStreamStore(settings);
- store.CreateSchemaIfNotExists().GetAwaiter().GetResult();
- return store;
}
private async Task AppendMessages(int numMessages, int messageSize)
@@ -90,7 +167,7 @@ private async Task AppendMessages(int numMessages, int messageSize)
Console.WriteLine("Append complete: " + DateTime.Now);
}
- private class MyCommand : Command
+ public class MyCommand : Command
{
public byte[] Payload { get; set; }
@@ -101,6 +178,7 @@ public MyCommand(int size)
internal override object ExecuteImpl(object model)
{
+ //never called
throw new NotImplementedException();
}
}
diff --git a/src/System.Test/System.Test.csproj b/src/System.Test/System.Test.csproj
index 4343fe3..961230e 100644
--- a/src/System.Test/System.Test.csproj
+++ b/src/System.Test/System.Test.csproj
@@ -11,7 +11,9 @@
+
+