Skip to content

Commit

Permalink
Adding missing database fix
Browse files Browse the repository at this point in the history
  • Loading branch information
connorwstein authored and mateusz-sekara committed Sep 13, 2023
1 parent a08600f commit e973937
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 37 deletions.
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,13 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error
d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec)
}
case types.CCIPCommit:
err = ccip.UnregisterCommitPluginLpFilters(context.Background(), q, spec, d.legacyChains)
err = ccip.UnregisterCommitPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec)
}
return nil
case types.CCIPExecution:
err = ccip.UnregisterExecPluginLpFilters(context.Background(), q, spec, d.legacyChains)
err = ccip.UnregisterExecPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec)
}
Expand Down
14 changes: 7 additions & 7 deletions core/services/ocr2/plugins/ccip/commit_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
COMMIT_CCIP_SENDS = "Commit ccip sends"
)

func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
Expand Down Expand Up @@ -117,7 +117,7 @@ func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainC
checkFinalityTags: sourceChain.Config().EVM().FinalityTagEnabled(),
})

err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress)
err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func getCommitPluginDestLpFilters(priceRegistry common.Address, offRamp common.A
}

// UnregisterCommitPluginLpFilters unregisters all the registered filters for both source and dest chains.
func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error {
func UnregisterCommitPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
if spec == nil {
return errors.New("spec is nil")
}
Expand Down Expand Up @@ -249,10 +249,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *jo
if err != nil {
return err
}
return unregisterCommitPluginFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp))
return unregisterCommitPluginFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp), qopts...)
}

func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address) error {
func unregisterCommitPluginFilters(ctx context.Context, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address, qopts ...pg.QOpt) error {
staticCfg, err := destCommitStore.GetStaticConfig(&bind.CallOpts{Context: ctx})
if err != nil {
return err
Expand All @@ -264,16 +264,16 @@ func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP,
}

if err := unregisterLpFilters(
q,
sourceLP,
getCommitPluginSourceLpFilters(staticCfg.OnRamp),
qopts...,
); err != nil {
return err
}

return unregisterLpFilters(
q,
destLP,
getCommitPluginDestLpFilters(dynamicCfg.PriceRegistry, offRamp),
qopts...,
)
}
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/ccip/commit_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestGetCommitPluginFilterNamesFromSpec(t *testing.T) {
}
}

err := UnregisterCommitPluginLpFilters(context.Background(), nil, tc.spec, chainSet)
err := UnregisterCommitPluginLpFilters(context.Background(), tc.spec, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestGetCommitPluginFilterNames(t *testing.T) {
dstLP.On("UnregisterFilter", "Token pool added - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil)
dstLP.On("UnregisterFilter", "Token pool removed - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil)

err := unregisterCommitPluginFilters(context.Background(), nil, srcLP, dstLP, mockCommitStore, offRampAddr)
err := unregisterCommitPluginFilters(context.Background(), srcLP, dstLP, mockCommitStore, offRampAddr)
assert.NoError(t, err)

srcLP.AssertExpectations(t)
Expand Down
11 changes: 6 additions & 5 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/hasher"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/merklemulti"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

const (
Expand Down Expand Up @@ -200,28 +201,28 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t

// UpdateLogPollerFilters updates the log poller filters for the source and destination chains.
// pass zeroAddress if destPriceRegistry is unknown, filters with zero address are omitted.
func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error {
func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error {
rf.filtersMu.Lock()
defer rf.filtersMu.Unlock()

// source chain filters
sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getCommitPluginSourceLpFilters(rf.config.onRampAddress)
created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil {
if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil {
return err
}
rf.sourceChainFilters = sourceFiltersNow

// destination chain filters
destFiltersBefore, destFiltersNow := rf.destChainFilters, getCommitPluginDestLpFilters(destPriceRegistry, rf.config.offRamp.Address())
created, deleted = filtersDiff(destFiltersBefore, destFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil {
if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil {
return err
}
rf.destChainFilters = destFiltersNow
Expand Down
16 changes: 8 additions & 8 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
FEE_TOKEN_REMOVED = "Fee token removed"
)

func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
var pluginConfig ccipconfig.ExecutionPluginJobSpecConfig
err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginConfig)
Expand Down Expand Up @@ -124,7 +124,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
leafHasher: hasher.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hasher.NewKeccakCtx()),
})

err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress)
err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry co
}

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error {
func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error {
if spec == nil {
return errors.New("spec is nil")
}
Expand Down Expand Up @@ -260,18 +260,18 @@ func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.
return errors.Wrap(err, "failed loading onRamp")
}

return unregisterExecutionPluginLpFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client())
return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...)
}

func unregisterExecutionPluginLpFilters(
ctx context.Context,
q pg.Queryer,
sourceLP logpoller.LogPoller,
destLP logpoller.LogPoller,
destOffRamp evm_2_evm_offramp.EVM2EVMOffRampInterface,
destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig,
sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface,
sourceChainClient client.Client) error {
sourceChainClient client.Client,
qopts ...pg.QOpt) error {
destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx})
if err != nil {
return err
Expand All @@ -283,17 +283,17 @@ func unregisterExecutionPluginLpFilters(
}

if err := unregisterLpFilters(
q,
sourceLP,
getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry),
qopts...,
); err != nil {
return err
}

