Skip to content

Commit

Permalink
first implementation Pravega storage provider
Browse files Browse the repository at this point in the history
compiles, connects, creates stream but then hangs
  • Loading branch information
rofr committed May 19, 2020
1 parent 5ebfd7d commit cc002e0
Show file tree
Hide file tree
Showing 16 changed files with 553 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/Memstate.Core/Configuration/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void SetStorageProvider(StorageProvider storageProvider)
/// <summary>
/// Name of a well known storage provider OR resolvable type name.
/// </summary>
public string StorageProviderName { get; set; } = StorageProviders.FILE;
public string StorageProviderName { get; set; } = StorageProviders.File;

public StorageProvider GetStorageProvider()
{
Expand All @@ -183,7 +183,7 @@ public StorageProvider GetStorageProvider()
/// Name of a well known serializer or resolvable type name OR the value Auto (default)
/// </summary>
/// <value>The name of the serializer.</value>
public string SerializerName { get; set; } = Serializers.AUTO;
public string SerializerName { get; set; } = Serializers.Auto;

private object Convert(string value, Type type)
{
Expand Down
21 changes: 11 additions & 10 deletions src/Memstate.Core/Configuration/Serializers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@ namespace Memstate
internal class Serializers : Providers<ISerializer>
{
/// <summary>
/// Take the first available serializer in the order: Newtonsoft.Json, Wire.
/// Take the first available serializer
/// as determined by <see cref="AutoResolutionCandidates"/>
/// </summary>
public const string AUTO = "Auto";
public const string Auto = "Auto";

/// <summary>
/// Wire binary serializer
/// </summary>
public const string WIRE = "Wire";
public const string Wire = "Wire";

/// <summary>
/// NewtonSoft JSON serializer
/// </summary>
public const string NEWTONSOFT_JSON = "Newtonsoft.Json";
public const string NewtonsoftJson = "Newtonsoft.Json";

private const string Wire = "Memstate.Wire.WireSerializerAdapter, Memstate.Wire";
private const string NewtonSoftJson = "Memstate.JsonNet.JsonSerializerAdapter, Memstate.JsonNet";
private const string WireTypeName = "Memstate.Wire.WireSerializerAdapter, Memstate.Wire";
private const string NewtonSoftJsonTypeName = "Memstate.JsonNet.JsonSerializerAdapter, Memstate.JsonNet";

public Serializers()
{
Register("Auto", AutoResolve);
Register("BinaryFormatter", () => InstanceFromTypeName(nameof(BinaryFormatterAdapter)));
Register("Wire", () => InstanceFromTypeName(Wire));
Register("NewtonSoft.Json", () => InstanceFromTypeName(NewtonSoftJson));
Register("Wire", () => InstanceFromTypeName(WireTypeName));
Register("NewtonSoft.Json", () => InstanceFromTypeName(NewtonSoftJsonTypeName));
}

protected override IEnumerable<string> AutoResolutionCandidates()
{
yield return "BinaryFormatter";
yield return "Newtonsoft.Json";
yield return "Wire";
yield return "Newtonsoft.Json";
yield return "BinaryFormatter";
}
}
}
1 change: 1 addition & 0 deletions src/Memstate.Core/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
[assembly: InternalsVisibleTo("Memstate.EventStore")]
[assembly: InternalsVisibleTo("Memstate.Postgressql.Tests")]
[assembly: InternalsVisibleTo("Memstate.SqlStreamStore")]
[assembly: InternalsVisibleTo("Memstate.Pravega")]
21 changes: 11 additions & 10 deletions src/Memstate.Core/Storage/StorageProviders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,25 @@ namespace Memstate
internal class StorageProviders : Providers<StorageProvider>
{
//well known storage provider names
public const string EVENTSTORE = "EventStore";
public const string FILE = "File";
public const string POSTGRES = "Postgres";
public const string SQLSTREAMSTORE = "SqlStreamStore";
public const string EventStore = nameof(EventStore);
public const string File = nameof(File);
public const string Postgres = nameof(Postgres);
public const string SqlStreamStore = nameof(SqlStreamStore);
public const string Pravega = nameof(Pravega);


private const string EventStoreProviderType = "Memstate.EventStore.EventStoreProvider, Memstate.EventStore";

private const string PostgresProviderType = "Memstate.Postgres.PostgresProvider, Memstate.Postgres";

private const string SqlStreamStoreProviderType = "Memstate.SqlStreamStore.SqlStreamStoreProvider, Memstate.SqlStreamStore";
private const string PravegaProviderType = "Memstate.Pravega.PravegaProvider, Memstate.Pravega";

public StorageProviders()
{
Register(FILE, () => new FileStorageProvider());
Register(EVENTSTORE, () => InstanceFromTypeName(EventStoreProviderType));
Register(POSTGRES, () => InstanceFromTypeName(PostgresProviderType));
Register(SQLSTREAMSTORE, () => InstanceFromTypeName(SqlStreamStoreProviderType));
Register(File, () => new FileStorageProvider());
Register(EventStore, () => InstanceFromTypeName(EventStoreProviderType));
Register(Postgres, () => InstanceFromTypeName(PostgresProviderType));
Register(SqlStreamStore, () => InstanceFromTypeName(SqlStreamStoreProviderType));
Register(Pravega, () => InstanceFromTypeName(PravegaProviderType));
}

protected override IEnumerable<string> AutoResolutionCandidates()
Expand Down
21 changes: 21 additions & 0 deletions src/Memstate.Pravega/Memstate.Pravega.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Net.Client" Version="2.28.0" />
<PackageReference Include="Grpc.Tools" Version="2.28.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Memstate.Core\Memstate.Core.csproj" />
</ItemGroup>

<ItemGroup>
<Protobuf Include="pravega.proto" GrpcServices="Client" />
</ItemGroup>

</Project>
65 changes: 65 additions & 0 deletions src/Memstate.Pravega/PravegaJournalReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Memstate.Configuration;

namespace Memstate.Pravega
{
public class PravegaJournalReader : IJournalReader
{
private readonly CancellationToken _cancellationToken;
private readonly CancellationTokenSource _cts;

private readonly PravegaGateway.PravegaGatewayClient _client;
private readonly ISerializer _serializer;

private readonly string _scope;
private readonly string _streamName = "mystream";

public PravegaJournalReader(PravegaGateway.PravegaGatewayClient client, ISerializer serializer)
{
_client = client;
_cts = new CancellationTokenSource();
_cancellationToken = _cts.Token;
_serializer = serializer;
_scope = Config.Current.GetSettings<EngineSettings>().StreamName;
}
public Task DisposeAsync()
{
_cts.Cancel();
_cts.Token.WaitHandle.WaitOne();
return Task.CompletedTask;
}

public IEnumerable<JournalRecord> GetRecords(long fromRecord = 0)
{
var request = new ReadEventsRequest
{
Scope = _scope,
Stream = _streamName,
};

var recordNumber = 0;
var response = _client.ReadEvents(request, cancellationToken: _cancellationToken);
while (!_cancellationToken.IsCancellationRequested)
{
var responseStream = response.ResponseStream;
while (responseStream.MoveNext().GetAwaiter().GetResult())
{
if (recordNumber <= fromRecord) continue;
var @event = responseStream.Current;
var bytes = @event.Event.ToByteArray();
Console.WriteLine("Position:" + @event.Position);
Console.WriteLine("StreamCut:" + @event.StreamCut);
Console.WriteLine("EventPointer:" + @event.EventPointer);

var savedRecord = (JournalRecord) _serializer.Deserialize(bytes);
var record = new JournalRecord(recordNumber++, savedRecord.Written, savedRecord.Command);
yield return record;
}
}
}
}
}
69 changes: 69 additions & 0 deletions src/Memstate.Pravega/PravegaJournalSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Memstate.Configuration;

namespace Memstate.Pravega
{
public class PravegaJournalSubscription : IJournalSubscription
{
private readonly Action<JournalRecord> _handler;
private readonly IAsyncStreamReader<ReadEventsResponse> _reader;
private readonly ISerializer _serializer;
private readonly long _firstRecordNumberToRead;
private Task _task;
private readonly CancellationTokenSource _cts;

public PravegaJournalSubscription(Action<JournalRecord> handler, IAsyncStreamReader<ReadEventsResponse> reader, long fromRecord)
{
_handler = handler;
_reader = reader;
_serializer = Config.Current.CreateSerializer();
_firstRecordNumberToRead = fromRecord;
_cts = new CancellationTokenSource();
}

public void Start() => _task = Run();
private async Task Run()
{
var recordNumber = 0;
while (await _reader.MoveNext(_cts.Token))
{
//Skip forward to the position we want to start reading from
//todo: learn how to request from a specific StreamCut
if (recordNumber < _firstRecordNumberToRead) continue;
var eventsResponse = _reader.Current;
var bytes = eventsResponse.Event.ToByteArray();
var savedRecord = (JournalRecord) _serializer.Deserialize(bytes);
var record = new JournalRecord(recordNumber++, savedRecord.Written, savedRecord.Command);
_handler.Invoke(record);
}

}

public void Dispose()
{
Console.WriteLine("Terminating subscription");
_cts.Cancel();
_task.GetAwaiter().GetResult();
Console.WriteLine("Subscription terminated");
}

//TODO: figure out how to know
public bool Ready() => true;
}

public static class ConfigExtensions
{
public static Config UsePravega(this Config config)
{
config.SerializerName = Serializers.Wire;
config.StorageProviderName = StorageProviders.Pravega;
var provider = new PravegaProvider();
provider.Initialize();
config.Container.Register(provider);
return config;
}
}
}
40 changes: 40 additions & 0 deletions src/Memstate.Pravega/PravegaJournalWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Memstate.Configuration;

namespace Memstate.Pravega
{
public class PravegaJournalWriter : IJournalWriter
{
private readonly PravegaGateway.PravegaGatewayClient _client;
private readonly ISerializer _serializer;
private readonly AsyncClientStreamingCall<WriteEventsRequest, WriteEventsResponse> _writer;
public PravegaJournalWriter(PravegaGateway.PravegaGatewayClient client, ISerializer serializer)
{
_serializer = serializer;
_client = client;
_writer = _client.WriteEvents();
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}

public async void Send(Command command)
{
var record = new JournalRecord(0, DateTimeOffset.Now, command);
var bytes = _serializer.Serialize(record);

var request = new WriteEventsRequest
{
Event = ByteString.CopyFrom(bytes),
Stream = "mystream",
Scope = Config.Current.GetSettings<EngineSettings>().StreamName
};
await _writer.RequestStream.WriteAsync(request);
}
}
}
71 changes: 71 additions & 0 deletions src/Memstate.Pravega/PravegaProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using Grpc.Net.Client;
using Memstate.Configuration;

namespace Memstate.Pravega
{
public class PravegaProvider : StorageProvider
{
private readonly PravegaGateway.PravegaGatewayClient _client;

public override void Initialize()
{
var config = Config.Current;
var settings = config.GetSettings<EngineSettings>();
var parts = (settings.StreamName + "/mystream").Split("/");
if (parts.Length != 2) throw new ArgumentException("Bad scope/stream: " + settings.StreamName);
var (scope, stream) = (parts[0], parts[1]);

var createScopeResponse = _client.CreateScope(new CreateScopeRequest
{
Scope = scope

});
if (createScopeResponse.Created)
{
Console.WriteLine("Created scope: " + scope);
}
var createStreamResponse = _client.CreateStream(new CreateStreamRequest
{
Scope = scope,
Stream = stream,
ScalingPolicy = new ScalingPolicy
{
MinNumSegments = 1,
ScaleType = ScalingPolicy.Types.ScalingPolicyType.FixedNumSegments,
}
});
if (createStreamResponse.Created)
{
Console.WriteLine("Create stream " + "myscope/mystream");
}
}
public override IJournalReader CreateJournalReader()
{
var serializer = Config.Current.CreateSerializer();
return new PravegaJournalReader(_client, serializer);
}

public override IJournalWriter CreateJournalWriter(long nextRecordNumber)
{
var serializer = Config.Current.CreateSerializer();
return new PravegaJournalWriter(_client, serializer);
}

public override IJournalSubscriptionSource CreateJournalSubscriptionSource()
{
return new PravegaSubscriptionSource(_client);
}

public PravegaProvider()
{
// https://github.com/grpc/grpc-java/issues/6193#issuecomment-537745226
// https://docs.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0#call-insecure-grpc-services-with-net-core-client
AppContext.SetSwitch(
"System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
// AppContext.SetSwitch("System.Net.Http.UseSocketsHttpHandler", false);
var channel = GrpcChannel.ForAddress("http://localhost:54672");
_client = new PravegaGateway.PravegaGatewayClient(channel);
}
}
}
Loading

0 comments on commit cc002e0

Please sign in to comment.