From f21a6d33716b7d3699c6c90bd3171fd4f5490ac4 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Tue, 10 Oct 2023 13:09:40 +0300 Subject: [PATCH 01/15] optimize price updates fetching query --- .../ccip/commit_price_updates_cache.go | 79 +++++++++++++++++++ .../ccip/commit_price_updates_cache_test.go | 52 ++++++++++++ .../plugins/ccip/commit_reporting_plugin.go | 38 +++++---- .../ccip/commit_reporting_plugin_test.go | 68 ++++++++++++++++ 4 files changed, 220 insertions(+), 17 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/commit_price_updates_cache.go create mode 100644 core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go new file mode 100644 index 0000000000..7b61a16ea1 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go @@ -0,0 +1,79 @@ +package ccip + +import ( + "context" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +type tokenPriceUpdatesCache struct { + mem map[common.Address]update + mu *sync.RWMutex + expiry time.Duration +} + +func newTokenPriceUpdatesCache(ctx context.Context, expiry time.Duration) *tokenPriceUpdatesCache { + c := &tokenPriceUpdatesCache{ + mem: make(map[common.Address]update), + mu: &sync.RWMutex{}, + expiry: expiry, + } + go c.expirationWorker(ctx) + return c +} + +func (c *tokenPriceUpdatesCache) expirationWorker(ctx context.Context) { + tick := time.NewTicker(c.expiry) + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + c.mu.Lock() + c.mem = make(map[common.Address]update) + c.mu.Unlock() + } + } +} + +func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time { + c.mu.RLock() + defer c.mu.RUnlock() + + ts := time.Time{} + for _, upd := range c.mem { + if upd.timestamp.After(ts) { + ts = upd.timestamp + } + } + return ts +} + +func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool { + c.mu.RLock() + v, exists := c.mem[tk] + c.mu.RUnlock() + + if !exists || v.timestamp.Before(ts) { + c.mu.Lock() + c.mem[tk] = update{timestamp: ts, value: val} + c.mu.Unlock() + return true + } + + return false +} + +func (c *tokenPriceUpdatesCache) get() map[common.Address]update { + c.mu.RLock() + defer c.mu.RUnlock() + cp := make(map[common.Address]update, len(c.mem)) + for k, v := range c.mem { + cp[k] = v + } + return cp +} diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go new file mode 100644 index 0000000000..3239c5d5cd --- /dev/null +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go @@ -0,0 +1,52 @@ +package ccip + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func Test_tokenPriceUpdatesCache(t *testing.T) { + ctx := context.Background() + + tk := common.HexToAddress("1") + ts := time.Now().Truncate(time.Second) + + t.Run("base", func(t *testing.T) { + c := newTokenPriceUpdatesCache(ctx, time.Minute) + assert.Equal(t, time.Time{}, c.mostRecentTs()) + + c.updateIfMoreRecent(ts, tk, big.NewInt(100)) + assert.Equal(t, ts, c.mostRecentTs()) + v := c.get() + assert.Equal(t, big.NewInt(100), v[tk].value) + + // should not get updated if ts is older + c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) + v = c.get() + assert.Equal(t, big.NewInt(100), v[tk].value) + }) + + t.Run("test expiration", func(t *testing.T) { + c := newTokenPriceUpdatesCache(ctx, 20*time.Nanosecond) // every 1ns cache expires + assert.Equal(t, time.Time{}, c.mostRecentTs()) + c.updateIfMoreRecent(ts, tk, big.NewInt(100)) + time.Sleep(time.Millisecond) + assert.Equal(t, time.Time{}, c.mostRecentTs()) // should have expired + assert.Len(t, c.get(), 0) + }) + + t.Run("test expiration worker cancellation", func(t *testing.T) { + ctx, cf := context.WithCancel(context.Background()) + c := newTokenPriceUpdatesCache(ctx, time.Nanosecond) // every 1ns cache expires + cf() // stop the cancellation worker + c.updateIfMoreRecent(ts, tk, big.NewInt(100)) + time.Sleep(10 * time.Nanosecond) + assert.Equal(t, ts, c.mostRecentTs()) // should not have expired, since worker stopped + assert.Len(t, c.get(), 1) + }) +} diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index e27cd8022a..83ccde982d 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -80,7 +80,8 @@ type CommitReportingPlugin struct { // Offchain priceGetter pricegetter.PriceGetter // State - inflightReports *inflightCommitReportsContainer + inflightReports *inflightCommitReportsContainer + tokenPriceUpdatesCache *tokenPriceUpdatesCache } type CommitReportingPluginFactory struct { @@ -159,7 +160,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin rf.config.destClient, int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), - gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -360,27 +362,29 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int { // Gets the latest token price updates based on logs within the heartbeat // The updates returned by this function are guaranteed to not contain nil values. func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) { - tokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter( - ctx, - now.Add(-r.offchainConfig.TokenPriceHeartBeat), - 0, - ) + ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat) + if mostRecentCachedTs := r.tokenPriceUpdatesCache.mostRecentTs(); mostRecentCachedTs.After(ts) { + ts = mostRecentCachedTs + } + + newTokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(ctx, ts, 0) if err != nil { return nil, err } - latestUpdates := make(map[common.Address]update) - for _, tokenUpdate := range tokenPriceUpdates { - priceUpdate := tokenUpdate.Data - // Ordered by ascending timestamps - timestamp := time.Unix(priceUpdate.Timestamp.Int64(), 0) - if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.Token].timestamp) { - latestUpdates[priceUpdate.Token] = update{ - timestamp: timestamp, - value: priceUpdate.Value, - } + for _, upd := range newTokenPriceUpdates { + if upd.Data.Value == nil { + continue } + + r.tokenPriceUpdatesCache.updateIfMoreRecent( + time.Unix(upd.Data.Timestamp.Int64(), 0), + upd.Data.Token, + upd.Data.Value, + ) } + + latestUpdates := r.tokenPriceUpdatesCache.get() if !checkInflight { return latestUpdates, nil } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index e38b5b1538..32d199db88 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -298,6 +298,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.gasPriceEstimator = gasPriceEstimator p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader + p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour) p.F = tc.f aos := make([]types.AttributedObservation, 0, len(tc.observations)) @@ -1506,6 +1507,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { //_, priceRegAddr := testhelpers.NewFakePriceRegistry(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = priceReg + p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour) //destReader := ccipdata.NewMockReader(t) var events []ccipdata.Event[ccipdata.TokenPriceUpdate] @@ -1545,6 +1547,72 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { } +func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { + priceReg := ccipdata.NewMockPriceRegistryReader(t) + p := &CommitReportingPlugin{ + tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour), + destPriceRegistryReader: priceReg, + offchainConfig: ccipdata.CommitOffchainConfig{ + TokenPriceHeartBeat: 12 * time.Hour, + }, + } + + twoHoursAgo := time.Now().Add(-2 * time.Hour) + threeHoursAgo := time.Now().Add(-3 * time.Hour) + fourHoursAgo := time.Now().Add(-4 * time.Hour) + + tk1 := utils.RandomAddress() + now := time.Now() + + onChainUpdates := []ccipdata.Event[ccipdata.TokenPriceUpdate]{ + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(100)}, + Timestamp: big.NewInt(0).SetInt64(fourHoursAgo.Unix()), + }, + }, + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(102)}, + Timestamp: big.NewInt(0).SetInt64(twoHoursAgo.Unix()), + }, + }, + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(101)}, + Timestamp: big.NewInt(0).SetInt64(threeHoursAgo.Unix()), + }, + }, + } + rand.Shuffle(len(onChainUpdates), func(i, j int) { onChainUpdates[i], onChainUpdates[j] = onChainUpdates[j], onChainUpdates[i] }) + priceReg.On( + "GetTokenPriceUpdatesCreatedAfter", + mock.Anything, + now.Add(-p.offchainConfig.TokenPriceHeartBeat), // first call should pass the token price heart beat duration + 0, + ).Return(onChainUpdates, nil).Once() + + priceUpdates, err := p.getLatestTokenPriceUpdates(context.Background(), now, false) + assert.NoError(t, err) + // we expect to get only one update, since all three updates above are for the same token + assert.Len(t, priceUpdates, 1) + // and we expect to get the latest price update + assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value) + assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix()) + + priceReg.On( + "GetTokenPriceUpdatesCreatedAfter", + mock.Anything, + twoHoursAgo.Truncate(time.Second), // now we expect to ask for price updates after the most recent price update + 0, + ).Return([]ccipdata.Event[ccipdata.TokenPriceUpdate]{}, nil).Once() + priceUpdates, err = p.getLatestTokenPriceUpdates(context.Background(), now, false) + assert.NoError(t, err) + // and we expect to get the exact same price update since there wasn't anything new recorded onchain + assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value) + assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix()) +} + func Test_commitReportSize(t *testing.T) { testParams := gopter.DefaultTestParameters() testParams.MinSuccessfulTests = 100 From f8ef35cc6f1225645ea375caa65508272db879b3 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Tue, 10 Oct 2023 13:23:10 +0300 Subject: [PATCH 02/15] increase test durations --- .../ocr2/plugins/ccip/commit_price_updates_cache_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go index 3239c5d5cd..ce51e90f67 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go @@ -32,10 +32,10 @@ func Test_tokenPriceUpdatesCache(t *testing.T) { }) t.Run("test expiration", func(t *testing.T) { - c := newTokenPriceUpdatesCache(ctx, 20*time.Nanosecond) // every 1ns cache expires + c := newTokenPriceUpdatesCache(ctx, 200*time.Nanosecond) // every 1ns cache expires assert.Equal(t, time.Time{}, c.mostRecentTs()) c.updateIfMoreRecent(ts, tk, big.NewInt(100)) - time.Sleep(time.Millisecond) + time.Sleep(5 * time.Millisecond) assert.Equal(t, time.Time{}, c.mostRecentTs()) // should have expired assert.Len(t, c.get(), 0) }) From ecd83bdc0c3b0fa760b4238b58ee6a7ebca6fa93 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Wed, 11 Oct 2023 12:28:08 +0300 Subject: [PATCH 03/15] remove cache expiration --- .../ccip/commit_price_updates_cache.go | 35 ++++---------- .../ccip/commit_price_updates_cache_test.go | 46 ++++++------------- .../plugins/ccip/commit_reporting_plugin.go | 4 +- .../ccip/commit_reporting_plugin_test.go | 11 +++-- 4 files changed, 31 insertions(+), 65 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go index 7b61a16ea1..11530b433a 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go @@ -1,7 +1,6 @@ package ccip import ( - "context" "math/big" "sync" "time" @@ -10,36 +9,18 @@ import ( ) type tokenPriceUpdatesCache struct { - mem map[common.Address]update - mu *sync.RWMutex - expiry time.Duration + mem map[common.Address]update + mu *sync.RWMutex } -func newTokenPriceUpdatesCache(ctx context.Context, expiry time.Duration) *tokenPriceUpdatesCache { +func newTokenPriceUpdatesCache() *tokenPriceUpdatesCache { c := &tokenPriceUpdatesCache{ - mem: make(map[common.Address]update), - mu: &sync.RWMutex{}, - expiry: expiry, + mem: make(map[common.Address]update), + mu: &sync.RWMutex{}, } - go c.expirationWorker(ctx) return c } -func (c *tokenPriceUpdatesCache) expirationWorker(ctx context.Context) { - tick := time.NewTicker(c.expiry) - - for { - select { - case <-ctx.Done(): - return - case <-tick.C: - c.mu.Lock() - c.mem = make(map[common.Address]update) - c.mu.Unlock() - } - } -} - func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time { c.mu.RLock() defer c.mu.RUnlock() @@ -68,11 +49,15 @@ func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Addr return false } -func (c *tokenPriceUpdatesCache) get() map[common.Address]update { +// get returns all the price updates with timestamp greater than or equal to the provided +func (c *tokenPriceUpdatesCache) get(minTs time.Time) map[common.Address]update { c.mu.RLock() defer c.mu.RUnlock() cp := make(map[common.Address]update, len(c.mem)) for k, v := range c.mem { + if v.timestamp.Before(minTs) { + continue + } cp[k] = v } return cp diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go index ce51e90f67..c42126233b 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go @@ -1,7 +1,6 @@ package ccip import ( - "context" "math/big" "testing" "time" @@ -11,42 +10,23 @@ import ( ) func Test_tokenPriceUpdatesCache(t *testing.T) { - ctx := context.Background() - tk := common.HexToAddress("1") ts := time.Now().Truncate(time.Second) - t.Run("base", func(t *testing.T) { - c := newTokenPriceUpdatesCache(ctx, time.Minute) - assert.Equal(t, time.Time{}, c.mostRecentTs()) - - c.updateIfMoreRecent(ts, tk, big.NewInt(100)) - assert.Equal(t, ts, c.mostRecentTs()) - v := c.get() - assert.Equal(t, big.NewInt(100), v[tk].value) + c := newTokenPriceUpdatesCache() + assert.Equal(t, time.Time{}, c.mostRecentTs()) - // should not get updated if ts is older - c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) - v = c.get() - assert.Equal(t, big.NewInt(100), v[tk].value) - }) + c.updateIfMoreRecent(ts, tk, big.NewInt(100)) + assert.Equal(t, ts, c.mostRecentTs()) + v := c.get(time.Time{}) + assert.Equal(t, big.NewInt(100), v[tk].value) - t.Run("test expiration", func(t *testing.T) { - c := newTokenPriceUpdatesCache(ctx, 200*time.Nanosecond) // every 1ns cache expires - assert.Equal(t, time.Time{}, c.mostRecentTs()) - c.updateIfMoreRecent(ts, tk, big.NewInt(100)) - time.Sleep(5 * time.Millisecond) - assert.Equal(t, time.Time{}, c.mostRecentTs()) // should have expired - assert.Len(t, c.get(), 0) - }) + // should not get updated if ts is older + c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) + v = c.get(time.Time{}) + assert.Equal(t, big.NewInt(100), v[tk].value) - t.Run("test expiration worker cancellation", func(t *testing.T) { - ctx, cf := context.WithCancel(context.Background()) - c := newTokenPriceUpdatesCache(ctx, time.Nanosecond) // every 1ns cache expires - cf() // stop the cancellation worker - c.updateIfMoreRecent(ts, tk, big.NewInt(100)) - time.Sleep(10 * time.Nanosecond) - assert.Equal(t, ts, c.mostRecentTs()) // should not have expired, since worker stopped - assert.Len(t, c.get(), 1) - }) + // should not get anything when the provided timestamp is recent + v = c.get(time.Now()) + assert.Len(t, v, 0) } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 83ccde982d..95d857e182 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -161,7 +161,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), - tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour), + tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -384,7 +384,7 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, ) } - latestUpdates := r.tokenPriceUpdatesCache.get() + latestUpdates := r.tokenPriceUpdatesCache.get(now.Add(-r.offchainConfig.TokenPriceHeartBeat)) if !checkInflight { return latestUpdates, nil } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 32d199db88..e2b3d04215 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -298,7 +298,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.gasPriceEstimator = gasPriceEstimator p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader - p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour) + p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache() p.F = tc.f aos := make([]types.AttributedObservation, 0, len(tc.observations)) @@ -1507,7 +1507,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { //_, priceRegAddr := testhelpers.NewFakePriceRegistry(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = priceReg - p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour) + p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache() //destReader := ccipdata.NewMockReader(t) var events []ccipdata.Event[ccipdata.TokenPriceUpdate] @@ -1548,9 +1548,10 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { } func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { + ctx := testutils.Context(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p := &CommitReportingPlugin{ - tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour), + tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), destPriceRegistryReader: priceReg, offchainConfig: ccipdata.CommitOffchainConfig{ TokenPriceHeartBeat: 12 * time.Hour, @@ -1592,7 +1593,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { 0, ).Return(onChainUpdates, nil).Once() - priceUpdates, err := p.getLatestTokenPriceUpdates(context.Background(), now, false) + priceUpdates, err := p.getLatestTokenPriceUpdates(ctx, now, false) assert.NoError(t, err) // we expect to get only one update, since all three updates above are for the same token assert.Len(t, priceUpdates, 1) @@ -1606,7 +1607,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { twoHoursAgo.Truncate(time.Second), // now we expect to ask for price updates after the most recent price update 0, ).Return([]ccipdata.Event[ccipdata.TokenPriceUpdate]{}, nil).Once() - priceUpdates, err = p.getLatestTokenPriceUpdates(context.Background(), now, false) + priceUpdates, err = p.getLatestTokenPriceUpdates(ctx, now, false) assert.NoError(t, err) // and we expect to get the exact same price update since there wasn't anything new recorded onchain assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value) From ce09bc0b408b5f60f50107ddbc635e85257714f8 Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Wed, 11 Oct 2023 16:47:49 +0200 Subject: [PATCH 04/15] CCIP-1147 - Cache last update --- .../ocr2/plugins/ccip/commit_inflight.go | 2 +- .../plugins/ccip/commit_reporting_plugin.go | 16 ++++++++-- .../ccip/commit_reporting_plugin_test.go | 1 + .../ocr2/plugins/ccip/price_updates_cache.go | 31 +++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/price_updates_cache.go diff --git a/core/services/ocr2/plugins/ccip/commit_inflight.go b/core/services/ocr2/plugins/ccip/commit_inflight.go index 726bbf6020..670bfed6ca 100644 --- a/core/services/ocr2/plugins/ccip/commit_inflight.go +++ b/core/services/ocr2/plugins/ccip/commit_inflight.go @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 { return max } -// latestInflightGasPriceUpdates returns a map of the latest gas price updates. +// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector. func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update { c.locker.RLock() defer c.locker.RUnlock() diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index bb24464574..37d51d736f 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -81,6 +81,8 @@ type CommitReportingPlugin struct { priceGetter pricegetter.PriceGetter // State inflightReports *inflightCommitReportsContainer + // Cache + priceUpdatesCache *priceUpdatesCache } type CommitReportingPluginFactory struct { @@ -160,6 +162,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -413,17 +416,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now } } - // If there are no price updates inflight, check latest prices onchain + // If there are no price updates inflight, check the latest prices onchain. + + // Query from the last cached update timestamp, if any. + queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat) + if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) { + queryFrom = last.timestamp + } gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter( ctx, r.sourceChainSelector, - now.Add(-r.offchainConfig.GasPriceHeartBeat), + queryFrom, 0, ) if err != nil { return update{}, err } - for _, priceUpdate := range gasPriceUpdates { // Ordered by ascending timestamps timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0) @@ -432,6 +440,8 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } + // Update the cache. + r.priceUpdatesCache.updateCache(gasUpdate) } } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index d47d2bd123..6fec381ba5 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -1389,6 +1389,7 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { p.lggr = lggr destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry + p.priceUpdatesCache = newPriceUpdatesCache() if tc.inflightGasPriceUpdate != nil { p.inflightReports.inFlightPriceUpdates = append( diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go new file mode 100644 index 0000000000..fa3e01fceb --- /dev/null +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -0,0 +1,31 @@ +package ccip + +import ( + "time" +) + +type priceUpdatesCache struct { + lastUpdate update +} + +func newPriceUpdatesCache() *priceUpdatesCache { + return &priceUpdatesCache{ + lastUpdate: update{}, + } +} + +func (c *priceUpdatesCache) containsData() bool { + return c.lastUpdate.timestamp != time.Time{} +} + +func (c *priceUpdatesCache) lastCheckpoint() time.Time { + return c.lastUpdate.timestamp +} + +func (c *priceUpdatesCache) get() update { + return c.lastUpdate +} + +func (c *priceUpdatesCache) updateCache(update update) { + c.lastUpdate = update +} From c7c6b8ab1bbfd28a92d182a5698f9adb695953a1 Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Thu, 12 Oct 2023 09:52:25 +0200 Subject: [PATCH 05/15] CCIP-1147 - Update cache only with most recent data --- core/services/ocr2/plugins/ccip/price_updates_cache.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go index fa3e01fceb..c6639ffd76 100644 --- a/core/services/ocr2/plugins/ccip/price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -27,5 +27,7 @@ func (c *priceUpdatesCache) get() update { } func (c *priceUpdatesCache) updateCache(update update) { - c.lastUpdate = update + if update.timestamp.After(c.lastUpdate.timestamp) { + c.lastUpdate = update + } } From ea8c8933beb573b20f9627f76b074379e4b5a323 Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Thu, 12 Oct 2023 10:31:04 +0200 Subject: [PATCH 06/15] CCIP-1147 - Remove unused methods --- .../ocr2/plugins/ccip/price_updates_cache.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go index c6639ffd76..087c7c4ed2 100644 --- a/core/services/ocr2/plugins/ccip/price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -1,9 +1,5 @@ package ccip -import ( - "time" -) - type priceUpdatesCache struct { lastUpdate update } @@ -14,14 +10,6 @@ func newPriceUpdatesCache() *priceUpdatesCache { } } -func (c *priceUpdatesCache) containsData() bool { - return c.lastUpdate.timestamp != time.Time{} -} - -func (c *priceUpdatesCache) lastCheckpoint() time.Time { - return c.lastUpdate.timestamp -} - func (c *priceUpdatesCache) get() update { return c.lastUpdate } From 0d701bfc5caaba3498cce400590c6a25b4a21c19 Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Thu, 12 Oct 2023 11:05:43 +0200 Subject: [PATCH 07/15] CCIP-1147 - Fix test setup --- core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 6fec381ba5..239548eda1 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -170,6 +170,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.lggr = logger.TestLogger(t) p.tokenDecimalsCache = tokenDecimalsCache p.F = 1 + p.priceUpdatesCache = newPriceUpdatesCache() o := CommitObservation{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)} obs, err := o.Marshal() @@ -299,6 +300,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader p.F = tc.f + p.priceUpdatesCache = newPriceUpdatesCache() aos := make([]types.AttributedObservation, 0, len(tc.observations)) for _, o := range tc.observations { From f5cb16d40e9a72913d55c23c52fef86d0442216c Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Thu, 12 Oct 2023 15:16:38 +0200 Subject: [PATCH 08/15] CCIP-1147 - Test scenario for cache usage --- .../ccip/commit_reporting_plugin_test.go | 104 +++++++++++++----- 1 file changed, 78 insertions(+), 26 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 239548eda1..89064ba407 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -1343,18 +1343,21 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { chainSelector := uint64(1234) testCases := []struct { - name string - checkInflight bool - inflightGasPriceUpdate *update - destGasPriceUpdates []update - expUpdate update - expErr bool + name string + checkInflight bool + inflightGasPriceUpdate *update + destGasPriceUpdates []update + cacheValue update + expUpdate update + expErr bool + expStartQuery time.Time + mockPriceRegistryReader *ccipdata.MockPriceRegistryReader }{ { name: "only inflight gas price", checkInflight: true, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, - expUpdate: update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + expUpdate: update{timestamp: now, value: big.NewInt(4000)}, expErr: false, }, { @@ -1362,22 +1365,53 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { checkInflight: true, inflightGasPriceUpdate: nil, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, expErr: false, }, { - name: "inflight updates are skipped", + name: "inflight updates skipped and cache empty", checkInflight: false, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, - expErr: false, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: time.Time{}, + }, + { + name: "inflight updates skipped and cache not up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-2 * time.Minute), + }, + { + name: "inflight updates skipped and cache up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-1 * time.Minute), }, } @@ -1392,6 +1426,8 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry p.priceUpdatesCache = newPriceUpdatesCache() + p.priceUpdatesCache.updateCache(tc.cacheValue) + p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute if tc.inflightGasPriceUpdate != nil { p.inflightReports.inFlightPriceUpdates = append( @@ -1406,20 +1442,26 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { ) } + // Build mocked result of GetGasPriceUpdatesCreatedAfter. + destReader := ccipdata.NewMockPriceRegistryReader(t) if len(tc.destGasPriceUpdates) > 0 { var events []ccipdata.Event[ccipdata.GasPriceUpdate] for _, u := range tc.destGasPriceUpdates { - events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ - Data: ccipdata.GasPriceUpdate{ - GasPrice: ccipdata.GasPrice{Value: u.value}, - Timestamp: big.NewInt(u.timestamp.Unix()), - }, - }) + if tc.cacheValue.timestamp.IsZero() || !u.timestamp.Before(tc.cacheValue.timestamp) { + events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ + Data: ccipdata.GasPriceUpdate{ + GasPrice: ccipdata.GasPrice{ + DestChainSelector: chainSelector, + Value: u.value}, + Timestamp: big.NewInt(u.timestamp.Unix()), + }, + }) + } } - destReader := ccipdata.NewMockPriceRegistryReader(t) - destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0).Return(events, nil) - p.destPriceRegistryReader = destReader + destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, mock.Anything).Return(events, nil) } + p.destPriceRegistryReader = destReader + tc.mockPriceRegistryReader = destReader priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight) if tc.expErr { @@ -1430,6 +1472,16 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tc.expUpdate.timestamp.Truncate(time.Second), priceUpdate.timestamp.Truncate(time.Second)) assert.Equal(t, tc.expUpdate.value.Uint64(), priceUpdate.value.Uint64()) + + // Verify proper cache usage: if the cache is used, we shouldn't query the full range. + reader := tc.mockPriceRegistryReader + if tc.checkInflight && tc.inflightGasPriceUpdate != nil { + reader.AssertNotCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else if tc.expStartQuery.IsZero() { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, tc.expStartQuery, 0) + } }) } } From 48eea0c845f488bce116df4339c59b1d44e6ebbb Mon Sep 17 00:00:00 2001 From: Jean Arnaud Date: Thu, 12 Oct 2023 16:32:49 +0200 Subject: [PATCH 09/15] CCIP-1147 - Remove useless comment --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 37d51d736f..ed5bd4ea39 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -440,7 +440,6 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } - // Update the cache. r.priceUpdatesCache.updateCache(gasUpdate) } } From 040a801f64c27abab83827b8faae38fc76883139 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Thu, 12 Oct 2023 18:23:53 +0300 Subject: [PATCH 10/15] run gofmt --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 702b73f6d1..ea8ab2e429 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -82,7 +82,7 @@ type CommitReportingPlugin struct { // State inflightReports *inflightCommitReportsContainer // Cache - priceUpdatesCache *priceUpdatesCache + priceUpdatesCache *priceUpdatesCache tokenPriceUpdatesCache *tokenPriceUpdatesCache } @@ -162,8 +162,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin rf.config.destClient, int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), - gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), - priceUpdatesCache: newPriceUpdatesCache(), + gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), }, types.ReportingPluginInfo{ From 7ff9ba8285d016ff6367bb26aeee088c9d54c679 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Fri, 13 Oct 2023 14:25:06 +0300 Subject: [PATCH 11/15] use single cache instance --- .../ccip/commit_price_updates_cache.go | 50 +++++++++++++------ .../ccip/commit_price_updates_cache_test.go | 16 +++--- .../plugins/ccip/commit_reporting_plugin.go | 18 +++---- .../ccip/commit_reporting_plugin_test.go | 9 ++-- .../ocr2/plugins/ccip/price_updates_cache.go | 21 -------- 5 files changed, 54 insertions(+), 60 deletions(-) delete mode 100644 core/services/ocr2/plugins/ccip/price_updates_cache.go diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go index 11530b433a..9274242096 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go @@ -8,25 +8,26 @@ import ( "github.com/ethereum/go-ethereum/common" ) -type tokenPriceUpdatesCache struct { - mem map[common.Address]update - mu *sync.RWMutex +type priceUpdatesCache struct { + tokenPriceUpdates map[common.Address]update + gasPriceUpdate update + mu *sync.RWMutex } -func newTokenPriceUpdatesCache() *tokenPriceUpdatesCache { - c := &tokenPriceUpdatesCache{ - mem: make(map[common.Address]update), - mu: &sync.RWMutex{}, +func newPriceUpdatesCache() *priceUpdatesCache { + c := &priceUpdatesCache{ + tokenPriceUpdates: make(map[common.Address]update), + mu: &sync.RWMutex{}, } return c } -func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time { +func (c *priceUpdatesCache) mostRecentTokenPriceUpdate() time.Time { c.mu.RLock() defer c.mu.RUnlock() ts := time.Time{} - for _, upd := range c.mem { + for _, upd := range c.tokenPriceUpdates { if upd.timestamp.After(ts) { ts = upd.timestamp } @@ -34,14 +35,14 @@ func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time { return ts } -func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool { +func (c *priceUpdatesCache) updateTokenPriceIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool { c.mu.RLock() - v, exists := c.mem[tk] + v, exists := c.tokenPriceUpdates[tk] c.mu.RUnlock() if !exists || v.timestamp.Before(ts) { c.mu.Lock() - c.mem[tk] = update{timestamp: ts, value: val} + c.tokenPriceUpdates[tk] = update{timestamp: ts, value: val} c.mu.Unlock() return true } @@ -49,12 +50,12 @@ func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Addr return false } -// get returns all the price updates with timestamp greater than or equal to the provided -func (c *tokenPriceUpdatesCache) get(minTs time.Time) map[common.Address]update { +// getTokenPriceUpdates returns all the price updates with timestamp greater than or equal to the provided +func (c *priceUpdatesCache) getTokenPriceUpdates(minTs time.Time) map[common.Address]update { c.mu.RLock() defer c.mu.RUnlock() - cp := make(map[common.Address]update, len(c.mem)) - for k, v := range c.mem { + cp := make(map[common.Address]update, len(c.tokenPriceUpdates)) + for k, v := range c.tokenPriceUpdates { if v.timestamp.Before(minTs) { continue } @@ -62,3 +63,20 @@ func (c *tokenPriceUpdatesCache) get(minTs time.Time) map[common.Address]update } return cp } + +func (c *priceUpdatesCache) getGasPriceUpdate() update { + c.mu.RLock() + defer c.mu.RUnlock() + return c.gasPriceUpdate +} + +func (c *priceUpdatesCache) updateGasPriceIfMoreRecent(update update) bool { + c.mu.Lock() + defer c.mu.Unlock() + if update.timestamp.After(c.gasPriceUpdate.timestamp) { + c.gasPriceUpdate = update + return true + } + + return false +} diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go index c42126233b..3180dac38e 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go @@ -13,20 +13,20 @@ func Test_tokenPriceUpdatesCache(t *testing.T) { tk := common.HexToAddress("1") ts := time.Now().Truncate(time.Second) - c := newTokenPriceUpdatesCache() - assert.Equal(t, time.Time{}, c.mostRecentTs()) + c := newPriceUpdatesCache() + assert.Equal(t, time.Time{}, c.mostRecentTokenPriceUpdate()) - c.updateIfMoreRecent(ts, tk, big.NewInt(100)) - assert.Equal(t, ts, c.mostRecentTs()) - v := c.get(time.Time{}) + c.updateTokenPriceIfMoreRecent(ts, tk, big.NewInt(100)) + assert.Equal(t, ts, c.mostRecentTokenPriceUpdate()) + v := c.getTokenPriceUpdates(time.Time{}) assert.Equal(t, big.NewInt(100), v[tk].value) // should not get updated if ts is older - c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) - v = c.get(time.Time{}) + c.updateTokenPriceIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) + v = c.getTokenPriceUpdates(time.Time{}) assert.Equal(t, big.NewInt(100), v[tk].value) // should not get anything when the provided timestamp is recent - v = c.get(time.Now()) + v = c.getTokenPriceUpdates(time.Now()) assert.Len(t, v, 0) } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index ea8ab2e429..547b071c43 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -82,8 +82,7 @@ type CommitReportingPlugin struct { // State inflightReports *inflightCommitReportsContainer // Cache - priceUpdatesCache *priceUpdatesCache - tokenPriceUpdatesCache *tokenPriceUpdatesCache + priceUpdatesCache *priceUpdatesCache } type CommitReportingPluginFactory struct { @@ -162,9 +161,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin rf.config.destClient, int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), - gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), - priceUpdatesCache: newPriceUpdatesCache(), - tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), + gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -366,7 +364,7 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int { // The updates returned by this function are guaranteed to not contain nil values. func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) { ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat) - if mostRecentCachedTs := r.tokenPriceUpdatesCache.mostRecentTs(); mostRecentCachedTs.After(ts) { + if mostRecentCachedTs := r.priceUpdatesCache.mostRecentTokenPriceUpdate(); mostRecentCachedTs.After(ts) { ts = mostRecentCachedTs } @@ -380,14 +378,14 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, continue } - r.tokenPriceUpdatesCache.updateIfMoreRecent( + r.priceUpdatesCache.updateTokenPriceIfMoreRecent( time.Unix(upd.Data.Timestamp.Int64(), 0), upd.Data.Token, upd.Data.Value, ) } - latestUpdates := r.tokenPriceUpdatesCache.get(now.Add(-r.offchainConfig.TokenPriceHeartBeat)) + latestUpdates := r.priceUpdatesCache.getTokenPriceUpdates(now.Add(-r.offchainConfig.TokenPriceHeartBeat)) if !checkInflight { return latestUpdates, nil } @@ -424,7 +422,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now // Query from the last cached update timestamp, if any. queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat) - if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) { + if last := r.priceUpdatesCache.getGasPriceUpdate(); last.timestamp.After(queryFrom) { queryFrom = last.timestamp } gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter( @@ -444,7 +442,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } - r.priceUpdatesCache.updateCache(gasUpdate) + r.priceUpdatesCache.updateGasPriceIfMoreRecent(gasUpdate) } } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index c8432667be..5dd2a81bed 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -299,9 +299,8 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.gasPriceEstimator = gasPriceEstimator p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader - p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache() - p.F = tc.f p.priceUpdatesCache = newPriceUpdatesCache() + p.F = tc.f aos := make([]types.AttributedObservation, 0, len(tc.observations)) for _, o := range tc.observations { @@ -1427,7 +1426,7 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry p.priceUpdatesCache = newPriceUpdatesCache() - p.priceUpdatesCache.updateCache(tc.cacheValue) + p.priceUpdatesCache.updateGasPriceIfMoreRecent(tc.cacheValue) p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute if tc.inflightGasPriceUpdate != nil { @@ -1564,7 +1563,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { //_, priceRegAddr := testhelpers.NewFakePriceRegistry(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = priceReg - p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache() + p.priceUpdatesCache = newPriceUpdatesCache() //destReader := ccipdata.NewMockReader(t) var events []ccipdata.Event[ccipdata.TokenPriceUpdate] @@ -1608,7 +1607,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { ctx := testutils.Context(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p := &CommitReportingPlugin{ - tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), + priceUpdatesCache: newPriceUpdatesCache(), destPriceRegistryReader: priceReg, offchainConfig: ccipdata.CommitOffchainConfig{ TokenPriceHeartBeat: 12 * time.Hour, diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go deleted file mode 100644 index 087c7c4ed2..0000000000 --- a/core/services/ocr2/plugins/ccip/price_updates_cache.go +++ /dev/null @@ -1,21 +0,0 @@ -package ccip - -type priceUpdatesCache struct { - lastUpdate update -} - -func newPriceUpdatesCache() *priceUpdatesCache { - return &priceUpdatesCache{ - lastUpdate: update{}, - } -} - -func (c *priceUpdatesCache) get() update { - return c.lastUpdate -} - -func (c *priceUpdatesCache) updateCache(update update) { - if update.timestamp.After(c.lastUpdate.timestamp) { - c.lastUpdate = update - } -} From ce34089549fd2605dd8072aac16259c4451c6585 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Fri, 13 Oct 2023 14:25:48 +0300 Subject: [PATCH 12/15] nit --- core/services/ocr2/plugins/ccip/commit_price_updates_cache.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go index 9274242096..bd61f1f874 100644 --- a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go @@ -15,11 +15,10 @@ type priceUpdatesCache struct { } func newPriceUpdatesCache() *priceUpdatesCache { - c := &priceUpdatesCache{ + return &priceUpdatesCache{ tokenPriceUpdates: make(map[common.Address]update), mu: &sync.RWMutex{}, } - return c } func (c *priceUpdatesCache) mostRecentTokenPriceUpdate() time.Time { From b374a1573d1637312e029b2def87c8152123ddf4 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Fri, 13 Oct 2023 14:39:42 +0300 Subject: [PATCH 13/15] start with existing gas price update to prevent empty result --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 547b071c43..e76175fcc9 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -406,6 +406,8 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, // getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat. // If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned. func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time, checkInflight bool) (gasUpdate update, error error) { + gasUpdate = r.priceUpdatesCache.getGasPriceUpdate() + if checkInflight { latestInflightGasPriceUpdates := r.inflightReports.latestInflightGasPriceUpdates() if inflightUpdate, exists := latestInflightGasPriceUpdates[r.sourceChainSelector]; exists { From 3221121e351ae03a7cebbc087add51e2f68d12ac Mon Sep 17 00:00:00 2001 From: dimkouv Date: Fri, 13 Oct 2023 14:44:01 +0300 Subject: [PATCH 14/15] return empty update if it's too old --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index e76175fcc9..f95ed4058d 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -448,6 +448,10 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now } } + // if it's too old return an empty update + if gasUpdate.timestamp.Before(now.Add(-r.offchainConfig.GasPriceHeartBeat)) { + return update{}, nil + } r.lggr.Infow("Latest gas price from log poller", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp) return gasUpdate, nil } From a21673c762c7b815db882d6a1979c57329b4669f Mon Sep 17 00:00:00 2001 From: dimkouv Date: Mon, 16 Oct 2023 13:35:33 +0300 Subject: [PATCH 15/15] fix test --- core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 5dd2a81bed..cc36882629 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -278,7 +278,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) gasPriceEstimator.On("Median", mock.Anything).Return(gasPrice, nil) if tc.gasPriceUpdates != nil { - gasPriceEstimator.On("Deviates", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) + gasPriceEstimator.On("Deviates", mock.Anything, mock.Anything, mock.Anything).Return(false, nil).Maybe() } tokenDecimalsCache := cache.NewMockAutoSync[map[common.Address]uint8](t)