Skip to content

Commit

Permalink
apply ws optional url PR
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhen1997 committed Sep 26, 2024
1 parent f83aad3 commit 46decc6
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 31 deletions.
4 changes: 2 additions & 2 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli
ws = (url.URL)(*node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
rpc := NewRPCClient(lggr, &ws, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, int32(i),
rpc := NewRPCClient(lggr, &ws, (*url.URL)(node.HTTPURL), *node.Name, int32(i),
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
Expand All @@ -157,7 +157,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down
40 changes: 27 additions & 13 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type rpcClient struct {
newHeadsPollInterval time.Duration
chainType chaintype.ChainType

ws rawclient
ws *rawclient
http *rawclient

stateMu sync.RWMutex // protects state* fields
Expand All @@ -152,7 +152,7 @@ type rpcClient struct {
// NewRPCCLient returns a new *rpcClient as commonclient.RPC
func NewRPCClient(
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
Expand All @@ -173,9 +173,11 @@ func NewRPCClient(
r.id = id
r.chainID = chainID
r.tier = tier
r.ws.uri = wsuri
r.finalizedBlockPollInterval = finalizedBlockPollInterval
r.newHeadsPollInterval = newHeadsPollInterval
if wsuri != nil {
r.ws = &rawclient{uri: *wsuri}
}
if httpuri != nil {
r.http = &rawclient{uri: *httpuri}
}
Expand All @@ -197,13 +199,13 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout)
defer cancel()

if r.ws.uri.String() == "" && r.http == nil {
if r.ws == nil && r.http == nil {
return errors.New("cannot dial rpc client when both ws and http info are missing")
}

promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog
if r.ws.uri.String() != "" {
if r.ws != nil {
lggr = lggr.With("wsuri", r.ws.uri.Redacted())
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "")
if err != nil {
Expand Down Expand Up @@ -232,7 +234,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
// It can only return error if the URL is malformed.
func (r *rpcClient) DialHTTP() error {
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog.With("httpuri", r.ws.uri.Redacted())
lggr := r.rpcLog.With("httpuri", r.http.uri.Redacted())
lggr.Debugw("RPC dial: evmclient.Client#dial")

var httprpc *rpc.Client
Expand Down Expand Up @@ -271,7 +273,10 @@ func (r *rpcClient) cancelInflightRequests() {
}

func (r *rpcClient) String() string {
s := fmt.Sprintf("(%s)%s:%s", r.tier.String(), r.name, r.ws.uri.Redacted())
s := fmt.Sprintf("(%s)%s", r.tier.String(), r.name)
if r.ws != nil {
s = s + fmt.Sprintf(":%s", r.ws.uri.Redacted())
}
if r.http != nil {
s = s + fmt.Sprintf(":%s", r.http.uri.Redacted())
}
Expand Down Expand Up @@ -503,6 +508,11 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
return nil, err
}

err = r.registerSub(&poller, chStopInFlight)
if err != nil {
return
}

go func() {
for head := range pollerCh {
select {
Expand Down Expand Up @@ -1268,11 +1278,12 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro
}

func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) {
if r.ws.uri.String() == "" {
return nil, errors.New("SubscribeFilterLogs is not allowed without ws url")
}
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
if ws == nil {
return nil, errors.New("SubscribeFilterLogs is not allowed without ws url")
}

lggr := r.newRqLggr().With("q", q)

lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs")
Expand Down Expand Up @@ -1377,18 +1388,21 @@ func (r *rpcClient) wrapHTTP(err error) error {
}

// makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) {
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws *rawclient, http *rawclient) {
ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout)
return
}

func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, ws rawclient, http *rawclient) {
chStopInFlight chan struct{}, ws *rawclient, http *rawclient) {
// Need to wrap in mutex because state transition can cancel and replace the
// context
r.stateMu.RLock()
chStopInFlight = r.chStopInFlight
ws = r.ws
if r.ws != nil {
cp := *r.ws
ws = &cp
}
if r.http != nil {
cp := *r.http
http = &cp
Expand Down
26 changes: 13 additions & 13 deletions core/chains/evm/client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) {
// ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing
observedLggr, _ := logger.TestObserved(t, zap.DebugLevel)
rpcClient := client.NewRPCClient(observedLggr, url.URL{}, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpcClient := client.NewRPCClient(observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx))
})
t.Run("Updates chain info on new blocks", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
// set to default values
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
ch := make(chan *evmtypes.Head)
Expand All @@ -142,7 +142,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
var wg sync.WaitGroup
Expand All @@ -166,7 +166,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
t.Run("Block's chain ID matched configured", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
ch := make(chan *evmtypes.Head)
Expand All @@ -183,7 +183,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
})
wsURL := server.WSURL()
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel)
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
server.Close()
_, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head))
Expand All @@ -193,7 +193,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head))
Expand All @@ -218,7 +218,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) {
t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) {
// ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing
observedLggr, _ := logger.TestObserved(t, zap.DebugLevel)
rpcClient := client.NewRPCClient(observedLggr, url.URL{}, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpcClient := client.NewRPCClient(observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.Nil(t, rpcClient.Dial(ctx))

_, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log))
Expand All @@ -230,7 +230,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) {
})
wsURL := server.WSURL()
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel)
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
server.Close()
_, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log))
Expand All @@ -247,7 +247,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) {
return resp
})
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log))
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) {
}

server := createRPCServer()
rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
defer rpc.Close()
server.Head = &evmtypes.Head{Number: 128}
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) {
// use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout
const rpcTimeout = time.Hour
const largePayloadRPCTimeout = tests.TestInterval
rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "")
rpc := client.NewRPCClient(logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "")
require.NoError(t, rpc.Dial(ctx))
defer rpc.Close()
err := testCase.Fn(ctx, rpc)
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestAstarCustomFinality(t *testing.T) {

const expectedFinalizedBlockNumber = int64(4)
const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804"
rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar)
rpcClient := client.NewRPCClient(logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar)
defer rpcClient.Close()
err := rpcClient.Dial(tests.Context(t))
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets`
# E2E_TEST_CHAINLINK_IMAGE="<aws account number>.dkr.ecr.<aws region>.amazonaws.com/chainlink-ccip"
[CCIP.Env.NewCLCluster.Common.ChainlinkImage]
version = "sha-b4c9bf3-root"
version = "sha-f83aad3-root"

[CCIP]
[CCIP.ContractVersions]
Expand Down

0 comments on commit 46decc6

Please sign in to comment.