Skip to content

Commit

Permalink
CLO job proposal flow fix (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Dec 15, 2023
2 parents 2702628 + 76bee1d commit 94fe348
Show file tree
Hide file tree
Showing 18 changed files with 399 additions and 53 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.7.1-ccip1.2.3-release
2.7.1-ccip1.2.4-release
2 changes: 1 addition & 1 deletion core/services/metatx/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p
defer wrappedDestTokenUSD.Close()
defer bankERC20USD.Close()

ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, 29599)
ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "")

geCurrentSeqNum := 1

Expand Down
58 changes: 58 additions & 0 deletions core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ccip_test

import (
"math/big"
"testing"

"github.com/stretchr/testify/assert"
"github.com/test-go/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/router"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers"
integrationtesthelpers "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers/integration"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

func Test_CLOSpecApprovalFlow(t *testing.T) {
ccipTH := integrationtesthelpers.SetupCCIPIntegrationTH(t, testhelpers.SourceChainID, testhelpers.SourceChainSelector, testhelpers.DestChainID, testhelpers.DestChainSelector)
tokenPricesUSDPipeline, linkUSD, ethUSD := ccipTH.CreatePricesPipeline(t)
defer linkUSD.Close()
defer ethUSD.Close()

// Create initial job specs
jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "http://blah.com")
ccipTH.SetupFeedsManager(t)

// Propose and approve new specs
ccipTH.ApproveJobSpecs(t, jobParams, tokenPricesUSDPipeline)
// TODO generate one more run with propose & approve
// ccipTH.ApproveJobSpecs(t, jobParams)

// Sanity check that CCIP works after CLO flow
currentSeqNum := 1

extraArgs, err := testhelpers.GetEVMExtraArgsV1(big.NewInt(200_003), false)
require.NoError(t, err)

msg := router.ClientEVM2AnyMessage{
Receiver: testhelpers.MustEncodeAddress(t, ccipTH.Dest.Receivers[0].Receiver.Address()),
Data: utils.RandomAddress().Bytes(),
TokenAmounts: []router.ClientEVMTokenAmount{},
FeeToken: ccipTH.Source.LinkToken.Address(),
ExtraArgs: extraArgs,
}
fee, err := ccipTH.Source.Router.GetFee(nil, testhelpers.DestChainSelector, msg)
require.NoError(t, err)

_, err = ccipTH.Source.LinkToken.Approve(ccipTH.Source.User, ccipTH.Source.Router.Address(), new(big.Int).Set(fee))
require.NoError(t, err)
ccipTH.Source.Chain.Commit()

ccipTH.SendRequest(t, msg)
ccipTH.AllNodesHaveReqSeqNum(t, currentSeqNum)
ccipTH.EventuallyReportCommitted(t, currentSeqNum)

executionLogs := ccipTH.AllNodesHaveExecutedSeqNums(t, currentSeqNum, currentSeqNum)
assert.Len(t, executionLogs, 1)
ccipTH.AssertExecState(t, executionLogs[0], testhelpers.ExecutionStateSuccess)
}
13 changes: 9 additions & 4 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// TODO pass context?
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer) (*ExecutionPluginStaticConfig, *BackfillArgs, error) {
if jb.OCR2OracleSpec == nil {
return nil, nil, errors.New("spec is nil")
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega
}

func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
if err != nil {
return nil, err
}
Expand All @@ -159,7 +159,11 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
if err1 := execPluginConfig.commitStoreReader.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}

