Skip to content

Commit

Permalink
Fix infinite await in hive tests (#3691)
Browse files Browse the repository at this point in the history
* logs everywhere

* check

* fix order of processing blocks

* one more try

* check with BlockAddedToMain instead of NewHeadBlock

* add timeout

* order experiment

* simplification

* missed arg

* Revert "logs everywhere"

This reverts commit 284af19.

* cosmetic

* not added block as a valid result, not exception

* decrease timeout to try to pass forkStressTests

* Revert "check with BlockAddedToMain instead of NewHeadBlock"

This reverts commit dd054f2.

* change event to ProcessingQueueEmpty

* revert to infinite timeout

* release on both events

* try with awaiting for empty queue event only at the end

* small refactor

* Revert "try with awaiting for empty queue event only at the end"

This reverts commit cc03ecf.

* try with two events

* one more try

* refactor tracer to catch skipped blocks

* change doc comment
  • Loading branch information
marcindsobczak authored Dec 15, 2021
1 parent 71113ad commit f5f0d94
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Task StopAsync(bool processRemainingBlocks = false)
return _processor.StopAsync(processRemainingBlocks);
}

public Block Process(Block block, ProcessingOptions options, IBlockTracer tracer)
public Block? Process(Block block, ProcessingOptions options, IBlockTracer tracer)
{
lock (_lock)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/Tracing/ITracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public interface ITracer
/// </summary>
/// <param name="block">Block to trace.</param>
/// <param name="tracer">Trace to act on block processing events.</param>
/// <returns>Post trace state root</returns>
Keccak Trace(Block block, IBlockTracer tracer);
/// <returns>Processed block</returns>
Block? Trace(Block block, IBlockTracer tracer);

void Accept(ITreeVisitor visitor, Keccak stateRoot);
}
Expand Down
7 changes: 4 additions & 3 deletions src/Nethermind/Nethermind.Blockchain/Tracing/Tracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ public Tracer(IStateProvider stateProvider, IBlockchainProcessor blockProcessor,
_processingOptions = processingOptions;
}

public Keccak Trace(Block block, IBlockTracer blockTracer)
public Block? Trace(Block block, IBlockTracer blockTracer)
{
/* We force process since we want to process a block that has already been processed in the past and normally it would be ignored.
We also want to make it read only so the state is not modified persistently in any way. */

blockTracer.StartNewBlockTrace(block);

Block? processedBlock = null;
try
{
_blockProcessor.Process(block, _processingOptions, blockTracer);
processedBlock = _blockProcessor.Process(block, _processingOptions, blockTracer);
}
catch (Exception)
{
Expand All @@ -56,7 +57,7 @@ We also want to make it read only so the state is not modified persistently in a

blockTracer.EndBlockTrace();

return _stateProvider.StateRoot;
return processedBlock;
}

public void Accept(ITreeVisitor visitor, Keccak stateRoot)
Expand Down
37 changes: 21 additions & 16 deletions src/Nethermind/Nethermind.Hive/HiveRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Processing;
using Nethermind.Blockchain.Tracing;
using Nethermind.Blockchain.Validators;
using Nethermind.Config;
Expand Down Expand Up @@ -66,21 +67,21 @@ public HiveRunner(
public async Task Start(CancellationToken cancellationToken)
{
if (_logger.IsInfo) _logger.Info("HIVE initialization started");
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
_blockTree.BlockAddedToMain += BlockTreeOnBlockAddedToMain;
IHiveConfig hiveConfig = _configurationProvider.GetConfig<IHiveConfig>();

ListEnvironmentVariables();
await InitializeBlocks(hiveConfig.BlocksDir, cancellationToken);
await InitializeChain(hiveConfig.ChainFile);

_blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock;
_blockTree.BlockAddedToMain -= BlockTreeOnBlockAddedToMain;

if (_logger.IsInfo) _logger.Info("HIVE initialization completed");
}

private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e)
private void BlockTreeOnBlockAddedToMain(object? sender, BlockEventArgs e)
{
_logger.Info($"HIVE new head block {e.Block.ToString(Block.Format.Short)}");
_logger.Info($"HIVE block added to main: {e.Block.ToString(Block.Format.Short)}");
_resetEvent.Release(1);
}

Expand Down Expand Up @@ -133,20 +134,21 @@ private async Task InitializeBlocks(string blocksDir, CancellationToken cancella
if (_logger.IsInfo) _logger.Info($"HIVE Loading blocks from {blocksDir}");

string[] files = Directory.GetFiles(blocksDir).OrderBy(x => x).ToArray();
var blocks = files.Select(x => new {File = x, Block = DecodeBlock(x)}).OrderBy(x => x.Block.Header.Number)
.ToArray();
if (_logger.IsInfo) _logger.Info($"Loaded {files.Length} files with blocks to process.");

foreach (var block in blocks)
foreach (var file in files)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

Block block = DecodeBlock(file);

if (_logger.IsInfo)
_logger.Info(
$"HIVE Processing block file: {block.File} - {block.Block.ToString(Block.Format.Short)}");
await ProcessBlock(block.Block);
$"HIVE Processing block file: {file} - {block.ToString(Block.Format.Short)}");
await ProcessBlock(block);
}
}

Expand Down Expand Up @@ -190,11 +192,11 @@ private Block DecodeBlock(string file)
return Rlp.Decode<Block>(blockRlp);
}

private async Task WaitAsync(SemaphoreSlim semaphore, string error)
private async Task WaitForBlockProcessing(SemaphoreSlim semaphore)
{
if (!await semaphore.WaitAsync(-1))
{
throw new InvalidOperationException(error);
throw new InvalidOperationException();
}
}

Expand All @@ -218,23 +220,26 @@ private async Task ProcessBlock(Block block)

try
{
_tracer.Trace(block, NullBlockTracer.Instance);
if (_tracer.Trace(block, NullBlockTracer.Instance) is null)
{
return;
}
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error($"Failed to process block {block}", ex);
return;
}

await WaitAsync(_resetEvent, string.Empty);

if (_logger.IsInfo)
_logger.Info(
$"HIVE suggested {block.ToString(Block.Format.Short)}, now best suggested header {_blockTree.BestSuggestedHeader}, head {_blockTree.Head?.Header?.ToString(BlockHeader.Format.Short)}");

await WaitForBlockProcessing(_resetEvent);
}
catch (Exception e)
{
_logger.Error($"HIVE Invalid block: {block.Hash}, ignoring", e);
_resetEvent.Release(1);
_logger.Error($"HIVE Invalid block: {block.Hash}, ignoring. ", e);
}
}
}
Expand Down

0 comments on commit f5f0d94

Please sign in to comment.