From 084fb6f5967d0d84375257cd375649c8dfdc3fb7 Mon Sep 17 00:00:00 2001 From: connorwstein Date: Tue, 12 Sep 2023 16:49:13 -0400 Subject: [PATCH] Port db fix --- .../setup-postgres/wait-for-healthy-postgres.sh | 16 +++++++++++++++- VERSION | 2 +- core/services/ocr2/delegate.go | 8 ++++---- core/services/ocr2/plugins/ccip/commit_plugin.go | 14 +++++++------- .../ocr2/plugins/ccip/commit_plugin_test.go | 4 ++-- .../ocr2/plugins/ccip/commit_reporting_plugin.go | 10 +++++----- .../ocr2/plugins/ccip/execution_plugin.go | 16 ++++++++-------- .../ocr2/plugins/ccip/execution_plugin_test.go | 3 +-- .../plugins/ccip/execution_reporting_plugin.go | 10 +++++----- .../services/ocr2/plugins/ccip/plugins_common.go | 10 ++++------ 10 files changed, 52 insertions(+), 41 deletions(-) diff --git a/.github/actions/setup-postgres/wait-for-healthy-postgres.sh b/.github/actions/setup-postgres/wait-for-healthy-postgres.sh index 3f0efd66f3..438cfbaff3 100755 --- a/.github/actions/setup-postgres/wait-for-healthy-postgres.sh +++ b/.github/actions/setup-postgres/wait-for-healthy-postgres.sh @@ -2,10 +2,24 @@ RETRIES=10 until [ $RETRIES -eq 0 ]; do - if docker compose ps postgres --status running --format json | jq >/dev/null -e 'if (.[0].Health == "healthy") then true else false end'; then + DOCKER_OUTPUT=$(docker compose ps postgres --status running --format json) + JSON_TYPE=$(echo "$DOCKER_OUTPUT" | jq -r 'type') + + if [ "$JSON_TYPE" == "array" ]; then + HEALTH_STATUS=$(echo "$DOCKER_OUTPUT" | jq -r '.[0].Health') + elif [ "$JSON_TYPE" == "object" ]; then + HEALTH_STATUS=$(echo "$DOCKER_OUTPUT" | jq -r '.Health') + else + HEALTH_STATUS="Unknown JSON type: $JSON_TYPE" + fi + + echo "postgres health status: $HEALTH_STATUS" + if [ "$HEALTH_STATUS" == "healthy" ]; then exit 0 fi + echo "Waiting for postgres server, $((RETRIES--)) remaining attempts..." sleep 2 done + exit 1 diff --git a/VERSION b/VERSION index cdb8ebb8c3..7d34869554 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v2.5.0-ccip1.1.0 +v2.5.0-ccip1.1.1 diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index d9100ec69d..53feedd512 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -288,13 +288,13 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec) } case job.CCIPCommit: - err = ccip.UnregisterCommitPluginLpFilters(context.Background(), q, spec, d.legacyChains) + err = ccip.UnregisterCommitPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q)) if err != nil { d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec) } return nil case job.CCIPExecution: - err = ccip.UnregisterExecPluginLpFilters(context.Background(), q, spec, d.legacyChains) + err = ccip.UnregisterExecPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q)) if err != nil { d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec) } @@ -479,7 +479,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC OffchainKeyring: kb, OnchainKeyring: kb, } - return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError) + return ccip.NewCommitServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, d.pipelineRunner, oracleArgsNoPlugin, logError, qopts...) case job.CCIPExecution: if spec.Relay != relay.EVM { return nil, errors.New("Non evm chains are not supported for CCIP execution") @@ -523,7 +523,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC OnchainKeyring: kb, } - return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError) + return ccip.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...) case job.OCR2Functions: const ( _ int32 = iota diff --git a/core/services/ocr2/plugins/ccip/commit_plugin.go b/core/services/ocr2/plugins/ccip/commit_plugin.go index 491c923cf7..23c168c8e0 100644 --- a/core/services/ocr2/plugins/ccip/commit_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_plugin.go @@ -36,7 +36,7 @@ const ( COMMIT_CCIP_SENDS = "Commit ccip sends" ) -func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { +func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec var pluginConfig ccipconfig.CommitPluginJobSpecConfig @@ -118,7 +118,7 @@ func NewCommitServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainC checkFinalityTags: sourceChain.Config().EVM().FinalityTagEnabled(), }) - err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress) + err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...) if err != nil { return nil, err } @@ -217,7 +217,7 @@ func getCommitPluginDestLpFilters(priceRegistry common.Address, offRamp common.A } // UnregisterCommitPluginLpFilters unregisters all the registered filters for both source and dest chains. -func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error { +func UnregisterCommitPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { if spec == nil { return errors.New("spec is nil") } @@ -260,10 +260,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, q pg.Queryer, spec *jo if err != nil { return err } - return unregisterCommitPluginFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp)) + return unregisterCommitPluginFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), commitStore, common.HexToAddress(pluginConfig.OffRamp), qopts...) } -func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address) error { +func unregisterCommitPluginFilters(ctx context.Context, sourceLP, destLP logpoller.LogPoller, destCommitStore commit_store.CommitStoreInterface, offRamp common.Address, qopts ...pg.QOpt) error { staticCfg, err := destCommitStore.GetStaticConfig(&bind.CallOpts{Context: ctx}) if err != nil { return err @@ -275,16 +275,16 @@ func unregisterCommitPluginFilters(ctx context.Context, q pg.Queryer, sourceLP, } if err := unregisterLpFilters( - q, sourceLP, getCommitPluginSourceLpFilters(staticCfg.OnRamp), + qopts..., ); err != nil { return err } return unregisterLpFilters( - q, destLP, getCommitPluginDestLpFilters(dynamicCfg.PriceRegistry, offRamp), + qopts..., ) } diff --git a/core/services/ocr2/plugins/ccip/commit_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_plugin_test.go index d95c6b176a..a9dc54e94a 100644 --- a/core/services/ocr2/plugins/ccip/commit_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_plugin_test.go @@ -70,7 +70,7 @@ func TestGetCommitPluginFilterNamesFromSpec(t *testing.T) { } } - err := UnregisterCommitPluginLpFilters(context.Background(), nil, tc.spec, chainSet) + err := UnregisterCommitPluginLpFilters(context.Background(), tc.spec, chainSet) if tc.expectingErr { assert.Error(t, err) } else { @@ -105,7 +105,7 @@ func TestGetCommitPluginFilterNames(t *testing.T) { dstLP.On("UnregisterFilter", "Token pool added - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil) dstLP.On("UnregisterFilter", "Token pool removed - 0xDAFeA492D9c6733Ae3D56b7eD1AdB60692C98BC4", mock.Anything).Return(nil) - err := unregisterCommitPluginFilters(context.Background(), nil, srcLP, dstLP, mockCommitStore, offRampAddr) + err := unregisterCommitPluginFilters(context.Background(), srcLP, dstLP, mockCommitStore, offRampAddr) assert.NoError(t, err) srcLP.AssertExpectations(t) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 5e5fcd809e..f84dd050c8 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -200,17 +200,17 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t // UpdateLogPollerFilters updates the log poller filters for the source and destination chains. // pass zeroAddress if destPriceRegistry is unknown, filters with zero address are omitted. -func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error { +func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error { rf.filtersMu.Lock() defer rf.filtersMu.Unlock() // source chain filters sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getCommitPluginSourceLpFilters(rf.config.onRampAddress) created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil { + if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil { return err } rf.sourceChainFilters = sourceFiltersNow @@ -218,10 +218,10 @@ func (rf *CommitReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry // destination chain filters destFiltersBefore, destFiltersNow := rf.destChainFilters, getCommitPluginDestLpFilters(destPriceRegistry, rf.config.offRamp.Address()) created, deleted = filtersDiff(destFiltersBefore, destFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil { + if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil { return err } rf.destChainFilters = destFiltersNow diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index 8fbb527f29..0c94d31b4f 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -43,7 +43,7 @@ const ( FEE_TOKEN_REMOVED = "Fee token removed" ) -func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { +func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { spec := jb.OCR2OracleSpec var pluginConfig ccipconfig.ExecutionPluginJobSpecConfig err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginConfig) @@ -120,7 +120,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha leafHasher: hasher.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hasher.NewKeccakCtx()), }) - err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress) + err = wrappedPluginFactory.UpdateLogPollerFilters(zeroAddress, qopts...) if err != nil { return nil, err } @@ -209,7 +209,7 @@ func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry co } // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. -func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer) error { +func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { if spec == nil { return errors.New("spec is nil") } @@ -256,18 +256,18 @@ func UnregisterExecPluginLpFilters(ctx context.Context, q pg.Queryer, spec *job. return errors.Wrap(err, "failed loading onRamp") } - return unregisterExecutionPluginLpFilters(ctx, q, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client()) + return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...) } func unregisterExecutionPluginLpFilters( ctx context.Context, - q pg.Queryer, sourceLP logpoller.LogPoller, destLP logpoller.LogPoller, destOffRamp evm_2_evm_offramp.EVM2EVMOffRampInterface, destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig, sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface, - sourceChainClient client.Client) error { + sourceChainClient client.Client, + qopts ...pg.QOpt) error { destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx}) if err != nil { return err @@ -279,17 +279,17 @@ func unregisterExecutionPluginLpFilters( } if err := unregisterLpFilters( - q, sourceLP, getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry), + qopts..., ); err != nil { return err } return unregisterLpFilters( - q, destLP, getExecutionPluginDestLpChainFilters(destOffRampConfig.CommitStore, destOffRamp.Address(), destOffRampDynCfg.PriceRegistry), + qopts..., ) } diff --git a/core/services/ocr2/plugins/ccip/execution_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_plugin_test.go index 85728682b6..c6ea632998 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin_test.go @@ -55,7 +55,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) { for _, tc := range testCases { chainSet := &mocks.LegacyChainContainer{} t.Run(tc.description, func(t *testing.T) { - err := UnregisterExecPluginLpFilters(context.Background(), nilQueryer, tc.spec, chainSet) + err := UnregisterExecPluginLpFilters(context.Background(), tc.spec, chainSet) if tc.expectingErr { assert.Error(t, err) } else { @@ -111,7 +111,6 @@ func TestGetExecutionPluginFilterNames(t *testing.T) { err := unregisterExecutionPluginLpFilters( context.Background(), - nilQueryer, srcLP, dstLP, mockOffRamp, diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index f707d525e3..3a14d6fd7f 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -191,17 +191,17 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty // UpdateLogPollerFilters updates the log poller filters for the source and destination chains. // pass zeroAddress if dstPriceRegistry is unknown, filters with zero address are omitted. -func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address) error { +func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegistry common.Address, qopts ...pg.QOpt) error { rf.filtersMu.Lock() defer rf.filtersMu.Unlock() // source chain filters sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters(rf.config.onRamp.Address(), rf.config.sourcePriceRegistry.Address()) created, deleted := filtersDiff(sourceFiltersBefore, sourceFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.sourceLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.sourceLP, created); err != nil { + if err := registerLpFilters(rf.config.sourceLP, created, qopts...); err != nil { return err } rf.sourceChainFilters = sourceFiltersNow @@ -209,10 +209,10 @@ func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegis // destination chain filters destFiltersBefore, destFiltersNow := rf.destChainFilters, getExecutionPluginDestLpChainFilters(rf.config.commitStore.Address(), rf.config.offRamp.Address(), destPriceRegistry) created, deleted = filtersDiff(destFiltersBefore, destFiltersNow) - if err := unregisterLpFilters(nilQueryer, rf.config.destLP, deleted); err != nil { + if err := unregisterLpFilters(rf.config.destLP, deleted, qopts...); err != nil { return err } - if err := registerLpFilters(nilQueryer, rf.config.destLP, created); err != nil { + if err := registerLpFilters(rf.config.destLP, created, qopts...); err != nil { return err } rf.destChainFilters = destFiltersNow diff --git a/core/services/ocr2/plugins/ccip/plugins_common.go b/core/services/ocr2/plugins/ccip/plugins_common.go index 026559b44c..cb3ea8dd88 100644 --- a/core/services/ocr2/plugins/ccip/plugins_common.go +++ b/core/services/ocr2/plugins/ccip/plugins_common.go @@ -34,8 +34,6 @@ const ( var zeroAddress = common.HexToAddress("0") -var nilQueryer pg.Queryer - var ErrCommitStoreIsDown = errors.New("commitStore is down") func LoadOnRamp(onRampAddress common.Address, pluginName string, client client.Client) (evm_2_evm_onramp.EVM2EVMOnRampInterface, error) { @@ -174,24 +172,24 @@ func filterContainsZeroAddress(addrs []common.Address) bool { return false } -func registerLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error { +func registerLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error { for _, lpFilter := range filters { if filterContainsZeroAddress(lpFilter.Addresses) { continue } - if err := lp.RegisterFilter(lpFilter); err != nil { + if err := lp.RegisterFilter(lpFilter, qopts...); err != nil { return err } } return nil } -func unregisterLpFilters(q pg.Queryer, lp logpoller.LogPoller, filters []logpoller.Filter) error { +func unregisterLpFilters(lp logpoller.LogPoller, filters []logpoller.Filter, qopts ...pg.QOpt) error { for _, lpFilter := range filters { if filterContainsZeroAddress(lpFilter.Addresses) { continue } - if err := lp.UnregisterFilter(lpFilter.Name, pg.WithQueryer(q)); err != nil { + if err := lp.UnregisterFilter(lpFilter.Name, qopts...); err != nil { return err } }