for _, dp := range execPluginConfig.tokenDataProviders {
if err1 := dp.RegisterFilters(qopts...); err1 != nil {
return nil, err1
}
}
argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, execPluginConfig.destChainEVMID)
argsNoPlugin.Logger = relaylogger.NewOCRWrapper(execPluginConfig.lggr, true, logError)
oracle, err := libocr2.NewOracle(argsNoPlugin)
Expand Down Expand Up @@ -215,8 +219,9 @@ func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.Execution

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
// See comment in UnregisterCommitPluginLpFilters
// It MUST mirror the filters registered in NewExecutionServices.
func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet)
if err != nil {
return err
}
Expand Down
27 changes: 3 additions & 24 deletions core/services/ocr2/plugins/ccip/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
Expand All @@ -24,30 +22,11 @@ import (

func TestIntegration_CCIP(t *testing.T) {
ccipTH := integrationtesthelpers.SetupCCIPIntegrationTH(t, testhelpers.SourceChainID, testhelpers.SourceChainSelector, testhelpers.DestChainID, testhelpers.DestChainSelector)
linkUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte(`{"UsdPerLink": "8000000000000000000"}`))
require.NoError(t, err)
}))
ethUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte(`{"UsdPerETH": "1700000000000000000000"}`))
require.NoError(t, err)
}))
wrapped, err1 := ccipTH.Source.Router.GetWrappedNative(nil)
require.NoError(t, err1)
tokenPricesUSDPipeline := fmt.Sprintf(`
// Price 1
link [type=http method=GET url="%s"];
link_parse [type=jsonparse path="UsdPerLink"];
link->link_parse;
eth [type=http method=GET url="%s"];
eth_parse [type=jsonparse path="UsdPerETH"];
eth->eth_parse;
merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_parse)}"];`,
linkUSD.URL, ethUSD.URL, ccipTH.Dest.LinkToken.Address(), wrapped)
tokenPricesUSDPipeline, linkUSD, ethUSD := ccipTH.CreatePricesPipeline(t)
defer linkUSD.Close()
defer ethUSD.Close()

jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, 19399)
jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "")

currentSeqNum := 1

Expand Down Expand Up @@ -284,7 +263,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p
ccipTH.Dest.Chain.Commit()

// create new jobs
jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock)
jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock, "")
jobParams.Version = "v2"
jobParams.SourceStartBlock = srcStartBlock
ccipTH.AddAllJobs(t, jobParams)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ func NewOffRampV1_0_0(lggr logger.Logger, addr common.Address, ec client.Client,
Addresses: []common.Address{addr},
},
}

return &OffRampV1_0_0{
offRamp: offRamp,
ec: ec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ func NewOnRampV1_0_0(lggr logger.Logger, sourceSelector, destSelector uint64, on
Addresses: []common.Address{onRampAddress},
},
}
if err != nil {
return nil, err
}
return &OnRampV1_0_0{
lggr: lggr,
address: onRampAddress,
Expand Down
19 changes: 10 additions & 9 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ccipdata

import (
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
Expand All @@ -20,6 +19,7 @@ const (

//go:generate mockery --quiet --name USDCReader --filename usdc_reader_mock.go --case=underscore
type USDCReader interface {
RegisterFilters(qopts ...pg.QOpt) error
// GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction.
GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error)
Close(qopts ...pg.QOpt) error
Expand All @@ -28,12 +28,16 @@ type USDCReader interface {
type USDCReaderImpl struct {
usdcMessageSent common.Hash
lp logpoller.LogPoller
filterName string
filter logpoller.Filter
lggr logger.Logger
}

func (u *USDCReaderImpl) Close(qopts ...pg.QOpt) error {
return u.lp.UnregisterFilter(u.filterName, qopts...)
return u.lp.UnregisterFilter(u.filter.Name, qopts...)
}

func (u *USDCReaderImpl) RegisterFilters(qopts ...pg.QOpt) error {
return u.lp.RegisterFilter(u.filter, qopts...)
}

// usdcPayload has to match the onchain event emitted by the USDC message transmitter
Expand Down Expand Up @@ -79,19 +83,16 @@ func (u *USDCReaderImpl) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Conte
}

func NewUSDCReader(lggr logger.Logger, transmitter common.Address, lp logpoller.LogPoller) (*USDCReaderImpl, error) {
filterName := logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex())
eventSig := utils.Keccak256Fixed([]byte("MessageSent(bytes)"))
if err := lp.RegisterFilter(logpoller.Filter{
Name: filterName,
filter := logpoller.Filter{
Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{transmitter},
}); err != nil {
return nil, err
}
return &USDCReaderImpl{
lggr: lggr,
lp: lp,
usdcMessageSent: eventSig,
filterName: filterName,
filter: filter,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) {

t.Run("multiple found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash",
Expand All @@ -48,7 +47,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) {

t.Run("none found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
lp.On("RegisterFilter", mock.Anything).Return(nil)
u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestUSDCReaderFilters(t *testing.T) {
assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer {
c, err := ccipdata.NewUSDCReader(logger.TestLogger(t), addr, lp)
require.NoError(t, err)
require.NoError(t, c.RegisterFilters())
return c
}, 1)
}
Loading

0 comments on commit 94fe348

Please sign in to comment.