Skip to content

Commit

Permalink
fixing price service
Browse files Browse the repository at this point in the history
  • Loading branch information
valerii-kabisov-cll committed Oct 2, 2024
1 parent 3240c0f commit dd0c1b4
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 254 deletions.
96 changes: 4 additions & 92 deletions core/services/ccip/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,11 @@ import (
type GasPrice struct {
SourceChainSelector uint64
GasPrice *assets.Wei
CreatedAt time.Time
}

type GasPriceUpdate struct {
SourceChainSelector uint64
GasPrice *assets.Wei
}

type TokenPrice struct {
TokenAddr string
TokenPrice *assets.Wei
CreatedAt time.Time
}

type TokenPriceUpdate struct {
TokenAddr string
TokenPrice *assets.Wei
}

type ORM interface {
Expand All @@ -39,12 +27,6 @@ type ORM interface {

UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error)
UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error)

InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error

ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
}

type orm struct {
Expand All @@ -68,11 +50,9 @@ func NewORM(ds sqlutil.DataSource, lggr logger.Logger) (ORM, error) {
func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) {
var gasPrices []GasPrice
stmt := `
SELECT DISTINCT ON (source_chain_selector)
source_chain_selector, gas_price, created_at
SELECT source_chain_selector, gas_price
FROM ccip.observed_gas_prices
WHERE chain_selector = $1
ORDER BY source_chain_selector, created_at DESC;
WHERE chain_selector = $1;
`
err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector)
if err != nil {
Expand All @@ -85,11 +65,9 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin
func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) {
var tokenPrices []TokenPrice
stmt := `
SELECT DISTINCT ON (token_addr)
token_addr, token_price, created_at
SELECT token_addr, token_price
FROM ccip.observed_token_prices
WHERE chain_selector = $1
ORDER BY token_addr, created_at DESC;
WHERE chain_selector = $1;
`
err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector)
if err != nil {
Expand Down Expand Up @@ -231,69 +209,3 @@ func tokenAddrsToBytes(tokens map[string]*assets.Wei) [][]byte {
}
return addrs
}

func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error {
if len(gasPrices) == 0 {
return nil
}

insertData := make([]map[string]interface{}, 0, len(gasPrices))
for _, price := range gasPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"source_chain_selector": price.SourceChainSelector,
"gas_price": price.GasPrice,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err)
}

return err
}

func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error {
if len(tokenPrices) == 0 {
return nil
}

insertData := make([]map[string]interface{}, 0, len(tokenPrices))
for _, price := range tokenPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"token_addr": price.TokenAddr,
"token_price": price.TokenPrice,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at)
VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err)
}

return err
}

func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}

func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}
71 changes: 15 additions & 56 deletions core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// PriceService manages DB access for gas and token price data.
Expand All @@ -49,23 +49,11 @@ const (
// Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show
// their prices are stable, 10-minute resolution is accurate enough.
tokenPriceUpdateInterval = 10 * time.Minute

// Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin.
// 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while
// surfacing price update outages quickly enough.
priceExpireThreshold = 25 * time.Minute

// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price.
// 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`.
priceCleanupInterval = 10 * time.Minute
)

type priceService struct {
priceExpireThreshold time.Duration
cleanupInterval time.Duration
gasUpdateInterval time.Duration
tokenUpdateInterval time.Duration
gasUpdateInterval time.Duration
tokenUpdateInterval time.Duration

lggr logger.Logger
orm cciporm.ORM
Expand Down Expand Up @@ -100,10 +88,8 @@ func NewPriceService(
ctx, cancel := context.WithCancel(context.Background())

pw := &priceService{
priceExpireThreshold: priceExpireThreshold,
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time
gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval),
tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval),
gasUpdateInterval: gasPriceUpdateInterval,
tokenUpdateInterval: tokenPriceUpdateInterval,

lggr: lggr,
orm: orm,
Expand Down Expand Up @@ -142,25 +128,18 @@ func (p *priceService) Close() error {
}

func (p *priceService) run() {
cleanupTicker := time.NewTicker(p.cleanupInterval)
gasUpdateTicker := time.NewTicker(p.gasUpdateInterval)
tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval)
gasUpdateTicker := time.NewTicker(utils.WithJitter(p.gasUpdateInterval))
tokenUpdateTicker := time.NewTicker(utils.WithJitter(p.tokenUpdateInterval))

go func() {
defer p.wg.Done()
defer cleanupTicker.Stop()
defer gasUpdateTicker.Stop()
defer tokenUpdateTicker.Stop()

for {
select {
case <-p.backgroundCtx.Done():
return
case <-cleanupTicker.C:
err := p.runCleanup(p.backgroundCtx)
if err != nil {
p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err)
}
case <-gasUpdateTicker.C:
err := p.runGasPriceUpdate(p.backgroundCtx)
if err != nil {
Expand Down Expand Up @@ -240,28 +219,6 @@ func (p *priceService) GetGasAndTokenPrices(ctx context.Context, destChainSelect
return gasPrices, tokenPrices, nil
}

func (p *priceService) runCleanup(ctx context.Context) error {
eg := new(errgroup.Group)

eg.Go(func() error {
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing gas prices: %w", err)
}
return nil
})

eg.Go(func() error {
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing token prices: %w", err)
}
return nil
})

return eg.Wait()
}

func (p *priceService) runGasPriceUpdate(ctx context.Context) error {
// Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader`
// Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds.
Expand Down Expand Up @@ -446,28 +403,29 @@ func (p *priceService) observeTokenPriceUpdates(
return tokenPricesUSD, nil
}

func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) {
func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) error {
if sourceGasPriceUSD == nil {
return nil
}

return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{
_, err := p.orm.UpsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPrice{
{
SourceChainSelector: p.sourceChainSelector,
GasPrice: assets.NewWei(sourceGasPriceUSD),
},
})
return err
}

func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) {
func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) error {
if tokenPricesUSD == nil {
return nil
}

var tokenPrices []cciporm.TokenPriceUpdate
var tokenPrices []cciporm.TokenPrice

for token, price := range tokenPricesUSD {
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{
tokenPrices = append(tokenPrices, cciporm.TokenPrice{
TokenAddr: string(token),
TokenPrice: assets.NewWei(price),
})
Expand All @@ -478,7 +436,8 @@ func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr
})

return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices)
_, err := p.orm.UpsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices, p.tokenUpdateInterval)
return err
}

// Input price is USD per full token, with 18 decimal precision
Expand Down
Loading

0 comments on commit dd0c1b4

Please sign in to comment.