From e9739374fae47cca4e8b7c927fa13360f5d4f9fe Mon Sep 17 00:00:00 2001 From: connorwstein Date: Tue, 12 Sep 2023 22:49:13 +0200 Subject: [PATCH] Adding missing database fix --- core/services/ocr2/delegate.go | 4 ++-- core/services/ocr2/plugins/ccip/commit_plugin.go | 14 +++++++------- .../ocr2/plugins/ccip/commit_plugin_test.go | 4 ++-- .../ocr2/plugins/ccip/commit_reporting_plugin.go | 11 ++++++----- .../ocr2/plugins/ccip/execution_plugin.go | 16 ++++++++-------- .../ocr2/plugins/ccip/execution_plugin_test.go | 3 +-- .../plugins/ccip/execution_reporting_plugin.go | 10 +++++----- .../services/ocr2/plugins/ccip/plugins_common.go | 10 ++++------ 8 files changed, 35 insertions(+), 37 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 314a64f4d5..615bcbc67b 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -287,13 +287,13 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec) } case types.CCIPCommit: - err = ccip.UnregisterCommitPluginLpFilters(context.Background(), q, spec, d.legacyChains) + err = ccip.UnregisterCommitPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q)) if err != nil { d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec) } return nil case types.CCIPExecution: - err = ccip.UnregisterExecPluginLpFilters(context.Background(), q, spec, d.legacyChains) + err = ccip.UnregisterExecPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q)) if err != nil { d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec) } diff --git a/core/services/ocr2/plugins/ccip/commit_plugin.go b/core/services/ocr2/plugins/ccip/commit_plugin.go index 2a44601767..170df6713b 100644 --- a/core/services/ocr2/plugins/ccip/commit_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_plugin.go @@ -34,7 +34,7 @@ const ( COMMIT_CCIP_SENDS = "Commit ccip sends" ) -func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { +func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec var pluginConfig ccipconfig.CommitPluginJobSpecConfig @@ -117,7 +117,7 @@ func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainC checkFinalityTags: sourceChain.Config().EVM().FinalityTagEnabled(), }) - err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress) + err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...) if err != nil { return nil, err } @@ -206,7 +206,7 @@ func getCommitPluginDestLpFilters(priceRegistry common.Address, offRamp common.A } // UnregisterCommitPluginLpFilters unregisters all the registered filters for both source and dest chains. -func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error { +func UnregisterCommitPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { if spec == nil { return errors.New("spec is nil") } @@ -249,10 +249,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *jo if err != nil { return err } - return unregisterCommitPluginFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp)) + return unregisterCommitPluginFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp), qopts...) } -func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address) error { +func unregisterCommitPluginFilters(ctx context.Context, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address, qopts ...pg.QOpt) error { staticCfg, err := destCommitStore.GetStaticConfig(&bind.CallOpts{Context: ctx}) if err != nil { return err @@ -264,16 +264,16 @@ func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, } if err := unregisterLpFilters( - q, sourceLP, getCommitPluginSourceLpFilters(staticCfg.OnRamp), + qopts..., ); err != nil { return err } return unregisterLpFilters( - q, destLP, getCommitPluginDestLpFilters(dynamicCfg.PriceRegistry, offRamp), + qopts..., ) } diff --git a/core/services/ocr2/plugins/ccip/commit_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_plugin_test.go index d95c6b176a..a9dc54e94a 100644 --- a/core/services/ocr2/plugins/ccip/commit_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_plugin_test.go @@ -70,7 +70,7 @@ func TestGetCommitPluginFilterNamesFromSpec(t *testing.T) { } } - err := UnregisterCommitPluginLpFilters(context.Background(), nil, tc.spec, chainSet) + err := UnregisterCommitPluginLpFilters(context.Background(), tc.spec, chainSet) if tc.expectingErr { assert.Error(t, err) } else { @@ -105,7 +105,7 @@ func TestGetCommitPluginFilterNames(t *testing.T) { dstLP.On("UnregisterFilter", "Token pool added - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil) dstLP.On("UnregisterFilter", "Token pool removed - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil) - err := unregisterCommitPluginFilters(context.Background(), nil, srcLP, dstLP, mockCommitStore, offRampAddr) + err := unregisterCommitPluginFilters(context.Background(), srcLP, dstLP, mockCommitStore, offRampAddr) assert.NoError(t, err) srcLP.AssertExpectations(t) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 47acd57362..e6207fae90 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -29,6 +29,7 @@ import ( ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/hasher" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/merklemulti" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) const ( @@ -200,17 +201,17 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t // UpdateLogPollerFilters updates the log poller filters for the source and destination chains. // pass zeroAddress if destPriceRegistry is unknown, filters with zero address are omitted. -func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error { +func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error { rf.filtersMu.Lock() defer rf.filtersMu.Unlock() // source chain filters sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getCommitPluginSourceLpFilters(rf.config.onRampAddress) created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil { + if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil { return err } rf.sourceChainFilters = sourceFiltersNow @@ -218,10 +219,10 @@ func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry // destination chain filters destFiltersBefore, destFiltersNow := rf.destChainFilters, getCommitPluginDestLpFilters(destPriceRegistry, rf.config.offRamp.Address()) created, deleted = filtersDiff(destFiltersBefore, destFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil { + if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil { return err } rf.destChainFilters = destFiltersNow diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index 45185988a7..1a103cea7e 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -44,7 +44,7 @@ const ( FEE_TOKEN_REMOVED = "Fee token removed" ) -func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { +func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec var pluginConfig ccipconfig.ExecutionPluginJobSpecConfig err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginConfig) @@ -124,7 +124,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha leafHasher: hasher.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hasher.NewKeccakCtx()), }) - err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress) + err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...) if err != nil { return nil, err } @@ -213,7 +213,7 @@ func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry co } // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. -func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error { +func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { if spec == nil { return errors.New("spec is nil") } @@ -260,18 +260,18 @@ func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job. return errors.Wrap(err, "failed loading onRamp") } - return unregisterExecutionPluginLpFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client()) + return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...) } func unregisterExecutionPluginLpFilters( ctx context.Context, - q pg.Queryer, sourceLP logpoller.LogPoller, destLP logpoller.LogPoller, destOffRamp evm_2_evm_offramp.EVM2EVMOffRampInterface, destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig, sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface, - sourceChainClient client.Client) error { + sourceChainClient client.Client, + qopts ...pg.QOpt) error { destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx}) if err != nil { return err @@ -283,17 +283,17 @@ func unregisterExecutionPluginLpFilters( } if err := unregisterLpFilters( - q, sourceLP, getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry), + qopts..., ); err != nil { return err } return unregisterLpFilters( - q, destLP, getExecutionPluginDestLpChainFilters(destOffRampConfig.CommitStore, destOffRamp.Address(), destOffRampDynCfg.PriceRegistry), + qopts..., ) } diff --git a/core/services/ocr2/plugins/ccip/execution_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_plugin_test.go index 21fcfc023b..6ab5a0098e 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin_test.go @@ -55,7 +55,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) { for _, tc := range testCases { chainSet := &mocks.LegacyChainContainer{} t.Run(tc.description, func(t *testing.T) { - err := UnregisterExecPluginLpFilters(context.Background(), nilQueryer, tc.spec, chainSet) + err := UnregisterExecPluginLpFilters(context.Background(), tc.spec, chainSet) if tc.expectingErr { assert.Error(t, err) } else { @@ -111,7 +111,6 @@ func TestGetExecutionPluginFilterNames(t *testing.T) { err := unregisterExecutionPluginLpFilters( context.Background(), - nilQueryer, srcLP, dstLP, mockOffRamp, diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index 21224cdc87..f7f0c6e219 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -195,17 +195,17 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty // UpdateLogPollerFilters updates the log poller filters for the source and destination chains. // pass zeroAddress if dstPriceRegistry is unknown, filters with zero address are omitted. -func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error { +func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error { rf.filtersMu.Lock() defer rf.filtersMu.Unlock() // source chain filters sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters(rf.config.onRamp.Address(), rf.config.sourcePriceRegistry.Address()) created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil { + if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil { return err } rf.sourceChainFilters = sourceFiltersNow @@ -213,10 +213,10 @@ func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegis // destination chain filters destFiltersBefore, destFiltersNow := rf.destChainFilters, getExecutionPluginDestLpChainFilters(rf.config.commitStore.Address(), rf.config.offRamp.Address(), destPriceRegistry) created, deleted = filtersDiff(destFiltersBefore, destFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil { + if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil { return err } rf.destChainFilters = destFiltersNow diff --git a/core/services/ocr2/plugins/ccip/plugins_common.go b/core/services/ocr2/plugins/ccip/plugins_common.go index b0154bd5cf..f3df762e55 100644 --- a/core/services/ocr2/plugins/ccip/plugins_common.go +++ b/core/services/ocr2/plugins/ccip/plugins_common.go @@ -36,8 +36,6 @@ const ( var zeroAddress = common.HexToAddress("0") -var nilQueryer pg.Queryer - var ErrCommitStoreIsDown = errors.New("commitStore is down") func LoadOnRamp(onRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, error) { @@ -186,24 +184,24 @@ func filterContainsZeroAddress(addrs []common.Address) bool { return false } -func registerLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error { +func registerLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error { for _, lpFilter := range filters { if filterContainsZeroAddress(lpFilter.Addresses) { continue } - if err := lp.RegisterFilter(lpFilter); err != nil { + if err := lp.RegisterFilter(lpFilter, qopts...); err != nil { return err } } return nil } -func unregisterLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error { +func unregisterLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error { for _, lpFilter := range filters { if filterContainsZeroAddress(lpFilter.Addresses) { continue } - if err := lp.UnregisterFilter(lpFilter.Name, pg.WithQueryer(q)); err != nil { + if err := lp.UnregisterFilter(lpFilter.Name, qopts...); err != nil { return err } }