Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/optional ws url #1527

Merged
merged 16 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/four-kangaroos-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config validation so it requires ws url when http polling disabled #bugfix
11 changes: 11 additions & 0 deletions .changeset/happy-feet-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": minor
---

This PR introduce few changes:
- Add a new config option `EVM.NodePool.NewHeadsPollInterval` (0 by default indicate disabled), which is an interval for polling new block periodically using http client rather than subscribe to ws feed.
- Updated new head handler for polling new head over http, and register the subscription in node lifecycle logic.
- If the polling new heads is enabled, WS new heads subscription will be replaced with the new http based polling.

Note: There will be another PR for making WS URL optional with some extra condition.
#added
13 changes: 13 additions & 0 deletions .changeset/kind-numbers-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"chainlink": minor
---

Adding feature flag for `LogBroadcaster` called `LogBroadcasterEnabled`, which is `true` by default to support backwards compatibility.
Adding `LogBroadcasterEnabled` allows certain chains to completely disable the `LogBroadcaster` feature, which is an old feature (getting replaced by logPoller) that only few products are using it:
* OCR1 Median
* *OCR2 Median when ChainReader is disabled
* *pre-OCR2 Keeper
* Flux Monitor
* Direct RequestOCR1 Median

#added
8 changes: 8 additions & 0 deletions .changeset/moody-rules-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

- register polling subscription to avoid subscription leaking when rpc client gets closed.
- add a temporary special treatment for SubscribeNewHead before we replace it with SubscribeToHeads. Add a goroutine that forwards new head from poller to caller channel.
- fix a deadlock in poller, by using a new lock for subs slice in rpc client.
#bugfix
8 changes: 8 additions & 0 deletions .changeset/silly-lies-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": minor
---

Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that:
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error
* If WS URL was not provided LogBroadcaster should be disabled
#nops
3 changes: 2 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ jobs:
only-new-issues: false # disabled for PRs due to unreliability
args: --out-format colored-line-number,checkstyle:golangci-lint-report.xml
working-directory: ${{ matrix.project.path }}
continue-on-error: true

build-chainlink:
environment: integration
Expand Down Expand Up @@ -584,7 +585,7 @@ jobs:
test_config_chainlink_version: ${{ inputs.evm-ref || github.sha }}
test_config_selected_networks: ${{ env.SELECTED_NETWORKS }}
test_config_logging_run_id: ${{ github.run_id }}
test_config_logstream_log_targets: ${{ vars.LOGSTREAM_LOG_TARGETS }}
test_config_logstream_log_targets: "file"
test_config_test_log_collect: ${{ vars.TEST_LOG_COLLECT }}
cl_repo: ${{ env.CHAINLINK_IMAGE }}
cl_image_tag: ${{ inputs.evm-ref || github.sha }}${{ matrix.product.tag_suffix }}
Expand Down
18 changes: 12 additions & 6 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeConfig interface {
FinalizedBlockPollInterval() time.Duration
EnforceRepeatableRead() bool
DeathDeclarationDelay() time.Duration
NewHeadsPollInterval() time.Duration
}

