From c19cef424024bd1c2d8c362f70a14eadf5713000 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 8 Aug 2024 13:58:11 +0200 Subject: [PATCH 1/4] Simplify codebase and improve performance by switching to upsert --- core/services/ccip/mocks/orm.go | 138 +------ core/services/ccip/orm.go | 75 ++-- core/services/ccip/orm_test.go | 52 +-- .../ccip/internal/ccipdb/price_service.go | 267 +++++------- .../internal/ccipdb/price_service_test.go | 391 +++++------------- .../migrations/0249_ccip_token_prices_fix.sql | 49 +++ 6 files changed, 317 insertions(+), 655 deletions(-) create mode 100644 core/store/migrate/migrations/0249_ccip_token_prices_fix.sql diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index 8a987c2160..35ca2353bf 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -23,102 +23,6 @@ func (_m *ORM) EXPECT() *ORM_Expecter { return &ORM_Expecter{mock: &_m.Mock} } -// ClearGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearGasPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearGasPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearGasPricesByDestChain' -type ORM_ClearGasPricesByDestChain_Call struct { - *mock.Call -} - -// ClearGasPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearGasPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearGasPricesByDestChain_Call { - return &ORM_ClearGasPricesByDestChain_Call{Call: _e.mock.On("ClearGasPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Return(_a0 error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - -// ClearTokenPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearTokenPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearTokenPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearTokenPricesByDestChain' -type ORM_ClearTokenPricesByDestChain_Call struct { - *mock.Call -} - -// ClearTokenPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearTokenPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearTokenPricesByDestChain_Call { - return &ORM_ClearTokenPricesByDestChain_Call{Call: _e.mock.On("ClearTokenPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Return(_a0 error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - // GetGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector func (_m *ORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]ccip.GasPrice, error) { ret := _m.Called(ctx, destChainSelector) @@ -237,17 +141,17 @@ func (_c *ORM_GetTokenPricesByDestChain_Call) RunAndReturn(run func(context.Cont return _c } -// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, gasPrices -func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, gasPrices) +// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices +func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate) error { + ret := _m.Called(ctx, destChainSelector, gasPrices) if len(ret) == 0 { panic("no return value specified for InsertGasPricesForDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, gasPrices) + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPriceUpdate) error); ok { + r0 = rf(ctx, destChainSelector, gasPrices) } else { r0 = ret.Error(0) } @@ -263,15 +167,14 @@ type ORM_InsertGasPricesForDestChain_Call struct { // InsertGasPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 // - gasPrices []ccip.GasPriceUpdate -func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { - return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, gasPrices)} +func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { + return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} } -func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.GasPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPriceUpdate)) }) return _c } @@ -281,22 +184,22 @@ func (_c *ORM_InsertGasPricesForDestChain_Call) Return(_a0 error) *ORM_InsertGas return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { _c.Call.Return(run) return _c } -// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, tokenPrices -func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, tokenPrices) +// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices +func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate) error { + ret := _m.Called(ctx, destChainSelector, tokenPrices) if len(ret) == 0 { panic("no return value specified for InsertTokenPricesForDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, tokenPrices) + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPriceUpdate) error); ok { + r0 = rf(ctx, destChainSelector, tokenPrices) } else { r0 = ret.Error(0) } @@ -312,15 +215,14 @@ type ORM_InsertTokenPricesForDestChain_Call struct { // InsertTokenPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 // - tokenPrices []ccip.TokenPriceUpdate -func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { - return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, tokenPrices)} +func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { + return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices)} } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.TokenPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPriceUpdate)) }) return _c } @@ -330,7 +232,7 @@ func (_c *ORM_InsertTokenPricesForDestChain_Call) Return(_a0 error) *ORM_InsertT return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { _c.Call.Return(run) return _c } diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index d074ea7473..b513b13cc5 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -36,11 +36,8 @@ type ORM interface { GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error - - ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error - ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error + InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error + InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error } type orm struct { @@ -62,8 +59,7 @@ func NewORM(ds sqlutil.DataSource) (ORM, error) { func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT DISTINCT ON (source_chain_selector) - source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price, created_at FROM ccip.observed_gas_prices WHERE chain_selector = $1 ORDER BY source_chain_selector, created_at DESC; @@ -79,11 +75,10 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT DISTINCT ON (token_addr) - token_addr, token_price, created_at + SELECT token_addr, token_price, created_at FROM ccip.observed_token_prices WHERE chain_selector = $1 - ORDER BY token_addr, created_at DESC; + ORDER BY token_addr; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { @@ -93,68 +88,64 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u return tokenPrices, nil } -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error { +func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error { if len(gasPrices) == 0 { return nil } - insertData := make([]map[string]interface{}, 0, len(gasPrices)) - for _, price := range gasPrices { + uniqueGasUpdates := make(map[string]GasPriceUpdate) + for _, gasPrice := range gasPrices { + key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector) + uniqueGasUpdates[key] = gasPrice + } + + insertData := make([]map[string]interface{}, 0, len(uniqueGasUpdates)) + for _, price := range uniqueGasUpdates { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "source_chain_selector": price.SourceChainSelector, "gas_price": price.GasPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at) - VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());` + stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, created_at) + VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp()) + ON CONFLICT (chain_selector, source_chain_selector) + DO UPDATE SET gas_price = EXCLUDED.gas_price, created_at = EXCLUDED.created_at;` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err) + err = fmt.Errorf("error inserting gas prices %w", err) } - return err } -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error { +func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error { if len(tokenPrices) == 0 { return nil } - insertData := make([]map[string]interface{}, 0, len(tokenPrices)) - for _, price := range tokenPrices { + uniqueTokenPrices := make(map[string]TokenPriceUpdate) + for _, tokenPrice := range tokenPrices { + key := fmt.Sprintf("%s-%d", tokenPrice.TokenAddr, destChainSelector) + uniqueTokenPrices[key] = tokenPrice + } + + insertData := make([]map[string]interface{}, 0, len(uniqueTokenPrices)) + for _, price := range uniqueTokenPrices { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "token_addr": price.TokenAddr, "token_price": price.TokenPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at) - VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());` + stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, created_at) + VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp()) + ON CONFLICT (chain_selector, token_addr) + DO UPDATE SET token_price = EXCLUDED.token_price, created_at = EXCLUDED.created_at;` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err) + err = fmt.Errorf("error inserting token prices %w", err) } - - return err -} - -func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err -} - -func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) return err } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index 7b7b8d8271..f7b8a6d5a4 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -145,9 +145,9 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { for selector, updatesPerSelector := range updates { lastIndex := len(updatesPerSelector) - 1 - err := orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[:lastIndex]) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) assert.NoError(t, err) - err = orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[lastIndex:]) + err = orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) assert.NoError(t, err) expectedPrices[selector] = updatesPerSelector[lastIndex] @@ -156,7 +156,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { // verify number of rows inserted numRows := getGasTableRowCount(t, db) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector, numRows) + assert.Equal(t, numSourceChainSelectors, numRows) prices, err := orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -176,9 +176,9 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { expectedPrices[selector] = updatesPerSelector[0] } - err = orm.InsertGasPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + err = orm.InsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector+numSourceChainSelectors, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) prices, err = orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -208,7 +208,7 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { } for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -217,21 +217,11 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } - assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by sleepSec should delete rows inserted before it - err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by 0 expiration seconds should delete all rows - err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) } func TestORM_InsertAndGetTokenPrices(t *testing.T) { @@ -258,9 +248,9 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { for addr, updatesPerAddr := range updates { lastIndex := len(updatesPerAddr) - 1 - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[:lastIndex]) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex]) assert.NoError(t, err) - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[lastIndex:]) + err = orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:]) assert.NoError(t, err) expectedPrices[addr] = updatesPerAddr[lastIndex] @@ -269,7 +259,7 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { // verify number of rows inserted numRows := getTokenTableRowCount(t, db) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress, numRows) + assert.Equal(t, numAddresses, numRows) prices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -289,9 +279,9 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { expectedPrices[addr] = updatesPerAddr[0] } - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + err = orm.InsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress+numAddresses, getTokenTableRowCount(t, db)) + assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) prices, err = orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -321,7 +311,7 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { } for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) assert.NoError(t, err) } @@ -330,19 +320,9 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) assert.NoError(t, err) } - assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - - // clear by sleepSec should delete rows inserted before it - err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - - // clear by 0 expiration seconds should delete all rows - err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getTokenTableRowCount(t, db)) + assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 2118d5832d..23ea3dcbd0 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -42,28 +42,18 @@ type PriceService interface { var _ PriceService = (*priceService)(nil) const ( - // Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. - gasPriceUpdateInterval = 1 * time.Minute - // Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show - // their prices are stable, 10-minute resolution is accurate enough. - tokenPriceUpdateInterval = 10 * time.Minute - - // Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. - // 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while + // Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. + // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while // surfacing price update outages quickly enough. - priceExpireThreshold = 25 * time.Minute + priceExpireSec = 600 - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. - priceCleanupInterval = 10 * time.Minute + // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. + priceUpdateInterval = 60 * time.Second ) type priceService struct { - priceExpireThreshold time.Duration - cleanupInterval time.Duration - gasUpdateInterval time.Duration - tokenUpdateInterval time.Duration + priceExpireSec int + updateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -98,10 +88,8 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireThreshold: priceExpireThreshold, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), - tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), + priceExpireSec: priceExpireSec, + updateInterval: utils.WithJitter(priceUpdateInterval), lggr: lggr, orm: orm, @@ -140,34 +128,19 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - cleanupTicker := time.NewTicker(p.cleanupInterval) - gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) - tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) + updateTicker := time.NewTicker(p.updateInterval) go func() { defer p.wg.Done() - defer cleanupTicker.Stop() - defer gasUpdateTicker.Stop() - defer tokenUpdateTicker.Stop() for { select { case <-p.backgroundCtx.Done(): return - case <-cleanupTicker.C: - err := p.runCleanup(p.backgroundCtx) + case <-updateTicker.C: + err := p.runUpdate(p.backgroundCtx) if err != nil { - p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) - } - case <-gasUpdateTicker.C: - err := p.runGasPriceUpdate(p.backgroundCtx) - if err != nil { - p.lggr.Errorw("Error when updating gas prices in the background", "err", err) - } - case <-tokenUpdateTicker.C: - err := p.runTokenPriceUpdate(p.backgroundCtx) - if err != nil { - p.lggr.Errorw("Error when updating token prices in the background", "err", err) + p.lggr.Errorw("Error when updating prices in the background", "err", err) } } } @@ -182,11 +155,8 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato // Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier // for not having to wait to the full update interval. - if err := p.runGasPriceUpdate(ctx); err != nil { - p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err) - } - if err := p.runTokenPriceUpdate(ctx); err != nil { - p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err) + if err := p.runUpdate(ctx); err != nil { + p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err) } return nil @@ -238,156 +208,84 @@ func (p *priceService) GetGasAndTokenPrices(ctx context.Context, destChainSelect return gasPrices, tokenPrices, nil } -func (p *priceService) runCleanup(ctx context.Context) error { - eg := new(errgroup.Group) - - eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing gas prices: %w", err) - } - return nil - }) - - eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing token prices: %w", err) - } - return nil - }) - - return eg.Wait() -} - -func (p *priceService) runGasPriceUpdate(ctx context.Context) error { +func (p *priceService) runUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` - // Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. + // Price updates happen infrequently - once every `priceUpdateInterval` seconds. // It does not happen on any code path that is performance sensitive. // We can afford to have non-performant unlocks here that is simple and safe. p.dynamicConfigMu.RLock() defer p.dynamicConfigMu.RUnlock() // There may be a period of time between service is started and dynamic config is updated - if p.gasPriceEstimator == nil { - p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready") + if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { + p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready") return nil } - sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr) + sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr) if err != nil { - return fmt.Errorf("failed to observe gas price updates: %w", err) + return fmt.Errorf("failed to observe price updates: %w", err) } - err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD) + err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD) if err != nil { - return fmt.Errorf("failed to write gas prices to db: %w", err) + return fmt.Errorf("failed to write prices to db: %w", err) } return nil } -func (p *priceService) runTokenPriceUpdate(ctx context.Context) error { - // Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader` - // Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds. - p.dynamicConfigMu.RLock() - defer p.dynamicConfigMu.RUnlock() - - // There may be a period of time between service is started and dynamic config is updated - if p.destPriceRegistryReader == nil { - p.lggr.Info("Skipping token price update due to destPriceRegistry not ready") - return nil - } - - tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr) - if err != nil { - return fmt.Errorf("failed to observe token price updates: %w", err) - } - - err = p.writeTokenPricesToDB(ctx, tokenPricesUSD) - if err != nil { - return fmt.Errorf("failed to write token prices to db: %w", err) - } - - return nil -} - -func (p *priceService) observeGasPriceUpdates( +func (p *priceService) observePriceUpdates( ctx context.Context, lggr logger.Logger, -) (sourceGasPriceUSD *big.Int, err error) { - if p.gasPriceEstimator == nil { - return nil, fmt.Errorf("gasPriceEstimator is not set yet") +) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { + if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { + return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet") } - // Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18 - rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative}) - if err != nil { - return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err) - } + sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) - sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] - if !exists { - return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) - } + lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) - sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) if err != nil { - return nil, err - } - if sourceGasPrice == nil { - return nil, fmt.Errorf("missing gas price") - } - sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) - if err != nil { - return nil, err + return nil, nil, fmt.Errorf("get destination tokens: %w", err) } - lggr.Infow("PriceService observed latest gas price", - "sourceChainSelector", p.sourceChainSelector, - "destChainSelector", p.destChainSelector, - "sourceNative", p.sourceNative, - "gasPriceWei", sourceGasPrice, - "sourceNativePriceUSD", sourceNativePriceUSD, - "sourceGasPriceUSD", sourceGasPriceUSD, - ) - return sourceGasPriceUSD, nil + return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens) } // All prices are USD ($1=1e18) denominated. All prices must be not nil. // Return token prices should contain the exact same tokens as in tokenDecimals. -func (p *priceService) observeTokenPriceUpdates( +func (p *priceService) generatePriceUpdates( ctx context.Context, lggr logger.Logger, -) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { - if p.destPriceRegistryReader == nil { - return nil, fmt.Errorf("destPriceRegistry is not set yet") - } - - sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) - if err != nil { - return nil, fmt.Errorf("get destination tokens: %w", err) - } - - lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) + sortedLaneTokens []cciptypes.Address, +) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { + // Include wrapped native in our token query as way to identify the source native USD price. + // notice USD is in 1e18 scale, i.e. $1 = 1e18 + queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens) - queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens) rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens) if err != nil { - return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err) + return nil, nil, err } lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD) // make sure that we got prices for all the tokens of our query for _, token := range queryTokens { if rawTokenPricesUSD[token] == nil { - return nil, fmt.Errorf("missing token price: %+v", token) + return nil, nil, fmt.Errorf("missing token price: %+v", token) } } + sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] + if !exists { + return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) + } + destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens) if err != nil { - return nil, fmt.Errorf("get tokens decimals: %w", err) + return nil, nil, fmt.Errorf("get tokens decimals: %w", err) } tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD)) @@ -395,47 +293,68 @@ func (p *priceService) observeTokenPriceUpdates( tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i]) } - lggr.Infow("PriceService observed latest token prices", + sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) + if err != nil { + return nil, nil, err + } + if sourceGasPrice == nil { + return nil, nil, fmt.Errorf("missing gas price") + } + sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) + if err != nil { + return nil, nil, err + } + + lggr.Infow("PriceService observed latest price", "sourceChainSelector", p.sourceChainSelector, "destChainSelector", p.destChainSelector, + "gasPriceWei", sourceGasPrice, + "sourceNativePriceUSD", sourceNativePriceUSD, + "sourceGasPriceUSD", sourceGasPriceUSD, "tokenPricesUSD", tokenPricesUSD, ) - return tokenPricesUSD, nil + return sourceGasPriceUSD, tokenPricesUSD, nil } -func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { - if sourceGasPriceUSD == nil { - return nil +func (p *priceService) writePricesToDB( + ctx context.Context, + sourceGasPriceUSD *big.Int, + tokenPricesUSD map[cciptypes.Address]*big.Int, +) (err error) { + eg := new(errgroup.Group) + + if sourceGasPriceUSD != nil { + eg.Go(func() error { + return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPriceUpdate{ + { + SourceChainSelector: p.sourceChainSelector, + GasPrice: assets.NewWei(sourceGasPriceUSD), + }, + }) + }) } - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ - { - SourceChainSelector: p.sourceChainSelector, - GasPrice: assets.NewWei(sourceGasPriceUSD), - }, - }) -} + if tokenPricesUSD != nil { + var tokenPrices []cciporm.TokenPriceUpdate -func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { - if tokenPricesUSD == nil { - return nil - } + for token, price := range tokenPricesUSD { + tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + TokenAddr: string(token), + TokenPrice: assets.NewWei(price), + }) + } - var tokenPrices []cciporm.TokenPriceUpdate + // Sort token by addr to make price updates ordering deterministic, easier to testing and debugging + sort.Slice(tokenPrices, func(i, j int) bool { + return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr + }) - for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ - TokenAddr: string(token), - TokenPrice: assets.NewWei(price), + eg.Go(func() error { + return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices) }) } - // Sort token by addr to make price updates ordering deterministic, easier for testing and debugging - sort.Slice(tokenPrices, func(i, j int) bool { - return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr - }) - - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + return eg.Wait() } // Input price is USD per full token, with 18 decimal precision diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 26721bdf8e..872d933a94 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -30,12 +30,35 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" ) -func TestPriceService_priceCleanup(t *testing.T) { +func TestPriceService_priceWrite(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) + gasPrice := big.NewInt(1e18) + tokenPrices := map[cciptypes.Address]*big.Int{ + "0x123": big.NewInt(2e18), + "0x234": big.NewInt(3e18), + } + + expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ + { + SourceChainSelector: sourceChainSelector, + GasPrice: assets.NewWei(gasPrice), + }, + } + expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ + { + TokenAddr: "0x123", + TokenPrice: assets.NewWei(big.NewInt(2e18)), + }, + { + TokenAddr: "0x234", + TokenPrice: assets.NewWei(big.NewInt(3e18)), + }, + } + testCases := []struct { name string gasPriceError bool @@ -81,73 +104,9 @@ func TestPriceService_priceCleanup(t *testing.T) { tokenPricesError = fmt.Errorf("token prices error") } - mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(tokenPricesError).Once() - - priceService := NewPriceService( - lggr, - mockOrm, - jobId, - destChainSelector, - sourceChainSelector, - "", - nil, - nil, - ).(*priceService) - err := priceService.runCleanup(ctx) - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - -func TestPriceService_writeGasPrices(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - - gasPrice := big.NewInt(1e18) - - expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ - { - SourceChainSelector: sourceChainSelector, - GasPrice: assets.NewWei(gasPrice), - }, - } - - testCases := []struct { - name string - gasPriceError bool - expectedErr bool - }{ - { - name: "ORM called successfully", - gasPriceError: false, - expectedErr: false, - }, - { - name: "gasPrice clear failed", - gasPriceError: true, - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx := tests.Context(t) - - var gasPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } - mockOrm := ccipmocks.NewORM(t) mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() + mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -159,7 +118,7 @@ func TestPriceService_writeGasPrices(t *testing.T) { nil, nil, ).(*priceService) - err := priceService.writeGasPricesToDB(ctx, gasPrice) + err := priceService.writePricesToDB(ctx, gasPrice, tokenPrices) if tc.expectedErr { assert.Error(t, err) } else { @@ -169,86 +128,22 @@ func TestPriceService_writeGasPrices(t *testing.T) { } } -func TestPriceService_writeTokenPrices(t *testing.T) { +func TestPriceService_generatePriceUpdates(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) - tokenPrices := map[cciptypes.Address]*big.Int{ - "0x123": big.NewInt(2e18), - "0x234": big.NewInt(3e18), - } - - expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ - { - TokenAddr: "0x123", - TokenPrice: assets.NewWei(big.NewInt(2e18)), - }, - { - TokenAddr: "0x234", - TokenPrice: assets.NewWei(big.NewInt(3e18)), - }, - } - - testCases := []struct { - name string - tokenPriceError bool - expectedErr bool - }{ - { - name: "ORM called successfully", - tokenPriceError: false, - expectedErr: false, - }, - { - name: "tokenPrice clear failed", - tokenPriceError: true, - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx := tests.Context(t) - - var tokenPricesError error - if tc.tokenPriceError { - tokenPricesError = fmt.Errorf("token prices error") - } - - mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() - - priceService := NewPriceService( - lggr, - mockOrm, - jobId, - destChainSelector, - sourceChainSelector, - "", - nil, - nil, - ).(*priceService) - err := priceService.writeTokenPricesToDB(ctx, tokenPrices) - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) + const nTokens = 10 + tokens := make([]cciptypes.Address, nTokens) + for i := range tokens { + tokens[i] = cciptypes.Address(utils.RandomAddress().String()) } -} - -func TestPriceService_observeGasPriceUpdates(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - sourceNativeToken := cciptypes.Address(utils.RandomAddress().String()) + sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) testCases := []struct { name string + tokenDecimals map[cciptypes.Address]uint8 sourceNativeToken cciptypes.Address priceGetterRespData map[cciptypes.Address]*big.Int priceGetterRespErr error @@ -256,179 +151,108 @@ func TestPriceService_observeGasPriceUpdates(t *testing.T) { feeEstimatorRespErr error maxGasPrice uint64 expSourceGasPriceUSD *big.Int + expTokenPricesUSD map[cciptypes.Address]*big.Int expErr bool }{ { - name: "base", - sourceNativeToken: sourceNativeToken, + name: "base", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 12, + }, + sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ - sourceNativeToken: val1e18(100), + tokens[0]: val1e18(100), + tokens[1]: val1e18(200), + tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) }, priceGetterRespErr: nil, feeEstimatorRespFee: big.NewInt(10), feeEstimatorRespErr: nil, maxGasPrice: 1e18, expSourceGasPriceUSD: big.NewInt(1000), - expErr: false, + expTokenPricesUSD: map[cciptypes.Address]*big.Int{ + tokens[0]: val1e18(100), + tokens[1]: val1e18(200 * 1e6), + }, + expErr: false, }, { - name: "price getter returned an error", - sourceNativeToken: sourceNativeToken, + name: "price getter returned an error", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + sourceNativeToken: tokens[0], priceGetterRespData: nil, priceGetterRespErr: fmt.Errorf("some random network error"), expErr: true, }, { - name: "price getter did not return source native gas price", - sourceNativeToken: sourceNativeToken, + name: "price getter skipped a requested price", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ - "0x1": val1e18(100), + tokens[0]: val1e18(100), }, priceGetterRespErr: nil, expErr: true, }, { - name: "dynamic fee cap overrides legacy", - sourceNativeToken: sourceNativeToken, - priceGetterRespData: map[cciptypes.Address]*big.Int{ - sourceNativeToken: val1e18(100), + name: "price getter skipped source native price", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, }, - priceGetterRespErr: nil, - feeEstimatorRespFee: big.NewInt(20), - feeEstimatorRespErr: nil, - maxGasPrice: 1e18, - expSourceGasPriceUSD: big.NewInt(2000), - expErr: false, - }, - { - name: "nil gas price", - sourceNativeToken: sourceNativeToken, + sourceNativeToken: tokens[2], priceGetterRespData: map[cciptypes.Address]*big.Int{ - sourceNativeToken: val1e18(100), + tokens[0]: val1e18(100), + tokens[1]: val1e18(200), }, - feeEstimatorRespFee: nil, - maxGasPrice: 1e18, - expErr: true, + priceGetterRespErr: nil, + expErr: true, }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - priceGetter := pricegetter.NewMockPriceGetter(t) - defer priceGetter.AssertExpectations(t) - - gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) - defer gasPriceEstimator.AssertExpectations(t) - - priceGetter.On("TokenPricesUSD", mock.Anything, []cciptypes.Address{tc.sourceNativeToken}).Return(tc.priceGetterRespData, tc.priceGetterRespErr) - - if tc.maxGasPrice > 0 { - gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) - if tc.feeEstimatorRespFee != nil { - pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.priceGetterRespData[tc.sourceNativeToken]) - gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) - } - } - - priceService := NewPriceService( - lggr, - nil, - jobId, - destChainSelector, - sourceChainSelector, - tc.sourceNativeToken, - priceGetter, - nil, - ).(*priceService) - priceService.gasPriceEstimator = gasPriceEstimator - - sourceGasPriceUSD, err := priceService.observeGasPriceUpdates(context.Background(), lggr) - if tc.expErr { - assert.Error(t, err) - return - } - assert.NoError(t, err) - assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) - }) - } -} - -func TestPriceService_observeTokenPriceUpdates(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - - const nTokens = 10 - tokens := make([]cciptypes.Address, nTokens) - for i := range tokens { - tokens[i] = cciptypes.Address(utils.RandomAddress().String()) - } - sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) - - testCases := []struct { - name string - tokenDecimals map[cciptypes.Address]uint8 - filterOutTokens []cciptypes.Address - priceGetterRespData map[cciptypes.Address]*big.Int - priceGetterRespErr error - expTokenPricesUSD map[cciptypes.Address]*big.Int - expErr bool - }{ { - name: "base", + name: "dynamic fee cap overrides legacy", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, - tokens[1]: 12, + tokens[1]: 18, }, - filterOutTokens: []cciptypes.Address{tokens[2]}, + sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), tokens[1]: val1e18(200), tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) }, - priceGetterRespErr: nil, + priceGetterRespErr: nil, + feeEstimatorRespFee: big.NewInt(20), + feeEstimatorRespErr: nil, + maxGasPrice: 1e18, + expSourceGasPriceUSD: big.NewInt(2000), expTokenPricesUSD: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), - tokens[1]: val1e18(200 * 1e6), + tokens[1]: val1e18(200), }, expErr: false, }, { - name: "price getter returned an error", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - priceGetterRespData: nil, - priceGetterRespErr: fmt.Errorf("some random network error"), - expErr: true, - }, - { - name: "price getter skipped a requested price", + name: "nil gas price", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, tokens[1]: 18, }, + sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), - }, - priceGetterRespErr: nil, - expErr: true, - }, - { - name: "nil token price", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - filterOutTokens: []cciptypes.Address{tokens[2]}, - priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: nil, tokens[1]: val1e18(200), - tokens[2]: val1e18(300), + tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) }, - expErr: true, + feeEstimatorRespFee: nil, + maxGasPrice: 1e18, + expErr: true, }, } @@ -437,6 +261,9 @@ func TestPriceService_observeTokenPriceUpdates(t *testing.T) { priceGetter := pricegetter.NewMockPriceGetter(t) defer priceGetter.AssertExpectations(t) + gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) + defer gasPriceEstimator.AssertExpectations(t) + var destTokens []cciptypes.Address for tk := range tc.tokenDecimals { destTokens = append(destTokens, tk) @@ -449,21 +276,22 @@ func TestPriceService_observeTokenPriceUpdates(t *testing.T) { destDecimals = append(destDecimals, tc.tokenDecimals[token]) } - queryTokens := ccipcommon.FlattenUniqueSlice(destTokens) + queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{tc.sourceNativeToken}, destTokens) if len(queryTokens) > 0 { priceGetter.On("TokenPricesUSD", mock.Anything, queryTokens).Return(tc.priceGetterRespData, tc.priceGetterRespErr) - priceGetter.On("FilterConfiguredTokens", mock.Anything, mock.Anything).Return(destTokens, tc.filterOutTokens, nil) } - offRampReader := ccipdatamocks.NewOffRampReader(t) - offRampReader.On("GetTokens", mock.Anything).Return(cciptypes.OffRampTokens{ - DestinationTokens: destTokens, - }, nil).Maybe() + if tc.maxGasPrice > 0 { + gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) + if tc.feeEstimatorRespFee != nil { + pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.expTokenPricesUSD[tc.sourceNativeToken]) + gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) + } + } destPriceReg := ccipdatamocks.NewPriceRegistryReader(t) destPriceReg.On("GetTokensDecimals", mock.Anything, destTokens).Return(destDecimals, nil).Maybe() - destPriceReg.On("GetFeeTokens", mock.Anything).Return([]cciptypes.Address{destTokens[0]}, nil).Maybe() priceService := NewPriceService( lggr, @@ -471,18 +299,20 @@ func TestPriceService_observeTokenPriceUpdates(t *testing.T) { jobId, destChainSelector, sourceChainSelector, - "0x123", + tc.sourceNativeToken, priceGetter, - offRampReader, + nil, ).(*priceService) + priceService.gasPriceEstimator = gasPriceEstimator priceService.destPriceRegistryReader = destPriceReg - tokenPricesUSD, err := priceService.observeTokenPriceUpdates(context.Background(), lggr) + sourceGasPriceUSD, tokenPricesUSD, err := priceService.generatePriceUpdates(context.Background(), lggr, destTokens) if tc.expErr { assert.Error(t, err) return } assert.NoError(t, err) + assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) assert.True(t, reflect.DeepEqual(tc.expTokenPricesUSD, tokenPricesUSD)) }) } @@ -775,10 +605,8 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) defer gasPriceEstimator.AssertExpectations(t) - priceGetter.On("TokenPricesUSD", mock.Anything, tokens[:1]).Return(map[cciptypes.Address]*big.Int{ + priceGetter.On("TokenPricesUSD", mock.Anything, tokens).Return(map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(tokenPrices[0]), - }, nil) - priceGetter.On("TokenPricesUSD", mock.Anything, tokens[1:]).Return(map[cciptypes.Address]*big.Int{ tokens[1]: val1e18(tokenPrices[1]), tokens[2]: val1e18(tokenPrices[2]), }, nil) @@ -808,18 +636,12 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { offRampReader, ).(*priceService) - gasUpdateInterval := 2000 * time.Millisecond - tokenUpdateInterval := 5000 * time.Millisecond - cleanupInterval := 3000 * time.Millisecond + updateInterval := 2000 * time.Millisecond - // run gas price task every 2 second - priceService.gasUpdateInterval = gasUpdateInterval - // run token price task every 5 second - priceService.tokenUpdateInterval = tokenUpdateInterval - // run cleanup every 3 seconds - priceService.cleanupInterval = cleanupInterval + // run write task every 2 second + priceService.updateInterval = updateInterval // expire all prices during every cleanup - priceService.priceExpireThreshold = time.Duration(0) + priceService.priceExpireSec = 0 // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) @@ -845,7 +667,6 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { }, testutils.WaitTimeout(t), testutils.TestInterval) assert.NoError(t, priceService.Close()) - assert.NoError(t, priceService.runCleanup(ctx)) // after stopping PriceService and runCleanup, no more updates are inserted for i := 0; i < 5; i++ { diff --git a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql new file mode 100644 index 0000000000..ebadf54468 --- /dev/null +++ b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql @@ -0,0 +1,49 @@ +-- +goose Up + +-- We need to re-create tables from scratch because of the unique constraint on tokens and chains selectors +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT unique_token_chain_selector UNIQUE (token_addr, chain_selector) +); + +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT unique_source_dest_chain UNIQUE (source_chain_selector, chain_selector) +); + +-- +goose Down +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +-- Restore state from migration 0236_ccip_prices_cache.sql +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_ccip_gas_prices_chain_gas_price_timestamp ON ccip.observed_gas_prices (chain_selector, source_chain_selector, created_at DESC); +CREATE INDEX idx_ccip_token_prices_token_price_timestamp ON ccip.observed_token_prices (chain_selector, token_addr, created_at DESC); From 94a13644232e10bd1f473c1c245db739d9201354 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 8 Aug 2024 14:06:20 +0200 Subject: [PATCH 2/4] Added detailed tests for observability layer Fixing tests and adding missing changeset --- .changeset/thick-mails-applaud.md | 5 + core/services/ccip/mocks/orm.go | 99 +++++++----- core/services/ccip/observability.go | 115 ++++++++++++++ core/services/ccip/observability_test.go | 94 +++++++++++ core/services/ccip/orm.go | 142 ++++++++++++----- core/services/ccip/orm_test.go | 148 ++++++++++++------ .../plugins/ccip/ccipcommit/initializers.go | 2 +- .../ccip/internal/ccipdb/price_service.go | 23 ++- .../internal/ccipdb/price_service_test.go | 35 +---- 9 files changed, 497 insertions(+), 166 deletions(-) create mode 100644 .changeset/thick-mails-applaud.md create mode 100644 core/services/ccip/observability.go create mode 100644 core/services/ccip/observability_test.go diff --git a/.changeset/thick-mails-applaud.md b/.changeset/thick-mails-applaud.md new file mode 100644 index 0000000000..ceec9e64fd --- /dev/null +++ b/.changeset/thick-mails-applaud.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Simplify how token and gas prices are stored in the database - user upsert instead of insert/delete flow #db_update diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index 35ca2353bf..0c9086def7 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -8,6 +8,8 @@ import ( ccip "github.com/smartcontractkit/chainlink/v2/core/services/ccip" mock "github.com/stretchr/testify/mock" + + time "time" ) // ORM is an autogenerated mock type for the ORM type @@ -141,98 +143,119 @@ func (_c *ORM_GetTokenPricesByDestChain_Call) RunAndReturn(run func(context.Cont return _c } -// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices -func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate) error { +// UpsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices +func (_m *ORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice) (int64, error) { ret := _m.Called(ctx, destChainSelector, gasPrices) if len(ret) == 0 { - panic("no return value specified for InsertGasPricesForDestChain") + panic("no return value specified for UpsertGasPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPriceUpdate) error); ok { + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) (int64, error)); ok { + return rf(ctx, destChainSelector, gasPrices) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) int64); ok { r0 = rf(ctx, destChainSelector, gasPrices) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.GasPrice) error); ok { + r1 = rf(ctx, destChainSelector, gasPrices) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// ORM_InsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertGasPricesForDestChain' -type ORM_InsertGasPricesForDestChain_Call struct { +// ORM_UpsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertGasPricesForDestChain' +type ORM_UpsertGasPricesForDestChain_Call struct { *mock.Call } -// InsertGasPricesForDestChain is a helper method to define mock.On call +// UpsertGasPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - gasPrices []ccip.GasPriceUpdate -func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { - return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} +// - gasPrices []ccip.GasPrice +func (_e *ORM_Expecter) UpsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_UpsertGasPricesForDestChain_Call { + return &ORM_UpsertGasPricesForDestChain_Call{Call: _e.mock.On("UpsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} } -func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPrice)) }) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) Return(_a0 error) *ORM_InsertGasPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertGasPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertGasPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPrice) (int64, error)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Return(run) return _c } -// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices -func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, tokenPrices) +// UpsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices, interval +func (_m *ORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration) (int64, error) { + ret := _m.Called(ctx, destChainSelector, tokenPrices, interval) if len(ret) == 0 { - panic("no return value specified for InsertTokenPricesForDestChain") + panic("no return value specified for UpsertTokenPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, tokenPrices) + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)); ok { + return rf(ctx, destChainSelector, tokenPrices, interval) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) int64); ok { + r0 = rf(ctx, destChainSelector, tokenPrices, interval) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) error); ok { + r1 = rf(ctx, destChainSelector, tokenPrices, interval) } else { - r0 = ret.Error(0) + r1 = ret.Error(1) } - return r0 + return r0, r1 } -// ORM_InsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertTokenPricesForDestChain' -type ORM_InsertTokenPricesForDestChain_Call struct { +// ORM_UpsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTokenPricesForDestChain' +type ORM_UpsertTokenPricesForDestChain_Call struct { *mock.Call } -// InsertTokenPricesForDestChain is a helper method to define mock.On call +// UpsertTokenPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - tokenPrices []ccip.TokenPriceUpdate -func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { - return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices)} +// - tokenPrices []ccip.TokenPrice +// - interval time.Duration +func (_e *ORM_Expecter) UpsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}, interval interface{}) *ORM_UpsertTokenPricesForDestChain_Call { + return &ORM_UpsertTokenPricesForDestChain_Call{Call: _e.mock.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices, interval)} } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPrice), args[3].(time.Duration)) }) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Return(_a0 error) *ORM_InsertTokenPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertTokenPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Return(run) return _c } diff --git a/core/services/ccip/observability.go b/core/services/ccip/observability.go new file mode 100644 index 0000000000..8a061893ce --- /dev/null +++ b/core/services/ccip/observability.go @@ -0,0 +1,115 @@ +package ccip + +import ( + "context" + "strconv" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + sqlLatencyBuckets = []float64{ + float64(10 * time.Millisecond), + float64(20 * time.Millisecond), + float64(30 * time.Millisecond), + float64(40 * time.Millisecond), + float64(50 * time.Millisecond), + float64(70 * time.Millisecond), + float64(90 * time.Millisecond), + float64(100 * time.Millisecond), + float64(200 * time.Millisecond), + float64(300 * time.Millisecond), + float64(400 * time.Millisecond), + float64(500 * time.Millisecond), + float64(750 * time.Millisecond), + float64(1 * time.Second), + } + ccipQueryDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "ccip_orm_query_duration", + Buckets: sqlLatencyBuckets, + }, []string{"query", "destChainSelector"}) + ccipQueryDatasets = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccip_orm_dataset_size", + }, []string{"query", "destChainSelector"}) +) + +type observedORM struct { + ORM + queryDuration *prometheus.HistogramVec + datasetSize *prometheus.GaugeVec +} + +var _ ORM = (*observedORM)(nil) + +func NewObservedORM(ds sqlutil.DataSource, lggr logger.Logger) (*observedORM, error) { + delegate, err := NewORM(ds, lggr) + if err != nil { + return nil, err + } + + return &observedORM{ + ORM: delegate, + queryDuration: ccipQueryDuration, + datasetSize: ccipQueryDatasets, + }, nil +} + +func (o *observedORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { + return withObservedQueryAndResults(o, "GetGasPricesByDestChain", destChainSelector, func() ([]GasPrice, error) { + return o.ORM.GetGasPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { + return withObservedQueryAndResults(o, "GetTokenPricesByDestChain", destChainSelector, func() ([]TokenPrice, error) { + return o.ORM.GetTokenPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertGasPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertGasPricesForDestChain(ctx, destChainSelector, gasPrices) + }) +} + +func (o *observedORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertTokenPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertTokenPricesForDestChain(ctx, destChainSelector, tokenPrices, interval) + }) +} + +func withObservedQueryAndRowsAffected(o *observedORM, queryName string, chainSelector uint64, query func() (int64, error)) (int64, error) { + rowsAffected, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(rowsAffected)) + } + return rowsAffected, err +} + +func withObservedQueryAndResults[T any](o *observedORM, queryName string, chainSelector uint64, query func() ([]T, error)) ([]T, error) { + results, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(len(results))) + } + return results, err +} + +func withObservedQuery[T any](o *observedORM, queryName string, chainSelector uint64, query func() (T, error)) (T, error) { + queryStarted := time.Now() + defer func() { + o.queryDuration. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Observe(float64(time.Since(queryStarted))) + }() + return query() +} diff --git a/core/services/ccip/observability_test.go b/core/services/ccip/observability_test.go new file mode 100644 index 0000000000..24bfb4a9ec --- /dev/null +++ b/core/services/ccip/observability_test.go @@ -0,0 +1,94 @@ +package ccip + +import ( + "math/big" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func Test_MetricsAreTrackedForAllMethods(t *testing.T) { + ctx := testutils.Context(t) + db := pgtest.NewSqlxDB(t) + ccipORM, err := NewObservedORM(db, logger.TestLogger(t)) + require.NoError(t, err) + + tokenPrices := []TokenPrice{ + { + TokenAddr: "0xA", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + TokenAddr: "0xB", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + tokensUpserted, err := ccipORM.UpsertTokenPricesForDestChain(ctx, 100, tokenPrices, time.Second) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), int(tokensUpserted)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "200")) + + tokens, err := ccipORM.GetTokenPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), len(tokens)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetTokenPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetTokenPricesByDestChain", "100")) + + gasPrices := []GasPrice{ + { + SourceChainSelector: 200, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 201, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 202, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + gasUpserted, err := ccipORM.UpsertGasPricesForDestChain(ctx, 100, gasPrices) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), int(gasUpserted)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "200")) + + gas, err := ccipORM.GetGasPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), len(gas)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetGasPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetGasPricesByDestChain", "100")) +} + +func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int { + observer, err := histogramVec.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + + metricCh := make(chan prometheus.Metric, 1) + observer.(prometheus.Histogram).Collect(metricCh) + close(metricCh) + + metric := <-metricCh + pb := &io_prometheus_client.Metric{} + err = metric.Write(pb) + require.NoError(t, err) + + return int(pb.GetHistogram().GetSampleCount()) +} + +func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int { + value := testutil.ToFloat64(gaugeVec.WithLabelValues(labels...)) + return int(value) +} diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index b513b13cc5..0c84135232 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -8,58 +8,55 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) type GasPrice struct { SourceChainSelector uint64 GasPrice *assets.Wei - CreatedAt time.Time -} - -type GasPriceUpdate struct { - SourceChainSelector uint64 - GasPrice *assets.Wei } type TokenPrice struct { TokenAddr string TokenPrice *assets.Wei - CreatedAt time.Time -} - -type TokenPriceUpdate struct { - TokenAddr string - TokenPrice *assets.Wei } type ORM interface { GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error + UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) + UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) +} + +type tokenPriceRow struct { + TokenAddr string + TokenPrice *assets.Wei + CreatedAt time.Time } type orm struct { - ds sqlutil.DataSource + ds sqlutil.DataSource + lggr logger.Logger } var _ ORM = (*orm)(nil) -func NewORM(ds sqlutil.DataSource) (ORM, error) { +func NewORM(ds sqlutil.DataSource, lggr logger.Logger) (ORM, error) { if ds == nil { return nil, fmt.Errorf("datasource to CCIP NewORM cannot be nil") } return &orm{ - ds: ds, + ds: ds, + lggr: lggr, }, nil } func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price FROM ccip.observed_gas_prices WHERE chain_selector = $1 ORDER BY source_chain_selector, created_at DESC; @@ -75,7 +72,7 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT token_addr, token_price, created_at + SELECT token_addr, token_price FROM ccip.observed_token_prices WHERE chain_selector = $1 ORDER BY token_addr; @@ -84,16 +81,15 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u if err != nil { return nil, err } - return tokenPrices, nil } -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error { +func (o *orm) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { if len(gasPrices) == 0 { - return nil + return 0, nil } - uniqueGasUpdates := make(map[string]GasPriceUpdate) + uniqueGasUpdates := make(map[string]GasPrice) for _, gasPrice := range gasPrices { key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector) uniqueGasUpdates[key] = gasPrice @@ -110,28 +106,32 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, created_at) VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp()) - ON CONFLICT (chain_selector, source_chain_selector) + ON CONFLICT (source_chain_selector, chain_selector) DO UPDATE SET gas_price = EXCLUDED.gas_price, created_at = EXCLUDED.created_at;` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting gas prices %w", err) + return 0, fmt.Errorf("error inserting gas prices %w", err) } - return err + return result.RowsAffected() } -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error { +// UpsertTokenPricesForDestChain inserts or updates only relevant token prices. +// In order to reduce locking an unnecessary writes to the table, we start with fetching current prices. +// If price for a token doesn't change or was updated recently we don't include that token to the upsert query. +// We don't run in TX intentionally, because we don't want to lock the table and conflicts are resolved on the insert level +func (o *orm) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { if len(tokenPrices) == 0 { - return nil + return 0, nil } - uniqueTokenPrices := make(map[string]TokenPriceUpdate) - for _, tokenPrice := range tokenPrices { - key := fmt.Sprintf("%s-%d", tokenPrice.TokenAddr, destChainSelector) - uniqueTokenPrices[key] = tokenPrice + tokensToUpdate, err := o.pickOnlyRelevantTokensForUpdate(ctx, destChainSelector, tokenPrices, interval) + if err != nil || len(tokensToUpdate) == 0 { + return 0, err } - insertData := make([]map[string]interface{}, 0, len(uniqueTokenPrices)) - for _, price := range uniqueTokenPrices { + insertData := make([]map[string]interface{}, 0, len(tokensToUpdate)) + for _, price := range tokensToUpdate { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, "token_addr": price.TokenAddr, @@ -141,11 +141,75 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, created_at) VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp()) - ON CONFLICT (chain_selector, token_addr) + ON CONFLICT (token_addr, chain_selector) DO UPDATE SET token_price = EXCLUDED.token_price, created_at = EXCLUDED.created_at;` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting token prices %w", err) + return 0, fmt.Errorf("error inserting token prices %w", err) + } + return result.RowsAffected() +} + +// pickOnlyRelevantTokensForUpdate returns only tokens that need to be updated. Multiple jobs can be updating the same tokens, +// in order to reduce table locking and redundant upserts we start with reading the table and checking which tokens are eligible for update. +// A token is eligible for update when: +// * price has changed and created_at date is older than the interval +// * it's not present in the result set from the db query (e.g. it's a new token) +// Therefore if there are no price changes for a single token we won't update the state of the database +func (o *orm) pickOnlyRelevantTokensForUpdate( + ctx context.Context, + destChainSelector uint64, + tokenPrices []TokenPrice, + interval time.Duration, +) ([]TokenPrice, error) { + stmt := ` + SELECT token_addr, token_price, created_at + FROM ccip.observed_token_prices + WHERE chain_selector = $1 and token_addr = ANY($2); + ` + + var dbTokenPrices []tokenPriceRow + if err := o.ds.SelectContext(ctx, &dbTokenPrices, stmt, destChainSelector, tokenAddrsToBytes(tokenPrices)); err != nil { + return nil, err + } + + updateThrehsold := time.Now().Add(-interval) + tokenPricesByAddr := toTokensByAddress(tokenPrices) + dbTokenPricesByAddr := toTokenRowsByAddress(dbTokenPrices) + tokenPricesToUpdate := make([]TokenPrice, 0, len(tokenPrices)) + for addr, price := range tokenPricesByAddr { + dbToken, ok := dbTokenPricesByAddr[addr] + eligibleForUpdate := false + if !ok || + (dbToken.CreatedAt.Before(updateThrehsold) && !dbToken.TokenPrice.Equal(price)) { + tokenPricesToUpdate = append(tokenPricesToUpdate, TokenPrice{addr, price}) + eligibleForUpdate = true + } + o.lggr.Debugw("Token price database update", "eligibleForUpdate", eligibleForUpdate, "token", addr, "price", price) + } + return tokenPricesToUpdate, nil +} + +func toTokensByAddress(tokens []TokenPrice) map[string]*assets.Wei { + tokensByAddr := make(map[string]*assets.Wei, len(tokens)) + for _, tk := range tokens { + tokensByAddr[tk.TokenAddr] = tk.TokenPrice + } + return tokensByAddr +} + +func toTokenRowsByAddress(tokens []tokenPriceRow) map[string]tokenPriceRow { + tokensByAddr := make(map[string]tokenPriceRow, len(tokens)) + for _, tk := range tokens { + tokensByAddr[tk.TokenAddr] = tk + } + return tokensByAddr +} + +func tokenAddrsToBytes(tokens []TokenPrice) [][]byte { + addrs := make([][]byte, 0, len(tokens)) + for _, tk := range tokens { + addrs = append(addrs, []byte(tk.TokenAddr)) } - return err + return addrs } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index f7b8a6d5a4..9c51d620f5 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -15,13 +15,18 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + r = rand.New(rand.NewSource(time.Now().UnixNano())) ) func setupORM(t *testing.T) (ORM, sqlutil.DataSource) { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := NewORM(db) + orm, err := NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -37,12 +42,12 @@ func generateChainSelectors(n int) []uint64 { return selectors } -func generateGasPriceUpdates(chainSelector uint64, n int) []GasPriceUpdate { - updates := make([]GasPriceUpdate, n) +func generateGasPrices(chainSelector uint64, n int) []GasPrice { + updates := make([]GasPrice, n) for i := 0; i < n; i++ { // gas prices can take up whole range of uint256 uint256Max := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1)) - row := GasPriceUpdate{ + row := GasPrice{ SourceChainSelector: chainSelector, GasPrice: assets.NewWei(new(big.Int).Sub(uint256Max, big.NewInt(int64(i)))), } @@ -61,10 +66,21 @@ func generateTokenAddresses(n int) []string { return addrs } -func generateTokenPriceUpdates(tokenAddr string, n int) []TokenPriceUpdate { - updates := make([]TokenPriceUpdate, n) +func generateRandomTokenPrices(tokenAddrs []string) []TokenPrice { + updates := make([]TokenPrice, 0, len(tokenAddrs)) + for _, addr := range tokenAddrs { + updates = append(updates, TokenPrice{ + TokenAddr: addr, + TokenPrice: assets.NewWei(new(big.Int).Rand(r, big.NewInt(1e18))), + }) + } + return updates +} + +func generateTokenPrices(tokenAddr string, n int) []TokenPrice { + updates := make([]TokenPrice, n) for i := 0; i < n; i++ { - row := TokenPriceUpdate{ + row := TokenPrice{ TokenAddr: tokenAddr, TokenPrice: assets.NewWei(new(big.Int).Mul(big.NewInt(1e18), big.NewInt(int64(i)))), } @@ -134,20 +150,20 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[uint64]GasPriceUpdate) + expectedPrices := make(map[uint64]GasPrice) for i := 0; i < numJobs; i++ { for selector, updatesPerSelector := range updates { lastIndex := len(updatesPerSelector) - 1 - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) assert.NoError(t, err) - err = orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) assert.NoError(t, err) expectedPrices[selector] = updatesPerSelector[lastIndex] @@ -170,13 +186,13 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []GasPriceUpdate + var combinedUpdates []GasPrice for selector, updatesPerSelector := range updates { combinedUpdates = append(combinedUpdates, updatesPerSelector[0]) expectedPrices[selector] = updatesPerSelector[0] } - err = orm.InsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) @@ -190,7 +206,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteGasPrices(t *testing.T) { +func TestORM_UpsertGasPrices(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -202,13 +218,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -217,7 +233,7 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -237,20 +253,20 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { addrs := generateTokenAddresses(numAddresses) - updates := make(map[string][]TokenPriceUpdate) + updates := make(map[string][]TokenPrice) for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) + updates[addr] = generateTokenPrices(addr, numUpdatesPerAddress) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[string]TokenPriceUpdate) + expectedPrices := make(map[string]TokenPrice) for i := 0; i < numJobs; i++ { for addr, updatesPerAddr := range updates { lastIndex := len(updatesPerAddr) - 1 - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex]) + _, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex], 0) assert.NoError(t, err) - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:]) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:], 0) assert.NoError(t, err) expectedPrices[addr] = updatesPerAddr[lastIndex] @@ -273,13 +289,13 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []TokenPriceUpdate + var combinedUpdates []TokenPrice for addr, updatesPerAddr := range updates { combinedUpdates = append(combinedUpdates, updatesPerAddr[0]) expectedPrices[addr] = updatesPerAddr[0] } - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates, 0) assert.NoError(t, err) assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) @@ -293,36 +309,74 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { +func TestORM_InsertTokenPricesWhenExpired(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - - orm, db := setupORM(t) + orm, _ := setupORM(t) numAddresses := 10 - numUpdatesPerAddress := 20 - destSelector := uint64(1) - + destSelector := rand.Uint64() addrs := generateTokenAddresses(numAddresses) + initTokenUpdates := generateRandomTokenPrices(addrs) - updates := make(map[string][]TokenPriceUpdate) - for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) - } + // Insert the first time, table is initialized + rowsUpdated, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) - assert.NoError(t, err) - } + //time.Sleep(100 * time.Millisecond) - sleepSec := 2 - time.Sleep(time.Duration(sleepSec) * time.Second) + // Insert the second time, no updates, because prices haven't changed + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) - // insert for the 2nd time after interimTimeStamp - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) - assert.NoError(t, err) + // There are new prices, but we still haven't reached interval + newPrices := generateRandomTokenPrices(addrs) + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) + + time.Sleep(100 * time.Millisecond) + + // Again with the same new prices, but this time interval is reached + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Millisecond) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) + + dbTokenPrices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) + require.NoError(t, err) + assert.Len(t, dbTokenPrices, numAddresses) + + dbTokenPricesByAddr := toTokensByAddress(dbTokenPrices) + for _, tkPrice := range newPrices { + dbToken, ok := dbTokenPricesByAddr[tkPrice.TokenAddr] + assert.True(t, ok) + assert.Equal(t, dbToken, tkPrice.TokenPrice) } - assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) + // Single token gets an update + newPrices[0].TokenPrice = assets.NewWei(new(big.Int).Add(newPrices[0].TokenPrice.ToInt(), big.NewInt(1))) + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Nanosecond) + require.NoError(t, err) + assert.Equal(t, int64(1), rowsUpdated) +} + +func Benchmark_UpsertsTheSameTokenPrices(b *testing.B) { + db := pgtest.NewSqlxDB(b) + orm, err := NewORM(db, logger.NullLogger) + require.NoError(b, err) + + ctx := testutils.Context(b) + numAddresses := 50 + destSelector := rand.Uint64() + addrs := generateTokenAddresses(numAddresses) + tokenUpdates := generateRandomTokenPrices(addrs) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err1 := orm.UpsertTokenPricesForDestChain(ctx, destSelector, tokenUpdates, time.Second) + require.NoError(b, err1) + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 30926cfd4e..a40c8b620b 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -162,7 +162,7 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c onRampAddress, ) - orm, err := cciporm.NewORM(ds) + orm, err := cciporm.NewObservedORM(ds, lggr) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 23ea3dcbd0..be77e8f1c3 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -13,7 +13,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -22,6 +21,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) // PriceService manages DB access for gas and token price data. @@ -42,17 +42,11 @@ type PriceService interface { var _ PriceService = (*priceService)(nil) const ( - // Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. - // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while - // surfacing price update outages quickly enough. - priceExpireSec = 600 - // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. priceUpdateInterval = 60 * time.Second ) type priceService struct { - priceExpireSec int updateInterval time.Duration lggr logger.Logger @@ -88,8 +82,7 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireSec: priceExpireSec, - updateInterval: utils.WithJitter(priceUpdateInterval), + updateInterval: priceUpdateInterval, lggr: lggr, orm: orm, @@ -128,7 +121,7 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - updateTicker := time.NewTicker(p.updateInterval) + updateTicker := time.NewTicker(utils.WithJitter(p.updateInterval)) go func() { defer p.wg.Done() @@ -325,20 +318,21 @@ func (p *priceService) writePricesToDB( if sourceGasPriceUSD != nil { eg.Go(func() error { - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPriceUpdate{ + _, err1 := p.orm.UpsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPrice{ { SourceChainSelector: p.sourceChainSelector, GasPrice: assets.NewWei(sourceGasPriceUSD), }, }) + return err1 }) } if tokenPricesUSD != nil { - var tokenPrices []cciporm.TokenPriceUpdate + var tokenPrices []cciporm.TokenPrice for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + tokenPrices = append(tokenPrices, cciporm.TokenPrice{ TokenAddr: string(token), TokenPrice: assets.NewWei(price), }) @@ -350,7 +344,8 @@ func (p *priceService) writePricesToDB( }) eg.Go(func() error { - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices) + _, err1 := p.orm.UpsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices, p.updateInterval) + return err1 }) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 872d933a94..245bda995c 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -18,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -42,13 +41,13 @@ func TestPriceService_priceWrite(t *testing.T) { "0x234": big.NewInt(3e18), } - expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ + expectedGasPriceUpdate := []cciporm.GasPrice{ { SourceChainSelector: sourceChainSelector, GasPrice: assets.NewWei(gasPrice), }, } - expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ + expectedTokenPriceUpdate := []cciporm.TokenPrice{ { TokenAddr: "0x123", TokenPrice: assets.NewWei(big.NewInt(2e18)), @@ -105,8 +104,10 @@ func TestPriceService_priceWrite(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() - mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() + mockOrm.On("UpsertGasPricesForDestChain", ctx, destChainSelector, expectedGasPriceUpdate). + Return(int64(len(expectedGasPriceUpdate)), gasPricesError).Once() + mockOrm.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, expectedTokenPriceUpdate, priceUpdateInterval). + Return(int64(len(expectedTokenPriceUpdate)), tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -555,7 +556,7 @@ func setupORM(t *testing.T) cciporm.ORM { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := cciporm.NewORM(db) + orm, err := cciporm.NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -577,7 +578,7 @@ func checkResultLen(t *testing.T, priceService PriceService, destChainSelector u return nil } -func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { +func TestPriceService_priceWriteInBackground(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) @@ -640,8 +641,6 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { // run write task every 2 second priceService.updateInterval = updateInterval - // expire all prices during every cleanup - priceService.priceExpireSec = 0 // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) @@ -654,23 +653,5 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { assert.NoError(t, err) assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens))) - // eventually prices will be cleaned - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 0, 0) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - - // then prices will be updated again - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens)) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - assert.NoError(t, priceService.Close()) - - // after stopping PriceService and runCleanup, no more updates are inserted - for i := 0; i < 5; i++ { - time.Sleep(time.Second) - assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) - } } From 83eb4b6108eb0101b84a5d62f76fe864a91c03c1 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 14 Aug 2024 12:55:08 +0200 Subject: [PATCH 3/4] Better indexing --- core/services/ccip/orm.go | 6 ++---- .../store/migrate/migrations/0249_ccip_token_prices_fix.sql | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index 0c84135232..4676d59093 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -58,8 +58,7 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin stmt := ` SELECT source_chain_selector, gas_price FROM ccip.observed_gas_prices - WHERE chain_selector = $1 - ORDER BY source_chain_selector, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector) if err != nil { @@ -74,8 +73,7 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u stmt := ` SELECT token_addr, token_price FROM ccip.observed_token_prices - WHERE chain_selector = $1 - ORDER BY token_addr; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { diff --git a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql index ebadf54468..686c9d1061 100644 --- a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql +++ b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql @@ -10,7 +10,7 @@ CREATE TABLE ccip.observed_token_prices token_addr BYTEA NOT NULL, token_price NUMERIC(78, 0) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - CONSTRAINT unique_token_chain_selector UNIQUE (token_addr, chain_selector) + PRIMARY KEY (chain_selector, token_addr) ); CREATE TABLE ccip.observed_gas_prices @@ -19,7 +19,7 @@ CREATE TABLE ccip.observed_gas_prices source_chain_selector NUMERIC(20, 0) NOT NULL, gas_price NUMERIC(78, 0) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - CONSTRAINT unique_source_dest_chain UNIQUE (source_chain_selector, chain_selector) + PRIMARY KEY (chain_selector, source_chain_selector) ); -- +goose Down From 2320567633e2ee762535c367ae33333e21e928a3 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 16 Aug 2024 13:57:13 +0200 Subject: [PATCH 4/4] Test --- core/services/ccip/orm.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index 4676d59093..28e08be2a3 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -3,6 +3,7 @@ package ccip import ( "context" "fmt" + "strings" "time" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -60,6 +61,7 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin FROM ccip.observed_gas_prices WHERE chain_selector = $1; ` + o.withAnalyze(ctx, "GetGasPricesByDestChain", stmt, destChainSelector) err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector) if err != nil { return nil, err @@ -75,6 +77,7 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u FROM ccip.observed_token_prices WHERE chain_selector = $1; ` + o.withAnalyze(ctx, "GetTokenPricesByDestChain", stmt, destChainSelector) err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { return nil, err @@ -166,6 +169,8 @@ func (o *orm) pickOnlyRelevantTokensForUpdate( WHERE chain_selector = $1 and token_addr = ANY($2); ` + o.withAnalyze(ctx, "pickOnlyRelevantTokensForUpdate", stmt, destChainSelector, tokenAddrsToBytes(tokenPrices)) + var dbTokenPrices []tokenPriceRow if err := o.ds.SelectContext(ctx, &dbTokenPrices, stmt, destChainSelector, tokenAddrsToBytes(tokenPrices)); err != nil { return nil, err @@ -211,3 +216,16 @@ func tokenAddrsToBytes(tokens []TokenPrice) [][]byte { } return addrs } + +func (o *orm) withAnalyze(ctx context.Context, queryName string, query string, args ...interface{}) { + query = "EXPLAIN (ANALYZE, BUFFERS) " + query + + var response []string + err := o.ds.SelectContext(ctx, &response, query, args...) + if err != nil { + return + } + if len(response) > 0 { + o.lggr.Infow("Analyze query", "query", queryName, "response", strings.Join(response, "\n")) + } +}