Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aggregate token and gas price heartbeat updates into batches #1282

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,24 +461,42 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time
// The returned latestGasPrice and latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
var tokenPriceUpdates []cciptypes.TokenPrice
// Token prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any token requires heartbeat update, include all token prices in the report.
// Otherwise, only include token prices that exceed deviation threshold.
needTokenHeartbeat := false
for token := range tokenPriceObs {
latestTokenPrice, exists := latestTokenPrices[token]
if !exists || time.Since(latestTokenPrice.timestamp) >= r.offchainConfig.TokenPriceHeartBeat {
r.lggr.Infow("Token requires heartbeat update", "token", token)
needTokenHeartbeat = true
break
}
}

for token, tokenPriceObservations := range tokenPriceObs {
medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations)

if needTokenHeartbeat {
r.lggr.Debugw("Token price update included due to heartbeat", "token", token, "newPrice", medianPrice)
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
continue
}

latestTokenPrice, exists := latestTokenPrices[token]
if exists {
tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat
tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB))
if tokenPriceUpdatedRecently && tokenPriceNotChanged {
r.lggr.Debugw("token price was updated recently, skipping the update",
if ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB)) {
r.lggr.Debugw("Token price update included due to deviation",
"token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value)
continue // skip the update if we recently had a price update close to the new value
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}
}

tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}

// Determinism required.
Expand All @@ -487,31 +505,49 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*
})

var gasPriceUpdate []cciptypes.GasPrice
// Gas prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any source chain gas price requires heartbeat update, include all gas prices in the report.
// Otherwise, only include gas prices that exceed deviation threshold.
needGasHeartbeat := false
for chainSelector := range gasPriceObs {
latestGasPrice, exists := latestGasPrice[chainSelector]
if !exists || latestGasPrice.value == nil || time.Since(latestGasPrice.timestamp) >= r.offchainConfig.GasPriceHeartBeat {
r.lggr.Infow("Chain gas price requires heartbeat update", "chainSelector", chainSelector)
needGasHeartbeat = true
break
}
}

for chainSelector, gasPriceObservations := range gasPriceObs {
newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObservations) // Compute the median price
if err != nil {
return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err)
}

// Default to updating so that we update if there are no prior updates.
if needGasHeartbeat {
r.lggr.Debugw("Gas price update included due to heartbeat", "chainSelector", chainSelector)
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
continue
}

latestGasPrice, exists := latestGasPrice[chainSelector]
if exists && latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
if err != nil {
return nil, nil, err
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update",
if gasPriceDeviated {
r.lggr.Debugw("Gas price update included due to deviation",
"chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value)
continue
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}
}

gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}

