Skip to content

Commit

Permalink
Change keepers to use the default contract transmitter (#11308)
Browse files Browse the repository at this point in the history
* Switch keepers to use the default contract transmitter
Pass the gas limit into the transmitter constructor so we can specify the automation gas limit
Update tests

* Fix streams import

* Clean up function calls

* Remove pipeline runner dependency for automation

* Use contractTransmitterOpts to specify a pluginGasLimit

* Make the pluginGasLimit a pointer

* Clean up the pipeline transmitter

* Attempt to listen for transmits
Clean up linter
Extract function
Intentionally fail test
Rework transmit listen
Try filtering for transmits
Update test

* Revert "Attempt to listen for transmits"

This reverts commit 198e6669a6f64768c84acb82e01b2773c89426ce.

* Listen for performed events

* Goimports

* Add wrapper function that lets us specify a count of performed

* Update integration test

* Pass the configWatcher as a parameter to indicate that its required
  • Loading branch information
ferglor authored Dec 12, 2023
1 parent 96b7ab6 commit 00e1c55
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 329 deletions.
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
return nil, fmt.Errorf("keeper2 services: failed to get chain %s: %w", rid.ChainID, err2)
}

keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, d.pipelineRunner, mc, kb, d.cfg.Database())
keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, mc, kb, d.cfg.Database(), d.ethKs)
if err2 != nil {
return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers")
}
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func (d *Delegate) newServicesOCR2Keepers20(
return nil, fmt.Errorf("keepers2.0 services: failed to get chain (%s): %w", rid.ChainID, err2)
}

keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.pipelineRunner)
keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.ethKs)
if err2 != nil {
return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers")
}
Expand Down
93 changes: 35 additions & 58 deletions core/services/ocr2/plugins/ocr2keeper/integration_21_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
)

Expand Down Expand Up @@ -117,7 +116,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
require.NoError(t, err)
registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr)

nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve)
setupNodes(t, nodeKeys, registry, backend, steve)

<-time.After(time.Second * 5)

Expand Down Expand Up @@ -160,8 +159,6 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

checkPipelineRuns(t, nodes, 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down Expand Up @@ -204,7 +201,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) {
require.NoError(t, err)

registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr)
nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve)
setupNodes(t, nodeKeys, registry, backend, steve)
upkeeps := 1

_, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1))))
Expand All @@ -228,35 +225,36 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) {
g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue())
done()

runs := checkPipelineRuns(t, nodes, 1)

t.Run("recover logs", func(t *testing.T) {

addr, contract := addrs[0], contracts[0]
upkeepID := registerUpkeep(t, registry, addr, carrol, steve, backend)
backend.Commit()
t.Logf("Registered new upkeep %s for address %s", upkeepID.String(), addr.String())
// Emit 100 logs in a burst
emits := 100
recoverEmits := 100
i := 0
emitEvents(testutils.Context(t), t, 100, []*log_upkeep_counter_wrapper.LogUpkeepCounter{contract}, carrol, func() {
i++
if i%(emits/4) == 0 {
if i%(recoverEmits/4) == 0 {
backend.Commit()
time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors
}
})
// Mine enough blocks to ensre these logs don't fall into log provider range

beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64()

// Mine enough blocks to ensure these logs don't fall into log provider range
dummyBlocks := 500
for i := 0; i < dummyBlocks; i++ {
backend.Commit()
time.Sleep(time.Millisecond * 10)
}
t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks)

expectedPostRecover := runs + emits
waitPipelineRuns(t, nodes, expectedPostRecover, testutils.WaitTimeout(t), cltest.DBPollingInterval)
t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks)

listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), recoverEmits)
g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue())
done()
})
}

Expand Down Expand Up @@ -296,7 +294,7 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) {

registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr)

nodes, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner)
_, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner)

const upkeepCount = 10
const mercuryFailCount = upkeepCount * 3 * 2
Expand Down Expand Up @@ -374,39 +372,6 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) {
g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue())

done()

_ = checkPipelineRuns(t, nodes, 1*len(nodes)) // TODO: TBD
}

func waitPipelineRuns(t *testing.T, nodes []Node, n int, timeout, interval time.Duration) {
ctx, cancel := context.WithTimeout(testutils.Context(t), timeout)
defer cancel()
var allRuns []pipeline.Run
for len(allRuns) < n && ctx.Err() == nil {
allRuns = []pipeline.Run{}
for _, node := range nodes {
runs, err := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err)
allRuns = append(allRuns, runs...)
}
time.Sleep(interval)
}
runs := len(allRuns)
t.Logf("found %d pipeline runs", runs)
require.GreaterOrEqual(t, runs, n)
}

