From bc93764b922c2e657864726140be351c979f2b03 Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 12:26:04 +0200 Subject: [PATCH 1/9] Update jsonrpsee --- Cargo.lock | 258 ++++++++++-------- Cargo.toml | 2 +- src/extensions/client/endpoint.rs | 12 +- src/extensions/client/health.rs | 6 +- src/extensions/client/mod.rs | 7 +- src/extensions/mod.rs | 1 - src/extensions/server/proxy_get_request.rs | 4 +- .../subscriptions/merge_subscription.rs | 6 +- src/server.rs | 4 +- src/utils/mod.rs | 4 +- 10 files changed, 172 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45faf00..0a7081c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,18 +157,7 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ - "event-listener 2.5.3", -] - -[[package]] -name = "async-lock" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" -dependencies = [ - "event-listener 4.0.3", - "event-listener-strategy", - "pin-project-lite", + "event-listener", ] [[package]] @@ -190,7 +179,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -201,7 +190,7 @@ checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -282,6 +271,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "beef" version = "0.5.2" @@ -587,7 +582,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -616,15 +611,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console-api" version = "0.6.0" @@ -876,7 +862,7 @@ checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -910,27 +896,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "event-listener" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" -dependencies = [ - "event-listener 4.0.3", - "pin-project-lite", -] - [[package]] name = "fake-simd" version = "0.1.2" @@ -1053,7 +1018,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -1124,7 +1089,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.53", + "syn", ] [[package]] @@ -1195,9 +1160,9 @@ dependencies = [ [[package]] name = "gloo-net" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ac9e8288ae2c632fa9f8657ac70bfe38a1530f345282d7ba66a1f70b72b7dc4" +checksum = "43aaa242d1239a8822c15c645f02166398da4f8b5c4bae795c1f5b44e9eee173" dependencies = [ "futures-channel", "futures-core", @@ -1407,10 +1372,10 @@ dependencies = [ "http", "hyper", "log", - "rustls", - "rustls-native-certs", + "rustls 0.21.10", + "rustls-native-certs 0.6.3", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", ] [[package]] @@ -1680,7 +1645,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdb12a2381ea5b2e68c3469ec604a007b367778cdb14d09612c8069ebd616ad" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -1696,7 +1663,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4978087a58c3ab02efc5b07c5e5e2803024536106fd5506f558db172c889b3aa" dependencies = [ "futures-channel", "futures-util", @@ -1704,11 +1673,12 @@ dependencies = [ "http", "jsonrpsee-core", "pin-project", - "rustls-native-certs", + "rustls-native-certs 0.7.0", + "rustls-pki-types", "soketto", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.25.0", "tokio-util 0.7.10", "tracing", "url", @@ -1717,10 +1687,11 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4b257e1ec385e07b0255dde0b933f948b5c8b8c28d42afda9587c3a967b896d" dependencies = [ "anyhow", - "async-lock 3.3.0", "async-trait", "beef", "futures-timer", @@ -1728,20 +1699,23 @@ dependencies = [ "hyper", "jsonrpsee-types", "parking_lot 0.12.1", + "pin-project", "rand 0.8.5", "rustc-hash", "serde", "serde_json", - "soketto", "thiserror", "tokio", + "tokio-stream", "tracing", "wasm-bindgen-futures", ] [[package]] name = "jsonrpsee-http-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ccf93fc4a0bfe05d851d37d7c32b7f370fe94336b52a2f0efc5f1981895c2e5" dependencies = [ "async-trait", "hyper", @@ -1759,18 +1733,22 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d0bb047e79a143b32ea03974a6bf59b62c2a4c5f5d42a381c907a8bbb3f75c0" dependencies = [ "heck 0.4.1", "proc-macro-crate", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] name = "jsonrpsee-server" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12d8b6a9674422a8572e0b0abb12feeb3f2aeda86528c80d0350c2bd0923ab41" dependencies = [ "futures-util", "http", @@ -1792,19 +1770,22 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d" dependencies = [ "anyhow", "beef", "serde", "serde_json", "thiserror", - "tracing", ] [[package]] name = "jsonrpsee-wasm-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f448d8eacd945cc17b6c0b42c361531ca36a962ee186342a97cdb8fca679cd77" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -1813,7 +1794,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.20.0" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58b9db2dfd5bb1194b0ce921504df9ceae210a345bc2f6c5a61432089bbab070" dependencies = [ "http", "jsonrpsee-client-transport", @@ -2017,7 +2000,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1911e88d5831f748a4097a43862d129e3c6fca831eecac9b8db6d01d93c9de2" dependencies = [ - "async-lock 2.8.0", + "async-lock", "async-trait", "crossbeam-channel", "crossbeam-epoch", @@ -2301,12 +2284,6 @@ dependencies = [ "url", ] -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.11.2" @@ -2376,7 +2353,7 @@ dependencies = [ "bincode", "either", "fnv", - "itertools 0.10.5", + "itertools 0.11.0", "lazy_static", "nom", "quick-xml 0.31.0", @@ -2405,7 +2382,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -2484,11 +2461,10 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-crate" -version = "2.0.2" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b00f26d3400549137f92511a46ac1cd8ce37cb5598a96d382381458b992a5d24" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_datetime", "toml_edit", ] @@ -2521,7 +2497,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -2904,10 +2880,24 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.3", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -2915,7 +2905,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -2929,6 +2932,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2939,6 +2958,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -3040,7 +3070,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -3230,7 +3260,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.53", + "syn", ] [[package]] @@ -3303,17 +3333,6 @@ dependencies = [ "symbolic-common", ] -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.53" @@ -3387,7 +3406,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -3485,7 +3504,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -3494,7 +3513,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.10", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -3507,6 +3537,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util 0.7.10", ] [[package]] @@ -3540,15 +3571,15 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" [[package]] name = "toml_edit" -version = "0.20.2" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap 2.2.5", "toml_datetime", @@ -3665,7 +3696,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] [[package]] @@ -3882,7 +3913,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.53", + "syn", "wasm-bindgen-shared", ] @@ -3916,7 +3947,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3939,9 +3970,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "winapi" @@ -4173,9 +4207,15 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn", ] +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" + [[package]] name = "zstd" version = "0.13.0" diff --git a/Cargo.toml b/Cargo.toml index 7e9b72b..dd28c3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ tracing-serde = "0.1.3" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } garde = { version = "0.18", features = ["full"] } -jsonrpsee = { path = "./vendor/jsonrpsee/jsonrpsee", features = ["full"] } +jsonrpsee = { version = "0.22.1",features = ["full"] } governor = { path = "./vendor/governor/governor" } [dev-dependencies] diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs index 3a212d3..ed37ec7 100644 --- a/src/extensions/client/endpoint.rs +++ b/src/extensions/client/endpoint.rs @@ -113,7 +113,7 @@ impl Endpoint { method: &str, params: Vec, timeout: Duration, - ) -> Result { + ) -> Result { let client = self .client_rx .borrow() @@ -128,8 +128,8 @@ impl Endpoint { } Err(_) => { tracing::error!("request timed out method: {method} params: {params:?}"); - self.health.on_error(&jsonrpsee::core::Error::RequestTimeout); - Err(jsonrpsee::core::Error::RequestTimeout) + self.health.on_error(&jsonrpsee::core::client::Error::RequestTimeout); + Err(jsonrpsee::core::client::Error::RequestTimeout) } } } @@ -140,7 +140,7 @@ impl Endpoint { params: Vec, unsubscribe_method: &str, timeout: Duration, - ) -> Result, jsonrpsee::core::Error> { + ) -> Result, jsonrpsee::core::client::Error> { let client = self .client_rx .borrow() @@ -160,8 +160,8 @@ impl Endpoint { } Err(_) => { tracing::error!("subscribe timed out subscribe: {subscribe_method} params: {params:?}"); - self.health.on_error(&jsonrpsee::core::Error::RequestTimeout); - Err(jsonrpsee::core::Error::RequestTimeout) + self.health.on_error(&jsonrpsee::core::client::Error::RequestTimeout); + Err(jsonrpsee::core::client::Error::RequestTimeout) } } } diff --git a/src/extensions/client/health.rs b/src/extensions/client/health.rs index 7dd5612..6018847 100644 --- a/src/extensions/client/health.rs +++ b/src/extensions/client/health.rs @@ -76,12 +76,12 @@ impl Health { } } - pub fn on_error(&self, err: &jsonrpsee::core::Error) { + pub fn on_error(&self, err: &jsonrpsee::core::client::Error) { match err { - jsonrpsee::core::Error::Call(_) => { + jsonrpsee::core::client::Error::Call(_) => { // NOT SERVER ERROR } - jsonrpsee::core::Error::RequestTimeout => { + jsonrpsee::core::client::Error::RequestTimeout => { tracing::warn!("Endpoint {:?} request timeout", self.url); self.update(Event::RequestTimeout); } diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 837c17f..e0f7f24 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -5,9 +5,12 @@ use std::{ use anyhow::anyhow; use async_trait::async_trait; -use futures::FutureExt as Boxed; +use futures::FutureExt as _; use garde::Validate; -use jsonrpsee::core::{client::Subscription, Error, JsonValue}; +use jsonrpsee::core::{ + client::{Error, Subscription}, + JsonValue, +}; use jsonrpsee::ws_client::WsClientBuilder; use opentelemetry::trace::FutureExt; use rand::{seq::SliceRandom, thread_rng}; diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index babb055..ee4436e 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -134,7 +134,6 @@ macro_rules! define_all_extensions { define_all_extensions! { telemetry: telemetry::Telemetry, cache: cache::Cache, - #[garde(dive)] client: client::Client, merge_subscription: merge_subscription::MergeSubscription, substrate_api: api::SubstrateApi, diff --git a/src/extensions/server/proxy_get_request.rs b/src/extensions/server/proxy_get_request.rs index 638fb97..ac5a3be 100644 --- a/src/extensions/server/proxy_get_request.rs +++ b/src/extensions/server/proxy_get_request.rs @@ -31,7 +31,7 @@ use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; use hyper::{Body, Method, Request, Response, Uri}; use jsonrpsee::{ - core::Error as RpcError, + core::client::Error as RpcError, types::{Id, RequestSer}, }; use std::collections::HashMap; @@ -198,7 +198,7 @@ mod response { } /// Create a response for json internal error. pub(crate) fn internal_error() -> hyper::Response { - let err = ResponsePayload::error(ErrorObjectOwned::from(ErrorCode::InternalError)); + let err = ResponsePayload::<()>::error(ErrorObjectOwned::from(ErrorCode::InternalError)); let rp = Response::new(err, Id::Null); let error = serde_json::to_string(&rp).expect("built from known-good data; qed"); diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index cbc20f1..02da0be 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -88,7 +88,7 @@ impl MergeSubscriptionMiddleware { unsubscribe: String, ) -> Result< Box broadcast::Receiver + Sync + Send + 'static>, - jsonrpsee::core::Error, + jsonrpsee::core::client::Error, > { if let Some(tx) = self.upstream_subs.read().await.get(&key).cloned() { tracing::trace!("Found existing upstream subscription for {}", &subscribe); @@ -176,9 +176,7 @@ impl MiddlewareBuilder method: &RpcSubscription, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(merge_strategy) = method.merge_strategy else { - return None; - }; + let merge_strategy = method.merge_strategy?; let ext = extensions.read().await; let client = ext.get::().expect("Client extension not found"); diff --git a/src/server.rs b/src/server.rs index b43bc6a..bfce414 100644 --- a/src/server.rs +++ b/src/server.rs @@ -97,7 +97,7 @@ pub async fn build(config: Config) -> anyhow::Result { let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout)); + .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout)); match result.as_ref() { Ok(Ok(_)) => tracer.span_ok(), @@ -172,7 +172,7 @@ pub async fn build(config: Config) -> anyhow::Result { let result = result_rx .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))?; + .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout))?; match result.as_ref() { Ok(_) => { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 2fe38c8..87d8ca9 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -29,8 +29,8 @@ pub mod errors { ErrorObjectOwned::owned(INTERNAL_ERROR_CODE, INTERNAL_ERROR_MSG, Some(msg.to_string())) } - pub fn map_error(err: jsonrpsee::core::Error) -> ErrorObjectOwned { - use jsonrpsee::core::Error::*; + pub fn map_error(err: jsonrpsee::core::client::Error) -> ErrorObjectOwned { + use jsonrpsee::core::client::Error::*; match err { Call(e) => e, x => internal_error(x), From d4e587d3486f00a9f4e5f9ffe03c8f9c87728831 Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 12:32:36 +0200 Subject: [PATCH 2/9] Add prometheus metrics --- Cargo.lock | 28 ++++++ Cargo.toml | 1 + src/extensions/mod.rs | 2 + src/extensions/prometheus/mod.rs | 90 ++++++++++++++++++ src/extensions/prometheus/rpc_metrics.rs | 115 +++++++++++++++++++++++ src/extensions/server/mod.rs | 32 ++++++- src/extensions/server/prometheus.rs | 81 ++++++++++++++++ src/middlewares/methods/cache.rs | 38 +++++--- src/server.rs | 5 +- 9 files changed, 378 insertions(+), 14 deletions(-) create mode 100644 src/extensions/prometheus/mod.rs create mode 100644 src/extensions/prometheus/rpc_metrics.rs create mode 100644 src/extensions/server/prometheus.rs diff --git a/Cargo.lock b/Cargo.lock index 0a7081c..31a5782 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2477,6 +2477,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.1", + "thiserror", +] + [[package]] name = "prost" version = "0.12.3" @@ -3263,6 +3277,19 @@ dependencies = [ "syn", ] +[[package]] +name = "substrate-prometheus-endpoint" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d8fe06b03b8a291c09507c42f92a2c2c10dd3d62975d02c7f64a92d87bfe09b" +dependencies = [ + "hyper", + "log", + "prometheus", + "thiserror", + "tokio", +] + [[package]] name = "subtle" version = "2.5.0" @@ -3302,6 +3329,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "substrate-prometheus-endpoint", "tokio", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index dd28c3b..b3bb361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ regex = "1.10.4" serde = "1.0.152" serde_json = "1.0.92" serde_yaml = "0.9.17" +substrate-prometheus-endpoint = "0.17.0" tokio = { version = "1.24.2", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } tower-http = { version = "0.4", features = ["full"] } diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index ee4436e..aaf7cf5 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -13,6 +13,7 @@ pub mod cache; pub mod client; pub mod event_bus; pub mod merge_subscription; +pub mod prometheus; pub mod rate_limit; pub mod server; pub mod telemetry; @@ -141,5 +142,6 @@ define_all_extensions! { server: server::SubwayServerBuilder, event_bus: event_bus::EventBus, rate_limit: rate_limit::RateLimitBuilder, + prometheus: prometheus::Prometheus, validator: validator::Validator, } diff --git a/src/extensions/prometheus/mod.rs b/src/extensions/prometheus/mod.rs new file mode 100644 index 0000000..c0424d1 --- /dev/null +++ b/src/extensions/prometheus/mod.rs @@ -0,0 +1,90 @@ +mod rpc_metrics; + +use super::{Extension, ExtensionRegistry}; +use async_trait::async_trait; +use serde::Deserialize; +use std::iter; +use std::net::SocketAddr; +use substrate_prometheus_endpoint::init_prometheus; +use substrate_prometheus_endpoint::Registry; +use tokio::task::JoinHandle; + +use crate::utils::TypeRegistryRef; +pub use rpc_metrics::RpcMetrics; + +pub async fn get_rpc_metrics(registry: &TypeRegistryRef) -> RpcMetrics { + let prometheus = registry.read().await.get::(); + + match prometheus { + None => RpcMetrics::noop(), + Some(prom) => prom.rpc_metrics(), + } +} + +pub struct Prometheus { + pub registry: Registry, + rpc_metrics: RpcMetrics, + pub exporter_task: JoinHandle<()>, +} + +impl Drop for Prometheus { + fn drop(&mut self) { + self.exporter_task.abort(); + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct PrometheusConfig { + pub port: u16, + pub listen_address: String, + pub prefix: Option, + pub chain_label: Option, +} + +#[async_trait] +impl Extension for Prometheus { + type Config = PrometheusConfig; + + async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result { + Ok(Self::new(config.clone())) + } +} + +impl Prometheus { + pub fn new(config: PrometheusConfig) -> Self { + let labels = config + .chain_label + .clone() + .map(|l| iter::once(("chain".to_string(), l.clone())).collect()); + let prefix = match config.prefix { + Some(p) if p.is_empty() => None, + p => p, + }; + let registry = Registry::new_custom(prefix, labels).expect("Can't happen"); + let rpc_metrics = RpcMetrics::new(®istry); + + let exporter_task = start_prometheus_exporter(registry.clone(), config.port, config.listen_address); + Self { + registry, + exporter_task, + rpc_metrics, + } + } + + pub fn registry(&self) -> &Registry { + &self.registry + } + + pub fn rpc_metrics(&self) -> RpcMetrics { + self.rpc_metrics.clone() + } +} + +fn start_prometheus_exporter(registry: Registry, port: u16, listen_address: String) -> JoinHandle<()> { + let address = listen_address.parse().unwrap(); + let addr = SocketAddr::new(address, port); + + tokio::spawn(async move { + init_prometheus(addr, registry).await.unwrap(); + }) +} diff --git a/src/extensions/prometheus/rpc_metrics.rs b/src/extensions/prometheus/rpc_metrics.rs new file mode 100644 index 0000000..975e113 --- /dev/null +++ b/src/extensions/prometheus/rpc_metrics.rs @@ -0,0 +1,115 @@ +use substrate_prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; + +#[derive(Clone)] +pub enum RpcMetrics { + Prometheus(InnerMetrics), + Noop, +} + +impl RpcMetrics { + pub fn new(registry: &Registry) -> Self { + Self::Prometheus(InnerMetrics::new(registry)) + } + + pub fn noop() -> Self { + Self::Noop + } + + pub fn ws_open(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_open(); + } + } + + pub fn ws_closed(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_closed(); + } + } + + pub fn cache_query(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_query(method); + } + } + pub fn cache_miss(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_miss(method); + } + } + + pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec, CounterVec)> { + if let Self::Prometheus(inner) = self { + return Some(( + inner.call_times.clone(), + inner.calls_started.clone(), + inner.calls_finished.clone(), + )); + } + + None + } +} + +#[derive(Clone)] +pub struct InnerMetrics { + open_session_count: Counter, + closed_session_count: Counter, + cache_query_counter: CounterVec, + cache_miss_counter: CounterVec, + call_times: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, +} + +impl InnerMetrics { + fn new(registry: &Registry) -> Self { + let open_counter = Counter::new("open_ws_counter", "No help").unwrap(); + let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); + let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap(); + let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap(); + let call_times = + HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap(); + let calls_started_counter = + CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap(); + let calls_finished_counter = CounterVec::new( + Opts::new("rpc_calls_finished", "No help"), + &["protocol", "method", "is_error"], + ) + .unwrap(); + + let open_session_count = register(open_counter, registry).unwrap(); + let closed_session_count = register(closed_counter, registry).unwrap(); + let cache_query_counter = register(cache_query_counter, registry).unwrap(); + let cache_miss_counter = register(cache_miss_counter, registry).unwrap(); + + let call_times = register(call_times, registry).unwrap(); + let calls_started = register(calls_started_counter, registry).unwrap(); + let calls_finished = register(calls_finished_counter, registry).unwrap(); + + Self { + cache_miss_counter, + cache_query_counter, + open_session_count, + closed_session_count, + calls_started, + calls_finished, + call_times, + } + } + fn ws_open(&self) { + self.open_session_count.inc(); + } + + fn ws_closed(&self) { + self.closed_session_count.inc(); + } + + fn cache_query(&self, method: &str) { + self.cache_query_counter.with_label_values(&[method]).inc(); + } + + fn cache_miss(&self, method: &str) { + self.cache_miss_counter.with_label_values(&[method]).inc(); + } +} diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index f052f0a..862cee3 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -4,21 +4,30 @@ use hyper::server::conn::AddrStream; use hyper::service::Service; use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::server::{ - middleware::rpc::RpcServiceBuilder, stop_channel, RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle, + middleware::rpc::RpcServiceBuilder, stop_channel, ws, RandomStringIdProvider, RpcModule, ServerBuilder, + ServerHandle, }; use jsonrpsee::Methods; + use serde::ser::StdError; use serde::Deserialize; + use std::str::FromStr; use std::sync::Arc; use std::{future::Future, net::SocketAddr}; +use tower::layer::layer_fn; use tower::ServiceBuilder; use tower_http::cors::{AllowOrigin, CorsLayer}; use super::{Extension, ExtensionRegistry}; use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF}; +pub use prometheus::Protocol; +mod prometheus; mod proxy_get_request; + +use crate::extensions::prometheus::RpcMetrics; +use crate::extensions::server::prometheus::PrometheusService; use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod}; pub struct SubwayServerBuilder { @@ -98,6 +107,7 @@ impl SubwayServerBuilder { &self, rate_limit_builder: Option>, rpc_method_weights: MethodWeights, + rpc_metrics: RpcMetrics, rpc_module_builder: impl FnOnce() -> Fut, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { let config = self.config.clone(); @@ -130,14 +140,20 @@ impl SubwayServerBuilder { let stop_handle = stop_handle.clone(); let rate_limit_builder = rate_limit_builder.clone(); let rpc_method_weights = rpc_method_weights.clone(); + let rpc_metrics = rpc_metrics.clone(); async move { // service_fn handle each request Ok::<_, Box>(service_fn(move |req| { + let is_websocket = ws::is_upgrade_request(&req); + let protocol = if is_websocket { Protocol::Ws } else { Protocol::Http }; + let mut socket_ip = socket_ip.clone(); let methods: Methods = rpc_module.clone().into(); let stop_handle = stop_handle.clone(); let http_middleware = http_middleware.clone(); + let rpc_metrics = rpc_metrics.clone(); + let call_metrics = rpc_metrics.call_metrics(); if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) { socket_ip = req.xxf_ip().unwrap_or(socket_ip); @@ -153,6 +169,11 @@ impl SubwayServerBuilder { rate_limit_builder .as_ref() .and_then(|r| r.connection_limit(rpc_method_weights.clone())), + ) + .option_layer( + call_metrics + .as_ref() + .map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))), ); let service_builder = ServerBuilder::default() @@ -163,6 +184,15 @@ impl SubwayServerBuilder { .to_service_builder(); let mut service = service_builder.build(methods, stop_handle); + + if is_websocket { + let on_ws_close = service.on_session_closed(); + rpc_metrics.ws_open(); + tokio::spawn(async move { + on_ws_close.await; + rpc_metrics.ws_closed(); + }); + } service.call(req) })) } diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs new file mode 100644 index 0000000..f8b5c03 --- /dev/null +++ b/src/extensions/server/prometheus.rs @@ -0,0 +1,81 @@ +use futures::{future::BoxFuture, FutureExt}; +use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::types::Request; +use jsonrpsee::MethodResponse; +use substrate_prometheus_endpoint::{CounterVec, HistogramVec, U64}; + +use std::fmt::Display; + +#[derive(Clone, Copy)] +pub enum Protocol { + Ws, + Http, +} + +impl Display for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + Self::Ws => "ws".to_string(), + Self::Http => "http".to_string(), + }; + write!(f, "{}", str) + } +} + +#[derive(Clone)] +pub struct PrometheusService { + inner: S, + protocol: Protocol, + call_times: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, +} + +impl PrometheusService { + pub fn new( + inner: S, + protocol: Protocol, + call_times: &HistogramVec, + calls_started: &CounterVec, + calls_finished: &CounterVec, + ) -> Self { + Self { + inner, + protocol, + calls_started: calls_started.clone(), + calls_finished: calls_finished.clone(), + call_times: call_times.clone(), + } + } +} + +impl<'a, S> RpcServiceT<'a> for PrometheusService +where + S: RpcServiceT<'a> + Send + Sync + Clone + 'static, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let protocol = self.protocol.to_string(); + let method = req.method.to_string(); + + let histogram = self.call_times.with_label_values(&[&protocol, &method]); + let started = self.calls_started.with_label_values(&[&protocol, &method]); + let finished = self.calls_finished.clone(); + + let service = self.inner.clone(); + async move { + started.inc(); + + let timer = histogram.start_timer(); + let res = service.call(req).await; + timer.stop_and_record(); + finished + .with_label_values(&[&protocol, &method, &res.is_error().to_string()]) + .inc(); + + res + } + .boxed() + } +} diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 4a3eba1..85c0e63 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -5,6 +5,7 @@ use blake2::Blake2b512; use futures::FutureExt as _; use opentelemetry::trace::FutureExt; +use crate::extensions::prometheus::{get_rpc_metrics, RpcMetrics}; use crate::{ config::CacheParams, extensions::cache::Cache as CacheExtension, @@ -16,11 +17,12 @@ pub struct BypassCache(pub bool); pub struct CacheMiddleware { cache: Cache, + metrics: RpcMetrics, } impl CacheMiddleware { - pub fn new(cache: Cache) -> Self { - Self { cache } + pub fn new(cache: Cache, metrics: RpcMetrics) -> Self { + Self { cache, metrics } } } @@ -36,6 +38,8 @@ impl MiddlewareBuilder for CacheMiddleware { .get::() .expect("Cache extension not found"); + let metrics = get_rpc_metrics(extensions).await; + // do not cache if size is 0, otherwise use default size let size = match method.cache { Some(CacheParams { size: Some(0), .. }) => return None, @@ -57,7 +61,7 @@ impl MiddlewareBuilder for CacheMiddleware { ttl_seconds.map(std::time::Duration::from_secs), ); - Some(Box::new(Self::new(cache))) + Some(Box::new(Self::new(cache, metrics))) } } @@ -75,11 +79,21 @@ impl Middleware for CacheMiddleware { return next(request, context).await; } + let metrics = self.metrics.clone(); let key = CacheKey::::new(&request.method, &request.params); + let method = request.method.to_string(); + metrics.cache_query(&method); + let result = self .cache - .get_or_insert_with(key.clone(), || next(request, context).boxed()) + .get_or_insert_with(key.clone(), || { + async move { + metrics.cache_miss(&method); + next(request, context).await + } + .boxed() + }) .await; if let Ok(ref value) = result { @@ -110,7 +124,7 @@ mod tests { #[tokio::test] async fn handle_ok_resp() { let cache = Cache::new(NonZeroUsize::try_from(1).unwrap(), None); - let middleware = CacheMiddleware::new(cache.clone()); + let middleware = CacheMiddleware::new(cache.clone(), RpcMetrics::noop()); let res = middleware .call( @@ -187,7 +201,7 @@ mod tests { #[tokio::test] async fn should_not_cache_null() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -214,10 +228,10 @@ mod tests { #[tokio::test] async fn cache_ttl_works() { - let middleware = CacheMiddleware::new(Cache::new( - NonZeroUsize::new(1).unwrap(), - Some(Duration::from_millis(10)), - )); + let middleware = CacheMiddleware::new( + Cache::new(NonZeroUsize::new(1).unwrap(), Some(Duration::from_millis(10))), + RpcMetrics::noop(), + ); let res = middleware .call( @@ -257,7 +271,7 @@ mod tests { #[tokio::test] async fn bypass_cache() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -300,7 +314,7 @@ mod tests { #[tokio::test] async fn avoid_repeated_requests() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let res = middleware.call( diff --git a/src/server.rs b/src/server.rs index bfce414..4776b26 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ use serde_json::json; use crate::{ config::Config, extensions::{ + prometheus::get_rpc_metrics, rate_limit::{MethodWeights, RateLimitBuilder}, server::SubwayServerBuilder, }, @@ -52,9 +53,11 @@ pub async fn build(config: Config) -> anyhow::Result { let request_timeout_seconds = server_builder.config.request_timeout_seconds; + let metrics = get_rpc_metrics(&extensions_registry).await; + let registry = extensions_registry.clone(); let (addr, handle) = server_builder - .build(rate_limit_builder, rpc_method_weights, move || async move { + .build(rate_limit_builder, rpc_method_weights, metrics, move || async move { let mut module = RpcModule::new(()); let tracer = telemetry::Tracer::new("server"); From fd9e04444515d059f002e6902b3f11cd5d63eaff Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 12:40:59 +0200 Subject: [PATCH 3/9] Add prometheus to config --- configs/config.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/configs/config.yml b/configs/config.yml index 890f8ae..c0f6511 100644 --- a/configs/config.yml +++ b/configs/config.yml @@ -41,6 +41,10 @@ extensions: # use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer). # WARNING: Use with caution, as this xff header can be forged. use_xff: true # default is false + prometheus: + port: 9616 + listen_address: "0.0.0.0" + label: "dev" middlewares: methods: From a28b06081c7c8378fe7d30b3c3cb579230d44fd7 Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 12:54:48 +0200 Subject: [PATCH 4/9] Revert one unwanted change --- src/extensions/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index aaf7cf5..6bd671d 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -135,6 +135,7 @@ macro_rules! define_all_extensions { define_all_extensions! { telemetry: telemetry::Telemetry, cache: cache::Cache, + #[garde(dive)] client: client::Client, merge_subscription: merge_subscription::MergeSubscription, substrate_api: api::SubstrateApi, From 8bcba852eae85fa5edfcf0f47eb3e5bfc994a771 Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 16:37:05 +0200 Subject: [PATCH 5/9] clippy --- src/middlewares/methods/block_tag.rs | 4 +--- src/middlewares/methods/inject_params.rs | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index ed8e657..1e0c6cf 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -22,9 +22,7 @@ impl MiddlewareBuilder for BlockTagMiddlewar method: &RpcMethod, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(index) = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject) else { - return None; - }; + let index = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject)?; let eth_api = extensions .read() diff --git a/src/middlewares/methods/inject_params.rs b/src/middlewares/methods/inject_params.rs index 77feaed..bcbca9f 100644 --- a/src/middlewares/methods/inject_params.rs +++ b/src/middlewares/methods/inject_params.rs @@ -45,9 +45,7 @@ impl MiddlewareBuilder for InjectParamsMiddl method: &RpcMethod, extensions: &TypeRegistryRef, ) -> Option>> { - let Some(inject_type) = inject_type(&method.params) else { - return None; - }; + let inject_type = inject_type(&method.params)?; let api = extensions .read() From 3281f89aa941c5df6e92e26cf2541412ae9b4b9f Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Fri, 10 May 2024 16:51:26 +0200 Subject: [PATCH 6/9] Update clippy in test & fix warnings --- .github/workflows/test.yml | 6 +++--- src/extensions/client/endpoint.rs | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 00c0a10..57d63e8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,15 +22,15 @@ jobs: - name: Install toolchain uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2023-09-29 + toolchain: nightly-2024-02-09 components: rustfmt, clippy - uses: actions/checkout@v3 with: submodules: recursive - name: Check format - run: cargo +nightly-2023-09-29 fmt --all -- --check + run: cargo +nightly-2024-02-09 fmt --all -- --check - name: Check clippy - run: cargo +nightly-2023-09-29 clippy --all-targets --all-features -- -D warnings + run: cargo +nightly-2024-02-09 clippy --all-targets --all-features -- -D warnings - name: Build run: cargo build --verbose - name: Run tests diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs index 20e8fdb..4b8fb2a 100644 --- a/src/extensions/client/endpoint.rs +++ b/src/extensions/client/endpoint.rs @@ -134,7 +134,7 @@ impl Endpoint { params: Vec, timeout: Duration, ) -> Result { - match tokio::time::timeout(timeout, async { + let request_result = tokio::time::timeout(timeout, async { self.connected().await; let client = self .client_rx @@ -149,8 +149,9 @@ impl Endpoint { } } }) - .await - { + .await; + + match request_result { Ok(res) => res, Err(_) => { tracing::error!("request timed out method: {method} params: {params:?}"); @@ -167,7 +168,7 @@ impl Endpoint { unsubscribe_method: &str, timeout: Duration, ) -> Result, jsonrpsee::core::client::Error> { - match tokio::time::timeout(timeout, async { + let subscription_result = tokio::time::timeout(timeout, async { self.connected().await; let client = self .client_rx @@ -185,8 +186,9 @@ impl Endpoint { } } }) - .await - { + .await; + + match subscription_result { Ok(res) => res, Err(_) => { tracing::error!("subscribe timed out subscribe: {subscribe_method} params: {params:?}"); From 16ad0fb5a5b610756a679eb4321339c05594411a Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Tue, 14 May 2024 13:15:06 +0200 Subject: [PATCH 7/9] fix ci --- benches/bench/rate_limit.rs | 6 +++--- src/extensions/rate_limit/connection.rs | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/benches/bench/rate_limit.rs b/benches/bench/rate_limit.rs index e89678c..6ad6310 100644 --- a/benches/bench/rate_limit.rs +++ b/benches/bench/rate_limit.rs @@ -4,8 +4,8 @@ use futures_util::FutureExt; use governor::Jitter; use governor::RateLimiter; use jsonrpsee::server::middleware::rpc::RpcServiceT; -use jsonrpsee::types::{Request, ResponsePayload}; -use jsonrpsee::MethodResponse; +use jsonrpsee::types::Request; +use jsonrpsee::{MethodResponse, ResponsePayload}; use std::num::NonZeroU32; use std::time::Duration; use subway::extensions::rate_limit::{build_quota, ConnectionRateLimit, IpRateLimit}; @@ -16,7 +16,7 @@ impl RpcServiceT<'static> for MockService { type Future = BoxFuture<'static, MethodResponse>; fn call(&self, req: Request<'static>) -> Self::Future { - async move { MethodResponse::response(req.id, ResponsePayload::result("ok"), 1024) }.boxed() + async move { MethodResponse::response(req.id, ResponsePayload::success("ok"), 1024) }.boxed() } } diff --git a/src/extensions/rate_limit/connection.rs b/src/extensions/rate_limit/connection.rs index 0bd74d4..ccc2f3e 100644 --- a/src/extensions/rate_limit/connection.rs +++ b/src/extensions/rate_limit/connection.rs @@ -88,7 +88,8 @@ where #[cfg(test)] mod tests { use super::*; - use jsonrpsee::types::{Id, ResponsePayload}; + use jsonrpsee::types::Id; + use jsonrpsee::ResponsePayload; #[derive(Clone)] struct MockService; @@ -96,7 +97,7 @@ mod tests { type Future = BoxFuture<'static, MethodResponse>; fn call(&self, req: Request<'static>) -> Self::Future { - async move { MethodResponse::response(req.id, ResponsePayload::result("ok"), 1024) }.boxed() + async move { MethodResponse::response(req.id, ResponsePayload::success("ok"), 1024) }.boxed() } } From c29e7f87403748a7a415d21719ce2f3bf635b6b6 Mon Sep 17 00:00:00 2001 From: kostekIV <27210860+kostekIV@users.noreply.github.com> Date: Tue, 14 May 2024 13:17:35 +0200 Subject: [PATCH 8/9] Apply suggestions from code review Co-authored-by: Xiliang Chen --- src/extensions/prometheus/mod.rs | 4 ++-- src/extensions/server/prometheus.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/extensions/prometheus/mod.rs b/src/extensions/prometheus/mod.rs index c0424d1..950a6aa 100644 --- a/src/extensions/prometheus/mod.rs +++ b/src/extensions/prometheus/mod.rs @@ -81,10 +81,10 @@ impl Prometheus { } fn start_prometheus_exporter(registry: Registry, port: u16, listen_address: String) -> JoinHandle<()> { - let address = listen_address.parse().unwrap(); + let address = listen_address.parse().expect("Invalid prometheus listen address"); let addr = SocketAddr::new(address, port); tokio::spawn(async move { - init_prometheus(addr, registry).await.unwrap(); + init_prometheus(addr, registry).await.expect("Init prometeus failed"); }) } diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs index f8b5c03..31967fc 100644 --- a/src/extensions/server/prometheus.rs +++ b/src/extensions/server/prometheus.rs @@ -15,8 +15,8 @@ pub enum Protocol { impl Display for Protocol { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let str = match self { - Self::Ws => "ws".to_string(), - Self::Http => "http".to_string(), + Self::Ws => "ws", + Self::Http => "http", }; write!(f, "{}", str) } From f54470b7e554597f72adcfa8267ac3aaea5fa681 Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Wed, 15 May 2024 17:16:03 +0200 Subject: [PATCH 9/9] More explicit expect --- src/extensions/prometheus/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/extensions/prometheus/mod.rs b/src/extensions/prometheus/mod.rs index 950a6aa..2bfac00 100644 --- a/src/extensions/prometheus/mod.rs +++ b/src/extensions/prometheus/mod.rs @@ -22,9 +22,9 @@ pub async fn get_rpc_metrics(registry: &TypeRegistryRef) -> RpcMetrics { } pub struct Prometheus { - pub registry: Registry, + registry: Registry, rpc_metrics: RpcMetrics, - pub exporter_task: JoinHandle<()>, + exporter_task: JoinHandle<()>, } impl Drop for Prometheus { @@ -56,11 +56,14 @@ impl Prometheus { .chain_label .clone() .map(|l| iter::once(("chain".to_string(), l.clone())).collect()); + + // make sure the prefix is not an Option of Some empty string let prefix = match config.prefix { Some(p) if p.is_empty() => None, p => p, }; - let registry = Registry::new_custom(prefix, labels).expect("Can't happen"); + let registry = Registry::new_custom(prefix, labels) + .expect("It can't fail, we make sure the `prefix` is either `None` or `Some` of non-empty string"); let rpc_metrics = RpcMetrics::new(®istry); let exporter_task = start_prometheus_exporter(registry.clone(), config.port, config.listen_address);