return unregisterLpFilters(
q,
destLP,
getExecutionPluginDestLpChainFilters(destOffRampConfig.CommitStore, destOffRamp.Address(), destOffRampDynCfg.PriceRegistry),
qopts...,
)
}

Expand Down
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/ccip/execution_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) {
for _, tc := range testCases {
chainSet := &mocks.LegacyChainContainer{}
t.Run(tc.description, func(t *testing.T) {
err := UnregisterExecPluginLpFilters(context.Background(), nilQueryer, tc.spec, chainSet)
err := UnregisterExecPluginLpFilters(context.Background(), tc.spec, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -111,7 +111,6 @@ func TestGetExecutionPluginFilterNames(t *testing.T) {

err := unregisterExecutionPluginLpFilters(
context.Background(),
nilQueryer,
srcLP,
dstLP,
mockOffRamp,
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,28 +195,28 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty

// UpdateLogPollerFilters updates the log poller filters for the source and destination chains.
// pass zeroAddress if dstPriceRegistry is unknown, filters with zero address are omitted.
func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error {
func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error {
rf.filtersMu.Lock()
defer rf.filtersMu.Unlock()

// source chain filters
sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters(rf.config.onRamp.Address(), rf.config.sourcePriceRegistry.Address())
created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil {
if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil {
return err
}
rf.sourceChainFilters = sourceFiltersNow

// destination chain filters
destFiltersBefore, destFiltersNow := rf.destChainFilters, getExecutionPluginDestLpChainFilters(rf.config.commitStore.Address(), rf.config.offRamp.Address(), destPriceRegistry)
created, deleted = filtersDiff(destFiltersBefore, destFiltersNow)
if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil {
if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil {
return err
}
if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil {
if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil {
return err
}
rf.destChainFilters = destFiltersNow
Expand Down
10 changes: 4 additions & 6 deletions core/services/ocr2/plugins/ccip/plugins_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ const (

var zeroAddress = common.HexToAddress("0")

var nilQueryer pg.Queryer

var ErrCommitStoreIsDown = errors.New("commitStore is down")

func LoadOnRamp(onRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, error) {
Expand Down Expand Up @@ -186,24 +184,24 @@ func filterContainsZeroAddress(addrs []common.Address) bool {
return false
}

func registerLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error {
func registerLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error {
for _, lpFilter := range filters {
if filterContainsZeroAddress(lpFilter.Addresses) {
continue
}
if err := lp.RegisterFilter(lpFilter); err != nil {
if err := lp.RegisterFilter(lpFilter, qopts...); err != nil {
return err
}
}
return nil
}

func unregisterLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error {
func unregisterLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error {
for _, lpFilter := range filters {
if filterContainsZeroAddress(lpFilter.Addresses) {
continue
}
if err := lp.UnregisterFilter(lpFilter.Name, pg.WithQueryer(q)); err != nil {
if err := lp.UnregisterFilter(lpFilter.Name, qopts...); err != nil {
return err
}
}
Expand Down

0 comments on commit e973937

Please sign in to comment.