From 6ac3cddfbcb311c82cdd2a33db7c6b6605d87d0d Mon Sep 17 00:00:00 2001 From: Ryan Stout Date: Tue, 9 Jul 2024 02:13:29 -0700 Subject: [PATCH] Commit NewReportingPlugin retries on error (#1160) Previously, if there was an error when starting the Commit Pluging (i.e. calling NewReportingPlugin), the Commit Plugin would remain in a non-started state. Now, NewReportingPlugin will retry until the Commit Plugin successfully starts. ## Motivation ## Solution Co-authored-by: dimitris --- .changeset/red-balloons-repeat.md | 5 + .../ocr2/plugins/ccip/ccipcommit/factory.go | 79 +++++++++----- .../plugins/ccip/ccipcommit/factory_test.go | 100 ++++++++++++++++++ .../plugins/ccip/ccipcommit/initializers.go | 26 +++-- .../ocr2/plugins/ccip/ccipcommit/ocr2.go | 3 +- 5 files changed, 174 insertions(+), 39 deletions(-) create mode 100644 .changeset/red-balloons-repeat.md create mode 100644 core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go diff --git a/.changeset/red-balloons-repeat.md b/.changeset/red-balloons-repeat.md new file mode 100644 index 0000000000..674cae9602 --- /dev/null +++ b/.changeset/red-balloons-repeat.md @@ -0,0 +1,5 @@ +--- +"ccip": patch +--- + +Commit NewReportingPlugin retries on error diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index 95d6398f56..648f62a23a 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" @@ -64,40 +65,60 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context return nil } -// NewReportingPlugin returns the ccip CommitReportingPlugin and satisfies the ReportingPluginFactory interface. +type reportingPluginAndInfo struct { + plugin types.ReportingPlugin + pluginInfo types.ReportingPluginInfo +} + +// NewReportingPlugin registers a new ReportingPlugin func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) { - ctx := context.Background() // todo: consider adding some timeout + initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay + maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay - destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay) if err != nil { return nil, types.ReportingPluginInfo{}, err } + return pluginAndInfo.plugin, pluginAndInfo.pluginInfo, err +} - priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } - if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { - return nil, types.ReportingPluginInfo{}, err - } +// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be +// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to +// function, hence why we can only keep retrying it until it succeeds. +func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { + return func() (reportingPluginAndInfo, error) { + ctx := context.Background() // todo: consider adding some timeout - pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) + if err != nil { + return reportingPluginAndInfo{}, err + } - gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg) + if err != nil { + return reportingPluginAndInfo{}, err + } + if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil { + return reportingPluginAndInfo{}, err + } - err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader) - if err != nil { - return nil, types.ReportingPluginInfo{}, err - } + pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } + + gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx) + if err != nil { + return reportingPluginAndInfo{}, err + } - lggr := rf.config.lggr.Named("CommitReportingPlugin") - return &CommitReportingPlugin{ + err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader) + if err != nil { + return reportingPluginAndInfo{}, err + } + + lggr := rf.config.lggr.Named("CommitReportingPlugin") + plugin := &CommitReportingPlugin{ sourceChainSelector: rf.config.sourceChainSelector, sourceNative: rf.config.sourceNative, onRampReader: rf.config.onRampReader, @@ -112,8 +133,9 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin metricsCollector: rf.config.metricsCollector, chainHealthcheck: rf.config.chainHealthcheck, priceService: rf.config.priceService, - }, - types.ReportingPluginInfo{ + } + + pluginInfo := types.ReportingPluginInfo{ Name: "CCIPCommit", UniqueReports: false, // See comment in CommitStore constructor. Limits: types.ReportingPluginLimits{ @@ -121,5 +143,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin MaxObservationLength: ccip.MaxObservationLength, MaxReportLength: MaxCommitReportLength, }, - }, nil + } + + return reportingPluginAndInfo{plugin, pluginInfo}, nil + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go new file mode 100644 index 0000000000..825026bd17 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go @@ -0,0 +1,100 @@ +package ccipcommit + +import ( + "errors" + "testing" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" + dbMocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdb/mocks" +) + +// Assert that NewReportingPlugin keeps retrying until it succeeds. +// +// NewReportingPlugin makes several calls (e.g. CommitStoreReader.ChangeConfig) that can fail. We use mocks to cause the +// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin +// retries a sufficient number of times to get through the transient errors and eventually succeed. +func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) { + commitConfig := CommitPluginStaticConfig{} + + // For this unit test, ensure that there is no delay between retries + commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{ + InitialDelay: 0 * time.Nanosecond, + MaxDelay: 0 * time.Nanosecond, + } + + // Set up the OffRampReader mock + mockCommitStore := new(mocks.CommitStoreReader) + + // The first call is set to return an error, the following calls return a nil error + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address(""), errors.New("")). + Once() + mockCommitStore. + On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything). + Return(ccip.Address("0x7c6e4F0BDe29f83BC394B75a7f313B7E5DbD2d77"), nil). + Times(5) + + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, errors.New("")). + Once() + mockCommitStore. + On("OffchainConfig", mock.Anything). + Return(ccip.CommitOffchainConfig{}, nil). + Times(3) + + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, errors.New("")). + Once() + mockCommitStore. + On("GasPriceEstimator", mock.Anything). + Return(nil, nil). + Times(2) + + commitConfig.commitStore = mockCommitStore + + mockPriceService := new(dbMocks.PriceService) + + mockPriceService. + On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("")). + Once() + mockPriceService. + On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything). + Return(nil) + + commitConfig.priceService = mockPriceService + + priceRegistryProvider := new(ccipdataprovidermocks.PriceRegistry) + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, errors.New("")). + Once() + priceRegistryProvider. + On("NewPriceRegistryReader", mock.Anything, mock.Anything). + Return(nil, nil). + Once() + commitConfig.priceRegistryProvider = priceRegistryProvider + + commitConfig.lggr, _ = logger.NewLogger() + + factory := NewCommitReportingPluginFactory(commitConfig) + reportingConfig := types.ReportingPluginConfig{} + reportingConfig.OnchainConfig = []byte{1, 2, 3} + reportingConfig.OffchainConfig = []byte{1, 2, 3} + + // Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above) + _, _, err := factory.NewReportingPlugin(reportingConfig) + assert.Equal(t, nil, err) +} diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 0d8a5414dc..729d8ca5c1 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "strings" + "time" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib" @@ -43,6 +44,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} + func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec @@ -171,17 +174,18 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c ) wrappedPluginFactory := NewCommitReportingPluginFactory(CommitPluginStaticConfig{ - lggr: lggr, - onRampReader: onRampReader, - sourceChainSelector: staticConfig.SourceChainSelector, - sourceNative: sourceNative, - offRamp: offRampReader, - commitStore: commitStoreReader, - destChainSelector: staticConfig.ChainSelector, - priceRegistryProvider: ccip.NewChainAgnosticPriceRegistry(dstProvider), - metricsCollector: metricsCollector, - chainHealthcheck: chainHealthCheck, - priceService: priceService, + lggr: lggr, + newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig, + onRampReader: onRampReader, + sourceChainSelector: staticConfig.SourceChainSelector, + sourceNative: sourceNative, + offRamp: offRampReader, + commitStore: commitStoreReader, + destChainSelector: staticConfig.ChainSelector, + priceRegistryProvider: ccip.NewChainAgnosticPriceRegistry(dstProvider), + metricsCollector: metricsCollector, + chainHealthcheck: chainHealthCheck, + priceService: priceService, }) argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPCommit", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(destChainID)) argsNoPlugin.Logger = commonlogger.NewOCRWrapper(commitLggr, true, logError) diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 3c96940b8d..2f0fc4e795 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -50,7 +50,8 @@ type update struct { } type CommitPluginStaticConfig struct { - lggr logger.Logger + lggr logger.Logger + newReportingPluginRetryConfig ccipdata.RetryConfig // Source onRampReader ccipdata.OnRampReader sourceChainSelector uint64