diff --git a/eth/eventsyncer/event_syncer.go b/eth/eventsyncer/event_syncer.go index c6c55829af..f28c4a9c23 100644 --- a/eth/eventsyncer/event_syncer.go +++ b/eth/eventsyncer/event_syncer.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "math/big" "time" "go.uber.org/zap" @@ -80,6 +81,16 @@ func (es *EventSyncer) Healthy(ctx context.Context) error { if time.Since(es.lastProcessedBlockChange) > es.stalenessThreshold { return fmt.Errorf("syncing is stuck at block %d", lastProcessedBlock.Uint64()) } + + // Check if the block is too old. + header, err := es.executionClient.(*executionclient.ExecutionClient).HeaderByNumber(ctx, big.NewInt(int64(es.lastProcessedBlock))) + if err != nil { + return fmt.Errorf("failed to get header for block %d: %w", es.lastProcessedBlock, err) + } + if header.Time < uint64(time.Now().Add(-1*time.Minute).Unix()) { + return fmt.Errorf("block %d is too old", es.lastProcessedBlock) + } + return nil } @@ -110,6 +121,15 @@ func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastP return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock) } + // Check if the block is too old. + header, err := es.executionClient.(*executionclient.ExecutionClient).HeaderByNumber(ctx, big.NewInt(int64(es.lastProcessedBlock))) + if err != nil { + return 0, fmt.Errorf("failed to get header for block %d: %w", es.lastProcessedBlock, err) + } + if header.Time < uint64(time.Now().Add(-3*time.Minute).Unix()) { + return 0, fmt.Errorf("block %d is too old", es.lastProcessedBlock) + } + es.logger.Info("finished syncing historical events", zap.Uint64("from_block", fromBlock), zap.Uint64("last_processed_block", lastProcessedBlock)) diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index 804d577599..193a49ba8f 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -178,14 +178,20 @@ func (ec *ExecutionClient) fetchLogsInBatches(ctx context.Context, startBlock, e } validLogs = append(validLogs, log) } - if len(validLogs) == 0 { - // Emit empty block logs to indicate that we have advanced to this block. - logs <- BlockLogs{BlockNumber: toBlock} - } else { - for _, blockLogs := range PackLogs(validLogs) { - logs <- blockLogs + + var highestBlock uint64 + for _, blockLogs := range PackLogs(validLogs) { + logs <- blockLogs + if blockLogs.BlockNumber > highestBlock { + highestBlock = blockLogs.BlockNumber } } + + // Emit empty block logs to indicate that we have advanced to this block. + logs <- BlockLogs{BlockNumber: toBlock} + if highestBlock < toBlock { + logs <- BlockLogs{BlockNumber: toBlock} + } } } }() @@ -292,6 +298,18 @@ func (ec *ExecutionClient) BlockByNumber(ctx context.Context, blockNumber *big.I return b, nil } +func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) { + h, err := ec.client.HeaderByNumber(ctx, blockNumber) + if err != nil { + ec.logger.Error(elResponseErrMsg, + zap.String("method", "eth_getBlockByNumber"), + zap.Error(err)) + return nil, err + } + + return h, nil +} + func (ec *ExecutionClient) isClosed() bool { select { case <-ec.closed: