From dd0c1b4059fb9dc14757265cd17ddd30d58c2367 Mon Sep 17 00:00:00 2001 From: "valerii.kabisov" Date: Thu, 3 Oct 2024 03:31:58 +0900 Subject: [PATCH] fixing price service --- core/services/ccip/orm.go | 96 +-------------- .../ccip/internal/ccipdb/price_service.go | 71 +++-------- .../internal/ccipdb/price_service_test.go | 113 ++---------------- 3 files changed, 26 insertions(+), 254 deletions(-) diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index 09cc0af835..1942c68fef 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -14,23 +14,11 @@ import ( type GasPrice struct { SourceChainSelector uint64 GasPrice *assets.Wei - CreatedAt time.Time -} - -type GasPriceUpdate struct { - SourceChainSelector uint64 - GasPrice *assets.Wei } type TokenPrice struct { TokenAddr string TokenPrice *assets.Wei - CreatedAt time.Time -} - -type TokenPriceUpdate struct { - TokenAddr string - TokenPrice *assets.Wei } type ORM interface { @@ -39,12 +27,6 @@ type ORM interface { UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) - - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error - - ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error - ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error } type orm struct { @@ -68,11 +50,9 @@ func NewORM(ds sqlutil.DataSource, lggr logger.Logger) (ORM, error) { func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT DISTINCT ON (source_chain_selector) - source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price FROM ccip.observed_gas_prices - WHERE chain_selector = $1 - ORDER BY source_chain_selector, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector) if err != nil { @@ -85,11 +65,9 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT DISTINCT ON (token_addr) - token_addr, token_price, created_at + SELECT token_addr, token_price FROM ccip.observed_token_prices - WHERE chain_selector = $1 - ORDER BY token_addr, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { @@ -231,69 +209,3 @@ func tokenAddrsToBytes(tokens map[string]*assets.Wei) [][]byte { } return addrs } - -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error { - if len(gasPrices) == 0 { - return nil - } - - insertData := make([]map[string]interface{}, 0, len(gasPrices)) - for _, price := range gasPrices { - insertData = append(insertData, map[string]interface{}{ - "chain_selector": destChainSelector, - "job_id": jobId, - "source_chain_selector": price.SourceChainSelector, - "gas_price": price.GasPrice, - }) - } - - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at) - VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) - if err != nil { - err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err) - } - - return err -} - -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error { - if len(tokenPrices) == 0 { - return nil - } - - insertData := make([]map[string]interface{}, 0, len(tokenPrices)) - for _, price := range tokenPrices { - insertData = append(insertData, map[string]interface{}{ - "chain_selector": destChainSelector, - "job_id": jobId, - "token_addr": price.TokenAddr, - "token_price": price.TokenPrice, - }) - } - - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at) - VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) - if err != nil { - err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err) - } - - return err -} - -func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err -} - -func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err -} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index ad44555477..2806c26e22 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -14,7 +14,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -24,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) // PriceService manages DB access for gas and token price data. @@ -49,23 +49,11 @@ const ( // Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show // their prices are stable, 10-minute resolution is accurate enough. tokenPriceUpdateInterval = 10 * time.Minute - - // Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. - // 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while - // surfacing price update outages quickly enough. - priceExpireThreshold = 25 * time.Minute - - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. - priceCleanupInterval = 10 * time.Minute ) type priceService struct { - priceExpireThreshold time.Duration - cleanupInterval time.Duration - gasUpdateInterval time.Duration - tokenUpdateInterval time.Duration + gasUpdateInterval time.Duration + tokenUpdateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -100,10 +88,8 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireThreshold: priceExpireThreshold, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), - tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), + gasUpdateInterval: gasPriceUpdateInterval, + tokenUpdateInterval: tokenPriceUpdateInterval, lggr: lggr, orm: orm, @@ -142,13 +128,11 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - cleanupTicker := time.NewTicker(p.cleanupInterval) - gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) - tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) + gasUpdateTicker := time.NewTicker(utils.WithJitter(p.gasUpdateInterval)) + tokenUpdateTicker := time.NewTicker(utils.WithJitter(p.tokenUpdateInterval)) go func() { defer p.wg.Done() - defer cleanupTicker.Stop() defer gasUpdateTicker.Stop() defer tokenUpdateTicker.Stop() @@ -156,11 +140,6 @@ func (p *priceService) run() { select { case <-p.backgroundCtx.Done(): return - case <-cleanupTicker.C: - err := p.runCleanup(p.backgroundCtx) - if err != nil { - p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) - } case <-gasUpdateTicker.C: err := p.runGasPriceUpdate(p.backgroundCtx) if err != nil { @@ -240,28 +219,6 @@ func (p *priceService) GetGasAndTokenPrices(ctx context.Context, destChainSelect return gasPrices, tokenPrices, nil } -func (p *priceService) runCleanup(ctx context.Context) error { - eg := new(errgroup.Group) - - eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing gas prices: %w", err) - } - return nil - }) - - eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing token prices: %w", err) - } - return nil - }) - - return eg.Wait() -} - func (p *priceService) runGasPriceUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` // Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. @@ -446,28 +403,29 @@ func (p *priceService) observeTokenPriceUpdates( return tokenPricesUSD, nil } -func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { +func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) error { if sourceGasPriceUSD == nil { return nil } - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ + _, err := p.orm.UpsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPrice{ { SourceChainSelector: p.sourceChainSelector, GasPrice: assets.NewWei(sourceGasPriceUSD), }, }) + return err } -func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { +func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) error { if tokenPricesUSD == nil { return nil } - var tokenPrices []cciporm.TokenPriceUpdate + var tokenPrices []cciporm.TokenPrice for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + tokenPrices = append(tokenPrices, cciporm.TokenPrice{ TokenAddr: string(token), TokenPrice: assets.NewWei(price), }) @@ -478,7 +436,8 @@ func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr }) - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + _, err := p.orm.UpsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices, p.tokenUpdateInterval) + return err } // Input price is USD per full token, with 18 decimal precision diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index e741d82c73..a25c5d3c47 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -30,81 +29,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" ) -func TestPriceService_priceCleanup(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - - testCases := []struct { - name string - gasPriceError bool - tokenPriceError bool - expectedErr bool - }{ - { - name: "ORM called successfully", - gasPriceError: false, - tokenPriceError: false, - expectedErr: false, - }, - { - name: "gasPrice clear failed", - gasPriceError: true, - tokenPriceError: false, - expectedErr: true, - }, - { - name: "tokenPrice clear failed", - gasPriceError: false, - tokenPriceError: true, - expectedErr: true, - }, - { - name: "both ORM calls failed", - gasPriceError: true, - tokenPriceError: true, - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx := tests.Context(t) - - var gasPricesError error - var tokenPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } - if tc.tokenPriceError { - tokenPricesError = fmt.Errorf("token prices error") - } - - mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(tokenPricesError).Once() - - priceService := NewPriceService( - lggr, - mockOrm, - jobId, - destChainSelector, - sourceChainSelector, - "", - nil, - nil, - ).(*priceService) - err := priceService.runCleanup(ctx) - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestPriceService_writeGasPrices(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) @@ -113,7 +37,7 @@ func TestPriceService_writeGasPrices(t *testing.T) { gasPrice := big.NewInt(1e18) - expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ + expectedGasPriceUpdate := []cciporm.GasPrice{ { SourceChainSelector: sourceChainSelector, GasPrice: assets.NewWei(gasPrice), @@ -147,7 +71,7 @@ func TestPriceService_writeGasPrices(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() + mockOrm.On("UpsertGasPricesForDestChain", ctx, destChainSelector, expectedGasPriceUpdate).Return(int64(0), gasPricesError).Once() priceService := NewPriceService( lggr, @@ -180,7 +104,7 @@ func TestPriceService_writeTokenPrices(t *testing.T) { "0x234": big.NewInt(3e18), } - expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ + expectedTokenPriceUpdate := []cciporm.TokenPrice{ { TokenAddr: "0x123", TokenPrice: assets.NewWei(big.NewInt(2e18)), @@ -218,7 +142,8 @@ func TestPriceService_writeTokenPrices(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() + mockOrm.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, expectedTokenPriceUpdate, tokenPriceUpdateInterval). + Return(int64(len(expectedTokenPriceUpdate)), tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -802,7 +727,7 @@ func setupORM(t *testing.T) cciporm.ORM { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := cciporm.NewORM(db, logger.NullLogger) + orm, err := cciporm.NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -824,7 +749,7 @@ func checkResultLen(t *testing.T, priceService PriceService, destChainSelector u return nil } -func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { +func TestPriceService_priceWriteInBackground(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) @@ -896,16 +821,11 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { gasUpdateInterval := 2000 * time.Millisecond tokenUpdateInterval := 5000 * time.Millisecond - cleanupInterval := 3000 * time.Millisecond // run gas price task every 2 second priceService.gasUpdateInterval = gasUpdateInterval // run token price task every 5 second priceService.tokenUpdateInterval = tokenUpdateInterval - // run cleanup every 3 seconds - priceService.cleanupInterval = cleanupInterval - // expire all prices during every cleanup - priceService.priceExpireThreshold = time.Duration(0) // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) @@ -918,24 +838,5 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { assert.NoError(t, err) assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens))) - // eventually prices will be cleaned - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 0, 0) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - - // then prices will be updated again - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens)) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - assert.NoError(t, priceService.Close()) - assert.NoError(t, priceService.runCleanup(ctx)) - - // after stopping PriceService and runCleanup, no more updates are inserted - for i := 0; i < 5; i++ { - time.Sleep(time.Second) - assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) - } }