Skip to content

Commit

Permalink
pravega WIP, sss tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rofr committed May 19, 2020
1 parent 4a9f38e commit bd6c2cc
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
<ItemGroup>
<PackageReference Include="microsoft.net.test.sdk" Version="16.6.1" />
<PackageReference Include="nunit" Version="3.12.0" />
<PackageReference Include="nunit3testadapter" Version="3.16.1" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions src/Memstate.Pravega/PravegaJournalSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public PravegaJournalSubscription(Action<JournalRecord> handler, IAsyncStreamRea
public void Start() => _task = Run();
private async Task Run()
{
var recordNumber = 0;
while (await _reader.MoveNext(_cts.Token))
{
var eventsResponse = _reader.Current;
Expand Down
4 changes: 2 additions & 2 deletions src/Memstate.Pravega/PravegaJournalWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ public PravegaJournalWriter(PravegaGateway.PravegaGatewayClient client, ISeriali
_writer = _client.WriteEvents();
}

public Task DisposeAsync()
public override async Task DisposeAsync()
{
await base.DisposeAsync();
_writer.Dispose();
return Task.CompletedTask;
}


Expand Down
2 changes: 1 addition & 1 deletion src/Memstate.Pravega/PravegaProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public override void Initialize()
public override IJournalReader CreateJournalReader()
{
var serializer = Config.Current.CreateSerializer();
return new PravegaJournalReader(_client, serializer, _scope, _stream);
return new PravegaJournalReader(_client, serializer, _scope, _stream, null);
}

public override IJournalWriter CreateJournalWriter(long nextRecordNumber)
Expand Down
2 changes: 1 addition & 1 deletion src/Memstate.Pravega/PravegaSubscriptionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public IJournalSubscription Subscribe(long @from, Action<JournalRecord> handler)
request.Stream = "mystream";
var response = _client.ReadEvents(request);
var streamReader = response.ResponseStream;
var sub = new PravegaJournalSubscription(handler, streamReader, @from);
var sub = new PravegaJournalSubscription(handler, streamReader);
sub.Start();
return sub;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

namespace Memstate.SqlStreamStore
{
public class SqlStreamSourceJournalReader : IJournalReader
public class SqlStreamStoreJournalReader : IJournalReader
{
private readonly IStreamStore _streamStore;
private readonly StreamId _streamId;
private readonly ISerializer _serializer;

public SqlStreamSourceJournalReader(
public SqlStreamStoreJournalReader(
IStreamStore streamStore,
StreamId streamId,
ISerializer serializer)
Expand Down
12 changes: 6 additions & 6 deletions src/Memstate.SqlStreamStore/SqlStreamStoreProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public SqlStreamStoreProvider(IStreamStore streamStore)

public override IJournalReader CreateJournalReader()
{
return new SqlSteamStoreSubscriptionJournalReader(
// return new SqlSteamStoreSubscriptionJournalReader(
// _streamStore,
// _streamId,
// _serializer);

return new SqlStreamStoreJournalReader(
_streamStore,
_streamId,
_serializer);

//return new SqlStreamSourceJournalReader(
// _streamStore,
// _streamId,
// _serializer);
}

public override IJournalWriter CreateJournalWriter(long nextRecordNumber)
Expand Down
108 changes: 108 additions & 0 deletions src/System.Test/SqlStreamStorePerformanceRepro.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System.Linq;
using System.Threading.Tasks;
using Memstate;
using Memstate.Configuration;
using Memstate.SqlStreamStore;
using Npgsql;
using NUnit.Framework;
using SqlStreamStore;

namespace System.Test
{
[TestFixture]
public class SqlStreamStorePerformanceRepro
{

private const int MessageSize = 1024;
private const int NumMessages = 200000;
private const string PgsqlConnectionString
= "Host=localhost; Password=postgres; User ID=postgres; Database=postgres;";

private IJournalWriter _writer;
private SqlStreamStoreProvider _provider;
private String _streamName;


[SetUp]
public void Setup()
{
var config = Config.Current;
_streamName = "test-" + DateTime.Now.ToFileTimeUtc();
config.GetSettings<EngineSettings>().StreamName = _streamName;


config.SerializerName = Serializers.NewtonsoftJson;
var streamStore = Postgres();
_provider = new SqlStreamStoreProvider(streamStore);
_writer = _provider.CreateJournalWriter(0);

}

[Test]
public async Task WriteTruncateAndLoadUsingReader()
{
Console.WriteLine(nameof(WriteTruncateAndLoadUsingReader) + " " + _streamName);
await AppendMessages(NumMessages, MessageSize);
await DeleteMessages();
var reader = _provider.CreateJournalReader();
var records = reader.GetRecords().ToList();
Console.WriteLine("Records read: " + records.Count);
Console.WriteLine(DateTime.Now);
}

[Test]
public async Task WriteTruncateAndLoadUsingSubscription()
{
Console.WriteLine(nameof(WriteTruncateAndLoadUsingSubscription) + " " + _streamName);
await AppendMessages(NumMessages, MessageSize);
await DeleteMessages();
var sub = _provider.CreateJournalSubscriptionSource()
.Subscribe(0, jr => Console.WriteLine(jr.RecordNumber));
while (!sub.Ready()) await Task.Delay(TimeSpan.FromMilliseconds(20));
Console.WriteLine("Ready! " + DateTime.Now);
}

private async Task DeleteMessages()
{
var connection = new NpgsqlConnection(PgsqlConnectionString);
await connection.OpenAsync();
var cmd = connection.CreateCommand();
cmd.CommandText = "DELETE FROM messages";
var result = await cmd.ExecuteNonQueryAsync();
Console.WriteLine("Messages deleted: " + result);

}
private IStreamStore Postgres()
{
var settings = new PostgresStreamStoreSettings(PgsqlConnectionString);
var store = new PostgresStreamStore(settings);
store.CreateSchemaIfNotExists().GetAwaiter().GetResult();
return store;
}

private async Task AppendMessages(int numMessages, int messageSize)
{
for (int i = 0; i < numMessages; i++)
{
_writer.Send(new MyCommand(messageSize));
}
await _writer.DisposeAsync();
Console.WriteLine("Append complete: " + DateTime.Now);
}

private class MyCommand : Command
{
public byte[] Payload { get; set; }

public MyCommand(int size)
{
Payload = new byte[size];
}

internal override object ExecuteImpl(object model)
{
throw new NotImplementedException();
}
}
}
}

0 comments on commit bd6c2cc

Please sign in to comment.