From c617e5edd83644865754c60147190728380e29f6 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 17 Nov 2023 09:15:29 +0100 Subject: [PATCH] CCIP-1290 Processing Commit Roots in steps (#284) --- .../ccip/execution_reporting_plugin.go | 257 ++++++++++-------- .../ccip/execution_reporting_plugin_test.go | 60 ++++ .../internal/ccipdata/commit_store_reader.go | 1 + 3 files changed, 205 insertions(+), 113 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index 1f69a20ce5..8a383f9f88 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -38,6 +38,8 @@ const ( // MaxDataLenPerBatch limits the total length of msg data that can be in a batch. MaxDataLenPerBatch = 60_000 + // MessagesIterationStep limits number of messages fetched to memory at once when iterating through unexpired CommitRoots + MessagesIterationStep = 800 ) var ( @@ -211,143 +213,139 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty } func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ObservedMessage, error) { - unexpiredReports, err := getUnexpiredCommitReports( + unexpiredReports, err := r.getUnexpiredCommitReports( ctx, r.config.commitStoreReader, r.onchainConfig.PermissionLessExecutionThresholdSeconds, + lggr, ) if err != nil { return nil, err } - lggr.Infow("Unexpired roots", "n", len(unexpiredReports)) + if len(unexpiredReports) == 0 { return []ObservedMessage{}, nil } - // This could result in slightly different values on each call as - // the function returns the allowed amount at the time of the last block. - // Since this will only increase over time, the highest observed value will - // always be the lower bound of what would be available on chain - // since we already account for inflight txs. - getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) { - return r.config.offRampReader.CurrentRateLimiterState(ctx) - }) - sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx) - if err != nil { - return nil, err - } - getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx) - if err1 != nil { - return nil, err1 - } - return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken}) - }) - getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - dstTokens, err1 := r.cachedDestTokens.Get(ctx) - if err1 != nil { - return nil, err1 - } - return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative)) - }) - getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) { - return r.gasPriceEstimator.GetGasPrice(ctx) - }) + for j := 0; j < len(unexpiredReports); { + unexpiredReportsPart, step := selectReportsToFillBatch(unexpiredReports[j:], MessagesIterationStep) + j += step - lggr.Infow("Processing unexpired reports", "n", len(unexpiredReports)) - measureNumberOfReportsProcessed(timestamp, len(unexpiredReports)) - reportIterationStart := time.Now() - defer func() { - measureReportsIterationDuration(timestamp, time.Since(reportIterationStart)) - }() - - unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReports) - if err != nil { - return nil, err - } + // This could result in slightly different values on each call as + // the function returns the allowed amount at the time of the last block. + // Since this will only increase over time, the highest observed value will + // always be the lower bound of what would be available on chain + // since we already account for inflight txs. + getAllowedTokenAmount := cache.LazyFetch(func() (evm_2_evm_offramp.RateLimiterTokenBucket, error) { + return r.config.offRampReader.CurrentRateLimiterState(ctx) + }) + sourceToDestTokens, supportedDestTokens, err := r.sourceDestinationTokens(ctx) + if err != nil { + return nil, err + } + getSourceTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + sourceFeeTokens, err1 := r.cachedSourceFeeTokens.Get(ctx) + if err1 != nil { + return nil, err1 + } + return getTokensPrices(ctx, sourceFeeTokens, r.config.sourcePriceRegistry, []common.Address{r.config.sourceWrappedNativeToken}) + }) + getDestTokensPrices := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + dstTokens, err1 := r.cachedDestTokens.Get(ctx) + if err1 != nil { + return nil, err1 + } + return getTokensPrices(ctx, dstTokens.FeeTokens, r.destPriceRegistry, append(supportedDestTokens, r.destWrappedNative)) + }) + getDestGasPrice := cache.LazyFetch(func() (prices.GasPrice, error) { + return r.gasPriceEstimator.GetGasPrice(ctx) + }) - getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { - return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens) - }) + measureNumberOfReportsProcessed(timestamp, len(unexpiredReportsPart)) - for _, rep := range unexpiredReportsWithSendReqs { - if ctx.Err() != nil { - lggr.Warn("Processing of roots killed by context") - break + unexpiredReportsWithSendReqs, err := r.getReportsWithSendRequests(ctx, unexpiredReportsPart) + if err != nil { + return nil, err } - merkleRoot := rep.commitReport.MerkleRoot + getDestPoolRateLimits := cache.LazyFetch(func() (map[common.Address]*big.Int, error) { + return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, sourceToDestTokens) + }) - rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]), - "minSeqNr", rep.commitReport.Interval.Min, - "maxSeqNr", rep.commitReport.Interval.Max, - ) + for _, rep := range unexpiredReportsWithSendReqs { + if ctx.Err() != nil { + lggr.Warn("Processing of roots killed by context") + break + } - if r.snoozedRoots.IsSnoozed(merkleRoot) { - rootLggr.Debug("Skipping snoozed root") - continue - } + merkleRoot := rep.commitReport.MerkleRoot - if err := rep.validate(); err != nil { - rootLggr.Errorw("Skipping invalid report", "err", err) - continue - } + rootLggr := lggr.With("root", hexutil.Encode(merkleRoot[:]), + "minSeqNr", rep.commitReport.Interval.Min, + "maxSeqNr", rep.commitReport.Interval.Max, + ) - // If all messages are already executed and finalized, snooze the root for - // config.PermissionLessExecutionThresholdSeconds so it will never be considered again. - if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized { - rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:])) - r.snoozedRoots.MarkAsExecuted(merkleRoot) - incSkippedRequests(reasonAllExecuted) - continue - } + if err := rep.validate(); err != nil { + rootLggr.Errorw("Skipping invalid report", "err", err) + continue + } - blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot) - if err != nil { - return nil, err - } - if !blessed { - rootLggr.Infow("Report is accepted but not blessed") - incSkippedRequests(reasonNotBlessed) - continue - } + // If all messages are already executed and finalized, snooze the root for + // config.PermissionLessExecutionThresholdSeconds so it will never be considered again. + if allMsgsExecutedAndFinalized := rep.allRequestsAreExecutedAndFinalized(); allMsgsExecutedAndFinalized { + rootLggr.Infof("Snoozing root %s forever since there are no executable txs anymore", hex.EncodeToString(merkleRoot[:])) + r.snoozedRoots.MarkAsExecuted(merkleRoot) + incSkippedRequests(reasonAllExecuted) + continue + } - allowedTokenAmountValue, err := getAllowedTokenAmount() - if err != nil { - return nil, err - } - sourceTokensPricesValue, err := getSourceTokensPrices() - if err != nil { - return nil, fmt.Errorf("get source token prices: %w", err) - } + blessed, err := r.config.commitStoreReader.IsBlessed(ctx, merkleRoot) + if err != nil { + return nil, err + } + if !blessed { + rootLggr.Infow("Report is accepted but not blessed") + incSkippedRequests(reasonNotBlessed) + continue + } - destTokensPricesValue, err := getDestTokensPrices() - if err != nil { - return nil, fmt.Errorf("get dest token prices: %w", err) - } + allowedTokenAmountValue, err := getAllowedTokenAmount() + if err != nil { + return nil, err + } + sourceTokensPricesValue, err := getSourceTokensPrices() + if err != nil { + return nil, fmt.Errorf("get source token prices: %w", err) + } - destPoolRateLimits, err := getDestPoolRateLimits() - if err != nil { - return nil, fmt.Errorf("get dest pool rate limits: %w", err) - } + destTokensPricesValue, err := getDestTokensPrices() + if err != nil { + return nil, fmt.Errorf("get dest token prices: %w", err) + } - buildBatchDuration := time.Now() - batch := r.buildBatch( - ctx, - rootLggr, - rep, - inflight, - allowedTokenAmountValue.Tokens, - sourceTokensPricesValue, - destTokensPricesValue, - getDestGasPrice, - sourceToDestTokens, - destPoolRateLimits) - measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration)) - if len(batch) != 0 { - return batch, nil + destPoolRateLimits, err := getDestPoolRateLimits() + if err != nil { + return nil, fmt.Errorf("get dest pool rate limits: %w", err) + } + + buildBatchDuration := time.Now() + batch := r.buildBatch( + ctx, + rootLggr, + rep, + inflight, + allowedTokenAmountValue.Tokens, + sourceTokensPricesValue, + destTokensPricesValue, + getDestGasPrice, + sourceToDestTokens, + destPoolRateLimits) + measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration)) + if len(batch) != 0 { + return batch, nil + } + r.snoozedRoots.Snooze(merkleRoot) } - r.snoozedRoots.Snooze(merkleRoot) } return []ObservedMessage{}, nil } @@ -1142,10 +1140,11 @@ func getTokensPrices(ctx context.Context, feeTokens []common.Address, priceRegis return prices, nil } -func getUnexpiredCommitReports( +func (r *ExecutionReportingPlugin) getUnexpiredCommitReports( ctx context.Context, commitStoreReader ccipdata.CommitStoreReader, permissionExecutionThreshold time.Duration, + lggr logger.Logger, ) ([]ccipdata.CommitStoreReport, error) { acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp( ctx, @@ -1160,5 +1159,37 @@ func getUnexpiredCommitReports( for _, acceptedReport := range acceptedReports { reports = append(reports, acceptedReport.Data) } - return reports, nil + + notSnoozedReports := make([]ccipdata.CommitStoreReport, 0) + for _, report := range reports { + if r.snoozedRoots.IsSnoozed(report.MerkleRoot) { + lggr.Debug("Skipping snoozed root", "minSeqNr", report.Interval.Min, "maxSeqNr", report.Interval.Max) + continue + } + notSnoozedReports = append(notSnoozedReports, report) + } + + lggr.Infow("Unexpired roots", "all", len(reports), "notSnoozed", len(notSnoozedReports)) + return notSnoozedReports, nil +} + +// selectReportsToFillBatch returns the reports to fill the message limit. Single Commit Root contains exactly (Interval.Max - Interval.Min + 1) messages. +// We keep adding reports until we reach the message limit. Please see the tests for more examples and edge cases. +// unexpiredReports have to be sorted by Interval.Min. Otherwise, the batching logic will not be efficient, +// because it picks messages and execution states based on the report[0].Interval.Min - report[len-1].Interval.Max range. +// Having unexpiredReports not sorted properly will lead to fetching more messages and execution states to the memory than the messagesLimit provided. +// However, logs from LogPoller are returned ordered by (block_number, log_index), so it should preserve the order of Interval.Min. +// Single CommitRoot can have up to 256 messages, with current MessagesIterationStep of 800, it means processing 4 CommitRoots at once. +func selectReportsToFillBatch(unexpiredReports []ccipdata.CommitStoreReport, messagesLimit uint64) ([]ccipdata.CommitStoreReport, int) { + currentNumberOfMessages := uint64(0) + var index int + + for index = range unexpiredReports { + currentNumberOfMessages += unexpiredReports[index].Interval.Max - unexpiredReports[index].Interval.Min + 1 + if currentNumberOfMessages >= messagesLimit { + break + } + } + index = min(index+1, len(unexpiredReports)) + return unexpiredReports[:index], index } diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go index 96e2205d43..310be34760 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go @@ -1728,3 +1728,63 @@ func generateExecutionReport(t *testing.T, numMsgs, tokensPerMsg, bytesPerMsg in ProofFlagBits: big.NewInt(rand.Int64()), } } + +func Test_selectReportsToFillBatch(t *testing.T) { + reports := []ccipdata.CommitStoreReport{ + {Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 10}}, + {Interval: ccipdata.CommitStoreInterval{Min: 11, Max: 20}}, + {Interval: ccipdata.CommitStoreInterval{Min: 21, Max: 25}}, + {Interval: ccipdata.CommitStoreInterval{Min: 26, Max: math.MaxUint64}}, + } + + tests := []struct { + name string + step uint64 + numberOfBatches int + }{ + { + name: "pick all at once when step size is high", + step: 100, + numberOfBatches: 1, + }, + { + name: "pick one by one when step size is 1", + step: 1, + numberOfBatches: 4, + }, + { + name: "pick two when step size doesn't match report", + step: 15, + numberOfBatches: 2, + }, + { + name: "pick one by one when step size is smaller then reports", + step: 4, + numberOfBatches: 4, + }, + { + name: "batch some reports together", + step: 7, + numberOfBatches: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var unexpiredReportsBatches [][]ccipdata.CommitStoreReport + for i := 0; i < len(reports); { + unexpiredReports, step := selectReportsToFillBatch(reports[i:], tt.step) + unexpiredReportsBatches = append(unexpiredReportsBatches, unexpiredReports) + i += step + } + assert.Len(t, unexpiredReportsBatches, tt.numberOfBatches) + + var flatten []ccipdata.CommitStoreReport + for _, r := range unexpiredReportsBatches { + flatten = append(flatten, r...) + } + assert.Len(t, flatten, len(reports)) + assert.Equal(t, reports, flatten) + }) + } +} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go index 6252890a6c..88be66a2f0 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/commit_store_reader.go @@ -99,6 +99,7 @@ type CommitStoreReader interface { // GetAcceptedCommitReportsGteSeqNum returns all the accepted commit reports that have max sequence number greater than or equal to the provided. GetAcceptedCommitReportsGteSeqNum(ctx context.Context, seqNum uint64, confs int) ([]Event[CommitStoreReport], error) // GetAcceptedCommitReportsGteTimestamp returns all the commit reports with timestamp greater than or equal to the provided. + // Returned Commit Reports have to be sorted by Interval.Min/Interval.Max in ascending order. GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]Event[CommitStoreReport], error) IsDown(ctx context.Context) (bool, error) IsBlessed(ctx context.Context, root [32]byte) (bool, error)