Skip to content

Commit

Permalink
USDC filter fix (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
connorwstein authored Dec 15, 2023
1 parent 1360be8 commit a33858f
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/services/metatx/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p
defer wrappedDestTokenUSD.Close()
defer bankERC20USD.Close()

ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline)
ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "")

geCurrentSeqNum := 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_CLOSpecApprovalFlow(t *testing.T) {
defer ethUSD.Close()

// Create initial job specs
jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline)
jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "http://blah.com")
ccipTH.SetupFeedsManager(t)

// Propose and approve new specs
Expand Down
13 changes: 9 additions & 4 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

// TODO pass context?
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
if jb.OCR2OracleSpec == nil {
return nil, nil, errors.New("spec is nil")
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
}

func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
if err != nil {
return nil, err
}
Expand All @@ -166,7 +166,11 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
if err1 := execPluginConfig.commitStoreReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

for _, dp := range execPluginConfig.tokenDataProviders {
if err1 := dp.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}
}
destChainID, err := chainselectors.ChainIdFromSelector(execPluginConfig.destChainSelector)
if err != nil {
return nil, err
Expand Down Expand Up @@ -226,8 +230,9 @@ func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.Execution

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
// See comment in UnregisterCommitPluginLpFilters
// It MUST mirror the filters registered in NewExecutionServices.
func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/ccip/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestIntegration_CCIP(t *testing.T) {
defer linkUSD.Close()
defer ethUSD.Close()

jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline)
jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "")

currentSeqNum := 1

Expand Down Expand Up @@ -264,7 +264,7 @@ func TestIntegration_CCIP(t *testing.T) {
ccipTH.Dest.Chain.Commit()

// create new jobs
jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock)
jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock, "")
jobParams.Version = "v2"
jobParams.SourceStartBlock = srcStartBlock
ccipTH.AddAllJobs(t, jobParams)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ccipdata

