Skip to content

Commit

Permalink
Send subscription messages in order (#3458)
Browse files Browse the repository at this point in the history
* Send subscription messages in order

* Replace Task with Action
  • Loading branch information
dceleda authored Oct 4, 2021
1 parent 14cd05a commit 4a8c52c
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using Nethermind.Db;
using Nethermind.Db.Blooms;
using Nethermind.Facade.Eth;
using Nethermind.Int256;
using Nethermind.JsonRpc.Modules;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.JsonRpc.Modules.Subscribe;
Expand Down Expand Up @@ -231,7 +232,6 @@ public void NewHeadSubscription_on_BlockAddedToMain_event_with_null_block()
}

[Test]
[Retry(3)]
public void NewHeadSubscription_should_send_notifications_when_adding_multiple_blocks_at_once_and_after_reorgs()
{
MemDb blocksDb = new();
Expand All @@ -251,11 +251,11 @@ public void NewHeadSubscription_should_send_notifications_when_adding_multiple_b
ConcurrentQueue<JsonRpcResult> jsonRpcResult = new();

Block block0 = Build.A.Block.Genesis.WithTotalDifficulty(0L).TestObject;
Block block1 = Build.A.Block.WithParent(block0).WithDifficulty(0).WithTotalDifficulty(0L).TestObject;
Block block2 = Build.A.Block.WithParent(block1).WithDifficulty(0).WithTotalDifficulty(0L).TestObject;
Block block3 = Build.A.Block.WithParent(block2).WithDifficulty(0).WithTotalDifficulty(0L).TestObject;
Block block1B = Build.A.Block.WithParent(block0).WithDifficulty(0).WithTotalDifficulty(0L).TestObject;
Block block2B = Build.A.Block.WithParent(block1B).WithDifficulty(1).WithTotalDifficulty(1L).TestObject;
Block block1 = Build.A.Block.WithParent(block0).WithDifficulty(1).WithTotalDifficulty(1L).TestObject;
Block block2 = Build.A.Block.WithParent(block1).WithDifficulty(2).WithTotalDifficulty(3L).TestObject;
Block block3 = Build.A.Block.WithParent(block2).WithDifficulty(3).WithTotalDifficulty(6L).TestObject;
Block block1B = Build.A.Block.WithParent(block0).WithDifficulty(4).WithTotalDifficulty(4L).TestObject;
Block block2B = Build.A.Block.WithParent(block1B).WithDifficulty(5).WithTotalDifficulty(9L).TestObject;

blockTree.SuggestBlock(block0);
blockTree.SuggestBlock(block1);
Expand Down Expand Up @@ -283,8 +283,68 @@ public void NewHeadSubscription_should_send_notifications_when_adding_multiple_b
blockTree.Head.Should().Be(block2B);

string serialized = _jsonSerializer.Serialize(jsonRpcResult.Last().Response);
var expectedResult = string.Concat("{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\",\"params\":{\"subscription\":\"", newHeadSubscription.Id, "\",\"result\":{\"author\":\"0x0000000000000000000000000000000000000000\",\"difficulty\":\"0x1\",\"extraData\":\"0x010203\",\"gasLimit\":\"0x3d0900\",\"gasUsed\":\"0x0\",\"hash\":\"0x2a50f16b6461467b6e5e58c3ac5205763641165455fa9bafc1e9e77a06dc1fe3\",\"logsBloom\":\"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\",\"miner\":\"0x0000000000000000000000000000000000000000\",\"mixHash\":\"0x2ba5557a4c62a513c7e56d1bf13373e0da6bec016755483e91589fe1c6d212e2\",\"nonce\":\"0x00000000000003e8\",\"number\":\"0x2\",\"parentHash\":\"0x3a7518031f6de870575b043216dedcbb57188476103e40ac5c5d4862da2fbcc8\",\"receiptsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"sha3Uncles\":\"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\",\"size\":\"0x1fe\",\"stateRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"totalDifficulty\":\"0x1\",\"timestamp\":\"0xf4242\",\"transactions\":[],\"transactionsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"uncles\":[]}}}");
expectedResult.Should().Be(serialized);
var expectedResult = string.Concat("{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\",\"params\":{\"subscription\":\"", newHeadSubscription.Id, "\",\"result\":{\"author\":\"0x0000000000000000000000000000000000000000\",\"difficulty\":\"0x5\",\"extraData\":\"0x010203\",\"gasLimit\":\"0x3d0900\",\"gasUsed\":\"0x0\",\"hash\":\"0x13f51c304a84742a660b0327c003765af51cb255f7cfa8d1d6c41c99c1c3ecd4\",\"logsBloom\":\"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\",\"miner\":\"0x0000000000000000000000000000000000000000\",\"mixHash\":\"0x2ba5557a4c62a513c7e56d1bf13373e0da6bec016755483e91589fe1c6d212e2\",\"nonce\":\"0x00000000000003e8\",\"number\":\"0x2\",\"parentHash\":\"0xd07062cc54724bd878b1b826bfa59f24cac986a11a151f2239b16f2a4436f9b2\",\"receiptsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"sha3Uncles\":\"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\",\"size\":\"0x1fe\",\"stateRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"totalDifficulty\":\"0x9\",\"timestamp\":\"0xf4242\",\"transactions\":[],\"transactionsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"uncles\":[]}}}");
serialized.Should().Be(expectedResult);
}

