Skip to content

Commit

Permalink
Small improvements to log recoverer (#10575)
Browse files Browse the repository at this point in the history
* Small improvements to log recoverer

* improve comments

* add performFinaliltyBuffer

* update comment

* fix tests

* resovle nit

* use finality depth instead of new constant

* fix tests
  • Loading branch information
infiloop2 authored Sep 11, 2023
1 parent 9269ec0 commit c23b078
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type LogTriggersOptions struct {
BlockRateLimit rate.Limit
// blockLimitBurst is the burst upper limit on the range of blocks the we fetch logs for.
BlockLimitBurst int
// Finality depth is the number of blocks to wait before considering a block final.
FinalityDepth int64
}

func NewOptions(finalityDepth int64) LogTriggersOptions {
Expand Down Expand Up @@ -63,4 +65,7 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) {
if o.BlockRateLimit == 0 {
o.BlockRateLimit = rate.Every(o.ReadInterval)
}
if o.FinalityDepth == 0 {
o.FinalityDepth = finalityDepth
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type logRecoverer struct {
poller logpoller.LogPoller
client client.Client
blockTimeResolver *blockTimeResolver

finalityDepth int64
}

var _ LogRecoverer = &logRecoverer{}
Expand All @@ -101,6 +103,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
packer: packer,
client: client,
blockTimeResolver: newBlockTimeResolver(poller),

finalityDepth: opts.FinalityDepth,
}

rec.lookbackBlocks.Store(opts.LookbackBlocks)
Expand Down Expand Up @@ -309,7 +313,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.

r.pending = pending

r.lggr.Debugf("found %d pending payloads", len(pending))
r.lggr.Debugf("found %d recoverable payloads", len(results))

return results, nil
}
Expand Down Expand Up @@ -353,10 +357,10 @@ func (r *logRecoverer) recover(ctx context.Context) error {

// recoverFilter recovers logs for a single upkeep filter.
func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error {
start := f.lastRePollBlock
start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out
// ensure we don't recover logs from before the filter was created
// NOTE: we expect that filter with configUpdateBlock > offsetBlock were already filtered out.
if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock {
// NOTE: we expect that configUpdateBlock <= offsetBlock, as others would have been filtered out
start = configUpdateBlock
}
if start < startBlock {
Expand All @@ -367,6 +371,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
// If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow
// a range of recoveryLogsBurst
// Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts
// (while also taking into account exisitng pending payloads)
end = start + recoveryLogsBurst
}
if end > offsetBlock {
Expand Down Expand Up @@ -477,15 +482,24 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) {
lookbackBlocks := r.lookbackBlocks.Load()
blockTime := r.blockTime.Load()
blocksInDay := int64(24*time.Hour) / blockTime
return latest - blocksInDay, latest - lookbackBlocks
start := latest - blocksInDay
// Exploratory: Instead of subtracting finality depth to account for finalized performs
// keep two pointers of lastRePollBlock for soft and hard finalization, i.e. manage
// unfinalized perform logs better
end := latest - lookbackBlocks - r.finalityDepth
if start > end {
// In this case, allow starting from more than a day behind
start = end
}
return start, end
}

// getFilterBatch returns a batch of filters that are ready to be recovered.
func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter {
filters := r.filterStore.GetFilters(func(f upkeepFilter) bool {
// ensure we work only on filters that are ready to be recovered
// no need to recover in case f.configUpdateBlock is after offsetBlock
return f.lastRePollBlock <= offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock
})

sort.Slice(filters, func(i, j int) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
nil,
nil,
[]string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"},
[]int64{200, 0, 450},
[]int64{201, 0, 450},
},
{
"lastRePollBlock updated with burst when lagging behind",
Expand All @@ -366,7 +366,7 @@ func TestLogRecoverer_Recover(t *testing.T) {
topics: []common.Hash{
common.HexToHash("0x1"),
},
lastRePollBlock: 100, // Should be updated with burst
lastRePollBlock: 99, // Should be updated with burst
},
},
[]ocr2keepers.UpkeepState{ocr2keepers.UnknownState},
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
},
stateReader: &mockStateReader{
Expand Down Expand Up @@ -813,7 +813,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
},
client: &mockClient{
Expand Down Expand Up @@ -853,7 +853,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
},
client: &mockClient{
Expand Down Expand Up @@ -885,7 +885,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) {
return nil, errors.New("logs with sigs boom")
Expand Down Expand Up @@ -920,7 +920,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) {
return []logpoller.Log{
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) {
return []logpoller.Log{
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
},
logPoller: &mockLogPoller{
LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) {
return 100, nil
return 300, nil
},
LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) {
return []logpoller.Log{
Expand Down

0 comments on commit c23b078

Please sign in to comment.