diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 17d09e401a1708..a93a722ba915d0 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -346,7 +346,7 @@ impl ValidatorService { async fn handle_transaction( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let Self { state, consensus_adapter, @@ -414,9 +414,14 @@ impl ValidatorService { // to save more CPU. return Err(error.into()); } - Ok(tonic::Response::new(info)) + Ok((tonic::Response::new(info), Weight::zero())) } + // In addition to the response from handling the certificates, + // returns a bool indicating whether the request should be tallied + // toward spam count. In general, this should be set to true for + // requests that are read-only and thus do not consume gas, such + // as when the transaction is already executed. async fn handle_certificates( &self, certificates: NonEmpty, @@ -426,7 +431,7 @@ impl ValidatorService { _include_auxiliary_data: bool, epoch_store: &Arc, wait_for_effects: bool, - ) -> Result>, tonic::Status> { + ) -> Result<(Option>, Weight), tonic::Status> { // Validate if cert can be executed // Fullnode does not serve handle_certificate call. fp_ensure!( @@ -472,13 +477,16 @@ impl ValidatorService { None }; - return Ok(Some(vec![HandleCertificateResponseV3 { - effects: signed_effects.into_inner(), - events, - input_objects: None, - output_objects: None, - auxiliary_data: None, - }])); + return Ok(( + Some(vec![HandleCertificateResponseV3 { + effects: signed_effects.into_inner(), + events, + input_objects: None, + output_objects: None, + auxiliary_data: None, + }]), + Weight::one(), + )); }; } @@ -562,7 +570,7 @@ impl ValidatorService { epoch_store, ); } - return Ok(None); + return Ok((None, Weight::zero())); } // 4) Execute the certificates immediately if they contain only owned object transactions, @@ -605,22 +613,24 @@ impl ValidatorService { )) .await?; - Ok(Some(responses)) + Ok((Some(responses), Weight::zero())) } } +type WrappedServiceResponse = Result<(tonic::Response, Weight), tonic::Status>; + impl ValidatorService { async fn transaction_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { self.handle_transaction(request).await } async fn submit_certificate_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let certificate = request.into_inner(); Self::transaction_validity_check(&epoch_store, certificate.data())?; @@ -637,17 +647,20 @@ impl ValidatorService { ) .instrument(span) .await - .map(|executed| { - tonic::Response::new(SubmitCertificateResponse { - executed: executed.map(|mut x| x.remove(0)).map(Into::into), - }) + .map(|(executed, spam_weight)| { + ( + tonic::Response::new(SubmitCertificateResponse { + executed: executed.map(|mut x| x.remove(0)).map(Into::into), + }), + spam_weight, + ) }) } async fn handle_certificate_v2_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let certificate = request.into_inner(); Self::transaction_validity_check(&epoch_store, certificate.data())?; @@ -664,11 +677,16 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new( - v.expect("handle_certificate should not return none with wait_for_effects=true") + .map(|(resp, spam_weight)| { + ( + tonic::Response::new( + resp.expect( + "handle_certificate should not return none with wait_for_effects=true", + ) .remove(0) .into(), + ), + spam_weight, ) }) } @@ -676,7 +694,7 @@ impl ValidatorService { async fn handle_certificate_v3_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let request = request.into_inner(); Self::transaction_validity_check(&epoch_store, request.certificate.data())?; @@ -693,10 +711,15 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new( - v.expect("handle_certificate should not return none with wait_for_effects=true") + .map(|(resp, spam_weight)| { + ( + tonic::Response::new( + resp.expect( + "handle_certificate should not return none with wait_for_effects=true", + ) .remove(0), + ), + spam_weight, ) }) } @@ -788,7 +811,7 @@ impl ValidatorService { async fn handle_soft_bundle_certificates_v3_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let request = request.into_inner(); @@ -818,10 +841,13 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new(HandleSoftBundleCertificatesResponseV3 { - responses: v.unwrap_or_default(), - }) + .map(|(resp, spam_weight)| { + ( + tonic::Response::new(HandleSoftBundleCertificatesResponseV3 { + responses: resp.unwrap_or_default(), + }), + spam_weight, + ) }) } @@ -850,49 +876,48 @@ impl ValidatorService { async fn object_info_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_object_info_request(request).await?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), Weight::one())) } async fn transaction_info_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_transaction_info_request(request).await?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), Weight::one())) } async fn checkpoint_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request(&request)?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), Weight::one())) } async fn checkpoint_v2_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request_v2(&request)?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), Weight::one())) } async fn get_system_state_object_impl( &self, _request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let response = self .state .get_object_cache_reader() .get_sui_system_state_object_unsafe()?; - - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), Weight::one())) } async fn handle_traffic_req(&self, client: Option) -> Result<(), tonic::Status> { @@ -911,12 +936,15 @@ impl ValidatorService { fn handle_traffic_resp( &self, client: Option, - response: &Result, tonic::Status>, - ) { - let error: Option = if let Err(status) = response { - Some(SuiError::from(status.clone())) - } else { - None + wrapped_response: WrappedServiceResponse, + ) -> Result, tonic::Status> { + let (error, spam_weight, unwrapped_response) = match wrapped_response { + Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)), + Err(status) => ( + Some(SuiError::from(status.clone())), + Weight::zero(), + Err(status.clone()), + ), }; if let Some(traffic_controller) = self.traffic_controller.clone() { @@ -924,9 +952,11 @@ impl ValidatorService { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), + spam_weight, timestamp: SystemTime::now(), }) } + unwrapped_response } } @@ -944,7 +974,7 @@ fn make_tonic_request_for_testing(message: T) -> tonic::Request { // TODO: refine error matching here fn normalize(err: SuiError) -> Weight { - match err { + match dbg!(err) { SuiError::UserInputError { .. } | SuiError::InvalidSignature { .. } | SuiError::SignerSignatureAbsent { .. } @@ -963,7 +993,7 @@ fn normalize(err: SuiError) -> Weight { macro_rules! handle_with_decoration { ($self:ident, $func_name:ident, $request:ident) => {{ if $self.client_id_source.is_none() { - return $self.$func_name($request).await; + return $self.$func_name($request).await.map(|(result, _)| result); } let client = match $self.client_id_source.as_ref().unwrap() { @@ -1027,11 +1057,10 @@ macro_rules! handle_with_decoration { // check if either IP is blocked, in which case return early $self.handle_traffic_req(client.clone()).await?; - // handle request - let response = $self.$func_name($request).await; - // handle response tallying - $self.handle_traffic_resp(client, &response); - response + + // handle traffic tallying + let wrapped_response = $self.$func_name($request).await; + $self.handle_traffic_resp(client, wrapped_response) }}; } diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs index 0f7b38270faf23..a133a90adaff07 100644 --- a/crates/sui-core/src/traffic_controller/mod.rs +++ b/crates/sui-core/src/traffic_controller/mod.rs @@ -330,7 +330,7 @@ async fn handle_error_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - if !tally.error_weight.is_sampled().await { + if !tally.error_weight.is_sampled() { return Ok(()); } let resp = policy.handle_tally(tally.clone()); @@ -364,7 +364,7 @@ async fn handle_spam_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - if !policy_config.spam_sample_rate.is_sampled().await { + if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) { return Ok(()); } let resp = policy.handle_tally(tally.clone()); @@ -649,7 +649,8 @@ impl TrafficSim { client, // TODO add proxy IP for testing None, - // TODO add weight adjustment + // TODO add weight adjustments + Weight::one(), Weight::one(), )); } else { diff --git a/crates/sui-core/src/traffic_controller/policies.rs b/crates/sui-core/src/traffic_controller/policies.rs index 64ebfe8cbb59bc..806206dc680289 100644 --- a/crates/sui-core/src/traffic_controller/policies.rs +++ b/crates/sui-core/src/traffic_controller/policies.rs @@ -137,6 +137,7 @@ pub struct TrafficTally { pub direct: Option, pub through_fullnode: Option, pub error_weight: Weight, + pub spam_weight: Weight, pub timestamp: SystemTime, } @@ -145,11 +146,13 @@ impl TrafficTally { direct: Option, through_fullnode: Option, error_weight: Weight, + spam_weight: Weight, ) -> Self { Self { direct, through_fullnode, error_weight, + spam_weight, timestamp: SystemTime::now(), } } @@ -418,18 +421,21 @@ mod tests { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), error_weight: Weight::zero(), + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; let bob = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))), error_weight: Weight::zero(), + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; let charlie = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))), error_weight: Weight::zero(), + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs b/crates/sui-e2e-tests/tests/traffic_control_tests.rs index df12c95bee009b..9fd583c017c950 100644 --- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs +++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs @@ -4,12 +4,15 @@ //! NB: Most tests in this module expect real network connections and interactions, thus //! they should nearly all be tokio::test rather than simtest. +use core::panic; use jsonrpsee::{ core::{client::ClientT, RpcResult}, rpc_params, }; use std::fs::File; use std::time::Duration; +use sui_core::authority_client::make_network_authority_clients_with_network_config; +use sui_core::authority_client::AuthorityAPI; use sui_core::traffic_controller::{ nodefw_test_server::NodeFwTestServer, TrafficController, TrafficSim, }; @@ -17,6 +20,7 @@ use sui_json_rpc_types::{ SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions, }; use sui_macros::sim_test; +use sui_network::default_mysten_network_config; use sui_swarm_config::network_config_builder::ConfigBuilder; use sui_test_transaction_builder::batch_make_transfer_transactions; use sui_types::{ @@ -135,16 +139,16 @@ async fn test_validator_traffic_control_dry_run() -> Result<(), anyhow::Error> { .build() .await; - assert_traffic_control_dry_run(test_cluster, n as usize).await + assert_validator_traffic_control_dry_run(test_cluster, n as usize).await } #[tokio::test] async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { - let n = 15; + let txn_count = 15; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 1, proxy_blocklist_ttl_sec: 5, - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), // This should never be invoked when set as an error policy // as we are not sending requests that error @@ -156,37 +160,102 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { .with_fullnode_policy_config(Some(policy_config)) .build() .await; - assert_traffic_control_dry_run(test_cluster, n as usize).await + + let context = test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; + assert!( + txns.len() >= txn_count as usize, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + assert!( + response.is_ok(), + "Expected request to succeed in dry-run mode" + ); + } + Ok(()) } #[tokio::test] -async fn test_validator_traffic_control_spam_blocked() -> Result<(), anyhow::Error> { +async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Error> { let n = 5; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 1, // Test that any N requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), - spam_sample_rate: Weight::one(), + error_policy_type: PolicyType::TestNConnIP(n - 1), dry_run: false, ..Default::default() }; let network_config = ConfigBuilder::new_with_temp_dir() .with_policy_config(Some(policy_config)) .build(); - let test_cluster = TestClusterBuilder::new() + let committee = network_config.committee_with_network(); + let _test_cluster = TestClusterBuilder::new() .set_network_config(network_config) .build() .await; - assert_traffic_control_spam_blocked(test_cluster, n as usize).await + let local_clients = make_network_authority_clients_with_network_config( + &committee, + &default_mysten_network_config(), + ) + .unwrap(); + let (_, auth_client) = local_clients.first_key_value().unwrap(); + + // transaction signed using user wallet from a different chain/genesis, + // therefore we should fail with UserInputError + let other_cluster = TestClusterBuilder::new().build().await; + + let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await; + let tx = txns.swap_remove(0); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..n { + let response = auth_client.handle_transaction(tx.clone(), None).await; + if let Err(err) = response { + if err.to_string().contains("Too many requests") { + return Ok(()); + } + } + } + panic!("Expected spam policy to trigger within {n} requests"); } #[tokio::test] async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Error> { - let n = 15; + let txn_count = 15; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 3, // Test that any N requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), dry_run: false, ..Default::default() @@ -195,19 +264,68 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro .with_fullnode_policy_config(Some(policy_config)) .build() .await; - assert_traffic_control_spam_blocked(test_cluster, n as usize).await + + let context = test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; + assert!( + txns.len() >= txn_count as usize, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + if let Err(err) = response { + // TODO: fix validator blocking error handling such that the error message + // is not misleading. The full error message currently is the following: + // Transaction execution failed due to issues with transaction inputs, please + // review the errors and try again: Too many requests. + assert!( + err.to_string().contains("Too many requests"), + "Error not due to spam policy" + ); + return Ok(()); + } + } + panic!("Expected spam policy to trigger within {txn_count} requests"); } #[tokio::test] -async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::Error> { - let n = 4; +async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::Error> { + let n = 5; let port = 65000; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 120, proxy_blocklist_ttl_sec: 120, // Test that any N - 1 requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), - spam_sample_rate: Weight::one(), + error_policy_type: PolicyType::TestNConnIP(n - 1), dry_run: false, ..Default::default() }; @@ -224,22 +342,58 @@ async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::E .with_policy_config(Some(policy_config)) .with_firewall_config(Some(firewall_config)) .build(); - let test_cluster = TestClusterBuilder::new() + let committee = network_config.committee_with_network(); + let _test_cluster = TestClusterBuilder::new() .set_network_config(network_config) .build() .await; - assert_traffic_control_spam_delegated(test_cluster, n as usize, port).await + let local_clients = make_network_authority_clients_with_network_config( + &committee, + &default_mysten_network_config(), + ) + .unwrap(); + let (_, auth_client) = local_clients.first_key_value().unwrap(); + + // transaction signed using user wallet from a different chain/genesis, + // therefore we should fail with UserInputError + let other_cluster = TestClusterBuilder::new().build().await; + + let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await; + let tx = txns.swap_remove(0); + + // start test firewall server + let mut server = NodeFwTestServer::new(); + server.start(port).await; + // await for the server to start + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..n { + let response = auth_client.handle_transaction(tx.clone(), None).await; + if let Err(err) = response { + if err.to_string().contains("Too many requests") { + return Ok(()); + } + } + } + let fw_blocklist = server.list_addresses_rpc().await; + assert!( + !fw_blocklist.is_empty(), + "Expected blocklist to be non-empty" + ); + server.stop().await; + Ok(()) } #[tokio::test] async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Error> { - let n = 10; + let txn_count = 10; let port = 65001; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 120, proxy_blocklist_ttl_sec: 120, // Test that any N - 1 requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), dry_run: false, ..Default::default() @@ -258,7 +412,57 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Er .with_fullnode_fw_config(Some(firewall_config.clone())) .build() .await; - assert_traffic_control_spam_delegated(test_cluster, n as usize, port).await + + // start test firewall server + let mut server = NodeFwTestServer::new(); + server.start(port).await; + // await for the server to start + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + let context = test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; + assert!( + txns.len() >= txn_count as usize, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + // it should take no more than 4 requests to be added to the blocklist + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + assert!(response.is_ok(), "Expected request to succeed"); + } + let fw_blocklist = server.list_addresses_rpc().await; + assert!( + !fw_blocklist.is_empty(), + "Expected blocklist to be non-empty" + ); + server.stop().await; + Ok(()) } #[tokio::test] @@ -511,7 +715,7 @@ async fn assert_traffic_control_ok(mut test_cluster: TestCluster) -> Result<(), /// Test that in dry-run mode, actions that would otherwise /// lead to request blocking (in this case, a spammy client) /// are allowed to proceed. -async fn assert_traffic_control_dry_run( +async fn assert_validator_traffic_control_dry_run( mut test_cluster: TestCluster, txn_count: usize, ) -> Result<(), anyhow::Error> { @@ -525,6 +729,7 @@ async fn assert_traffic_control_dry_run( ); let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); let params = rpc_params![ tx_bytes, @@ -533,10 +738,22 @@ async fn assert_traffic_control_dry_run( ExecuteTransactionRequestType::WaitForLocalExecution ]; + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!( response.is_ok(), @@ -545,90 +762,3 @@ async fn assert_traffic_control_dry_run( } Ok(()) } - -async fn assert_traffic_control_spam_blocked( - mut test_cluster: TestCluster, - txn_count: usize, -) -> Result<(), anyhow::Error> { - let context = &mut test_cluster.wallet; - let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - - let mut txns = batch_make_transfer_transactions(context, txn_count).await; - assert!( - txns.len() >= txn_count, - "Expect at least {} txns. Do we generate enough gas objects during genesis?", - txn_count, - ); - - let txn = txns.swap_remove(0); - let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); - let params = rpc_params![ - tx_bytes, - signatures, - SuiTransactionBlockResponseOptions::new(), - ExecuteTransactionRequestType::WaitForLocalExecution - ]; - - // it should take no more than 4 requests to be added to the blocklist - for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) - .await; - if let Err(err) = response { - // TODO: fix validator blocking error handling such that the error message - // is not misleading. The full error message currently is the following: - // Transaction execution failed due to issues with transaction inputs, please - // review the errors and try again: Too many requests. - assert!( - err.to_string().contains("Too many requests"), - "Error not due to spam policy" - ); - return Ok(()); - } - } - panic!("Expected spam policy to trigger within {txn_count} requests"); -} - -async fn assert_traffic_control_spam_delegated( - mut test_cluster: TestCluster, - txn_count: usize, - listen_port: u16, -) -> Result<(), anyhow::Error> { - // start test firewall server - let mut server = NodeFwTestServer::new(); - server.start(listen_port).await; - // await for the server to start - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let context = &mut test_cluster.wallet; - let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - let mut txns = batch_make_transfer_transactions(context, txn_count).await; - assert!( - txns.len() >= txn_count, - "Expect at least {} txns. Do we generate enough gas objects during genesis?", - txn_count, - ); - - let txn = txns.swap_remove(0); - let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); - let params = rpc_params![ - tx_bytes, - signatures, - SuiTransactionBlockResponseOptions::new(), - ExecuteTransactionRequestType::WaitForLocalExecution - ]; - - // it should take no more than 4 requests to be added to the blocklist - for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) - .await; - assert!(response.is_ok(), "Expected request to succeed"); - } - let fw_blocklist = server.list_addresses_rpc().await; - assert!( - !fw_blocklist.is_empty(), - "Expected blocklist to be non-empty" - ); - server.stop().await; - Ok(()) -} diff --git a/crates/sui-json-rpc/src/axum_router.rs b/crates/sui-json-rpc/src/axum_router.rs index 5700f708f37e9d..f7ef9d7693b67f 100644 --- a/crates/sui-json-rpc/src/axum_router.rs +++ b/crates/sui-json-rpc/src/axum_router.rs @@ -165,12 +165,13 @@ async fn process_raw_request( return blocked_response; } } - let response = process_request(request, api_version, service.call_data()).await; // handle response tallying + let response = process_request(request, api_version, service.call_data()).await; if let Some(traffic_controller) = &service.traffic_controller { handle_traffic_resp(traffic_controller.clone(), client, &response); } + response } else if let Ok(_batch) = serde_json::from_str::>(raw_request) { MethodResponse::error( @@ -207,6 +208,14 @@ fn handle_traffic_resp( direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), + // For now, count everything as spam with equal weight + // on the rpc node side, including gas-charging endpoints + // such as `sui_executeTransactionBlock`, as this can enable + // node operators who wish to rate limit their transcation + // traffic and incentivize high volume clients to choose a + // suitable rpc provider (or run their own). Later we may want + // to provide a weight distribution based on the method being called. + spam_weight: Weight::one(), timestamp: SystemTime::now(), }); } diff --git a/crates/sui-types/src/traffic_control.rs b/crates/sui-types/src/traffic_control.rs index c5d5d41116f0a8..e976081abcecd9 100644 --- a/crates/sui-types/src/traffic_control.rs +++ b/crates/sui-types/src/traffic_control.rs @@ -53,7 +53,7 @@ impl Weight { self.0 } - pub async fn is_sampled(&self) -> bool { + pub fn is_sampled(&self) -> bool { let mut rng = rand::thread_rng(); let sample = rand::distributions::Uniform::new(0.0, 1.0).sample(&mut rng); sample <= self.value() @@ -201,6 +201,11 @@ pub struct PolicyConfig { #[serde(default = "default_channel_capacity")] pub channel_capacity: usize, #[serde(default = "default_spam_sample_rate")] + /// Note that this sample policy is applied on top of the + /// endpoint-specific sample policy (not configurable) which + /// weighs endpoints by the relative effort required to serve + /// them. Therefore a sample rate of N will yield an actual + /// sample rate <= N. pub spam_sample_rate: Weight, #[serde(default = "default_dry_run")] pub dry_run: bool, diff --git a/crates/test-cluster/src/lib.rs b/crates/test-cluster/src/lib.rs index 7b2b084832ab54..78f7d65a077cd5 100644 --- a/crates/test-cluster/src/lib.rs +++ b/crates/test-cluster/src/lib.rs @@ -23,6 +23,7 @@ use sui_bridge::types::CertifiedBridgeAction; use sui_bridge::types::VerifiedCertifiedBridgeAction; use sui_bridge::utils::publish_and_register_coins_return_add_coins_on_sui_action; use sui_bridge::utils::wait_for_server_to_be_up; +use sui_config::genesis::Genesis; use sui_config::local_ip_utils::get_available_port; use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange}; use sui_config::{Config, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG}; @@ -210,6 +211,10 @@ impl TestCluster { self.swarm.active_validators().map(|v| v.name()).collect() } + pub fn get_genesis(&self) -> Genesis { + self.swarm.config().genesis.clone() + } + pub fn stop_node(&self, name: &AuthorityName) { self.swarm.node(name).unwrap().stop(); }