Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offchain - commit store updates fetching optimization #195

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/commit_inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 {
return max
}

// latestInflightGasPriceUpdates returns a map of the latest gas price updates.
// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector.
func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update {
c.locker.RLock()
defer c.locker.RUnlock()
Expand Down
81 changes: 81 additions & 0 deletions core/services/ocr2/plugins/ccip/commit_price_updates_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ccip

import (
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
)

type priceUpdatesCache struct {
tokenPriceUpdates map[common.Address]update
gasPriceUpdate update
mu *sync.RWMutex
}

func newPriceUpdatesCache() *priceUpdatesCache {
return &priceUpdatesCache{
tokenPriceUpdates: make(map[common.Address]update),
mu: &sync.RWMutex{},
}
}

func (c *priceUpdatesCache) mostRecentTokenPriceUpdate() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()

ts := time.Time{}
for _, upd := range c.tokenPriceUpdates {
if upd.timestamp.After(ts) {
ts = upd.timestamp
}
}
return ts
}

func (c *priceUpdatesCache) updateTokenPriceIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool {
c.mu.RLock()
v, exists := c.tokenPriceUpdates[tk]
c.mu.RUnlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we keep the lock over the entire method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep the lock over the entire method we will have a deadlock.

If we use only a Lock() over the entire method, instead of both RLock() for read and Lock() for updates then it's slightly less efficient.


if !exists || v.timestamp.Before(ts) {
c.mu.Lock()
c.tokenPriceUpdates[tk] = update{timestamp: ts, value: val}
c.mu.Unlock()
return true
}

return false
}

// getTokenPriceUpdates returns all the price updates with timestamp greater than or equal to the provided
func (c *priceUpdatesCache) getTokenPriceUpdates(minTs time.Time) map[common.Address]update {
c.mu.RLock()
defer c.mu.RUnlock()
cp := make(map[common.Address]update, len(c.tokenPriceUpdates))
for k, v := range c.tokenPriceUpdates {
if v.timestamp.Before(minTs) {
continue
}
cp[k] = v
}
return cp
}

func (c *priceUpdatesCache) getGasPriceUpdate() update {
c.mu.RLock()
defer c.mu.RUnlock()
return c.gasPriceUpdate
}

func (c *priceUpdatesCache) updateGasPriceIfMoreRecent(update update) bool {
c.mu.Lock()
defer c.mu.Unlock()
if update.timestamp.After(c.gasPriceUpdate.timestamp) {
c.gasPriceUpdate = update
return true
}

return false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ccip

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
)

func Test_tokenPriceUpdatesCache(t *testing.T) {
tk := common.HexToAddress("1")
ts := time.Now().Truncate(time.Second)

c := newPriceUpdatesCache()
assert.Equal(t, time.Time{}, c.mostRecentTokenPriceUpdate())

c.updateTokenPriceIfMoreRecent(ts, tk, big.NewInt(100))
assert.Equal(t, ts, c.mostRecentTokenPriceUpdate())
v := c.getTokenPriceUpdates(time.Time{})
assert.Equal(t, big.NewInt(100), v[tk].value)

// should not get updated if ts is older
c.updateTokenPriceIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101))
v = c.getTokenPriceUpdates(time.Time{})
assert.Equal(t, big.NewInt(100), v[tk].value)

// should not get anything when the provided timestamp is recent
v = c.getTokenPriceUpdates(time.Now())
assert.Len(t, v, 0)
}
53 changes: 35 additions & 18 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type CommitReportingPlugin struct {
priceGetter pricegetter.PriceGetter
// State
inflightReports *inflightCommitReportsContainer
// Cache
priceUpdatesCache *priceUpdatesCache
}

type CommitReportingPluginFactory struct {
Expand Down Expand Up @@ -160,6 +162,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
priceUpdatesCache: newPriceUpdatesCache(),
},
types.ReportingPluginInfo{
Name: "CCIPCommit",
Expand Down Expand Up @@ -360,27 +363,29 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int {
// Gets the latest token price updates based on logs within the heartbeat
// The updates returned by this function are guaranteed to not contain nil values.
func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) {
tokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(
ctx,
now.Add(-r.offchainConfig.TokenPriceHeartBeat),
0,
)
ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat)
if mostRecentCachedTs := r.priceUpdatesCache.mostRecentTokenPriceUpdate(); mostRecentCachedTs.After(ts) {
ts = mostRecentCachedTs
}

newTokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(ctx, ts, 0)
if err != nil {
return nil, err
}

latestUpdates := make(map[common.Address]update)
for _, tokenUpdate := range tokenPriceUpdates {
priceUpdate := tokenUpdate.Data
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.Timestamp.Int64(), 0)
if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.Token].timestamp) {
latestUpdates[priceUpdate.Token] = update{
timestamp: timestamp,
value: priceUpdate.Value,
}
for _, upd := range newTokenPriceUpdates {
if upd.Data.Value == nil {
continue
}

r.priceUpdatesCache.updateTokenPriceIfMoreRecent(
time.Unix(upd.Data.Timestamp.Int64(), 0),
upd.Data.Token,
upd.Data.Value,
)
}

latestUpdates := r.priceUpdatesCache.getTokenPriceUpdates(now.Add(-r.offchainConfig.TokenPriceHeartBeat))
if !checkInflight {
return latestUpdates, nil
}
Expand All @@ -401,6 +406,8 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context,
// getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat.
// If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned.
func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time, checkInflight bool) (gasUpdate update, error error) {
gasUpdate = r.priceUpdatesCache.getGasPriceUpdate()

if checkInflight {
latestInflightGasPriceUpdates := r.inflightReports.latestInflightGasPriceUpdates()
if inflightUpdate, exists := latestInflightGasPriceUpdates[r.sourceChainSelector]; exists {
Expand All @@ -413,17 +420,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
}
}

// If there are no price updates inflight, check latest prices onchain
// If there are no price updates inflight, check the latest prices onchain.

// Query from the last cached update timestamp, if any.
queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat)
if last := r.priceUpdatesCache.getGasPriceUpdate(); last.timestamp.After(queryFrom) {
queryFrom = last.timestamp
}
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
ctx,
r.sourceChainSelector,
now.Add(-r.offchainConfig.GasPriceHeartBeat),
queryFrom,
0,
)
if err != nil {
return update{}, err
}

for _, priceUpdate := range gasPriceUpdates {
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0)
Expand All @@ -432,9 +444,14 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
timestamp: timestamp,
value: priceUpdate.Data.Value,
}
r.priceUpdatesCache.updateGasPriceIfMoreRecent(gasUpdate)
}
}

// if it's too old return an empty update
if gasUpdate.timestamp.Before(now.Add(-r.offchainConfig.GasPriceHeartBeat)) {
return update{}, nil
}
r.lggr.Infow("Latest gas price from log poller", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp)
return gasUpdate, nil
}
Expand Down
Loading