Skip to content

Commit

Permalink
CCIP-1290 Processing Commit Roots in steps (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Nov 17, 2023
1 parent e275ee8 commit c617e5e
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 113 deletions.
257 changes: 144 additions & 113 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (

// MaxDataLenPerBatch limits the total length of msg data that can be in a batch.
MaxDataLenPerBatch = 60_000
// MessagesIterationStep limits number of messages fetched to memory at once when iterating through unexpired CommitRoots
MessagesIterationStep = 800
)

var (
Expand Down Expand Up @@ -211,143 +213,139 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ObservedMessage, error) {
unexpiredReports, err := getUnexpiredCommitReports(
unexpiredReports, err := r.getUnexpiredCommitReports(
ctx,
r.config.commitStoreReader,
r.onchainConfig.PermissionLessExecutionThresholdSeconds,
lggr,
)
if err != nil {
return nil, err
}
lggr.Infow("Unexpired roots", "n", len(unexpiredReports))

if len(unexpiredReports) == 0 {
return []ObservedMessage{}, nil
}

// This could result in slightly different values on each call as
// the function returns the allowed amount at the time of the last block.
// Since this will only increase over time, the highest observed value will
// always be the lower bound of what would be available on chain
// since we already account for inflight txs.
getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) {
return r.config.offRampReader.CurrentRateLimiterState(ctx)
})
sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx)
if err != nil {
return nil, err
}
getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken})
})
getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
dstTokens, err1 := r.cachedDestTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative))
})
getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) {
return r.gasPriceEstimator.GetGasPrice(ctx)
})
for j := 0; j < len(unexpiredReports); {
unexpiredReportsPart, step := selectReportsToFillBatch(unexpiredReports[j:], MessagesIterationStep)
j += step

lggr.Infow("Processing unexpired reports", "n", len(unexpiredReports))
measureNumberOfReportsProcessed(timestamp, len(unexpiredReports))
reportIterationStart := time.Now()
defer func() {
measureReportsIterationDuration(timestamp, time.Since(reportIterationStart))
}()

unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports)
if err != nil {
return nil, err
}
// This could result in slightly different values on each call as
// the function returns the allowed amount at the time of the last block.
// Since this will only increase over time, the highest observed value will
// always be the lower bound of what would be available on chain
// since we already account for inflight txs.
getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) {
return r.config.offRampReader.CurrentRateLimiterState(ctx)
})
sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx)
if err != nil {
return nil, err
}
getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken})
})
getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
dstTokens, err1 := r.cachedDestTokens.Get(ctx)
if err1 != nil {
return nil, err1
}
return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative))
})
getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) {
return r.gasPriceEstimator.GetGasPrice(ctx)
})

getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens)
})
measureNumberOfReportsProcessed(timestamp, len(unexpiredReportsPart))

for _, rep := range unexpiredReportsWithSendReqs {
if ctx.Err() != nil {
lggr.Warn("Processing of roots killed by context")
break
unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReportsPart)
if err != nil {
return nil, err
}

merkleRoot := rep.commitReport.MerkleRoot
getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) {
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens)
})

rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]),
"minSeqNr", rep.commitReport.Interval.Min,
"maxSeqNr", rep.commitReport.Interval.Max,
)
for _, rep := range unexpiredReportsWithSendReqs {
if ctx.Err() != nil {
lggr.Warn("Processing of roots killed by context")
break
}

if r.snoozedRoots.IsSnoozed(merkleRoot) {
rootLggr.Debug("Skipping snoozed root")
continue
}
merkleRoot := rep.commitReport.MerkleRoot

if err := rep.validate(); err != nil {
rootLggr.Errorw("Skipping invalid report", "err", err)
continue
}
rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]),
"minSeqNr", rep.commitReport.Interval.Min,
"maxSeqNr", rep.commitReport.Interval.Max,
)

