Skip to content

Commit

Permalink
Track cleaned up and unvailable logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Jul 18, 2024
1 parent 2e31006 commit 1b7e535
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,27 @@ type logBuffer struct {
// last block number seen by the buffer
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
queueIDs []string
blockHashes map[int64]string
availableLogs map[string]map[int64][]int
dequeuedLogs map[string]map[int64][]int
queues map[string]*upkeepLogQueue
queueIDs []string
blockHashes map[int64]string
availableLogs map[string]map[int64][]int
dequeuedLogs map[string]map[int64][]int
unavailableLogs map[string]map[int64][]int

lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queueIDs: []string{},
blockHashes: map[int64]string{},
queues: make(map[string]*upkeepLogQueue),
availableLogs: map[string]map[int64][]int{},
dequeuedLogs: map[string]map[int64][]int{},
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queueIDs: []string{},
blockHashes: map[int64]string{},
queues: make(map[string]*upkeepLogQueue),
availableLogs: map[string]map[int64][]int{},
dequeuedLogs: map[string]map[int64][]int{},
unavailableLogs: map[string]map[int64][]int{},
}
}

Expand Down Expand Up @@ -201,6 +203,21 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B

logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
unavailableLogs, ok := b.unavailableLogs[qid]
if !ok {
unavailableLogs = map[int64][]int{}
}

series, ok3 := unavailableLogs[start]
if !ok3 {
series = []int{0}
} else {
series = append(series, 0)
}

unavailableLogs[start] = series

b.unavailableLogs[qid] = unavailableLogs
// if there are no logs in the range, skip the upkeep
continue
}
Expand Down Expand Up @@ -355,20 +372,22 @@ type upkeepLogQueue struct {

// states keeps track of the state of the logs that are known to the queue
// and the block number they were seen at
states map[string]logTriggerStateEntry
dequeued map[int64]int
lock sync.RWMutex
states map[string]logTriggerStateEntry
dequeued map[int64]int
cleanedUpLogs map[int64][]int
lock sync.RWMutex
}

func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue {
return &upkeepLogQueue{
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: map[int64][]logpoller.Log{},
blockNumbers: make([]int64, 0),
states: make(map[string]logTriggerStateEntry),
dequeued: map[int64]int{},
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: map[int64][]logpoller.Log{},
blockNumbers: make([]int64, 0),
states: make(map[string]logTriggerStateEntry),
cleanedUpLogs: map[int64][]int{},
dequeued: map[int64]int{},
}
}

Expand Down Expand Up @@ -544,6 +563,15 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int {
updated = append(updated, l)
}

series, ok3 := q.cleanedUpLogs[blockNumber]
if !ok3 {
series = []int{dropped + expired}
} else {
series = append(series, dropped+expired)
}

q.cleanedUpLogs[blockNumber] = series

if dropped > 0 || expired > 0 {
totalDropped += dropped
q.logs[blockNumber] = updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,16 @@ func (p *logEventProvider) Close() error {
bufV1 := p.buffer.(*logBuffer)
availableLogsJSON, _ := json.Marshal(bufV1.availableLogs)
dequeuedLogsJSON, _ := json.Marshal(bufV1.dequeuedLogs)
p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", string(availableLogsJSON), "dequeuedLogsJSON", string(dequeuedLogsJSON))

unavailableLogsJSON, _ := json.Marshal(bufV1.unavailableLogs)

allCleanedUpLogs := map[string]map[int64][]int{}
for qid, q := range bufV1.queues {
allCleanedUpLogs[qid] = q.cleanedUpLogs
}
cleanedUpLogsJSON, _ := json.Marshal(allCleanedUpLogs)

p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", string(availableLogsJSON), "dequeuedLogsJSON", string(dequeuedLogsJSON), "cleanedUpLogsJSON", string(cleanedUpLogsJSON), "unavailableLogsJSON", string(unavailableLogsJSON))
return nil
})

Expand Down

0 comments on commit 1b7e535

Please sign in to comment.