From bd6c2cc1a51686df2f3a32f1200bba597d0890dd Mon Sep 17 00:00:00 2001 From: rofr Date: Tue, 19 May 2020 23:38:46 +0200 Subject: [PATCH] pravega WIP, sss tests --- .../Memstate.Docs.GettingStarted.csproj | 5 +- .../PravegaJournalSubscription.cs | 1 + src/Memstate.Pravega/PravegaJournalWriter.cs | 4 +- src/Memstate.Pravega/PravegaProvider.cs | 2 +- .../PravegaSubscriptionSource.cs | 2 +- ...ader.cs => SqlStreamStoreJournalReader.cs} | 4 +- .../SqlStreamStoreProvider.cs | 12 +- .../SqlStreamStorePerformanceRepro.cs | 108 ++++++++++++++++++ 8 files changed, 125 insertions(+), 13 deletions(-) rename src/Memstate.SqlStreamStore/{SqlStreamSourceJournalReader.cs => SqlStreamStoreJournalReader.cs} (93%) create mode 100644 src/System.Test/SqlStreamStorePerformanceRepro.cs diff --git a/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj b/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj index 3d7bd9e..87fd070 100644 --- a/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj +++ b/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj @@ -10,7 +10,10 @@ - + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + diff --git a/src/Memstate.Pravega/PravegaJournalSubscription.cs b/src/Memstate.Pravega/PravegaJournalSubscription.cs index 2b0f80c..03fdd17 100644 --- a/src/Memstate.Pravega/PravegaJournalSubscription.cs +++ b/src/Memstate.Pravega/PravegaJournalSubscription.cs @@ -25,6 +25,7 @@ public PravegaJournalSubscription(Action handler, IAsyncStreamRea public void Start() => _task = Run(); private async Task Run() { + var recordNumber = 0; while (await _reader.MoveNext(_cts.Token)) { var eventsResponse = _reader.Current; diff --git a/src/Memstate.Pravega/PravegaJournalWriter.cs b/src/Memstate.Pravega/PravegaJournalWriter.cs index ca3bb6f..6fc9959 100644 --- a/src/Memstate.Pravega/PravegaJournalWriter.cs +++ b/src/Memstate.Pravega/PravegaJournalWriter.cs @@ -23,10 +23,10 @@ public PravegaJournalWriter(PravegaGateway.PravegaGatewayClient client, ISeriali _writer = _client.WriteEvents(); } - public Task DisposeAsync() + public override async Task DisposeAsync() { + await base.DisposeAsync(); _writer.Dispose(); - return Task.CompletedTask; } diff --git a/src/Memstate.Pravega/PravegaProvider.cs b/src/Memstate.Pravega/PravegaProvider.cs index 3600ebd..5913645 100644 --- a/src/Memstate.Pravega/PravegaProvider.cs +++ b/src/Memstate.Pravega/PravegaProvider.cs @@ -41,7 +41,7 @@ public override void Initialize() public override IJournalReader CreateJournalReader() { var serializer = Config.Current.CreateSerializer(); - return new PravegaJournalReader(_client, serializer, _scope, _stream); + return new PravegaJournalReader(_client, serializer, _scope, _stream, null); } public override IJournalWriter CreateJournalWriter(long nextRecordNumber) diff --git a/src/Memstate.Pravega/PravegaSubscriptionSource.cs b/src/Memstate.Pravega/PravegaSubscriptionSource.cs index 479fb3b..b59bf1d 100644 --- a/src/Memstate.Pravega/PravegaSubscriptionSource.cs +++ b/src/Memstate.Pravega/PravegaSubscriptionSource.cs @@ -19,7 +19,7 @@ public IJournalSubscription Subscribe(long @from, Action handler) request.Stream = "mystream"; var response = _client.ReadEvents(request); var streamReader = response.ResponseStream; - var sub = new PravegaJournalSubscription(handler, streamReader, @from); + var sub = new PravegaJournalSubscription(handler, streamReader); sub.Start(); return sub; } diff --git a/src/Memstate.SqlStreamStore/SqlStreamSourceJournalReader.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs similarity index 93% rename from src/Memstate.SqlStreamStore/SqlStreamSourceJournalReader.cs rename to src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs index 8e0ec78..d1cc01c 100644 --- a/src/Memstate.SqlStreamStore/SqlStreamSourceJournalReader.cs +++ b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs @@ -6,13 +6,13 @@ namespace Memstate.SqlStreamStore { - public class SqlStreamSourceJournalReader : IJournalReader + public class SqlStreamStoreJournalReader : IJournalReader { private readonly IStreamStore _streamStore; private readonly StreamId _streamId; private readonly ISerializer _serializer; - public SqlStreamSourceJournalReader( + public SqlStreamStoreJournalReader( IStreamStore streamStore, StreamId streamId, ISerializer serializer) diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs index dcb81fa..914f3d2 100644 --- a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs +++ b/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs @@ -32,15 +32,15 @@ public SqlStreamStoreProvider(IStreamStore streamStore) public override IJournalReader CreateJournalReader() { - return new SqlSteamStoreSubscriptionJournalReader( + // return new SqlSteamStoreSubscriptionJournalReader( + // _streamStore, + // _streamId, + // _serializer); + + return new SqlStreamStoreJournalReader( _streamStore, _streamId, _serializer); - - //return new SqlStreamSourceJournalReader( - // _streamStore, - // _streamId, - // _serializer); } public override IJournalWriter CreateJournalWriter(long nextRecordNumber) diff --git a/src/System.Test/SqlStreamStorePerformanceRepro.cs b/src/System.Test/SqlStreamStorePerformanceRepro.cs new file mode 100644 index 0000000..662da47 --- /dev/null +++ b/src/System.Test/SqlStreamStorePerformanceRepro.cs @@ -0,0 +1,108 @@ +using System.Linq; +using System.Threading.Tasks; +using Memstate; +using Memstate.Configuration; +using Memstate.SqlStreamStore; +using Npgsql; +using NUnit.Framework; +using SqlStreamStore; + +namespace System.Test +{ + [TestFixture] + public class SqlStreamStorePerformanceRepro + { + + private const int MessageSize = 1024; + private const int NumMessages = 200000; + private const string PgsqlConnectionString + = "Host=localhost; Password=postgres; User ID=postgres; Database=postgres;"; + + private IJournalWriter _writer; + private SqlStreamStoreProvider _provider; + private String _streamName; + + + [SetUp] + public void Setup() + { + var config = Config.Current; + _streamName = "test-" + DateTime.Now.ToFileTimeUtc(); + config.GetSettings().StreamName = _streamName; + + + config.SerializerName = Serializers.NewtonsoftJson; + var streamStore = Postgres(); + _provider = new SqlStreamStoreProvider(streamStore); + _writer = _provider.CreateJournalWriter(0); + + } + + [Test] + public async Task WriteTruncateAndLoadUsingReader() + { + Console.WriteLine(nameof(WriteTruncateAndLoadUsingReader) + " " + _streamName); + await AppendMessages(NumMessages, MessageSize); + await DeleteMessages(); + var reader = _provider.CreateJournalReader(); + var records = reader.GetRecords().ToList(); + Console.WriteLine("Records read: " + records.Count); + Console.WriteLine(DateTime.Now); + } + + [Test] + public async Task WriteTruncateAndLoadUsingSubscription() + { + Console.WriteLine(nameof(WriteTruncateAndLoadUsingSubscription) + " " + _streamName); + await AppendMessages(NumMessages, MessageSize); + await DeleteMessages(); + var sub = _provider.CreateJournalSubscriptionSource() + .Subscribe(0, jr => Console.WriteLine(jr.RecordNumber)); + while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(20)); + Console.WriteLine("Ready! " + DateTime.Now); + } + + private async Task DeleteMessages() + { + var connection = new NpgsqlConnection(PgsqlConnectionString); + await connection.OpenAsync(); + var cmd = connection.CreateCommand(); + cmd.CommandText = "DELETE FROM messages"; + var result = await cmd.ExecuteNonQueryAsync(); + Console.WriteLine("Messages deleted: " + result); + + } + private IStreamStore Postgres() + { + var settings = new PostgresStreamStoreSettings(PgsqlConnectionString); + var store = new PostgresStreamStore(settings); + store.CreateSchemaIfNotExists().GetAwaiter().GetResult(); + return store; + } + + private async Task AppendMessages(int numMessages, int messageSize) + { + for (int i = 0; i < numMessages; i++) + { + _writer.Send(new MyCommand(messageSize)); + } + await _writer.DisposeAsync(); + Console.WriteLine("Append complete: " + DateTime.Now); + } + + private class MyCommand : Command + { + public byte[] Payload { get; set; } + + public MyCommand(int size) + { + Payload = new byte[size]; + } + + internal override object ExecuteImpl(object model) + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file