diff --git a/VERSION b/VERSION index 0933b09e0c..aa38f6a665 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.7.1-ccip1.2.3-release +2.7.1-ccip1.2.4-release diff --git a/core/services/metatx/integration_test.go b/core/services/metatx/integration_test.go index 748b2fe048..5ce7f3a72e 100644 --- a/core/services/metatx/integration_test.go +++ b/core/services/metatx/integration_test.go @@ -181,7 +181,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p defer wrappedDestTokenUSD.Close() defer bankERC20USD.Close() - ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, 29599) + ccipContracts.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "") geCurrentSeqNum := 1 diff --git a/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go b/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go new file mode 100644 index 0000000000..eeb3cc250a --- /dev/null +++ b/core/services/ocr2/plugins/ccip/clo_ccip_integration_test.go @@ -0,0 +1,58 @@ +package ccip_test + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/router" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers" + integrationtesthelpers "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers/integration" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +func Test_CLOSpecApprovalFlow(t *testing.T) { + ccipTH := integrationtesthelpers.SetupCCIPIntegrationTH(t, testhelpers.SourceChainID, testhelpers.SourceChainSelector, testhelpers.DestChainID, testhelpers.DestChainSelector) + tokenPricesUSDPipeline, linkUSD, ethUSD := ccipTH.CreatePricesPipeline(t) + defer linkUSD.Close() + defer ethUSD.Close() + + // Create initial job specs + jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "http://blah.com") + ccipTH.SetupFeedsManager(t) + + // Propose and approve new specs + ccipTH.ApproveJobSpecs(t, jobParams, tokenPricesUSDPipeline) + // TODO generate one more run with propose & approve + // ccipTH.ApproveJobSpecs(t, jobParams) + + // Sanity check that CCIP works after CLO flow + currentSeqNum := 1 + + extraArgs, err := testhelpers.GetEVMExtraArgsV1(big.NewInt(200_003), false) + require.NoError(t, err) + + msg := router.ClientEVM2AnyMessage{ + Receiver: testhelpers.MustEncodeAddress(t, ccipTH.Dest.Receivers[0].Receiver.Address()), + Data: utils.RandomAddress().Bytes(), + TokenAmounts: []router.ClientEVMTokenAmount{}, + FeeToken: ccipTH.Source.LinkToken.Address(), + ExtraArgs: extraArgs, + } + fee, err := ccipTH.Source.Router.GetFee(nil, testhelpers.DestChainSelector, msg) + require.NoError(t, err) + + _, err = ccipTH.Source.LinkToken.Approve(ccipTH.Source.User, ccipTH.Source.Router.Address(), new(big.Int).Set(fee)) + require.NoError(t, err) + ccipTH.Source.Chain.Commit() + + ccipTH.SendRequest(t, msg) + ccipTH.AllNodesHaveReqSeqNum(t, currentSeqNum) + ccipTH.EventuallyReportCommitted(t, currentSeqNum) + + executionLogs := ccipTH.AllNodesHaveExecutedSeqNums(t, currentSeqNum, currentSeqNum) + assert.Len(t, executionLogs, 1) + ccipTH.AssertExecState(t, executionLogs[0], testhelpers.ExecutionStateSuccess) +} diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index 5256918a27..38f9c3e578 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -33,7 +33,7 @@ import ( ) // TODO pass context? -func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *BackfillArgs, error) { +func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer) (*ExecutionPluginStaticConfig, *BackfillArgs, error) { if jb.OCR2OracleSpec == nil { return nil, nil, errors.New("spec is nil") } @@ -144,7 +144,7 @@ func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet evm.Lega } func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { - execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...) + execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(lggr, jb, chainSet) if err != nil { return nil, err } @@ -159,7 +159,11 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha if err1 := execPluginConfig.commitStoreReader.RegisterFilters(qopts...); err1 != nil { return nil, err1 } - + for _, dp := range execPluginConfig.tokenDataProviders { + if err1 := dp.RegisterFilters(qopts...); err1 != nil { + return nil, err1 + } + } argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, execPluginConfig.destChainEVMID) argsNoPlugin.Logger = relaylogger.NewOCRWrapper(execPluginConfig.lggr, true, logError) oracle, err := libocr2.NewOracle(argsNoPlugin) @@ -215,8 +219,9 @@ func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.Execution // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. // See comment in UnregisterCommitPluginLpFilters +// It MUST mirror the filters registered in NewExecutionServices. func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { - execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...) + execPluginConfig, _, err := jobSpecToExecPluginConfig(lggr, jb, chainSet) if err != nil { return err } diff --git a/core/services/ocr2/plugins/ccip/integration_test.go b/core/services/ocr2/plugins/ccip/integration_test.go index ddaef7080c..36ada6f0a6 100644 --- a/core/services/ocr2/plugins/ccip/integration_test.go +++ b/core/services/ocr2/plugins/ccip/integration_test.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "math/big" - "net/http" - "net/http/httptest" "sync" "testing" "time" @@ -24,30 +22,11 @@ import ( func TestIntegration_CCIP(t *testing.T) { ccipTH := integrationtesthelpers.SetupCCIPIntegrationTH(t, testhelpers.SourceChainID, testhelpers.SourceChainSelector, testhelpers.DestChainID, testhelpers.DestChainSelector) - linkUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - _, err := w.Write([]byte(`{"UsdPerLink": "8000000000000000000"}`)) - require.NoError(t, err) - })) - ethUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - _, err := w.Write([]byte(`{"UsdPerETH": "1700000000000000000000"}`)) - require.NoError(t, err) - })) - wrapped, err1 := ccipTH.Source.Router.GetWrappedNative(nil) - require.NoError(t, err1) - tokenPricesUSDPipeline := fmt.Sprintf(` -// Price 1 -link [type=http method=GET url="%s"]; -link_parse [type=jsonparse path="UsdPerLink"]; -link->link_parse; -eth [type=http method=GET url="%s"]; -eth_parse [type=jsonparse path="UsdPerETH"]; -eth->eth_parse; -merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_parse)}"];`, - linkUSD.URL, ethUSD.URL, ccipTH.Dest.LinkToken.Address(), wrapped) + tokenPricesUSDPipeline, linkUSD, ethUSD := ccipTH.CreatePricesPipeline(t) defer linkUSD.Close() defer ethUSD.Close() - jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, 19399) + jobParams := ccipTH.SetUpNodesAndJobs(t, tokenPricesUSDPipeline, "") currentSeqNum := 1 @@ -284,7 +263,7 @@ merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_p ccipTH.Dest.Chain.Commit() // create new jobs - jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock) + jobParams = ccipTH.NewCCIPJobSpecParams(tokenPricesUSDPipeline, newConfigBlock, "") jobParams.Version = "v2" jobParams.SourceStartBlock = srcStartBlock ccipTH.AddAllJobs(t, jobParams) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go index 76ab22775a..50e7484317 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/usdc_reader_mock.go @@ -63,6 +63,26 @@ func (_m *USDCReader) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, return r0, r1 } +// RegisterFilters provides a mock function with given fields: qopts +func (_m *USDCReader) RegisterFilters(qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(...pg.QOpt) error); ok { + r0 = rf(qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewUSDCReader creates a new instance of USDCReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewUSDCReader(t interface { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go index 2ad48eddc4..c14055db22 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go @@ -476,6 +476,7 @@ func NewOffRampV1_0_0(lggr logger.Logger, addr common.Address, ec client.Client, Addresses: []common.Address{addr}, }, } + return &OffRampV1_0_0{ offRamp: offRamp, ec: ec, diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go index 7049dbe88b..91bbcdbdc7 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/onramp_v1_0_0.go @@ -160,9 +160,6 @@ func NewOnRampV1_0_0(lggr logger.Logger, sourceSelector, destSelector uint64, on Addresses: []common.Address{onRampAddress}, }, } - if err != nil { - return nil, err - } return &OnRampV1_0_0{ lggr: lggr, address: onRampAddress, diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go index c8021300f0..296be131d2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go @@ -2,7 +2,6 @@ package ccipdata import ( "context" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -20,6 +19,7 @@ const ( //go:generate mockery --quiet --name USDCReader --filename usdc_reader_mock.go --case=underscore type USDCReader interface { + RegisterFilters(qopts ...pg.QOpt) error // GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction. GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) Close(qopts ...pg.QOpt) error @@ -28,12 +28,16 @@ type USDCReader interface { type USDCReaderImpl struct { usdcMessageSent common.Hash lp logpoller.LogPoller - filterName string + filter logpoller.Filter lggr logger.Logger } func (u *USDCReaderImpl) Close(qopts ...pg.QOpt) error { - return u.lp.UnregisterFilter(u.filterName, qopts...) + return u.lp.UnregisterFilter(u.filter.Name, qopts...) +} + +func (u *USDCReaderImpl) RegisterFilters(qopts ...pg.QOpt) error { + return u.lp.RegisterFilter(u.filter, qopts...) } // usdcPayload has to match the onchain event emitted by the USDC message transmitter @@ -79,19 +83,16 @@ func (u *USDCReaderImpl) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Conte } func NewUSDCReader(lggr logger.Logger, transmitter common.Address, lp logpoller.LogPoller) (*USDCReaderImpl, error) { - filterName := logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()) eventSig := utils.Keccak256Fixed([]byte("MessageSent(bytes)")) - if err := lp.RegisterFilter(logpoller.Filter{ - Name: filterName, + filter := logpoller.Filter{ + Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, transmitter.Hex()), EventSigs: []common.Hash{eventSig}, Addresses: []common.Address{transmitter}, - }); err != nil { - return nil, err } return &USDCReaderImpl{ lggr: lggr, lp: lp, usdcMessageSent: eventSig, - filterName: filterName, + filter: filter, }, nil } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go index d10f9a7af8..249d26bc0f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_internal_test.go @@ -26,7 +26,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) { t.Run("multiple found", func(t *testing.T) { lp := lpmocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp) require.NoError(t, err) lp.On("IndexedLogsByTxHash", @@ -48,7 +47,6 @@ func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) { t.Run("none found", func(t *testing.T) { lp := lpmocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) u, err := NewUSDCReader(lggr, utils.RandomAddress(), lp) require.NoError(t, err) lp.On("IndexedLogsByTxHash", diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go index e540247eca..727430a05f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader_test.go @@ -15,6 +15,7 @@ func TestUSDCReaderFilters(t *testing.T) { assertFilterRegistration(t, new(lpmocks.LogPoller), func(lp *lpmocks.LogPoller, addr common.Address) ccipdata.Closer { c, err := ccipdata.NewUSDCReader(logger.TestLogger(t), addr, lp) require.NoError(t, err) + require.NoError(t, c.RegisterFilters()) return c }, 1) } diff --git a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go index cc0dd5e64f..c565189245 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/integration/chainlink.go @@ -5,7 +5,10 @@ import ( "encoding/hex" "fmt" "math/big" + "math/rand" + "net" "net/http" + "net/http/httptest" "strconv" "strings" "testing" @@ -18,10 +21,11 @@ import ( "github.com/google/uuid" "github.com/onsi/gomega" "github.com/pkg/errors" - "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/offchainreporting2/confighelper" types4 "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/sqlx" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" "k8s.io/utils/pointer" @@ -42,10 +46,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/logger/audit" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + feeds2 "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + feedsMocks "github.com/smartcontractkit/chainlink/v2/core/services/feeds/mocks" + pb "github.com/smartcontractkit/chainlink/v2/core/services/feeds/proto" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" + ksMocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers" @@ -55,9 +64,65 @@ import ( evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/plugins" ) +const ( + execSpecTemplate = ` + type = "offchainreporting2" + schemaVersion = 1 + name = "ccip-exec-1" + externalJobID = "67ffad71-d90f-4fe3-b4e4-494924b707fb" + forwardingAllowed = false + maxTaskDuration = "0s" + contractID = "%s" + contractConfigConfirmations = 1 + contractConfigTrackerPollInterval = "20s" + ocrKeyBundleID = "%s" + relay = "evm" + pluginType = "ccip-execution" + transmitterID = "%s" + + [relayConfig] + chainID = 1_337 + + [pluginConfig] + destStartBlock = 50 + + [pluginConfig.USDCConfig] + AttestationAPI = "http://blah.com" + SourceMessageTransmitterAddress = "%s" + SourceTokenAddress = "%s" + AttestationAPITimeoutSeconds = 10 + ` + commitSpecTemplate = ` + type = "offchainreporting2" + schemaVersion = 1 + name = "ccip-commit-1" + externalJobID = "13c997cf-1a14-4ab7-9068-07ee6d2afa55" + forwardingAllowed = false + maxTaskDuration = "0s" + contractID = "%s" + contractConfigConfirmations = 1 + contractConfigTrackerPollInterval = "20s" + ocrKeyBundleID = "%s" + relay = "evm" + pluginType = "ccip-commit" + transmitterID = "%s" + + [relayConfig] + chainID = 1_337 + + [pluginConfig] + destStartBlock = 50 + offRamp = "%s" + tokenPricesUSDPipeline = """ + %s + """ + ` +) + type Node struct { App chainlink.Application Transmitter common.Address @@ -276,6 +341,7 @@ func setupNodeCCIP( c.Log.Level = &loglevel c.Feature.CCIP = &trueRef c.Feature.UICSAKeys = &trueRef + c.Feature.FeedsManager = &trueRef c.OCR.Enabled = &falseRef c.OCR.DefaultTransactionQueueDepth = pointer.Uint32(200) c.OCR2.Enabled = &trueRef @@ -313,7 +379,13 @@ func setupNodeCCIP( // signs 1337 see https://github.com/smartcontractkit/chainlink-ccip/blob/a24dd436810250a458d27d8bb3fb78096afeb79c/core/services/ocr2/plugins/ccip/testhelpers/simulated_backend.go#L35 sourceClient := client.NewSimulatedBackendClient(t, sourceChain, sourceChainID) destClient := client.NewSimulatedBackendClient(t, destChain, destChainID) - keyStore := keystore.New(db, utils.FastScryptParams, lggr, config.Database()) + csaKeyStore := ksMocks.NewCSA(t) + + key, err := csakey.NewV2() + require.NoError(t, err) + csaKeyStore.On("GetAll").Return([]csakey.KeyV2{key}, nil) + keyStore := NewKsa(db, lggr, csaKeyStore, config) + simEthKeyStore := testhelpers.EthKeyStoreSim{ ETHKS: keyStore.Eth(), CSAKS: keyStore.CSA(), @@ -437,6 +509,31 @@ func SetupCCIPIntegrationTH(t *testing.T, sourceChainID, sourceChainSelector, de } } +func (c *CCIPIntegrationTestHarness) CreatePricesPipeline(t *testing.T) (string, *httptest.Server, *httptest.Server) { + linkUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte(`{"UsdPerLink": "8000000000000000000"}`)) + require.NoError(t, err) + })) + ethUSD := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte(`{"UsdPerETH": "1700000000000000000000"}`)) + require.NoError(t, err) + })) + wrapped, err1 := c.Source.Router.GetWrappedNative(nil) + require.NoError(t, err1) + tokenPricesUSDPipeline := fmt.Sprintf(` +// Price 1 +link [type=http method=GET url="%s"]; +link_parse [type=jsonparse path="UsdPerLink"]; +link->link_parse; +eth [type=http method=GET url="%s"]; +eth_parse [type=jsonparse path="UsdPerETH"]; +eth->eth_parse; +merge [type=merge left="{}" right="{\\\"%s\\\":$(link_parse), \\\"%s\\\":$(eth_parse)}"];`, + linkUSD.URL, ethUSD.URL, c.Dest.LinkToken.Address(), wrapped) + + return tokenPricesUSDPipeline, linkUSD, ethUSD +} + func (c *CCIPIntegrationTestHarness) AddAllJobs(t *testing.T, jobParams CCIPJobSpecParams) { jobParams.OffRamp = c.Dest.OffRamp.Address() @@ -451,6 +548,100 @@ func (c *CCIPIntegrationTestHarness) AddAllJobs(t *testing.T, jobParams CCIPJobS } } +func (c *CCIPIntegrationTestHarness) jobSpecProposal(t *testing.T, specTemplate string, f func() (*ctfClient.OCR2TaskJobSpec, error), feedsManagerId int64, version int32, opts ...any) feeds2.ProposeJobArgs { + spec, err := f() + require.NoError(t, err) + + args := []any{spec.OCR2OracleSpec.ContractID} + args = append(args, opts...) + + return feeds2.ProposeJobArgs{ + FeedsManagerID: feedsManagerId, + RemoteUUID: uuid.New(), + Multiaddrs: nil, + Version: version, + Spec: fmt.Sprintf(specTemplate, args...), + } +} + +func (c *CCIPIntegrationTestHarness) SetupFeedsManager(t *testing.T) { + for _, node := range c.Nodes { + f := node.App.GetFeedsService() + + managers, err := f.ListManagers() + require.NoError(t, err) + if len(managers) > 0 { + // Use at most one feeds manager, don't register if one already exists + continue + } + + secret := utils.RandomBytes32() + pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret[:])) + require.NoError(t, err) + + m := feeds2.RegisterManagerParams{ + Name: "CCIP", + URI: "http://localhost:8080", + PublicKey: *pkey, + } + + _, err = f.RegisterManager(testutils.Context(t), m) + require.NoError(t, err) + + connManager := feedsMocks.NewConnectionsManager(t) + connManager.On("GetClient", mock.Anything).Maybe().Return(NoopFeedsClient{}, nil) + connManager.On("Close").Maybe().Return() + connManager.On("IsConnected", mock.Anything).Maybe().Return(true) + f.Unsafe_SetConnectionsManager(connManager) + } +} + +func (c *CCIPIntegrationTestHarness) ApproveJobSpecs(t *testing.T, jobParams CCIPJobSpecParams, pipeline string) { + ctx := testutils.Context(t) + + for _, node := range c.Nodes { + f := node.App.GetFeedsService() + managers, err := f.ListManagers() + require.NoError(t, err) + require.Len(t, managers, 1, "expected exactly one feeds manager") + + execSpec := c.jobSpecProposal( + t, + execSpecTemplate, + jobParams.ExecutionJobSpec, + managers[0].ID, + 1, + node.KeyBundle.ID(), + node.Transmitter.Hex(), + utils.RandomAddress().String(), + utils.RandomAddress().String(), + ) + execId, err := f.ProposeJob(ctx, &execSpec) + require.NoError(t, err) + + err = f.ApproveSpec(ctx, execId, true) + require.NoError(t, err) + + commitSpec := c.jobSpecProposal( + t, + commitSpecTemplate, + jobParams.CommitJobSpec, + managers[0].ID, + 2, + node.KeyBundle.ID(), + node.Transmitter.Hex(), + jobParams.OffRamp.String(), + pipeline, + ) + + commitId, err := f.ProposeJob(ctx, &commitSpec) + require.NoError(t, err) + + err = f.ApproveSpec(ctx, commitId, true) + require.NoError(t, err) + } +} + func (c *CCIPIntegrationTestHarness) AllNodesHaveReqSeqNum(t *testing.T, seqNum int, onRampOpts ...common.Address) logpoller.Log { var log logpoller.Log nodes := c.Nodes @@ -686,13 +877,13 @@ func (c *CCIPIntegrationTestHarness) SetupAndStartNodes(ctx context.Context, t * return bootstrapNode, nodes, configBlock } -func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string, bootstrapNodePort int64) CCIPJobSpecParams { +func (c *CCIPIntegrationTestHarness) SetUpNodesAndJobs(t *testing.T, pricePipeline string, usdcAttestationAPI string) CCIPJobSpecParams { // setup Jobs ctx := context.Background() // Starts nodes and configures them in the OCR contracts. - bootstrapNode, _, configBlock := c.SetupAndStartNodes(ctx, t, bootstrapNodePort) + bootstrapNode, _, configBlock := c.SetupAndStartNodes(ctx, t, generateRandomBootstrapPort()) - jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock) + jobParams := c.NewCCIPJobSpecParams(pricePipeline, configBlock, usdcAttestationAPI) // Add the bootstrap job c.Bootstrap.AddBootstrapJob(t, jobParams.BootstrapJob(c.Dest.CommitStore.Address().Hex())) @@ -733,3 +924,62 @@ func DecodeExecOnChainConfig(encoded []byte) (ccipdata.ExecOnchainConfigV1_2_0, } return onchainConfig, nil } + +type ksa struct { + keystore.Master + csa keystore.CSA +} + +func (k *ksa) CSA() keystore.CSA { + return k.csa +} + +func NewKsa(db *sqlx.DB, lggr logger.Logger, csa keystore.CSA, config chainlink.GeneralConfig) *ksa { + return &ksa{ + Master: keystore.New(db, utils.FastScryptParams, lggr, config.Database()), + csa: csa, + } +} + +type NoopFeedsClient struct{} + +func (n NoopFeedsClient) ApprovedJob(ctx context.Context, in *pb.ApprovedJobRequest) (*pb.ApprovedJobResponse, error) { + return &pb.ApprovedJobResponse{}, nil +} + +func (n NoopFeedsClient) Healthcheck(ctx context.Context, in *pb.HealthcheckRequest) (*pb.HealthcheckResponse, error) { + return &pb.HealthcheckResponse{}, nil +} + +func (n NoopFeedsClient) UpdateNode(ctx context.Context, in *pb.UpdateNodeRequest) (*pb.UpdateNodeResponse, error) { + return &pb.UpdateNodeResponse{}, nil +} + +func (n NoopFeedsClient) RejectedJob(ctx context.Context, in *pb.RejectedJobRequest) (*pb.RejectedJobResponse, error) { + return &pb.RejectedJobResponse{}, nil +} + +func (n NoopFeedsClient) CancelledJob(ctx context.Context, in *pb.CancelledJobRequest) (*pb.CancelledJobResponse, error) { + return &pb.CancelledJobResponse{}, nil +} + +func generateRandomBootstrapPort() int64 { + minPort := int64(10000) + maxPort := int64(65500) + for { + port := rand.Int63n(maxPort-minPort+1) + minPort + if isPortAvailable(port) { + return port + } + } +} + +func isPortAvailable(port int64) bool { + address := fmt.Sprintf(":%d", port) + listener, err := net.Listen("tcp", address) + if err != nil { + return false + } + listener.Close() + return true +} diff --git a/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go b/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go index 13e75f8084..323cc83dc6 100644 --- a/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go +++ b/core/services/ocr2/plugins/ccip/testhelpers/integration/jobspec.go @@ -2,6 +2,7 @@ package integrationtesthelpers import ( "fmt" + "github.com/smartcontractkit/chainlink/v2/core/utils" "time" "github.com/ethereum/go-ethereum/common" @@ -42,6 +43,7 @@ type CCIPJobSpecParams struct { TokenPricesUSDPipeline string SourceStartBlock uint64 DestStartBlock uint64 + USDCAttestationAPI string P2PV2Bootstrappers pq.StringArray } @@ -137,6 +139,12 @@ func (params CCIPJobSpecParams) ExecutionJobSpec() (*client.OCR2TaskJobSpec, err if params.SourceStartBlock > 0 { ocrSpec.PluginConfig["sourceStartBlock"] = params.SourceStartBlock } + if params.USDCAttestationAPI != "" { + ocrSpec.PluginConfig["USDCConfig.AttestationAPI"] = fmt.Sprintf("\"%s\"", params.USDCAttestationAPI) + ocrSpec.PluginConfig["USDCConfig.SourceTokenAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String()) + ocrSpec.PluginConfig["USDCConfig.SourceMessageTransmitterAddress"] = fmt.Sprintf("\"%s\"", utils.RandomAddress().String()) + ocrSpec.PluginConfig["USDCConfig.AttestationAPITimeoutSeconds"] = 5 + } return &client.OCR2TaskJobSpec{ OCR2OracleSpec: ocrSpec, JobType: "offchainreporting2", @@ -161,7 +169,7 @@ func (params CCIPJobSpecParams) BootstrapJob(contractID string) *client.OCR2Task } } -func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64) CCIPJobSpecParams { +func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline string, configBlock int64, usdcAttestationAPI string) CCIPJobSpecParams { return CCIPJobSpecParams{ CommitStore: c.Dest.CommitStore.Address(), OffRamp: c.Dest.OffRamp.Address(), @@ -170,5 +178,6 @@ func (c *CCIPIntegrationTestHarness) NewCCIPJobSpecParams(tokenPricesUSDPipeline DestChainName: "SimulatedDest", TokenPricesUSDPipeline: tokenPricesUSDPipeline, DestStartBlock: uint64(configBlock), + USDCAttestationAPI: usdcAttestationAPI, } } diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader.go b/core/services/ocr2/plugins/ccip/tokendata/reader.go index 2ea32be998..07558cad5f 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/reader.go +++ b/core/services/ocr2/plugins/ccip/tokendata/reader.go @@ -20,5 +20,6 @@ var ( type Reader interface { // ReadTokenData returns the attestation bytes if ready, and throws an error if not ready. ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (tokenData []byte, err error) + RegisterFilters(qopts ...pg.QOpt) error Close(qopts ...pg.QOpt) error } diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go index f88869bae7..c5d4717c3c 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go +++ b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go @@ -62,6 +62,26 @@ func (_m *MockReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnR return r0, r1 } +// RegisterFilters provides a mock function with given fields: qopts +func (_m *MockReader) RegisterFilters(qopts ...pg.QOpt) error { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(...pg.QOpt) error); ok { + r0 = rf(qopts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewMockReader creates a new instance of MockReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockReader(t interface { diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go index 7ef4833be6..7cf98a9a8b 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -172,3 +172,7 @@ func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHas func (s *TokenDataReader) Close(qopts ...pg.QOpt) error { return s.usdcReader.Close(qopts...) } + +func (s *TokenDataReader) RegisterFilters(qopts ...pg.QOpt) error { + return s.usdcReader.RegisterFilters(qopts...) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go index 8bd314992f..3bac237511 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go @@ -57,7 +57,6 @@ func TestUSDCReader_callAttestationApiMock(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp) require.NoError(t, err) usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, 0) @@ -160,10 +159,11 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) { lggr := logger.TestLogger(t) lp := mocks.NewLogPoller(t) - lp.On("RegisterFilter", mock.Anything).Return(nil) usdcReader, err := ccipdata.NewUSDCReader(lggr, mockMsgTransmitter, lp) require.NoError(t, err) usdcService := NewUSDCTokenDataReader(lggr, usdcReader, attestationURI, test.customTimeoutSeconds) + lp.On("RegisterFilter", mock.Anything).Return(nil) + require.NoError(t, usdcReader.RegisterFilters()) parentCtx, cancel := context.WithTimeout(context.Background(), time.Duration(test.parentTimeoutSeconds)*time.Second) defer cancel() @@ -174,6 +174,8 @@ func TestUSDCReader_callAttestationApiMockError(t *testing.T) { if test.expectedError != nil { require.True(t, errors.Is(err, test.expectedError)) } + lp.On("UnregisterFilter", mock.Anything).Return(nil) + require.NoError(t, usdcReader.Close()) }) } } diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index 40abb457c5..d6f34966a9 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -28,7 +28,7 @@ func ValidatedOracleSpecToml(config OCR2Config, insConf InsecureConfig, tomlStri var spec job.OCR2OracleSpec tree, err := toml.Load(tomlString) if err != nil { - return jb, pkgerrors.Wrap(err, "toml error on load") + return jb, pkgerrors.Wrapf(err, "toml error on load %v", tomlString) } // Note this validates all the fields which implement an UnmarshalText // i.e. TransmitterAddress, PeerID...