diff --git a/core/services/ocr2/plugins/ccip/commit_inflight.go b/core/services/ocr2/plugins/ccip/commit_inflight.go index 726bbf6020..670bfed6ca 100644 --- a/core/services/ocr2/plugins/ccip/commit_inflight.go +++ b/core/services/ocr2/plugins/ccip/commit_inflight.go @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 { return max } -// latestInflightGasPriceUpdates returns a map of the latest gas price updates. +// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector. func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update { c.locker.RLock() defer c.locker.RUnlock() diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go new file mode 100644 index 0000000000..bd61f1f874 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache.go @@ -0,0 +1,81 @@ +package ccip + +import ( + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +type priceUpdatesCache struct { + tokenPriceUpdates map[common.Address]update + gasPriceUpdate update + mu *sync.RWMutex +} + +func newPriceUpdatesCache() *priceUpdatesCache { + return &priceUpdatesCache{ + tokenPriceUpdates: make(map[common.Address]update), + mu: &sync.RWMutex{}, + } +} + +func (c *priceUpdatesCache) mostRecentTokenPriceUpdate() time.Time { + c.mu.RLock() + defer c.mu.RUnlock() + + ts := time.Time{} + for _, upd := range c.tokenPriceUpdates { + if upd.timestamp.After(ts) { + ts = upd.timestamp + } + } + return ts +} + +func (c *priceUpdatesCache) updateTokenPriceIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool { + c.mu.RLock() + v, exists := c.tokenPriceUpdates[tk] + c.mu.RUnlock() + + if !exists || v.timestamp.Before(ts) { + c.mu.Lock() + c.tokenPriceUpdates[tk] = update{timestamp: ts, value: val} + c.mu.Unlock() + return true + } + + return false +} + +// getTokenPriceUpdates returns all the price updates with timestamp greater than or equal to the provided +func (c *priceUpdatesCache) getTokenPriceUpdates(minTs time.Time) map[common.Address]update { + c.mu.RLock() + defer c.mu.RUnlock() + cp := make(map[common.Address]update, len(c.tokenPriceUpdates)) + for k, v := range c.tokenPriceUpdates { + if v.timestamp.Before(minTs) { + continue + } + cp[k] = v + } + return cp +} + +func (c *priceUpdatesCache) getGasPriceUpdate() update { + c.mu.RLock() + defer c.mu.RUnlock() + return c.gasPriceUpdate +} + +func (c *priceUpdatesCache) updateGasPriceIfMoreRecent(update update) bool { + c.mu.Lock() + defer c.mu.Unlock() + if update.timestamp.After(c.gasPriceUpdate.timestamp) { + c.gasPriceUpdate = update + return true + } + + return false +} diff --git a/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go new file mode 100644 index 0000000000..3180dac38e --- /dev/null +++ b/core/services/ocr2/plugins/ccip/commit_price_updates_cache_test.go @@ -0,0 +1,32 @@ +package ccip + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func Test_tokenPriceUpdatesCache(t *testing.T) { + tk := common.HexToAddress("1") + ts := time.Now().Truncate(time.Second) + + c := newPriceUpdatesCache() + assert.Equal(t, time.Time{}, c.mostRecentTokenPriceUpdate()) + + c.updateTokenPriceIfMoreRecent(ts, tk, big.NewInt(100)) + assert.Equal(t, ts, c.mostRecentTokenPriceUpdate()) + v := c.getTokenPriceUpdates(time.Time{}) + assert.Equal(t, big.NewInt(100), v[tk].value) + + // should not get updated if ts is older + c.updateTokenPriceIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101)) + v = c.getTokenPriceUpdates(time.Time{}) + assert.Equal(t, big.NewInt(100), v[tk].value) + + // should not get anything when the provided timestamp is recent + v = c.getTokenPriceUpdates(time.Now()) + assert.Len(t, v, 0) +} diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index bb24464574..f95ed4058d 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -81,6 +81,8 @@ type CommitReportingPlugin struct { priceGetter pricegetter.PriceGetter // State inflightReports *inflightCommitReportsContainer + // Cache + priceUpdatesCache *priceUpdatesCache } type CommitReportingPluginFactory struct { @@ -160,6 +162,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -360,27 +363,29 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int { // Gets the latest token price updates based on logs within the heartbeat // The updates returned by this function are guaranteed to not contain nil values. func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) { - tokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter( - ctx, - now.Add(-r.offchainConfig.TokenPriceHeartBeat), - 0, - ) + ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat) + if mostRecentCachedTs := r.priceUpdatesCache.mostRecentTokenPriceUpdate(); mostRecentCachedTs.After(ts) { + ts = mostRecentCachedTs + } + + newTokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(ctx, ts, 0) if err != nil { return nil, err } - latestUpdates := make(map[common.Address]update) - for _, tokenUpdate := range tokenPriceUpdates { - priceUpdate := tokenUpdate.Data - // Ordered by ascending timestamps - timestamp := time.Unix(priceUpdate.Timestamp.Int64(), 0) - if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.Token].timestamp) { - latestUpdates[priceUpdate.Token] = update{ - timestamp: timestamp, - value: priceUpdate.Value, - } + for _, upd := range newTokenPriceUpdates { + if upd.Data.Value == nil { + continue } + + r.priceUpdatesCache.updateTokenPriceIfMoreRecent( + time.Unix(upd.Data.Timestamp.Int64(), 0), + upd.Data.Token, + upd.Data.Value, + ) } + + latestUpdates := r.priceUpdatesCache.getTokenPriceUpdates(now.Add(-r.offchainConfig.TokenPriceHeartBeat)) if !checkInflight { return latestUpdates, nil } @@ -401,6 +406,8 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, // getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat. // If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned. func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time, checkInflight bool) (gasUpdate update, error error) { + gasUpdate = r.priceUpdatesCache.getGasPriceUpdate() + if checkInflight { latestInflightGasPriceUpdates := r.inflightReports.latestInflightGasPriceUpdates() if inflightUpdate, exists := latestInflightGasPriceUpdates[r.sourceChainSelector]; exists { @@ -413,17 +420,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now } } - // If there are no price updates inflight, check latest prices onchain + // If there are no price updates inflight, check the latest prices onchain. + + // Query from the last cached update timestamp, if any. + queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat) + if last := r.priceUpdatesCache.getGasPriceUpdate(); last.timestamp.After(queryFrom) { + queryFrom = last.timestamp + } gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter( ctx, r.sourceChainSelector, - now.Add(-r.offchainConfig.GasPriceHeartBeat), + queryFrom, 0, ) if err != nil { return update{}, err } - for _, priceUpdate := range gasPriceUpdates { // Ordered by ascending timestamps timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0) @@ -432,9 +444,14 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } + r.priceUpdatesCache.updateGasPriceIfMoreRecent(gasUpdate) } } + // if it's too old return an empty update + if gasUpdate.timestamp.Before(now.Add(-r.offchainConfig.GasPriceHeartBeat)) { + return update{}, nil + } r.lggr.Infow("Latest gas price from log poller", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp) return gasUpdate, nil } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index d47d2bd123..cc36882629 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -170,6 +170,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.lggr = logger.TestLogger(t) p.tokenDecimalsCache = tokenDecimalsCache p.F = 1 + p.priceUpdatesCache = newPriceUpdatesCache() o := CommitObservation{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)} obs, err := o.Marshal() @@ -277,7 +278,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) gasPriceEstimator.On("Median", mock.Anything).Return(gasPrice, nil) if tc.gasPriceUpdates != nil { - gasPriceEstimator.On("Deviates", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) + gasPriceEstimator.On("Deviates", mock.Anything, mock.Anything, mock.Anything).Return(false, nil).Maybe() } tokenDecimalsCache := cache.NewMockAutoSync[map[common.Address]uint8](t) @@ -298,6 +299,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.gasPriceEstimator = gasPriceEstimator p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader + p.priceUpdatesCache = newPriceUpdatesCache() p.F = tc.f aos := make([]types.AttributedObservation, 0, len(tc.observations)) @@ -1341,18 +1343,21 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { chainSelector := uint64(1234) testCases := []struct { - name string - checkInflight bool - inflightGasPriceUpdate *update - destGasPriceUpdates []update - expUpdate update - expErr bool + name string + checkInflight bool + inflightGasPriceUpdate *update + destGasPriceUpdates []update + cacheValue update + expUpdate update + expErr bool + expStartQuery time.Time + mockPriceRegistryReader *ccipdata.MockPriceRegistryReader }{ { name: "only inflight gas price", checkInflight: true, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, - expUpdate: update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + expUpdate: update{timestamp: now, value: big.NewInt(4000)}, expErr: false, }, { @@ -1360,22 +1365,53 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { checkInflight: true, inflightGasPriceUpdate: nil, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, expErr: false, }, { - name: "inflight updates are skipped", + name: "inflight updates skipped and cache empty", checkInflight: false, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, - expErr: false, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: time.Time{}, + }, + { + name: "inflight updates skipped and cache not up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-2 * time.Minute), + }, + { + name: "inflight updates skipped and cache up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-1 * time.Minute), }, } @@ -1389,6 +1425,9 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { p.lggr = lggr destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry + p.priceUpdatesCache = newPriceUpdatesCache() + p.priceUpdatesCache.updateGasPriceIfMoreRecent(tc.cacheValue) + p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute if tc.inflightGasPriceUpdate != nil { p.inflightReports.inFlightPriceUpdates = append( @@ -1403,20 +1442,26 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { ) } + // Build mocked result of GetGasPriceUpdatesCreatedAfter. + destReader := ccipdata.NewMockPriceRegistryReader(t) if len(tc.destGasPriceUpdates) > 0 { var events []ccipdata.Event[ccipdata.GasPriceUpdate] for _, u := range tc.destGasPriceUpdates { - events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ - Data: ccipdata.GasPriceUpdate{ - GasPrice: ccipdata.GasPrice{Value: u.value}, - Timestamp: big.NewInt(u.timestamp.Unix()), - }, - }) + if tc.cacheValue.timestamp.IsZero() || !u.timestamp.Before(tc.cacheValue.timestamp) { + events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ + Data: ccipdata.GasPriceUpdate{ + GasPrice: ccipdata.GasPrice{ + DestChainSelector: chainSelector, + Value: u.value}, + Timestamp: big.NewInt(u.timestamp.Unix()), + }, + }) + } } - destReader := ccipdata.NewMockPriceRegistryReader(t) - destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0).Return(events, nil) - p.destPriceRegistryReader = destReader + destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, mock.Anything).Return(events, nil) } + p.destPriceRegistryReader = destReader + tc.mockPriceRegistryReader = destReader priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight) if tc.expErr { @@ -1427,6 +1472,16 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tc.expUpdate.timestamp.Truncate(time.Second), priceUpdate.timestamp.Truncate(time.Second)) assert.Equal(t, tc.expUpdate.value.Uint64(), priceUpdate.value.Uint64()) + + // Verify proper cache usage: if the cache is used, we shouldn't query the full range. + reader := tc.mockPriceRegistryReader + if tc.checkInflight && tc.inflightGasPriceUpdate != nil { + reader.AssertNotCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else if tc.expStartQuery.IsZero() { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, tc.expStartQuery, 0) + } }) } } @@ -1508,6 +1563,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { //_, priceRegAddr := testhelpers.NewFakePriceRegistry(t) priceReg := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = priceReg + p.priceUpdatesCache = newPriceUpdatesCache() //destReader := ccipdata.NewMockReader(t) var events []ccipdata.Event[ccipdata.TokenPriceUpdate] @@ -1547,6 +1603,73 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { } +func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) { + ctx := testutils.Context(t) + priceReg := ccipdata.NewMockPriceRegistryReader(t) + p := &CommitReportingPlugin{ + priceUpdatesCache: newPriceUpdatesCache(), + destPriceRegistryReader: priceReg, + offchainConfig: ccipdata.CommitOffchainConfig{ + TokenPriceHeartBeat: 12 * time.Hour, + }, + } + + twoHoursAgo := time.Now().Add(-2 * time.Hour) + threeHoursAgo := time.Now().Add(-3 * time.Hour) + fourHoursAgo := time.Now().Add(-4 * time.Hour) + + tk1 := utils.RandomAddress() + now := time.Now() + + onChainUpdates := []ccipdata.Event[ccipdata.TokenPriceUpdate]{ + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(100)}, + Timestamp: big.NewInt(0).SetInt64(fourHoursAgo.Unix()), + }, + }, + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(102)}, + Timestamp: big.NewInt(0).SetInt64(twoHoursAgo.Unix()), + }, + }, + { + Data: ccipdata.TokenPriceUpdate{ + TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(101)}, + Timestamp: big.NewInt(0).SetInt64(threeHoursAgo.Unix()), + }, + }, + } + rand.Shuffle(len(onChainUpdates), func(i, j int) { onChainUpdates[i], onChainUpdates[j] = onChainUpdates[j], onChainUpdates[i] }) + priceReg.On( + "GetTokenPriceUpdatesCreatedAfter", + mock.Anything, + now.Add(-p.offchainConfig.TokenPriceHeartBeat), // first call should pass the token price heart beat duration + 0, + ).Return(onChainUpdates, nil).Once() + + priceUpdates, err := p.getLatestTokenPriceUpdates(ctx, now, false) + assert.NoError(t, err) + // we expect to get only one update, since all three updates above are for the same token + assert.Len(t, priceUpdates, 1) + // and we expect to get the latest price update + assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value) + assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix()) + + priceReg.On( + "GetTokenPriceUpdatesCreatedAfter", + mock.Anything, + twoHoursAgo.Truncate(time.Second), // now we expect to ask for price updates after the most recent price update + 0, + ).Return([]ccipdata.Event[ccipdata.TokenPriceUpdate]{}, nil).Once() + priceUpdates, err = p.getLatestTokenPriceUpdates(ctx, now, false) + assert.NoError(t, err) + // and we expect to get the exact same price update since there wasn't anything new recorded onchain + assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value) + assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix()) +} + func Test_commitReportSize(t *testing.T) { testParams := gopter.DefaultTestParameters() testParams.MinSuccessfulTests = 100