Skip to content

Commit

Permalink
Merge branch 'ccip-develop' into bump-chain-selec
Browse files Browse the repository at this point in the history
  • Loading branch information
KodeyThomas authored Aug 6, 2024
2 parents 5603006 + 2d55ddf commit df0ae3f
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 46 deletions.
5 changes: 5 additions & 0 deletions .changeset/violet-clouds-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Updated AutoPurge.Threshold and AutoPurge.MinAttempts configs to only be required for heuristic and added content-type header for Scroll API #internal
7 changes: 7 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ jobs:
os: ubuntu-latest
file: ccip
run: -run ^TestSmokeCCIPOffRampAggRateLimit$
- name: ccip-smoke-leader-lane
nodes: 15
dir: ccip-tests/smoke
os: ubuntu-latest
file: ccip
run: -run ^TestSmokeCCIPForBidirectionalLane$
config_path: ./integration-tests/ccip-tests/testconfig/tomls/leader-lane.toml
- name: runlog
id: runlog
nodes: 2
Expand Down
8 changes: 4 additions & 4 deletions core/chains/evm/config/chain_scoped_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (a *autoPurgeConfig) Enabled() bool {
return *a.c.Enabled
}

func (a *autoPurgeConfig) Threshold() uint32 {
return *a.c.Threshold
func (a *autoPurgeConfig) Threshold() *uint32 {
return a.c.Threshold
}

func (a *autoPurgeConfig) MinAttempts() uint32 {
return *a.c.MinAttempts
func (a *autoPurgeConfig) MinAttempts() *uint32 {
return a.c.MinAttempts
}

func (a *autoPurgeConfig) DetectionApiUrl() *url.URL {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ type Transactions interface {

type AutoPurgeConfig interface {
Enabled() bool
Threshold() uint32
MinAttempts() uint32
Threshold() *uint32
MinAttempts() *uint32
DetectionApiUrl() *url.URL
}

Expand Down
23 changes: 17 additions & 6 deletions core/chains/evm/txmgr/stuck_tx_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"net/http"
Expand Down Expand Up @@ -37,8 +38,8 @@ type stuckTxDetectorTxStore interface {

type stuckTxDetectorConfig interface {
Enabled() bool
Threshold() uint32
MinAttempts() uint32
Threshold() *uint32
MinAttempts() *uint32
DetectionApiUrl() *url.URL
}

Expand Down Expand Up @@ -78,7 +79,7 @@ func NewStuckTxDetector(lggr logger.Logger, chainID *big.Int, chainType chaintyp

func (d *stuckTxDetector) LoadPurgeBlockNumMap(ctx context.Context, addresses []common.Address) error {
// Skip loading purge block num map if auto-purge feature disabled or Threshold is set to 0
if !d.cfg.Enabled() || d.cfg.Threshold() == 0 {
if !d.cfg.Enabled() || d.cfg.Threshold() == nil || *d.cfg.Threshold() == 0 {
return nil
}
d.purgeBlockNumLock.Lock()
Expand Down Expand Up @@ -172,6 +173,11 @@ func (d *stuckTxDetector) FindUnconfirmedTxWithLowestNonce(ctx context.Context,
// 4. If 3 is true, check if the latest attempt's gas price is higher than what our gas estimator's GetFee method returns
// 5. If 4 is true, the transaction is likely stuck due to overflow
func (d *stuckTxDetector) detectStuckTransactionsHeuristic(ctx context.Context, txs []Tx, blockNum int64) ([]Tx, error) {
if d.cfg.Threshold() == nil || d.cfg.MinAttempts() == nil {
err := errors.New("missing required configs for the stuck transaction heuristic. Transactions.AutoPurge.Threshold and Transactions.AutoPurge.MinAttempts are required")
d.lggr.Error(err.Error())
return txs, err
}
d.purgeBlockNumLock.RLock()
defer d.purgeBlockNumLock.RUnlock()
// Get gas price from internal gas estimator
Expand All @@ -187,17 +193,17 @@ func (d *stuckTxDetector) detectStuckTransactionsHeuristic(ctx context.Context,
d.purgeBlockNumLock.RLock()
lastPurgeBlockNum := d.purgeBlockNumMap[tx.FromAddress]
d.purgeBlockNumLock.RUnlock()
if lastPurgeBlockNum > blockNum-int64(d.cfg.Threshold()) {
if lastPurgeBlockNum > blockNum-int64(*d.cfg.Threshold()) {
continue
}
// Tx attempts are loaded from newest to oldest
oldestBroadcastAttempt, newestBroadcastAttempt, broadcastedAttemptsCount := findBroadcastedAttempts(tx)
// 2. Check if Threshold amount of blocks have passed since the oldest attempt's broadcast block num
if *oldestBroadcastAttempt.BroadcastBeforeBlockNum > blockNum-int64(d.cfg.Threshold()) {
if *oldestBroadcastAttempt.BroadcastBeforeBlockNum > blockNum-int64(*d.cfg.Threshold()) {
continue
}
// 3. Check if the transaction has at least MinAttempts amount of broadcasted attempts
if broadcastedAttemptsCount < d.cfg.MinAttempts() {
if broadcastedAttemptsCount < *d.cfg.MinAttempts() {
continue
}
// 4. Check if the newest broadcasted attempt's gas price is higher than what our gas estimator's GetFee method returns
Expand Down Expand Up @@ -278,6 +284,10 @@ func (d *stuckTxDetector) detectStuckTransactionsScroll(ctx context.Context, txs
if err != nil {
return nil, fmt.Errorf("failed to make new request with context: %w", err)
}

// Add Content-Type header
postReq.Header.Add("Content-Type", "application/json")

// Send request
resp, err := d.httpClient.Do(postReq)
if err != nil {
Expand All @@ -287,6 +297,7 @@ func (d *stuckTxDetector) detectStuckTransactionsScroll(ctx context.Context, txs
if resp.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %d", resp.StatusCode)
}

// Decode the response into expected type
scrollResp := new(scrollResponse)
err = json.NewDecoder(resp.Body).Decode(scrollResp)
Expand Down
16 changes: 8 additions & 8 deletions core/chains/evm/txmgr/stuck_tx_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func TestStuckTxDetector_LoadPurgeBlockNumMap(t *testing.T) {
autoPurgeMinAttempts := uint32(3)
autoPurgeCfg := testAutoPurgeConfig{
enabled: true, // Enable auto-purge feature for testing
threshold: autoPurgeThreshold,
minAttempts: autoPurgeMinAttempts,
threshold: &autoPurgeThreshold,
minAttempts: &autoPurgeMinAttempts,
}
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)

Expand Down Expand Up @@ -176,8 +176,8 @@ func TestStuckTxDetector_DetectStuckTransactionsHeuristic(t *testing.T) {
autoPurgeMinAttempts := uint32(3)
autoPurgeCfg := testAutoPurgeConfig{
enabled: true, // Enable auto-purge feature for testing
threshold: autoPurgeThreshold,
minAttempts: autoPurgeMinAttempts,
threshold: &autoPurgeThreshold,
minAttempts: &autoPurgeMinAttempts,
}
blockNum := int64(100)
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)
Expand Down Expand Up @@ -423,12 +423,12 @@ func mustInsertUnconfirmedEthTxWithBroadcastPurgeAttempt(t *testing.T, txStore t

type testAutoPurgeConfig struct {
enabled bool
threshold uint32
minAttempts uint32
threshold *uint32
minAttempts *uint32
detectionApiUrl *url.URL
}

func (t testAutoPurgeConfig) Enabled() bool { return t.enabled }
func (t testAutoPurgeConfig) Threshold() uint32 { return t.threshold }
func (t testAutoPurgeConfig) MinAttempts() uint32 { return t.minAttempts }
func (t testAutoPurgeConfig) Threshold() *uint32 { return t.threshold }
func (t testAutoPurgeConfig) MinAttempts() *uint32 { return t.minAttempts }
func (t testAutoPurgeConfig) DetectionApiUrl() *url.URL { return t.detectionApiUrl }
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,16 @@ func NewCommitOffchainConfig(
ExecGasPriceDeviationPPB uint32,
TokenPriceHeartBeat config.Duration,
TokenPriceDeviationPPB uint32,
InflightCacheExpiry config.Duration) CommitOffchainConfig {
InflightCacheExpiry config.Duration,
priceReportingDisabled bool) CommitOffchainConfig {
return CommitOffchainConfig{v1_2_0.JSONCommitOffchainConfig{
GasPriceHeartBeat: GasPriceHeartBeat,
DAGasPriceDeviationPPB: DAGasPriceDeviationPPB,
ExecGasPriceDeviationPPB: ExecGasPriceDeviationPPB,
TokenPriceHeartBeat: TokenPriceHeartBeat,
TokenPriceDeviationPPB: TokenPriceDeviationPPB,
InflightCacheExpiry: InflightCacheExpiry,
PriceReportingDisabled: priceReportingDisabled,
}}
}

Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/ccip/testhelpers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (c *CCIPContracts) createCommitOffchainConfig(t *testing.T, feeUpdateHearBe
*config.MustNewDuration(feeUpdateHearBeat),
1,
*config.MustNewDuration(inflightCacheExpiry),
false,
).Encode()
require.NoError(t, err)
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ func NewCommitOffchainConfig(
ExecGasPriceDeviationPPB uint32,
TokenPriceHeartBeat config.Duration,
TokenPriceDeviationPPB uint32,
InflightCacheExpiry config.Duration) CommitOffchainConfig {
InflightCacheExpiry config.Duration,
priceReportingDisabled bool) CommitOffchainConfig {
return CommitOffchainConfig{v1_2_0.JSONCommitOffchainConfig{
GasPriceHeartBeat: GasPriceHeartBeat,
DAGasPriceDeviationPPB: DAGasPriceDeviationPPB,
ExecGasPriceDeviationPPB: ExecGasPriceDeviationPPB,
TokenPriceHeartBeat: TokenPriceHeartBeat,
TokenPriceDeviationPPB: TokenPriceDeviationPPB,
InflightCacheExpiry: InflightCacheExpiry,
PriceReportingDisabled: priceReportingDisabled,
}}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (c *CCIPContracts) createCommitOffchainConfig(t *testing.T, feeUpdateHearBe
*config.MustNewDuration(feeUpdateHearBeat),
1,
*config.MustNewDuration(inflightCacheExpiry),
false,
).Encode()
require.NoError(t, err)
return config
Expand Down
61 changes: 38 additions & 23 deletions integration-tests/ccip-tests/actions/ccip_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/mockserver"
"github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/reorg"
"github.com/smartcontractkit/chainlink-testing-framework/networks"

"github.com/smartcontractkit/chainlink/integration-tests/ccip-tests/contracts"
"github.com/smartcontractkit/chainlink/integration-tests/ccip-tests/contracts/laneconfig"
"github.com/smartcontractkit/chainlink/integration-tests/ccip-tests/testconfig"
Expand Down Expand Up @@ -185,6 +184,7 @@ type CCIPCommon struct {
tokenPriceUpdateWatcher map[common.Address]*big.Int // key - token; value - timestamp of update
gasUpdateWatcherMu *sync.Mutex
gasUpdateWatcher map[uint64]*big.Int // key - destchain id; value - timestamp of update
GasUpdateEvents []contracts.GasUpdateEvent
}

// FreeUpUnusedSpace sets nil to various elements of ccipModule which are only used
Expand Down Expand Up @@ -524,6 +524,9 @@ func (ccipModule *CCIPCommon) WaitForPriceUpdates(
}
}

// WatchForPriceUpdates helps to ensure the price updates are happening in price registry by subscribing to a couple
// of price update events and add the event details to watchers. It subscribes to 'UsdPerUnitGasUpdated'
// and 'UsdPerTokenUpdated' event.
func (ccipModule *CCIPCommon) WatchForPriceUpdates(ctx context.Context, lggr *zerolog.Logger) error {
gasUpdateEventLatest := make(chan *price_registry.PriceRegistryUsdPerUnitGasUpdated)
tokenUpdateEvent := make(chan *price_registry.PriceRegistryUsdPerTokenUpdated)
Expand All @@ -549,20 +552,28 @@ func (ccipModule *CCIPCommon) WatchForPriceUpdates(ctx context.Context, lggr *ze
if tokenUpdateSub == nil {
return fmt.Errorf("no event subscription found")
}
processEvent := func(timestamp *big.Int, destChainSelector uint64) error {
processEvent := func(value, timestamp *big.Int, destChainSelector uint64, raw types.Log) error {
destChain, err := chainselectors.ChainIdFromSelector(destChainSelector)
if err != nil {
return err
}
ccipModule.gasUpdateWatcherMu.Lock()
ccipModule.gasUpdateWatcher[destChain] = timestamp

ccipModule.GasUpdateEvents = append(ccipModule.GasUpdateEvents, contracts.GasUpdateEvent{
Sender: raw.Address.Hex(),
Tx: raw.TxHash.Hex(),
Value: value,
DestChain: destChain,
Source: ccipModule.ChainClient.GetNetworkName(),
})
ccipModule.gasUpdateWatcherMu.Unlock()
lggr.Info().
Uint64("chainSelector", destChainSelector).
Str("source_chain", ccipModule.ChainClient.GetNetworkName()).
Uint64("dest_chain", destChain).
Str("price_registry", ccipModule.PriceRegistry.Address()).
Msgf("UsdPerUnitGasUpdated event received for dest chain %d source chain %s",
Str("tx hash", raw.TxHash.Hex()).
Msgf("UsdPerUnitGasUpdated event received for dest chain: %d, source chain: %s",
destChain, ccipModule.ChainClient.GetNetworkName())
return nil
}
Expand All @@ -572,13 +583,14 @@ func (ccipModule *CCIPCommon) WatchForPriceUpdates(ctx context.Context, lggr *ze
tokenUpdateSub.Unsubscribe()
ccipModule.gasUpdateWatcher = nil
ccipModule.gasUpdateWatcherMu = nil
ccipModule.GasUpdateEvents = nil
ccipModule.tokenPriceUpdateWatcher = nil
ccipModule.tokenPriceUpdateWatcherMu = nil
}()
for {
select {
case e := <-gasUpdateEventLatest:
err := processEvent(e.Timestamp, e.DestChain)
err := processEvent(e.Value, e.Timestamp, e.DestChain, e.Raw)
if err != nil {
continue
}
Expand Down Expand Up @@ -2623,23 +2635,24 @@ func CCIPRequestFromTxHash(txHash common.Hash, chainClient blockchain.EVMClient)
}

type CCIPLane struct {
Test *testing.T
Logger *zerolog.Logger
SourceNetworkName string
DestNetworkName string
SourceChain blockchain.EVMClient
DestChain blockchain.EVMClient
Source *SourceCCIPModule
Dest *DestCCIPModule
NumberOfReq int
Reports *testreporters.CCIPLaneStats
Balance *BalanceSheet
SentReqs map[common.Hash][]CCIPRequest
TotalFee *big.Int // total fee for all the requests. Used for balance validation.
ValidationTimeout time.Duration
Context context.Context
SrcNetworkLaneCfg *laneconfig.LaneConfig
DstNetworkLaneCfg *laneconfig.LaneConfig
Test *testing.T
Logger *zerolog.Logger
SourceNetworkName string
DestNetworkName string
SourceChain blockchain.EVMClient
DestChain blockchain.EVMClient
Source *SourceCCIPModule
Dest *DestCCIPModule
NumberOfReq int
Reports *testreporters.CCIPLaneStats
Balance *BalanceSheet
SentReqs map[common.Hash][]CCIPRequest
TotalFee *big.Int // total fee for all the requests. Used for balance validation.
ValidationTimeout time.Duration
Context context.Context
SrcNetworkLaneCfg *laneconfig.LaneConfig
DstNetworkLaneCfg *laneconfig.LaneConfig
PriceReportingDisabled bool
}

func (lane *CCIPLane) TokenPricesConfig() (string, error) {
Expand Down Expand Up @@ -3645,7 +3658,7 @@ func (lane *CCIPLane) DeployNewCCIPLane(

jobParams.P2PV2Bootstrappers = []string{p2pBootstrappersCommit.P2PV2Bootstrapper()}

err = SetOCR2Config(lane.Context, lane.Logger, *testConf, commitNodes, execNodes, *lane.Dest)
err = SetOCR2Config(lane.Context, lane.Logger, *testConf, commitNodes, execNodes, *lane.Dest, lane.PriceReportingDisabled)
if err != nil {
return fmt.Errorf("failed to set ocr2 config: %w", err)
}
Expand Down Expand Up @@ -3683,6 +3696,7 @@ func SetOCR2Config(
commitNodes,
execNodes []*client.CLNodesWithKeys,
destCCIP DestCCIPModule,
priceReportingDisabled bool,
) error {
inflightExpiryExec := commonconfig.MustNewDuration(InflightExpiryExec)
inflightExpiryCommit := commonconfig.MustNewDuration(InflightExpiryCommit)
Expand Down Expand Up @@ -3719,6 +3733,7 @@ func SetOCR2Config(
*commonconfig.MustNewDuration(5 * time.Second),
1e6,
*inflightExpiryCommit,
priceReportingDisabled,
)
if err != nil {
return fmt.Errorf("failed to create commit offchain config: %w", err)
Expand Down
5 changes: 4 additions & 1 deletion integration-tests/ccip-tests/contracts/contract_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,8 @@ func NewCommitOffchainConfig(
ExecGasPriceDeviationPPB uint32,
TokenPriceHeartBeat config.Duration,
TokenPriceDeviationPPB uint32,
InflightCacheExpiry config.Duration) (ccipconfig.OffchainConfig, error) {
InflightCacheExpiry config.Duration,
priceReportingDisabled bool) (ccipconfig.OffchainConfig, error) {
switch VersionMap[CommitStoreContract] {
case Latest:
return testhelpers.NewCommitOffchainConfig(
Expand All @@ -1409,6 +1410,7 @@ func NewCommitOffchainConfig(
TokenPriceHeartBeat,
TokenPriceDeviationPPB,
InflightCacheExpiry,
priceReportingDisabled,
), nil
case V1_2_0:
return testhelpers_1_4_0.NewCommitOffchainConfig(
Expand All @@ -1418,6 +1420,7 @@ func NewCommitOffchainConfig(
TokenPriceHeartBeat,
TokenPriceDeviationPPB,
InflightCacheExpiry,
priceReportingDisabled,
), nil
default:
return nil, fmt.Errorf("version not supported: %s", VersionMap[CommitStoreContract])
Expand Down
Loading

0 comments on commit df0ae3f

Please sign in to comment.