From 3cc9febd438978d0611a8405105fba50d7ac0d17 Mon Sep 17 00:00:00 2001 From: Marko Petrlic Date: Fri, 16 Feb 2024 01:12:33 +0100 Subject: [PATCH] u128 cannot be deserialized correctly --- Cargo.lock | 1 + client/consensus/common/src/import_queue.rs | 9 +- client/consensus/slots/Cargo.toml | 1 + client/consensus/slots/src/lib.rs | 24 ++++- client/metrics/src/lib.rs | 103 ++++++++++++-------- 5 files changed, 93 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 937a5973940be..23f3d34ed27ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9491,6 +9491,7 @@ name = "sc-consensus-slots" version = "0.10.0-dev" dependencies = [ "async-trait", + "block-metrics", "futures", "futures-timer", "log", diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs index 87dedf11b6d7d..8a61f120f7400 100644 --- a/client/consensus/common/src/import_queue.rs +++ b/client/consensus/common/src/import_queue.rs @@ -367,14 +367,15 @@ pub(crate) async fn import_single_block_metered< if let Some(metrics) = metrics.as_ref() { metrics.report_verification_and_import(started.elapsed()); } - if let Ok(data) = BlockMetricsTelemetry::try_from(BlockMetrics::take()) { + if let Some(data) = BlockMetrics::take().to_block_metrics_telemetry() { + println!("{:?}", data); telemetry!( telemetry; SUBSTRATE_INFO; "block.metrics"; - "proposalTimestamps" => data.proposal_timestamps, - "syncBlockTimestamps" => data.sync_block_start_timestamps, - "importBlockTimestamps" => data.import_block_timestamps, + "proposal_timestamps" => data.proposal_timestamps, + "sync_block_timestamps" => data.sync_block_timestamps, + "import_block_timestamps" => data.import_block_timestamps, ); } diff --git a/client/consensus/slots/Cargo.toml b/client/consensus/slots/Cargo.toml index 67eeae5317abb..09bda1a533c07 100644 --- a/client/consensus/slots/Cargo.toml +++ b/client/consensus/slots/Cargo.toml @@ -30,6 +30,7 @@ sp-core = { version = "21.0.0", path = "../../../primitives/core" } sp-inherents = { version = "4.0.0-dev", path = "../../../primitives/inherents" } sp-runtime = { version = "24.0.0", path = "../../../primitives/runtime" } sp-state-machine = { version = "0.28.0", path = "../../../primitives/state-machine" } +block-metrics = { path = "../../metrics" } [dev-dependencies] substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" } diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index 02f7047fe2f59..777744d00d25b 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -29,6 +29,7 @@ mod aux_schema; mod slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; +use block_metrics::{BlockMetrics, BlockMetricsTelemetry}; pub use slots::SlotInfo; use slots::Slots; @@ -36,7 +37,9 @@ use futures::{future::Either, Future, TryFutureExt}; use futures_timer::Delay; use log::{debug, info, warn}; use sc_consensus::{BlockImport, JustificationSyncLink}; -use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN}; +use sc_telemetry::{ + telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN, SUBSTRATE_INFO, +}; use sp_arithmetic::traits::BaseArithmetic; use sp_consensus::{Proposal, Proposer, SelectChain, SyncOracle}; use sp_consensus_slots::{Slot, SlotDuration}; @@ -432,6 +435,9 @@ pub trait SimpleSlotWorker { ); let header = block_import_params.post_header(); + if let Ok(block_number) = header_num.try_into() { + BlockMetrics::observe_import_block_start_timestamp(block_number); + } match self.block_import().import_block(block_import_params).await { Ok(res) => { res.handle_justification( @@ -456,6 +462,22 @@ pub trait SimpleSlotWorker { }, } + if let Ok(block_number) = header_num.try_into() { + BlockMetrics::observe_import_block_end_timestamp(block_number); + } + + if let Some(data) = BlockMetrics::take().to_block_metrics_telemetry() { + println!("{:?}", data); + telemetry!( + telemetry; + SUBSTRATE_INFO; + "block.metrics"; + "proposal_timestamps" => data.proposal_timestamps, + "sync_block_timestamps" => data.sync_block_timestamps, + "import_block_timestamps" => data.import_block_timestamps, + ); + } + Some(SlotResult { block: B::new(header, body), storage_proof }) } } diff --git a/client/metrics/src/lib.rs b/client/metrics/src/lib.rs index e5d4e3f044c22..7e843d94fa1bc 100644 --- a/client/metrics/src/lib.rs +++ b/client/metrics/src/lib.rs @@ -5,7 +5,7 @@ // the telemetry data. use std::{ - sync::OnceLock, + sync::Mutex, time::{SystemTime, SystemTimeError, UNIX_EPOCH}, }; @@ -19,120 +19,143 @@ pub struct BlockMetrics { pub import_block_end_timestamp: Option<(u64, u128)>, // (block number, timestamp in ms). } -pub const BLOCK_METRICS: OnceLock = OnceLock::new(); +pub static BLOCK_METRICS: Mutex = Mutex::new(BlockMetrics::new()); impl BlockMetrics { + pub const fn new() -> Self { + Self { + proposal_end_timestamp: None, + proposal_time: None, + new_sync_target_timestamp: None, + accepted_block_timestamp: None, + import_block_start_timestamp: None, + import_block_end_timestamp: None, + } + } + pub fn observe_proposal_end_timestamp(block_number: u64) { let Ok(timestamp) = Self::get_current_timestamp_in_ms() else { return; }; - let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; + metrics.proposal_end_timestamp = Some((block_number, timestamp)); - _ = BLOCK_METRICS.set(metrics); } pub fn observe_proposal_time(block_number: u64, time: u128) { - let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; + metrics.proposal_time = Some((block_number, time)); - _ = BLOCK_METRICS.set(metrics); } pub fn observe_new_sync_target_timestamp(block_number: u64) { let Ok(timestamp) = Self::get_current_timestamp_in_ms() else { return; }; + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; - let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); metrics.new_sync_target_timestamp = Some((block_number, timestamp)); - _ = BLOCK_METRICS.set(metrics); } pub fn observe_accepted_block_timestamp(block_number: u64) { let Ok(timestamp) = Self::get_current_timestamp_in_ms() else { return; }; + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; - let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); metrics.accepted_block_timestamp = Some((block_number, timestamp)); - _ = BLOCK_METRICS.set(metrics); } pub fn observe_import_block_start_timestamp(block_number: u64) { let Ok(timestamp) = Self::get_current_timestamp_in_ms() else { return; }; + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; - let mut metrics: BlockMetrics = - BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); metrics.import_block_start_timestamp = Some((block_number, timestamp)); - _ = BLOCK_METRICS.set(metrics); } pub fn observe_import_block_end_timestamp(block_number: u64) { let Ok(timestamp) = Self::get_current_timestamp_in_ms() else { return; }; + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return; + }; - let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); metrics.import_block_end_timestamp = Some((block_number, timestamp)); - _ = BLOCK_METRICS.set(metrics); } pub fn take() -> BlockMetrics { - let metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone(); - _ = BLOCK_METRICS.set(BlockMetrics::default()); + let Ok(mut metrics) = BLOCK_METRICS.lock() else { + return BlockMetrics::new(); + }; - metrics - } + let val = metrics.clone(); + *metrics = BlockMetrics::new(); - fn get_current_timestamp_in_ms() -> Result { - let start = SystemTime::now(); - start.duration_since(UNIX_EPOCH).map(|f| f.as_millis()) + val } -} - -impl TryFrom for BlockMetricsTelemetry { - type Error = (); - fn try_from(value: BlockMetrics) -> Result { + pub fn to_block_metrics_telemetry(self) -> Option { let mut proposal_timestamps = None; - if let (Some(end), Some(time)) = (&value.proposal_end_timestamp, &value.proposal_time) { + if let (Some(end), Some(time)) = (&self.proposal_end_timestamp, &self.proposal_time) { if end.0 == time.0 { - proposal_timestamps = Some((end.1 - time.1, end.1, end.0)); + proposal_timestamps = Some(((end.1 - time.1) as u64, end.1 as u64, end.0)); } } - let mut sync_block_start_timestamps = None; + let mut sync_block_timestamps = None; if let (Some(start), Some(end)) = - (&value.new_sync_target_timestamp, &value.accepted_block_timestamp) + (&self.new_sync_target_timestamp, &self.accepted_block_timestamp) { if start.0 == end.0 { - sync_block_start_timestamps = Some((start.1, end.1, start.0)); + sync_block_timestamps = Some((start.1 as u64, end.1 as u64, start.0)); } } let mut import_block_timestamps = None; if let (Some(start), Some(end)) = - (&value.import_block_start_timestamp, &value.import_block_end_timestamp) + (&self.import_block_start_timestamp, &self.import_block_end_timestamp) { if start.0 == end.0 { - import_block_timestamps = Some((start.1, end.1, start.0)); + import_block_timestamps = Some((start.1 as u64, end.1 as u64, start.0)); } } if proposal_timestamps.is_none() - && sync_block_start_timestamps.is_none() + && sync_block_timestamps.is_none() && import_block_timestamps.is_none() { - return Err(()); + return None; } - Ok(Self { proposal_timestamps, sync_block_start_timestamps, import_block_timestamps }) + Some(BlockMetricsTelemetry { + proposal_timestamps, + sync_block_timestamps, + import_block_timestamps, + }) + } + + fn get_current_timestamp_in_ms() -> Result { + let start = SystemTime::now(); + start.duration_since(UNIX_EPOCH).map(|f| f.as_millis()) } } +#[derive(Debug)] pub struct BlockMetricsTelemetry { - pub proposal_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number)) - pub sync_block_start_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number)) - pub import_block_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number)) + pub proposal_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number)) + pub sync_block_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number)) + pub import_block_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number)) }