func checkPipelineRuns(t *testing.T, nodes []Node, n int) int {
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
runs := len(allRuns)
t.Logf("found %d pipeline runs", runs)
require.GreaterOrEqual(t, runs, n)
return runs
}

func emitEvents(ctx context.Context, t *testing.T, n int, contracts []*log_upkeep_counter_wrapper.LogUpkeepCounter, carrol *bind.TransactOpts, afterEmit func()) {
Expand All @@ -424,32 +389,32 @@ func mapListener(m *sync.Map, n int) func() bool {
return func() bool {
count := 0
m.Range(func(key, value interface{}) bool {
count++
count += value.(int)
return true
})
return count > n
}
}

func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) {
func listenPerformedN(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64, count int) (func() bool, func()) {
cache := &sync.Map{}
ctx, cancel := context.WithCancel(testutils.Context(t))
start := startBlock

go func() {
for ctx.Err() == nil {
bl := backend.Blockchain().CurrentBlock().Number.Uint64()
currentBlock := backend.Blockchain().CurrentBlock().Number.Uint64()

sc := make([]bool, len(ids))
for i := range sc {
sc[i] = true
success := make([]bool, len(ids))
for i := range success {
success[i] = true
}

iter, err := registry.FilterUpkeepPerformed(&bind.FilterOpts{
Start: uint64(start),
End: &bl,
End: &currentBlock,
Context: ctx,
}, ids, sc)
}, ids, success)

if ctx.Err() != nil {
return
Expand All @@ -460,7 +425,15 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry
for iter.Next() {
if iter.Event != nil {
t.Logf("[automation-ocr3 | EvmRegistry] upkeep performed event emitted for id %s", iter.Event.Id.String())
cache.Store(iter.Event.Id.String(), true)

//cache.Store(iter.Event.Id.String(), true)
count, ok := cache.Load(iter.Event.Id.String())
if !ok {
cache.Store(iter.Event.Id.String(), 1)
continue
}
countI := count.(int)
cache.Store(iter.Event.Id.String(), countI+1)
}
}

Expand All @@ -470,7 +443,11 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry
}
}()

return mapListener(cache, 0), cancel
return mapListener(cache, count), cancel
}

func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) {
return listenPerformedN(t, backend, registry, ids, startBlock, 0)
}

func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *SimulatedMercuryServer) {
Expand Down
19 changes: 0 additions & 19 deletions core/services/ocr2/plugins/ocr2keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
Expand Down Expand Up @@ -411,15 +410,6 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

// check pipeline runs
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
require.GreaterOrEqual(t, len(allRuns), 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down Expand Up @@ -683,15 +673,6 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

// check pipeline runs
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
require.GreaterOrEqual(t, len(allRuns), 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions core/services/ocr2/plugins/ocr2keeper/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
evmregistry20 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v20"
evmregistry21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21"
evmregistry21transmit "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/transmit"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
)

Expand All @@ -43,9 +43,9 @@ var (
ErrNoChainFromSpec = fmt.Errorf("could not create chain from spec")
)

func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, pr pipeline.Runner) (evmrelay.OCR2KeeperProvider, error) {
func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, ethKeystore keystore.Eth) (evmrelay.OCR2KeeperProvider, error) {
oSpec := spec.OCR2OracleSpec
ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, pr, spec, lggr.Named("OCR2KeeperRelayer"))
ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, lggr.Named("OCR2KeeperRelayer"), ethKeystore)

keeperProvider, err := ocr2keeperRelayer.NewOCR2KeeperProvider(
types.RelayArgs{
Expand All @@ -71,15 +71,15 @@ func EVMDependencies20(
db *sqlx.DB,
lggr logger.Logger,
chain legacyevm.Chain,
pr pipeline.Runner,
ethKeystore keystore.Eth,
) (evmrelay.OCR2KeeperProvider, *evmregistry20.EvmRegistry, Encoder20, *evmregistry20.LogProvider, error) {
var err error

var keeperProvider evmrelay.OCR2KeeperProvider
var registry *evmregistry20.EvmRegistry

// the provider will be returned as a dependency
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil {
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil {
return nil, nil, nil, nil, err
}

Expand Down Expand Up @@ -112,17 +112,17 @@ func EVMDependencies21(
db *sqlx.DB,
lggr logger.Logger,
chain legacyevm.Chain,
pr pipeline.Runner,
mc *models.MercuryCredentials,
keyring ocrtypes.OnchainKeyring,
dbCfg pg.QConfig,
ethKeystore keystore.Eth,
) (evmrelay.OCR2KeeperProvider, evmregistry21.AutomationServices, error) {
var err error
var keeperProvider evmrelay.OCR2KeeperProvider

oSpec := spec.OCR2OracleSpec
// the provider will be returned as a dependency
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil {
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil {
return nil, nil, err
}

Expand Down
Loading

0 comments on commit 00e1c55

Please sign in to comment.