[Test]
public void NewHeadSubscription_should_send_notifications_in_order()
{
MemDb blocksDb = new();
MemDb headersDb = new();
MemDb blocksInfosDb = new();
ChainLevelInfoRepository chainLevelInfoRepository = new(blocksInfosDb);
BlockTree blockTree = new(
blocksDb,
headersDb,
blocksInfosDb,
chainLevelInfoRepository,
MainnetSpecProvider.Instance,
NullBloomStorage.Instance,
LimboLogs.Instance);

NewHeadSubscription newHeadSubscription = new(_jsonRpcDuplexClient, blockTree, _logManager);
ConcurrentQueue<JsonRpcResult> jsonRpcResult = new();

Block block0 = Build.A.Block.Genesis.WithDifficulty(0).WithTotalDifficulty(0L).TestObject;

List<Block> blocks = new() { block0 };

for (int i = 1; i < 21; i++)
{
var difficulty = (UInt256)i;
blocks.Add(Build.A.Block.WithParent(blocks[i-1]).WithDifficulty(difficulty).WithTotalDifficulty(blocks[i-1].TotalDifficulty + difficulty).TestObject);
}

foreach (Block block in blocks)
{
blockTree.SuggestBlock(block);
}

ManualResetEvent manualResetEvent = new(false);
newHeadSubscription.JsonRpcDuplexClient.SendJsonRpcResult(Arg.Do<JsonRpcResult>(j =>
{
jsonRpcResult.Enqueue(j);

if (jsonRpcResult.Count == 21)
{
manualResetEvent.Set();
}
}));

blockTree.UpdateMainChain(blocks.ToArray(), true);

manualResetEvent.WaitOne();

jsonRpcResult.Count.Should().Be(21);
blockTree.Head.Should().Be(blocks[20]);

for (int i = 0; i < 21; i++)
{
jsonRpcResult.TryDequeue(out var result);

((BlockForRpc)((JsonRpcSubscriptionResponse)result.Response).Params.Result).Difficulty.Should().Be((UInt256)i);
}
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,14 @@ private void OnReceiptsInserted(object? sender, ReceiptsEventArgs e)

private void TryPublishReceiptsInBackground(BlockHeader blockHeader, Func<TxReceipt[]> getReceipts, string eventName)
{
Task.Run(() => TryPublishEvent(blockHeader, getReceipts(), eventName))
.ContinueWith(t =>
t.Exception?.Handle(ex =>
{
if (_logger.IsDebug) _logger.Debug($"Logs subscription {Id}: Failed Task.Run after {eventName} event.");
return true;
})
, TaskContinuationOptions.OnlyOnFaulted
);
ScheduleAction(() => TryPublishEvent(blockHeader, getReceipts(), eventName));
}

protected override string GetErrorMsg()
{
return $"Logs subscription {Id} failed.";
}

private void TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, string eventName)
{
BlockHeader fromBlock = _blockTree.FindHeader(_filter.FromBlock);
Expand Down Expand Up @@ -154,6 +151,7 @@ private IEnumerable<FilterLog> GetFilterLogs(BlockHeader blockHeader, TxReceipt[
public override SubscriptionType Type => SubscriptionType.Logs;
public override void Dispose()
{
base.Dispose();
_receiptStorage.ReceiptsInserted -= OnReceiptsInserted;
_blockTree.NewHeadBlock -= OnNewHeadBlock;
if(_logger.IsTrace) _logger.Trace($"Logs subscription {Id} will no longer track ReceiptsInserted.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace Nethermind.JsonRpc.Modules.Subscribe
public class NewHeadSubscription : Subscription
{
private readonly IBlockTree _blockTree;
private readonly ILogger _logger;

public NewHeadSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, IBlockTree? blockTree, ILogManager? logManager)
: base(jsonRpcDuplexClient)
Expand All @@ -41,27 +40,25 @@ public NewHeadSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, IBlockTree?

private void OnBlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
Task.Run(() =>
ScheduleAction(() =>
{
JsonRpcResult result = CreateSubscriptionMessage(new BlockForRpc(e.Block, false));

JsonRpcDuplexClient.SendJsonRpcResult(result);
if(_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} printed new block");
}).ContinueWith(
t =>
t.Exception?.Handle(ex =>
{
if (_logger.IsDebug) _logger.Debug($"NewHeads subscription {Id}: Failed Task.Run after BlockAddedToMain event.");
return true;
})
, TaskContinuationOptions.OnlyOnFaulted
);
});
}

protected override string GetErrorMsg()
{
return $"NewHeads subscription {Id}: Failed Task.Run after BlockAddedToMain event.";
}

public override SubscriptionType Type => SubscriptionType.NewHeads;

public override void Dispose()
{
base.Dispose();
_blockTree.BlockAddedToMain -= OnBlockAddedToMain;
if(_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} will no longer track BlockAddedToMain");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace Nethermind.JsonRpc.Modules.Subscribe
public class NewPendingTransactionsSubscription : Subscription
{
private readonly ITxPool _txPool;
private readonly ILogger _logger;

public NewPendingTransactionsSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, ITxPool? txPool, ILogManager? logManager)
: base(jsonRpcDuplexClient)
Expand All @@ -39,26 +38,24 @@ public NewPendingTransactionsSubscription(IJsonRpcDuplexClient jsonRpcDuplexClie

private void OnNewPending(object? sender, TxEventArgs e)
{
Task.Run(() =>
ScheduleAction(() =>
{
JsonRpcResult result = CreateSubscriptionMessage(e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
if(_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} printed hash of NewPendingTransaction.");
}).ContinueWith(
t =>
t.Exception?.Handle(ex =>
{
if (_logger.IsDebug) _logger.Debug($"NewPendingTransactions subscription {Id}: Failed Task.Run after NewPending event.");
return true;
})
, TaskContinuationOptions.OnlyOnFaulted
);
});
}

protected override string GetErrorMsg()
{
return $"NewPendingTransactions subscription {Id}: Failed Task.Run after NewPending event.";
}

public override SubscriptionType Type => SubscriptionType.NewPendingTransactions;

public override void Dispose()
{
base.Dispose();
_txPool.NewPending -= OnNewPending;
if(_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} will no longer track NewPendingTransactions");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,33 @@
//

using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Logging;

namespace Nethermind.JsonRpc.Modules.Subscribe
{
public abstract class Subscription : IDisposable
{
protected ILogger _logger;

protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
{
Id = string.Concat("0x", Guid.NewGuid().ToString("N"));
JsonRpcDuplexClient = jsonRpcDuplexClient;
ProcessMessages();
}

public string Id { get; }
public abstract SubscriptionType Type { get; }
public IJsonRpcDuplexClient JsonRpcDuplexClient { get; }
public abstract void Dispose();

private Channel<Action> SendChannel { get; } = Channel.CreateUnbounded<Action>();

public virtual void Dispose()
{
SendChannel.Writer.Complete();
}

protected JsonRpcResult CreateSubscriptionMessage(object result)
{
Expand All @@ -44,5 +56,40 @@ protected JsonRpcResult CreateSubscriptionMessage(object result)
}
}, default);
}

protected void ScheduleAction(Action action)
{
SendChannel.Writer.TryWrite(action);
}

protected virtual string GetErrorMsg()
{
return $"Subscription {Id} failed.";
}

private void ProcessMessages()
{
Task.Factory.StartNew(async () =>
{
while (await SendChannel.Reader.WaitToReadAsync())
{
try
{
Action action = await SendChannel.Reader.ReadAsync();
action();
}
catch (Exception e)
{
if (_logger.IsDebug) _logger.Debug(GetErrorMsg());
}
}
}, TaskCreationOptions.LongRunning).ContinueWith(t =>
{
if (t.IsFaulted)
{
if (_logger.IsError) _logger.Error($"{nameof(ProcessMessages)} encountered an exception.", t.Exception);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public SyncingSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, IBlockTree?

private void OnConditionsChange(object? sender, BlockEventArgs e)
{
Task.Run(() =>
ScheduleAction(() =>
{
SyncingResult syncingResult = _ethSyncingInfo.GetFullInfo();
SyncingResult syncingResult = _ethSyncingInfo.GetFullInfo();
bool isSyncing = syncingResult.IsSyncing;

if (isSyncing == _lastIsSyncing)
Expand All @@ -76,23 +76,21 @@ private void OnConditionsChange(object? sender, BlockEventArgs e)
result = CreateSubscriptionMessage(syncingResult);
}


JsonRpcDuplexClient.SendJsonRpcResult(result);
_logger.Trace($"Syncing subscription {Id} printed SyncingResult object.");
}).ContinueWith(
t =>
t.Exception?.Handle(ex =>
{
if (_logger.IsDebug) _logger.Debug($"Syncing subscription {Id}: Failed Task.Run.");
return true;
})
, TaskContinuationOptions.OnlyOnFaulted
);
});
}

protected override string GetErrorMsg()
{
return $"Syncing subscription {Id}: Failed Task.Run.";
}

public override SubscriptionType Type => SubscriptionType.Syncing;
public override void Dispose()
{
base.Dispose();
_blockTree.NewBestSuggestedBlock -= OnConditionsChange;
if(_logger.IsTrace) _logger.Trace($"Syncing subscription {Id} will no longer track NewBestSuggestedBlocks");

Expand Down

0 comments on commit 4a8c52c

Please sign in to comment.