import (
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
Expand All @@ -20,6 +19,7 @@ const (

//go:generate mockery --quiet --name USDCReader --filename usdc_reader_mock.go --case=underscore
type USDCReader interface {
RegisterFilters(qopts ...pg.QOpt) error
// GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction.
GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error)
Close(qopts ...pg.QOpt) error
Expand All @@ -28,12 +28,16 @@ type USDCReader interface {
type USDCReaderImpl struct {
usdcMessageSent common.Hash
lp logpoller.LogPoller
filterName string
filter logpoller.Filter
lggr logger.Logger
}

func (u *USDCReaderImpl) Close(qopts ...pg.QOpt) error {
return u.lp.UnregisterFilter(u.filterName, qopts...)
return u.lp.UnregisterFilter(u.filter.Name, qopts...)
}

func (u *USDCReaderImpl) RegisterFilters(qopts ...pg.QOpt) error {
return u.lp.RegisterFilter(u.filter, qopts...)
}

// usdcPayload has to match the onchain event emitted by the USDC message transmitter
Expand Down Expand Up @@ -79,19 +83,16 @@ func (u *USDCReaderImpl) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Conte
}

func NewUSDCReader(lggr logger.Logger, transmitter common.Address, lp logpoller.LogPoller) (*USDCReaderImpl, error) {
filterName := logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex())
eventSig := utils.Keccak256Fixed([]byte("MessageSent(bytes)"))
if err := lp.RegisterFilter(logpoller.Filter{
Name: filterName,
filter := logpoller.Filter{
Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{transmitter},
}); err != nil {
return nil, err
}
return &USDCReaderImpl{
lggr: lggr,
lp: lp,
usdcMessageSent: eventSig,
filterName: filterName,
filter: filter,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) {

t.Run("multiple found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash",
Expand All @@ -48,7 +47,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) {

t.Run("none found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestUSDCReaderFilters(t *testing.T) {
ccipdata.AssertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewUSDCReader(logger.TestLogger(t), addr, lp)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ const (
[pluginConfig]
destStartBlock = 50
[pluginConfig.USDCConfig]
AttestationAPI = "http://blah.com"
SourceMessageTransmitterAddress = "%s"
SourceTokenAddress = "%s"
AttestationAPITimeoutSeconds = 10
`
commitSpecTemplate = `
type = "offchainreporting2"
Expand Down Expand Up @@ -609,6 +615,8 @@ func (c *CCIPIntegrationTestHarness) ApproveJobSpecs(t *testing.T, jobParams CCI
1,
node.KeyBundle.ID(),
node.Transmitter.Hex(),
utils.RandomAddress().String(),
utils.RandomAddress().String(),
)
execId, err := f.ProposeJob(ctx, &execSpec)
require.NoError(t, err)
Expand Down Expand Up @@ -871,13 +879,13 @@ func (c *CCIPIntegrationTestHarness) SetupAndStartNodes(ctx context.Context, t *
return bootstrapNode, nodes, configBlock
}

func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string) CCIPJobSpecParams {
func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string, usdcAttestationAPI string) CCIPJobSpecParams {
// setup Jobs
ctx := context.Background()
// Starts nodes and configures them in the OCR contracts.
bootstrapNode, _, configBlock := c.SetupAndStartNodes(ctx, t, generateRandomBootstrapPort())

jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock)
jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock, usdcAttestationAPI)

// Add the bootstrap job
c.Bootstrap.AddBootstrapJob(t, jobParams.BootstrapJob(c.Dest.CommitStore.Address().Hex()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integrationtesthelpers

import (
"fmt"
"github.com/smartcontractkit/chainlink/v2/core/utils"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -42,6 +43,7 @@ type CCIPJobSpecParams struct {
TokenPricesUSDPipeline string
SourceStartBlock uint64
DestStartBlock uint64
USDCAttestationAPI string
P2PV2Bootstrappers pq.StringArray
}

Expand Down Expand Up @@ -137,6 +139,12 @@ func (params CCIPJobSpecParams) ExecutionJobSpec() (*client.OCR2TaskJobSpec, err
if params.SourceStartBlock > 0 {
ocrSpec.PluginConfig["sourceStartBlock"] = params.SourceStartBlock
}
if params.USDCAttestationAPI != "" {
ocrSpec.PluginConfig["USDCConfig.AttestationAPI"] = fmt.Sprintf("\"%s\"", params.USDCAttestationAPI)
ocrSpec.PluginConfig["USDCConfig.SourceTokenAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String())
ocrSpec.PluginConfig["USDCConfig.SourceMessageTransmitterAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String())
ocrSpec.PluginConfig["USDCConfig.AttestationAPITimeoutSeconds"] = 5
}
return &client.OCR2TaskJobSpec{
OCR2OracleSpec: ocrSpec,
JobType: "offchainreporting2",
Expand All @@ -161,7 +169,7 @@ func (params CCIPJobSpecParams) BootstrapJob(contractID string) *client.OCR2Task
}
}

func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64) CCIPJobSpecParams {
func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64, usdcAttestationAPI string) CCIPJobSpecParams {
return CCIPJobSpecParams{
CommitStore: c.Dest.CommitStore.Address(),
OffRamp: c.Dest.OffRamp.Address(),
Expand All @@ -170,5 +178,6 @@ func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline
DestChainName: "SimulatedDest",
TokenPricesUSDPipeline: tokenPricesUSDPipeline,
DestStartBlock: uint64(configBlock),
USDCAttestationAPI: usdcAttestationAPI,
}
}
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/ccip/tokendata/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ var (
type Reader interface {
// ReadTokenData returns the attestation bytes if ready, and throws an error if not ready.
ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (tokenData []byte, err error)
RegisterFilters(qopts ...pg.QOpt) error
Close(qopts ...pg.QOpt) error
}
20 changes: 20 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/reader_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHas
func (s *TokenDataReader) Close(qopts ...pg.QOpt) error {
return s.usdcReader.Close(qopts...)
}

func (s *TokenDataReader) RegisterFilters(qopts ...pg.QOpt) error {
return s.usdcReader.RegisterFilters(qopts...)
}
6 changes: 4 additions & 2 deletions core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestUSDCReader_callAttestationApiMock(t *testing.T) {

lggr := logger.TestLogger(t)
lp := mocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp)
require.NoError(t, err)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0)
Expand Down Expand Up @@ -160,10 +159,11 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) {

lggr := logger.TestLogger(t)
lp := mocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp)
require.NoError(t, err)
usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds)
lp.On("RegisterFilter", mock.Anything).Return(nil)
require.NoError(t, usdcReader.RegisterFilters())

parentCtx, cancel := context.WithTimeout(context.Background(), time.Duration(test.parentTimeoutSeconds)*time.Second)
defer cancel()
Expand All @@ -174,6 +174,8 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) {
if test.expectedError != nil {
require.True(t, errors.Is(err, test.expectedError))
}
lp.On("UnregisterFilter", mock.Anything).Return(nil)
require.NoError(t, usdcReader.Close())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ValidatedOracleSpecToml(config OCR2Config, insConf InsecureConfig, tomlStri
var spec job.OCR2OracleSpec
tree, err := toml.Load(tomlString)
if err != nil {
return jb, pkgerrors.Wrap(err, "toml error on load")
return jb, pkgerrors.Wrapf(err, "toml error on load %v", tomlString)
}
// Note this validates all the fields which implement an UnmarshalText
// i.e. TransmitterAddress, PeerID...
Expand Down

0 comments on commit a33858f

Please sign in to comment.