Skip to content

Commit

Permalink
Separate manual execution threshold from oracle message visibility th…
Browse files Browse the repository at this point in the history
…reshold. (#770)

Support the offRamp's offchainConfig `msgVisibilityInterval` field. This field
defines after what time the offchain code should stop attempting a CCIP
message.

If a value is not set, e.g. for existing offRamps that are not
configured, it falls back to a hardcoded default of `8h`.

Also updated the tokenData worker to not rely on `msgVisibilityInterval`
or `permissionlessExecThreshold` and removed some unused methods.

chainlink-common PR:
smartcontractkit/chainlink-common#504

---------

Co-authored-by: Rens Rooimans <[email protected]>
  • Loading branch information
dimkouv and RensR authored Jun 20, 2024
1 parent 9945858 commit 7aaa830
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/tasty-pianos-attend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ccip": patch
---

add offchainConfig value to know after what time the offchain code should stop attempting a tx
10 changes: 9 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep
return reportingPluginAndInfo{}, fmt.Errorf("get onchain config from offramp: %w", err)
}

msgVisibilityInterval := offchainConfig.MessageVisibilityInterval.Duration()
if msgVisibilityInterval.Seconds() == 0 {
rf.config.lggr.Info("MessageVisibilityInterval not set, falling back to PermissionLessExecutionThreshold")
msgVisibilityInterval = onchainConfig.PermissionLessExecutionThresholdSeconds
}
rf.config.lggr.Infof("MessageVisibilityInterval set to: %s", msgVisibilityInterval)

lggr := rf.config.lggr.Named("ExecutionReportingPlugin")
plugin := &ExecutionReportingPlugin{
F: config.F,
Expand All @@ -129,10 +136,11 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep
offRampReader: rf.config.offRampReader,
tokenPoolBatchedReader: rf.config.tokenPoolBatchedReader,
inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, onchainConfig.PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
}

pluginInfo := types.ReportingPluginInfo{
Name: "CCIPExecution",
// Setting this to false saves on calldata since OffRamp doesn't require agreement between NOPs
Expand Down
22 changes: 13 additions & 9 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper"
)

const numTokenDataWorkers = 5
// tokenDataWorkerTimeout serves two purposes:
//  1. it is the timeout applied while waiting for a background call to the third-party token data provider;
//  2. it is the default used when a client requests token data without specifying a timeout.
//
// At this moment the minimum OCR Delta Round is 30s and deltaGrace is 5s, which makes 5s a
// reasonable default for this timeout.
var tokenDataWorkerTimeout = time.Second * 5

// tokenDataWorkerNumWorkers is how many workers process token data in parallel.
var tokenDataWorkerNumWorkers = 5

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}

Expand Down Expand Up @@ -282,16 +291,11 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
params.offRampConfig.OnRamp,
)

onchainConfig, err := offRampReader.OnchainConfig(ctx)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("get onchain config from offramp reader: %w", err)
}

