From 1b7e5358405fc0cf75ce78f3fda690957604f010 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 18 Jul 2024 17:13:44 +0100 Subject: [PATCH] Track cleaned up and unvailable logs --- .../evmregistry/v21/logprovider/buffer_v1.go | 74 +++++++++++++------ .../evmregistry/v21/logprovider/provider.go | 11 ++- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index a722b878563..559bf8edaef 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -70,25 +70,27 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queues map[string]*upkeepLogQueue - queueIDs []string - blockHashes map[int64]string - availableLogs map[string]map[int64][]int - dequeuedLogs map[string]map[int64][]int + queues map[string]*upkeepLogQueue + queueIDs []string + blockHashes map[int64]string + availableLogs map[string]map[int64][]int + dequeuedLogs map[string]map[int64][]int + unavailableLogs map[string]map[int64][]int lock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - opts: newLogBufferOptions(lookback, blockRate, logLimit), - lastBlockSeen: new(atomic.Int64), - queueIDs: []string{}, - blockHashes: map[int64]string{}, - queues: make(map[string]*upkeepLogQueue), - availableLogs: map[string]map[int64][]int{}, - dequeuedLogs: map[string]map[int64][]int{}, + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + queueIDs: []string{}, + blockHashes: map[int64]string{}, + queues: make(map[string]*upkeepLogQueue), + availableLogs: map[string]map[int64][]int{}, + dequeuedLogs: map[string]map[int64][]int{}, + unavailableLogs: map[string]map[int64][]int{}, } } @@ -201,6 +203,21 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B logsInRange := q.sizeOfRange(start, end) if logsInRange == 0 { + unavailableLogs, ok := b.unavailableLogs[qid] + if !ok { + unavailableLogs = map[int64][]int{} + } + + series, ok3 := unavailableLogs[start] + if !ok3 { + series = []int{0} + } else { + series = append(series, 0) + } + + unavailableLogs[start] = series + + b.unavailableLogs[qid] = unavailableLogs // if there are no logs in the range, skip the upkeep continue } @@ -355,20 +372,22 @@ type upkeepLogQueue struct { // states keeps track of the state of the logs that are known to the queue // and the block number they were seen at - states map[string]logTriggerStateEntry - dequeued map[int64]int - lock sync.RWMutex + states map[string]logTriggerStateEntry + dequeued map[int64]int + cleanedUpLogs map[int64][]int + lock sync.RWMutex } func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue { return &upkeepLogQueue{ - lggr: lggr.With("upkeepID", id.String()), - id: id, - opts: opts, - logs: map[int64][]logpoller.Log{}, - blockNumbers: make([]int64, 0), - states: make(map[string]logTriggerStateEntry), - dequeued: map[int64]int{}, + lggr: lggr.With("upkeepID", id.String()), + id: id, + opts: opts, + logs: map[int64][]logpoller.Log{}, + blockNumbers: make([]int64, 0), + states: make(map[string]logTriggerStateEntry), + cleanedUpLogs: map[int64][]int{}, + dequeued: map[int64]int{}, } } @@ -544,6 +563,15 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int { updated = append(updated, l) } + series, ok3 := q.cleanedUpLogs[blockNumber] + if !ok3 { + series = []int{dropped + expired} + } else { + series = append(series, dropped+expired) + } + + q.cleanedUpLogs[blockNumber] = series + if dropped > 0 || expired > 0 { totalDropped += dropped q.logs[blockNumber] = updated diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index a8747284c6a..e28fc773cf3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -198,7 +198,16 @@ func (p *logEventProvider) Close() error { bufV1 := p.buffer.(*logBuffer) availableLogsJSON, _ := json.Marshal(bufV1.availableLogs) dequeuedLogsJSON, _ := json.Marshal(bufV1.dequeuedLogs) - p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", string(availableLogsJSON), "dequeuedLogsJSON", string(dequeuedLogsJSON)) + + unavailableLogsJSON, _ := json.Marshal(bufV1.unavailableLogs) + + allCleanedUpLogs := map[string]map[int64][]int{} + for qid, q := range bufV1.queues { + allCleanedUpLogs[qid] = q.cleanedUpLogs + } + cleanedUpLogsJSON, _ := json.Marshal(allCleanedUpLogs) + + p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", string(availableLogsJSON), "dequeuedLogsJSON", string(dequeuedLogsJSON), "cleanedUpLogsJSON", string(cleanedUpLogsJSON), "unavailableLogsJSON", string(unavailableLogsJSON)) return nil })