From e610b866e562dc4160b0da7b8b953e9786b1899c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Tue, 23 Apr 2024 18:49:54 +0200 Subject: [PATCH 01/23] Add codec --- Cargo.lock | 2 ++ binary_port/Cargo.toml | 2 ++ binary_port/src/binary_request.rs | 60 ++++++++++++++++++++++++++++++- binary_port/src/error.rs | 15 ++++++++ binary_port/src/lib.rs | 1 + 5 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 binary_port/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 19f77648fb..34d4a93610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,6 +472,7 @@ name = "casper-binary-port" version = "1.0.0" dependencies = [ "bincode", + "bytes", "casper-types", "once_cell", "rand", @@ -481,6 +482,7 @@ dependencies = [ "serde_json", "serde_test", "thiserror", + "tokio-util 0.6.10", ] [[package]] diff --git a/binary_port/Cargo.toml b/binary_port/Cargo.toml index d467bde88e..3ab9705f51 100644 --- a/binary_port/Cargo.toml +++ b/binary_port/Cargo.toml @@ -11,6 +11,7 @@ license = "Apache-2.0" exclude = ["proptest-regressions"] [dependencies] +bytes = "1.0.1" casper-types = { version = "5.0.0", path = "../types", features = ["datasize", "json-schema", "std"] } serde = { version = "1.0.183", features = ["derive"] } thiserror = "1.0.45" @@ -19,6 +20,7 @@ once_cell = { version = "1.5.2" } schemars = { version = "0.8.16", features = ["preserve_order", "impl_json_schema"] } bincode = "1.3.3" rand = "0.8.3" +tokio-util = { version = "0.6.4", features = ["codec"] } [dev-dependencies] casper-types = { path = "../types", features = ["datasize", "json-schema", "std", "testing"] } diff --git a/binary_port/src/binary_request.rs b/binary_port/src/binary_request.rs index 0441c53e6a..e02b6024c5 100644 --- a/binary_port/src/binary_request.rs +++ b/binary_port/src/binary_request.rs @@ -1,11 +1,13 @@ use core::convert::TryFrom; +use bytes::Buf; use casper_types::{ bytesrepr::{self, FromBytes, ToBytes}, ProtocolVersion, Transaction, }; +use tokio_util::codec; -use crate::get_request::GetRequest; +use crate::{error::Error, get_request::GetRequest}; #[cfg(test)] use casper_types::testing::TestRng; @@ -240,3 +242,59 @@ mod tests { assert_eq!(BinaryRequest::try_from((val.tag(), &bytes[..])), Ok(val)); } } + +// TODO[RC]: To dedicated file + +type LengthEncoding = u32; +const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); +// TODO: To config +const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB + +pub struct BinaryPortMessage(Vec); + +pub struct BinaryPortCodec {} + +impl codec::Encoder for BinaryPortCodec { + type Error = Error; + + fn encode( + &mut self, + item: BinaryPortMessage, + dst: &mut bytes::BytesMut, + ) -> Result<(), Self::Error> { + let length = item.0.len() as LengthEncoding; + let length_bytes = length.to_le_bytes(); + dst.extend(length_bytes.iter().chain(item.0.iter())); + Ok(()) + } +} +impl codec::Decoder for BinaryPortCodec { + type Item = BinaryPortMessage; + + type Error = Error; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + if src.len() < LENGTH_ENCODING_SIZE_BYTES { + // Not enough bytes to read the length. + return Ok(None); + } + let length = LengthEncoding::from_le_bytes([src[0], src[1], src[2], src[3]]) as usize; + if length > MAX_REQUEST_SIZE_BYTES { + return Err(Error::RequestTooLarge { + allowed: MAX_REQUEST_SIZE_BYTES, + got: length, + }); + } + if length == 0 { + return Err(Error::EmptyRequest); + } + if src.len() < length + LENGTH_ENCODING_SIZE_BYTES { + // Not enough bytes to read the whole message. + return Ok(None); + } + + let payload = src[LENGTH_ENCODING_SIZE_BYTES..LENGTH_ENCODING_SIZE_BYTES + length].to_vec(); + src.advance(LENGTH_ENCODING_SIZE_BYTES + length); + Ok(Some(BinaryPortMessage(payload))) + } +} diff --git a/binary_port/src/error.rs b/binary_port/src/error.rs new file mode 100644 index 0000000000..2aa90a7f38 --- /dev/null +++ b/binary_port/src/error.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Invalid request tag ({0})")] + InvalidBinaryRequestTag(u8), + #[error("Request too large: allowed {allowed} bytes, got {got} bytes")] + RequestTooLarge { allowed: usize, got: usize }, + #[error("Empty request")] + EmptyRequest, + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + BytesRepr(#[from] casper_types::bytesrepr::Error), +} \ No newline at end of file diff --git a/binary_port/src/lib.rs b/binary_port/src/lib.rs index c9014d5035..9daba78c2e 100644 --- a/binary_port/src/lib.rs +++ b/binary_port/src/lib.rs @@ -9,6 +9,7 @@ mod dictionary_item_identifier; mod error_code; mod get_request; mod global_state_query_result; +mod error; mod information_request; mod minimal_block_info; mod node_status; From 6462b6f430ad3568c78507847f540e2c64a03296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Tue, 23 Apr 2024 20:22:36 +0200 Subject: [PATCH 02/23] Thread payload via Framed tokio transport --- Cargo.lock | 81 +------------------ binary_port/src/binary_request.rs | 4 +- binary_port/src/lib.rs | 6 +- node/Cargo.toml | 1 - node/src/components/binary_port.rs | 79 ++++++++---------- node/src/components/binary_port/config.rs | 8 +- node/src/components/binary_port/error.rs | 3 - .../reactor/main_reactor/tests/binary_port.rs | 6 -- 8 files changed, 51 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34d4a93610..c2ba56b1bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,12 +176,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "array-init" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" - [[package]] name = "assert-json-diff" version = "2.0.2" @@ -310,12 +304,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bimap" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" - [[package]] name = "bincode" version = "1.3.3" @@ -427,26 +415,6 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" -[[package]] -name = "bytemuck" -version = "1.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" -dependencies = [ - "bytemuck_derive", -] - -[[package]] -name = "bytemuck_derive" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" -dependencies = [ - "proc-macro2 1.0.66", - "quote 1.0.32", - "syn 2.0.28", -] - [[package]] name = "byteorder" version = "1.4.3" @@ -588,7 +556,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "strum 0.24.1", + "strum", "tempfile", "thiserror", "tracing", @@ -631,7 +599,6 @@ dependencies = [ "humantime", "hyper", "itertools 0.10.5", - "juliet", "libc", "linked-hash-map", "lmdb-rkv", @@ -671,7 +638,7 @@ dependencies = [ "static_assertions", "stats_alloc", "structopt", - "strum 0.24.1", + "strum", "sys-info", "tempfile", "thiserror", @@ -762,7 +729,7 @@ dependencies = [ "serde_bytes", "serde_json", "serde_test", - "strum 0.24.1", + "strum", "tempfile", "thiserror", "tracing", @@ -3430,24 +3397,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "juliet" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4336a0d5e38193caafe774bd2be027cf5aa3c3e45b3f1bda1791fcacc9e9951d" -dependencies = [ - "array-init", - "bimap", - "bytemuck", - "bytes", - "futures", - "once_cell", - "strum 0.25.0", - "thiserror", - "tokio", - "tracing", -] - [[package]] name = "k256" version = "0.13.1" @@ -5438,16 +5387,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros 0.24.3", -] - -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" -dependencies = [ - "strum_macros 0.25.3", + "strum_macros", ] [[package]] @@ -5463,19 +5403,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.66", - "quote 1.0.32", - "rustversion", - "syn 2.0.28", -] - [[package]] name = "subtle" version = "2.4.1" diff --git a/binary_port/src/binary_request.rs b/binary_port/src/binary_request.rs index e02b6024c5..c4a35a9908 100644 --- a/binary_port/src/binary_request.rs +++ b/binary_port/src/binary_request.rs @@ -250,7 +250,8 @@ const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); // TODO: To config const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB -pub struct BinaryPortMessage(Vec); +// TODO[RC]: Not pub +pub struct BinaryPortMessage(pub Vec); pub struct BinaryPortCodec {} @@ -268,6 +269,7 @@ impl codec::Encoder for BinaryPortCodec { Ok(()) } } + impl codec::Decoder for BinaryPortCodec { type Item = BinaryPortMessage; diff --git a/binary_port/src/lib.rs b/binary_port/src/lib.rs index 9daba78c2e..eae0f71f5f 100644 --- a/binary_port/src/lib.rs +++ b/binary_port/src/lib.rs @@ -6,10 +6,10 @@ mod binary_response; mod binary_response_and_request; mod binary_response_header; mod dictionary_item_identifier; +mod error; mod error_code; mod get_request; mod global_state_query_result; -mod error; mod information_request; mod minimal_block_info; mod node_status; @@ -21,7 +21,9 @@ mod state_request; mod type_wrappers; pub use balance_response::BalanceResponse; -pub use binary_request::{BinaryRequest, BinaryRequestHeader, BinaryRequestTag}; +pub use binary_request::{ + BinaryPortCodec, BinaryPortMessage, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, +}; pub use binary_response::BinaryResponse; pub use binary_response_and_request::BinaryResponseAndRequest; pub use binary_response_header::BinaryResponseHeader; diff --git a/node/Cargo.toml b/node/Cargo.toml index 850ca8424a..1c8455b662 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -13,7 +13,6 @@ default-run = "casper-node" exclude = ["proptest-regressions"] [dependencies] -juliet = { version ="0.3", features = ["tracing"] } ansi_term = "0.12.1" anyhow = "1" aquamarine = "0.1.12" diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index e695441873..62e94a4f17 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -8,13 +8,18 @@ mod tests; use std::{convert::TryFrom, net::SocketAddr, sync::Arc}; +// TODO[RC]: Merge these three +use futures::TryStreamExt; +use futures::{stream::StreamExt, SinkExt}; +use tokio::{net::TcpListener, sync::Semaphore}; + use bytes::Bytes; use casper_binary_port::{ - BalanceResponse, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, BinaryResponse, - BinaryResponseAndRequest, DictionaryItemIdentifier, DictionaryQueryResult, ErrorCode, - GetRequest, GetTrieFullResult, GlobalStateQueryResult, GlobalStateRequest, InformationRequest, - InformationRequestTag, NodeStatus, PayloadType, PurseIdentifier, ReactorStateName, RecordId, - TransactionWithExecutionInfo, + BalanceResponse, BinaryPortCodec, BinaryPortMessage, BinaryRequest, BinaryRequestHeader, + BinaryRequestTag, BinaryResponse, BinaryResponseAndRequest, DictionaryItemIdentifier, + DictionaryQueryResult, ErrorCode, GetRequest, GetTrieFullResult, GlobalStateQueryResult, + GlobalStateRequest, InformationRequest, InformationRequestTag, NodeStatus, PayloadType, + PurseIdentifier, ReactorStateName, RecordId, TransactionWithExecutionInfo, }; use casper_storage::{ data_access_layer::{ @@ -34,20 +39,15 @@ use casper_types::{ use datasize::DataSize; use futures::{future::BoxFuture, FutureExt}; -use juliet::{ - io::IoCoreBuilder, - protocol::ProtocolBuilder, - rpc::{JulietRpcServer, RpcBuilder}, - ChannelConfiguration, ChannelId, -}; use once_cell::sync::OnceCell; use prometheus::Registry; use tokio::{ io::{AsyncRead, AsyncWrite}, join, - net::{TcpListener, TcpStream}, - sync::{Notify, OwnedSemaphorePermit, Semaphore}, + net::TcpStream, + sync::{Notify, OwnedSemaphorePermit}, }; +use tokio_util::codec::Framed; use tracing::{debug, error, info, warn}; use crate::{ @@ -821,13 +821,11 @@ where } } -async fn client_loop( - mut server: JulietRpcServer, +async fn client_loop( + stream: TcpStream, effect_builder: EffectBuilder, ) -> Result<(), Error> where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, REv: From + From + From @@ -840,20 +838,30 @@ where + From + Send, { + let mut framed = Framed::new(stream, BinaryPortCodec {}); + loop { - let Some(incoming_request) = server.next_request().await? else { - debug!("remote party closed the connection"); - return Ok(()); - }; + // TODO[RC]: Fix unwrap + let next_message = framed.next().await.unwrap().unwrap(); + let payload = next_message.0; + // let Some(incoming_request) = .unwrap().unwrap() else { + // debug!("remote party closed the connection"); + // return Ok(()); + // }; - let Some(payload) = incoming_request.payload() else { + if payload.is_empty() { return Err(Error::NoPayload); }; let version = effect_builder.get_protocol_version().await; - let resp = handle_payload(effect_builder, payload, version).await; - let resp_and_payload = BinaryResponseAndRequest::new(resp, payload); - incoming_request.respond(Some(Bytes::from(ToBytes::to_bytes(&resp_and_payload)?))) + let resp = handle_payload(effect_builder, &payload, version).await; + let resp_and_payload = BinaryResponseAndRequest::new(resp, &payload); + let response_payload = BinaryPortMessage(resp_and_payload.to_bytes().unwrap()); + framed + .send(response_payload) + .await + // TODO[RC]: Fix unwrap() + .unwrap(); } } @@ -895,7 +903,7 @@ where async fn handle_client( addr: SocketAddr, - mut client: TcpStream, + stream: TcpStream, effect_builder: EffectBuilder, config: Arc, _permit: OwnedSemaphorePermit, @@ -912,12 +920,7 @@ async fn handle_client( + From + Send, { - let (reader, writer) = client.split(); - // We are a server, we won't make any requests of our own, but we need to keep the client - // around, since dropping the client will trigger a server shutdown. - let (_client, server) = new_rpc_builder(&config).build(reader, writer); - - if let Err(err) = client_loop(server, effect_builder).await { + if let Err(err) = client_loop(stream, effect_builder).await { // Low severity is used to prevent malicious clients from causing log floods. info!(%addr, err=display_error(&err), "binary port client handler error"); } @@ -997,18 +1000,6 @@ impl Finalize for BinaryPort { } } -fn new_rpc_builder(config: &Config) -> RpcBuilder<1> { - let protocol_builder = ProtocolBuilder::<1>::with_default_channel_config( - ChannelConfiguration::default() - .with_request_limit(config.client_request_limit) - .with_max_request_payload_size(config.max_request_size_bytes) - .with_max_response_payload_size(config.max_response_size_bytes), - ); - let io_builder = IoCoreBuilder::new(protocol_builder) - .buffer_size(ChannelId::new(0), config.client_request_buffer_size); - RpcBuilder::new(io_builder) -} - async fn resolve_block_header( effect_builder: EffectBuilder, block_identifier: Option, diff --git a/node/src/components/binary_port/config.rs b/node/src/components/binary_port/config.rs index be5e904be6..0626baa2da 100644 --- a/node/src/components/binary_port/config.rs +++ b/node/src/components/binary_port/config.rs @@ -40,8 +40,9 @@ pub struct Config { pub client_request_buffer_size: usize, /// Maximum number of connections to the server. pub max_connections: usize, - /// Gas hold handling - pub gas_hold_handling: HoldBalanceHandling, + // Gas hold handling + // TODO[RC]: Temporarily removed + // pub gas_hold_handling: HoldBalanceHandling, } impl Config { @@ -58,7 +59,8 @@ impl Config { max_response_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE, max_connections: DEFAULT_MAX_CONNECTIONS, - gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, + // TODO[RC]: Temporarily removed + //gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, } } } diff --git a/node/src/components/binary_port/error.rs b/node/src/components/binary_port/error.rs index a9ad34bfd8..dac2c7681b 100644 --- a/node/src/components/binary_port/error.rs +++ b/node/src/components/binary_port/error.rs @@ -1,5 +1,4 @@ use casper_types::bytesrepr; -use juliet::rpc::RpcServerError; use thiserror::Error; #[derive(Debug, Error)] @@ -8,6 +7,4 @@ pub(crate) enum Error { BytesRepr(#[from] bytesrepr::Error), #[error("received request without payload")] NoPayload, - #[error(transparent)] - RpcServer(#[from] RpcServerError), } diff --git a/node/src/reactor/main_reactor/tests/binary_port.rs b/node/src/reactor/main_reactor/tests/binary_port.rs index cb75b8bb7e..a97a4ece3c 100644 --- a/node/src/reactor/main_reactor/tests/binary_port.rs +++ b/node/src/reactor/main_reactor/tests/binary_port.rs @@ -25,12 +25,6 @@ use casper_types::{ EntityAddr, GlobalStateIdentifier, Key, KeyTag, NextUpgrade, Peers, ProtocolVersion, SecretKey, SignedBlock, StoredValue, Timestamp, Transaction, TransactionV1Builder, Transfer, URef, U512, }; -use juliet::{ - io::IoCoreBuilder, - protocol::ProtocolBuilder, - rpc::{JulietRpcClient, RpcBuilder}, - ChannelConfiguration, ChannelId, -}; use rand::Rng; use tokio::{net::TcpStream, time::timeout}; use tracing::error; From 1757d72ad9ec24f1b2d432791afd5e9ed9bbb32c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 08:31:41 +0200 Subject: [PATCH 03/23] Temporary fix for 'gas_hold_handling' --- node/src/components/binary_port/config.rs | 8 +++----- resources/local/config.toml | 2 ++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/node/src/components/binary_port/config.rs b/node/src/components/binary_port/config.rs index 0626baa2da..be5e904be6 100644 --- a/node/src/components/binary_port/config.rs +++ b/node/src/components/binary_port/config.rs @@ -40,9 +40,8 @@ pub struct Config { pub client_request_buffer_size: usize, /// Maximum number of connections to the server. pub max_connections: usize, - // Gas hold handling - // TODO[RC]: Temporarily removed - // pub gas_hold_handling: HoldBalanceHandling, + /// Gas hold handling + pub gas_hold_handling: HoldBalanceHandling, } impl Config { @@ -59,8 +58,7 @@ impl Config { max_response_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE, max_connections: DEFAULT_MAX_CONNECTIONS, - // TODO[RC]: Temporarily removed - //gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, + gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, } } } diff --git a/resources/local/config.toml b/resources/local/config.toml index 2c89176c61..63b8d7987d 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -323,6 +323,8 @@ client_request_buffer_size = 20 # Maximum number of connections to the server. max_connections = 16 +gas_hold_handling = { type = 'accrued' } + # ============================================== # Configuration options for the REST HTTP server # ============================================== From aca2f566afb8b6e311ef0977f4105cfdebcd99ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 08:33:59 +0200 Subject: [PATCH 04/23] Move 'BinaryMessage' to dedicated file --- binary_port/src/binary_message.rs | 69 ++++++++++++++++++++++++++++++ binary_port/src/binary_request.rs | 62 +-------------------------- binary_port/src/lib.rs | 6 +-- node/src/components/binary_port.rs | 8 ++-- 4 files changed, 77 insertions(+), 68 deletions(-) create mode 100644 binary_port/src/binary_message.rs diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs new file mode 100644 index 0000000000..8ef08a72d1 --- /dev/null +++ b/binary_port/src/binary_message.rs @@ -0,0 +1,69 @@ +use bytes::Buf; +use tokio_util::codec; + +use crate::error::Error; + +type LengthEncoding = u32; +const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); +// TODO[RC]: To config +const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB + +pub struct BinaryMessage(Vec); + +impl BinaryMessage { + pub fn new(payload: Vec) -> Self { + BinaryMessage(payload) + } + + pub fn payload(&self) -> &[u8] { + &self.0 + } +} + +pub struct BinaryMessageCodec {} + +impl codec::Encoder for BinaryMessageCodec { + type Error = Error; + + fn encode( + &mut self, + item: BinaryMessage, + dst: &mut bytes::BytesMut, + ) -> Result<(), Self::Error> { + let length = item.0.len() as LengthEncoding; + let length_bytes = length.to_le_bytes(); + dst.extend(length_bytes.iter().chain(item.0.iter())); + Ok(()) + } +} + +impl codec::Decoder for BinaryMessageCodec { + type Item = BinaryMessage; + + type Error = Error; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { + if src.len() < LENGTH_ENCODING_SIZE_BYTES { + // Not enough bytes to read the length. + return Ok(None); + } + let length = LengthEncoding::from_le_bytes([src[0], src[1], src[2], src[3]]) as usize; + if length > MAX_REQUEST_SIZE_BYTES { + return Err(Error::RequestTooLarge { + allowed: MAX_REQUEST_SIZE_BYTES, + got: length, + }); + } + if length == 0 { + return Err(Error::EmptyRequest); + } + if src.len() < length + LENGTH_ENCODING_SIZE_BYTES { + // Not enough bytes to read the whole message. + return Ok(None); + } + + let payload = src[LENGTH_ENCODING_SIZE_BYTES..LENGTH_ENCODING_SIZE_BYTES + length].to_vec(); + src.advance(LENGTH_ENCODING_SIZE_BYTES + length); + Ok(Some(BinaryMessage(payload))) + } +} diff --git a/binary_port/src/binary_request.rs b/binary_port/src/binary_request.rs index c4a35a9908..0441c53e6a 100644 --- a/binary_port/src/binary_request.rs +++ b/binary_port/src/binary_request.rs @@ -1,13 +1,11 @@ use core::convert::TryFrom; -use bytes::Buf; use casper_types::{ bytesrepr::{self, FromBytes, ToBytes}, ProtocolVersion, Transaction, }; -use tokio_util::codec; -use crate::{error::Error, get_request::GetRequest}; +use crate::get_request::GetRequest; #[cfg(test)] use casper_types::testing::TestRng; @@ -242,61 +240,3 @@ mod tests { assert_eq!(BinaryRequest::try_from((val.tag(), &bytes[..])), Ok(val)); } } - -// TODO[RC]: To dedicated file - -type LengthEncoding = u32; -const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); -// TODO: To config -const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB - -// TODO[RC]: Not pub -pub struct BinaryPortMessage(pub Vec); - -pub struct BinaryPortCodec {} - -impl codec::Encoder for BinaryPortCodec { - type Error = Error; - - fn encode( - &mut self, - item: BinaryPortMessage, - dst: &mut bytes::BytesMut, - ) -> Result<(), Self::Error> { - let length = item.0.len() as LengthEncoding; - let length_bytes = length.to_le_bytes(); - dst.extend(length_bytes.iter().chain(item.0.iter())); - Ok(()) - } -} - -impl codec::Decoder for BinaryPortCodec { - type Item = BinaryPortMessage; - - type Error = Error; - - fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - if src.len() < LENGTH_ENCODING_SIZE_BYTES { - // Not enough bytes to read the length. - return Ok(None); - } - let length = LengthEncoding::from_le_bytes([src[0], src[1], src[2], src[3]]) as usize; - if length > MAX_REQUEST_SIZE_BYTES { - return Err(Error::RequestTooLarge { - allowed: MAX_REQUEST_SIZE_BYTES, - got: length, - }); - } - if length == 0 { - return Err(Error::EmptyRequest); - } - if src.len() < length + LENGTH_ENCODING_SIZE_BYTES { - // Not enough bytes to read the whole message. - return Ok(None); - } - - let payload = src[LENGTH_ENCODING_SIZE_BYTES..LENGTH_ENCODING_SIZE_BYTES + length].to_vec(); - src.advance(LENGTH_ENCODING_SIZE_BYTES + length); - Ok(Some(BinaryPortMessage(payload))) - } -} diff --git a/binary_port/src/lib.rs b/binary_port/src/lib.rs index eae0f71f5f..367b0ef742 100644 --- a/binary_port/src/lib.rs +++ b/binary_port/src/lib.rs @@ -1,6 +1,7 @@ //! A Rust library for types used by the binary port of a casper node. mod balance_response; +mod binary_message; mod binary_request; mod binary_response; mod binary_response_and_request; @@ -21,9 +22,8 @@ mod state_request; mod type_wrappers; pub use balance_response::BalanceResponse; -pub use binary_request::{ - BinaryPortCodec, BinaryPortMessage, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, -}; +pub use binary_message::{BinaryMessage, BinaryMessageCodec}; +pub use binary_request::{BinaryRequest, BinaryRequestHeader, BinaryRequestTag}; pub use binary_response::BinaryResponse; pub use binary_response_and_request::BinaryResponseAndRequest; pub use binary_response_header::BinaryResponseHeader; diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 62e94a4f17..867970d897 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -15,7 +15,7 @@ use tokio::{net::TcpListener, sync::Semaphore}; use bytes::Bytes; use casper_binary_port::{ - BalanceResponse, BinaryPortCodec, BinaryPortMessage, BinaryRequest, BinaryRequestHeader, + BalanceResponse, BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, BinaryResponse, BinaryResponseAndRequest, DictionaryItemIdentifier, DictionaryQueryResult, ErrorCode, GetRequest, GetTrieFullResult, GlobalStateQueryResult, GlobalStateRequest, InformationRequest, InformationRequestTag, NodeStatus, PayloadType, @@ -838,12 +838,12 @@ where + From + Send, { - let mut framed = Framed::new(stream, BinaryPortCodec {}); + let mut framed = Framed::new(stream, BinaryMessageCodec {}); loop { // TODO[RC]: Fix unwrap let next_message = framed.next().await.unwrap().unwrap(); - let payload = next_message.0; + let payload = next_message.payload(); // let Some(incoming_request) = .unwrap().unwrap() else { // debug!("remote party closed the connection"); // return Ok(()); @@ -856,7 +856,7 @@ where let version = effect_builder.get_protocol_version().await; let resp = handle_payload(effect_builder, &payload, version).await; let resp_and_payload = BinaryResponseAndRequest::new(resp, &payload); - let response_payload = BinaryPortMessage(resp_and_payload.to_bytes().unwrap()); + let response_payload = BinaryMessage::new(resp_and_payload.to_bytes().unwrap()); framed .send(response_payload) .await From a36f231ed9b156d84c9955d388c28b5aec416c44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 08:42:15 +0200 Subject: [PATCH 05/23] Add unit tests for the binary port codec --- binary_port/src/binary_message.rs | 116 +++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 8ef08a72d1..4a2dbc8487 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -1,5 +1,10 @@ +#[cfg(test)] +use casper_types::testing::TestRng; +#[cfg(test)] +use rand::Rng; + use bytes::Buf; -use tokio_util::codec; +use tokio_util::codec::{self}; use crate::error::Error; @@ -8,6 +13,7 @@ const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); // TODO[RC]: To config const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB +#[derive(Clone, PartialEq, Debug)] pub struct BinaryMessage(Vec); impl BinaryMessage { @@ -18,6 +24,13 @@ impl BinaryMessage { pub fn payload(&self) -> &[u8] { &self.0 } + + #[cfg(test)] + pub(crate) fn random(rng: &mut TestRng) -> Self { + let len = rng.gen_range(1..=1024); + let payload = std::iter::repeat_with(|| rng.gen()).take(len).collect(); + BinaryMessage(payload) + } } pub struct BinaryMessageCodec {} @@ -67,3 +80,104 @@ impl codec::Decoder for BinaryMessageCodec { Ok(Some(BinaryMessage(payload))) } } + +#[cfg(test)] +mod tests { + use casper_types::testing::TestRng; + use tokio_util::codec::{Decoder, Encoder}; + + use crate::{ + binary_message::{LengthEncoding, LENGTH_ENCODING_SIZE_BYTES, MAX_REQUEST_SIZE_BYTES}, + error::Error, + BinaryMessage, BinaryMessageCodec, + }; + + #[test] + fn binary_message_codec() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + codec + .encode(val.clone(), &mut bytes) + .expect("should encode"); + + let decoded = codec + .decode(&mut bytes) + .expect("should decode") + .expect("should be Some"); + + assert_eq!(val, decoded); + } + + #[test] + fn should_not_decode_when_not_enough_bytes_to_decode_length() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + + let _ = bytes.split_off(LENGTH_ENCODING_SIZE_BYTES / 2); + let in_bytes = bytes.clone(); + assert!(codec.decode(&mut bytes).expect("should decode").is_none()); + + // Ensure that the bytes are not consumed. + assert_eq!(in_bytes, bytes); + } + + #[test] + fn should_not_decode_when_not_enough_bytes_to_decode_full_frame() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + + let _ = bytes.split_off(bytes.len() - 1); + let in_bytes = bytes.clone(); + assert!(codec.decode(&mut bytes).expect("should decode").is_none()); + + // Ensure that the bytes are not consumed. + assert_eq!(in_bytes, bytes); + } + + #[test] + fn should_leave_remainder_in_buffer() { + let rng = &mut TestRng::new(); + let val = BinaryMessage::random(rng); + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + codec.encode(val, &mut bytes).expect("should encode"); + let suffix = bytes::Bytes::from_static(b"suffix"); + bytes.extend(&suffix); + + let _ = codec.decode(&mut bytes); + + // Ensure that the bytes are not consumed. + assert_eq!(bytes, suffix); + } + + #[test] + fn should_bail_on_too_large_request() { + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + let too_large = (MAX_REQUEST_SIZE_BYTES + 1) as LengthEncoding; + bytes.extend(&too_large.to_le_bytes()); + + let result = codec.decode(&mut bytes).unwrap_err(); + assert!(matches!(result, Error::RequestTooLarge { allowed, got } + if allowed == MAX_REQUEST_SIZE_BYTES && got == too_large as usize)); + } + + #[test] + fn should_bail_on_empty_request() { + let mut codec = BinaryMessageCodec {}; + let mut bytes = bytes::BytesMut::new(); + let empty = 0 as LengthEncoding; + bytes.extend(&empty.to_le_bytes()); + + let result = codec.decode(&mut bytes).unwrap_err(); + assert!(matches!(result, Error::EmptyRequest)); + } +} From afc37764c9b9643f26e29b7e9051014d8e72972b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 09:14:42 +0200 Subject: [PATCH 06/23] Remove juliet-related parameters from config --- binary_port/src/binary_message.rs | 44 +++++++++++++++-------- binary_port/src/error.rs | 4 +-- node/src/components/binary_port.rs | 5 +-- node/src/components/binary_port/config.rs | 13 +++---- node/src/components/binary_port/tests.rs | 3 +- resources/local/config.toml | 7 ++-- resources/production/config-example.toml | 7 ++-- 7 files changed, 44 insertions(+), 39 deletions(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 4a2dbc8487..aa7ef9e1b0 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -10,8 +10,6 @@ use crate::error::Error; type LengthEncoding = u32; const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); -// TODO[RC]: To config -const MAX_REQUEST_SIZE_BYTES: usize = 1024 * 1024; // 1MB #[derive(Clone, PartialEq, Debug)] pub struct BinaryMessage(Vec); @@ -33,7 +31,21 @@ impl BinaryMessage { } } -pub struct BinaryMessageCodec {} +pub struct BinaryMessageCodec { + max_message_size_bytes: u32, +} + +impl BinaryMessageCodec { + pub fn new(max_message_size_bytes: u32) -> Self { + Self { + max_message_size_bytes, + } + } + + pub fn max_message_size_bytes(&self) -> u32 { + self.max_message_size_bytes + } +} impl codec::Encoder for BinaryMessageCodec { type Error = Error; @@ -61,10 +73,10 @@ impl codec::Decoder for BinaryMessageCodec { return Ok(None); } let length = LengthEncoding::from_le_bytes([src[0], src[1], src[2], src[3]]) as usize; - if length > MAX_REQUEST_SIZE_BYTES { + if length > self.max_message_size_bytes as usize { return Err(Error::RequestTooLarge { - allowed: MAX_REQUEST_SIZE_BYTES, - got: length, + allowed: self.max_message_size_bytes, + got: length as u32, }); } if length == 0 { @@ -87,16 +99,18 @@ mod tests { use tokio_util::codec::{Decoder, Encoder}; use crate::{ - binary_message::{LengthEncoding, LENGTH_ENCODING_SIZE_BYTES, MAX_REQUEST_SIZE_BYTES}, + binary_message::{LengthEncoding, LENGTH_ENCODING_SIZE_BYTES}, error::Error, BinaryMessage, BinaryMessageCodec, }; + const MAX_MESSAGE_SIZE_BYTES: u32 = 1024 * 104 * 10; + #[test] fn binary_message_codec() { let rng = &mut TestRng::new(); let val = BinaryMessage::random(rng); - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); codec .encode(val.clone(), &mut bytes) @@ -114,7 +128,7 @@ mod tests { fn should_not_decode_when_not_enough_bytes_to_decode_length() { let rng = &mut TestRng::new(); let val = BinaryMessage::random(rng); - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); codec.encode(val, &mut bytes).expect("should encode"); @@ -130,7 +144,7 @@ mod tests { fn should_not_decode_when_not_enough_bytes_to_decode_full_frame() { let rng = &mut TestRng::new(); let val = BinaryMessage::random(rng); - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); codec.encode(val, &mut bytes).expect("should encode"); @@ -146,7 +160,7 @@ mod tests { fn should_leave_remainder_in_buffer() { let rng = &mut TestRng::new(); let val = BinaryMessage::random(rng); - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); codec.encode(val, &mut bytes).expect("should encode"); let suffix = bytes::Bytes::from_static(b"suffix"); @@ -160,19 +174,19 @@ mod tests { #[test] fn should_bail_on_too_large_request() { - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); - let too_large = (MAX_REQUEST_SIZE_BYTES + 1) as LengthEncoding; + let too_large = (codec.max_message_size_bytes + 1) as LengthEncoding; bytes.extend(&too_large.to_le_bytes()); let result = codec.decode(&mut bytes).unwrap_err(); assert!(matches!(result, Error::RequestTooLarge { allowed, got } - if allowed == MAX_REQUEST_SIZE_BYTES && got == too_large as usize)); + if allowed == codec.max_message_size_bytes && got == too_large)); } #[test] fn should_bail_on_empty_request() { - let mut codec = BinaryMessageCodec {}; + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); let empty = 0 as LengthEncoding; bytes.extend(&empty.to_le_bytes()); diff --git a/binary_port/src/error.rs b/binary_port/src/error.rs index 2aa90a7f38..44202df8b1 100644 --- a/binary_port/src/error.rs +++ b/binary_port/src/error.rs @@ -5,11 +5,11 @@ pub enum Error { #[error("Invalid request tag ({0})")] InvalidBinaryRequestTag(u8), #[error("Request too large: allowed {allowed} bytes, got {got} bytes")] - RequestTooLarge { allowed: usize, got: usize }, + RequestTooLarge { allowed: u32, got: u32 }, #[error("Empty request")] EmptyRequest, #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] BytesRepr(#[from] casper_types::bytesrepr::Error), -} \ No newline at end of file +} diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 867970d897..9961111e71 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -824,6 +824,7 @@ where async fn client_loop( stream: TcpStream, effect_builder: EffectBuilder, + max_message_size_bytes: u32, ) -> Result<(), Error> where REv: From @@ -838,7 +839,7 @@ where + From + Send, { - let mut framed = Framed::new(stream, BinaryMessageCodec {}); + let mut framed = Framed::new(stream, BinaryMessageCodec::new(max_message_size_bytes)); loop { // TODO[RC]: Fix unwrap @@ -920,7 +921,7 @@ async fn handle_client( + From + Send, { - if let Err(err) = client_loop(stream, effect_builder).await { + if let Err(err) = client_loop(stream, effect_builder, config.max_message_size_bytes).await { // Low severity is used to prevent malicious clients from causing log floods. info!(%addr, err=display_error(&err), "binary port client handler error"); } diff --git a/node/src/components/binary_port/config.rs b/node/src/components/binary_port/config.rs index be5e904be6..c55bf78257 100644 --- a/node/src/components/binary_port/config.rs +++ b/node/src/components/binary_port/config.rs @@ -4,8 +4,8 @@ use serde::{Deserialize, Serialize}; /// Uses a fixed port per node, but binds on any interface. const DEFAULT_ADDRESS: &str = "0.0.0.0:0"; -/// Default maximum payload size. -const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024; +/// Default maximum message size. +const DEFAULT_MAX_MESSAGE_SIZE: u32 = 4 * 1024 * 1024; /// Default request limit. const DEFAULT_CLIENT_REQUEST_LIMIT: u16 = 3; /// Default request buffer size. @@ -30,10 +30,8 @@ pub struct Config { pub allow_request_get_trie: bool, /// Flag used to enable/disable the [`TrySpeculativeExec`] request. pub allow_request_speculative_exec: bool, - /// Maximum size of a request in bytes. - pub max_request_size_bytes: u32, - /// Maximum size of a response in bytes. - pub max_response_size_bytes: u32, + /// Maximum size of the binary port message. + pub max_message_size_bytes: u32, /// Maximum number of in-flight requests per client. pub client_request_limit: u16, /// Number of requests that can be buffered per client. @@ -54,8 +52,7 @@ impl Config { allow_request_get_trie: false, allow_request_speculative_exec: false, client_request_limit: DEFAULT_CLIENT_REQUEST_LIMIT, - max_request_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - max_response_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, + max_message_size_bytes: DEFAULT_MAX_MESSAGE_SIZE, client_request_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE, max_connections: DEFAULT_MAX_CONNECTIONS, gas_hold_handling: DEFAULT_GAS_HOLD_BALANCE_HANDLING, diff --git a/node/src/components/binary_port/tests.rs b/node/src/components/binary_port/tests.rs index 8ad50d5dd9..9271493b30 100644 --- a/node/src/components/binary_port/tests.rs +++ b/node/src/components/binary_port/tests.rs @@ -163,8 +163,7 @@ async fn run_test_case( allow_request_get_all_values, allow_request_get_trie, allow_request_speculative_exec, - max_request_size_bytes: 1024, - max_response_size_bytes: 1024, + max_message_size_bytes: 1024, client_request_limit: 2, client_request_buffer_size: 16, max_connections: 2, diff --git a/resources/local/config.toml b/resources/local/config.toml index 63b8d7987d..307fa9b82c 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -306,11 +306,8 @@ allow_request_get_trie = false # Flag that enables the `TrySpeculativeExec` request. Disabled by default. allow_request_speculative_exec = false -# Maximum size of a request in bytes. -max_request_size_bytes = 4_194_304 - -# Maximum size of a response in bytes. -max_response_size_bytes = 4_194_304 +# Maximum size of a message in bytes. +max_message_size_bytes = 4_194_304 # Maximum number of in-flight requests. # This is based on measurements captured on a local instance of the sidecar, diff --git a/resources/production/config-example.toml b/resources/production/config-example.toml index 6f88125388..7e2b7ddf6c 100644 --- a/resources/production/config-example.toml +++ b/resources/production/config-example.toml @@ -306,11 +306,8 @@ allow_request_get_trie = false # Flag that enables the `TrySpeculativeExec` request. Disabled by default. allow_request_speculative_exec = false -# Maximum size of a request in bytes. -max_request_size_bytes = 4_194_304 - -# Maximum size of a response in bytes. -max_response_size_bytes = 4_194_304 +# Maximum size of a message in bytes. +max_message_size_bytes = 4_194_304 # Maximum number of in-flight requests per client. client_request_limit = 3 From d25a94cce918bb615c3c1fbe2e8393eeba99490f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 09:17:02 +0200 Subject: [PATCH 07/23] Clean-up imports --- node/src/components/binary_port.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 9961111e71..107d37779c 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -8,12 +8,6 @@ mod tests; use std::{convert::TryFrom, net::SocketAddr, sync::Arc}; -// TODO[RC]: Merge these three -use futures::TryStreamExt; -use futures::{stream::StreamExt, SinkExt}; -use tokio::{net::TcpListener, sync::Semaphore}; - -use bytes::Bytes; use casper_binary_port::{ BalanceResponse, BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, BinaryRequestTag, BinaryResponse, BinaryResponseAndRequest, DictionaryItemIdentifier, @@ -38,14 +32,13 @@ use casper_types::{ }; use datasize::DataSize; -use futures::{future::BoxFuture, FutureExt}; +use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt}; use once_cell::sync::OnceCell; use prometheus::Registry; use tokio::{ - io::{AsyncRead, AsyncWrite}, join, - net::TcpStream, - sync::{Notify, OwnedSemaphorePermit}, + net::{TcpListener, TcpStream}, + sync::{Notify, OwnedSemaphorePermit, Semaphore}, }; use tokio_util::codec::Framed; use tracing::{debug, error, info, warn}; From 9680b168defa3f717770bc9d02660bd7175c8dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 09:37:32 +0200 Subject: [PATCH 08/23] Add error handling in client loop --- binary_port/src/lib.rs | 1 + node/src/components/binary_port.rs | 26 ++++++++++-------------- node/src/components/binary_port/error.rs | 2 ++ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/binary_port/src/lib.rs b/binary_port/src/lib.rs index 367b0ef742..36b7582cb5 100644 --- a/binary_port/src/lib.rs +++ b/binary_port/src/lib.rs @@ -28,6 +28,7 @@ pub use binary_response::BinaryResponse; pub use binary_response_and_request::BinaryResponseAndRequest; pub use binary_response_header::BinaryResponseHeader; pub use dictionary_item_identifier::DictionaryItemIdentifier; +pub use error::Error; pub use error_code::ErrorCode; pub use get_request::GetRequest; pub use global_state_query_result::GlobalStateQueryResult; diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index 107d37779c..b4aaa34a39 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -835,27 +835,23 @@ where let mut framed = Framed::new(stream, BinaryMessageCodec::new(max_message_size_bytes)); loop { - // TODO[RC]: Fix unwrap - let next_message = framed.next().await.unwrap().unwrap(); - let payload = next_message.payload(); - // let Some(incoming_request) = .unwrap().unwrap() else { - // debug!("remote party closed the connection"); - // return Ok(()); - // }; - + let Some(result) = framed.next().await else { + debug!("remote party closed the connection"); + return Ok(()); + }; + let result = result?; + let payload = result.payload(); if payload.is_empty() { return Err(Error::NoPayload); }; let version = effect_builder.get_protocol_version().await; - let resp = handle_payload(effect_builder, &payload, version).await; - let resp_and_payload = BinaryResponseAndRequest::new(resp, &payload); - let response_payload = BinaryMessage::new(resp_and_payload.to_bytes().unwrap()); + let response = handle_payload(effect_builder, &payload, version).await; framed - .send(response_payload) - .await - // TODO[RC]: Fix unwrap() - .unwrap(); + .send(BinaryMessage::new( + BinaryResponseAndRequest::new(response, &payload).to_bytes()?, + )) + .await? } } diff --git a/node/src/components/binary_port/error.rs b/node/src/components/binary_port/error.rs index dac2c7681b..71723b0364 100644 --- a/node/src/components/binary_port/error.rs +++ b/node/src/components/binary_port/error.rs @@ -7,4 +7,6 @@ pub(crate) enum Error { BytesRepr(#[from] bytesrepr::Error), #[error("received request without payload")] NoPayload, + #[error(transparent)] + BinaryPortError(#[from] casper_binary_port::Error), } From 5aab49a7d39b92445ce9c96665cbacb2a9dc9494 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Mon, 22 Apr 2024 11:06:47 +0200 Subject: [PATCH 09/23] Fix typo --- storage/src/system/protocol_upgrade.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/system/protocol_upgrade.rs b/storage/src/system/protocol_upgrade.rs index c83ea7f9e1..1e550d43ce 100644 --- a/storage/src/system/protocol_upgrade.rs +++ b/storage/src/system/protocol_upgrade.rs @@ -97,7 +97,7 @@ impl From for ProtocolUpgradeError { } } -/// Adrresses for system entities. +/// Addresses for system entities. pub struct SystemEntityAddresses { mint: AddressableEntityHash, auction: AddressableEntityHash, From ec4e25913b4e4836f4a746386cbccafa2ae7b72e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 12:10:52 +0200 Subject: [PATCH 10/23] Update binary port component tests --- .../reactor/main_reactor/tests/binary_port.rs | 56 +++++++------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/node/src/reactor/main_reactor/tests/binary_port.rs b/node/src/reactor/main_reactor/tests/binary_port.rs index a97a4ece3c..fe01c58a3d 100644 --- a/node/src/reactor/main_reactor/tests/binary_port.rs +++ b/node/src/reactor/main_reactor/tests/binary_port.rs @@ -7,11 +7,12 @@ use std::{ }; use casper_binary_port::{ - BalanceResponse, BinaryRequest, BinaryRequestHeader, BinaryResponse, BinaryResponseAndRequest, - ConsensusStatus, ConsensusValidatorChanges, DictionaryItemIdentifier, DictionaryQueryResult, - ErrorCode, GetRequest, GetTrieFullResult, GlobalStateQueryResult, GlobalStateRequest, - InformationRequest, InformationRequestTag, LastProgress, NetworkName, NodeStatus, PayloadType, - PurseIdentifier, ReactorStateName, RecordId, Uptime, + BalanceResponse, BinaryMessage, BinaryMessageCodec, BinaryRequest, BinaryRequestHeader, + BinaryResponse, BinaryResponseAndRequest, ConsensusStatus, ConsensusValidatorChanges, + DictionaryItemIdentifier, DictionaryQueryResult, ErrorCode, GetRequest, GetTrieFullResult, + GlobalStateQueryResult, GlobalStateRequest, InformationRequest, InformationRequestTag, + LastProgress, NetworkName, NodeStatus, PayloadType, PurseIdentifier, ReactorStateName, + RecordId, Uptime, }; use casper_storage::global_state::state::CommitProvider; use casper_types::{ @@ -25,8 +26,10 @@ use casper_types::{ EntityAddr, GlobalStateIdentifier, Key, KeyTag, NextUpgrade, Peers, ProtocolVersion, SecretKey, SignedBlock, StoredValue, Timestamp, Transaction, TransactionV1Builder, Transfer, URef, U512, }; +use futures::{SinkExt, StreamExt}; use rand::Rng; use tokio::{net::TcpStream, time::timeout}; +use tokio_util::codec::Framed; use tracing::error; use crate::{ @@ -43,6 +46,7 @@ const GUARANTEED_BLOCK_HEIGHT: u64 = 2; const TEST_DICT_NAME: &str = "test_dict"; const TEST_DICT_ITEM_KEY: &str = "test_key"; +const MESSAGE_SIZE: u32 = 1024 * 1024 * 10; struct TestData { rng: TestRng, @@ -71,7 +75,7 @@ fn network_produced_blocks( } async fn setup() -> ( - JulietRpcClient<1>, + Framed, ( impl futures::Future>, TestRng)>, TestData, @@ -136,31 +140,14 @@ async fn setup() -> ( // We let the entire network run in the background, until our request completes. let finish_cranking = fixture.run_until_stopped(rng.create_child()); - // Set-up juliet client. - let protocol_builder = ProtocolBuilder::<1>::with_default_channel_config( - ChannelConfiguration::default() - .with_request_limit(10) - .with_max_request_payload_size(1024 * 1024 * 8) - .with_max_response_payload_size(1024 * 1024 * 8), - ); - let io_builder = IoCoreBuilder::new(protocol_builder).buffer_size(ChannelId::new(0), 4096); - let rpc_builder = RpcBuilder::new(io_builder); + // Set-up client. let address = format!("localhost:{}", binary_port_addr.port()); let stream = TcpStream::connect(address.clone()) .await .expect("should create stream"); - let (reader, writer) = stream.into_split(); - let (client, mut server) = rpc_builder.build(reader, writer); - - // We are not using the server functionality, but still need to run it for IO reasons. - tokio::spawn(async move { - if let Err(err) = server.next_request().await { - error!(%err, "server read error"); - } - }); ( - client, + Framed::new(stream, BinaryMessageCodec::new(MESSAGE_SIZE)), ( finish_cranking, TestData { @@ -289,7 +276,7 @@ async fn binary_port_component() { testing::init_logging(); let ( - client, + mut client, ( finish_cranking, TestData { @@ -375,19 +362,18 @@ async fn binary_port_component() { .cloned() .collect::>(); - let request_guard = client - .create_request(ChannelId::new(0)) - .with_payload(original_request_bytes.clone().into()) - .queue_for_sending() - .await; + client + .send(BinaryMessage::new(original_request_bytes.clone())) + .await + .expect("should send message"); - let response = timeout(Duration::from_secs(10), request_guard.wait_for_response()) + let response = timeout(Duration::from_secs(10), client.next()) .await .unwrap_or_else(|err| panic!("{}: should complete without timeout: {}", name, err)) - .unwrap_or_else(|err| panic!("{}: should have ok response: {}", name, err)) - .unwrap_or_else(|| panic!("{}: should have bytes", name)); + .unwrap_or_else(|| panic!("{}: should have bytes", name)) + .unwrap_or_else(|err| panic!("{}: should have ok response: {}", name, err)); let (binary_response_and_request, _): (BinaryResponseAndRequest, _) = - FromBytes::from_bytes(&response).expect("should deserialize response"); + FromBytes::from_bytes(response.payload()).expect("should deserialize response"); let mirrored_request_bytes = binary_response_and_request.original_request(); assert_eq!( From 1649963c59eb0c8bc30417e4f28c1fcdde25ece4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 12:41:28 +0200 Subject: [PATCH 11/23] Remove references to 'juliet' from protocol description file --- node/BINARY_PORT_PROTOCOL.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/BINARY_PORT_PROTOCOL.md b/node/BINARY_PORT_PROTOCOL.md index 265c3e39f9..472315574e 100644 --- a/node/BINARY_PORT_PROTOCOL.md +++ b/node/BINARY_PORT_PROTOCOL.md @@ -2,7 +2,7 @@ The specification of the protocol used to communicate between the RPC sidecar and binary port casper-node. ## Synopsis -This is a binary protocol which follows a simple request-response model built on top of [Juliet](https://github.com/casper-network/juliet). The protocol consists of one party (the client) sending requests to another party (the server) and the server sending responses back to the client. Both requests and responses are wrapped in envelopes containing a version and a payload type tag. The versioning scheme is based on [SemVer](https://semver.org/), see [versioning](#versioning) for more details. The payload type tags are used to interpret the contents of the payloads. +This is a binary protocol which follows a simple request-response model. The protocol consists of one party (the client) sending requests to another party (the server) and the server sending responses back to the client. Both requests and responses are wrapped in envelopes containing a version and a payload type tag. The versioning scheme is based on [SemVer](https://semver.org/), see [versioning](#versioning) for more details. The payload type tags are used to interpret the contents of the payloads. ### Request format | Size in bytes | Field | Description | From 2d8c60ad941b3f22eb97cea974025f09018ea400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 12:41:32 +0200 Subject: [PATCH 12/23] Clean-up --- node/src/components/binary_port.rs | 4 ++-- node/src/components/binary_port/error.rs | 2 +- node/src/reactor/main_reactor/tests/binary_port.rs | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index b4aaa34a39..c80c0aeddf 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -846,10 +846,10 @@ where }; let version = effect_builder.get_protocol_version().await; - let response = handle_payload(effect_builder, &payload, version).await; + let response = handle_payload(effect_builder, payload, version).await; framed .send(BinaryMessage::new( - BinaryResponseAndRequest::new(response, &payload).to_bytes()?, + BinaryResponseAndRequest::new(response, payload).to_bytes()?, )) .await? } diff --git a/node/src/components/binary_port/error.rs b/node/src/components/binary_port/error.rs index 71723b0364..46050a4eed 100644 --- a/node/src/components/binary_port/error.rs +++ b/node/src/components/binary_port/error.rs @@ -8,5 +8,5 @@ pub(crate) enum Error { #[error("received request without payload")] NoPayload, #[error(transparent)] - BinaryPortError(#[from] casper_binary_port::Error), + BinaryPort(#[from] casper_binary_port::Error), } diff --git a/node/src/reactor/main_reactor/tests/binary_port.rs b/node/src/reactor/main_reactor/tests/binary_port.rs index fe01c58a3d..527125f905 100644 --- a/node/src/reactor/main_reactor/tests/binary_port.rs +++ b/node/src/reactor/main_reactor/tests/binary_port.rs @@ -30,7 +30,6 @@ use futures::{SinkExt, StreamExt}; use rand::Rng; use tokio::{net::TcpStream, time::timeout}; use tokio_util::codec::Framed; -use tracing::error; use crate::{ reactor::{main_reactor::MainReactor, Runner}, From fe166c88599b5ed2960d24abccba51a6717b6a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 15:26:23 +0200 Subject: [PATCH 13/23] Use smaller message size in tests --- binary_port/src/binary_message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index aa7ef9e1b0..764f3f4352 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -104,7 +104,7 @@ mod tests { BinaryMessage, BinaryMessageCodec, }; - const MAX_MESSAGE_SIZE_BYTES: u32 = 1024 * 104 * 10; + const MAX_MESSAGE_SIZE_BYTES: u32 = 1024 * 1024; #[test] fn binary_message_codec() { From 5c05f43290b8973d0bb01917030300f83244b741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 24 Apr 2024 15:26:54 +0200 Subject: [PATCH 14/23] Validate message size in encoder --- binary_port/src/binary_message.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 764f3f4352..17918e83dd 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -56,6 +56,12 @@ impl codec::Encoder for BinaryMessageCodec { dst: &mut bytes::BytesMut, ) -> Result<(), Self::Error> { let length = item.0.len() as LengthEncoding; + if length > self.max_message_size_bytes { + return Err(Error::RequestTooLarge { + allowed: self.max_message_size_bytes, + got: length, + }); + } let length_bytes = length.to_le_bytes(); dst.extend(length_bytes.iter().chain(item.0.iter())); Ok(()) @@ -95,6 +101,8 @@ impl codec::Decoder for BinaryMessageCodec { #[cfg(test)] mod tests { + use std::vec; + use casper_types::testing::TestRng; use tokio_util::codec::{Decoder, Encoder}; @@ -173,7 +181,19 @@ mod tests { } #[test] - fn should_bail_on_too_large_request() { + fn encode_should_bail_on_too_large_request() { + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let too_large = MAX_MESSAGE_SIZE_BYTES as usize + 1; + let val = BinaryMessage::new(vec![0; too_large]); + let mut bytes = bytes::BytesMut::new(); + let result = codec.encode(val, &mut bytes).unwrap_err(); + + assert!(matches!(result, Error::RequestTooLarge { allowed, got } + if allowed == codec.max_message_size_bytes && got == too_large as u32)); + } + + #[test] + fn decode_should_bail_on_too_large_request() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); let too_large = (codec.max_message_size_bytes + 1) as LengthEncoding; From 625d4f02984073c8378ff134b84ea6a2c145a144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 10:07:46 +0200 Subject: [PATCH 15/23] Add more UTs to cover size check in encoding --- binary_port/src/binary_message.rs | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 17918e83dd..19befd4fbb 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -192,18 +192,44 @@ mod tests { if allowed == codec.max_message_size_bytes && got == too_large as u32)); } + #[test] + fn should_encode_request_of_maximum_size() { + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let just_right_size = MAX_MESSAGE_SIZE_BYTES as usize; + let val = BinaryMessage::new(vec![0; just_right_size]); + let mut bytes = bytes::BytesMut::new(); + + let result = codec.encode(val, &mut bytes); + assert!(result.is_ok()); + } + #[test] fn decode_should_bail_on_too_large_request() { + let rng = &mut TestRng::new(); let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let mut bytes = bytes::BytesMut::new(); let too_large = (codec.max_message_size_bytes + 1) as LengthEncoding; - bytes.extend(&too_large.to_le_bytes()); + bytes.extend(too_large.to_le_bytes()); + bytes.extend(std::iter::repeat_with(|| rng.gen::()).take(too_large as usize)); let result = codec.decode(&mut bytes).unwrap_err(); assert!(matches!(result, Error::RequestTooLarge { allowed, got } if allowed == codec.max_message_size_bytes && got == too_large)); } + #[test] + fn should_decode_request_of_maximum_size() { + let rng = &mut TestRng::new(); + let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); + let mut bytes = bytes::BytesMut::new(); + let just_right_size = (codec.max_message_size_bytes) as LengthEncoding; + bytes.extend(just_right_size.to_le_bytes()); + bytes.extend(std::iter::repeat_with(|| rng.gen::()).take(just_right_size as usize)); + + let result = codec.decode(&mut bytes); + assert!(result.is_ok()); + } + #[test] fn should_bail_on_empty_request() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); From e2c991e79b4eaed6fb7d9d3d1ab8b8298e1671c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 10:08:24 +0200 Subject: [PATCH 16/23] Minor syntax update in 'BinaryMessage::new()' --- binary_port/src/binary_message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 19befd4fbb..79316ab8bf 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -16,7 +16,7 @@ pub struct BinaryMessage(Vec); impl BinaryMessage { pub fn new(payload: Vec) -> Self { - BinaryMessage(payload) + Self(payload) } pub fn payload(&self) -> &[u8] { From e510428a42522817f9da38d546f56e772101369c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 10:09:06 +0200 Subject: [PATCH 17/23] Use pattern-matching approach in decoder for easier validation --- binary_port/src/binary_message.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 79316ab8bf..06638c10c9 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -74,11 +74,19 @@ impl codec::Decoder for BinaryMessageCodec { type Error = Error; fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - if src.len() < LENGTH_ENCODING_SIZE_BYTES { + let (length, maybe_payload) = if let [b1, b2, b3, b4, remainder @ ..] = &src[..] { + let length = LengthEncoding::from_le_bytes([*b1, *b2, *b3, *b4]) as usize; + (length, remainder.get(..length)) + } else { // Not enough bytes to read the length. return Ok(None); - } - let length = LengthEncoding::from_le_bytes([src[0], src[1], src[2], src[3]]) as usize; + }; + + let Some(payload) = maybe_payload else { + // Not enough bytes to read the whole message. + return Ok(None); + }; + if length > self.max_message_size_bytes as usize { return Err(Error::RequestTooLarge { allowed: self.max_message_size_bytes, @@ -88,12 +96,8 @@ impl codec::Decoder for BinaryMessageCodec { if length == 0 { return Err(Error::EmptyRequest); } - if src.len() < length + LENGTH_ENCODING_SIZE_BYTES { - // Not enough bytes to read the whole message. - return Ok(None); - } - let payload = src[LENGTH_ENCODING_SIZE_BYTES..LENGTH_ENCODING_SIZE_BYTES + length].to_vec(); + let payload = payload.to_vec(); src.advance(LENGTH_ENCODING_SIZE_BYTES + length); Ok(Some(BinaryMessage(payload))) } @@ -104,6 +108,7 @@ mod tests { use std::vec; use casper_types::testing::TestRng; + use rand::Rng; use tokio_util::codec::{Decoder, Encoder}; use crate::{ From e20b3ae07eee1b601cd624c9655a55adba9a86ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 11:39:33 +0200 Subject: [PATCH 18/23] Rename 'client_loop()' to `handle_client_loop()' --- node/src/components/binary_port.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/node/src/components/binary_port.rs b/node/src/components/binary_port.rs index c80c0aeddf..bbb96349d0 100644 --- a/node/src/components/binary_port.rs +++ b/node/src/components/binary_port.rs @@ -814,7 +814,7 @@ where } } -async fn client_loop( +async fn handle_client_loop( stream: TcpStream, effect_builder: EffectBuilder, max_message_size_bytes: u32, @@ -910,7 +910,9 @@ async fn handle_client( + From + Send, { - if let Err(err) = client_loop(stream, effect_builder, config.max_message_size_bytes).await { + if let Err(err) = + handle_client_loop(stream, effect_builder, config.max_message_size_bytes).await + { // Low severity is used to prevent malicious clients from causing log floods. info!(%addr, err=display_error(&err), "binary port client handler error"); } From 77704e64a2a1a776a09be7bd1cdd4a49e2bb0a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 11:54:19 +0200 Subject: [PATCH 19/23] Use 'Bytes' in 'BinaryMessage' for improved performance --- binary_port/src/binary_message.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 06638c10c9..2e8a56f6fc 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -3,7 +3,7 @@ use casper_types::testing::TestRng; #[cfg(test)] use rand::Rng; -use bytes::Buf; +use bytes::{Buf, Bytes}; use tokio_util::codec::{self}; use crate::error::Error; @@ -12,11 +12,11 @@ type LengthEncoding = u32; const LENGTH_ENCODING_SIZE_BYTES: usize = std::mem::size_of::(); #[derive(Clone, PartialEq, Debug)] -pub struct BinaryMessage(Vec); +pub struct BinaryMessage(Bytes); impl BinaryMessage { pub fn new(payload: Vec) -> Self { - Self(payload) + Self(payload.into()) } pub fn payload(&self) -> &[u8] { @@ -74,15 +74,15 @@ impl codec::Decoder for BinaryMessageCodec { type Error = Error; fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - let (length, maybe_payload) = if let [b1, b2, b3, b4, remainder @ ..] = &src[..] { + let (length, have_full_frame) = if let [b1, b2, b3, b4, remainder @ ..] = &src[..] { let length = LengthEncoding::from_le_bytes([*b1, *b2, *b3, *b4]) as usize; - (length, remainder.get(..length)) + (length, remainder.len() >= length) } else { // Not enough bytes to read the length. return Ok(None); }; - let Some(payload) = maybe_payload else { + if !have_full_frame { // Not enough bytes to read the whole message. return Ok(None); }; @@ -97,16 +97,13 @@ impl codec::Decoder for BinaryMessageCodec { return Err(Error::EmptyRequest); } - let payload = payload.to_vec(); - src.advance(LENGTH_ENCODING_SIZE_BYTES + length); - Ok(Some(BinaryMessage(payload))) + src.advance(LENGTH_ENCODING_SIZE_BYTES); + Ok(Some(BinaryMessage(src.split_to(length).freeze()))) } } #[cfg(test)] mod tests { - use std::vec; - use casper_types::testing::TestRng; use rand::Rng; use tokio_util::codec::{Decoder, Encoder}; @@ -189,7 +186,7 @@ mod tests { fn encode_should_bail_on_too_large_request() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let too_large = MAX_MESSAGE_SIZE_BYTES as usize + 1; - let val = BinaryMessage::new(vec![0; too_large]); + let val = BinaryMessage::new(std::iter::repeat(0).take(too_large).collect()); let mut bytes = bytes::BytesMut::new(); let result = codec.encode(val, &mut bytes).unwrap_err(); @@ -201,7 +198,7 @@ mod tests { fn should_encode_request_of_maximum_size() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let just_right_size = MAX_MESSAGE_SIZE_BYTES as usize; - let val = BinaryMessage::new(vec![0; just_right_size]); + let val = BinaryMessage::new(std::iter::repeat(0).take(just_right_size).collect()); let mut bytes = bytes::BytesMut::new(); let result = codec.encode(val, &mut bytes); From 364f99d64f472c6def10d378b9a148bb72adf3a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 25 Apr 2024 11:57:36 +0200 Subject: [PATCH 20/23] Simplify test code --- binary_port/src/binary_message.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binary_port/src/binary_message.rs b/binary_port/src/binary_message.rs index 2e8a56f6fc..52d6d7ef32 100644 --- a/binary_port/src/binary_message.rs +++ b/binary_port/src/binary_message.rs @@ -186,7 +186,7 @@ mod tests { fn encode_should_bail_on_too_large_request() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let too_large = MAX_MESSAGE_SIZE_BYTES as usize + 1; - let val = BinaryMessage::new(std::iter::repeat(0).take(too_large).collect()); + let val = BinaryMessage::new(vec![0; too_large]); let mut bytes = bytes::BytesMut::new(); let result = codec.encode(val, &mut bytes).unwrap_err(); @@ -198,7 +198,7 @@ mod tests { fn should_encode_request_of_maximum_size() { let mut codec = BinaryMessageCodec::new(MAX_MESSAGE_SIZE_BYTES); let just_right_size = MAX_MESSAGE_SIZE_BYTES as usize; - let val = BinaryMessage::new(std::iter::repeat(0).take(just_right_size).collect()); + let val = BinaryMessage::new(vec![0; just_right_size]); let mut bytes = bytes::BytesMut::new(); let result = codec.encode(val, &mut bytes); From 33415a5a0b26757ac21ce62a457d8481940904e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Fri, 26 Apr 2024 10:51:51 +0200 Subject: [PATCH 21/23] Remove the now duplicated 'gas_hold_handling' field from config --- resources/local/config.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/resources/local/config.toml b/resources/local/config.toml index b2d03a2a3d..ba6cf01bb5 100644 --- a/resources/local/config.toml +++ b/resources/local/config.toml @@ -322,8 +322,6 @@ gas_hold_handling = { type = 'accrued' } # Maximum number of connections to the server. max_connections = 16 -gas_hold_handling = { type = 'accrued' } - # ============================================== # Configuration options for the REST HTTP server # ============================================== From 91ff4b645507925215771c7d5c1ad81344b32086 Mon Sep 17 00:00:00 2001 From: Jacek Malec <145967538+jacek-casper@users.noreply.github.com> Date: Mon, 29 Apr 2024 14:27:26 +0100 Subject: [PATCH 22/23] Point at custom sidecar temporarily --- ci/ci.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/ci.json b/ci/ci.json index 5fdc6120d9..032fcffe9d 100644 --- a/ci/ci.json +++ b/ci/ci.json @@ -9,8 +9,8 @@ "branch": "main" }, "casper-sidecar": { - "github_repo_url": "https://github.com/casper-network/casper-sidecar", - "branch": "feat-2.0" + "github_repo_url": "https://github.com/rafal-ch/casper-sidecar", + "branch": "julietless_sidecar_2" }, "casper-nctl": { "github_repo_url": "https://github.com/casper-network/casper-nctl", From 3b6ad2d4f727d2e61870f31a945d0f260500aa51 Mon Sep 17 00:00:00 2001 From: Jacek Malec <145967538+jacek-casper@users.noreply.github.com> Date: Mon, 29 Apr 2024 16:39:11 +0100 Subject: [PATCH 23/23] Point back at latest sidecar --- ci/ci.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/ci.json b/ci/ci.json index 032fcffe9d..5fdc6120d9 100644 --- a/ci/ci.json +++ b/ci/ci.json @@ -9,8 +9,8 @@ "branch": "main" }, "casper-sidecar": { - "github_repo_url": "https://github.com/rafal-ch/casper-sidecar", - "branch": "julietless_sidecar_2" + "github_repo_url": "https://github.com/casper-network/casper-sidecar", + "branch": "feat-2.0" }, "casper-nctl": { "github_repo_url": "https://github.com/casper-network/casper-nctl",