diff --git a/Cargo.lock b/Cargo.lock index 19f77648fb..c2ba56b1bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,12 +176,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "array-init" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" - [[package]] name = "assert-json-diff" version = "2.0.2" @@ -310,12 +304,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bimap" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" - [[package]] name = "bincode" version = "1.3.3" @@ -427,26 +415,6 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" -[[package]] -name = "bytemuck" -version = "1.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" -dependencies = [ - "bytemuck_derive", -] - -[[package]] -name = "bytemuck_derive" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" -dependencies = [ - "proc-macro2 1.0.66", - "quote 1.0.32", - "syn 2.0.28", -] - [[package]] name = "byteorder" version = "1.4.3" @@ -472,6 +440,7 @@ name = "casper-binary-port" version = "1.0.0" dependencies = [ "bincode", + "bytes", "casper-types", "once_cell", "rand", @@ -481,6 +450,7 @@ dependencies = [ "serde_json", "serde_test", "thiserror", + "tokio-util 0.6.10", ] [[package]] @@ -586,7 +556,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "strum 0.24.1", + "strum", "tempfile", "thiserror", "tracing", @@ -629,7 +599,6 @@ dependencies = [ "humantime", "hyper", "itertools 0.10.5", - "juliet", "libc", "linked-hash-map", "lmdb-rkv", @@ -669,7 +638,7 @@ dependencies = [ "static_assertions", "stats_alloc", "structopt", - "strum 0.24.1", + "strum", "sys-info", "tempfile", "thiserror", @@ -760,7 +729,7 @@ dependencies = [ "serde_bytes", "serde_json", "serde_test", - "strum 0.24.1", + "strum", "tempfile", "thiserror", "tracing", @@ -3428,24 +3397,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "juliet" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4336a0d5e38193caafe774bd2be027cf5aa3c3e45b3f1bda1791fcacc9e9951d" -dependencies = [ - "array-init", - "bimap", - "bytemuck", - "bytes", - "futures", - "once_cell", - "strum 0.25.0", - "thiserror", - "tokio", - "tracing", -] - [[package]] name = "k256" version = "0.13.1" @@ -5436,16 +5387,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros 0.24.3", -] - -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" -dependencies = [ - "strum_macros 0.25.3", + "strum_macros", ] [[package]] @@ -5461,19 +5403,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.66", - "quote 1.0.32", - "rustversion", - "syn 2.0.28", -] - [[package]] name = "subtle" version = "2.4.1" diff --git a/binary_port/Cargo.toml b/binary_port/Cargo.toml index d467bde88e..3ab9705f51 100644 --- a/binary_port/Cargo.toml +++ b/binary_port/Cargo.toml @@ -11,6 +11,7 @@ license = "Apache-2.0" exclude = ["proptest-regressions"] [dependencies] +bytes = "1.0.1" casper-types = { version = "5.0.0", path = "../types", features = ["datasize", "json-schema", "std"] } serde = { version = "1.0.183", features = ["derive"] } thiserror = "1.0.45" @@ -19,6 +20,7 @@ once_cell = { version = "1.5.2" } schemars = { version = "0.8.16", features = ["preserve_order", "impl_json_schema"] } bincode = "1.3.3" rand = "0.8.3" +tokio-util = { version = "0.6.4", features = ["codec"] } [dev-dependencies] casper-types = { path = "../types", features = ["datasize", "json-schema", "std", "testing"] } diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs new file mode 100644 index 0000000000..52d6d7ef32 --- /dev/null +++ b/binary_port/src/binary_message.rs @@ -0,0 +1,245 @@ +#[cfg(test)] +use casper_types::testing::TestRng; +#[cfg(test)] +use rand::Rng; + +use bytes::{Buf, Bytes}; +use tokio_util::codec::{self}; + +use crate::error::Error; + +type LengthEncoding = u32; +const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); + +#[derive(Clone, PartialEq, Debug)] +pub struct BinaryMessage(Bytes); + +impl BinaryMessage { + pub fn new(payload: Vec) -> Self { + Self(payload.into()) + } + + pub fn payload(&self) -> &[u8] { + &self.0 + } + + #[cfg(test)] + pub(crate) fn random(rng: &mut TestRng) -> Self { + let len = rng.gen_range(1..=1024); + let payload = std::iter::repeat_with(|| rng.gen()).take(len).collect(); + BinaryMessage(payload) + } +} + +pub struct BinaryMessageCodec { + max_message_size_bytes: u32, +} + +impl BinaryMessageCodec { + pub fn new(max_message_size_bytes: u32) -> Self { + Self { + max_message_size_bytes, + } + } + + pub fn max_message_size_bytes(&self) -> u32 { + self.max_message_size_bytes + } +} + +impl codec::Encoder for BinaryMessageCodec { + type Error = Error; + + fn encode( + &mut self, + item: BinaryMessage, + dst: &mut bytes::BytesMut, + ) -> Result<(), Self::Error> { + let length = item.0.len() as LengthEncoding; + if length > self.max_message_size_bytes { + return Err(Error::RequestTooLarge { + allowed: self.max_message_size_bytes, + got: length, + }); + } + let length_bytes = length.to_le_bytes(); + dst.extend(length_bytes.iter().chain(item.0.iter())); + Ok(()) + } +} + +impl codec::Decoder for BinaryMessageCodec { + type Item = BinaryMessage; + + type Error = Error; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + let (length, have_full_frame) = if let [b1, b2, b3, b4, remainder @ ..] = &src[..] { + let length = LengthEncoding::from_le_bytes([*b1, *b2, *b3, *b4]) as usize; + (length, remainder.len() >= length) + } else { + // Not enough bytes to read the length. + return Ok(None); + }; + + if !have_full_frame { + // Not enough bytes to read the whole message. + return Ok(None); + }; + + if length > self.max_message_size_bytes as usize { + return Err(Error::RequestTooLarge { + allowed: self.max_message_size_bytes, + got: length as u32, + }); + } + if length == 0 { + return Err(Error::EmptyRequest); + } + + src.advance(LENGTH_ENCODING_SIZE_BYTES); + Ok(Some(BinaryMessage(src.split_to(length).freeze()))) + } +} + +#[cfg(test)] +mod tests { + use casper_types::testing::TestRng; + use rand::Rng; + use tokio_util::codec::{Decoder, Encoder}; + + use crate::{ + binary_message::{LengthEncoding, LENGTH_ENCODING_SIZE_BYTES}, + error::Error, + BinaryMessage, BinaryMessageCodec, + }; + + const MAX_MESSAGE_SIZE_BYTES: u32 = 1024 * 1024; + + #[test] + fn binary_message_codec() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + codec + .encode(val.clone(), &mut bytes) + .expect("should encode"); + + let decoded = codec + .decode(&mut bytes) + .expect("should decode") + .expect("should be Some"); + + assert_eq!(val, decoded); + } + + #[test] + fn should_not_decode_when_not_enough_bytes_to_decode_length() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + + let _ = bytes.split_off(LENGTH_ENCODING_SIZE_BYTES / 2); + let in_bytes = bytes.clone(); + assert!(codec.decode(&mut bytes).expect("should decode").is_none()); + + // Ensure that the bytes are not consumed. + assert_eq!(in_bytes, bytes); + } + + #[test] + fn should_not_decode_when_not_enough_bytes_to_decode_full_frame() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + + let _ = bytes.split_off(bytes.len() - 1); + let in_bytes = bytes.clone(); + assert!(codec.decode(&mut bytes).expect("should decode").is_none()); + + // Ensure that the bytes are not consumed. + assert_eq!(in_bytes, bytes); + } + + #[test] + fn should_leave_remainder_in_buffer() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + let suffix = bytes::Bytes::from_static(b"suffix"); + bytes.extend(&suffix); + + let _ = codec.decode(&mut bytes); + + // Ensure that the bytes are not consumed. + assert_eq!(bytes, suffix); + } + + #[test] + fn encode_should_bail_on_too_large_request() { + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let too_large = MAX_MESSAGE_SIZE_BYTES as usize + 1; + let val = BinaryMessage::new(vec![0; too_large]); + let mut bytes = bytes::BytesMut::new(); + let result = codec.encode(val, &mut bytes).unwrap_err(); + + assert!(matches!(result, Error::RequestTooLarge { allowed, got } + if allowed == codec.max_message_size_bytes && got == too_large as u32)); + } + + #[test] + fn should_encode_request_of_maximum_size() { + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let just_right_size = MAX_MESSAGE_SIZE_BYTES as usize; + let val = BinaryMessage::new(vec![0; just_right_size]); + let mut bytes = bytes::BytesMut::new(); + + let result = codec.encode(val, &mut bytes); + assert!(result.is_ok()); + } + + #[test] + fn decode_should_bail_on_too_large_request() { + let rng = &mut TestRng::new(); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + let too_large = (codec.max_message_size_bytes + 1) as LengthEncoding; + bytes.extend(too_large.to_le_bytes()); + bytes.extend(std::iter::repeat_with(|| rng.gen::()).take(too_large as usize)); + + let result = codec.decode(&mut bytes).unwrap_err(); + assert!(matches!(result, Error::RequestTooLarge { allowed, got } + if allowed == codec.max_message_size_bytes && got == too_large)); + } + + #[test] + fn should_decode_request_of_maximum_size() { + let rng = &mut TestRng::new(); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + let just_right_size = (codec.max_message_size_bytes) as LengthEncoding; + bytes.extend(just_right_size.to_le_bytes()); + bytes.extend(std::iter::repeat_with(|| rng.gen::()).take(just_right_size as usize)); + + let result = codec.decode(&mut bytes); + assert!(result.is_ok()); + } + + #[test] + fn should_bail_on_empty_request() { + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + let empty = 0 as LengthEncoding; + bytes.extend(&empty.to_le_bytes()); + + let result = codec.decode(&mut bytes).unwrap_err(); + assert!(matches!(result, Error::EmptyRequest)); + } +} diff --git a/binary_port/src/error.rs b/binary_port/src/error.rs new file mode 100644 index 0000000000..44202df8b1 --- /dev/null +++ b/binary_port/src/error.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Invalid request tag ({0})")] + InvalidBinaryRequestTag(u8), + #[error("Request too large: allowed {allowed} bytes, got {got} bytes")] + RequestTooLarge { allowed: u32, got: u32 }, + #[error("Empty request")] + EmptyRequest, + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + BytesRepr(#[from] casper_types::bytesrepr::Error), +} diff --git a/binary_port/src/lib.rs b/binary_port/src/lib.rs index c9014d5035..36b7582cb5 100644 --- a/binary_port/src/lib.rs +++ b/binary_port/src/lib.rs @@ -1,11 +1,13 @@ //! A Rust library for types used by the binary port of a casper node. mod balance_response; +mod binary_message; mod binary_request; mod binary_response; mod binary_response_and_request; mod binary_response_header; mod dictionary_item_identifier; +mod error; mod error_code; mod get_request; mod global_state_query_result; @@ -20,11 +22,13 @@ mod state_request; mod type_wrappers; pub use balance_response::BalanceResponse; +pub use binary_message::{BinaryMessage, BinaryMessageCodec}; pub use binary_request::{BinaryRequest, BinaryRequestHeader, BinaryRequestTag}; pub use binary_response::BinaryResponse; pub use binary_response_and_request::BinaryResponseAndRequest; pub use binary_response_header::BinaryResponseHeader; pub use dictionary_item_identifier::DictionaryItemIdentifier; +pub use error::Error; pub use error_code::ErrorCode; pub use get_request::GetRequest; pub use global_state_query_result::GlobalStateQueryResult; diff --git a/node/BINARY_PORT_PROTOCOL.md b/node/BINARY_PORT_PROTOCOL.md index 265c3e39f9..472315574e 100644 --- a/node/BINARY_PORT_PROTOCOL.md +++ b/node/BINARY_PORT_PROTOCOL.md @@ -2,7 +2,7 @@ The specification of the protocol used to communicate between the RPC sidecar and binary port casper-node. ## Synopsis -This is a binary protocol which follows a simple request-response model built on top of [Juliet](https://github.com/casper-network/juliet). The protocol consists of one party (the client) sending requests to another party (the server) and the server sending responses back to the client. Both requests and responses are wrapped in envelopes containing a version and a payload type tag. The versioning scheme is based on [SemVer](https://semver.org/), see [versioning](#versioning) for more details. The payload type tags are used to interpret the contents of the payloads. +This is a binary protocol which follows a simple request-response model. The protocol consists of one party (the client) sending requests to another party (the server) and the server sending responses back to the client. Both requests and responses are wrapped in envelopes containing a version and a payload type tag. The versioning scheme is based on [SemVer](https://semver.org/), see [versioning](#versioning) for more details. The payload type tags are used to interpret the contents of the payloads. ### Request format | Size in bytes | Field | Description | diff --git a/node/Cargo.toml b/node/Cargo.toml index 850ca8424a..1c8455b662 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -13,7 +13,6 @@ default-run = "casper-node" exclude = ["proptest-regressions"] [dependencies] -juliet = { version ="0.3", features = ["tracing"] } ansi_term = "0.12.1" anyhow = "1" aquamarine = "0.1.12" diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index a74a045cb1..216952f98a 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -8,13 +8,12 @@ mod tests; use std::{convert::TryFrom, net::SocketAddr, sync::Arc}; -use bytes::Bytes; use casper_binary_port::{ - BalanceResponse, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, BinaryResponse, - BinaryResponseAndRequest, DictionaryItemIdentifier, DictionaryQueryResult, ErrorCode, - GetRequest, GetTrieFullResult, GlobalStateQueryResult, GlobalStateRequest, InformationRequest, - InformationRequestTag, NodeStatus, PayloadType, PurseIdentifier, ReactorStateName, RecordId, - TransactionWithExecutionInfo, + BalanceResponse, BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, + BinaryRequestTag, BinaryResponse, BinaryResponseAndRequest, DictionaryItemIdentifier, + DictionaryQueryResult, ErrorCode, GetRequest, GetTrieFullResult, GlobalStateQueryResult, + GlobalStateRequest, InformationRequest, InformationRequestTag, NodeStatus, PayloadType, + PurseIdentifier, ReactorStateName, RecordId, TransactionWithExecutionInfo, }; use casper_storage::{ data_access_layer::{ @@ -33,21 +32,15 @@ use casper_types::{ }; use datasize::DataSize; -use futures::{future::BoxFuture, FutureExt}; -use juliet::{ - io::IoCoreBuilder, - protocol::ProtocolBuilder, - rpc::{JulietRpcServer, RpcBuilder}, - ChannelConfiguration, ChannelId, -}; +use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt}; use once_cell::sync::OnceCell; use prometheus::Registry; use tokio::{ - io::{AsyncRead, AsyncWrite}, join, net::{TcpListener, TcpStream}, sync::{Notify, OwnedSemaphorePermit, Semaphore}, }; +use tokio_util::codec::Framed; use tracing::{debug, error, info, warn}; use crate::{ @@ -811,13 +804,12 @@ where } } -async fn client_loop( - mut server: JulietRpcServer, +async fn handle_client_loop( + stream: TcpStream, effect_builder: EffectBuilder, + max_message_size_bytes: u32, ) -> Result<(), Error> where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, REv: From + From + From @@ -830,20 +822,26 @@ where + From + Send, { + let mut framed = Framed::new(stream, BinaryMessageCodec::new(max_message_size_bytes)); + loop { - let Some(incoming_request) = server.next_request().await? else { + let Some(result) = framed.next().await else { debug!("remote party closed the connection"); return Ok(()); }; - - let Some(payload) = incoming_request.payload() else { + let result = result?; + let payload = result.payload(); + if payload.is_empty() { return Err(Error::NoPayload); }; let version = effect_builder.get_protocol_version().await; - let resp = handle_payload(effect_builder, payload, version).await; - let resp_and_payload = BinaryResponseAndRequest::new(resp, payload); - incoming_request.respond(Some(Bytes::from(ToBytes::to_bytes(&resp_and_payload)?))) + let response = handle_payload(effect_builder, payload, version).await; + framed + .send(BinaryMessage::new( + BinaryResponseAndRequest::new(response, payload).to_bytes()?, + )) + .await? } } @@ -885,7 +883,7 @@ where async fn handle_client( addr: SocketAddr, - mut client: TcpStream, + stream: TcpStream, effect_builder: EffectBuilder, config: Arc, _permit: OwnedSemaphorePermit, @@ -902,12 +900,9 @@ async fn handle_client( + From + Send, { - let (reader, writer) = client.split(); - // We are a server, we won't make any requests of our own, but we need to keep the client - // around, since dropping the client will trigger a server shutdown. - let (_client, server) = new_rpc_builder(&config).build(reader, writer); - - if let Err(err) = client_loop(server, effect_builder).await { + if let Err(err) = + handle_client_loop(stream, effect_builder, config.max_message_size_bytes).await + { // Low severity is used to prevent malicious clients from causing log floods. info!(%addr, err=display_error(&err), "binary port client handler error"); } @@ -987,18 +982,6 @@ impl Finalize for BinaryPort { } } -fn new_rpc_builder(config: &Config) -> RpcBuilder<1> { - let protocol_builder = ProtocolBuilder::<1>::with_default_channel_config( - ChannelConfiguration::default() - .with_request_limit(config.client_request_limit) - .with_max_request_payload_size(config.max_request_size_bytes) - .with_max_response_payload_size(config.max_response_size_bytes), - ); - let io_builder = IoCoreBuilder::new(protocol_builder) - .buffer_size(ChannelId::new(0), config.client_request_buffer_size); - RpcBuilder::new(io_builder) -} - async fn resolve_block_header( effect_builder: EffectBuilder, block_identifier: Option, diff --git a/node/src/components/binary_port/config.rs b/node/src/components/binary_port/config.rs index be5e904be6..c55bf78257 100644 --- a/node/src/components/binary_port/config.rs +++ b/node/src/components/binary_port/config.rs @@ -4,8 +4,8 @@ use serde::{Deserialize, Serialize}; /// Uses a fixed port per node, but binds on any interface. const DEFAULT_ADDRESS: &str = "0.0.0.0:0"; -/// Default maximum payload size. -const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024; +/// Default maximum message size. +const DEFAULT_MAX_MESSAGE_SIZE: u32 = 4 * 1024 * 1024; /// Default request limit. const DEFAULT_CLIENT_REQUEST_LIMIT: u16 = 3; /// Default request buffer size. @@ -30,10 +30,8 @@ pub struct Config { pub allow_request_get_trie: bool, /// Flag used to enable/disable the [`TrySpeculativeExec`] request. pub allow_request_speculative_exec: bool, - /// Maximum size of a request in bytes. - pub max_request_size_bytes: u32, - /// Maximum size of a response in bytes. - pub max_response_size_bytes: u32, + /// Maximum size of the binary port message. + pub max_message_size_bytes: u32, /// Maximum number of in-flight requests per client. pub client_request_limit: u16, /// Number of requests that can be buffered per client. @@ -54,8 +52,7 @@ impl Config { allow_request_get_trie: false, allow_request_speculative_exec: false, client_request_limit: DEFAULT_CLIENT_REQUEST_LIMIT, - max_request_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - max_response_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, + max_message_size_bytes: DEFAULT_MAX_MESSAGE_SIZE, client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE, max_connections: DEFAULT_MAX_CONNECTIONS, gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, diff --git a/node/src/components/binary_port/error.rs b/node/src/components/binary_port/error.rs index a9ad34bfd8..46050a4eed 100644 --- a/node/src/components/binary_port/error.rs +++ b/node/src/components/binary_port/error.rs @@ -1,5 +1,4 @@ use casper_types::bytesrepr; -use juliet::rpc::RpcServerError; use thiserror::Error; #[derive(Debug, Error)] @@ -9,5 +8,5 @@ pub(crate) enum Error { #[error("received request without payload")] NoPayload, #[error(transparent)] - RpcServer(#[from] RpcServerError), + BinaryPort(#[from] casper_binary_port::Error), } diff --git a/node/src/components/binary_port/tests.rs b/node/src/components/binary_port/tests.rs index 8ad50d5dd9..9271493b30 100644 --- a/node/src/components/binary_port/tests.rs +++ b/node/src/components/binary_port/tests.rs @@ -163,8 +163,7 @@ async fn run_test_case( allow_request_get_all_values, allow_request_get_trie, allow_request_speculative_exec, - max_request_size_bytes: 1024, - max_response_size_bytes: 1024, + max_message_size_bytes: 1024, client_request_limit: 2, client_request_buffer_size: 16, max_connections: 2, diff --git a/node/src/reactor/main_reactor/tests/binary_port.rs b/node/src/reactor/main_reactor/tests/binary_port.rs index 906f7d97d2..a4f616d0bb 100644 --- a/node/src/reactor/main_reactor/tests/binary_port.rs +++ b/node/src/reactor/main_reactor/tests/binary_port.rs @@ -7,11 +7,12 @@ use std::{ }; use casper_binary_port::{ - BalanceResponse, BinaryRequest, BinaryRequestHeader, BinaryResponse, BinaryResponseAndRequest, - ConsensusStatus, ConsensusValidatorChanges, DictionaryItemIdentifier, DictionaryQueryResult, - ErrorCode, GetRequest, GetTrieFullResult, GlobalStateQueryResult, GlobalStateRequest, - InformationRequest, InformationRequestTag, LastProgress, NetworkName, NodeStatus, PayloadType, - PurseIdentifier, ReactorStateName, RecordId, Uptime, + BalanceResponse, BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, + BinaryResponse, BinaryResponseAndRequest, ConsensusStatus, ConsensusValidatorChanges, + DictionaryItemIdentifier, DictionaryQueryResult, ErrorCode, GetRequest, GetTrieFullResult, + GlobalStateQueryResult, GlobalStateRequest, InformationRequest, InformationRequestTag, + LastProgress, NetworkName, NodeStatus, PayloadType, PurseIdentifier, ReactorStateName, + RecordId, Uptime, }; use casper_storage::global_state::state::CommitProvider; use casper_types::{ @@ -25,15 +26,10 @@ use casper_types::{ EntityAddr, GlobalStateIdentifier, Key, KeyTag, NextUpgrade, Peers, ProtocolVersion, SecretKey, SignedBlock, StoredValue, Transaction, TransactionV1Builder, Transfer, URef, U512, }; -use juliet::{ - io::IoCoreBuilder, - protocol::ProtocolBuilder, - rpc::{JulietRpcClient, RpcBuilder}, - ChannelConfiguration, ChannelId, -}; +use futures::{SinkExt, StreamExt}; use rand::Rng; use tokio::{net::TcpStream, time::timeout}; -use tracing::error; +use tokio_util::codec::Framed; use crate::{ reactor::{main_reactor::MainReactor, Runner}, @@ -49,6 +45,7 @@ const GUARANTEED_BLOCK_HEIGHT: u64 = 2; const TEST_DICT_NAME: &str = "test_dict"; const TEST_DICT_ITEM_KEY: &str = "test_key"; +const MESSAGE_SIZE: u32 = 1024 * 1024 * 10; struct TestData { rng: TestRng, @@ -77,7 +74,7 @@ fn network_produced_blocks( } async fn setup() -> ( - JulietRpcClient<1>, + Framed, ( impl futures::Future>, TestRng)>, TestData, @@ -142,31 +139,14 @@ async fn setup() -> ( // We let the entire network run in the background, until our request completes. let finish_cranking = fixture.run_until_stopped(rng.create_child()); - // Set-up juliet client. - let protocol_builder = ProtocolBuilder::<1>::with_default_channel_config( - ChannelConfiguration::default() - .with_request_limit(10) - .with_max_request_payload_size(1024 * 1024 * 8) - .with_max_response_payload_size(1024 * 1024 * 8), - ); - let io_builder = IoCoreBuilder::new(protocol_builder).buffer_size(ChannelId::new(0), 4096); - let rpc_builder = RpcBuilder::new(io_builder); + // Set-up client. let address = format!("localhost:{}", binary_port_addr.port()); let stream = TcpStream::connect(address.clone()) .await .expect("should create stream"); - let (reader, writer) = stream.into_split(); - let (client, mut server) = rpc_builder.build(reader, writer); - - // We are not using the server functionality, but still need to run it for IO reasons. - tokio::spawn(async move { - if let Err(err) = server.next_request().await { - error!(%err, "server read error"); - } - }); ( - client, + Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE)), ( finish_cranking, TestData { @@ -295,7 +275,7 @@ async fn binary_port_component() { testing::init_logging(); let ( - client, + mut client, ( finish_cranking, TestData { @@ -381,19 +361,18 @@ async fn binary_port_component() { .cloned() .collect::>(); - let request_guard = client - .create_request(ChannelId::new(0)) - .with_payload(original_request_bytes.clone().into()) - .queue_for_sending() - .await; + client + .send(BinaryMessage::new(original_request_bytes.clone())) + .await + .expect("should send message"); - let response = timeout(Duration::from_secs(10), request_guard.wait_for_response()) + let response = timeout(Duration::from_secs(10), client.next()) .await .unwrap_or_else(|err| panic!("{}: should complete without timeout: {}", name, err)) - .unwrap_or_else(|err| panic!("{}: should have ok response: {}", name, err)) - .unwrap_or_else(|| panic!("{}: should have bytes", name)); + .unwrap_or_else(|| panic!("{}: should have bytes", name)) + .unwrap_or_else(|err| panic!("{}: should have ok response: {}", name, err)); let (binary_response_and_request, _): (BinaryResponseAndRequest, _) = - FromBytes::from_bytes(&response).expect("should deserialize response"); + FromBytes::from_bytes(response.payload()).expect("should deserialize response"); let mirrored_request_bytes = binary_response_and_request.original_request(); assert_eq!( diff --git a/resources/local/config.toml b/resources/local/config.toml index 07c4999116..ba6cf01bb5 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -306,11 +306,8 @@ allow_request_get_trie = false # Flag that enables the `TrySpeculativeExec` request. Disabled by default. allow_request_speculative_exec = false -# Maximum size of a request in bytes. -max_request_size_bytes = 4_194_304 - -# Maximum size of a response in bytes. -max_response_size_bytes = 4_194_304 +# Maximum size of a message in bytes. +max_message_size_bytes = 4_194_304 # Maximum number of in-flight requests. # This is based on measurements captured on a local instance of the sidecar, diff --git a/resources/production/config-example.toml b/resources/production/config-example.toml index c5ba95a957..062608ee6b 100644 --- a/resources/production/config-example.toml +++ b/resources/production/config-example.toml @@ -306,11 +306,8 @@ allow_request_get_trie = false # Flag that enables the `TrySpeculativeExec` request. Disabled by default. allow_request_speculative_exec = false -# Maximum size of a request in bytes. -max_request_size_bytes = 4_194_304 - -# Maximum size of a response in bytes. -max_response_size_bytes = 4_194_304 +# Maximum size of a message in bytes. +max_message_size_bytes = 4_194_304 # Maximum number of in-flight requests per client. client_request_limit = 3 diff --git a/storage/src/system/protocol_upgrade.rs b/storage/src/system/protocol_upgrade.rs index 900ee25566..f255476510 100644 --- a/storage/src/system/protocol_upgrade.rs +++ b/storage/src/system/protocol_upgrade.rs @@ -97,7 +97,7 @@ impl From for ProtocolUpgradeError { } } -/// Adrresses for system entities. +/// Addresses for system entities. pub struct SystemEntityAddresses { mint: AddressableEntityHash, auction: AddressableEntityHash,