sort.Slice(gasPriceUpdate, func(i, j int) bool {
Expand Down
150 changes: 137 additions & 13 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: val1e18(20)}},
},
{
name: "multichain gas prices",
name: "multi-chain gas price updates due to heartbeat",
commitObservations: []ccip.CommitObservation{
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(1)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(11)}},
Expand Down Expand Up @@ -1162,9 +1162,47 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
f: 1,
expGasUpdates: []cciptypes.GasPrice{
{DestChainSelector: defaultSourceChainSelector, Value: val1e18(2)},
{DestChainSelector: defaultSourceChainSelector + 1, Value: val1e18(22)},
{DestChainSelector: defaultSourceChainSelector + 2, Value: val1e18(222)},
},
},
{
name: "multi-chain gas prices but only one updates due to deviation",
commitObservations: []ccip.CommitObservation{
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(1)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(11)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(111)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(2)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(22)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(222)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(3)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(33)}},
{SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(333)}},
},
gasPriceHeartBeat: *config.MustNewDuration(time.Hour),
daGasPriceDeviationPPB: 20e7,
execGasPriceDeviationPPB: 20e7,
tokenPriceHeartBeat: *config.MustNewDuration(time.Hour),
tokenPriceDeviationPPB: 20e7,
latestGasPrice: map[uint64]update{
defaultSourceChainSelector: {
timestamp: time.Now().Add(-30 * time.Minute), // recent
value: val1e18(9), // median deviates
},
defaultSourceChainSelector + 1: {
timestamp: time.Now().Add(-30 * time.Minute), // recent
value: val1e18(20), // median does not deviate
},
defaultSourceChainSelector + 2: {
timestamp: time.Now().Add(-30 * time.Minute), // recent
value: val1e18(220), // median does not deviate
},
},
f: 1,
expGasUpdates: []cciptypes.GasPrice{
{DestChainSelector: defaultSourceChainSelector, Value: val1e18(2)},
},
},
{
name: "median one token",
commitObservations: []ccip.CommitObservation{
Expand Down Expand Up @@ -1205,14 +1243,14 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}},
},
{
name: "token price update skipped because it is close to the latest",
name: "token price update skipped because it does not deviate and are recent",
commitObservations: []ccip.CommitObservation{
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(11)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(12)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
},
Expand All @@ -1227,10 +1265,81 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(10),
},
feeToken2: {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(10),
},
},
// We expect a gas update because no latest
expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}},
},
{
name: "multiple token price update due to staleness",
commitObservations: []ccip.CommitObservation{
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(11)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(12)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
},
f: 1,
gasPriceHeartBeat: *config.MustNewDuration(time.Hour),
daGasPriceDeviationPPB: 20e7,
execGasPriceDeviationPPB: 20e7,
tokenPriceHeartBeat: *config.MustNewDuration(time.Hour),
tokenPriceDeviationPPB: 20e7,
latestTokenPrices: map[cciptypes.Address]update{
feeToken1: {
timestamp: time.Now().Add(-90 * time.Minute),
value: val1e18(10),
},
feeToken2: {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(10),
},
},
expTokenUpdates: []cciptypes.TokenPrice{
{Token: feeToken1, Value: val1e18(12)},
{Token: feeToken2, Value: val1e18(12)},
},
expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}},
},
{
name: "multiple token exist but only one updates due to deviation",
commitObservations: []ccip.CommitObservation{
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(13)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(14)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)},
},
},
f: 1,
gasPriceHeartBeat: *config.MustNewDuration(time.Hour),
daGasPriceDeviationPPB: 20e7,
execGasPriceDeviationPPB: 20e7,
tokenPriceHeartBeat: *config.MustNewDuration(time.Hour),
tokenPriceDeviationPPB: 20e7,
latestTokenPrices: map[cciptypes.Address]update{
feeToken1: {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(10),
},
feeToken2: {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(10),
},
},
expTokenUpdates: []cciptypes.TokenPrice{
{Token: feeToken2, Value: val1e18(14)},
},
expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}},
},
{
name: "gas price and token price both included because they are not close to the latest",
commitObservations: []ccip.CommitObservation{
Expand Down Expand Up @@ -1331,12 +1440,18 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
name: "gas price included because it deviates from latest and token price skipped because it does not deviate",
commitObservations: []ccip.CommitObservation{
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(10)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{
defaultSourceChainSelector: val1e18(10),
defaultSourceChainSelector + 1: val1e18(20),
},
},
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(11)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{
defaultSourceChainSelector: val1e18(11),
defaultSourceChainSelector + 1: val1e18(21),
},
},
},
f: 1,
Expand All @@ -1347,8 +1462,12 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
tokenPriceDeviationPPB: 200e7,
latestGasPrice: map[uint64]update{
defaultSourceChainSelector: {
timestamp: time.Now().Add(-90 * time.Minute),
value: val1e18(9),
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(8),
},
defaultSourceChainSelector + 1: {
timestamp: time.Now().Add(-30 * time.Minute),
value: val1e18(21),
},
},
latestTokenPrices: map[cciptypes.Address]update{
Expand All @@ -1363,11 +1482,11 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
name: "gas price skipped because it does not deviate and token price included because it has not been updated recently",
commitObservations: []ccip.CommitObservation{
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(10), feeToken2: val1e18(20)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(10)},
},
{
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)},
TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(21)},
SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(11)},
},
},
Expand All @@ -1386,11 +1505,16 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) {
latestTokenPrices: map[cciptypes.Address]update{
feeToken1: {
timestamp: time.Now().Add(-4 * time.Hour),
value: val1e18(11),
},
feeToken2: {
timestamp: time.Now().Add(-1 * time.Hour),
value: val1e18(21),
},
},
expTokenUpdates: []cciptypes.TokenPrice{
{Token: feeToken1, Value: val1e18(21)},
{Token: feeToken1, Value: val1e18(11)},
{Token: feeToken2, Value: val1e18(21)},
},
expGasUpdates: nil,
},
Expand Down
Loading