diff --git a/core/services/metatx/integration_test.go b/core/services/metatx/integration_test.go index 36f749572b..5ce7f3a72e 100644 --- a/core/services/metatx/integration_test.go +++ b/core/services/metatx/integration_test.go @@ -181,7 +181,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p defer wrappedDestTokenUSD.Close() defer bankERC20USD.Close() - ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline) + ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "") geCurrentSeqNum := 1 diff --git a/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go b/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go index 01e2deaff1..eeb3cc250a 100644 --- a/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go +++ b/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go @@ -20,7 +20,7 @@ func Test_CLOSpecApprovalFlow(t *testing.T) { defer ethUSD.Close() // Create initial job specs - jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline) + jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "http://blah.com") ccipTH.SetupFeedsManager(t) // Propose and approve new specs diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index 52bd20d78e..1cf36add41 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -38,7 +38,7 @@ import ( ) // TODO pass context? -func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *BackfillArgs, error) { +func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer) (*ExecutionPluginStaticConfig, *BackfillArgs, error) { if jb.OCR2OracleSpec == nil { return nil, nil, errors.New("spec is nil") } @@ -151,7 +151,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega } func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { - execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...) + execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet) if err != nil { return nil, err } @@ -166,7 +166,11 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha if err1 := execPluginConfig.commitStoreReader.RegisterFilters(qopts...); err1 != nil { return nil, err1 } - + for _, dp := range execPluginConfig.tokenDataProviders { + if err1 := dp.RegisterFilters(qopts...); err1 != nil { + return nil, err1 + } + } destChainID, err := chainselectors.ChainIdFromSelector(execPluginConfig.destChainSelector) if err != nil { return nil, err @@ -226,8 +230,9 @@ func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.Execution // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. // See comment in UnregisterCommitPluginLpFilters +// It MUST mirror the filters registered in NewExecutionServices. func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { - execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...) + execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet) if err != nil { return err } diff --git a/core/services/ocr2/plugins/ccip/integration_test.go b/core/services/ocr2/plugins/ccip/integration_test.go index 7c626d3f23..67f4ad5d6a 100644 --- a/core/services/ocr2/plugins/ccip/integration_test.go +++ b/core/services/ocr2/plugins/ccip/integration_test.go @@ -27,7 +27,7 @@ func TestIntegration_CCIP(t *testing.T) { defer linkUSD.Close() defer ethUSD.Close() - jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline) + jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "") currentSeqNum := 1 @@ -264,7 +264,7 @@ func TestIntegration_CCIP(t *testing.T) { ccipTH.Dest.Chain.Commit() // create new jobs - jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock) + jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock, "") jobParams.Version = "v2" jobParams.SourceStartBlock = srcStartBlock ccipTH.AddAllJobs(t, jobParams) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go index 76ab22775a..50e7484317 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go @@ -63,6 +63,26 @@ func (_m *USDCReader) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, return r0, r1 } +// RegisterFilters provides a mock function with given fields: qopts +func (_m *USDCReader) RegisterFilters(qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(...pg.QOpt) error); ok { + r0 = rf(qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewUSDCReader creates a new instance of USDCReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewUSDCReader(t interface { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go index c8021300f0..296be131d2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go @@ -2,7 +2,6 @@ package ccipdata import ( "context" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -20,6 +19,7 @@ const ( //go:generate mockery --quiet --name USDCReader --filename usdc_reader_mock.go --case=underscore type USDCReader interface { + RegisterFilters(qopts ...pg.QOpt) error // GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction. GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) Close(qopts ...pg.QOpt) error @@ -28,12 +28,16 @@ type USDCReader interface { type USDCReaderImpl struct { usdcMessageSent common.Hash lp logpoller.LogPoller - filterName string + filter logpoller.Filter lggr logger.Logger } func (u *USDCReaderImpl) Close(qopts ...pg.QOpt) error { - return u.lp.UnregisterFilter(u.filterName, qopts...) + return u.lp.UnregisterFilter(u.filter.Name, qopts...) +} + +func (u *USDCReaderImpl) RegisterFilters(qopts ...pg.QOpt) error { + return u.lp.RegisterFilter(u.filter, qopts...) } // usdcPayload has to match the onchain event emitted by the USDC message transmitter @@ -79,19 +83,16 @@ func (u *USDCReaderImpl) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Conte } func NewUSDCReader(lggr logger.Logger, transmitter common.Address, lp logpoller.LogPoller) (*USDCReaderImpl, error) { - filterName := logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()) eventSig := utils.Keccak256Fixed([]byte("MessageSent(bytes)")) - if err := lp.RegisterFilter(logpoller.Filter{ - Name: filterName, + filter := logpoller.Filter{ + Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()), EventSigs: []common.Hash{eventSig}, Addresses: []common.Address{transmitter}, - }); err != nil { - return nil, err } return &USDCReaderImpl{ lggr: lggr, lp: lp, usdcMessageSent: eventSig, - filterName: filterName, + filter: filter, }, nil } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go index d10f9a7af8..249d26bc0f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go @@ -26,7 +26,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) { t.Run("multiple found", func(t *testing.T) { lp := lpmocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp) require.NoError(t, err) lp.On("IndexedLogsByTxHash", @@ -48,7 +47,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) { t.Run("none found", func(t *testing.T) { lp := lpmocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp) require.NoError(t, err) lp.On("IndexedLogsByTxHash", diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go index dd99dad33d..c518ef1f1d 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go @@ -15,6 +15,7 @@ func TestUSDCReaderFilters(t *testing.T) { ccipdata.AssertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer { c, err := ccipdata.NewUSDCReader(logger.TestLogger(t), addr, lp) require.NoError(t, err) + require.NoError(t, c.RegisterFilters()) return c }, 1) } diff --git a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go index 124c3986ee..7f7c7a4e14 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go @@ -91,6 +91,12 @@ const ( [pluginConfig] destStartBlock = 50 + + [pluginConfig.USDCConfig] + AttestationAPI = "http://blah.com" + SourceMessageTransmitterAddress = "%s" + SourceTokenAddress = "%s" + AttestationAPITimeoutSeconds = 10 ` commitSpecTemplate = ` type = "offchainreporting2" @@ -609,6 +615,8 @@ func (c *CCIPIntegrationTestHarness) ApproveJobSpecs(t *testing.T, jobParams CCI 1, node.KeyBundle.ID(), node.Transmitter.Hex(), + utils.RandomAddress().String(), + utils.RandomAddress().String(), ) execId, err := f.ProposeJob(ctx, &execSpec) require.NoError(t, err) @@ -871,13 +879,13 @@ func (c *CCIPIntegrationTestHarness) SetupAndStartNodes(ctx context.Context, t * return bootstrapNode, nodes, configBlock } -func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string) CCIPJobSpecParams { +func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string, usdcAttestationAPI string) CCIPJobSpecParams { // setup Jobs ctx := context.Background() // Starts nodes and configures them in the OCR contracts. bootstrapNode, _, configBlock := c.SetupAndStartNodes(ctx, t, generateRandomBootstrapPort()) - jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock) + jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock, usdcAttestationAPI) // Add the bootstrap job c.Bootstrap.AddBootstrapJob(t, jobParams.BootstrapJob(c.Dest.CommitStore.Address().Hex())) diff --git a/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go b/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go index 13e75f8084..323cc83dc6 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go @@ -2,6 +2,7 @@ package integrationtesthelpers import ( "fmt" + "github.com/smartcontractkit/chainlink/v2/core/utils" "time" "github.com/ethereum/go-ethereum/common" @@ -42,6 +43,7 @@ type CCIPJobSpecParams struct { TokenPricesUSDPipeline string SourceStartBlock uint64 DestStartBlock uint64 + USDCAttestationAPI string P2PV2Bootstrappers pq.StringArray } @@ -137,6 +139,12 @@ func (params CCIPJobSpecParams) ExecutionJobSpec() (*client.OCR2TaskJobSpec, err if params.SourceStartBlock > 0 { ocrSpec.PluginConfig["sourceStartBlock"] = params.SourceStartBlock } + if params.USDCAttestationAPI != "" { + ocrSpec.PluginConfig["USDCConfig.AttestationAPI"] = fmt.Sprintf("\"%s\"", params.USDCAttestationAPI) + ocrSpec.PluginConfig["USDCConfig.SourceTokenAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String()) + ocrSpec.PluginConfig["USDCConfig.SourceMessageTransmitterAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String()) + ocrSpec.PluginConfig["USDCConfig.AttestationAPITimeoutSeconds"] = 5 + } return &client.OCR2TaskJobSpec{ OCR2OracleSpec: ocrSpec, JobType: "offchainreporting2", @@ -161,7 +169,7 @@ func (params CCIPJobSpecParams) BootstrapJob(contractID string) *client.OCR2Task } } -func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64) CCIPJobSpecParams { +func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64, usdcAttestationAPI string) CCIPJobSpecParams { return CCIPJobSpecParams{ CommitStore: c.Dest.CommitStore.Address(), OffRamp: c.Dest.OffRamp.Address(), @@ -170,5 +178,6 @@ func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline DestChainName: "SimulatedDest", TokenPricesUSDPipeline: tokenPricesUSDPipeline, DestStartBlock: uint64(configBlock), + USDCAttestationAPI: usdcAttestationAPI, } } diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader.go b/core/services/ocr2/plugins/ccip/tokendata/reader.go index 2ea32be998..07558cad5f 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/reader.go +++ b/core/services/ocr2/plugins/ccip/tokendata/reader.go @@ -20,5 +20,6 @@ var ( type Reader interface { // ReadTokenData returns the attestation bytes if ready, and throws an error if not ready. ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (tokenData []byte, err error) + RegisterFilters(qopts ...pg.QOpt) error Close(qopts ...pg.QOpt) error } diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go index f88869bae7..c5d4717c3c 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go +++ b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go @@ -62,6 +62,26 @@ func (_m *MockReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnR return r0, r1 } +// RegisterFilters provides a mock function with given fields: qopts +func (_m *MockReader) RegisterFilters(qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(...pg.QOpt) error); ok { + r0 = rf(qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewMockReader creates a new instance of MockReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockReader(t interface { diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go index 7ef4833be6..7cf98a9a8b 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -172,3 +172,7 @@ func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHas func (s *TokenDataReader) Close(qopts ...pg.QOpt) error { return s.usdcReader.Close(qopts...) } + +func (s *TokenDataReader) RegisterFilters(qopts ...pg.QOpt) error { + return s.usdcReader.RegisterFilters(qopts...) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go index 8bd314992f..3bac237511 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go @@ -57,7 +57,6 @@ func TestUSDCReader_callAttestationApiMock(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp) require.NoError(t, err) usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0) @@ -160,10 +159,11 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp) require.NoError(t, err) usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds) + lp.On("RegisterFilter", mock.Anything).Return(nil) + require.NoError(t, usdcReader.RegisterFilters()) parentCtx, cancel := context.WithTimeout(context.Background(), time.Duration(test.parentTimeoutSeconds)*time.Second) defer cancel() @@ -174,6 +174,8 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) { if test.expectedError != nil { require.True(t, errors.Is(err, test.expectedError)) } + lp.On("UnregisterFilter", mock.Anything).Return(nil) + require.NoError(t, usdcReader.Close()) }) } } diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index 2f1e60c9d7..4b22458379 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -30,7 +30,7 @@ func ValidatedOracleSpecToml(config OCR2Config, insConf InsecureConfig, tomlStri var spec job.OCR2OracleSpec tree, err := toml.Load(tomlString) if err != nil { - return jb, pkgerrors.Wrap(err, "toml error on load") + return jb, pkgerrors.Wrapf(err, "toml error on load %v", tomlString) } // Note this validates all the fields which implement an UnmarshalText // i.e. TransmitterAddress, PeerID...