From 8e8247c4c26a793ffdcb6017dd11c95e46702065 Mon Sep 17 00:00:00 2001 From: kostekIV <27210860+kostekIV@users.noreply.github.com> Date: Thu, 16 May 2024 04:59:06 +0200 Subject: [PATCH] Prometheus metrics (#174) * Update jsonrpsee * Add prometheus metrics * Add prometheus to config * Revert one unwanted change * clippy * Update clippy in test & fix warnings * fix ci * Apply suggestions from code review Co-authored-by: Xiliang Chen * More explicit expect --------- Co-authored-by: Xiliang Chen --- .github/workflows/test.yml | 6 +- Cargo.lock | 233 +++++++++++++----- Cargo.toml | 3 +- benches/bench/rate_limit.rs | 6 +- configs/config.yml | 4 + src/extensions/client/endpoint.rs | 18 +- src/extensions/client/health.rs | 6 +- src/extensions/client/mod.rs | 7 +- src/extensions/mod.rs | 2 + src/extensions/prometheus/mod.rs | 93 +++++++ src/extensions/prometheus/rpc_metrics.rs | 115 +++++++++ src/extensions/rate_limit/connection.rs | 5 +- src/extensions/server/mod.rs | 32 ++- src/extensions/server/prometheus.rs | 81 ++++++ src/extensions/server/proxy_get_request.rs | 4 +- src/middlewares/methods/block_tag.rs | 4 +- src/middlewares/methods/cache.rs | 38 ++- src/middlewares/methods/inject_params.rs | 4 +- .../subscriptions/merge_subscription.rs | 6 +- src/server.rs | 9 +- src/utils/mod.rs | 4 +- 21 files changed, 568 insertions(+), 112 deletions(-) create mode 100644 src/extensions/prometheus/mod.rs create mode 100644 src/extensions/prometheus/rpc_metrics.rs create mode 100644 src/extensions/server/prometheus.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 00c0a10..57d63e8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,15 +22,15 @@ jobs: - name: Install toolchain uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2023-09-29 + toolchain: nightly-2024-02-09 components: rustfmt, clippy - uses: actions/checkout@v3 with: submodules: recursive - name: Check format - run: cargo +nightly-2023-09-29 fmt --all -- --check + run: cargo +nightly-2024-02-09 fmt --all -- --check - name: Check clippy - run: cargo +nightly-2023-09-29 clippy --all-targets --all-features -- -D warnings + run: cargo +nightly-2024-02-09 clippy --all-targets --all-features -- -D warnings - name: Build run: cargo build --verbose - name: Run tests diff --git a/Cargo.lock b/Cargo.lock index aa39f5b..160cd18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,7 +182,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -193,7 +193,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -274,6 +274,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "beef" version = "0.5.2" @@ -543,7 +549,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -832,7 +838,7 @@ checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -1005,7 +1011,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -1076,7 +1082,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.61", + "syn", ] [[package]] @@ -1141,9 +1147,9 @@ dependencies = [ [[package]] name = "gloo-net" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ac9e8288ae2c632fa9f8657ac70bfe38a1530f345282d7ba66a1f70b72b7dc4" +checksum = "43aaa242d1239a8822c15c645f02166398da4f8b5c4bae795c1f5b44e9eee173" dependencies = [ "futures-channel", "futures-core", @@ -1353,10 +1359,10 @@ dependencies = [ "http", "hyper", "log", - "rustls", - "rustls-native-certs", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", ] [[package]] @@ -1641,7 +1647,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdb12a2381ea5b2e68c3469ec604a007b367778cdb14d09612c8069ebd616ad" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -1657,7 +1665,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4978087a58c3ab02efc5b07c5e5e2803024536106fd5506f558db172c889b3aa" dependencies = [ "futures-channel", "futures-util", @@ -1665,11 +1675,12 @@ dependencies = [ "http", "jsonrpsee-core", "pin-project", - "rustls-native-certs", + "rustls-native-certs 0.7.0", + "rustls-pki-types", "soketto", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.25.0", "tokio-util 0.7.11", "tracing", "url", @@ -1678,10 +1689,11 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4b257e1ec385e07b0255dde0b933f948b5c8b8c28d42afda9587c3a967b896d" dependencies = [ "anyhow", - "async-lock", "async-trait", "beef", "futures-timer", @@ -1689,20 +1701,23 @@ dependencies = [ "hyper", "jsonrpsee-types", "parking_lot 0.12.2", + "pin-project", "rand 0.8.5", "rustc-hash", "serde", "serde_json", - "soketto", "thiserror", "tokio", + "tokio-stream", "tracing", "wasm-bindgen-futures", ] [[package]] name = "jsonrpsee-http-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ccf93fc4a0bfe05d851d37d7c32b7f370fe94336b52a2f0efc5f1981895c2e5" dependencies = [ "async-trait", "hyper", @@ -1720,18 +1735,22 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d0bb047e79a143b32ea03974a6bf59b62c2a4c5f5d42a381c907a8bbb3f75c0" dependencies = [ "heck 0.4.1", "proc-macro-crate", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] name = "jsonrpsee-server" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12d8b6a9674422a8572e0b0abb12feeb3f2aeda86528c80d0350c2bd0923ab41" dependencies = [ "futures-util", "http", @@ -1753,19 +1772,22 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d" dependencies = [ "anyhow", "beef", "serde", "serde_json", "thiserror", - "tracing", ] [[package]] name = "jsonrpsee-wasm-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f448d8eacd945cc17b6c0b42c361531ca36a962ee186342a97cdb8fca679cd77" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -1774,7 +1796,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58b9db2dfd5bb1194b0ce921504df9ceae210a345bc2f6c5a61432089bbab070" dependencies = [ "http", "jsonrpsee-client-transport", @@ -2366,7 +2390,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -2445,11 +2469,10 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-crate" -version = "2.0.2" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b00f26d3400549137f92511a46ac1cd8ce37cb5598a96d382381458b992a5d24" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_datetime", "toml_edit", ] @@ -2462,6 +2485,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.2", + "thiserror", +] + [[package]] name = "prost" version = "0.12.4" @@ -2482,7 +2519,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -2854,10 +2891,24 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.3", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -2865,7 +2916,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -2879,6 +2943,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2889,6 +2969,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.16" @@ -2987,7 +3078,7 @@ checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -3162,7 +3253,20 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.61", + "syn", +] + +[[package]] +name = "substrate-prometheus-endpoint" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d8fe06b03b8a291c09507c42f92a2c2c10dd3d62975d02c7f64a92d87bfe09b" +dependencies = [ + "hyper", + "log", + "prometheus", + "thiserror", + "tokio", ] [[package]] @@ -3204,6 +3308,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "substrate-prometheus-endpoint", "tokio", "tower", "tower-http", @@ -3235,17 +3340,6 @@ dependencies = [ "symbolic-common", ] -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.61" @@ -3319,7 +3413,7 @@ checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -3417,7 +3511,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -3426,7 +3520,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -3439,6 +3544,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util 0.7.11", ] [[package]] @@ -3471,15 +3577,15 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" [[package]] name = "toml_edit" -version = "0.20.2" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap 2.2.6", "toml_datetime", @@ -3596,7 +3702,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] [[package]] @@ -3813,7 +3919,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.61", + "syn", "wasm-bindgen-shared", ] @@ -3847,7 +3953,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3870,9 +3976,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "winapi" @@ -4111,9 +4220,15 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.61", + "syn", ] +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" + [[package]] name = "zstd" version = "0.13.1" diff --git a/Cargo.toml b/Cargo.toml index 7e9b72b..b3bb361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ regex = "1.10.4" serde = "1.0.152" serde_json = "1.0.92" serde_yaml = "0.9.17" +substrate-prometheus-endpoint = "0.17.0" tokio = { version = "1.24.2", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } tower-http = { version = "0.4", features = ["full"] } @@ -44,7 +45,7 @@ tracing-serde = "0.1.3" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } garde = { version = "0.18", features = ["full"] } -jsonrpsee = { path = "./vendor/jsonrpsee/jsonrpsee", features = ["full"] } +jsonrpsee = { version = "0.22.1",features = ["full"] } governor = { path = "./vendor/governor/governor" } [dev-dependencies] diff --git a/benches/bench/rate_limit.rs b/benches/bench/rate_limit.rs index e89678c..6ad6310 100644 --- a/benches/bench/rate_limit.rs +++ b/benches/bench/rate_limit.rs @@ -4,8 +4,8 @@ use futures_util::FutureExt; use governor::Jitter; use governor::RateLimiter; use jsonrpsee::server::middleware::rpc::RpcServiceT; -use jsonrpsee::types::{Request, ResponsePayload}; -use jsonrpsee::MethodResponse; +use jsonrpsee::types::Request; +use jsonrpsee::{MethodResponse, ResponsePayload}; use std::num::NonZeroU32; use std::time::Duration; use subway::extensions::rate_limit::{build_quota, ConnectionRateLimit, IpRateLimit}; @@ -16,7 +16,7 @@ impl RpcServiceT<'static> for MockService { type Future = BoxFuture<'static, MethodResponse>; fn call(&self, req: Request<'static>) -> Self::Future { - async move { MethodResponse::response(req.id, ResponsePayload::result("ok"), 1024) }.boxed() + async move { MethodResponse::response(req.id, ResponsePayload::success("ok"), 1024) }.boxed() } } diff --git a/configs/config.yml b/configs/config.yml index 890f8ae..c0f6511 100644 --- a/configs/config.yml +++ b/configs/config.yml @@ -41,6 +41,10 @@ extensions: # use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer). # WARNING: Use with caution, as this xff header can be forged. use_xff: true # default is false + prometheus: + port: 9616 + listen_address: "0.0.0.0" + label: "dev" middlewares: methods: diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs index f675253..5d6118b 100644 --- a/src/extensions/client/endpoint.rs +++ b/src/extensions/client/endpoint.rs @@ -19,14 +19,14 @@ enum Message { Request { method: String, params: Vec, - response: tokio::sync::oneshot::Sender>, + response: tokio::sync::oneshot::Sender>, timeout: Duration, }, Subscribe { subscribe: String, params: Vec, unsubscribe: String, - response: tokio::sync::oneshot::Sender, jsonrpsee::core::Error>>, + response: tokio::sync::oneshot::Sender, jsonrpsee::core::client::Error>>, timeout: Duration, }, Reconnect, @@ -211,7 +211,7 @@ impl Endpoint { Err(_) => { tracing::warn!("Endpoint {url} request timeout: {method} timeout: {timeout:?}"); health.update(Event::RequestTimeout); - Err(jsonrpsee::core::Error::RequestTimeout) + Err(jsonrpsee::core::client::Error::RequestTimeout) } }; if let Err(err) = &resp { @@ -251,7 +251,7 @@ impl Endpoint { Err(_) => { tracing::warn!("Endpoint {url} subscription timeout: {subscribe}"); health.update(Event::RequestTimeout); - Err(jsonrpsee::core::Error::RequestTimeout) + Err(jsonrpsee::core::client::Error::RequestTimeout) } }; if let Err(err) = &resp { @@ -330,7 +330,7 @@ impl Endpoint { Ok(resp) => resp, Err(err) => { tracing::error!("{url} Unexpected error in response channel: {err}"); - Err(jsonrpsee::core::Error::Custom("Internal server error".into())) + Err(jsonrpsee::core::client::Error::Custom("Internal server error".into())) } }; @@ -382,7 +382,7 @@ impl Endpoint { method: &str, params: Vec, timeout: Duration, - ) -> Result { + ) -> Result { let (response_tx, response_rx) = tokio::sync::oneshot::channel(); let res = self .message_tx @@ -402,7 +402,7 @@ impl Endpoint { Ok(resp) => resp, Err(err) => { tracing::error!("Unexpected error in response channel: {err}"); - Err(jsonrpsee::core::Error::Custom("Internal server error".into())) + Err(jsonrpsee::core::client::Error::Custom("Internal server error".into())) } } } @@ -413,7 +413,7 @@ impl Endpoint { params: Vec, unsubscribe_method: &str, timeout: Duration, - ) -> Result, jsonrpsee::core::Error> { + ) -> Result, jsonrpsee::core::client::Error> { let (response_tx, response_rx) = tokio::sync::oneshot::channel(); let res = self .message_tx @@ -434,7 +434,7 @@ impl Endpoint { Ok(resp) => resp, Err(err) => { tracing::error!("Unexpected error in response channel: {err}"); - Err(jsonrpsee::core::Error::Custom("Internal server error".into())) + Err(jsonrpsee::core::client::Error::Custom("Internal server error".into())) } } } diff --git a/src/extensions/client/health.rs b/src/extensions/client/health.rs index d7224f0..c69adc1 100644 --- a/src/extensions/client/health.rs +++ b/src/extensions/client/health.rs @@ -69,12 +69,12 @@ impl Health { } } - pub fn on_error(&self, err: &jsonrpsee::core::Error) { + pub fn on_error(&self, err: &jsonrpsee::core::client::Error) { match err { - jsonrpsee::core::Error::Call(_) => { + jsonrpsee::core::client::Error::Call(_) => { // NOT SERVER ERROR } - jsonrpsee::core::Error::RequestTimeout => { + jsonrpsee::core::client::Error::RequestTimeout => { tracing::warn!("{:?} request timeout", self.url); self.update(Event::RequestTimeout); } diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index af0a93b..865be01 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -6,9 +6,12 @@ use std::{ use anyhow::anyhow; use async_trait::async_trait; -use futures::FutureExt as Boxed; +use futures::FutureExt as _; use garde::Validate; -use jsonrpsee::core::{client::Subscription, Error, JsonValue}; +use jsonrpsee::core::{ + client::{Error, Subscription}, + JsonValue, +}; use jsonrpsee::ws_client::WsClientBuilder; use opentelemetry::trace::FutureExt; use rand::{seq::SliceRandom, thread_rng}; diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index babb055..6bd671d 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -13,6 +13,7 @@ pub mod cache; pub mod client; pub mod event_bus; pub mod merge_subscription; +pub mod prometheus; pub mod rate_limit; pub mod server; pub mod telemetry; @@ -142,5 +143,6 @@ define_all_extensions! { server: server::SubwayServerBuilder, event_bus: event_bus::EventBus, rate_limit: rate_limit::RateLimitBuilder, + prometheus: prometheus::Prometheus, validator: validator::Validator, } diff --git a/src/extensions/prometheus/mod.rs b/src/extensions/prometheus/mod.rs new file mode 100644 index 0000000..2bfac00 --- /dev/null +++ b/src/extensions/prometheus/mod.rs @@ -0,0 +1,93 @@ +mod rpc_metrics; + +use super::{Extension, ExtensionRegistry}; +use async_trait::async_trait; +use serde::Deserialize; +use std::iter; +use std::net::SocketAddr; +use substrate_prometheus_endpoint::init_prometheus; +use substrate_prometheus_endpoint::Registry; +use tokio::task::JoinHandle; + +use crate::utils::TypeRegistryRef; +pub use rpc_metrics::RpcMetrics; + +pub async fn get_rpc_metrics(registry: &TypeRegistryRef) -> RpcMetrics { + let prometheus = registry.read().await.get::(); + + match prometheus { + None => RpcMetrics::noop(), + Some(prom) => prom.rpc_metrics(), + } +} + +pub struct Prometheus { + registry: Registry, + rpc_metrics: RpcMetrics, + exporter_task: JoinHandle<()>, +} + +impl Drop for Prometheus { + fn drop(&mut self) { + self.exporter_task.abort(); + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct PrometheusConfig { + pub port: u16, + pub listen_address: String, + pub prefix: Option, + pub chain_label: Option, +} + +#[async_trait] +impl Extension for Prometheus { + type Config = PrometheusConfig; + + async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result { + Ok(Self::new(config.clone())) + } +} + +impl Prometheus { + pub fn new(config: PrometheusConfig) -> Self { + let labels = config + .chain_label + .clone() + .map(|l| iter::once(("chain".to_string(), l.clone())).collect()); + + // make sure the prefix is not an Option of Some empty string + let prefix = match config.prefix { + Some(p) if p.is_empty() => None, + p => p, + }; + let registry = Registry::new_custom(prefix, labels) + .expect("It can't fail, we make sure the `prefix` is either `None` or `Some` of non-empty string"); + let rpc_metrics = RpcMetrics::new(®istry); + + let exporter_task = start_prometheus_exporter(registry.clone(), config.port, config.listen_address); + Self { + registry, + exporter_task, + rpc_metrics, + } + } + + pub fn registry(&self) -> &Registry { + &self.registry + } + + pub fn rpc_metrics(&self) -> RpcMetrics { + self.rpc_metrics.clone() + } +} + +fn start_prometheus_exporter(registry: Registry, port: u16, listen_address: String) -> JoinHandle<()> { + let address = listen_address.parse().expect("Invalid prometheus listen address"); + let addr = SocketAddr::new(address, port); + + tokio::spawn(async move { + init_prometheus(addr, registry).await.expect("Init prometeus failed"); + }) +} diff --git a/src/extensions/prometheus/rpc_metrics.rs b/src/extensions/prometheus/rpc_metrics.rs new file mode 100644 index 0000000..975e113 --- /dev/null +++ b/src/extensions/prometheus/rpc_metrics.rs @@ -0,0 +1,115 @@ +use substrate_prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; + +#[derive(Clone)] +pub enum RpcMetrics { + Prometheus(InnerMetrics), + Noop, +} + +impl RpcMetrics { + pub fn new(registry: &Registry) -> Self { + Self::Prometheus(InnerMetrics::new(registry)) + } + + pub fn noop() -> Self { + Self::Noop + } + + pub fn ws_open(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_open(); + } + } + + pub fn ws_closed(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_closed(); + } + } + + pub fn cache_query(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_query(method); + } + } + pub fn cache_miss(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_miss(method); + } + } + + pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec, CounterVec)> { + if let Self::Prometheus(inner) = self { + return Some(( + inner.call_times.clone(), + inner.calls_started.clone(), + inner.calls_finished.clone(), + )); + } + + None + } +} + +#[derive(Clone)] +pub struct InnerMetrics { + open_session_count: Counter, + closed_session_count: Counter, + cache_query_counter: CounterVec, + cache_miss_counter: CounterVec, + call_times: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, +} + +impl InnerMetrics { + fn new(registry: &Registry) -> Self { + let open_counter = Counter::new("open_ws_counter", "No help").unwrap(); + let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); + let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap(); + let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap(); + let call_times = + HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap(); + let calls_started_counter = + CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap(); + let calls_finished_counter = CounterVec::new( + Opts::new("rpc_calls_finished", "No help"), + &["protocol", "method", "is_error"], + ) + .unwrap(); + + let open_session_count = register(open_counter, registry).unwrap(); + let closed_session_count = register(closed_counter, registry).unwrap(); + let cache_query_counter = register(cache_query_counter, registry).unwrap(); + let cache_miss_counter = register(cache_miss_counter, registry).unwrap(); + + let call_times = register(call_times, registry).unwrap(); + let calls_started = register(calls_started_counter, registry).unwrap(); + let calls_finished = register(calls_finished_counter, registry).unwrap(); + + Self { + cache_miss_counter, + cache_query_counter, + open_session_count, + closed_session_count, + calls_started, + calls_finished, + call_times, + } + } + fn ws_open(&self) { + self.open_session_count.inc(); + } + + fn ws_closed(&self) { + self.closed_session_count.inc(); + } + + fn cache_query(&self, method: &str) { + self.cache_query_counter.with_label_values(&[method]).inc(); + } + + fn cache_miss(&self, method: &str) { + self.cache_miss_counter.with_label_values(&[method]).inc(); + } +} diff --git a/src/extensions/rate_limit/connection.rs b/src/extensions/rate_limit/connection.rs index 0bd74d4..ccc2f3e 100644 --- a/src/extensions/rate_limit/connection.rs +++ b/src/extensions/rate_limit/connection.rs @@ -88,7 +88,8 @@ where #[cfg(test)] mod tests { use super::*; - use jsonrpsee::types::{Id, ResponsePayload}; + use jsonrpsee::types::Id; + use jsonrpsee::ResponsePayload; #[derive(Clone)] struct MockService; @@ -96,7 +97,7 @@ mod tests { type Future = BoxFuture<'static, MethodResponse>; fn call(&self, req: Request<'static>) -> Self::Future { - async move { MethodResponse::response(req.id, ResponsePayload::result("ok"), 1024) }.boxed() + async move { MethodResponse::response(req.id, ResponsePayload::success("ok"), 1024) }.boxed() } } diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index f052f0a..862cee3 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -4,21 +4,30 @@ use hyper::server::conn::AddrStream; use hyper::service::Service; use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::server::{ - middleware::rpc::RpcServiceBuilder, stop_channel, RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle, + middleware::rpc::RpcServiceBuilder, stop_channel, ws, RandomStringIdProvider, RpcModule, ServerBuilder, + ServerHandle, }; use jsonrpsee::Methods; + use serde::ser::StdError; use serde::Deserialize; + use std::str::FromStr; use std::sync::Arc; use std::{future::Future, net::SocketAddr}; +use tower::layer::layer_fn; use tower::ServiceBuilder; use tower_http::cors::{AllowOrigin, CorsLayer}; use super::{Extension, ExtensionRegistry}; use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF}; +pub use prometheus::Protocol; +mod prometheus; mod proxy_get_request; + +use crate::extensions::prometheus::RpcMetrics; +use crate::extensions::server::prometheus::PrometheusService; use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod}; pub struct SubwayServerBuilder { @@ -98,6 +107,7 @@ impl SubwayServerBuilder { &self, rate_limit_builder: Option>, rpc_method_weights: MethodWeights, + rpc_metrics: RpcMetrics, rpc_module_builder: impl FnOnce() -> Fut, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { let config = self.config.clone(); @@ -130,14 +140,20 @@ impl SubwayServerBuilder { let stop_handle = stop_handle.clone(); let rate_limit_builder = rate_limit_builder.clone(); let rpc_method_weights = rpc_method_weights.clone(); + let rpc_metrics = rpc_metrics.clone(); async move { // service_fn handle each request Ok::<_, Box>(service_fn(move |req| { + let is_websocket = ws::is_upgrade_request(&req); + let protocol = if is_websocket { Protocol::Ws } else { Protocol::Http }; + let mut socket_ip = socket_ip.clone(); let methods: Methods = rpc_module.clone().into(); let stop_handle = stop_handle.clone(); let http_middleware = http_middleware.clone(); + let rpc_metrics = rpc_metrics.clone(); + let call_metrics = rpc_metrics.call_metrics(); if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) { socket_ip = req.xxf_ip().unwrap_or(socket_ip); @@ -153,6 +169,11 @@ impl SubwayServerBuilder { rate_limit_builder .as_ref() .and_then(|r| r.connection_limit(rpc_method_weights.clone())), + ) + .option_layer( + call_metrics + .as_ref() + .map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))), ); let service_builder = ServerBuilder::default() @@ -163,6 +184,15 @@ impl SubwayServerBuilder { .to_service_builder(); let mut service = service_builder.build(methods, stop_handle); + + if is_websocket { + let on_ws_close = service.on_session_closed(); + rpc_metrics.ws_open(); + tokio::spawn(async move { + on_ws_close.await; + rpc_metrics.ws_closed(); + }); + } service.call(req) })) } diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs new file mode 100644 index 0000000..31967fc --- /dev/null +++ b/src/extensions/server/prometheus.rs @@ -0,0 +1,81 @@ +use futures::{future::BoxFuture, FutureExt}; +use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::types::Request; +use jsonrpsee::MethodResponse; +use substrate_prometheus_endpoint::{CounterVec, HistogramVec, U64}; + +use std::fmt::Display; + +#[derive(Clone, Copy)] +pub enum Protocol { + Ws, + Http, +} + +impl Display for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + Self::Ws => "ws", + Self::Http => "http", + }; + write!(f, "{}", str) + } +} + +#[derive(Clone)] +pub struct PrometheusService { + inner: S, + protocol: Protocol, + call_times: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, +} + +impl PrometheusService { + pub fn new( + inner: S, + protocol: Protocol, + call_times: &HistogramVec, + calls_started: &CounterVec, + calls_finished: &CounterVec, + ) -> Self { + Self { + inner, + protocol, + calls_started: calls_started.clone(), + calls_finished: calls_finished.clone(), + call_times: call_times.clone(), + } + } +} + +impl<'a, S> RpcServiceT<'a> for PrometheusService +where + S: RpcServiceT<'a> + Send + Sync + Clone + 'static, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let protocol = self.protocol.to_string(); + let method = req.method.to_string(); + + let histogram = self.call_times.with_label_values(&[&protocol, &method]); + let started = self.calls_started.with_label_values(&[&protocol, &method]); + let finished = self.calls_finished.clone(); + + let service = self.inner.clone(); + async move { + started.inc(); + + let timer = histogram.start_timer(); + let res = service.call(req).await; + timer.stop_and_record(); + finished + .with_label_values(&[&protocol, &method, &res.is_error().to_string()]) + .inc(); + + res + } + .boxed() + } +} diff --git a/src/extensions/server/proxy_get_request.rs b/src/extensions/server/proxy_get_request.rs index 638fb97..ac5a3be 100644 --- a/src/extensions/server/proxy_get_request.rs +++ b/src/extensions/server/proxy_get_request.rs @@ -31,7 +31,7 @@ use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; use hyper::{Body, Method, Request, Response, Uri}; use jsonrpsee::{ - core::Error as RpcError, + core::client::Error as RpcError, types::{Id, RequestSer}, }; use std::collections::HashMap; @@ -198,7 +198,7 @@ mod response { } /// Create a response for json internal error. pub(crate) fn internal_error() -> hyper::Response { - let err = ResponsePayload::error(ErrorObjectOwned::from(ErrorCode::InternalError)); + let err = ResponsePayload::<()>::error(ErrorObjectOwned::from(ErrorCode::InternalError)); let rp = Response::new(err, Id::Null); let error = serde_json::to_string(&rp).expect("built from known-good data; qed"); diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index 2f28e26..00aecd7 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -22,9 +22,7 @@ impl MiddlewareBuilder for BlockTagMiddlewar method: &RpcMethod, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(index) = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject) else { - return None; - }; + let index = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject)?; let eth_api = extensions .read() diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 4a3eba1..85c0e63 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -5,6 +5,7 @@ use blake2::Blake2b512; use futures::FutureExt as _; use opentelemetry::trace::FutureExt; +use crate::extensions::prometheus::{get_rpc_metrics, RpcMetrics}; use crate::{ config::CacheParams, extensions::cache::Cache as CacheExtension, @@ -16,11 +17,12 @@ pub struct BypassCache(pub bool); pub struct CacheMiddleware { cache: Cache, + metrics: RpcMetrics, } impl CacheMiddleware { - pub fn new(cache: Cache) -> Self { - Self { cache } + pub fn new(cache: Cache, metrics: RpcMetrics) -> Self { + Self { cache, metrics } } } @@ -36,6 +38,8 @@ impl MiddlewareBuilder for CacheMiddleware { .get::() .expect("Cache extension not found"); + let metrics = get_rpc_metrics(extensions).await; + // do not cache if size is 0, otherwise use default size let size = match method.cache { Some(CacheParams { size: Some(0), .. }) => return None, @@ -57,7 +61,7 @@ impl MiddlewareBuilder for CacheMiddleware { ttl_seconds.map(std::time::Duration::from_secs), ); - Some(Box::new(Self::new(cache))) + Some(Box::new(Self::new(cache, metrics))) } } @@ -75,11 +79,21 @@ impl Middleware for CacheMiddleware { return next(request, context).await; } + let metrics = self.metrics.clone(); let key = CacheKey::::new(&request.method, &request.params); + let method = request.method.to_string(); + metrics.cache_query(&method); + let result = self .cache - .get_or_insert_with(key.clone(), || next(request, context).boxed()) + .get_or_insert_with(key.clone(), || { + async move { + metrics.cache_miss(&method); + next(request, context).await + } + .boxed() + }) .await; if let Ok(ref value) = result { @@ -110,7 +124,7 @@ mod tests { #[tokio::test] async fn handle_ok_resp() { let cache = Cache::new(NonZeroUsize::try_from(1).unwrap(), None); - let middleware = CacheMiddleware::new(cache.clone()); + let middleware = CacheMiddleware::new(cache.clone(), RpcMetrics::noop()); let res = middleware .call( @@ -187,7 +201,7 @@ mod tests { #[tokio::test] async fn should_not_cache_null() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -214,10 +228,10 @@ mod tests { #[tokio::test] async fn cache_ttl_works() { - let middleware = CacheMiddleware::new(Cache::new( - NonZeroUsize::new(1).unwrap(), - Some(Duration::from_millis(10)), - )); + let middleware = CacheMiddleware::new( + Cache::new(NonZeroUsize::new(1).unwrap(), Some(Duration::from_millis(10))), + RpcMetrics::noop(), + ); let res = middleware .call( @@ -257,7 +271,7 @@ mod tests { #[tokio::test] async fn bypass_cache() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -300,7 +314,7 @@ mod tests { #[tokio::test] async fn avoid_repeated_requests() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let res = middleware.call( diff --git a/src/middlewares/methods/inject_params.rs b/src/middlewares/methods/inject_params.rs index a23d635..bbfc0f1 100644 --- a/src/middlewares/methods/inject_params.rs +++ b/src/middlewares/methods/inject_params.rs @@ -45,9 +45,7 @@ impl MiddlewareBuilder for InjectParamsMiddl method: &RpcMethod, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(inject_type) = inject_type(&method.params) else { - return None; - }; + let inject_type = inject_type(&method.params)?; let api = extensions .read() diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index cbc20f1..02da0be 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -88,7 +88,7 @@ impl MergeSubscriptionMiddleware { unsubscribe: String, ) -> Result< Box broadcast::Receiver + Sync + Send + 'static>, - jsonrpsee::core::Error, + jsonrpsee::core::client::Error, > { if let Some(tx) = self.upstream_subs.read().await.get(&key).cloned() { tracing::trace!("Found existing upstream subscription for {}", &subscribe); @@ -176,9 +176,7 @@ impl MiddlewareBuilder method: &RpcSubscription, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(merge_strategy) = method.merge_strategy else { - return None; - }; + let merge_strategy = method.merge_strategy?; let ext = extensions.read().await; let client = ext.get::().expect("Client extension not found"); diff --git a/src/server.rs b/src/server.rs index b43bc6a..4776b26 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ use serde_json::json; use crate::{ config::Config, extensions::{ + prometheus::get_rpc_metrics, rate_limit::{MethodWeights, RateLimitBuilder}, server::SubwayServerBuilder, }, @@ -52,9 +53,11 @@ pub async fn build(config: Config) -> anyhow::Result { let request_timeout_seconds = server_builder.config.request_timeout_seconds; + let metrics = get_rpc_metrics(&extensions_registry).await; + let registry = extensions_registry.clone(); let (addr, handle) = server_builder - .build(rate_limit_builder, rpc_method_weights, move || async move { + .build(rate_limit_builder, rpc_method_weights, metrics, move || async move { let mut module = RpcModule::new(()); let tracer = telemetry::Tracer::new("server"); @@ -97,7 +100,7 @@ pub async fn build(config: Config) -> anyhow::Result { let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout)); + .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout)); match result.as_ref() { Ok(Ok(_)) => tracer.span_ok(), @@ -172,7 +175,7 @@ pub async fn build(config: Config) -> anyhow::Result { let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?; + .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout))?; match result.as_ref() { Ok(_) => { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 2fe38c8..87d8ca9 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -29,8 +29,8 @@ pub mod errors { ErrorObjectOwned::owned(INTERNAL_ERROR_CODE, INTERNAL_ERROR_MSG, Some(msg.to_string())) } - pub fn map_error(err: jsonrpsee::core::Error) -> ErrorObjectOwned { - use jsonrpsee::core::Error::*; + pub fn map_error(err: jsonrpsee::core::client::Error) -> ErrorObjectOwned { + use jsonrpsee::core::client::Error::*; match err { Call(e) => e, x => internal_error(x),