Skip to content

Commit

Permalink
u128 cannot be deserialized correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
markopoloparadox committed Feb 16, 2024
1 parent 3fbf158 commit 3cc9feb
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,15 @@ pub(crate) async fn import_single_block_metered<
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification_and_import(started.elapsed());
}
if let Ok(data) = BlockMetricsTelemetry::try_from(BlockMetrics::take()) {
if let Some(data) = BlockMetrics::take().to_block_metrics_telemetry() {
println!("{:?}", data);
telemetry!(
telemetry;
SUBSTRATE_INFO;
"block.metrics";
"proposalTimestamps" => data.proposal_timestamps,
"syncBlockTimestamps" => data.sync_block_start_timestamps,
"importBlockTimestamps" => data.import_block_timestamps,
"proposal_timestamps" => data.proposal_timestamps,
"sync_block_timestamps" => data.sync_block_timestamps,
"import_block_timestamps" => data.import_block_timestamps,
);
}

Expand Down
1 change: 1 addition & 0 deletions client/consensus/slots/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ sp-core = { version = "21.0.0", path = "../../../primitives/core" }
sp-inherents = { version = "4.0.0-dev", path = "../../../primitives/inherents" }
sp-runtime = { version = "24.0.0", path = "../../../primitives/runtime" }
sp-state-machine = { version = "0.28.0", path = "../../../primitives/state-machine" }
block-metrics = { path = "../../metrics" }

[dev-dependencies]
substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" }
24 changes: 23 additions & 1 deletion client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ mod aux_schema;
mod slots;

pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND};
use block_metrics::{BlockMetrics, BlockMetricsTelemetry};
pub use slots::SlotInfo;
use slots::Slots;

use futures::{future::Either, Future, TryFutureExt};
use futures_timer::Delay;
use log::{debug, info, warn};
use sc_consensus::{BlockImport, JustificationSyncLink};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN};
use sc_telemetry::{
telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN, SUBSTRATE_INFO,
};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{Proposal, Proposer, SelectChain, SyncOracle};
use sp_consensus_slots::{Slot, SlotDuration};
Expand Down Expand Up @@ -432,6 +435,9 @@ pub trait SimpleSlotWorker<B: BlockT> {
);

let header = block_import_params.post_header();
if let Ok(block_number) = header_num.try_into() {
BlockMetrics::observe_import_block_start_timestamp(block_number);
}
match self.block_import().import_block(block_import_params).await {
Ok(res) => {
res.handle_justification(
Expand All @@ -456,6 +462,22 @@ pub trait SimpleSlotWorker<B: BlockT> {
},
}

if let Ok(block_number) = header_num.try_into() {
BlockMetrics::observe_import_block_end_timestamp(block_number);
}

if let Some(data) = BlockMetrics::take().to_block_metrics_telemetry() {
println!("{:?}", data);
telemetry!(
telemetry;
SUBSTRATE_INFO;
"block.metrics";
"proposal_timestamps" => data.proposal_timestamps,
"sync_block_timestamps" => data.sync_block_timestamps,
"import_block_timestamps" => data.import_block_timestamps,
);
}

Some(SlotResult { block: B::new(header, body), storage_proof })
}
}
Expand Down
103 changes: 63 additions & 40 deletions client/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// the telemetry data.

use std::{
sync::OnceLock,
sync::Mutex,
time::{SystemTime, SystemTimeError, UNIX_EPOCH},
};

Expand All @@ -19,120 +19,143 @@ pub struct BlockMetrics {
pub import_block_end_timestamp: Option<(u64, u128)>, // (block number, timestamp in ms).
}