type ChainConfig interface {
Expand Down Expand Up @@ -90,14 +91,14 @@ type node[
services.StateMachine
lfcLog logger.Logger
name string
id int32
id int
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
ws *url.URL
http *url.URL

rpc RPC
Expand All @@ -120,10 +121,10 @@ func NewNode[
nodeCfg NodeConfig,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
Expand All @@ -135,8 +136,10 @@ func NewNode[
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if wsuri != nil {
n.ws = wsuri
}
if httpuri != nil {
n.http = httpuri
}
Expand All @@ -156,7 +159,10 @@ func NewNode[
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
s := fmt.Sprintf("(%s)%s", Primary.String(), n.name)
if n.ws != nil {
s = s + fmt.Sprintf(":%s", n.ws.String())
}
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
Expand Down
9 changes: 7 additions & 2 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type testNodeConfig struct {
enforceRepeatableRead bool
finalizedBlockPollInterval time.Duration
deathDeclarationDelay time.Duration
newHeadsPollInterval time.Duration
}

func (n testNodeConfig) NewHeadsPollInterval() time.Duration {
return n.newHeadsPollInterval
}

func (n testNodeConfig) PollFailureThreshold() uint32 {
Expand Down Expand Up @@ -62,10 +67,10 @@ type testNodeOpts struct {
config testNodeConfig
chainConfig clientMocks.ChainConfig
lggr logger.Logger
wsuri url.URL
wsuri *url.URL
httpuri *url.URL
name string
id int32
id int
chainID types.ID
nodeOrder int32
rpc *mockNodeClient[types.ID, Head]
Expand Down
21 changes: 14 additions & 7 deletions core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewClientConfigs(
deathDeclarationDelay time.Duration,
noNewFinalizedHeadsThreshold time.Duration,
finalizedBlockPollInterval time.Duration,

newHeadsPollInterval time.Duration,
) (commonclient.ChainConfig, evmconfig.NodePool, []*toml.Node, error) {
nodes, err := parseNodeConfigs(nodeCfgs)
if err != nil {
Expand All @@ -59,6 +59,7 @@ func NewClientConfigs(
EnforceRepeatableRead: enforceRepeatableRead,
DeathDeclarationDelay: commonconfig.MustNewDuration(deathDeclarationDelay),
FinalizedBlockPollInterval: commonconfig.MustNewDuration(finalizedBlockPollInterval),
NewHeadsPollInterval: commonconfig.MustNewDuration(newHeadsPollInterval),
}
nodePoolCfg := &evmconfig.NodePoolConfig{C: nodePool}
chainConfig := &evmconfig.EVMConfig{
Expand All @@ -79,15 +80,21 @@ func NewClientConfigs(
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) {
nodes := make([]*toml.Node, len(nodeCfgs))
for i, nodeCfg := range nodeCfgs {
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i)
var wsURL, httpURL *commonconfig.URL
// wsUrl requirement will be checked in EVMConfig validation
if nodeCfg.WSURL != nil {
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL)
}

if nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i)
}
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL)
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL)

httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL)
node := &toml.Node{
Name: nodeCfg.Name,
WSURL: wsUrl,
HTTPURL: httpUrl,
WSURL: wsURL,
HTTPURL: httpURL,
SendOnly: nodeCfg.SendOnly,
Order: nodeCfg.Order,
}
Expand Down
9 changes: 6 additions & 3 deletions core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func TestClientConfigBuilder(t *testing.T) {
finalityDepth := ptr(uint32(10))
finalityTagEnabled := ptr(true)
noNewHeadsThreshold := time.Second
newHeadsPollInterval := 0 * time.Second
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, pollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
pollInterval, newHeadsPollInterval)
require.NoError(t, err)

// Validate node pool configs
Expand All @@ -52,6 +54,7 @@ func TestClientConfigBuilder(t *testing.T) {
require.Equal(t, *enforceRepeatableRead, nodePool.EnforceRepeatableRead())
require.Equal(t, deathDeclarationDelay, nodePool.DeathDeclarationDelay())
require.Equal(t, pollInterval, nodePool.FinalizedBlockPollInterval())
require.Equal(t, newHeadsPollInterval, nodePool.NewHeadsPollInterval())

// Validate node configs
require.Equal(t, *nodeConfigs[0].Name, *nodes[0].Name)
Expand Down Expand Up @@ -90,15 +93,15 @@ func TestNodeConfigs(t *testing.T) {
require.Len(t, tomlNodes, len(nodeConfigs))
})

t.Run("parsing missing ws url fails", func(t *testing.T) {
t.Run("ws can be optional", func(t *testing.T) {
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo1"),
HTTPURL: ptr("http://foo1.test"),
},
}
_, err := client.ParseTestNodeConfigs(nodeConfigs)
require.Error(t, err)
require.Nil(t, err)
})

t.Run("parsing missing http url fails", func(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import (
)

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType)
for i, node := range nodes {
var ws *url.URL
if node.WSURL != nil {
ws = (*url.URL)(node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i,
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order,
rpc, "EVM")
primaries = append(primaries, primaryNode)
}
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/client/evm_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNewEvmClient(t *testing.T) {
deathDeclarationDelay := time.Second * 3
noNewFinalizedBlocksThreshold := time.Second * 5
finalizedBlockPollInterval := time.Second * 4
newHeadsPollInterval := time.Second * 4
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo"),
Expand All @@ -40,7 +41,8 @@ func TestNewEvmClient(t *testing.T) {
finalityTagEnabled := ptr(true)
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, finalizedBlockPollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
finalizedBlockPollInterval, newHeadsPollInterval)
require.NoError(t, err)

client := client.NewEvmClient(nodePool, chainCfg, nil, logger.Test(t), testutils.FixtureChainID, nodes, chaintype.ChainType(chainTypeStr))
Expand Down
15 changes: 10 additions & 5 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type TestNodePoolConfig struct {
NodeErrors config.ClientErrors
EnforceRepeatableReadVal bool
NodeDeathDeclarationDelay time.Duration
NodeNewHeadsPollInterval time.Duration
}

func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold }
Expand All @@ -107,6 +108,10 @@ func (tc TestNodePoolConfig) FinalizedBlockPollInterval() time.Duration {
return tc.NodeFinalizedBlockPollInterval
}

func (tc TestNodePoolConfig) NewHeadsPollInterval() time.Duration {
return tc.NodeNewHeadsPollInterval
}

func (tc TestNodePoolConfig) Errors() config.ClientErrors {
return tc.NodeErrors
}
Expand All @@ -127,7 +132,7 @@ func NewChainClientWithTestNode(
rpcUrl string,
rpcHTTPURL *url.URL,
sendonlyRPCURLs []url.URL,
id int32,
id int,
chainID *big.Int,
) (Client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
Expand All @@ -140,10 +145,10 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
Expand All @@ -152,7 +157,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down Expand Up @@ -198,7 +203,7 @@ func NewChainClientWithMockedRpc(
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0)
Expand Down
Loading
Loading