Skip to content

Commit

Permalink
Read logs for block number and trigger config (#10354)
Browse files Browse the repository at this point in the history
* Read logs for block number and trigger config

* Use indexed logs

* Cleanup

* Reduce confs

* Update test coverage

* Continue if id cannot be parsed

* Please the linter

* Batch the log trigger upkeep refresh

* Batch the upkeep refresh

* No longer use errgroup

* Dont error on a single batch, add a processing delay, add comments

* Update tests

* Add a todo

* Refresh active upkeeps before batching

* update comment

* Only take max values

* Filter log upkeeps at a higher level

* Refresh after collecting the log upkeep IDs, generate hashes
  • Loading branch information
ferglor authored Aug 30, 2023
1 parent b41e626 commit 76999e5
Show file tree
Hide file tree
Showing 4 changed files with 477 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
)

func (p *logEventProvider) RefreshActiveUpkeeps(ids ...*big.Int) ([]*big.Int, error) {
// Exploratory: investigate how we can batch the refresh
if len(ids) == 0 {
return nil, nil
}
Expand Down
107 changes: 92 additions & 15 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
coreTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"

"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand All @@ -37,6 +37,9 @@ const (
defaultAllowListExpiration = 20 * time.Minute
// allowListCleanupInterval decides when the expired items in allowList cache will be deleted.
allowListCleanupInterval = 5 * time.Minute
// TODO decide on a value for this
indexedLogsConfirmations = 10
logTriggerRefreshBatchSize = 32
)

var (
Expand Down Expand Up @@ -270,34 +273,108 @@ func (r *EvmRegistry) refreshActiveUpkeeps() error {
}
r.active.Reset(ids...)

return r.refreshLogTriggerUpkeeps(ids)
}

// refreshLogTriggerUpkeeps refreshes the active upkeep ids for log trigger upkeeps
//
// TODO: check for updated config for log trigger upkeeps and update it, currently we ignore them.
func (r *EvmRegistry) refreshLogTriggerUpkeeps(ids []*big.Int) error {
logTriggerIDs := make([]*big.Int, 0)
var logTriggerIDs []*big.Int
for _, id := range ids {
uid := &ocr2keepers.UpkeepIdentifier{}
if ok := uid.FromBigInt(id); !ok {
r.lggr.Warnf("failed to parse upkeep id %s", id.String())
continue
}
switch core.GetUpkeepType(*uid) {
case ocr2keepers.LogTrigger:
logTriggerIDs = append(logTriggerIDs, id)
default:
}
}

newUpkeeps, err := r.logEventProvider.RefreshActiveUpkeeps(logTriggerIDs...)
if err != nil {
return fmt.Errorf("failed to refresh active upkeep ids in log event provider: %w", err)
}

return r.refreshLogTriggerUpkeeps(newUpkeeps)
}

// refreshLogTriggerUpkeeps refreshes the active upkeep ids for log trigger upkeeps
//
// TODO: check for updated config for log trigger upkeeps and update it, currently we ignore them.
func (r *EvmRegistry) refreshLogTriggerUpkeeps(ids []*big.Int) error {
var err error
for i := 0; i < len(ids); i += logTriggerRefreshBatchSize {
end := i + logTriggerRefreshBatchSize
if end > len(ids) {
end = len(ids)
}
idBatch := ids[i:end]

if batchErr := r.refreshLogTriggerUpkeepsBatch(idBatch); batchErr != nil {
multierr.AppendInto(&err, batchErr)
}

time.Sleep(500 * time.Millisecond)
}

return err
}

func (r *EvmRegistry) refreshLogTriggerUpkeepsBatch(logTriggerIDs []*big.Int) error {
var logTriggerHashes []common.Hash
for _, id := range logTriggerIDs {
logTriggerHashes = append(logTriggerHashes, common.BigToHash(id))
}

unpausedLogs, err := r.poller.IndexedLogs(iregistry21.IKeeperRegistryMasterUpkeepUnpaused{}.Topic(), r.addr, 1, logTriggerHashes, indexedLogsConfirmations, pg.WithParentCtx(r.ctx))
if err != nil {
return err
}
configSetLogs, err := r.poller.IndexedLogs(iregistry21.IKeeperRegistryMasterUpkeepTriggerConfigSet{}.Topic(), r.addr, 1, logTriggerHashes, indexedLogsConfirmations, pg.WithParentCtx(r.ctx))
if err != nil {
return err
}

logs := append(unpausedLogs, configSetLogs...)

configSetBlockNumbers := map[string]uint64{}
unpausedBlockNumbers := map[string]uint64{}
perUpkeepConfig := map[string][]byte{}

for _, log := range logs {
rawLog := log.ToGethLog()
abilog, err := r.registry.ParseLog(rawLog)
if err != nil {
return err
}
switch l := abilog.(type) {
case *iregistry21.IKeeperRegistryMasterUpkeepTriggerConfigSet:
if rawLog.BlockNumber > configSetBlockNumbers[l.Id.String()] {
configSetBlockNumbers[l.Id.String()] = rawLog.BlockNumber
perUpkeepConfig[l.Id.String()] = l.TriggerConfig
}
case *iregistry21.IKeeperRegistryMasterUpkeepUnpaused:
if rawLog.BlockNumber > unpausedBlockNumbers[l.Id.String()] {
unpausedBlockNumbers[l.Id.String()] = rawLog.BlockNumber
}
}
}

var merr error
for _, id := range newUpkeeps {
// TODO: find the ConfigSet/UpkeepUnpaused events for this upkeep and pass cfg and block number
// block number should be taken from UpkeepUnpaused if it's block is higher than ConfigSet
if err := r.updateTriggerConfig(id, nil, 0); err != nil {
for _, id := range logTriggerIDs {
logBlock, ok := configSetBlockNumbers[id.String()]
if !ok {
r.lggr.Warnf("unable to find config set block number for %s", id.String())
continue
}

config, ok := perUpkeepConfig[id.String()]
if !ok {
r.lggr.Warnf("unable to find per upkeep config for %s", id.String())
continue
}

// In case an upkeep was paused then unpaused after a config set event, start the config from the unpaused block number
if unpausedBlockNumbers[id.String()] > logBlock {
logBlock = unpausedBlockNumbers[id.String()]
}
if err := r.updateTriggerConfig(id, config, logBlock); err != nil {
merr = goerrors.Join(merr, fmt.Errorf("failed to update trigger config for upkeep id %s: %w", id.String(), err))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,17 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) {
type mockLogPoller struct {
logpoller.LogPoller
GetBlocksRangeFn func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error)
IndexedLogsFn func(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]logpoller.Log, error)
}

func (p *mockLogPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) {
return p.GetBlocksRangeFn(ctx, numbers, qopts...)
}

func (p *mockLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]logpoller.Log, error) {
return p.IndexedLogsFn(eventSig, address, topicIndex, topicValues, confs, qopts...)
}

func TestRegistry_VerifyLogExists(t *testing.T) {
lggr := logger.TestLogger(t)
upkeepId := ocr2keepers.UpkeepIdentifier{}
Expand Down
Loading

0 comments on commit 76999e5

Please sign in to comment.