pub const BLOCK_METRICS: OnceLock<BlockMetrics> = OnceLock::new();
pub static BLOCK_METRICS: Mutex<BlockMetrics> = Mutex::new(BlockMetrics::new());
impl BlockMetrics {
pub const fn new() -> Self {
Self {
proposal_end_timestamp: None,
proposal_time: None,
new_sync_target_timestamp: None,
accepted_block_timestamp: None,
import_block_start_timestamp: None,
import_block_end_timestamp: None,
}
}

pub fn observe_proposal_end_timestamp(block_number: u64) {
let Ok(timestamp) = Self::get_current_timestamp_in_ms() else {
return;
};

let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

metrics.proposal_end_timestamp = Some((block_number, timestamp));
_ = BLOCK_METRICS.set(metrics);
}

pub fn observe_proposal_time(block_number: u64, time: u128) {
let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

metrics.proposal_time = Some((block_number, time));
_ = BLOCK_METRICS.set(metrics);
}

pub fn observe_new_sync_target_timestamp(block_number: u64) {
let Ok(timestamp) = Self::get_current_timestamp_in_ms() else {
return;
};
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
metrics.new_sync_target_timestamp = Some((block_number, timestamp));
_ = BLOCK_METRICS.set(metrics);
}

pub fn observe_accepted_block_timestamp(block_number: u64) {
let Ok(timestamp) = Self::get_current_timestamp_in_ms() else {
return;
};
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
metrics.accepted_block_timestamp = Some((block_number, timestamp));
_ = BLOCK_METRICS.set(metrics);
}

pub fn observe_import_block_start_timestamp(block_number: u64) {
let Ok(timestamp) = Self::get_current_timestamp_in_ms() else {
return;
};
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

let mut metrics: BlockMetrics =
BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
metrics.import_block_start_timestamp = Some((block_number, timestamp));
_ = BLOCK_METRICS.set(metrics);
}

pub fn observe_import_block_end_timestamp(block_number: u64) {
let Ok(timestamp) = Self::get_current_timestamp_in_ms() else {
return;
};
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return;
};

let mut metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
metrics.import_block_end_timestamp = Some((block_number, timestamp));
_ = BLOCK_METRICS.set(metrics);
}

pub fn take() -> BlockMetrics {
let metrics = BLOCK_METRICS.get_or_init(|| BlockMetrics::default()).clone();
_ = BLOCK_METRICS.set(BlockMetrics::default());
let Ok(mut metrics) = BLOCK_METRICS.lock() else {
return BlockMetrics::new();
};

metrics
}
let val = metrics.clone();
*metrics = BlockMetrics::new();

fn get_current_timestamp_in_ms() -> Result<u128, SystemTimeError> {
let start = SystemTime::now();
start.duration_since(UNIX_EPOCH).map(|f| f.as_millis())
val
}
}

impl TryFrom<BlockMetrics> for BlockMetricsTelemetry {
type Error = ();

fn try_from(value: BlockMetrics) -> Result<Self, Self::Error> {
pub fn to_block_metrics_telemetry(self) -> Option<BlockMetricsTelemetry> {
let mut proposal_timestamps = None;
if let (Some(end), Some(time)) = (&value.proposal_end_timestamp, &value.proposal_time) {
if let (Some(end), Some(time)) = (&self.proposal_end_timestamp, &self.proposal_time) {
if end.0 == time.0 {
proposal_timestamps = Some((end.1 - time.1, end.1, end.0));
proposal_timestamps = Some(((end.1 - time.1) as u64, end.1 as u64, end.0));
}
}

let mut sync_block_start_timestamps = None;
let mut sync_block_timestamps = None;
if let (Some(start), Some(end)) =
(&value.new_sync_target_timestamp, &value.accepted_block_timestamp)
(&self.new_sync_target_timestamp, &self.accepted_block_timestamp)
{
if start.0 == end.0 {
sync_block_start_timestamps = Some((start.1, end.1, start.0));
sync_block_timestamps = Some((start.1 as u64, end.1 as u64, start.0));
}
}

let mut import_block_timestamps = None;
if let (Some(start), Some(end)) =
(&value.import_block_start_timestamp, &value.import_block_end_timestamp)
(&self.import_block_start_timestamp, &self.import_block_end_timestamp)
{
if start.0 == end.0 {
import_block_timestamps = Some((start.1, end.1, start.0));
import_block_timestamps = Some((start.1 as u64, end.1 as u64, start.0));
}
}

if proposal_timestamps.is_none()
&& sync_block_start_timestamps.is_none()
&& sync_block_timestamps.is_none()
&& import_block_timestamps.is_none()
{
return Err(());
return None;
}

Ok(Self { proposal_timestamps, sync_block_start_timestamps, import_block_timestamps })
Some(BlockMetricsTelemetry {
proposal_timestamps,
sync_block_timestamps,
import_block_timestamps,
})
}

fn get_current_timestamp_in_ms() -> Result<u128, SystemTimeError> {
let start = SystemTime::now();
start.duration_since(UNIX_EPOCH).map(|f| f.as_millis())
}
}

#[derive(Debug)]
pub struct BlockMetricsTelemetry {
pub proposal_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number))
pub sync_block_start_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number))
pub import_block_timestamps: Option<(u128, u128, u64)>, // (timestamp in ms (start, end, block_number))
pub proposal_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number))
pub sync_block_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number))
pub import_block_timestamps: Option<(u64, u64, u64)>, // (timestamp in ms (start, end, block_number))
}

0 comments on commit 3cc9feb

Please sign in to comment.