// If all messages are already executed and finalized, snooze the root for
// config.PermissionLessExecutionThresholdSeconds so it will never be considered again.
if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized {
rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:]))
r.snoozedRoots.MarkAsExecuted(merkleRoot)
incSkippedRequests(reasonAllExecuted)
continue
}
if err := rep.validate(); err != nil {
rootLggr.Errorw("Skipping invalid report", "err", err)
continue
}

blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot)
if err != nil {
return nil, err
}
if !blessed {
rootLggr.Infow("Report is accepted but not blessed")
incSkippedRequests(reasonNotBlessed)
continue
}
// If all messages are already executed and finalized, snooze the root for
// config.PermissionLessExecutionThresholdSeconds so it will never be considered again.
if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized {
rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:]))
r.snoozedRoots.MarkAsExecuted(merkleRoot)
incSkippedRequests(reasonAllExecuted)
continue
}

allowedTokenAmountValue, err := getAllowedTokenAmount()
if err != nil {
return nil, err
}
sourceTokensPricesValue, err := getSourceTokensPrices()
if err != nil {
return nil, fmt.Errorf("get source token prices: %w", err)
}
blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot)
if err != nil {
return nil, err
}
if !blessed {
rootLggr.Infow("Report is accepted but not blessed")
incSkippedRequests(reasonNotBlessed)
continue
}

destTokensPricesValue, err := getDestTokensPrices()
if err != nil {
return nil, fmt.Errorf("get dest token prices: %w", err)
}
allowedTokenAmountValue, err := getAllowedTokenAmount()
if err != nil {
return nil, err
}
sourceTokensPricesValue, err := getSourceTokensPrices()
if err != nil {
return nil, fmt.Errorf("get source token prices: %w", err)
}

destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, fmt.Errorf("get dest pool rate limits: %w", err)
}
destTokensPricesValue, err := getDestTokensPrices()
if err != nil {
return nil, fmt.Errorf("get dest token prices: %w", err)
}

buildBatchDuration := time.Now()
batch := r.buildBatch(
ctx,
rootLggr,
rep,
inflight,
allowedTokenAmountValue.Tokens,
sourceTokensPricesValue,
destTokensPricesValue,
getDestGasPrice,
sourceToDestTokens,
destPoolRateLimits)
measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration))
if len(batch) != 0 {
return batch, nil
destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, fmt.Errorf("get dest pool rate limits: %w", err)
}

buildBatchDuration := time.Now()
batch := r.buildBatch(
ctx,
rootLggr,
rep,
inflight,
allowedTokenAmountValue.Tokens,
sourceTokensPricesValue,
destTokensPricesValue,
getDestGasPrice,
sourceToDestTokens,
destPoolRateLimits)
measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration))
if len(batch) != 0 {
return batch, nil
}
r.snoozedRoots.Snooze(merkleRoot)
}
r.snoozedRoots.Snooze(merkleRoot)
}
return []ObservedMessage{}, nil
}
Expand Down Expand Up @@ -1142,10 +1140,11 @@ func getTokensPrices(ctx context.Context, feeTokens []common.Address, priceRegis
return prices, nil
}

