Skip to content

Commit

Permalink
Cp traffic control changes (#18441)
Browse files Browse the repository at this point in the history
## Description 

CP #18396 and
#18324

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
williampsmith authored Jun 27, 2024
1 parent 0249e7f commit 926a1c3
Show file tree
Hide file tree
Showing 8 changed files with 400 additions and 173 deletions.
139 changes: 84 additions & 55 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl ValidatorService {
async fn handle_transaction(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
let Self {
state,
consensus_adapter,
Expand Down Expand Up @@ -410,9 +410,14 @@ impl ValidatorService {
// to save more CPU.
return Err(error.into());
}
Ok(tonic::Response::new(info))
Ok((tonic::Response::new(info), Weight::zero()))
}

// In addition to the response from handling the certificates,
// returns a bool indicating whether the request should be tallied
// toward spam count. In general, this should be set to true for
// requests that are read-only and thus do not consume gas, such
// as when the transaction is already executed.
async fn handle_certificates(
&self,
certificates: NonEmpty<CertifiedTransaction>,
Expand All @@ -422,7 +427,7 @@ impl ValidatorService {
_include_auxiliary_data: bool,
epoch_store: &Arc<AuthorityPerEpochStore>,
wait_for_effects: bool,
) -> Result<Option<Vec<HandleCertificateResponseV3>>, tonic::Status> {
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
// Validate if cert can be executed
// Fullnode does not serve handle_certificate call.
fp_ensure!(
Expand Down Expand Up @@ -468,13 +473,16 @@ impl ValidatorService {
None
};

return Ok(Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]));
return Ok((
Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]),
Weight::one(),
));
};
}

Expand Down Expand Up @@ -558,7 +566,7 @@ impl ValidatorService {
epoch_store,
);
}
return Ok(None);
return Ok((None, Weight::zero()));
}

// 4) Execute the certificates immediately if they contain only owned object transactions,
Expand Down Expand Up @@ -598,22 +606,24 @@ impl ValidatorService {
))
.await?;

Ok(Some(responses))
Ok((Some(responses), Weight::zero()))
}
}

type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;

impl ValidatorService {
async fn transaction_impl(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
self.handle_transaction(request).await
}

async fn submit_certificate_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
) -> WrappedServiceResponse<SubmitCertificateResponse> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -630,17 +640,20 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|executed| {
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
})
.map(|(executed, spam_weight)| {
(
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
}),
spam_weight,
)
})
}

async fn handle_certificate_v2_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV2> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -657,19 +670,24 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0)
.into(),
),
spam_weight,
)
})
}

async fn handle_certificate_v3_impl(
&self,
request: tonic::Request<HandleCertificateRequestV3>,
) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();
Self::transaction_validity_check(&epoch_store, request.certificate.data())?;
Expand All @@ -686,10 +704,15 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0),
),
spam_weight,
)
})
}
Expand Down Expand Up @@ -781,7 +804,7 @@ impl ValidatorService {
async fn handle_soft_bundle_certificates_v3_impl(
&self,
request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();

Expand Down Expand Up @@ -811,10 +834,13 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: v.unwrap_or_default(),
})
.map(|(resp, spam_weight)| {
(
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: resp.unwrap_or_default(),
}),
spam_weight,
)
})
}

Expand All @@ -841,49 +867,48 @@ impl ValidatorService {
async fn object_info_impl(
&self,
request: tonic::Request<ObjectInfoRequest>,
) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<ObjectInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_object_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn transaction_info_impl(
&self,
request: tonic::Request<TransactionInfoRequest>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<TransactionInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_transaction_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_impl(
&self,
request: tonic::Request<CheckpointRequest>,
) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponse> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_v2_impl(
&self,
request: tonic::Request<CheckpointRequestV2>,
) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponseV2> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request_v2(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn get_system_state_object_impl(
&self,
_request: tonic::Request<SystemStateRequest>,
) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
) -> WrappedServiceResponse<SuiSystemState> {
let response = self
.state
.get_object_cache_reader()
.get_sui_system_state_object_unsafe()?;

Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
Expand All @@ -902,22 +927,27 @@ impl ValidatorService {
fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
response: &Result<tonic::Response<T>, tonic::Status>,
) {
let error: Option<SuiError> = if let Err(status) = response {
Some(SuiError::from(status.clone()))
} else {
None
wrapped_response: WrappedServiceResponse<T>,
) -> Result<tonic::Response<T>, tonic::Status> {
let (error, spam_weight, unwrapped_response) = match wrapped_response {
Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
Err(status) => (
Some(SuiError::from(status.clone())),
Weight::zero(),
Err(status.clone()),
),
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
spam_weight,
timestamp: SystemTime::now(),
})
}
unwrapped_response
}
}

Expand All @@ -935,7 +965,7 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {

// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
match dbg!(err) {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
Expand All @@ -954,7 +984,7 @@ fn normalize(err: SuiError) -> Weight {
macro_rules! handle_with_decoration {
($self:ident, $func_name:ident, $request:ident) => {{
if $self.client_id_source.is_none() {
return $self.$func_name($request).await;
return $self.$func_name($request).await.map(|(result, _)| result);
}

let client = match $self.client_id_source.as_ref().unwrap() {
Expand Down Expand Up @@ -1018,11 +1048,10 @@ macro_rules! handle_with_decoration {

// check if either IP is blocked, in which case return early
$self.handle_traffic_req(client.clone()).await?;
// handle request
let response = $self.$func_name($request).await;
// handle response tallying
$self.handle_traffic_resp(client, &response);
response

// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
}};
}

Expand Down
8 changes: 8 additions & 0 deletions crates/sui-core/src/traffic_controller/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct TrafficControllerMetrics {
pub num_dry_run_blocked_requests: IntCounter,
pub tally_handled: IntCounter,
pub error_tally_handled: IntCounter,
pub deadmans_switch_enabled: IntGauge,
}

impl TrafficControllerMetrics {
Expand Down Expand Up @@ -85,6 +86,13 @@ impl TrafficControllerMetrics {
registry
)
.unwrap(),
deadmans_switch_enabled: register_int_gauge_with_registry!(
"deadmans_switch_enabled",
"If 1, the deadman's switch is enabled and all traffic control
should be getting bypassed",
registry
)
.unwrap(),
}
}

Expand Down
Loading

0 comments on commit 926a1c3

Please sign in to comment.