Skip to content
This repository has been archived by the owner on Mar 12, 2024. It is now read-only.

Support inputs of larger sizes #240

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.0.2] 2023-09-11
## [1.0.2] 2023-09-12

### Changed

- Updated state-fold libraries to version 0.8
- Added `SS_MAX_DECODING_MESSAGE_SIZE` to state-server and dispatcher with 100MB default
- Added `MAX_DECODING_MESSAGE_SIZE` to advance runner with 100MB default
- Improved node log for large inputs

### Fixed

Expand Down
24 changes: 12 additions & 12 deletions offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ diesel = "2.1"
diesel_migrations = "2.1"
env_logger = "0.10"
ethabi = "18.0"
eth-block-history = "0.7"
eth-state-client-lib = "0.7"
eth-state-fold-types = "0.7"
eth-state-fold = "0.7"
eth-state-server-lib = "0.7"
eth-block-history = "0.8"
eth-state-client-lib = "0.8"
eth-state-fold-types = "0.8"
eth-state-fold = "0.8"
eth-state-server-lib = "0.8"
eth-tx-manager = "0.10"
ethers-signers = "1.0"
futures = "0.3"
Expand Down
6 changes: 6 additions & 0 deletions offchain/advance-runner/src/server_manager/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use grpc_interfaces::cartesi_server_manager::{CyclesConfig, DeadlineConfig};
#[derive(Debug, Clone)]
pub struct ServerManagerConfig {
pub server_manager_endpoint: String,
pub max_decoding_message_size: usize,
pub session_id: String,
pub pending_inputs_sleep_duration: u64,
pub pending_inputs_max_retries: u64,
Expand Down Expand Up @@ -52,6 +53,7 @@ impl ServerManagerConfig {

Self {
server_manager_endpoint: cli_config.server_manager_endpoint,
max_decoding_message_size: cli_config.max_decoding_message_size,
session_id: cli_config.session_id,
pending_inputs_sleep_duration: cli_config
.sm_pending_inputs_sleep_duration,
Expand All @@ -71,6 +73,10 @@ pub struct ServerManagerCLIConfig {
#[arg(long, env, default_value = "http://127.0.0.1:5001")]
pub server_manager_endpoint: String,

/// Maximum size of a decoded message
#[arg(long, env, default_value_t = 100 * 1024 * 1024)]
pub max_decoding_message_size: usize,

/// Server-manager session id
#[arg(long, env, default_value = "default_rollups_id")]
pub session_id: String,
Expand Down
3 changes: 2 additions & 1 deletion offchain/advance-runner/src/server_manager/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ impl ServerManagerFacade {
.map_err(Error::transient)
})
.await
.context(ConnectionSnafu)?;
.context(ConnectionSnafu)?
.max_decoding_message_size(config.max_decoding_message_size);

Ok(Self {
client,
Expand Down
1 change: 1 addition & 0 deletions offchain/advance-runner/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl AdvanceRunnerFixture {

let server_manager_config = ServerManagerConfig {
server_manager_endpoint,
max_decoding_message_size: 100 * 1024 * 1024,
session_id,
pending_inputs_sleep_duration: 1000,
pending_inputs_max_retries: 10,
Expand Down
4 changes: 2 additions & 2 deletions offchain/dispatcher/src/machine/rollups_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ impl BrokerSend for BrokerFacade {
input_index: u64,
input: &Input,
) -> Result<(), BrokerFacadeError> {
tracing::info!(?input_index, ?input, "enqueueing input");
tracing::trace!(?input_index, ?input, "enqueueing input");

let mut broker = self.broker.lock().await;
let status = self.broker_status(&mut broker).await?;

let event = build_next_input(input, &status);
tracing::trace!(?event, "producing input event");
tracing::info!(?event, "producing input event");

input_sanity_check!(event, input_index);

Expand Down
2 changes: 1 addition & 1 deletion offchain/dispatcher/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn create_state_server(
.await
.context(ConnectSnafu)?;

Ok(GrpcStateFoldClient::new_from_channel(channel))
Ok(GrpcStateFoldClient::new_from_channel(channel, config))
}

pub async fn create_block_subscription(
Expand Down
15 changes: 14 additions & 1 deletion offchain/rollups-events/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::fmt::Write;
pub const ADDRESS_SIZE: usize = 20;
pub const HASH_SIZE: usize = 32;

const PAYLOAD_DEBUG_MAX_LEN: usize = 100;

/// A binary array that is converted to a hex string when serialized
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct HexArray<const N: usize>([u8; N]);
Expand Down Expand Up @@ -143,7 +145,18 @@ impl<'de> Deserialize<'de> for Payload {

impl std::fmt::Debug for Payload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", base64_engine.encode(self.inner()))
let len = self.inner().len();
if len > PAYLOAD_DEBUG_MAX_LEN {
let slice = &self.inner().as_slice()[0..PAYLOAD_DEBUG_MAX_LEN];
write!(
f,
"{}...[total: {} bytes]",
base64_engine.encode(slice),
len
)
} else {
write!(f, "{}", base64_engine.encode(self.inner()))
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions offchain/state-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ where

let server = StateServer::<_, _, F>::new(block_subscriber, env);

let server_address = config.server_address;
let (shutdown_tx, shutdown_rx) = oneshot::channel();

tokio::spawn(async { wait_for_signal(shutdown_tx).await });

Ok(start_server(server_address, server, shutdown_rx)
Ok(start_server(&config, server, shutdown_rx)
.await
.context(TonicSnafu)?)
}
Expand Down
Loading