From 6d140800299886528309ec5e927365dbf24bc32c Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Mon, 9 Dec 2024 11:44:30 +0000 Subject: [PATCH] Reapply "CCIP-4403 LBTC attestation cherry-pick (#1564)" (#1566) This reverts commit 4ad51f8bd5bec73df35b3f77e7009df9508e7fe4. --- core/services/ocr2/delegate.go | 7 +- .../plugins/ccip/ccipexec/initializers.go | 14 + .../ocr2/plugins/ccip/config/config.go | 31 +- .../ccip/tokendata/http/http_client.go | 17 +- .../ocr2/plugins/ccip/tokendata/lbtc/lbtc.go | 275 ++++++++++ .../plugins/ccip/tokendata/lbtc/lbtc_test.go | 486 ++++++++++++++++++ core/services/relay/evm/evm.go | 8 +- core/services/relay/evm/exec_provider.go | 113 ++-- 8 files changed, 885 insertions(+), 66 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go create mode 100644 core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 8bca97c37e..16c3440dc5 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1894,7 +1894,7 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi // PROVIDER BASED ARG CONSTRUCTION // Write PluginConfig bytes to send source/dest relayer provider + info outside of top level rargs/pargs over the wire - dstConfigBytes, err := newExecPluginConfig(false, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, string(jb.ID)).Encode() + dstConfigBytes, err := newExecPluginConfig(false, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, pluginJobSpecConfig.LBTCConfig, string(jb.ID)).Encode() if err != nil { return nil, err } @@ -1927,7 +1927,7 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi func (d *Delegate) ccipExecGetSrcProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string, dstProvider types.CCIPExecProvider) (srcProvider types.CCIPExecProvider, srcChainID uint64, err error) { spec := jb.OCR2OracleSpec - srcConfigBytes, err := newExecPluginConfig(true, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, string(jb.ID)).Encode() + srcConfigBytes, err := newExecPluginConfig(true, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, pluginJobSpecConfig.LBTCConfig, string(jb.ID)).Encode() if err != nil { return nil, 0, err } @@ -1976,12 +1976,13 @@ func (d *Delegate) ccipExecGetSrcProvider(ctx context.Context, jb job.Job, plugi return } -func newExecPluginConfig(isSourceProvider bool, srcStartBlock uint64, dstStartBlock uint64, usdcConfig ccipconfig.USDCConfig, jobID string) config.ExecPluginConfig { +func newExecPluginConfig(isSourceProvider bool, srcStartBlock uint64, dstStartBlock uint64, usdcConfig ccipconfig.USDCConfig, lbtcConfig ccipconfig.LBTCConfig, jobID string) config.ExecPluginConfig { return config.ExecPluginConfig{ IsSourceProvider: isSourceProvider, SourceStartBlock: srcStartBlock, DestStartBlock: dstStartBlock, USDCConfig: usdcConfig, + LBTCConfig: lbtcConfig, JobID: jobID, } } diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index aa42ff2828..d2d3d32ce9 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -121,6 +121,20 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro } tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader } + // init lbtc token data provider + if pluginConfig.LBTCConfig.AttestationAPI != "" { + lggr.Infof("LBTC token data provider enabled") + err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig() + if err2 != nil { + return nil, err2 + } + + lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress)) + if err2 != nil { + return nil, fmt.Errorf("new lbtc reader: %w", err2) + } + tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader + } // Prom wrappers onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel) diff --git a/core/services/ocr2/plugins/ccip/config/config.go b/core/services/ocr2/plugins/ccip/config/config.go index a24a6edfd1..fbf8d590cf 100644 --- a/core/services/ocr2/plugins/ccip/config/config.go +++ b/core/services/ocr2/plugins/ccip/config/config.go @@ -108,6 +108,7 @@ func (c *DynamicPriceGetterConfig) Validate() error { type ExecPluginJobSpecConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. USDCConfig USDCConfig + LBTCConfig LBTCConfig } type USDCConfig struct { @@ -119,10 +120,19 @@ type USDCConfig struct { AttestationAPIIntervalMilliseconds int } +type LBTCConfig struct { + SourceTokenAddress common.Address + AttestationAPI string + AttestationAPITimeoutSeconds uint + // AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval. + AttestationAPIIntervalMilliseconds int +} + type ExecPluginConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. IsSourceProvider bool USDCConfig USDCConfig + LBTCConfig LBTCConfig JobID string } @@ -136,17 +146,30 @@ func (e ExecPluginConfig) Encode() ([]byte, error) { func (uc *USDCConfig) ValidateUSDCConfig() error { if uc.AttestationAPI == "" { - return errors.New("AttestationAPI is required") + return errors.New("USDCConfig: AttestationAPI is required") } if uc.AttestationAPIIntervalMilliseconds < -1 { - return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") + return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") } if uc.SourceTokenAddress == utils.ZeroAddress { - return errors.New("SourceTokenAddress is required") + return errors.New("USDCConfig: SourceTokenAddress is required") } if uc.SourceMessageTransmitterAddress == utils.ZeroAddress { - return errors.New("SourceMessageTransmitterAddress is required") + return errors.New("USDCConfig: SourceMessageTransmitterAddress is required") } return nil } + +func (lc *LBTCConfig) ValidateLBTCConfig() error { + if lc.AttestationAPI == "" { + return errors.New("LBTCConfig: AttestationAPI is required") + } + if lc.AttestationAPIIntervalMilliseconds < -1 { + return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") + } + if lc.SourceTokenAddress == utils.ZeroAddress { + return errors.New("LBTCConfig: SourceTokenAddress is required") + } + return nil +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go index 79ec21b1b8..dd303822c7 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go +++ b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go @@ -12,18 +12,21 @@ import ( ) type IHttpClient interface { - // Get issue a GET request to the given url and return the response body and status code. + // Get issues a GET request to the given url and returns the response body and status code. Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) + + // Post issues a POST request to the given url with the given request data and returns the response body and status code. + Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) } type HttpClient struct { } -func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) { +func doRequest(ctx context.Context, url string, requestType string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) { // Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress. timeoutCtx, cancel := context.WithTimeoutCause(ctx, timeout, tokendata.ErrTimeout) defer cancel() - req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(timeoutCtx, requestType, url, requestBody) if err != nil { return nil, http.StatusBadRequest, nil, err } @@ -46,3 +49,11 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) body, err := io.ReadAll(res.Body) return body, res.StatusCode, res.Header, err } + +func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) { + return doRequest(ctx, url, http.MethodGet, nil, timeout) +} + +func (s *HttpClient) Post(ctx context.Context, url string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) { + return doRequest(ctx, url, http.MethodPost, requestBody, timeout) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go new file mode 100644 index 0000000000..94df0c76ad --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go @@ -0,0 +1,275 @@ +package lbtc + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" + "golang.org/x/time/rate" + + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http" +) + +const ( + apiVersion = "v1" + attestationPath = "deposits/getByHash" + defaultAttestationTimeout = 5 * time.Second + + // defaultCoolDownDurationSec defines the default time to wait after getting rate limited. + // this value is only used if the 429 response does not contain the Retry-After header + defaultCoolDownDuration = 30 * time.Second + + // defaultRequestInterval defines the rate in requests per second that the attestation API can be called. + // this is set according to the APIs recommended 5 requests per second rate limit. + defaultRequestInterval = 200 * time.Millisecond + + // APIIntervalRateLimitDisabled is a special value to disable the rate limiting. + APIIntervalRateLimitDisabled = -1 + // APIIntervalRateLimitDefault is a special value to select the default rate limit interval. + APIIntervalRateLimitDefault = 0 +) + +type attestationStatus string + +const ( + attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED" + attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING" + attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED" + attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED" + attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED" +) + +var ( + ErrUnknownResponse = errors.New("unexpected response from attestation API") +) + +type TokenDataReader struct { + lggr logger.Logger + httpClient http.IHttpClient + attestationApi *url.URL + attestationApiTimeout time.Duration + lbtcTokenAddress common.Address + rate *rate.Limiter + + // coolDownUntil defines whether requests are blocked or not. + coolDownUntil time.Time + coolDownMu *sync.RWMutex +} + +type messageAttestationResponse struct { + MessageHash string `json:"message_hash"` + Status attestationStatus `json:"status"` + Attestation string `json:"attestation,omitempty"` // Attestation represented by abi.encode(payload, proof) +} + +type attestationRequest struct { + PayloadHashes []string `json:"messageHash"` +} + +type attestationResponse struct { + Attestations []messageAttestationResponse `json:"attestations"` +} + +type sourceTokenData struct { + SourcePoolAddress []byte + DestTokenAddress []byte + ExtraData []byte + DestGasAmount uint32 +} + +func (m sourceTokenData) AbiString() string { + return `[{ + "components": [ + {"name": "sourcePoolAddress", "type": "bytes"}, + {"name": "destTokenAddress", "type": "bytes"}, + {"name": "extraData", "type": "bytes"}, + {"name": "destGasAmount", "type": "uint32"} + ], + "type": "tuple" + }]` +} + +func (m sourceTokenData) Validate() error { + if len(m.SourcePoolAddress) == 0 { + return errors.New("sourcePoolAddress must be non-empty") + } + if len(m.DestTokenAddress) == 0 { + return errors.New("destTokenAddress must be non-empty") + } + if len(m.ExtraData) == 0 { + return errors.New("extraData must be non-empty") + } + return nil +} + +var _ tokendata.Reader = &TokenDataReader{} + +func NewLBTCTokenDataReader( + lggr logger.Logger, + lbtcAttestationApi *url.URL, + lbtcAttestationApiTimeoutSeconds int, + lbtcTokenAddress common.Address, + requestInterval time.Duration, +) *TokenDataReader { + timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second + if lbtcAttestationApiTimeoutSeconds == 0 { + timeout = defaultAttestationTimeout + } + + if requestInterval == APIIntervalRateLimitDisabled { + requestInterval = 0 + } else if requestInterval == APIIntervalRateLimitDefault { + requestInterval = defaultRequestInterval + } + + return &TokenDataReader{ + lggr: lggr, + httpClient: http.NewObservedIHttpClient(&http.HttpClient{}), + attestationApi: lbtcAttestationApi, + attestationApiTimeout: timeout, + lbtcTokenAddress: lbtcTokenAddress, + coolDownMu: &sync.RWMutex{}, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), + } +} + +func NewLBTCTokenDataReaderWithHttpClient( + origin TokenDataReader, + httpClient http.IHttpClient, + lbtcTokenAddress common.Address, + requestInterval time.Duration, +) *TokenDataReader { + return &TokenDataReader{ + lggr: origin.lggr, + httpClient: httpClient, + attestationApi: origin.attestationApi, + attestationApiTimeout: origin.attestationApiTimeout, + coolDownMu: origin.coolDownMu, + lbtcTokenAddress: lbtcTokenAddress, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), + } +} + +// ReadTokenData queries the LBTC attestation API. +func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) { + if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) { + return nil, fmt.Errorf("token index out of bounds") + } + + if s.inCoolDownPeriod() { + // rate limiting cool-down period, we prevent new requests from being sent + return nil, tokendata.ErrRequestsBlocked + } + + if s.rate != nil { + // Wait blocks until it the attestation API can be called or the + // context is Done. + if waitErr := s.rate.Wait(ctx); waitErr != nil { + return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr) + } + } + + decodedSourceTokenData, err := abihelpers.DecodeAbiStruct[sourceTokenData](msg.SourceTokenData[tokenIndex]) + if err != nil { + return []byte{}, err + } + destTokenData := decodedSourceTokenData.ExtraData + // We don't have better way to determine if the extraData is a payload or sha256(payload) + // Last parameter of the payload struct is 32-bytes nonce (see Lombard's Bridge._deposit(...) method), + // so we can assume that payload always exceeds 32 bytes + if len(destTokenData) != 32 { + s.lggr.Infow("SourceTokenData.extraData size is not 32. This is deposit payload, not sha256(payload). Attestation is disabled onchain", + "destTokenData", hexutil.Encode(destTokenData)) + return destTokenData, nil + } + payloadHash := [32]byte(destTokenData) + + msgID := hexutil.Encode(msg.MessageID[:]) + payloadHashHex := hexutil.Encode(payloadHash[:]) + s.lggr.Infow("Calling attestation API", "messageBodyHash", payloadHashHex, "messageID", msgID) + + attestationResp, err := s.callAttestationApi(ctx, payloadHash) + if err != nil { + return nil, errors.Wrap(err, "failed calling lbtc attestation API") + } + if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 { + return nil, errors.New("attestation response is empty") + } + if len(attestationResp.Attestations) > 1 { + s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations) + } + var attestation messageAttestationResponse + for _, attestationCandidate := range attestationResp.Attestations { + if attestationCandidate.MessageHash == payloadHashHex { + attestation = attestationCandidate + } + } + if attestation == (messageAttestationResponse{}) { + return nil, fmt.Errorf("requested attestation %s not found in response", payloadHashHex) + } + s.lggr.Infow("Got response from attestation API", "messageID", msgID, + "attestationStatus", attestation.Status, "attestation", attestation) + switch attestation.Status { + case attestationStatusSessionApproved: + payloadAndProof, err := hexutil.Decode(attestation.Attestation) + if err != nil { + return nil, err + } + return payloadAndProof, nil + case attestationStatusPending: + return nil, tokendata.ErrNotReady + case attestationStatusSubmitted: + return nil, tokendata.ErrNotReady + default: + s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation) + return nil, ErrUnknownResponse + } +} + +func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) { + attestationUrl := fmt.Sprintf("%s/bridge/%s/%s", s.attestationApi.String(), apiVersion, attestationPath) + request := attestationRequest{PayloadHashes: []string{hexutil.Encode(lbtcMessageHash[:])}} + encodedRequest, err := json.Marshal(request) + requestBuffer := bytes.NewBuffer(encodedRequest) + if err != nil { + return attestationResponse{}, err + } + respRaw, _, _, err := s.httpClient.Post(ctx, attestationUrl, requestBuffer, s.attestationApiTimeout) + switch { + case errors.Is(err, tokendata.ErrRateLimit): + s.setCoolDownPeriod(defaultCoolDownDuration) + return attestationResponse{}, tokendata.ErrRateLimit + case err != nil: + return attestationResponse{}, err + } + var attestationResp attestationResponse + err = json.Unmarshal(respRaw, &attestationResp) + return attestationResp, err +} + +func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) { + s.coolDownMu.Lock() + s.coolDownUntil = time.Now().Add(d) + s.coolDownMu.Unlock() +} + +func (s *TokenDataReader) inCoolDownPeriod() bool { + s.coolDownMu.RLock() + defer s.coolDownMu.RUnlock() + return time.Now().Before(s.coolDownUntil) +} + +func (s *TokenDataReader) Close() error { + return nil +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go new file mode 100644 index 0000000000..8b65685faa --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go @@ -0,0 +1,486 @@ +package lbtc + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" +) + +var ( + lbtcMessageHash = "0xbc427abf571a5cfcf7c98799d1f0055f4db25f203f657d30026728a19d16f092" + lbtcMessageAttestation = "0x0000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000016000000000000000000000000000000000000000000000000000000000000000e45c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e60000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000001200000000000000000000000000000000000000000000000000000000000000040277eeafba008d767c2636d9428f2ebb13ab29ac70337f4fc34b0f5606767cae546f9be3f12160de6d142e5b3c1c3ebd0bf4298662b32b597d0cc5970c7742fc10000000000000000000000000000000000000000000000000000000000000040bbcd60ecc9e06f2effe7c94161219498a1eb435b419387adadb86ec9a52dfb066ce027532517df7216404049d193a25b85c35edfa3e7c5aa4757bfe84887a3980000000000000000000000000000000000000000000000000000000000000040da4a6dc619b5ca2349783cabecc4efdbc910090d3e234d7b8d0430165f8fae532f9a965ceb85c18bb92e059adefa7ce5835850a705761ab9e026d2db4a13ef9a" + payloadAndProof, _ = hexutil.Decode(lbtcMessageAttestation) +) + +func getMockLBTCEndpoint(t *testing.T, response attestationResponse) *httptest.Server { + responseBytes, err := json.Marshal(response) + require.NoError(t, err) + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write(responseBytes) + require.NoError(t, err) + })) +} + +func TestLBTCReader_callAttestationApi(t *testing.T) { + t.Skipf("Skipping test because it uses the real LBTC attestation API") + attestationURI, err := url.ParseRequestURI("https://bridge-manager.staging.lombard.finance") + require.NoError(t, err) + lggr := logger.TestLogger(t) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) + + attestation, err := lbtcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(lbtcMessageHash))) + require.NoError(t, err) + + require.Equal(t, lbtcMessageHash, attestation.Attestations[0].MessageHash) + require.Equal(t, attestationStatusSessionApproved, attestation.Attestations[0].Status) + require.Equal(t, lbtcMessageAttestation, attestation.Attestations[0].Attestation) +} + +func TestLBTCReader_callAttestationApiMock(t *testing.T) { + response := attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSessionApproved, + Attestation: lbtcMessageAttestation, + }, + }, + } + + ts := getMockLBTCEndpoint(t, response) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr := logger.TestLogger(t) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) + attestation, err := lbtcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(lbtcMessageHash))) + require.NoError(t, err) + + require.Equal(t, response.Attestations[0].Status, attestation.Attestations[0].Status) + require.Equal(t, response.Attestations[0].Attestation, attestation.Attestations[0].Attestation) +} + +func TestLBTCReader_callAttestationApiMockError(t *testing.T) { + t.Parallel() + + sessionApprovedResponse := attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSessionApproved, + Attestation: lbtcMessageAttestation, + }, + }, + } + + tests := []struct { + name string + getTs func() *httptest.Server + parentTimeoutSeconds int + customTimeoutSeconds int + expectedError error + }{ + { + name: "server error", + getTs: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + }, + parentTimeoutSeconds: 60, + expectedError: nil, + }, + { + name: "default timeout", + getTs: func() *httptest.Server { + responseBytes, _ := json.Marshal(sessionApprovedResponse) + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(defaultAttestationTimeout + time.Second) + _, err := w.Write(responseBytes) + require.NoError(t, err) + })) + }, + parentTimeoutSeconds: 60, + expectedError: tokendata.ErrTimeout, + }, + { + name: "custom timeout", + getTs: func() *httptest.Server { + responseBytes, _ := json.Marshal(sessionApprovedResponse) + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2*time.Second + time.Second) + _, err := w.Write(responseBytes) + require.NoError(t, err) + })) + }, + parentTimeoutSeconds: 60, + customTimeoutSeconds: 2, + expectedError: tokendata.ErrTimeout, + }, + { + name: "rate limit", + getTs: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTooManyRequests) + })) + }, + parentTimeoutSeconds: 60, + expectedError: tokendata.ErrRateLimit, + }, + { + name: "parent context timeout", + getTs: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(defaultAttestationTimeout + time.Second) + })) + }, + parentTimeoutSeconds: 1, + expectedError: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ts := test.getTs() + defer ts.Close() + + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr := logger.TestLogger(t) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, test.customTimeoutSeconds, common.Address{}, APIIntervalRateLimitDisabled) + + parentCtx, cancel := context.WithTimeout(context.Background(), time.Duration(test.parentTimeoutSeconds)*time.Second) + defer cancel() + + _, err = lbtcService.callAttestationApi(parentCtx, [32]byte(common.FromHex(lbtcMessageHash))) + require.Error(t, err) + + if test.expectedError != nil { + require.True(t, errors.Is(err, test.expectedError)) + } + }) + } +} + +func TestLBTCReader_rateLimiting(t *testing.T) { + sessionApprovedResponse := attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSessionApproved, + Attestation: lbtcMessageAttestation, + }, + }, + } + + testCases := []struct { + name string + requests uint64 + rateConfig time.Duration + testDuration time.Duration + timeout time.Duration + err string + additionalErr string + }{ + { + name: "no rate limit when disabled", + requests: 10, + rateConfig: APIIntervalRateLimitDisabled, + testDuration: 1 * time.Millisecond, + }, + { + name: "yes rate limited with default config", + requests: 5, + rateConfig: APIIntervalRateLimitDefault, + testDuration: 4 * defaultRequestInterval, + }, + { + name: "yes rate limited with config", + requests: 10, + rateConfig: 50 * time.Millisecond, + testDuration: 9 * 50 * time.Millisecond, + }, + { + name: "request timeout", + requests: 5, + rateConfig: 100 * time.Millisecond, + testDuration: 1 * time.Millisecond, + timeout: 1 * time.Millisecond, + err: "lbtc rate limiting error:", + additionalErr: "token data API timed out", + }, + } + + extraData, err := hexutil.Decode(lbtcMessageHash) + require.NoError(t, err) + + srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ + SourcePoolAddress: utils.RandomAddress().Bytes(), + DestTokenAddress: utils.RandomAddress().Bytes(), + ExtraData: extraData, + }) + require.NoError(t, err) + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ts := getMockLBTCEndpoint(t, sessionApprovedResponse) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr := logger.TestLogger(t) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), tc.rateConfig) + + ctx := context.Background() + if tc.timeout > 0 { + var cf context.CancelFunc + ctx, cf = context.WithTimeout(ctx, tc.timeout) + defer cf() + } + + trigger := make(chan struct{}) + errorChan := make(chan error, tc.requests) + wg := sync.WaitGroup{} + for i := 0; i < int(tc.requests); i++ { + wg.Add(1) + go func() { + defer wg.Done() + + <-trigger + _, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SourceTokenData: [][]byte{srcTokenData}, + TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address + }, + }, 0) + + errorChan <- err + }() + } + + // Start the test + start := time.Now() + close(trigger) + + // Wait for requests to complete + wg.Wait() + finish := time.Now() + close(errorChan) + + // Collect errors + errorFound := false + for err := range errorChan { + if tc.err != "" && strings.Contains(err.Error(), tc.err) { + errorFound = true + } else if tc.additionalErr != "" && strings.Contains(err.Error(), tc.additionalErr) { + errorFound = true + } else if err != nil { + require.Fail(t, "unexpected error", err) + } + } + + if tc.err != "" { + assert.True(t, errorFound) + } + assert.WithinDuration(t, start.Add(tc.testDuration), finish, 50*time.Millisecond) + }) + } +} + +func TestLBTCReader_skipApiOnFullPayload(t *testing.T) { + sessionApprovedResponse := attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSessionApproved, + Attestation: lbtcMessageAttestation, + }, + }, + } + + srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ + SourcePoolAddress: utils.RandomAddress().Bytes(), + DestTokenAddress: utils.RandomAddress().Bytes(), + ExtraData: []byte(lbtcMessageHash), // more than 32 bytes + }) + require.NoError(t, err) + + ts := getMockLBTCEndpoint(t, sessionApprovedResponse) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr, logs := logger.TestLoggerObserved(t, zapcore.InfoLevel) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), APIIntervalRateLimitDefault) + + ctx := context.Background() + + destTokenData, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SourceTokenData: [][]byte{srcTokenData}, + TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address + }, + }, 0) + require.NoError(t, err) + require.EqualValues(t, []byte(lbtcMessageHash), destTokenData) + + require.Equal(t, 1, logs.Len()) + require.Contains(t, logs.All()[0].Message, "SourceTokenData.extraData size is not 32. This is deposit payload, not sha256(payload). Attestation is disabled onchain") +} + +func TestLBTCReader_expectedOutput(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + response attestationResponse + expectedReturn []byte + expectedError error + }{ + { + name: "expected payloadAndProof when status SESSION_APPROVED", + response: attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSessionApproved, + Attestation: lbtcMessageAttestation, + }, + }, + }, + expectedReturn: payloadAndProof, + expectedError: nil, + }, + { + name: "expected ErrNotReady on status PENDING", + response: attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusPending, + Attestation: lbtcMessageAttestation, + }, + }, + }, + expectedReturn: nil, + expectedError: tokendata.ErrNotReady, + }, + { + name: "expected ErrNotReady on status SUBMITTED", + response: attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusSubmitted, + Attestation: lbtcMessageAttestation, + }, + }, + }, + expectedReturn: nil, + expectedError: tokendata.ErrNotReady, + }, + { + name: "expected ErrUnknownResponse on status UNSPECIFIED", + response: attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusUnspecified, + Attestation: lbtcMessageAttestation, + }, + }, + }, + expectedReturn: nil, + expectedError: ErrUnknownResponse, + }, + { + name: "expected ErrUnknownResponse on status FAILED", + response: attestationResponse{ + Attestations: []messageAttestationResponse{ + { + MessageHash: lbtcMessageHash, + Status: attestationStatusFailed, + Attestation: lbtcMessageAttestation, + }, + }, + }, + expectedReturn: nil, + expectedError: ErrUnknownResponse, + }, + } + + extraData, err := hexutil.Decode(lbtcMessageHash) + require.NoError(t, err) + + srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ + SourcePoolAddress: utils.RandomAddress().Bytes(), + DestTokenAddress: utils.RandomAddress().Bytes(), + ExtraData: extraData, + }) + require.NoError(t, err) + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ts := getMockLBTCEndpoint(t, tc.response) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + lggr := logger.TestLogger(t) + lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), APIIntervalRateLimitDefault) + + ctx := context.Background() + + payloadAndProof, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + EVM2EVMMessage: cciptypes.EVM2EVMMessage{ + SourceTokenData: [][]byte{srcTokenData}, + TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address + }, + }, 0) + + if tc.expectedReturn != nil { + require.EqualValues(t, tc.expectedReturn, payloadAndProof) + } else if tc.expectedError != nil { + require.True(t, strings.Contains(err.Error(), tc.expectedError.Error())) + } + }) + } +} + +func Test_DecodeSourceTokenData(t *testing.T) { + input, err := hexutil.Decode("0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000249f00000000000000000000000000000000000000000000000000000000000000020000000000000000000000000267d40f64ecc4d95f3e8b2237df5f37b10812c250000000000000000000000000000000000000000000000000000000000000020000000000000000000000000c47e4b3124597fdf8dd07843d4a7052f2ee80c3000000000000000000000000000000000000000000000000000000000000000e45c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e6000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000") + require.NoError(t, err) + decoded, err := abihelpers.DecodeAbiStruct[sourceTokenData](input) + require.NoError(t, err) + expected, err := hexutil.Decode("0x5c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e60000000000000000000000000000000000000000000000000000000000000006") + require.NoError(t, err) + require.Equal(t, expected, decoded.ExtraData) +} diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 1dff3e7b7a..516b418ac9 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -563,8 +563,6 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont return nil, err } - usdcConfig := execPluginConfig.USDCConfig - feeEstimatorConfig := estimatorconfig.NewFeeEstimatorConfigService() // CCIPExec reads when dest chain is mantle, and uses it to calc boosting in batching @@ -591,10 +589,8 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont r.chain.LogPoller(), execPluginConfig.SourceStartBlock, execPluginConfig.JobID, - usdcConfig.AttestationAPI, - int(usdcConfig.AttestationAPITimeoutSeconds), - usdcConfig.AttestationAPIIntervalMilliseconds, - usdcConfig.SourceMessageTransmitterAddress, + execPluginConfig.USDCConfig, + execPluginConfig.LBTCConfig, feeEstimatorConfig, ) } diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 193f5e3b00..91d5b090fa 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -10,12 +10,13 @@ import ( "go.uber.org/multierr" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/types" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/lbtc" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -29,21 +30,21 @@ import ( ) type SrcExecProvider struct { - lggr logger.Logger - versionFinder ccip.VersionFinder - client client.Client - lp logpoller.LogPoller - startBlock uint64 - estimator gas.EvmFeeEstimator - maxGasPrice *big.Int - usdcReader *ccip.USDCReaderImpl - usdcAttestationAPI string - usdcAttestationAPITimeoutSeconds int - usdcAttestationAPIIntervalMilliseconds int - usdcSrcMsgTransmitterAddr common.Address + lggr logger.Logger + versionFinder ccip.VersionFinder + client client.Client + lp logpoller.LogPoller + startBlock uint64 + estimator gas.EvmFeeEstimator + maxGasPrice *big.Int + usdcReader *ccip.USDCReaderImpl + usdcConfig config.USDCConfig + lbtcConfig config.LBTCConfig feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider + // TODO: Add lbtc reader & api fields + // these values are nil and are updated for Close() seenOnRampAddress *cciptypes.Address seenSourceChainSelector *uint64 @@ -59,35 +60,31 @@ func NewSrcExecProvider( lp logpoller.LogPoller, startBlock uint64, jobID string, - usdcAttestationAPI string, - usdcAttestationAPITimeoutSeconds int, - usdcAttestationAPIIntervalMilliseconds int, - usdcSrcMsgTransmitterAddr common.Address, + usdcConfig config.USDCConfig, + lbtcConfig config.LBTCConfig, feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider, ) (commontypes.CCIPExecProvider, error) { var usdcReader *ccip.USDCReaderImpl var err error - if usdcAttestationAPI != "" { - usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcSrcMsgTransmitterAddr, lp, true) + if usdcConfig.AttestationAPI != "" { + usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcConfig.SourceMessageTransmitterAddress, lp, true) if err != nil { return nil, fmt.Errorf("new usdc reader: %w", err) } } return &SrcExecProvider{ - lggr: lggr, - versionFinder: versionFinder, - client: client, - estimator: estimator, - maxGasPrice: maxGasPrice, - lp: lp, - startBlock: startBlock, - usdcReader: usdcReader, - usdcAttestationAPI: usdcAttestationAPI, - usdcAttestationAPITimeoutSeconds: usdcAttestationAPITimeoutSeconds, - usdcAttestationAPIIntervalMilliseconds: usdcAttestationAPIIntervalMilliseconds, - usdcSrcMsgTransmitterAddr: usdcSrcMsgTransmitterAddr, - feeEstimatorConfig: feeEstimatorConfig, + lggr: lggr, + versionFinder: versionFinder, + client: client, + estimator: estimator, + maxGasPrice: maxGasPrice, + lp: lp, + startBlock: startBlock, + usdcReader: usdcReader, + usdcConfig: usdcConfig, + lbtcConfig: lbtcConfig, + feeEstimatorConfig: feeEstimatorConfig, }, nil } @@ -116,10 +113,10 @@ func (s *SrcExecProvider) Close() error { return ccip.CloseOnRampReader(s.lggr, versionFinder, *s.seenSourceChainSelector, *s.seenDestChainSelector, *s.seenOnRampAddress, s.lp, s.client) }) unregisterFuncs = append(unregisterFuncs, func() error { - if s.usdcAttestationAPI == "" { + if s.usdcConfig.AttestationAPI == "" { return nil } - return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcSrcMsgTransmitterAddr, s.lp) + return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcConfig.SourceMessageTransmitterAddress, s.lp) }) var multiErr error for _, fn := range unregisterFuncs { @@ -195,24 +192,40 @@ func (s *SrcExecProvider) NewPriceRegistryReader(ctx context.Context, addr ccipt return } -func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) { - attestationURI, err2 := url.ParseRequestURI(s.usdcAttestationAPI) - if err2 != nil { - return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err2) +func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (cciptypes.TokenDataReader, error) { + tokenAddr, err := ccip.GenericAddrToEvm(tokenAddress) + if err != nil { + return nil, fmt.Errorf("failed to parse token address: %w", err) } - tokenAddr, err2 := ccip.GenericAddrToEvm(tokenAddress) - if err2 != nil { - return nil, fmt.Errorf("failed to parse token address: %w", err2) + switch tokenAddr { + case s.usdcConfig.SourceTokenAddress: + attestationURI, err := url.ParseRequestURI(s.usdcConfig.AttestationAPI) + if err != nil { + return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err) + } + return usdc.NewUSDCTokenDataReader( + s.lggr, + s.usdcReader, + attestationURI, + int(s.usdcConfig.AttestationAPITimeoutSeconds), + tokenAddr, + time.Duration(s.usdcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, + ), nil + case s.lbtcConfig.SourceTokenAddress: + attestationURI, err := url.ParseRequestURI(s.lbtcConfig.AttestationAPI) + if err != nil { + return nil, fmt.Errorf("failed to parse LBTC attestation API: %w", err) + } + return lbtc.NewLBTCTokenDataReader( + s.lggr, + attestationURI, + int(s.lbtcConfig.AttestationAPITimeoutSeconds), + tokenAddr, + time.Duration(s.lbtcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, + ), nil + default: + return nil, fmt.Errorf("unsupported token address: %s", tokenAddress) } - tokenDataReader = usdc.NewUSDCTokenDataReader( - s.lggr, - s.usdcReader, - attestationURI, - s.usdcAttestationAPITimeoutSeconds, - tokenAddr, - time.Duration(s.usdcAttestationAPIIntervalMilliseconds)*time.Millisecond, - ) - return } func (s *SrcExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampAddr cciptypes.Address, sourceChainSelector uint64) (cciptypes.TokenPoolBatchedReader, error) {