tokenBackgroundWorker := tokendata.NewBackgroundWorker(
tokenDataProviders,
numTokenDataWorkers,
5*time.Second,
onchainConfig.PermissionLessExecutionThresholdSeconds,
tokenDataWorkerNumWorkers,
tokenDataWorkerTimeout,
2*tokenDataWorkerTimeout,
)
return &ExecutionPluginStaticConfig{
lggr: execLggr,
Expand Down
54 changes: 27 additions & 27 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
// EvictionGracePeriod defines how long after the permissionless execution threshold a root is still kept in the cache
// EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache
EvictionGracePeriod = 1 * time.Hour
// CleanupInterval defines how often roots cache is scanned to evict stale roots
CleanupInterval = 30 * time.Minute
Expand All @@ -25,7 +25,7 @@ type CommitsRootsCache interface {
Snooze(merkleRoot [32]byte)

// OldestRootTimestamp returns the oldest root timestamp that is not executed yet (minus 1 second).
// If there are no roots in the queue, it returns the permissionlessExecThreshold
// If there are no roots in the queue, it returns the start of the message visibility window (now - messageVisibilityInterval)
OldestRootTimestamp() time.Time
// AppendUnexecutedRoot appends the root to the unexecuted roots queue to keep track of the roots that are not executed yet
// Roots has to be added in the order they are fetched from the database
Expand All @@ -40,15 +40,15 @@ type commitRootsCache struct {
// snoozedRoots is used to keep track of the roots that are temporary snoozed
snoozedRoots *cache.Cache
// unexecutedRootsQueue is used to keep track of the unexecuted roots in the order they are fetched from database (should be ordered by block_number, log_index)
// First run of Exec will fill the queue with all the roots that are not executed yet within the [now-permissionlessExecThreshold, now] window.
// When a root is executed, it is removed from the queue. Next database query instead of using entire permissionlessExecThrehsold window
// First run of Exec will fill the queue with all the roots that are not executed yet within the [now-messageVisibilityInterval, now] window.
// When a root is executed, it is removed from the queue. Next database query instead of using entire messageVisibilityInterval window
// will use oldestRootTimestamp as the lower bound filter for block_timestamp.
// This way we can reduce the number of database rows fetched with every OCR round.
// We do it this way because roots for most of the cases are executed sequentially.
// Instead of skipping snoozed roots after we fetch them from the database, we do that on the db level by narrowing the search window.
//
// Example
// permissionLessExecThresholds - 10 days, now - 2010-10-15
// messageVisibilityInterval - 10 days, now - 2010-10-15
// We fetch all the roots that within the [2010-10-05, 2010-10-15] window and load them to the queue
// [0xA - 2010-10-10, 0xB - 2010-10-11, 0xC - 2010-10-12] -> 0xA is the oldest root
// We executed 0xA and a couple of rounds later, we mark 0xA as executed and snoozed that forever which removes it from the queue.
Expand All @@ -60,40 +60,40 @@ type commitRootsCache struct {
oldestRootTimestamp time.Time
rootsQueueMu sync.RWMutex

// Both rootSnoozedTime and permissionLessExecutionThresholdDuration can be kept in the commitRootsCache without need to be updated.
// Both rootSnoozedTime and messageVisibilityInterval can be kept in the commitRootsCache without need to be updated.
// Those config properties are populated via onchain/offchain config. When changed, the OCR plugin will be restarted and the cache initialized with the new config.
rootSnoozedTime time.Duration
permissionLessExecutionThresholdDuration time.Duration
rootSnoozedTime time.Duration
messageVisibilityInterval time.Duration
}

func newCommitRootsCache(
lggr logger.Logger,
permissionLessExecutionThresholdDuration time.Duration,
messageVisibilityInterval time.Duration,
rootSnoozeTime time.Duration,
evictionGracePeriod time.Duration,
cleanupInterval time.Duration,
) *commitRootsCache {
executedRoots := cache.New(permissionLessExecutionThresholdDuration+evictionGracePeriod, cleanupInterval)
executedRoots := cache.New(messageVisibilityInterval+evictionGracePeriod, cleanupInterval)
snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval)

return &commitRootsCache{
lggr: lggr,
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
unexecutedRootsQueue: orderedmap.New[string, time.Time](),
rootSnoozedTime: rootSnoozeTime,
permissionLessExecutionThresholdDuration: permissionLessExecutionThresholdDuration,
lggr: lggr,
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
unexecutedRootsQueue: orderedmap.New[string, time.Time](),
rootSnoozedTime: rootSnoozeTime,
messageVisibilityInterval: messageVisibilityInterval,
}
}

func NewCommitRootsCache(
lggr logger.Logger,
permissionLessExecutionThresholdDuration time.Duration,
messageVisibilityInterval time.Duration,
rootSnoozeTime time.Duration,
) *commitRootsCache {
return newCommitRootsCache(
lggr,
permissionLessExecutionThresholdDuration,
messageVisibilityInterval,
rootSnoozeTime,
EvictionGracePeriod,
CleanupInterval,
Expand Down Expand Up @@ -131,8 +131,8 @@ func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
permissionlessExecWindow := time.Now().Add(-s.permissionLessExecutionThresholdDuration)
timestamp, ok := s.pickOldestRootBlockTimestamp(permissionlessExecWindow)
messageVisibilityInterval := time.Now().Add(-s.messageVisibilityInterval)
timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)

if ok {
return timestamp
Expand All @@ -141,22 +141,22 @@ func (s *commitRootsCache) OldestRootTimestamp() time.Time {
s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If rootsSearchFilter is before permissionlessExecWindow, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the permissionlessExecThreshold and rebuild cache from scratch.
// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
return permissionlessExecWindow
return messageVisibilityInterval
}

func (s *commitRootsCache) pickOldestRootBlockTimestamp(permissionlessExecWindow time.Time) (time.Time, bool) {
func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
s.rootsQueueMu.RLock()
defer s.rootsQueueMu.RUnlock()

// If there are no roots in the queue, we can return the permissionlessExecWindow
// If there are no roots in the queue, we can return the start of the visibility window (now - messageVisibilityInterval)
if s.oldestRootTimestamp.IsZero() {
return permissionlessExecWindow, true
return messageVisibilityInterval, true
}

if s.oldestRootTimestamp.After(permissionlessExecWindow) {
if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
return s.oldestRootTimestamp.Add(-time.Second), true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func Test_UnexecutedRootsStaleQueue(t *testing.T) {
assert.Equal(t, t1.Add(-time.Second), commitTs)

// Reducing messageVisibilityInterval works like speeding up the clock
c.permissionLessExecutionThresholdDuration = 1 * time.Hour
c.messageVisibilityInterval = 1 * time.Hour

commitTs = c.OldestRootTimestamp()
assert.True(t, commitTs.Before(time.Now().Add(-1*time.Hour)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func (d ExecOnchainConfig) Validate() error {
return nil
}

func (d ExecOnchainConfig) PermissionLessExecutionThresholdDuration() time.Duration {
return time.Duration(d.PermissionLessExecutionThresholdSeconds) * time.Second
}

// ExecOffchainConfig is the configuration for nodes executing committed CCIP messages (v1.0–v1.2).
// It comes from the OffchainConfig field of the corresponding OCR2 plugin configuration.
// NOTE: do not change the JSON format of this struct without consulting with the RDD people first.
Expand All @@ -112,6 +108,8 @@ type ExecOffchainConfig struct {
InflightCacheExpiry config.Duration
// See [ccipdata.ExecOffchainConfig.RootSnoozeTime]
RootSnoozeTime config.Duration
// See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval]
MessageVisibilityInterval config.Duration
}

func (c ExecOffchainConfig) Validate() error {
Expand Down Expand Up @@ -416,12 +414,14 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o
RelativeBoostPerWaitHour: offchainConfigParsed.RelativeBoostPerWaitHour,
InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry,
RootSnoozeTime: offchainConfigParsed.RootSnoozeTime,
MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval,
}
onchainConfig := cciptypes.ExecOnchainConfig{
PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds),
Router: cciptypes.Address(onchainConfigParsed.Router.String()),
}
gasPriceEstimator := prices.NewExecGasPriceEstimator(o.Estimator, o.DestMaxGasPrice, 0)

o.UpdateDynamicConfig(onchainConfig, offchainConfig, gasPriceEstimator)

o.Logger.Infow("Starting exec plugin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestExecOffchainConfig100_Encoding(t *testing.T) {
RelativeBoostPerWaitHour: 0.07,
InflightCacheExpiry: *config.MustNewDuration(64 * time.Second),
RootSnoozeTime: *config.MustNewDuration(128 * time.Minute),
MessageVisibilityInterval: *config.MustNewDuration(6 * time.Hour),
},
},
{
Expand All @@ -48,6 +49,7 @@ func TestExecOffchainConfig100_Encoding(t *testing.T) {
RelativeBoostPerWaitHour: 0,
InflightCacheExpiry: *config.MustNewDuration(0),
RootSnoozeTime: *config.MustNewDuration(0),
MessageVisibilityInterval: *config.MustNewDuration(0),
},
expectErr: true,
},
Expand Down Expand Up @@ -83,7 +85,7 @@ func TestExecOffchainConfig100_Encoding(t *testing.T) {
}

func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) {
config := ExecOffchainConfig{
cfg := ExecOffchainConfig{
SourceFinalityDepth: 3,
DestOptimisticConfirmations: 6,
DestFinalityDepth: 3,
Expand All @@ -92,13 +94,17 @@ func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) {
InflightCacheExpiry: *config.MustNewDuration(64 * time.Second),
RootSnoozeTime: *config.MustNewDuration(128 * time.Minute),
}
encoded, err := ccipconfig.EncodeOffchainConfig(&config)
encoded, err := ccipconfig.EncodeOffchainConfig(&cfg)
require.NoError(t, err)

var configAsMap map[string]any
err = json.Unmarshal(encoded, &configAsMap)
require.NoError(t, err)
for keyToDelete := range configAsMap {
if keyToDelete == "MessageVisibilityInterval" {
continue // this field is optional
}

partialConfig := make(map[string]any)
for k, v := range configAsMap {
if k != keyToDelete {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ func (d ExecOnchainConfig) Validate() error {
return nil
}

func (d ExecOnchainConfig) PermissionLessExecutionThresholdDuration() time.Duration {
return time.Duration(d.PermissionLessExecutionThresholdSeconds) * time.Second
}

// JSONExecOffchainConfig is the configuration for nodes executing committed CCIP messages (v1.2).
// It comes from the OffchainConfig field of the corresponding OCR2 plugin configuration.
// NOTE: do not change the JSON format of this struct without consulting with the RDD people first.
Expand All @@ -98,6 +94,8 @@ type JSONExecOffchainConfig struct {
InflightCacheExpiry config.Duration
// See [ccipdata.ExecOffchainConfig.RootSnoozeTime]
RootSnoozeTime config.Duration
// See [ccipdata.ExecOffchainConfig.MessageVisibilityInterval]
MessageVisibilityInterval config.Duration
}

func (c JSONExecOffchainConfig) Validate() error {
Expand Down Expand Up @@ -173,6 +171,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o
RelativeBoostPerWaitHour: offchainConfigParsed.RelativeBoostPerWaitHour,
InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry,
RootSnoozeTime: offchainConfigParsed.RootSnoozeTime,
MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval,
}
onchainConfig := cciptypes.ExecOnchainConfig{
PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/pkg/errors"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -157,6 +156,7 @@ func (o *OffRamp) ChangeConfig(ctx context.Context, onchainConfigBytes []byte, o
RelativeBoostPerWaitHour: offchainConfigParsed.RelativeBoostPerWaitHour,
InflightCacheExpiry: offchainConfigParsed.InflightCacheExpiry,
RootSnoozeTime: offchainConfigParsed.RootSnoozeTime,
MessageVisibilityInterval: offchainConfigParsed.MessageVisibilityInterval,
}
onchainConfig := cciptypes.ExecOnchainConfig{
PermissionLessExecutionThresholdSeconds: time.Second * time.Duration(onchainConfigParsed.PermissionLessExecutionThresholdSeconds),
Expand Down

0 comments on commit 7aaa830

Please sign in to comment.