diff --git a/src/Memstate.All/Memstate.All.csproj b/src/Memstate.All/Memstate.All.csproj index 473657a..d2a3c1b 100644 --- a/src/Memstate.All/Memstate.All.csproj +++ b/src/Memstate.All/Memstate.All.csproj @@ -19,7 +19,6 @@ - diff --git a/src/Memstate.Benchmarks/Memstate.Benchmarks.csproj b/src/Memstate.Benchmarks/Memstate.Benchmarks.csproj index 1282bf1..8bfe7d0 100644 --- a/src/Memstate.Benchmarks/Memstate.Benchmarks.csproj +++ b/src/Memstate.Benchmarks/Memstate.Benchmarks.csproj @@ -1,7 +1,7 @@  Exe - netcoreapp3.1 + net6.0 @@ -9,7 +9,6 @@ - diff --git a/src/Memstate.Benchmarks/MemstateBenchmarks.cs b/src/Memstate.Benchmarks/MemstateBenchmarks.cs index ae7bd10..462aea5 100644 --- a/src/Memstate.Benchmarks/MemstateBenchmarks.cs +++ b/src/Memstate.Benchmarks/MemstateBenchmarks.cs @@ -8,7 +8,6 @@ using Memstate.EventStore; using Memstate.Models; using Memstate.Models.KeyValue; -using Memstate.Postgres; namespace Memstate.Benchmarks { @@ -21,7 +20,6 @@ public class MemstateBenchmarks [Params( //typeof(InMemoryStorageProvider), - typeof(PostgresProvider), typeof(EventStoreProvider))] public Type StorageProviderTypes { get; set; } diff --git a/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj b/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj index 43e9670..b94d191 100644 --- a/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj +++ b/src/Memstate.Docs.GettingStarted/Memstate.Docs.GettingStarted.csproj @@ -2,15 +2,15 @@ Library - netcoreapp3.1 + net6.0 - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Memstate.Host/Memstate.Host.csproj b/src/Memstate.Host/Memstate.Host.csproj index f5cd59c..5b94616 100644 --- a/src/Memstate.Host/Memstate.Host.csproj +++ b/src/Memstate.Host/Memstate.Host.csproj @@ -1,7 +1,7 @@  Library - netcoreapp3.1 + net6.0 Devrex Labs Devrex Labs Memstate @@ -27,7 +27,6 @@ - diff --git a/src/Memstate.JsonNet/Memstate.JsonNet.csproj b/src/Memstate.JsonNet/Memstate.JsonNet.csproj index 816669b..c7af5e8 100644 --- a/src/Memstate.JsonNet/Memstate.JsonNet.csproj +++ b/src/Memstate.JsonNet/Memstate.JsonNet.csproj @@ -24,7 +24,7 @@ - + diff --git a/src/Memstate.Postgresql.Tests/Domain/Create.cs b/src/Memstate.Postgresql.Tests/Domain/Create.cs deleted file mode 100644 index fdc377e..0000000 --- a/src/Memstate.Postgresql.Tests/Domain/Create.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using Newtonsoft.Json; - -namespace Memstate.Postgres.Tests.Domain -{ - public class Create : Command - { - public Create(Guid taskId, string description) - { - TaskId = taskId; - Description = description; - } - - [JsonProperty] - public string Description { get; private set; } - - [JsonProperty] - public Guid TaskId { get; private set; } - - public override Task Execute(Todo model) - { - var task = new Task(TaskId, Description); - - model.Tasks.Add(TaskId, task); - - return task; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql.Tests/Domain/Resolve.cs b/src/Memstate.Postgresql.Tests/Domain/Resolve.cs deleted file mode 100644 index 7f26557..0000000 --- a/src/Memstate.Postgresql.Tests/Domain/Resolve.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using Newtonsoft.Json; - -namespace Memstate.Postgres.Tests.Domain -{ - public class Resolve : Command - { - public Resolve(Guid taskId) - { - TaskId = taskId; - } - - [JsonProperty] - public Guid TaskId { get; private set; } - - public override void Execute(Todo model) - { - model.Tasks.Remove(TaskId); - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql.Tests/Domain/Task.cs b/src/Memstate.Postgresql.Tests/Domain/Task.cs deleted file mode 100644 index baf6946..0000000 --- a/src/Memstate.Postgresql.Tests/Domain/Task.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; - -namespace Memstate.Postgres.Tests.Domain -{ - public class Task - { - public Task(Guid id, string description) - { - Id = id; - Description = description; - } - - public Guid Id { get; } - - public string Description { get; } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql.Tests/Domain/Todo.cs b/src/Memstate.Postgresql.Tests/Domain/Todo.cs deleted file mode 100644 index 062c3ac..0000000 --- a/src/Memstate.Postgresql.Tests/Domain/Todo.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace Memstate.Postgres.Tests.Domain -{ - public class Todo - { - public Dictionary Tasks { get; set; } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql.Tests/JournalReaderTests.cs b/src/Memstate.Postgresql.Tests/JournalReaderTests.cs deleted file mode 100644 index 6bc2055..0000000 --- a/src/Memstate.Postgresql.Tests/JournalReaderTests.cs +++ /dev/null @@ -1,105 +0,0 @@ -using System.Threading; -using System; -using System.Collections.Generic; -using Memstate.Postgres.Tests.Domain; -using Npgsql; -using NUnit.Framework; -using System.Linq; -using Memstate.Configuration; - -namespace Memstate.Postgres.Tests -{ - [TestFixture] - public class JournalReaderTests - { - private PostgresProvider _provider; - private IJournalReader _journalReader; - private IJournalWriter _journalWriter; - private ISerializer _serializer; - - [SetUp] - public void Setup() - { - var cfg = Config.Reset(); - cfg.GetSettings() - .WithRandomSuffixAppendedToStreamName(); - - _provider = new PostgresProvider(); - _provider.Initialize(); - - _journalReader = _provider.CreateJournalReader(); - _journalWriter = _provider.CreateJournalWriter(0); - - _serializer = Config.Current.CreateSerializer(); - } - - [Test] - public void CanRead() - { - var create = new Create(Guid.NewGuid(), "Resolve a Postgresql driver for Memstate"); - - InsertCommand(_serializer.Serialize(create)); - - var journalRecords = _journalReader.GetRecords(); - - Assert.AreEqual(1, journalRecords.Count()); - } - - [Test] - public void CanWrite() - { - var create = new Create(Guid.NewGuid(), "Resolve a Postgresql driver for Memstate"); - - _journalWriter.Send(create); - - Thread.Sleep(500); - - var journalRecords = GetJournalRecords(); - - Assert.AreEqual(1, journalRecords.Count()); - } - - private void InsertCommand(byte[] data) - { - using (var connection = new NpgsqlConnection(_provider.Settings.ConnectionString)) - using (var command = connection.CreateCommand()) - { - connection.Open(); - - command.CommandText = string.Format("INSERT INTO {0} (command) VALUES(@command);", - _provider.Settings.Table); - - command.Parameters.AddWithValue("@command", Convert.ToBase64String(data)); - - Assert.AreEqual(1, command.ExecuteNonQuery()); - } - } - - private IEnumerable GetJournalRecords() - { - var journalRecords = new List(); - - using (var connection = new NpgsqlConnection(_provider.Settings.ConnectionString)) - using (var command = connection.CreateCommand()) - { - connection.Open(); - - command.CommandText = string.Format( - "SELECT id, written FROM {0} ORDER BY id ASC;", - _provider.Settings.Table); - - using (var reader = command.ExecuteReader()) - { - while (reader.Read()) - { - var journalRecord = new JournalRecord((long) reader[0], (DateTime) reader[1], null); - - journalRecords.Add(journalRecord); - } - } - } - - return journalRecords; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql.Tests/Memstate.Postgres.Test.csproj b/src/Memstate.Postgresql.Tests/Memstate.Postgres.Test.csproj deleted file mode 100644 index 8aa5c8c..0000000 --- a/src/Memstate.Postgresql.Tests/Memstate.Postgres.Test.csproj +++ /dev/null @@ -1,20 +0,0 @@ - - - netcoreapp3.1 - Memstate.Postgresql.Tests - - - - - - - - - - - - - - - - diff --git a/src/Memstate.Postgresql.Tests/PostgresSettingsTests.cs b/src/Memstate.Postgresql.Tests/PostgresSettingsTests.cs deleted file mode 100644 index f723182..0000000 --- a/src/Memstate.Postgresql.Tests/PostgresSettingsTests.cs +++ /dev/null @@ -1,101 +0,0 @@ -using System; -using Memstate.Configuration; -using Npgsql; -using NUnit.Framework; - -namespace Memstate.Postgres.Tests -{ - [TestFixture] - public class PostgresSettingsTests - { - private PostgresSettings _settings; - private EngineSettings _memstateSettings; - - [SetUp] - public void Setup() - { - _memstateSettings = new EngineSettings(); - _settings = new PostgresSettings(); - } - - [Test] - public void CanExtractInitSqlResource() - { - var initSql = _settings.InitSql.Value; - Assert.True(!String.IsNullOrEmpty(initSql)); - } - - [Test] - public void DefaultConnectionStringIsUsed() - { - var key = "Memstate:Postgres:Password"; - var defaultBuilder = new NpgsqlConnectionStringBuilder(PostgresSettings.DefaultConnectionString); - - //Appveyor workaround - //test failed on Appveyor because the pgsql password env variable is set! - defaultBuilder.Password = Environment.GetEnvironmentVariable(key) ?? defaultBuilder.Password; - - var actualBuilder = new NpgsqlConnectionStringBuilder(_settings.ConnectionString); - Assert.True(defaultBuilder.EquivalentTo(actualBuilder)); - } - - [Test] - public void PasswordCanBeSetUsingEnvironmentVariable() - { - string key = "MEMSTATE_POSTGRES_PASSWORD"; - string value = "Password12!"; - Environment.SetEnvironmentVariable(key, value); - var config = Config.Reset(); - var settings = config.GetSettings(); - Assert.AreEqual(value, settings.Password); - } - - [Test] - public void TableNameEndsWithSuffix() - { - Assert.True(_settings.Table.EndsWith(_settings.TableSuffix)); - } - - [Test] - public void TableNameStartsWithStreamName() - { - Assert.True(_settings.Table.StartsWith(_memstateSettings.StreamName)); - } - - [Test] - public void HostOverridesConnectionString() - { - var expected = Guid.NewGuid().ToString(); - _settings.Host = expected; - var connectionStringBuilder = new NpgsqlConnectionStringBuilder(_settings.ConnectionString); - Assert.AreEqual(expected, connectionStringBuilder.Host); - } - - [Test] - public void PasswordOverridesConnectionString() - { - var expected = Guid.NewGuid().ToString(); - _settings.Password = expected; - var connectionStringBuilder = new NpgsqlConnectionStringBuilder(_settings.ConnectionString); - Assert.AreEqual(expected, connectionStringBuilder.Password); - } - - [Test] - public void UsernameOverridesConnectionString() - { - var expected = Guid.NewGuid().ToString(); - _settings.Username = expected; - var connectionStringBuilder = new NpgsqlConnectionStringBuilder(_settings.ConnectionString); - Assert.AreEqual(expected, connectionStringBuilder.Username); - } - - [Test] - public void DatabaseOverridesConnectionString() - { - var expected = Guid.NewGuid().ToString(); - _settings.Database = expected; - var connectionStringBuilder = new NpgsqlConnectionStringBuilder(_settings.ConnectionString); - Assert.AreEqual(expected, connectionStringBuilder.Database); - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/ConfigExtensions.cs b/src/Memstate.Postgresql/ConfigExtensions.cs deleted file mode 100644 index e8786b5..0000000 --- a/src/Memstate.Postgresql/ConfigExtensions.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Memstate.Configuration; - -namespace Memstate.Postgres -{ - public static class ConfigExtensions - { - public static Config UsePostgresqlProvider(this Config config) - { - config.StorageProviderName = typeof(PostgresProvider).AssemblyQualifiedName; - return config; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/Memstate.Postgres.csproj b/src/Memstate.Postgresql/Memstate.Postgres.csproj deleted file mode 100644 index 9ef4269..0000000 --- a/src/Memstate.Postgresql/Memstate.Postgres.csproj +++ /dev/null @@ -1,39 +0,0 @@ - - - netstandard2.0 - 0.1.0-alpha - Devrex Labs - Devrex Labs - Memstate - Storage provider for Memstate based on PostgreSQL - Devrex Labs - https://github.com/devrexlabs/memstate - true - - - Memstate.Postgres - 0.2.0 - LGPL-3.0-or-later - Devrex Labs - https://github.com/devrexlabs/memstate - Storage provider for Memstate based on PostgreSQL - Memstate.Postgres - See https://github.com/DevrexLabs/memstate/releases - snupkg - - - - - - - - - - - - - - - - - diff --git a/src/Memstate.Postgresql/PostgresEngineBuilder.cs b/src/Memstate.Postgresql/PostgresEngineBuilder.cs deleted file mode 100644 index c01385f..0000000 --- a/src/Memstate.Postgresql/PostgresEngineBuilder.cs +++ /dev/null @@ -1,37 +0,0 @@ -namespace Memstate.Postgresql -{ - public class PostgresEngineBuilder : IEngineBuilder - { - private readonly Config _config; - - private readonly PostgresJournalReader _commandStore; - - public PostgresEngineBuilder(Config config) - : this(config, new PostgresJournalReader(config)) - { - } - - public PostgresEngineBuilder(Config config, PostgresJournalReader commandStore) - { - _config = config; - _commandStore = commandStore; - } - - public Engine Build() where T : class, new() - { - return Build(new T()); - } - - public Engine Build(T initialModel) where T : class - { - var loader = new ModelLoader(); - - var model = loader.Load(_commandStore, initialModel); - - // TODO: Figure out if next record should be 0 or calculated. - var engine = new Engine(_config, model, _commandStore, _commandStore, 0); - - return engine; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresJournalReader.cs b/src/Memstate.Postgresql/PostgresJournalReader.cs deleted file mode 100644 index c19cce8..0000000 --- a/src/Memstate.Postgresql/PostgresJournalReader.cs +++ /dev/null @@ -1,91 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Data; -using Npgsql; - -namespace Memstate.Postgres -{ - using System.Threading.Tasks; - using Memstate.Configuration; - - public class PostgresJournalReader : IJournalReader - { - private const string SelectSql = @"SELECT id, written, command FROM {0} - WHERE id >= @id - ORDER BY id ASC"; - - private readonly ISerializer _serializer; - - private readonly PostgresSettings _settings; - - public PostgresJournalReader(PostgresSettings settings) - { - Ensure.NotNull(settings, nameof(settings)); - - _settings = settings; - _serializer = Config.Current.CreateSerializer(); - } - - public IEnumerable GetRecords(long fromRecord = 0) - { - using (var connection = OpenConnection()) - { - do - { - using (var command = connection.CreateCommand()) - { - var recordsRead = 0; - - command.CommandText = string.Format(SelectSql, _settings.Table); - - command.Parameters.AddWithValue("id", fromRecord); - - using (var reader = command.ExecuteReader()) - { - while (reader.Read()) - { - recordsRead++; - - var record = ReadRecord(reader); - - fromRecord++; - - yield return record; - } - } - - if (recordsRead == 0) - { - break; - } - } - } - while (true); - } - } - - public Task DisposeAsync() - { - return Task.CompletedTask; - } - - private JournalRecord ReadRecord(IDataRecord reader) - { - var recordNumber = (long) reader[0]; - var written = (DateTime) reader[1]; - var commandData = Convert.FromBase64String((string) reader[2]); - var command = (Command) _serializer.Deserialize(commandData); - - return new JournalRecord(recordNumber, written, command); - } - - private NpgsqlConnection OpenConnection() - { - var connection = new NpgsqlConnection(_settings.ConnectionString); - - connection.Open(); - - return connection; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresJournalSubscription.cs b/src/Memstate.Postgresql/PostgresJournalSubscription.cs deleted file mode 100644 index 922291d..0000000 --- a/src/Memstate.Postgresql/PostgresJournalSubscription.cs +++ /dev/null @@ -1,139 +0,0 @@ -using System; -using System.Threading; -using Npgsql; - -namespace Memstate.Postgres -{ - public class PostgresJournalSubscription : IJournalSubscription - { - private readonly AutoResetEvent _readWaiter = new AutoResetEvent(false); - - private readonly Thread _listenerThread; - - private readonly Thread _readerThread; - - private readonly PostgresSettings _settings; - - private readonly Action _handler; - - private readonly PostgresJournalReader _journalReader; - - private bool _ready; - - private volatile bool _disposed; - - /// - /// Id of the next record to read - /// - private long _nextRecordId; - - public PostgresJournalSubscription(PostgresSettings settings, Action handler, long nextRecordId) - { - Ensure.NotNull(settings, nameof(settings)); - - _settings = settings; - _handler = handler; - _nextRecordId = nextRecordId; - - _journalReader = new PostgresJournalReader(settings); - - _listenerThread = new Thread(Listen) - { - Name = "Memstate:PostgresProviderType:NotificationsListener" - }; - - _readerThread = new Thread(Reader) - { - Name = "Memstate:PostgresProviderType:Reader" - }; - } - - - public void Start() - { - _readerThread.Start(); - _listenerThread.Start(); - - while (!Ready()) - { - Thread.Sleep(0); - } - } - - public void Dispose() - { - if (_disposed) - { - return; - } - - _disposed = true; - - _readWaiter.Set(); - - _readerThread.Join(TimeSpan.FromSeconds(10).Milliseconds); - _listenerThread.Join(TimeSpan.FromSeconds(10).Milliseconds); - } - - public bool Ready() - { - return _ready; - } - - private void Listen() - { - using (var connection = OpenConnection()) - { - connection.Notification += HandleNotification; - - SendListenCommand(connection); - - while (!_disposed) - { - connection.Wait(TimeSpan.FromSeconds(10)); - } - } - } - - private void Reader() - { - while (!_disposed) - { - foreach (var record in _journalReader.GetRecords(_nextRecordId)) - { - _nextRecordId++; - - _handler.Invoke(record); - } - - _ready = true; - - _readWaiter.WaitOne(TimeSpan.FromSeconds(1)); - } - } - - private void HandleNotification(object sender, NpgsqlNotificationEventArgs arguments) - { - _readWaiter.Set(); - } - - private void SendListenCommand(NpgsqlConnection connection) - { - using (var command = connection.CreateCommand()) - { - command.CommandText = $"LISTEN {_settings.SubscriptionStream};"; - - command.ExecuteNonQuery(); - } - } - - private NpgsqlConnection OpenConnection() - { - var connection = new NpgsqlConnection(_settings.ConnectionString); - - connection.Open(); - - return connection; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresJournalWriter.cs b/src/Memstate.Postgresql/PostgresJournalWriter.cs deleted file mode 100644 index 484efc4..0000000 --- a/src/Memstate.Postgresql/PostgresJournalWriter.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using Npgsql; -using Memstate.Logging; - -namespace Memstate.Postgres -{ - - public class PostgresJournalWriter : BatchingJournalWriter - { - private const string InsertSql = @"INSERT INTO {0} (""command"") VALUES {1};"; - - private readonly ILog _logger; - - private readonly ISerializer _serializer; - - private readonly PostgresSettings _settings; - - public PostgresJournalWriter(ISerializer serializer, PostgresSettings settings) - { - Ensure.NotNull(serializer, nameof(serializer)); - Ensure.NotNull(settings, nameof(settings)); - _settings = settings; - _serializer = serializer; - _logger = LogProvider.GetCurrentClassLogger(); - } - - protected override void OnCommandBatch(IEnumerable commands) - { - using (var connection = new NpgsqlConnection(_settings.ConnectionString)) - { - connection.Open(); - - commands = commands.ToList(); - - var count = commands.Count(); - - _logger.Debug($"OnCommandBatch received {count} commands"); - - var values = string.Join(",", Enumerable.Range(0, count).Select(i => $"(@{i})")); - - using (var sqlCommand = connection.CreateCommand()) - { - sqlCommand.CommandText = string.Format(InsertSql, _settings.Table, values); - - commands.Select((c, i) => new {Index = i, Value = Convert.ToBase64String(_serializer.Serialize(c))}) - .ToList() - .ForEach(item => sqlCommand.Parameters.AddWithValue($"@{item.Index}", item.Value)); - - sqlCommand.ExecuteNonQuery(); - } - } - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresProvider.cs b/src/Memstate.Postgresql/PostgresProvider.cs deleted file mode 100644 index 4d492ec..0000000 --- a/src/Memstate.Postgresql/PostgresProvider.cs +++ /dev/null @@ -1,71 +0,0 @@ -using Memstate.Logging; -using System.Threading.Tasks; -using Npgsql; -using Memstate.Configuration; -using Npgsql.Logging; -using System.Diagnostics; - -namespace Memstate.Postgres -{ - public class PostgresProvider : StorageProvider - { - private readonly ILog _log; - private bool _initialized; - - public PostgresProvider() - { - _log = LogProvider.GetCurrentClassLogger(); - Settings = Config.Current.GetSettings(); - } - - static PostgresProvider() - { - EnableNpgsqlDebugLogging(); - } - - [Conditional("PGTRACE")] - private static void EnableNpgsqlDebugLogging() - { - NpgsqlLogManager.Provider = new ConsoleLoggingProvider(NpgsqlLogLevel.Trace, true, true); - NpgsqlLogManager.IsParameterLoggingEnabled = true; - } - - public PostgresSettings Settings { get; } - - public override void Initialize() - { - if (_initialized) return; - _log.Debug("Initializing..."); - - var sql = Settings.InitSql.Value; - - using (var connection = new NpgsqlConnection(Settings.ConnectionString)) - using (var command = connection.CreateCommand()) - { - connection.Open(); - command.CommandText = string.Format(sql, Settings.SubscriptionStream, Settings.Table); - command.ExecuteNonQuery(); - } - _initialized = true; - } - - public override IJournalReader CreateJournalReader() - { - return new PostgresJournalReader(Settings); - } - - public override IJournalWriter CreateJournalWriter(long nextRecordNumber) - { - // todo: nextRecordNumber unused - var serializer = Config.Current.CreateSerializer(); - return new PostgresJournalWriter(serializer, Settings); - } - - public override IJournalSubscriptionSource CreateJournalSubscriptionSource() - { - return new PostgresSubscriptionSource(Settings); - } - - public Task DisposeAsync() => Task.CompletedTask; - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresSettings.cs b/src/Memstate.Postgresql/PostgresSettings.cs deleted file mode 100644 index 79031b3..0000000 --- a/src/Memstate.Postgresql/PostgresSettings.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.IO; -using System.Reflection; -using Memstate.Configuration; -using Npgsql; - -namespace Memstate.Postgres -{ - public class PostgresSettings : Settings - { - public override string Key { get; } = "Memstate:Postgres"; - - public const string DefaultConnectionString = "Host=localhost;Database=postgres;User ID=postgres;Password=postgres;"; - - public const string InitSqlResourceName = "Memstate.Postgres.init_sql"; - - - private string _connectionStringTemplate = DefaultConnectionString; - - private readonly EngineSettings _memstateSettings; - - public PostgresSettings() - { - _memstateSettings = Config.Current.GetSettings(); - } - - /// - /// Password to connect to the database, overrides value in ConnectionString if set - /// - public string Password { get; set; } - - /// - /// Username to connect to the database, overrides value in ConnectionString if set - /// - public string Username { get; set; } - - /// - /// Name of the database, overrides value in ConnectionString if set - /// - public string Database { get; set; } - - /// - /// Name of host to connect to, overrides value in ConnectionString if set - /// - public string Host { get; set; } - - public string ConnectionString - { - get - { - var builder = new NpgsqlConnectionStringBuilder(_connectionStringTemplate); - if (Host != null) builder.Host = Host; - if (Username != null) builder.Username = Username; - if (Database != null) builder.Database = Database; - if (Password != null) builder.Password = Password; - return builder.ToString(); - } - set => _connectionStringTemplate = value; - } - - public string TableSuffix { get; set; } = "_commands"; - - public string SubscriptionStreamSuffix { get; set; } = "_notifications"; - - public string Table => _memstateSettings.StreamName + TableSuffix; - - public string SubscriptionStream => _memstateSettings.StreamName + SubscriptionStreamSuffix; - - /// - /// Number of records to read per SELECT statement - /// - /// The size of the read batch. - public int ReadBatchSize { get; set; } = 1024; - - public Lazy InitSql => new Lazy(() => GetEmbeddedResource(InitSqlResourceName)); - - public override void Validate() - { - Ensure.NotNullOrEmpty(ConnectionString, nameof(ConnectionString)); - Ensure.NotNullOrEmpty(Table, nameof(Table)); - Ensure.NotNullOrEmpty(SubscriptionStream, nameof(SubscriptionStream)); - } - - private string GetEmbeddedResource(string resourceName) - { - var asm = Assembly.GetExecutingAssembly(); - - using (var stream = asm.GetManifestResourceStream(resourceName)) - using (var reader = new StreamReader(stream)) - { - return reader.ReadToEnd(); - } - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/PostgresSubscriptionSource.cs b/src/Memstate.Postgresql/PostgresSubscriptionSource.cs deleted file mode 100644 index 73f18b9..0000000 --- a/src/Memstate.Postgresql/PostgresSubscriptionSource.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; - -namespace Memstate.Postgres -{ - public class PostgresSubscriptionSource : IJournalSubscriptionSource - { - private readonly PostgresSettings _settings; - - public PostgresSubscriptionSource(PostgresSettings settings) - { - Ensure.NotNull(settings, nameof(settings)); - _settings = settings; - } - - public IJournalSubscription Subscribe(long from, Action handler) - { - var subscription = new PostgresJournalSubscription(_settings, handler, from); - subscription.Start(); - return subscription; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Postgresql/init_sql b/src/Memstate.Postgresql/init_sql deleted file mode 100644 index fecf7bb..0000000 --- a/src/Memstate.Postgresql/init_sql +++ /dev/null @@ -1,23 +0,0 @@ -CREATE OR REPLACE FUNCTION {1}_notify_command() RETURNS TRIGGER AS $$ - BEGIN - PERFORM pg_notify('{0}', 'Data available'); - - RETURN NULL; - END; -$$ LANGUAGE plpgsql; - -CREATE TABLE IF NOT EXISTS "{1}" ( - "id" BIGINT NOT NULL PRIMARY KEY, - "written" TIMESTAMP WITHOUT TIME ZONE DEFAULT (NOW() AT TIME ZONE 'utc'), - "command" VARCHAR NOT NULL -); - -CREATE SEQUENCE IF NOT EXISTS "{1}_id_seq" MINVALUE 0 OWNED BY "{1}"."id"; - -ALTER TABLE "{1}" ALTER "id" SET DEFAULT nextval('{1}_id_seq'); - -DROP TRIGGER IF EXISTS "{1}_notify_command" ON "{1}"; - -CREATE TRIGGER "{1}_notify_command" - AFTER INSERT ON "{1}" - FOR EACH STATEMENT EXECUTE PROCEDURE {1}_notify_command(); diff --git a/src/Memstate.Pravega/Memstate.Pravega.csproj b/src/Memstate.Pravega/Memstate.Pravega.csproj deleted file mode 100644 index e8cd2f4..0000000 --- a/src/Memstate.Pravega/Memstate.Pravega.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - netstandard2.1 - snupkg - - - - - - - - - - diff --git a/src/Memstate.Pravega/PravegaJournalReader.cs b/src/Memstate.Pravega/PravegaJournalReader.cs deleted file mode 100644 index f67d842..0000000 --- a/src/Memstate.Pravega/PravegaJournalReader.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using PravegaClient = PravegaGateway.PravegaGatewayClient; - -namespace Memstate.Pravega -{ - public class PravegaJournalReader : IJournalReader - { - private readonly CancellationToken _cancellationToken; - private readonly CancellationTokenSource _cts; - - private readonly PravegaClient _client; - private readonly ISerializer _serializer; - - private readonly string _scope; - private readonly string _stream = "mystream"; - - private Action _lastEventReadHandler; - - public PravegaJournalReader(PravegaClient client, ISerializer serializer, string scope, string stream, Action lastEventRead) - { - _client = client; - _cts = new CancellationTokenSource(); - _cancellationToken = _cts.Token; - _serializer = serializer; - _scope = scope; - _stream = stream; - _lastEventReadHandler = lastEventRead; - } - - public Task DisposeAsync() - { - _cts.Cancel(); - _cts.Token.WaitHandle.WaitOne(); - return Task.CompletedTask; - } - - public IEnumerable GetRecords(long fromRecord = 0) - { - - var endStreamCut = GetEndStreamCut(); - _lastEventReadHandler.Invoke(endStreamCut); - - var request = new ReadEventsRequest - { - Scope = _scope, - Stream = _stream, - ToStreamCut = endStreamCut - }; - - var recordNumber = 0; - using var call = _client.ReadEvents(request, cancellationToken: _cancellationToken); - - var responseStream = call.ResponseStream; - while (responseStream.MoveNext(_cancellationToken).GetAwaiter().GetResult()) - { - if (recordNumber <= fromRecord) continue; - var @event = responseStream.Current; - var bytes = @event.Event.ToByteArray(); - var savedRecord = (JournalRecord) _serializer.Deserialize(bytes); - var record = new JournalRecord(recordNumber++, savedRecord.Written, savedRecord.Command); - yield return record; - } - } - - /// - /// Get a reference to the current end of the stream - /// - /// - private StreamCut GetEndStreamCut() - { - var request = new GetStreamInfoRequest { Scope = _scope, Stream = _stream }; - var response = _client.GetStreamInfo(request); - return response.TailStreamCut; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Pravega/PravegaJournalSubscription.cs b/src/Memstate.Pravega/PravegaJournalSubscription.cs deleted file mode 100644 index 03fdd17..0000000 --- a/src/Memstate.Pravega/PravegaJournalSubscription.cs +++ /dev/null @@ -1,64 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core; -using Memstate.Configuration; - -namespace Memstate.Pravega -{ - public class PravegaJournalSubscription : IJournalSubscription - { - private readonly Action _handler; - private readonly IAsyncStreamReader _reader; - private readonly ISerializer _serializer; - private Task _task; - private readonly CancellationTokenSource _cts; - - public PravegaJournalSubscription(Action handler, IAsyncStreamReader reader) - { - _handler = handler; - _reader = reader; - _serializer = Config.Current.CreateSerializer(); - _cts = new CancellationTokenSource(); - } - - public void Start() => _task = Run(); - private async Task Run() - { - var recordNumber = 0; - while (await _reader.MoveNext(_cts.Token)) - { - var eventsResponse = _reader.Current; - var bytes = eventsResponse.Event.ToByteArray(); - var savedRecord = (JournalRecord) _serializer.Deserialize(bytes); - var record = new JournalRecord(recordNumber++, savedRecord.Written, savedRecord.Command); - _handler.Invoke(record); - } - - } - - public void Dispose() - { - Console.WriteLine("Terminating subscription"); - _cts.Cancel(); - _task.GetAwaiter().GetResult(); - Console.WriteLine("Subscription terminated"); - } - - //TODO: figure out how to know - public bool Ready() => true; - } - - public static class ConfigExtensions - { - public static Config UsePravega(this Config config) - { - config.SerializerName = Serializers.Wire; - config.StorageProviderName = StorageProviders.Pravega; - var provider = new PravegaProvider(); - provider.Initialize(); - config.Container.Register(provider); - return config; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Pravega/PravegaJournalWriter.cs b/src/Memstate.Pravega/PravegaJournalWriter.cs deleted file mode 100644 index 6fc9959..0000000 --- a/src/Memstate.Pravega/PravegaJournalWriter.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Google.Protobuf; -using Grpc.Core; - -namespace Memstate.Pravega -{ - public class PravegaJournalWriter : BatchingJournalWriter - { - private readonly PravegaGateway.PravegaGatewayClient _client; - private readonly ISerializer _serializer; - private readonly AsyncClientStreamingCall _writer; - private readonly string _scope; - private readonly string _stream; - - public PravegaJournalWriter(PravegaGateway.PravegaGatewayClient client, ISerializer serializer, string scope, string stream) - { - _serializer = serializer; - _client = client; - _scope = scope; - _stream = stream; - _writer = _client.WriteEvents(); - } - - public override async Task DisposeAsync() - { - await base.DisposeAsync(); - _writer.Dispose(); - } - - - protected override void OnCommandBatch(IEnumerable commands) - { - foreach(var command in commands) - { - var record = new JournalRecord(0, DateTimeOffset.Now, command); - var bytes = _serializer.Serialize(record); - - var request = new WriteEventsRequest - { - Event = ByteString.CopyFrom(bytes), - Stream = _stream, - Scope = _scope - }; - - _writer.RequestStream.WriteAsync(request).GetAwaiter().GetResult(); - } - - _writer.RequestStream.CompleteAsync().GetAwaiter().GetResult(); - } - } -} \ No newline at end of file diff --git a/src/Memstate.Pravega/PravegaProvider.cs b/src/Memstate.Pravega/PravegaProvider.cs deleted file mode 100644 index 5913645..0000000 --- a/src/Memstate.Pravega/PravegaProvider.cs +++ /dev/null @@ -1,69 +0,0 @@ -using System; -using Grpc.Net.Client; -using Memstate.Configuration; - -namespace Memstate.Pravega -{ - public class PravegaProvider : StorageProvider - { - private readonly PravegaGateway.PravegaGatewayClient _client; - - private string _scope; - private string _stream; - - public override void Initialize() - { - var config = Config.Current; - var settings = config.GetSettings(); - var streamName = settings.StreamName; - - if (!streamName.Contains("/")) streamName += "/stream"; - - var parts = streamName.Split("/"); - if (parts.Length != 2) throw new ArgumentException("Bad scope/stream: " + settings.StreamName); - (_scope, _stream) = (parts[0], parts[1]); - - var request = new CreateScopeRequest { Scope = _scope }; - _client.CreateScope(request); - - _client.CreateStream(new CreateStreamRequest - { - Scope = _scope, - Stream = _stream, - ScalingPolicy = new ScalingPolicy - { - MinNumSegments = 1, - ScaleType = ScalingPolicy.Types.ScalingPolicyType.FixedNumSegments, - } - }); - } - - public override IJournalReader CreateJournalReader() - { - var serializer = Config.Current.CreateSerializer(); - return new PravegaJournalReader(_client, serializer, _scope, _stream, null); - } - - public override IJournalWriter CreateJournalWriter(long nextRecordNumber) - { - var serializer = Config.Current.CreateSerializer(); - return new PravegaJournalWriter(_client, serializer, _scope, _stream); - } - - public override IJournalSubscriptionSource CreateJournalSubscriptionSource() - { - return new PravegaSubscriptionSource(_client); - } - - public PravegaProvider() - { - // https://github.com/grpc/grpc-java/issues/6193#issuecomment-537745226 - // https://docs.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0#call-insecure-grpc-services-with-net-core-client - AppContext.SetSwitch( - "System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - - var channel = GrpcChannel.ForAddress("http://127.0.0.1:54672"); - _client = new PravegaGateway.PravegaGatewayClient(channel); - } - } -} \ No newline at end of file diff --git a/src/Memstate.Pravega/PravegaSubscriptionSource.cs b/src/Memstate.Pravega/PravegaSubscriptionSource.cs deleted file mode 100644 index b59bf1d..0000000 --- a/src/Memstate.Pravega/PravegaSubscriptionSource.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using Memstate.Configuration; - -namespace Memstate.Pravega -{ - public class PravegaSubscriptionSource : IJournalSubscriptionSource - { - private readonly PravegaGateway.PravegaGatewayClient _client; - - public PravegaSubscriptionSource(PravegaGateway.PravegaGatewayClient client) - { - _client = client; - } - - public IJournalSubscription Subscribe(long @from, Action handler) - { - var request = new ReadEventsRequest(); - request.Scope = Config.Current.GetSettings().StreamName; - request.Stream = "mystream"; - var response = _client.ReadEvents(request); - var streamReader = response.ResponseStream; - var sub = new PravegaJournalSubscription(handler, streamReader); - sub.Start(); - return sub; - } - } -} \ No newline at end of file diff --git a/src/Memstate.Runner/Memstate.Runner.csproj b/src/Memstate.Runner/Memstate.Runner.csproj index 3a280fb..77b3372 100644 --- a/src/Memstate.Runner/Memstate.Runner.csproj +++ b/src/Memstate.Runner/Memstate.Runner.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net6.0 LGPL-3.0-or-later @@ -14,6 +14,6 @@ - + diff --git a/src/Memstate.SqlStreamStore/ConfigExtensions.cs b/src/Memstate.SqlStreamStore/ConfigExtensions.cs deleted file mode 100644 index dea9286..0000000 --- a/src/Memstate.SqlStreamStore/ConfigExtensions.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System.Text; -using Memstate.Configuration; -using Npgsql; -using SqlStreamStore; - -namespace Memstate.SqlStreamStore -{ - public static class ConfigExtensions - { - /// - /// - /// - /// - /// - /// - /// There are two reader implementations. The subscriptionBasedReader - /// will in some cases have a better performance, the default is false - /// The number of to fetch - /// - public static Config UseSqlStreamStore(this Config config, - IStreamStore streamStore = null, - bool useSubscriptionBasedReader = false, - int maxRecordsPerRead = 100) - { - config.Data["SqlStreamStore.UseSubscriptionBasedReader"] = useSubscriptionBasedReader.ToString(); - config.Data["SqlStreamStore.MaxRecordsPerRead"] = maxRecordsPerRead.ToString(); - - config.StorageProviderName = StorageProviders.SqlStreamStore; - config.Container.Register(streamStore); - return config; - } - } -} \ No newline at end of file diff --git a/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj b/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj deleted file mode 100644 index c916938..0000000 --- a/src/Memstate.SqlStreamStore/Memstate.SqlStreamStore.csproj +++ /dev/null @@ -1,27 +0,0 @@ - - - - netstandard2.0 - true - 0.2 - Devrex Labs - snupkg - Storage provider based on SqlStreamStore allowing the use of any sql backend supported by SqlStreamStore. MsSql, PostgreSQL - Devrex Labs - -Devrex Labs - Storage provider for Memstate based on SqlStreamStore targeting any sql database supported by SqlStreamStore - Memstate.SqlStreamStore - true - - - - - - - - - - - - diff --git a/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs b/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs deleted file mode 100644 index b47fa07..0000000 --- a/src/Memstate.SqlStreamStore/SqlSteamStoreSubscriptionJournalReader.cs +++ /dev/null @@ -1,86 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Memstate.Logging; -using SqlStreamStore; -using SqlStreamStore.Streams; -using SqlStreamStore.Subscriptions; - -namespace Memstate.SqlStreamStore -{ - public class SqlSteamStoreSubscriptionJournalReader : IJournalReader - { - private readonly IStreamStore _streamStore; - private readonly StreamId _streamId; - private readonly ISerializer _serializer; - - private readonly ILog _log; - - public SqlSteamStoreSubscriptionJournalReader(IStreamStore streamStore, StreamId streamId, ISerializer serializer) - { - _serializer = serializer; - _streamId = streamId; - _streamStore = streamStore; - _log = LogProvider.For(); - } - - public Task DisposeAsync() - { - return Task.CompletedTask; - } - - public IEnumerable GetRecords(long fromRecord = 0) - { - using (var queue = new BlockingCollection()) - { - async Task MessageReceived(IStreamSubscription subscription, StreamMessage message, - CancellationToken cancellationToken) - { - var json = await message.GetJsonData(cancellationToken); - var command = (Command)_serializer.FromString(json); - var journalRecord = new JournalRecord(message.StreamVersion, message.CreatedUtc, command); - queue.Add(journalRecord); - } - - // pass null to subscribe from the beginning - //or the version of the previous record - int? version = null; - if (fromRecord > 0) version = (int)fromRecord - 1; - - var caughtUp = false; - - using ( - var sub = _streamStore.SubscribeToStream( - _streamId, - version, - MessageReceived, - SubscriptionDropped, - hasCaughtUp => caughtUp = hasCaughtUp)) - { - sub.MaxCountPerRead = 100; - - while (!caughtUp || queue.Any()) - { - if (queue.TryTake(out var journalRecord)) - yield return journalRecord; - else if (!caughtUp) - Thread.Sleep(100); - } - } - } - } - - private void SubscriptionDropped(IStreamSubscription subscription, SubscriptionDroppedReason reason, - Exception ex) - { - if (reason != SubscriptionDroppedReason.Disposed) - { - _log.FatalException("Subscription dropped", ex); - Environment.Exit(1); - } - } - } -} diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs deleted file mode 100644 index 24551e4..0000000 --- a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalReader.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using SqlStreamStore; -using SqlStreamStore.Streams; - -namespace Memstate.SqlStreamStore -{ - public class SqlStreamStoreJournalReader : IJournalReader - { - private readonly IStreamStore _streamStore; - private readonly StreamId _streamId; - private readonly ISerializer _serializer; - - public SqlStreamStoreJournalReader( - IStreamStore streamStore, - StreamId streamId, - ISerializer serializer) - { - _streamStore = streamStore; - _streamId = streamId; - _serializer = serializer; - } - - public Task DisposeAsync() - { - return Task.CompletedTask; - } - - public IEnumerable GetRecords(long fromRecord = 0) - { - var pageSize = 200; - - while (true) - { - var page = _streamStore.ReadStreamForwards( - _streamId, (int) fromRecord, pageSize).Result; - foreach (var message in page.Messages) - yield return RecordFromStreamMessage(message); - if (page.IsEnd) break; - fromRecord += page.Messages.Length; - } - } - - private JournalRecord RecordFromStreamMessage(StreamMessage streamMessage) - { - var commandString = streamMessage.GetJsonData().Result; - var command = (Command) _serializer.FromString(commandString); - return new JournalRecord( - streamMessage.StreamVersion, - streamMessage.CreatedUtc, - command); - } - } -} \ No newline at end of file diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs deleted file mode 100644 index 6653d74..0000000 --- a/src/Memstate.SqlStreamStore/SqlStreamStoreJournalWriter.cs +++ /dev/null @@ -1,37 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using SqlStreamStore; -using SqlStreamStore.Streams; - -namespace Memstate.SqlStreamStore -{ - public class SqlStreamStoreJournalWriter : BatchingJournalWriter - { - private readonly IStreamStore _streamStore; - private readonly StreamId _streamId; - private readonly ISerializer _serializer; - - public SqlStreamStoreJournalWriter(IStreamStore streamStore, StreamId streamId, ISerializer serializer) - { - _streamStore = streamStore; - _streamId = streamId; - _serializer = serializer; - } - - protected override void OnCommandBatch(IEnumerable commands) - { - var messages = commands.Select(ToNewStreamMessage).ToArray(); - var result = _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, messages ) - .GetAwaiter() - .GetResult(); - } - - private NewStreamMessage ToNewStreamMessage(Command command) - { - var commandAsString = _serializer.ToString(command); - var id = command.Id; - return new NewStreamMessage(id, command.GetType().Name, commandAsString); - } - } -} \ No newline at end of file diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs deleted file mode 100644 index a6d3dbb..0000000 --- a/src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System.Text; -using Memstate.Configuration; -using Npgsql; -using SqlStreamStore; -using SqlStreamStore.Streams; - -namespace Memstate.SqlStreamStore -{ - public class SqlStreamStoreProvider: StorageProvider - { - private readonly StreamId _streamId; - private readonly ISerializer _serializer; - private readonly IStreamStore _streamStore; - - private readonly bool UseSubscriptionBasedReader = false; - - public SqlStreamStoreProvider() : this(null) - { - - } - public SqlStreamStoreProvider(IStreamStore streamStore) - { - Config config = Config.Current; - _serializer = config.CreateSerializer(); - var settings = config.GetSettings(); - _streamId = new StreamId(settings.StreamName); - - if (streamStore == null) - { - if (!config.Container.TryResolve(out streamStore)) - streamStore = new InMemoryStreamStore(); - } - - _streamStore = streamStore; - } - - /// - public override IJournalReader CreateJournalReader() - { - return UseSubscriptionBasedReader ? (IJournalReader) - new SqlSteamStoreSubscriptionJournalReader( - _streamStore, - _streamId, - _serializer) : - new SqlStreamStoreJournalReader( - _streamStore, - _streamId, - _serializer); - } - - /// - public override IJournalWriter CreateJournalWriter(long nextRecordNumber) - { - return new SqlStreamStoreJournalWriter(_streamStore, _streamId, _serializer); - } - - /// - public override IJournalSubscriptionSource CreateJournalSubscriptionSource() - { - return new SqlStreamStoreSubscriptionSource(_streamStore, _streamId, _serializer); - } - - /// - /// Initialize a postgres database for use as a Memstate SqlStreamStore backend - /// You must use this method to initialize the database objects. - /// The normal SqlStreamStore schema uses the JSONB datatype - /// - /// A settings object with a valid connection string - public static void InitializePgSqlStreamStore(PostgresStreamStoreSettings settings) - { - var store = new PostgresStreamStore(settings); - var originalScript = store.GetSchemaCreationScript(); - - var sb = new StringBuilder("CREATE SCHEMA IF NOT EXISTS "); - sb.AppendLine(settings.Schema + ";"); - - sb.Append(originalScript); - var script = sb.ToString().Replace("JSONB", "JSON"); - - using (var connection = new NpgsqlConnection(settings.ConnectionString)) - { - connection.Open(); - var cmd = connection.CreateCommand(); - cmd.CommandText = script; - cmd.ExecuteNonQuery(); - connection.Close(); - } - } - } -} \ No newline at end of file diff --git a/src/Memstate.SqlStreamStore/SqlStreamStoreSubscriptionSource.cs b/src/Memstate.SqlStreamStore/SqlStreamStoreSubscriptionSource.cs deleted file mode 100644 index 88f87a6..0000000 --- a/src/Memstate.SqlStreamStore/SqlStreamStoreSubscriptionSource.cs +++ /dev/null @@ -1,84 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using SqlStreamStore; -using SqlStreamStore.Logging; -using SqlStreamStore.Streams; -using SqlStreamStore.Subscriptions; - -namespace Memstate.SqlStreamStore -{ - public class SqlStreamStoreSubscriptionSource - : IJournalSubscriptionSource - { - private class SqlStreamStoreSubscription : IJournalSubscription - { - private readonly Func _readyDelegate; - private readonly IStreamSubscription _subscription; - - public SqlStreamStoreSubscription(IStreamSubscription sub, Func readyDelegate) - { - _readyDelegate = readyDelegate; - _subscription = sub; - } - public void Dispose() - => _subscription.Dispose(); - - public bool Ready() - => _readyDelegate.Invoke(); - } - - private readonly IStreamStore _streamStore; - private readonly StreamId _streamId; - private readonly ISerializer _serializer; - - private readonly ILog _log; - - public SqlStreamStoreSubscriptionSource(IStreamStore streamStore, StreamId streamId, ISerializer serializer) - { - _serializer = serializer; - _streamId = streamId; - _streamStore = streamStore; - _log = LogProvider.For(); - } - - public IJournalSubscription Subscribe(long @from, Action handler) - { - async Task MessageReceived(IStreamSubscription subscription, StreamMessage message, - CancellationToken cancellationToken) - { - var command = (Command)_serializer.FromString(await message.GetJsonData()); - var journalRecord = new JournalRecord(message.StreamVersion, message.CreatedUtc, command); - handler.Invoke(journalRecord); - } - - // pass null to subscribe from the beginning - //or the version of the previous record - int? version = null; - if (from > 0) version = (int)from - 1; - - var caughtUp = false; - - var sub = _streamStore.SubscribeToStream( - _streamId, - version, - MessageReceived, - SubscriptionDropped, - hasCaughtUp => caughtUp = hasCaughtUp); - - sub.MaxCountPerRead = 100; - - return new SqlStreamStoreSubscription(sub, () => caughtUp); - } - - private void SubscriptionDropped(IStreamSubscription subscription, SubscriptionDroppedReason reason, - Exception ex) - { - if (reason != SubscriptionDroppedReason.Disposed) - { - _log.FatalException("Subscription dropped", ex); - Environment.Exit(1); - } - } - } -} \ No newline at end of file diff --git a/src/Memstate.Test/Memstate.Test.csproj b/src/Memstate.Test/Memstate.Test.csproj index 71e4a6a..ead016b 100644 --- a/src/Memstate.Test/Memstate.Test.csproj +++ b/src/Memstate.Test/Memstate.Test.csproj @@ -1,6 +1,6 @@  - netcoreapp3.0 + net6.0 false @@ -10,14 +10,13 @@ - - - - + + + + - diff --git a/src/Memstate.Test/SqlStreamStoreTests.cs b/src/Memstate.Test/SqlStreamStoreTests.cs deleted file mode 100644 index 345bd4e..0000000 --- a/src/Memstate.Test/SqlStreamStoreTests.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Memstate.Configuration; -using Memstate.Models.KeyValue; -using Memstate.SqlStreamStore; -using NUnit.Framework; -using SqlStreamStore; - -namespace Memstate.Test -{ - [TestFixture] - public class SqlStreamStoreTests - { - [Test] - public void ConfigUsingSqlStreamStoreDefault() - { - var config = Config.Current; - config.UseSqlStreamStore(); - var provider = config.GetStorageProvider(); - Assert.IsInstanceOf(provider); - } - - [Test] - public void ConfigUsingSqlStreamStore() - { - var config = Config.Current; - var streamStore = new InMemoryStreamStore(); - config.UseSqlStreamStore(streamStore); - - var resolvedStore = config.Container.Resolve(); - Assert.AreSame(streamStore, resolvedStore); - - var provider = config.GetStorageProvider(); - Assert.IsInstanceOf(provider); - } - - [Test] - public void ConfigSetStorageProvider() - { - var config = Config.Current; - var streamStore = new InMemoryStreamStore(); - var provider = new SqlStreamStoreProvider(streamStore); - - config.SetStorageProvider(provider); - var resolvedProvider = config.GetStorageProvider(); - - Assert.AreSame(provider, resolvedProvider); - } - - [Test, Ignore("Failing due to a concurrency bug")] - public async Task Smoke() - { - var config = Config.Current; - config.GetSettings().StreamName = "stream1"; - var streamStoreProvider = new SqlStreamStoreProvider(); - var writer = streamStoreProvider.CreateJournalWriter(0); - foreach(var i in Enumerable.Range(1,101)) - writer.Send(new Set("key" + i, i)); - await writer.DisposeAsync(); - - var reader = streamStoreProvider.CreateJournalReader(); - var records = new List(reader.GetRecords().ToArray()); - Assert.AreEqual(101, records.Count); - Assert.AreEqual("key1", ((Set) records[0].Command).Key); - - records.Clear(); - var sub = streamStoreProvider - .CreateJournalSubscriptionSource() - .Subscribe(0, jr => records.Add(jr)); - while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(50)); - Console.WriteLine("sub.Ready()"); - - Assert.AreEqual(101, records.Count); - Assert.AreEqual("key1", ((Set) records[0].Command).Key); - } - } -} \ No newline at end of file diff --git a/src/Memstate.sln b/src/Memstate.sln index 4bd0b5c..cba8372 100644 --- a/src/Memstate.sln +++ b/src/Memstate.sln @@ -7,8 +7,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.EventStore", "Mems EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.JsonNet", "Memstate.JsonNet\Memstate.JsonNet.csproj", "{5E6F8072-FFB2-44A2-B824-C1900C538F3F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Postgres", "Memstate.Postgresql\Memstate.Postgres.csproj", "{2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Host", "Memstate.Host\Memstate.Host.csproj", "{6FB2441B-9638-4591-96C0-F181E339BFFB}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Test", "Memstate.Test\Memstate.Test.csproj", "{370CF2A7-3E4F-4314-BED9-043BF8555E2A}" @@ -17,8 +15,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Test", "Test", "{437BA763-4 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Test", "System.Test\System.Test.csproj", "{EFC78A26-AA12-45DD-815C-1D1BA0BE912F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Postgres.Test", "Memstate.Postgresql.Tests\Memstate.Postgres.Test.csproj", "{9EBD979B-00EE-472B-9D7F-94E4E72C5202}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Benchmarks", "Memstate.Benchmarks\Memstate.Benchmarks.csproj", "{DC012469-986E-4953-9E95-BCCF7CD3745A}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{C51C66C9-3784-4C06-BAC5-EEE4B6684B69}" @@ -39,16 +35,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Runner", "Memstate EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.All", "Memstate.All\Memstate.All.csproj", "{2A191739-1DD8-4CE8-8C63-F3AE94D3D0AC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.SqlStreamStore", "Memstate.SqlStreamStore\Memstate.SqlStreamStore.csproj", "{734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Trello.Core", "Trello.Core\Trello.Core.csproj", "{6BA8DDF2-098D-449A-B818-B8E7E098775C}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Trello.Test", "Trello.Test\Trello.Test.csproj", "{E769EAA1-54BC-4717-91B6-0CF36019D26A}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Trello.Web", "Trello.Web\Trello.Web.csproj", "{ABDF807A-F0F2-4A54-A43E-DAA4F2A059C3}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Memstate.Pravega", "Memstate.Pravega\Memstate.Pravega.csproj", "{EA906A46-9A51-4D92-91B3-50DB338EDE38}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -73,14 +65,6 @@ Global {5E6F8072-FFB2-44A2-B824-C1900C538F3F}.Release|Any CPU.Build.0 = Release|Any CPU {5E6F8072-FFB2-44A2-B824-C1900C538F3F}.Release|x64.ActiveCfg = Release|Any CPU {5E6F8072-FFB2-44A2-B824-C1900C538F3F}.Release|x64.Build.0 = Release|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Debug|Any CPU.Build.0 = Debug|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Debug|x64.ActiveCfg = Debug|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Debug|x64.Build.0 = Debug|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Release|Any CPU.ActiveCfg = Release|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Release|Any CPU.Build.0 = Release|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Release|x64.ActiveCfg = Release|Any CPU - {2C4D0AD4-D213-467F-A1E8-B48FE26A78FF}.Release|x64.Build.0 = Release|Any CPU {6FB2441B-9638-4591-96C0-F181E339BFFB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6FB2441B-9638-4591-96C0-F181E339BFFB}.Debug|Any CPU.Build.0 = Debug|Any CPU {6FB2441B-9638-4591-96C0-F181E339BFFB}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -105,14 +89,6 @@ Global {EFC78A26-AA12-45DD-815C-1D1BA0BE912F}.Release|Any CPU.Build.0 = Release|Any CPU {EFC78A26-AA12-45DD-815C-1D1BA0BE912F}.Release|x64.ActiveCfg = Release|Any CPU {EFC78A26-AA12-45DD-815C-1D1BA0BE912F}.Release|x64.Build.0 = Release|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Debug|Any CPU.Build.0 = Debug|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Debug|x64.ActiveCfg = Debug|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Debug|x64.Build.0 = Debug|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Release|Any CPU.ActiveCfg = Release|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Release|Any CPU.Build.0 = Release|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Release|x64.ActiveCfg = Release|Any CPU - {9EBD979B-00EE-472B-9D7F-94E4E72C5202}.Release|x64.Build.0 = Release|Any CPU {DC012469-986E-4953-9E95-BCCF7CD3745A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DC012469-986E-4953-9E95-BCCF7CD3745A}.Debug|Any CPU.Build.0 = Debug|Any CPU {DC012469-986E-4953-9E95-BCCF7CD3745A}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -177,14 +153,6 @@ Global {2A191739-1DD8-4CE8-8C63-F3AE94D3D0AC}.Release|Any CPU.Build.0 = Release|Any CPU {2A191739-1DD8-4CE8-8C63-F3AE94D3D0AC}.Release|x64.ActiveCfg = Release|Any CPU {2A191739-1DD8-4CE8-8C63-F3AE94D3D0AC}.Release|x64.Build.0 = Release|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Debug|Any CPU.Build.0 = Debug|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Debug|x64.ActiveCfg = Debug|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Debug|x64.Build.0 = Debug|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Release|Any CPU.ActiveCfg = Release|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Release|Any CPU.Build.0 = Release|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Release|x64.ActiveCfg = Release|Any CPU - {734C5486-06BC-4E7C-9B53-D7F3A7DEBA37}.Release|x64.Build.0 = Release|Any CPU {6BA8DDF2-098D-449A-B818-B8E7E098775C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6BA8DDF2-098D-449A-B818-B8E7E098775C}.Debug|Any CPU.Build.0 = Debug|Any CPU {6BA8DDF2-098D-449A-B818-B8E7E098775C}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -209,14 +177,6 @@ Global {ABDF807A-F0F2-4A54-A43E-DAA4F2A059C3}.Release|Any CPU.Build.0 = Release|Any CPU {ABDF807A-F0F2-4A54-A43E-DAA4F2A059C3}.Release|x64.ActiveCfg = Release|Any CPU {ABDF807A-F0F2-4A54-A43E-DAA4F2A059C3}.Release|x64.Build.0 = Release|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Debug|Any CPU.Build.0 = Debug|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Debug|x64.ActiveCfg = Debug|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Debug|x64.Build.0 = Debug|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Release|Any CPU.ActiveCfg = Release|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Release|Any CPU.Build.0 = Release|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Release|x64.ActiveCfg = Release|Any CPU - {EA906A46-9A51-4D92-91B3-50DB338EDE38}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -224,7 +184,6 @@ Global GlobalSection(NestedProjects) = preSolution {370CF2A7-3E4F-4314-BED9-043BF8555E2A} = {437BA763-466E-46D7-9AB9-E5AA3B697CA0} {EFC78A26-AA12-45DD-815C-1D1BA0BE912F} = {437BA763-466E-46D7-9AB9-E5AA3B697CA0} - {9EBD979B-00EE-472B-9D7F-94E4E72C5202} = {437BA763-466E-46D7-9AB9-E5AA3B697CA0} {DC012469-986E-4953-9E95-BCCF7CD3745A} = {437BA763-466E-46D7-9AB9-E5AA3B697CA0} {1C144C3F-B32E-419C-B432-8050EF122401} = {C51C66C9-3784-4C06-BAC5-EEE4B6684B69} {6BA8DDF2-098D-449A-B818-B8E7E098775C} = {F9764AB9-8B67-4D1C-8CD8-CE87034B93CC} diff --git a/src/System.Test/SqlStreamStorePerformanceRepro.cs b/src/System.Test/SqlStreamStorePerformanceRepro.cs deleted file mode 100644 index 409b4a0..0000000 --- a/src/System.Test/SqlStreamStorePerformanceRepro.cs +++ /dev/null @@ -1,186 +0,0 @@ -using System.Data.Common; -using System.Data.SqlClient; -using System.Diagnostics; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Memstate; -using Memstate.Configuration; -using Memstate.SqlStreamStore; -using Npgsql; -using NUnit.Framework; -using SqlStreamStore; - -namespace System.Test -{ - [TestFixture, Ignore("Performance")] - public class SqlStreamStorePerformanceRepro - { - - private const int MessageSize = 400; - private const int NumMessages = 10000; - private const string PgsqlConnectionString - = "Host=localhost; Password=postgres; User ID=postgres; Database=postgres;"; - - private IJournalWriter _writer; - private SqlStreamStoreProvider _provider; - private String _streamName; - - private DbConnection _connection; - private IStreamStore _streamStore; - - private readonly Stopwatch _stopWatch = new Stopwatch(); - - [TearDown] - public async Task TearDown() - { - await _connection.DisposeAsync(); - _streamStore.Dispose(); - } - - [SetUp] - public void Setup() - { - SqlStreamStore.Logging.LogProvider.IsDisabled = true; - var config = Config.Current; - _streamName = "test-" + DateTime.Now.ToFileTimeUtc(); - config.GetSettings().StreamName = _streamName; - config.SerializerName = Serializers.NewtonsoftJson; - - //ConfigurePostgres(); - - ConfigureMssql2019(); - _provider = new SqlStreamStoreProvider(_streamStore); - _writer = _provider.CreateJournalWriter(0); - } - - private void ConfigurePostgres() - { - _connection = new NpgsqlConnection(PgsqlConnectionString); - var settings = new PostgresStreamStoreSettings(PgsqlConnectionString); - settings.Schema = "randy"; - SqlStreamStoreProvider.InitializePgSqlStreamStore(settings); - _streamStore = new PostgresStreamStore(settings); - } - - - private void ConfigureMssql2019() - { - CreateMsSqlDatabaseUnlessExists(); - var connectionString = "Server=localhost;Database=memstate;User Id=sa;Password=abc123ABC;"; - _connection = new SqlConnection(connectionString); - var settings = new MsSqlStreamStoreV3Settings(connectionString); - settings.Schema = "memstate"; - var store = new MsSqlStreamStoreV3(settings); - store.CreateSchemaIfNotExists().GetAwaiter().GetResult(); - _streamStore = store; - } - - private void CreateMsSqlDatabaseUnlessExists() - { - try - { - var connectionString = "Server=localhost;Database=master;User Id=sa;Password=abc123ABC;"; - _connection = new SqlConnection(connectionString); - _connection.Open(); - var cmd = _connection.CreateCommand(); - cmd.CommandText = "IF db_id('memstate') IS NULL CREATE DATABASE memstate"; - cmd.ExecuteNonQuery(); - } - finally - { - _connection.Close(); - } - } - - [Test] - public void DeserializationFailsWhenFirstJsonAttributeIsNotType() - { - Assert.Catch(() => - { - var json = - "{\"Id\": \"8d7693f9-9bfc-4c23-ac52-31fc22bd67f3\", \"$type\": \"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\", \"Payload\": {\"$type\": \"System.Byte[], System.Private.CoreLib\", \"$value\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"}}"; - var serializer = Config.Current.CreateSerializer(); - var command = (Command) serializer.FromString(json); - }); - } - - [Test] - public void CanDeserializeWithTypeFirst() - { - var json = "{\"$type\":\"System.Test.SqlStreamStorePerformanceRepro+MyCommand, System.Test\",\"Payload\":{\"$type\":\"System.Byte[], System.Private.CoreLib\",\"$value\":\"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==\"},\"Id\":\"e12c9b93-025d-4378-b370-24704acd134d\"}";; - var serializer = Config.Current.CreateSerializer(); - var command = (Command) serializer.FromString(json); - } - [Test] - public async Task WriteTruncateAndLoadUsingReader() - { - await WriteAndTruncateMessages(); - _stopWatch.Restart(); - var reader = _provider.CreateJournalReader(); - var records = reader.GetRecords().ToList(); - Console.WriteLine("Records read: " + records.Count); - Console.WriteLine("Read duration: " + _stopWatch.Elapsed); - await reader.DisposeAsync(); - } - - [Test] - public async Task WriteTruncateAndLoadUsingSubscription() - { - var messagesReceived = 0; - - await WriteAndTruncateMessages(); - _stopWatch.Restart(); - var sub = _provider.CreateJournalSubscriptionSource() - .Subscribe(0, jr => messagesReceived++); - while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(20)); - Console.WriteLine("Messages received: " + messagesReceived); - Console.WriteLine("Read with sub duration: " + _stopWatch.Elapsed); - sub.Dispose(); - } - - private async Task WriteAndTruncateMessages() - { - _stopWatch.Restart(); - await AppendMessages(NumMessages, MessageSize); - Console.WriteLine("Append duration:" + _stopWatch.Elapsed); - _stopWatch.Restart(); - //await DeleteMessages(); - } - private async Task DeleteMessages() - { - await _connection.OpenAsync(); - var cmd = _connection.CreateCommand(); - cmd.CommandText = "TRUNCATE TABLE messages"; - cmd.CommandTimeout = 1000 * 60 * 5; //no idea if this is 5 minutes or 5000 minutes - var result = await cmd.ExecuteNonQueryAsync(); - Console.WriteLine("Messages deleted: " + result); - } - - private async Task AppendMessages(int numMessages, int messageSize) - { - for (int i = 0; i < numMessages; i++) - { - _writer.Send(new MyCommand(messageSize)); - } - await _writer.DisposeAsync(); - Console.WriteLine("Append complete: " + DateTime.Now); - } - - public class MyCommand : Command - { - public byte[] Payload { get; set; } - - public MyCommand(int size) - { - Payload = new byte[size]; - } - - internal override object ExecuteImpl(object model) - { - //never called - throw new NotImplementedException(); - } - } - } -} \ No newline at end of file diff --git a/src/System.Test/System.Test.csproj b/src/System.Test/System.Test.csproj index 961230e..208ae9b 100644 --- a/src/System.Test/System.Test.csproj +++ b/src/System.Test/System.Test.csproj @@ -1,28 +1,25 @@  - netcoreapp3.1 + net6.0 false System.Test - - - + + + - - + + - + - - - diff --git a/src/System.Test/TestConfigurations.cs b/src/System.Test/TestConfigurations.cs index 91aa3b2..bc10c8d 100644 --- a/src/System.Test/TestConfigurations.cs +++ b/src/System.Test/TestConfigurations.cs @@ -43,8 +43,6 @@ public IEnumerable GetConfigurations() var settings = cfg.GetSettings(); settings.WithRandomSuffixAppendedToStreamName(); cfg.StorageProviderName = providerName; - if (providerName == "sqlstreamsource") - ConfigurePgSqlStreamStore(cfg); yield return cfg; } } @@ -59,11 +57,7 @@ private IEnumerable Serializers() protected virtual IEnumerable ProviderNames() { yield return "file"; - //yield return "postgres"; - yield return "eventstore"; - yield return "sqlstreamstore"; - //yield return "pravega"; } private object[] ToObjectArray(object o) @@ -71,25 +65,11 @@ private object[] ToObjectArray(object o) return new[] { o }; } - private static void ConfigurePgSqlStreamStore(Config config) - { - var connectionString = "Host=localhost;Port=5432;User Id=postgres;Database=postgres"; - var settings = new PostgresStreamStoreSettings(connectionString); - var pgStreamStore = new PostgresStreamStore(settings); - config.Container.Register(pgStreamStore); - pgStreamStore.CreateSchemaIfNotExists().GetAwaiter().GetResult(); - } - public class Cluster : TestConfigurations { protected override IEnumerable ProviderNames() { yield return "eventstore"; - yield return "sqlstreamstore"; - //yield return "pravega"; -#if POSTGRES - yield return "postgres"; -#endif } } diff --git a/src/Trello.Test/Trello.Test.csproj b/src/Trello.Test/Trello.Test.csproj index fdf04b1..80b025e 100644 --- a/src/Trello.Test/Trello.Test.csproj +++ b/src/Trello.Test/Trello.Test.csproj @@ -1,14 +1,14 @@ - netcoreapp3.1 + net6.0 false - - + + diff --git a/src/Trello.Web/Trello.Web.csproj b/src/Trello.Web/Trello.Web.csproj index e74c8b1..8c39bd8 100644 --- a/src/Trello.Web/Trello.Web.csproj +++ b/src/Trello.Web/Trello.Web.csproj @@ -1,7 +1,7 @@ - netcoreapp3.1 + net6.0 diff --git a/test-dependencies.sh b/test-dependencies.sh index 477baf1..bbd45d2 100755 --- a/test-dependencies.sh +++ b/test-dependencies.sh @@ -1,8 +1,6 @@ #!/bin/bash -export HOST_IP=`ipconfig getifaddr en0` -export PRAVEGA_CONTROLLER=tcp://$HOST_IP:9090 mssql_image=mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04 @@ -16,14 +14,14 @@ if [ "$1" == "stop" -o "$1" == "restart" ]; then docker rm -f eventstore docker rm -f postgres docker rm -f mssql2019 - docker-compose down +# docker-compose down fi if [ "$1" == "start" -o "$1" == "restart" ]; then docker run --name eventstore -d -p 2113:2113 -p 1113:1113 eventstore/eventstore:release-4.1.0 - docker run --name postgres -dp5432:5432 -e POSTGRES_PASSWORD='postgres' postgres:9.6.10 - docker run --name mssql2019 -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=abc123ABC' -p 1433:1433 -d $mssql_image - docker-compose up -d +# docker run --name postgres -dp5432:5432 -e POSTGRES_PASSWORD='postgres' postgres:9.6.10 +# docker run --name mssql2019 -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=abc123ABC' -p 1433:1433 -d $mssql_image +# docker-compose up -d fi