func getUnexpiredCommitReports(
func (r *ExecutionReportingPlugin) getUnexpiredCommitReports(
ctx context.Context,
commitStoreReader ccipdata.CommitStoreReader,
permissionExecutionThreshold time.Duration,
lggr logger.Logger,
) ([]ccipdata.CommitStoreReport, error) {
acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp(
ctx,
Expand All @@ -1160,5 +1159,37 @@ func getUnexpiredCommitReports(
for _, acceptedReport := range acceptedReports {
reports = append(reports, acceptedReport.Data)
}
return reports, nil

notSnoozedReports := make([]ccipdata.CommitStoreReport, 0)
for _, report := range reports {
if r.snoozedRoots.IsSnoozed(report.MerkleRoot) {
lggr.Debug("Skipping snoozed root", "minSeqNr", report.Interval.Min, "maxSeqNr", report.Interval.Max)
continue
}
notSnoozedReports = append(notSnoozedReports, report)
}

lggr.Infow("Unexpired roots", "all", len(reports), "notSnoozed", len(notSnoozedReports))
return notSnoozedReports, nil
}

// selectReportsToFillBatch returns the reports to fill the message limit. Single Commit Root contains exactly (Interval.Max - Interval.Min + 1) messages.
// We keep adding reports until we reach the message limit. Please see the tests for more examples and edge cases.
// unexpiredReports have to be sorted by Interval.Min. Otherwise, the batching logic will not be efficient,
// because it picks messages and execution states based on the report[0].Interval.Min - report[len-1].Interval.Max range.
// Having unexpiredReports not sorted properly will lead to fetching more messages and execution states to the memory than the messagesLimit provided.
// However, logs from LogPoller are returned ordered by (block_number, log_index), so it should preserve the order of Interval.Min.
// Single CommitRoot can have up to 256 messages, with current MessagesIterationStep of 800, it means processing 4 CommitRoots at once.
func selectReportsToFillBatch(unexpiredReports []ccipdata.CommitStoreReport, messagesLimit uint64) ([]ccipdata.CommitStoreReport, int) {
currentNumberOfMessages := uint64(0)
var index int

for index = range unexpiredReports {
currentNumberOfMessages += unexpiredReports[index].Interval.Max - unexpiredReports[index].Interval.Min + 1
if currentNumberOfMessages >= messagesLimit {
break
}
}
index = min(index+1, len(unexpiredReports))
return unexpiredReports[:index], index
}
60 changes: 60 additions & 0 deletions core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,3 +1728,63 @@ func generateExecutionReport(t *testing.T, numMsgs, tokensPerMsg, bytesPerMsg in
ProofFlagBits: big.NewInt(rand.Int64()),
}
}

func Test_selectReportsToFillBatch(t *testing.T) {
reports := []ccipdata.CommitStoreReport{
{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 10}},
{Interval: ccipdata.CommitStoreInterval{Min: 11, Max: 20}},
{Interval: ccipdata.CommitStoreInterval{Min: 21, Max: 25}},
{Interval: ccipdata.CommitStoreInterval{Min: 26, Max: math.MaxUint64}},
}

tests := []struct {
name string
step uint64
numberOfBatches int
}{
{
name: "pick all at once when step size is high",
step: 100,
numberOfBatches: 1,
},
{
name: "pick one by one when step size is 1",
step: 1,
numberOfBatches: 4,
},
{
name: "pick two when step size doesn't match report",
step: 15,
numberOfBatches: 2,
},
{
name: "pick one by one when step size is smaller then reports",
step: 4,
numberOfBatches: 4,
},
{
name: "batch some reports together",
step: 7,
numberOfBatches: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var unexpiredReportsBatches [][]ccipdata.CommitStoreReport
for i := 0; i < len(reports); {
unexpiredReports, step := selectReportsToFillBatch(reports[i:], tt.step)
unexpiredReportsBatches = append(unexpiredReportsBatches, unexpiredReports)
i += step
}
assert.Len(t, unexpiredReportsBatches, tt.numberOfBatches)

var flatten []ccipdata.CommitStoreReport
for _, r := range unexpiredReportsBatches {
flatten = append(flatten, r...)
}
assert.Len(t, flatten, len(reports))
assert.Equal(t, reports, flatten)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type CommitStoreReader interface {
// GetAcceptedCommitReportsGteSeqNum returns all the accepted commit reports that have max sequence number greater than or equal to the provided.
GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error)
// GetAcceptedCommitReportsGteTimestamp returns all the commit reports with timestamp greater than or equal to the provided.
// Returned Commit Reports have to be sorted by Interval.Min/Interval.Max in ascending order.
GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error)
IsDown(ctx context.Context) (bool, error)
IsBlessed(ctx context.Context, root [32]byte) (bool, error)
Expand Down

0 comments on commit c617e5e

Please sign in to comment.