Commit 2e9e0ac

Clean up from config

ferglor committed Jul 11, 2024
1 parent af066a3 commit 2e9e0ac

Showing 7 changed files with 381 additions and 460 deletions.
3 changes: 1 addition & 2 deletions core/services/job/helpers_test.go
@@ -71,7 +71,6 @@ maxServiceWorkers = 100
cacheEvictionInterval = "1s"
mercuryCredentialName = "%s"
contractVersion = "v2.1"
useBufferV1 = %v
`
voterTurnoutDataSourceTemplate = `
// data source 1
@@ -277,7 +276,7 @@ func makeOCR2Keeper21JobSpec(t testing.TB, ks keystore.Master, transmitter commo
_, registry := cltest.MustInsertRandomKey(t, ks.Eth())

ocr2Keeper21Job := fmt.Sprintf(ocr2Keeper21JobSpecTemplate, registry.String(), kb.ID(), transmitter,
fmt.Sprintf("%s127.0.0.1:%d", bootstrapPeerID, bootstrapNodePort), chainID, "mercury cred", false)
fmt.Sprintf("%s127.0.0.1:%d", bootstrapPeerID, bootstrapNodePort), chainID, "mercury cred")

jobSpec := makeOCR2JobSpecFromToml(t, ocr2Keeper21Job)

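The two hunks above have to change together: once `useBufferV1 = %v` is dropped from the TOML template, the matching `false` argument must also be dropped from the `fmt.Sprintf` call, otherwise Go appends `%!(EXTRA bool=false)` to the rendered job spec. A minimal sketch of that behaviour, using a hypothetical cut-down template rather than the real `ocr2Keeper21JobSpecTemplate`:

```go
package main

import "fmt"

func main() {
	// Hypothetical, cut-down job spec template; the real one lives in helpers_test.go.
	const specTemplate = `
contractVersion = "%s"
mercuryCredentialName = "%s"
`
	// One argument per verb: a removed %v in the template means a removed
	// argument here, otherwise fmt appends "%!(EXTRA bool=false)" to the output.
	spec := fmt.Sprintf(specTemplate, "v2.1", "mercury cred")
	fmt.Print(spec)
}
```
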
3 changes: 0 additions & 3 deletions core/services/ocr2/plugins/ocr2keeper/config.go
@@ -60,9 +60,6 @@ type PluginConfig struct {
ContractVersion string `json:"contractVersion"`
// CaptureAutomationCustomTelemetry is a bool flag to toggle Custom Telemetry Service
CaptureAutomationCustomTelemetry *bool `json:"captureAutomationCustomTelemetry,omitempty"`
// UseBufferV1 is a bool flag to toggle the new log buffer implementation
// TODO: (AUTO-9355) remove once we have a single version
UseBufferV1 *bool `json:"useBufferV1,omitempty"`
}

func ValidatePluginConfig(cfg PluginConfig) error {
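With the `UseBufferV1` field gone from `PluginConfig`, any leftover `useBufferV1` key in a job's plugin config JSON is simply ignored by `encoding/json` during unmarshalling. A rough sketch of that behaviour, using a local stand-in struct that mirrors only the two fields visible in the hunk above, not the full `PluginConfig`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the trimmed-down PluginConfig; only the fields visible in the
// diff above are mirrored here.
type pluginConfig struct {
	ContractVersion                  string `json:"contractVersion"`
	CaptureAutomationCustomTelemetry *bool  `json:"captureAutomationCustomTelemetry,omitempty"`
}

func main() {
	// A leftover useBufferV1 key no longer maps to any field and is dropped.
	raw := []byte(`{"contractVersion":"v2.1","useBufferV1":true}`)

	var cfg pluginConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ContractVersion) // v2.1
}
```
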
@@ -34,106 +34,92 @@ import (
)

func TestIntegration_LogEventProvider(t *testing.T) {
tests := []struct {
name string
logLimit uint32
}{
{
name: "buffer v1",
logLimit: 10,
},
}
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()

backend, stopMining, accounts := setupBackend(t)
defer stopMining()
carrol := accounts[2]

db := setupDB(t)
defer db.Close()

opts := logprovider.NewOptions(200, big.NewInt(1))
opts.ReadInterval = time.Second / 2
opts.LogLimit = 10

lp, ethClient := setupDependencies(t, db, backend)
filterStore := logprovider.NewUpkeepFilterStore()
provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts)
logProvider := provider.(logprovider.LogEventProviderTest)

n := 10

backend.Commit()
lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()

backend, stopMining, accounts := setupBackend(t)
defer stopMining()
carrol := accounts[2]

db := setupDB(t)
defer db.Close()

opts := logprovider.NewOptions(200, big.NewInt(1))
opts.ReadInterval = time.Second / 2
opts.LogLimit = tc.logLimit

lp, ethClient := setupDependencies(t, db, backend)
filterStore := logprovider.NewUpkeepFilterStore()
provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts)
logProvider := provider.(logprovider.LogEventProviderTest)

n := 10

backend.Commit()
lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block

ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)
lp.PollAndSaveLogs(ctx, int64(n))

go func() {
if err := logProvider.Start(ctx); err != nil {
t.Logf("error starting log provider: %s", err)
t.Fail()
}
}()
defer logProvider.Close()

logsRounds := 10

poll := pollFn(ctx, t, lp, ethClient)

triggerEvents(ctx, t, backend, carrol, logsRounds, poll, contracts...)

poll(backend.Commit())

waitLogPoller(ctx, t, backend, lp, ethClient)

waitLogProvider(ctx, t, logProvider, 3)

allPayloads := collectPayloads(ctx, t, logProvider, n, logsRounds/2)
require.GreaterOrEqual(t, len(allPayloads), n,
"failed to get logs after restart")

t.Run("Restart", func(t *testing.T) {
t.Log("restarting log provider")
// assuming that our service was closed and restarted,
// we should be able to backfill old logs and fetch new ones
filterStore := logprovider.NewUpkeepFilterStore()
logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, big.NewInt(1), logprovider.NewLogEventsPacker(), filterStore, opts)

poll(backend.Commit())
go func() {
if err2 := logProvider2.Start(ctx); err2 != nil {
t.Logf("error starting log provider: %s", err2)
t.Fail()
}
}()
defer logProvider2.Close()

// re-register filters
for i, id := range ids {
err := logProvider2.RegisterFilter(ctx, logprovider.FilterOptions{
UpkeepID: id,
TriggerConfig: newPlainLogTriggerConfig(addrs[i]),
// using block number at which the upkeep was registered,
// before we emitted any logs
UpdateBlock: uint64(n),
})
require.NoError(t, err)
}

waitLogProvider(ctx, t, logProvider2, 2)

t.Log("getting logs after restart")
logsAfterRestart := collectPayloads(ctx, t, logProvider2, n, 5)
require.GreaterOrEqual(t, len(logsAfterRestart), n,
"failed to get logs after restart")
ids, addrs, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)
lp.PollAndSaveLogs(ctx, int64(n))

go func() {
if err := logProvider.Start(ctx); err != nil {
t.Logf("error starting log provider: %s", err)
t.Fail()
}
}()
defer logProvider.Close()

logsRounds := 10

poll := pollFn(ctx, t, lp, ethClient)

triggerEvents(ctx, t, backend, carrol, logsRounds, poll, contracts...)

poll(backend.Commit())

waitLogPoller(ctx, t, backend, lp, ethClient)

waitLogProvider(ctx, t, logProvider, 3)

allPayloads := collectPayloads(ctx, t, logProvider, n, logsRounds/2)
require.GreaterOrEqual(t, len(allPayloads), n,
"failed to get logs after restart")

t.Run("Restart", func(t *testing.T) {
t.Log("restarting log provider")
// assuming that our service was closed and restarted,
// we should be able to backfill old logs and fetch new ones
filterStore := logprovider.NewUpkeepFilterStore()
logProvider2 := logprovider.NewLogProvider(logger.TestLogger(t), lp, big.NewInt(1), logprovider.NewLogEventsPacker(), filterStore, opts)

poll(backend.Commit())
go func() {
if err2 := logProvider2.Start(ctx); err2 != nil {
t.Logf("error starting log provider: %s", err2)
t.Fail()
}
}()
defer logProvider2.Close()

// re-register filters
for i, id := range ids {
err := logProvider2.RegisterFilter(ctx, logprovider.FilterOptions{
UpkeepID: id,
TriggerConfig: newPlainLogTriggerConfig(addrs[i]),
// using block number at which the upkeep was registered,
// before we emitted any logs
UpdateBlock: uint64(n),
})
})
}
require.NoError(t, err)
}

waitLogProvider(ctx, t, logProvider2, 2)

t.Log("getting logs after restart")
logsAfterRestart := collectPayloads(ctx, t, logProvider2, n, 5)
require.GreaterOrEqual(t, len(logsAfterRestart), n,
"failed to get logs after restart")
})
}

func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
@@ -210,70 +196,55 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
}

func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
tests := []struct {
name string
logLimit uint32
}{
{
name: "buffer v1",
logLimit: 10,
},
}

for _, tc := range tests {
limitLow := tc.logLimit
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*60)
defer cancel()
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*60)
defer cancel()

backend, stopMining, accounts := setupBackend(t)
defer stopMining()
carrol := accounts[2]
backend, stopMining, accounts := setupBackend(t)
defer stopMining()
carrol := accounts[2]

db := setupDB(t)
defer db.Close()
db := setupDB(t)
defer db.Close()

opts := logprovider.NewOptions(200, big.NewInt(1))
opts.ReadInterval = time.Second / 4
opts.LogLimit = limitLow
opts := logprovider.NewOptions(200, big.NewInt(1))
opts.ReadInterval = time.Second / 4
opts.LogLimit = 10

lp, ethClient := setupDependencies(t, db, backend)
filterStore := logprovider.NewUpkeepFilterStore()
provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts)
logProvider := provider.(logprovider.LogEventProviderTest)
lp, ethClient := setupDependencies(t, db, backend)
filterStore := logprovider.NewUpkeepFilterStore()
provider, _ := setup(logger.TestLogger(t), lp, nil, nil, filterStore, &opts)
logProvider := provider.(logprovider.LogEventProviderTest)

n := 10
n := 10

backend.Commit()
lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block
_, _, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)
backend.Commit()
lp.PollAndSaveLogs(ctx, 1) // Ensure log poller has a latest block
_, _, contracts := deployUpkeepCounter(ctx, t, n, ethClient, backend, carrol, logProvider)

poll := pollFn(ctx, t, lp, ethClient)
poll := pollFn(ctx, t, lp, ethClient)

rounds := 8
for i := 0; i < rounds; i++ {
poll(backend.Commit())
triggerEvents(ctx, t, backend, carrol, n, poll, contracts...)
poll(backend.Commit())
}
rounds := 8
for i := 0; i < rounds; i++ {
poll(backend.Commit())
triggerEvents(ctx, t, backend, carrol, n, poll, contracts...)
poll(backend.Commit())
}

waitLogPoller(ctx, t, backend, lp, ethClient)
waitLogPoller(ctx, t, backend, lp, ethClient)

// starting the log provider should backfill logs
go func() {
if startErr := logProvider.Start(ctx); startErr != nil {
t.Logf("error starting log provider: %s", startErr)
t.Fail()
}
}()
defer logProvider.Close()
// starting the log provider should backfill logs
go func() {
if startErr := logProvider.Start(ctx); startErr != nil {
t.Logf("error starting log provider: %s", startErr)
t.Fail()
}
}()
defer logProvider.Close()

waitLogProvider(ctx, t, logProvider, 3)
waitLogProvider(ctx, t, logProvider, 3)

allPayloads := collectPayloads(ctx, t, logProvider, n*rounds, 5)
require.GreaterOrEqual(t, len(allPayloads), len(contracts), "failed to backfill logs")
})
}
allPayloads := collectPayloads(ctx, t, logProvider, n*rounds, 5)
require.GreaterOrEqual(t, len(allPayloads), len(contracts), "failed to backfill logs")
}

func TestIntegration_LogRecoverer_Backfill(t *testing.T) {
@@ -99,7 +99,7 @@ type logEventProvider struct
registerLock sync.Mutex

filterStore UpkeepFilterStore
bufferV1 LogBuffer
buffer LogBuffer

opts LogTriggersOptions

@@ -113,7 +113,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
buffer: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
poller: poller,
opts: opts,
filterStore: filterStore,
@@ -140,7 +140,7 @@ func (p *logEventProvider) SetConfig(cfg ocr2keepers.LogEventProviderConfig) {
atomic.StoreUint32(&p.opts.BlockRate, blockRate)
atomic.StoreUint32(&p.opts.LogLimit, logLimit)

p.bufferV1.SetConfig(uint32(p.opts.LookbackBlocks), blockRate, logLimit)
p.buffer.SetConfig(uint32(p.opts.LookbackBlocks), blockRate, logLimit)
}

func (p *logEventProvider) Start(context.Context) error {
@@ -259,7 +259,7 @@ func (p *logEventProvider) minimumCommitmentDequeue(latestBlock, start int64) []
startWindow, _ := getBlockWindow(start, blockRate)

// dequeue the minimum number logs (log limit, varies by chain) per upkeep for this block window
logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), true)
logs, remaining := p.buffer.Dequeue(startWindow, MaxPayloads-len(payloads), true)
if len(logs) > 0 {
p.lggr.Debugw("minimum commitment dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
@@ -285,7 +285,7 @@ func (p *logEventProvider) bestEffortDequeue(latestBlock, start int64, payloads
startWindow, _ := getBlockWindow(start, blockRate)

// dequeue as many logs as we can, based on remaining capacity, for this block window
logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), false)
logs, remaining := p.buffer.Dequeue(startWindow, MaxPayloads-len(payloads), false)
if len(logs) > 0 {
p.lggr.Debugw("best effort dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
}
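
The two methods touched here, minimumCommitmentDequeue and bestEffortDequeue, are the two passes of the provider's dequeue loop: a first pass that guarantees each block window its minimum share of logs (the `true` flag on `Dequeue`), and a second best-effort pass that spends whatever payload capacity is left (`false`). A simplified sketch of that two-pass pattern, with a hypothetical `logBuffer` interface reduced to just `Dequeue` and toy types standing in for the real logprovider ones:

```go
// Package dequeuesketch is an illustrative stand-in, not the real logprovider package.
package dequeuesketch

const maxPayloads = 100

type logBuffer interface {
	// Dequeue returns up to maxResults logs for the window starting at startWindow;
	// minimumOnly restricts the pass to the per-upkeep log limit.
	Dequeue(startWindow int64, maxResults int, minimumOnly bool) (logs []int, remaining int)
}

// drain mirrors the minimum-commitment pass followed by the best-effort pass.
func drain(buf logBuffer, start, end, blockRate int64) []int {
	var payloads []int

	// Pass 1 (minimum commitment): every block window gets its guaranteed share.
	for w := start; w < end; w += blockRate {
		logs, _ := buf.Dequeue(w, maxPayloads-len(payloads), true)
		payloads = append(payloads, logs...)
	}

	// Pass 2 (best effort): spend any remaining capacity on whatever is left.
	for w := start; w < end && len(payloads) < maxPayloads; w += blockRate {
		logs, _ := buf.Dequeue(w, maxPayloads-len(payloads), false)
		payloads = append(payloads, logs...)
	}

	return payloads
}
```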
@@ -491,7 +491,7 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [
}
filteredLogs := filter.Select(logs...)

p.bufferV1.Enqueue(filter.upkeepID, filteredLogs...)
p.buffer.Enqueue(filter.upkeepID, filteredLogs...)

// Update the lastPollBlock for filter in slice this is then
// updated into filter store in updateFiltersLastPoll
@@ -505,5 +505,5 @@ func (p *logEventProvider) syncBufferFilters() error {
p.lock.RLock()
defer p.lock.RUnlock()

return p.bufferV1.SyncFilters(p.filterStore)
return p.buffer.SyncFilters(p.filterStore)
}