diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index bf33ff92c19..1885dab1537 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -955,7 +955,6 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { var executedEvents []cciptypes.ExecutionStateChangedWithTxMeta for _, executedSeqNum := range tc.destExecutedSeqNums { - executedEvents = append(executedEvents, cciptypes.ExecutionStateChangedWithTxMeta{ ExecutionStateChanged: cciptypes.ExecutionStateChanged{ SequenceNumber: executedSeqNum, diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go index c14bccba659..5f8bd5edc56 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go @@ -7,9 +7,10 @@ import ( "time" "github.com/patrickmn/go-cache" - "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" orderedmap "github.com/wk8/go-ordered-map/v2" + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -82,7 +83,9 @@ type commitRootsCache struct { // Whenever the root is executed (all messages executed and ExecutionStateChange events are finalized), we remove the root from the map. finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] // snoozedRoots used only for temporary snoozing roots. It's a cache with TTL (usually around 5 minutes, but this configuration is set up on chain using rootSnoozeTime) - snoozedRoots *cache.Cache + snoozedRoots *cache.Cache + // executedRoots is a cache with TTL (usually around 8 hours, but this configuration is set up on chain using messageVisibilityInterval). + // We keep executed roots there to make sure we don't accidentally try to reprocess already executed CommitReport executedRoots *cache.Cache // latestFinalizedCommitRootTs is the timestamp of the latest finalized commit root (youngest in terms of timestamp). // It's used get only the logs that were considered as unfinalized in a previous run. @@ -108,7 +111,6 @@ func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]cci // 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots. // Return only the reports that are not snoozed. return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil - } // MarkAsExecuted marks the root as executed. It means that all the messages from the root were executed and the ExecutionStateChange event was finalized. @@ -136,6 +138,11 @@ func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool { return snoozed } +func (r *commitRootsCache) isExecuted(merkleRoot [32]byte) bool { + _, executed := r.executedRoots.Get(merkleRootToString(merkleRoot)) + return executed +} + func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip.CommitStoreReportWithTxMeta, error) { r.cacheMu.Lock() messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval) @@ -159,7 +166,7 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit for _, log := range logs { prettyMerkleRoot := merkleRootToString(log.MerkleRoot) // Defensive check, if something is marked as executed, never allow it to come back to the cache - if _, executed := r.executedRoots.Get(prettyMerkleRoot); executed { + if r.isExecuted(log.MerkleRoot) { r.lggr.Debugw("Ignoring root marked as executed", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) continue } @@ -204,6 +211,10 @@ func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxM eligibleReports := make([]ccip.CommitStoreReport, 0, len(allReports)) for _, report := range allReports { if r.isSnoozed(report.MerkleRoot) { + r.lggr.Debugw("Skipping snoozed root", + "minSeqNr", report.Interval.Min, + "maxSeqNr", report.Interval.Max, + "merkleRoot", merkleRootToString(report.MerkleRoot)) continue } eligibleReports = append(eligibleReports, report.CommitStoreReport) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go index 0bb71ccab7a..5d60c60ea31 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go @@ -6,9 +6,10 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" "github.com/stretchr/testify/require" + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go index 5b0e5130c6e..34a470ef907 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go @@ -4,12 +4,13 @@ import ( "testing" "time" - "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" @@ -80,7 +81,7 @@ func Test_CacheFullEviction(t *testing.T) { cache.Snooze(commitRoots[i].MerkleRoot) } } - + // Eventually everything should be entirely removed from cache. We need that check to verify if cache doesn't grow indefinitely require.Eventually(t, func() bool { mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{}) roots1, err1 := cache.RootsEligibleForExecution(tests.Context(t)) @@ -90,7 +91,6 @@ func Test_CacheFullEviction(t *testing.T) { cache.finalizedRoots.Len() == 0 && len(cache.snoozedRoots.Items()) == 0 && len(cache.executedRoots.Items()) == 0 - }, 10*time.Second, time.Second) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index 50065d3c533..eda2305b47a 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -12,12 +12,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" - "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/config" - cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index 46db5fcab06..d54cadc4b93 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ -12,12 +12,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" - "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/config" - cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"