diff --git a/Cargo.lock b/Cargo.lock index 876a1fe01..07f591bd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2886,6 +2886,7 @@ dependencies = [ "fvm_shared", "hex", "ipc-api", + "ipc-observability", "ipc-provider", "ipc_ipld_resolver", "k256 0.11.6", @@ -2935,6 +2936,7 @@ dependencies = [ "fvm_shared", "hex", "ipc-api", + "ipc-observability", "ipc-types", "lazy_static", "num-traits", @@ -3341,17 +3343,20 @@ dependencies = [ "fvm_shared", "hex", "ipc-api", + "ipc-observability", "ipc_actors_abis", "libipld", "multihash 0.18.1", "num-traits", "pin-project", + "prometheus", "quickcheck", "quickcheck_macros", "rand", "serde", "serde_json", "serde_with 2.3.3", + "strum 0.26.1", "tempfile", "tendermint 0.31.1", "tendermint-rpc", @@ -3463,11 +3468,13 @@ dependencies = [ "hex", "im", "ipc-api", + "ipc-observability", "ipc-provider", "ipc_actors_abis", "ipc_ipld_resolver", "libp2p", "num-traits", + "prometheus", "rand", "serde", "serde_json", @@ -5045,6 +5052,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ipc-observability" +version = "0.1.0" +dependencies = [ + "hex", + "lazy_static", + "prometheus", + "tracing", + "tracing-appender", + "tracing-subscriber", +] + [[package]] name = "ipc-provider" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1e9cfca82..63dfcf3e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "ipc/provider", "ipc/api", "ipc/types", + "ipc/observability", # ipld "ipld/resolver", @@ -157,7 +158,11 @@ tokio-util = { version = "0.7.8", features = ["compat"] } tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] } toml = "0.8" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry"] } +tracing-subscriber = { version = "0.3", features = [ + "env-filter", + "json", + "registry", +] } tracing-appender = "0.2.3" url = { version = "2.4.1", features = ["serde"] } zeroize = "1.6" @@ -168,6 +173,7 @@ ipc-provider = { path = "ipc/provider" } ipc-wallet = { path 
= "ipc/wallet", features = ["with-ethers"] } ipc_ipld_resolver = { path = "ipld/resolver" } ipc-types = { path = "ipc/types" } +ipc-observability = { path = "ipc/observability" } ipc_actors_abis = { path = "contracts/binding" } # Vendored for cross-compilation, see https://github.com/cross-rs/cross/wiki/Recipes#openssl diff --git a/fendermint/abci/examples/kvstore.rs b/fendermint/abci/examples/kvstore.rs index 9477c502f..41695750e 100644 --- a/fendermint/abci/examples/kvstore.rs +++ b/fendermint/abci/examples/kvstore.rs @@ -75,7 +75,8 @@ impl Application for KVStore { &self, request: request::PrepareProposal, ) -> Result { - let mut txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let mut txs = txs.clone(); // Enfore transaciton limit so that we don't have a problem with buffering. txs.truncate(MAX_TXNS); diff --git a/fendermint/abci/src/application.rs b/fendermint/abci/src/application.rs index 0cb08164f..58d3b33ae 100644 --- a/fendermint/abci/src/application.rs +++ b/fendermint/abci/src/application.rs @@ -64,7 +64,7 @@ pub trait Application { &self, request: request::PrepareProposal, ) -> AbciResult { - let txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap()); Ok(response::PrepareProposal { txs }) } diff --git a/fendermint/abci/src/util.rs b/fendermint/abci/src/util.rs index b40d26d0d..119fb15ce 100644 --- a/fendermint/abci/src/util.rs +++ b/fendermint/abci/src/util.rs @@ -4,7 +4,7 @@ /// Take the first transactions until the first one that would exceed the maximum limit. /// /// The function does not skip or reorder transaction even if a later one would stay within the limit. 
-pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> Vec { +pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> (Vec, usize) { let mut size: usize = 0; let mut out = Vec::new(); for tx in txs { @@ -15,5 +15,5 @@ pub fn take_until_max_size>(txs: Vec, max_tx_bytes: usize) -> size += bz.len(); out.push(tx); } - out + (out, size) } diff --git a/fendermint/app/Cargo.toml b/fendermint/app/Cargo.toml index b36134316..b524d15b8 100644 --- a/fendermint/app/Cargo.toml +++ b/fendermint/app/Cargo.toml @@ -64,6 +64,7 @@ fendermint_vm_resolver = { path = "../vm/resolver" } fendermint_vm_snapshot = { path = "../vm/snapshot" } fendermint_vm_topdown = { path = "../vm/topdown" } + fvm = { workspace = true } fvm_ipld_blockstore = { workspace = true } fvm_ipld_car = { workspace = true } @@ -72,6 +73,7 @@ fvm_shared = { workspace = true } ipc-api = { workspace = true } ipc-provider = { workspace = true } ipc_ipld_resolver = { workspace = true } +ipc-observability = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/fendermint/app/options/Cargo.toml b/fendermint/app/options/Cargo.toml index 9aaf29ce7..c49020dd1 100644 --- a/fendermint/app/options/Cargo.toml +++ b/fendermint/app/options/Cargo.toml @@ -26,6 +26,7 @@ fvm_shared = { workspace = true } ipc-api = { workspace = true } ipc-types = { workspace = true } url = { workspace = true } +ipc-observability = { workspace = true } fendermint_vm_genesis = { path = "../../vm/genesis" } fendermint_vm_actor_interface = { path = "../../vm/actor_interface" } diff --git a/fendermint/app/options/src/lib.rs b/fendermint/app/options/src/lib.rs index ea317efba..64d7c2b77 100644 --- a/fendermint/app/options/src/lib.rs +++ b/fendermint/app/options/src/lib.rs @@ -7,6 +7,7 @@ use clap::{Args, Parser, Subcommand}; use config::ConfigArgs; use debug::DebugArgs; use fvm_shared::address::Network; +use ipc_observability::traces::FileLayerConfig; use lazy_static::lazy_static; use 
tracing_subscriber::EnvFilter; @@ -27,7 +28,7 @@ pub mod run; mod log; mod parse; -use log::{parse_log_level, LogLevel}; +use log::{parse_log_level, parse_rotation_kind, LogLevel, RotationKind}; use parse::parse_network; lazy_static! { @@ -102,13 +103,44 @@ pub struct Options { #[arg(long, env = "FM_CONFIG_DIR")] config_dir: Option, + // TODO Karel - move all FM_LOG_FILE* flags to a configuration file instead + + // Enable logging to a file. + #[arg(long, env = "FM_LOG_FILE_ENABLED")] + pub log_file_enabled: Option, + /// Set a custom directory for ipc log files. - #[arg(long, env = "FM_LOG_DIR")] + #[arg(long, env = "FM_LOG_FILE_DIR")] pub log_dir: Option, - /// Set a custom prefix for ipc log files. - #[arg(long, env = "FM_LOG_FILE_PREFIX")] - pub log_file_prefix: Option, + #[arg(long, env = "FM_LOG_FILE_MAX_FILES")] + pub max_log_files: Option, + + #[arg( + long, + default_value = "daily", + value_enum, + env = "FM_LOG_FILE_ROTATION", + help = "The kind of rotation to use for log files. Options: minutely, hourly, daily, never.", + value_parser = parse_rotation_kind, + )] + pub log_files_rotation: Option, + + #[arg( + long, + env = "FM_LOG_FILE_DOMAINS_FILTER", + help = "Filter log events by domains. Only events from the specified domains will be logged. Comma separated.", + value_delimiter = ',' + )] + pub domains_filter: Option>, + + #[arg( + long, + env = "FM_LOG_FILE_EVENTS_FILTER", + help = "Filter log events by name. Only events with the specified names will be logged. Comma separated.", + value_delimiter = ',' + )] + pub events_filter: Option>, /// Optionally override the default configuration. 
#[arg(short, long, default_value = "dev")] @@ -162,6 +194,17 @@ impl Options { } } + pub fn log_file_config(&self) -> FileLayerConfig { + FileLayerConfig { + enabled: self.log_file_enabled.unwrap_or(false), + directory: self.log_dir.clone(), + max_log_files: self.max_log_files, + rotation: self.log_files_rotation.clone(), + domain_filter: self.domains_filter.clone(), + events_filter: self.events_filter.clone(), + } + } + /// Path to the configuration directories. /// /// If not specified then returns the default under the home directory. diff --git a/fendermint/app/options/src/log.rs b/fendermint/app/options/src/log.rs index e10415cb8..bf2ae5f53 100644 --- a/fendermint/app/options/src/log.rs +++ b/fendermint/app/options/src/log.rs @@ -6,6 +6,8 @@ use clap::{builder::PossibleValue, ValueEnum}; use lazy_static::lazy_static; use tracing_subscriber::EnvFilter; +pub use ipc_observability::traces::RotationKind; + /// Standard log levels, or something we can pass to /// /// To be fair all of these could be handled by the `EnvFilter`, even `off`, @@ -79,3 +81,13 @@ pub fn parse_log_level(s: &str) -> Result { Ok(LogLevel::Filter(s.to_string())) } } + +pub fn parse_rotation_kind(s: &str) -> Result { + match s { + "minutely" => Ok(RotationKind::Minutely), + "hourly" => Ok(RotationKind::Hourly), + "daily" => Ok(RotationKind::Daily), + "never" => Ok(RotationKind::Never), + _ => Err(format!("invalid rotation kind: {}", s)), + } +} diff --git a/fendermint/app/src/app.rs b/fendermint/app/src/app.rs index 81d67073f..e87677483 100644 --- a/fendermint/app/src/app.rs +++ b/fendermint/app/src/app.rs @@ -13,7 +13,6 @@ use fendermint_abci::{AbciResult, Application}; use fendermint_storage::{ Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite, }; -use fendermint_tracing::emit; use fendermint_vm_core::Timestamp; use fendermint_vm_interpreter::bytes::{ BytesMessageApplyRes, BytesMessageCheckRes, BytesMessageQuery, BytesMessageQueryRes, @@ -37,13 +36,17 @@ use 
fvm_shared::chainid::ChainID; use fvm_shared::clock::ChainEpoch; use fvm_shared::econ::TokenAmount; use fvm_shared::version::NetworkVersion; +use ipc_observability::{emit, serde::HexEncodableBlockHash}; use num_traits::Zero; use serde::{Deserialize, Serialize}; use tendermint::abci::request::CheckTxKind; use tendermint::abci::{request, response}; use tracing::instrument; -use crate::events::{NewBlock, ProposalProcessed}; +use crate::observe::{ + BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, Message, + MpoolReceived, +}; use crate::AppExitCode; use crate::BlockHeight; use crate::{tmconv::*, VERSION}; @@ -619,15 +622,26 @@ where // Update the check state. *guard = Some(state); + let mut mpool_received_trace = MpoolReceived::default(); + let response = match result { Err(e) => invalid_check_tx(AppError::InvalidEncoding, e.description), Ok(result) => match result { Err(IllegalMessage) => invalid_check_tx(AppError::IllegalMessage, "".to_owned()), Ok(Err(InvalidSignature(d))) => invalid_check_tx(AppError::InvalidSignature, d), - Ok(Ok(ret)) => to_check_tx(ret), + Ok(Ok(ret)) => { + mpool_received_trace.message = Some(Message::from(&ret.message)); + to_check_tx(ret) + } }, }; + mpool_received_trace.accept = response.code.is_ok(); + if !mpool_received_trace.accept { + mpool_received_trace.reason = Some(format!("{:?} - {}", response.code, response.info)); + } + + emit(mpool_received_trace); Ok(response) } @@ -650,7 +664,14 @@ where .context("failed to prepare proposal")?; let txs = txs.into_iter().map(bytes::Bytes::from).collect(); - let txs = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap()); + let (txs, size) = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap()); + + emit(BlockProposalSent { + validator: &request.proposer_address, + height: request.height.value(), + tx_count: txs.len(), + size, + }); Ok(response::PrepareProposal { txs }) } @@ -666,6 +687,7 @@ where "process proposal" ); let txs: Vec<_> = 
request.txs.into_iter().map(|tx| tx.to_vec()).collect(); + let size_txs = txs.iter().map(|tx| tx.len()).sum::(); let num_txs = txs.len(); let accept = self @@ -674,12 +696,22 @@ where .await .context("failed to process proposal")?; - emit!(ProposalProcessed { - is_accepted: accept, - block_height: request.height.value(), - block_hash: request.hash.to_string().as_str(), - num_txs, - proposer: request.proposer_address.to_string().as_str() + emit(BlockProposalReceived { + height: request.height.value(), + hash: HexEncodableBlockHash(request.hash.into()), + size: size_txs, + tx_count: num_txs, + validator: &request.proposer_address, + }); + + emit(BlockProposalEvaluated { + height: request.height.value(), + hash: HexEncodableBlockHash(request.hash.into()), + size: size_txs, + tx_count: num_txs, + validator: &request.proposer_address, + accept, + reason: None, }); if accept { @@ -867,12 +899,15 @@ where // Commit app state to the datastore. self.set_committed_state(state)?; - emit!(NewBlock { block_height }); - // Reset check state. 
let mut guard = self.check_state.lock().await; *guard = None; + emit(BlockCommitted { + height: block_height, + app_hash: HexEncodableBlockHash(app_hash.clone().into()), + }); + Ok(response::Commit { data: app_hash.into(), retain_height: retain_height.try_into().expect("height is valid"), diff --git a/fendermint/app/src/cmd/run.rs b/fendermint/app/src/cmd/run.rs index 28f8b9a97..87c1e11cd 100644 --- a/fendermint/app/src/cmd/run.rs +++ b/fendermint/app/src/cmd/run.rs @@ -4,13 +4,11 @@ use anyhow::{anyhow, bail, Context}; use async_stm::atomically_or_err; use fendermint_abci::ApplicationService; -use fendermint_app::events::{ParentFinalityVoteAdded, ParentFinalityVoteIgnored}; use fendermint_app::ipc::{AppParentFinalityQuery, AppVote}; use fendermint_app::{App, AppConfig, AppStore, BitswapBlockstore}; use fendermint_app_settings::AccountKind; use fendermint_crypto::SecretKey; use fendermint_rocksdb::{blockstore::NamespaceBlockstore, namespaces, RocksDb, RocksDbConfig}; -use fendermint_tracing::emit; use fendermint_vm_actor_interface::eam::EthAddress; use fendermint_vm_interpreter::chain::ChainEnv; use fendermint_vm_interpreter::fvm::upgrades::UpgradeScheduler; @@ -22,7 +20,8 @@ use fendermint_vm_interpreter::{ }; use fendermint_vm_resolver::ipld::IpldResolver; use fendermint_vm_snapshot::{SnapshotManager, SnapshotParams}; -use fendermint_vm_topdown::proxy::IPCProviderProxy; +use fendermint_vm_topdown::observe::register_metrics as register_topdown_metrics; +use fendermint_vm_topdown::proxy::{IPCProviderProxy, IPCProviderProxyWithLatency}; use fendermint_vm_topdown::sync::launch_polling_syncer; use fendermint_vm_topdown::voting::{publish_vote_loop, Error as VoteError, VoteTally}; use fendermint_vm_topdown::{CachedFinalityProvider, IPCParentFinality, Toggle}; @@ -71,6 +70,8 @@ async fn run(settings: Settings) -> anyhow::Result<()> { let metrics_registry = if settings.metrics.enabled { let registry = prometheus::Registry::new(); + 
register_topdown_metrics(®istry).context("failed to register topdown metrics")?; + fendermint_app::metrics::register_app_metrics(®istry) .context("failed to register metrics")?; @@ -249,9 +250,14 @@ async fn run(settings: Settings) -> anyhow::Result<()> { config = config.with_max_cache_blocks(v); } - let ipc_provider = Arc::new(make_ipc_provider_proxy(&settings)?); + let ipc_provider = { + let p = make_ipc_provider_proxy(&settings)?; + Arc::new(IPCProviderProxyWithLatency::new(p)) + }; + let finality_provider = CachedFinalityProvider::uninitialized(config.clone(), ipc_provider.clone()).await?; + let p = Arc::new(Toggle::enabled(finality_provider)); (p, Some((ipc_provider, config))) } else { @@ -542,13 +548,9 @@ async fn dispatch_vote( }) .await; - let added = match res { - Ok(added) => { - added - } + match res { Err(e @ VoteError::Equivocation(_, _, _, _)) => { tracing::warn!(error = e.to_string(), "failed to handle vote"); - false } Err(e @ ( VoteError::Uninitialized // early vote, we're not ready yet @@ -556,33 +558,11 @@ async fn dispatch_vote( | VoteError::UnexpectedBlock(_, _) // won't happen here )) => { tracing::debug!(error = e.to_string(), "failed to handle vote"); - false + } + _ => { + tracing::debug!("vote handled"); } }; - - let block_height = f.height; - let block_hash = &hex::encode(&f.block_hash); - let validator = &format!("{:?}", vote.public_key); - - if added { - emit!( - DEBUG, - ParentFinalityVoteAdded { - block_height, - block_hash, - validator, - } - ) - } else { - emit!( - DEBUG, - ParentFinalityVoteIgnored { - block_height, - block_hash, - validator, - } - ) - } } } } diff --git a/fendermint/app/src/events.rs b/fendermint/app/src/events.rs deleted file mode 100644 index 2956fe85b..000000000 --- a/fendermint/app/src/events.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2022-2024 Protocol Labs -// SPDX-License-Identifier: Apache-2.0, MIT - -use crate::BlockHeight; - -/// Re-export other events, just to provide the visibility of where they 
are. -pub use fendermint_vm_event::{ - NewBottomUpCheckpoint, NewParentView, ParentFinalityCommitted, ParentFinalityMissingQuorum, -}; - -/// Hex encoded block hash. -pub type BlockHashHex<'a> = &'a str; - -#[derive(Debug, Default)] -pub struct ProposalProcessed<'a> { - pub is_accepted: bool, - pub block_height: BlockHeight, - pub block_hash: BlockHashHex<'a>, - pub num_txs: usize, - pub proposer: &'a str, -} - -#[derive(Debug, Default)] -pub struct NewBlock { - pub block_height: BlockHeight, -} - -#[derive(Debug, Default)] -pub struct ParentFinalityVoteAdded<'a> { - pub block_height: BlockHeight, - pub block_hash: BlockHashHex<'a>, - pub validator: &'a str, -} - -#[derive(Debug, Default)] -pub struct ParentFinalityVoteIgnored<'a> { - pub block_height: BlockHeight, - pub block_hash: BlockHashHex<'a>, - pub validator: &'a str, -} - -// TODO: Add new events for: -// * snapshots diff --git a/fendermint/app/src/lib.rs b/fendermint/app/src/lib.rs index 29c83a384..0b529a9f2 100644 --- a/fendermint/app/src/lib.rs +++ b/fendermint/app/src/lib.rs @@ -1,9 +1,9 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT mod app; -pub mod events; pub mod ipc; pub mod metrics; +pub mod observe; mod store; mod tmconv; diff --git a/fendermint/app/src/main.rs b/fendermint/app/src/main.rs index 72268af01..117fa3121 100644 --- a/fendermint/app/src/main.rs +++ b/fendermint/app/src/main.rs @@ -3,73 +3,18 @@ pub use fendermint_app_options as options; pub use fendermint_app_settings as settings; -use tracing_appender::{ - non_blocking::WorkerGuard, - rolling::{RollingFileAppender, Rotation}, -}; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::{fmt, layer::SubscriberExt, Layer}; +use ipc_observability::traces::{register_tracing_subscriber, WorkerGuard}; mod cmd; fn init_tracing(opts: &options::Options) -> Option { - let console_filter = opts.log_console_filter().expect("invalid filter"); - let file_filter = 
opts.log_file_filter().expect("invalid filter"); - - // log all traces to stderr (reserving stdout for any actual output such as from the CLI commands) - let console_layer = fmt::layer() - .with_writer(std::io::stderr) - .with_target(false) - .with_file(true) - .with_line_number(true) - .with_filter(console_filter); - - // add a file layer if log_dir is set - let (file_layer, file_guard) = match &opts.log_dir { - Some(log_dir) => { - let filename = match &opts.log_file_prefix { - Some(prefix) => format!("{}-{}", prefix, "fendermint"), - None => "fendermint".to_string(), - }; - - let appender = RollingFileAppender::builder() - .filename_prefix(filename) - .filename_suffix("log") - .rotation(Rotation::DAILY) - .max_log_files(7) - .build(log_dir) - .expect("failed to initialize rolling file appender"); - - let (non_blocking, file_guard) = tracing_appender::non_blocking(appender); - - let file_layer = fmt::layer() - .json() - .with_writer(non_blocking) - .with_span_events(FmtSpan::CLOSE) - .with_target(false) - .with_file(true) - .with_line_number(true) - .with_filter(file_filter); - - (Some(file_layer), Some(file_guard)) - } - None => (None, None), - }; - - let metrics_layer = if opts.metrics_enabled() { - Some(fendermint_app::metrics::layer()) - } else { - None - }; - - let registry = tracing_subscriber::registry() - .with(console_layer) - .with(file_layer) - .with(metrics_layer); - - tracing::subscriber::set_global_default(registry).expect("Unable to set a global collector"); - - file_guard + let console_filter = opts + .log_console_filter() + .expect("invalid console level filter"); + let file_filter = opts.log_file_filter().expect("invalid file level filter"); + let file_config = opts.log_file_config(); + + register_tracing_subscriber(console_filter, file_filter, file_config) } /// Install a panic handler that prints stuff to the logs, otherwise it only shows up in the console. 
diff --git a/fendermint/app/src/metrics/mod.rs b/fendermint/app/src/metrics/mod.rs index abe06cdcd..34459e81c 100644 --- a/fendermint/app/src/metrics/mod.rs +++ b/fendermint/app/src/metrics/mod.rs @@ -2,8 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod prometheus; -mod tracing; pub use prometheus::app::register_metrics as register_app_metrics; pub use prometheus::eth::register_metrics as register_eth_metrics; -pub use tracing::layer; diff --git a/fendermint/app/src/metrics/prometheus.rs b/fendermint/app/src/metrics/prometheus.rs index 52cd28214..8be633924 100644 --- a/fendermint/app/src/metrics/prometheus.rs +++ b/fendermint/app/src/metrics/prometheus.rs @@ -26,16 +26,6 @@ pub mod app { use prometheus::{IntCounter, IntGauge, Registry}; metrics! { - TOPDOWN_VIEW_BLOCK_HEIGHT: IntGauge = "Highest parent subnet block observed"; - TOPDOWN_VIEW_NUM_MSGS: IntCounter = "Number of top-down messages observed since start"; - TOPDOWN_VIEW_NUM_VAL_CHNGS: IntCounter = "Number of top-down validator changes observed since start"; - TOPDOWN_FINALIZED_BLOCK_HEIGHT: IntGauge = "Highest parent subnet block finalized"; - TOPDOWN_FINALITY_VOTE_BLOCK_HEIGHT: IntGauge = "Block for which a finality vote has been received and added last"; - TOPDOWN_FINALITY_VOTE_MAX_BLOCK_HEIGHT: IntGauge = "Highest block for which a finality vote has been received and added"; - TOPDOWN_FINALITY_VOTE_ADDED: IntCounter = "Number of finality votes received and added since start"; - TOPDOWN_FINALITY_VOTE_IGNORED: IntCounter = "Number of finality votes received and ignored since start"; - TOPDOWN_FINALITY_MISSING_QUORUM: IntCounter = "Number of times we could have proposed but didn't because the quorum was missing"; - BOTTOMUP_CKPT_BLOCK_HEIGHT: IntGauge = "Highest bottom-up checkpoint created"; BOTTOMUP_CKPT_CONFIG_NUM: IntGauge = "Highest configuration number checkpointed"; BOTTOMUP_CKPT_NUM_MSGS: IntCounter = "Number of bottom-up messages observed since start"; diff --git 
a/fendermint/app/src/metrics/tracing.rs b/fendermint/app/src/metrics/tracing.rs deleted file mode 100644 index 106b00d56..000000000 --- a/fendermint/app/src/metrics/tracing.rs +++ /dev/null @@ -1,236 +0,0 @@ -// Copyright 2022-2024 Protocol Labs -// SPDX-License-Identifier: Apache-2.0, MIT -//! Subscribing to tracing events and turning them into metrics. - -use std::marker::PhantomData; - -use tracing::{Event, Subscriber}; -use tracing_subscriber::{filter, layer, registry::LookupSpan, Layer}; - -use super::prometheus::app as am; -use crate::events::*; - -/// Create a layer that handles events by incrementing metrics. -pub fn layer() -> impl Layer -where - S: Subscriber, - for<'a> S: LookupSpan<'a>, -{ - MetricsLayer::new().with_filter(filter::filter_fn(|md| md.name().starts_with("event::"))) -} - -struct MetricsLayer { - _subscriber: PhantomData, -} - -impl MetricsLayer { - pub fn new() -> Self { - Self { - _subscriber: PhantomData, - } - } -} - -/// Check that the field exist on a type; if it doesn't this won't compile. -/// This ensures that we're mapping fields with the correct name. -macro_rules! check_field { - ($event_ty:ident :: $field:ident) => {{ - if false { - #[allow(clippy::needless_update)] - let _event = $event_ty { - $field: Default::default(), - ..Default::default() - }; - } - }}; -} - -/// Set a gague to an absolute value based on a field in an event. -macro_rules! set_gauge { - ($event:ident, $event_ty:ident :: $field:ident, $gauge:expr) => { - check_field!($event_ty::$field); - let mut fld = visitors::FindU64::new(stringify!($field)); - $event.record(&mut fld); - $gauge.set(fld.value as i64); - }; -} - -/// Set a gauge to the maximum of its value and a field in an event. -macro_rules! 
max_gauge { - ($event:ident, $event_ty:ident :: $field:ident, $gauge:expr) => { - check_field!($event_ty::$field); - let mut fld = visitors::FindU64::new(stringify!($field)); - $event.record(&mut fld); - let curr = $gauge.get(); - $gauge.set(std::cmp::max(fld.value as i64, curr)); - }; -} - -/// Increment a counter by the value of a field in the event. -macro_rules! inc_counter { - ($event:ident, $event_ty:ident :: $field:ident, $counter:expr) => { - check_field!($event_ty::$field); - let mut fld = visitors::FindU64::new(stringify!($field)); - $event.record(&mut fld); - $counter.inc_by(fld.value); - }; -} - -/// Increment a counter by 1. -/// -/// The field is ignored, it's only here because of how the macros look like. -macro_rules! inc1_counter { - ($event:ident, $event_ty:ident :: $field:ident, $counter:expr) => { - check_field!($event_ty::$field); - $counter.inc(); - }; -} - -/// Produce the prefixed event name from the type name. -macro_rules! event_name { - ($event_ty:ident) => { - concat!("event::", stringify!($event_ty)) - }; -} - -/// Call one of the macros that set values on a metric. -macro_rules! event_mapping { - ($op:ident, $event:ident, $event_ty:ident :: $field:ident, $metric:expr) => { - $op!($event, $event_ty::$field, $metric); - }; -} - -/// Match the event name to event DTO types and within the map fields to metrics. -macro_rules! event_match { - ($event:ident { $( $event_ty:ident { $( $field:ident => $op:ident ! $metric:expr ),* $(,)? } ),* } ) => { - match $event.metadata().name() { - $( - event_name!($event_ty) => { - $( - event_mapping!($op, $event, $event_ty :: $field, $metric); - )* - } - )* - _ => {} - } - }; -} - -impl Layer for MetricsLayer { - fn on_event(&self, event: &Event<'_>, _ctx: layer::Context<'_, S>) { - event_match!(event { - NewParentView { - block_height => set_gauge ! &am::TOPDOWN_VIEW_BLOCK_HEIGHT, - num_msgs => inc_counter ! &am::TOPDOWN_VIEW_NUM_MSGS, - num_validator_changes => inc_counter ! 
&am::TOPDOWN_VIEW_NUM_VAL_CHNGS, - }, - ParentFinalityCommitted { - block_height => set_gauge ! &am::TOPDOWN_FINALIZED_BLOCK_HEIGHT, - }, - ParentFinalityVoteAdded { - // This one can move up and down randomly as votes come in, but statistically should - // be less likely to be affected by Byzantine validators casting nonsense votes. - block_height => set_gauge ! &am::TOPDOWN_FINALITY_VOTE_BLOCK_HEIGHT, - // This one should only move up, showing the highest vote in the tally. - // It should be easy to produce this on Grafana as well from the one above. - block_height => max_gauge ! &am::TOPDOWN_FINALITY_VOTE_MAX_BLOCK_HEIGHT, - validator => inc1_counter ! &am::TOPDOWN_FINALITY_VOTE_ADDED, - }, - ParentFinalityVoteIgnored { - validator => inc1_counter ! &am::TOPDOWN_FINALITY_VOTE_IGNORED, - }, - ParentFinalityMissingQuorum { - block_hash => inc1_counter ! &am::TOPDOWN_FINALITY_MISSING_QUORUM, - }, - NewBottomUpCheckpoint { - block_height => set_gauge ! &am::BOTTOMUP_CKPT_BLOCK_HEIGHT, - next_configuration_number => set_gauge ! &am::BOTTOMUP_CKPT_CONFIG_NUM, - num_msgs => inc_counter ! &am::BOTTOMUP_CKPT_NUM_MSGS, - }, - NewBlock { - block_height => set_gauge ! &am::ABCI_COMMITTED_BLOCK_HEIGHT - } - }); - } -} - -mod visitors { - use tracing::field::{Field, Visit}; - - pub struct FindU64<'a> { - pub name: &'a str, - pub value: u64, - } - - impl<'a> FindU64<'a> { - pub fn new(name: &'a str) -> Self { - Self { name, value: 0 } - } - } - - // Looking for multiple values because the callsite might be passed as a literal which turns into an i64 for example. 
- impl<'a> Visit for FindU64<'a> { - fn record_u64(&mut self, field: &Field, value: u64) { - if field.name() == self.name { - self.value = value; - } - } - - fn record_i64(&mut self, field: &Field, value: i64) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_i128(&mut self, field: &Field, value: i128) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_u128(&mut self, field: &Field, value: u128) { - if field.name() == self.name { - self.value = value as u64; - } - } - - fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {} - } -} - -#[cfg(test)] -mod tests { - use fendermint_tracing::emit; - use fendermint_vm_event::ParentFinalityCommitted; - use prometheus::IntGauge; - use tracing_subscriber::layer::SubscriberExt; - - #[test] - fn test_metrics_layer() { - let gauge: &IntGauge = &super::super::prometheus::app::TOPDOWN_FINALIZED_BLOCK_HEIGHT; - - let v0 = gauge.get(); - gauge.inc(); - let v1 = gauge.get(); - assert!(v1 > v0, "gague should change without being registered"); - - let block_height = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let subscriber = tracing_subscriber::registry().with(super::layer()); - - tracing::subscriber::with_default(subscriber, || { - emit! 
{ - ParentFinalityCommitted { block_height, block_hash: "metrics-test-block" } - } - }); - - assert_eq!( - gauge.get() as u64, - block_height, - "metrics should be captured" - ); - } -} diff --git a/fendermint/app/src/observe.rs b/fendermint/app/src/observe.rs new file mode 100644 index 000000000..793abe2fa --- /dev/null +++ b/fendermint/app/src/observe.rs @@ -0,0 +1,205 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use fvm_shared::address::Address; +use fvm_shared::econ::TokenAmount; + +use fendermint_vm_interpreter::fvm::FvmMessage; +use tendermint::account::Id; + +use ipc_observability::{ + impl_traceable, impl_traceables, lazy_static, register_metrics, serde::HexEncodableBlockHash, + Recordable, TraceLevel, Traceable, +}; + +use prometheus::{register_counter_vec, CounterVec, Registry}; + +register_metrics! { + PROPOSALS_BLOCK_PROPOSAL_RECEIVED: CounterVec + = register_counter_vec!("proposals_block_proposal_received", "Block proposal received", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_SENT: CounterVec + = register_counter_vec!("proposals_block_proposal_sent", "Block proposal sent", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_ACCEPTED: CounterVec + = register_counter_vec!("proposals_block_proposal_accepted", "Block proposal accepted", &["height"]); + PROPOSALS_BLOCK_PROPOSAL_REJECTED: CounterVec + = register_counter_vec!("proposals_block_proposal_rejected", "Block proposal rejected", &["height"]); + PROPOSALS_BLOCK_COMMITTED: CounterVec + = register_counter_vec!("proposals_block_committed", "Block committed", &["height"]); + MPOOL_RECEIVED: CounterVec = register_counter_vec!("mpool_received", "Mpool received", &["accept"]); +} + +impl_traceables!( + TraceLevel::Info, + "Proposals", + BlockProposalReceived<'a>, + BlockProposalSent<'a>, + BlockProposalEvaluated<'a>, + BlockCommitted +); + +impl_traceables!(TraceLevel::Info, "Mpool", MpoolReceived); + +pub type BlockHeight = u64; + +#[derive(Debug)] +pub struct 
BlockProposalReceived<'a> { + pub height: BlockHeight, + pub hash: HexEncodableBlockHash, + pub size: usize, + pub tx_count: usize, + pub validator: &'a Id, +} + +impl Recordable for BlockProposalReceived<'_> { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_RECEIVED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockProposalSent<'a> { + pub validator: &'a Id, + pub height: BlockHeight, + pub size: usize, + pub tx_count: usize, +} + +impl Recordable for BlockProposalSent<'_> { + fn record_metrics(&self) { + PROPOSALS_BLOCK_PROPOSAL_SENT + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct BlockProposalEvaluated<'a> { + pub height: BlockHeight, + pub hash: HexEncodableBlockHash, + pub size: usize, + pub tx_count: usize, + pub validator: &'a Id, + pub accept: bool, + pub reason: Option<&'a str>, +} + +impl Recordable for BlockProposalEvaluated<'_> { + fn record_metrics(&self) { + if self.accept { + PROPOSALS_BLOCK_PROPOSAL_ACCEPTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } else { + PROPOSALS_BLOCK_PROPOSAL_REJECTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } + } +} + +#[derive(Debug)] +pub struct BlockCommitted { + pub height: BlockHeight, + pub app_hash: HexEncodableBlockHash, +} + +impl Recordable for BlockCommitted { + fn record_metrics(&self) { + PROPOSALS_BLOCK_COMMITTED + .with_label_values(&[&self.height.to_string()]) + .inc(); + } +} + +#[derive(Debug)] +pub struct Message { + pub from: Address, + pub to: Address, + pub value: TokenAmount, + pub gas_limit: u64, + pub fee_cap: TokenAmount, + pub premium: TokenAmount, +} + +impl From<&FvmMessage> for Message { + fn from(fvm_message: &FvmMessage) -> Self { + Message { + from: fvm_message.from, + to: fvm_message.to, + value: fvm_message.value.clone(), + gas_limit: fvm_message.gas_limit, + fee_cap: fvm_message.gas_fee_cap.clone(), + premium: 
fvm_message.gas_premium.clone(), + } + } +} + +#[derive(Debug, Default)] +pub struct MpoolReceived { + // TODO - add cid later on + // pub message_cid: &'a str, + pub message: Option, + pub accept: bool, + pub reason: Option, +} + +impl Recordable for MpoolReceived { + fn record_metrics(&self) { + MPOOL_RECEIVED + .with_label_values(&[&self.accept.to_string()]) + .inc(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_emit() { + let id = Id::new([0x01; 20]); + + emit(BlockProposalReceived { + height: 1, + hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + size: 100, + tx_count: 10, + validator: &id, + }); + + emit(BlockProposalSent { + height: 1, + size: 100, + tx_count: 10, + validator: &id, + }); + + emit(BlockProposalEvaluated { + height: 1, + hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + size: 100, + tx_count: 10, + validator: &id, + accept: true, + reason: None, + }); + + emit(BlockProposalEvaluated { + height: 1, + hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + size: 100, + tx_count: 10, + validator: &id, + accept: false, + reason: None, + }); + + emit(BlockCommitted { + height: 1, + app_hash: HexEncodableBlockHash(vec![0x01, 0x02, 0x03]), + }); + } +} diff --git a/fendermint/eth/api/src/apis/mod.rs b/fendermint/eth/api/src/apis/mod.rs index ca7e1968a..e948728f5 100644 --- a/fendermint/eth/api/src/apis/mod.rs +++ b/fendermint/eth/api/src/apis/mod.rs @@ -121,7 +121,6 @@ pub fn register_methods(server: ServerBuilder) -> ServerBuilder = &'a str; -#[derive(Debug, Default)] -pub struct NewParentView<'a> { - pub is_null: bool, - pub block_height: BlockHeight, - pub block_hash: Option>, // hex encoded, unless null block - pub num_msgs: usize, - pub num_validator_changes: usize, -} - #[derive(Debug, Default)] pub struct ParentFinalityCommitted<'a> { pub block_height: BlockHeight, diff --git a/fendermint/vm/interpreter/Cargo.toml b/fendermint/vm/interpreter/Cargo.toml index 
35dd6e83e..63e591da8 100644 --- a/fendermint/vm/interpreter/Cargo.toml +++ b/fendermint/vm/interpreter/Cargo.toml @@ -28,6 +28,7 @@ fendermint_testing = { path = "../../testing", optional = true } ipc_actors_abis = { workspace = true } ipc-api = { workspace = true } +ipc-observability = { workspace = true } async-trait = { workspace = true } async-stm = { workspace = true } @@ -42,6 +43,8 @@ tendermint = { workspace = true } tendermint-rpc = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } +prometheus = { workspace = true } +strum = { workspace = true } cid = { workspace = true } fvm = { workspace = true } diff --git a/fendermint/vm/interpreter/src/chain.rs b/fendermint/vm/interpreter/src/chain.rs index 78e75de8e..79136138f 100644 --- a/fendermint/vm/interpreter/src/chain.rs +++ b/fendermint/vm/interpreter/src/chain.rs @@ -20,7 +20,7 @@ use fendermint_vm_message::{ ipc::{BottomUpCheckpoint, CertifiedMessage, IpcMessage, SignedRelayedMessage}, }; use fendermint_vm_resolver::pool::{ResolveKey, ResolvePool}; -use fendermint_vm_topdown::proxy::IPCProviderProxy; +use fendermint_vm_topdown::proxy::IPCProviderProxyWithLatency; use fendermint_vm_topdown::voting::{ValidatorKey, VoteTally}; use fendermint_vm_topdown::{ CachedFinalityProvider, IPCParentFinality, ParentFinalityProvider, ParentViewProvider, Toggle, @@ -34,7 +34,7 @@ use std::sync::Arc; /// A resolution pool for bottom-up and top-down checkpoints. pub type CheckpointPool = ResolvePool; -pub type TopDownFinalityProvider = Arc>>; +pub type TopDownFinalityProvider = Arc>>; /// These are the extra state items that the chain interpreter needs, /// a sort of "environment" supporting IPC. 
@@ -362,12 +362,20 @@ where tracing::debug!("chain interpreter applied topdown msgs"); + let local_block_height = state.block_height() as u64; + let proposer = state.validator_id().map(|id| id.to_string()); + let proposer_ref = proposer.as_deref(); + atomically(|| { env.parent_finality_provider .set_new_finality(finality.clone(), prev_finality.clone())?; - env.parent_finality_votes - .set_finalized(finality.height, finality.block_hash.clone())?; + env.parent_finality_votes.set_finalized( + finality.height, + finality.block_hash.clone(), + proposer_ref, + Some(local_block_height), + )?; Ok(()) }) diff --git a/fendermint/vm/interpreter/src/fvm/check.rs b/fendermint/vm/interpreter/src/fvm/check.rs index 1329ccced..5e627007c 100644 --- a/fendermint/vm/interpreter/src/fvm/check.rs +++ b/fendermint/vm/interpreter/src/fvm/check.rs @@ -6,10 +6,16 @@ use async_trait::async_trait; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::RawBytes; use fvm_shared::{address::Address, error::ExitCode}; +use ipc_observability::{emit, measure_time}; use crate::CheckInterpreter; -use super::{state::FvmExecState, store::ReadOnlyBlockstore, FvmMessage, FvmMessageInterpreter}; +use super::{ + observe::{MsgExec, MsgExecPurpose}, + state::FvmExecState, + store::ReadOnlyBlockstore, + FvmMessage, FvmMessageInterpreter, +}; /// Transaction check results are expressed by the exit code, so that hopefully /// they would result in the same error code if they were applied. @@ -19,6 +25,7 @@ pub struct FvmCheckRet { pub exit_code: ExitCode, pub return_data: Option, pub info: Option, + pub message: FvmMessage, } #[async_trait] @@ -64,6 +71,7 @@ where exit_code, return_data, info, + message: msg.clone(), }; Ok((state, ret)) }; @@ -109,8 +117,18 @@ where // Instead of modifying just the partial state, we will execute the call in earnest. // This is required for fully supporting the Ethereum API "pending" queries, if that's needed. 
- // This will stack the effect for subsequent transactions added to the mempool. - let (apply_ret, _) = state.execute_explicit(msg.clone())?; + let (execution_result, latency) = + measure_time(|| state.execute_explicit(msg.clone())); + + let (apply_ret, _) = execution_result?; + + emit(MsgExec { + purpose: MsgExecPurpose::Check, + height: state.block_height(), + message: msg.clone(), + duration: latency.as_secs_f64(), + exit_code: apply_ret.msg_receipt.exit_code.value(), + }); return checked( state, diff --git a/fendermint/vm/interpreter/src/fvm/exec.rs b/fendermint/vm/interpreter/src/fvm/exec.rs index c5f845e1f..150cb218a 100644 --- a/fendermint/vm/interpreter/src/fvm/exec.rs +++ b/fendermint/vm/interpreter/src/fvm/exec.rs @@ -9,12 +9,14 @@ use fendermint_vm_actor_interface::{chainmetadata, cron, system}; use fvm::executor::ApplyRet; use fvm_ipld_blockstore::Blockstore; use fvm_shared::{address::Address, ActorID, MethodNum, BLOCK_GAS_LIMIT}; +use ipc_observability::{emit, measure_time}; use tendermint_rpc::Client; use crate::ExecInterpreter; use super::{ checkpoint::{self, PowerUpdates}, + observe::{MsgExec, MsgExecPurpose}, state::FvmExecState, FvmMessage, FvmMessageInterpreter, }; @@ -149,36 +151,37 @@ where mut state: Self::State, msg: Self::Message, ) -> anyhow::Result<(Self::State, Self::DeliverOutput)> { - let from = msg.from; - let to = msg.to; - let method_num = msg.method_num; - let gas_limit = msg.gas_limit; + let (apply_ret, emitters, latency) = if msg.from == system::SYSTEM_ACTOR_ADDR { + let (execution_result, latency) = measure_time(|| state.execute_implicit(msg.clone())); + let (apply_ret, emitters) = execution_result?; - let (apply_ret, emitters) = if from == system::SYSTEM_ACTOR_ADDR { - state.execute_implicit(msg)? + (apply_ret, emitters, latency) } else { - state.execute_explicit(msg)? 
+ let (execution_result, latency) = measure_time(|| state.execute_explicit(msg.clone())); + let (apply_ret, emitters) = execution_result?; + + (apply_ret, emitters, latency) }; - tracing::info!( - height = state.block_height(), - from = from.to_string(), - to = to.to_string(), - method_num = method_num, - exit_code = apply_ret.msg_receipt.exit_code.value(), - gas_used = apply_ret.msg_receipt.gas_used, - "tx delivered" - ); + let exit_code = apply_ret.msg_receipt.exit_code.value(); let ret = FvmApplyRet { apply_ret, - from, - to, - method_num, - gas_limit, + from: msg.from, + to: msg.to, + method_num: msg.method_num, + gas_limit: msg.gas_limit, emitters, }; + emit(MsgExec { + purpose: MsgExecPurpose::Apply, + height: state.block_height(), + message: msg, + duration: latency.as_secs_f64(), + exit_code, + }); + Ok((state, ret)) } diff --git a/fendermint/vm/interpreter/src/fvm/mod.rs b/fendermint/vm/interpreter/src/fvm/mod.rs index 0aa31b69c..83cb934d9 100644 --- a/fendermint/vm/interpreter/src/fvm/mod.rs +++ b/fendermint/vm/interpreter/src/fvm/mod.rs @@ -8,6 +8,7 @@ mod checkpoint; mod exec; mod externs; mod genesis; +mod observe; mod query; pub mod state; pub mod store; diff --git a/fendermint/vm/interpreter/src/fvm/observe.rs b/fendermint/vm/interpreter/src/fvm/observe.rs new file mode 100644 index 000000000..02f1b86d0 --- /dev/null +++ b/fendermint/vm/interpreter/src/fvm/observe.rs @@ -0,0 +1,95 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use ipc_observability::{ + impl_traceable, impl_traceables, lazy_static, register_metrics, Recordable, TraceLevel, + Traceable, +}; +use prometheus::{register_histogram, Histogram, Registry}; + +use fvm_shared::message::Message; + +register_metrics! 
{ + EXEC_FVM_CHECK_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_check_execution_time_secs", "Execution time of FVM check in seconds"); + EXEC_FVM_ESTIMATE_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_estimate_execution_time_secs", "Execution time of FVM estimate in seconds"); + EXEC_FVM_APPLY_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_apply_execution_time_secs", "Execution time of FVM apply in seconds"); + EXEC_FVM_CALL_EXECUTION_TIME_SECS: Histogram + = register_histogram!("exec_fvm_call_execution_time_secs", "Execution time of FVM call in seconds"); +} + +impl_traceables!(TraceLevel::Info, "Execution", MsgExec); + +#[derive(Debug, strum::EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum MsgExecPurpose { + Check, + Apply, + Estimate, + Call, +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct MsgExec { + pub purpose: MsgExecPurpose, + pub message: Message, + pub height: i64, + pub duration: f64, + pub exit_code: u32, +} + +impl Recordable for MsgExec { + fn record_metrics(&self) { + match self.purpose { + MsgExecPurpose::Check => EXEC_FVM_CHECK_EXECUTION_TIME_SECS.observe(self.duration), + MsgExecPurpose::Estimate => { + EXEC_FVM_ESTIMATE_EXECUTION_TIME_SECS.observe(self.duration) + } + MsgExecPurpose::Apply => EXEC_FVM_APPLY_EXECUTION_TIME_SECS.observe(self.duration), + MsgExecPurpose::Call => EXEC_FVM_CALL_EXECUTION_TIME_SECS.observe(self.duration), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_metrics() { + let registry = Registry::new(); + register_metrics(&registry).unwrap(); + } + + #[test] + fn test_emit() { + use fvm_ipld_encoding::RawBytes; + use fvm_shared::address::Address; + use fvm_shared::econ::TokenAmount; + + let message = Message { + version: 1, + from: Address::new_id(1), + to: Address::new_id(2), + sequence: 1, + value: TokenAmount::from_atto(1), + method_num: 1, + params: RawBytes::default(), + gas_limit: 1,
+ gas_fee_cap: TokenAmount::from_atto(1), + gas_premium: TokenAmount::from_atto(1), + }; + + emit(MsgExec { + purpose: MsgExecPurpose::Check, + height: 1, + duration: 1.0, + exit_code: 1, + message: message.clone(), + }); + } +} diff --git a/fendermint/vm/interpreter/src/fvm/query.rs b/fendermint/vm/interpreter/src/fvm/query.rs index 945cea65c..9d668fd7c 100644 --- a/fendermint/vm/interpreter/src/fvm/query.rs +++ b/fendermint/vm/interpreter/src/fvm/query.rs @@ -1,5 +1,7 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT +use std::time::Instant; + use async_trait::async_trait; use cid::Cid; use fendermint_vm_message::query::{ActorState, FvmQuery, GasEstimate, StateParams}; @@ -8,8 +10,10 @@ use fvm_ipld_encoding::RawBytes; use fvm_shared::{ bigint::BigInt, econ::TokenAmount, error::ExitCode, message::Message, ActorID, BLOCK_GAS_LIMIT, }; +use ipc_observability::emit; use num_traits::Zero; +use super::observe::{MsgExec, MsgExecPurpose}; use crate::QueryInterpreter; use super::{state::FvmQueryState, FvmApplyRet, FvmMessageInterpreter}; @@ -79,24 +83,19 @@ where let method_num = msg.method_num; let gas_limit = msg.gas_limit; + let start = Instant::now(); // Do not stack effects - let (state, (apply_ret, emitters)) = state.call(*msg).await?; + let (state, (apply_ret, emitters)) = state.call(*msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); + let exit_code = apply_ret.msg_receipt.exit_code.value(); - tracing::info!( - height = state.block_height(), - pending = state.pending(), - to = to.to_string(), - from = from.to_string(), - method_num, - exit_code = apply_ret.msg_receipt.exit_code.value(), - data = hex::encode(apply_ret.msg_receipt.return_data.bytes()), - info = apply_ret - .failure_info - .as_ref() - .map(|i| i.to_string()) - .unwrap_or_default(), - "query call" - ); + emit(MsgExec { + purpose: MsgExecPurpose::Call, + height: state.block_height(), + message: *msg, + duration: latency, + exit_code, + }); let ret = 
FvmApplyRet { apply_ret, @@ -177,15 +176,19 @@ where msg.gas_premium = TokenAmount::zero(); msg.gas_fee_cap = TokenAmount::zero(); + let start = Instant::now(); // estimate the gas limit and assign it to the message // revert any changes because we'll repeat the estimation let (state, (ret, _)) = state.call(msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); - tracing::debug!( - gas_used = ret.msg_receipt.gas_used, - exit_code = ret.msg_receipt.exit_code.value(), - "estimated gassed message" - ); + emit(MsgExec { + purpose: MsgExecPurpose::Estimate, + height: state.block_height(), + message: msg.clone(), + duration: latency, + exit_code: ret.msg_receipt.exit_code.value(), + }); if !ret.msg_receipt.exit_code.is_success() { // if the message fail we can't estimate the gas. @@ -278,7 +281,9 @@ where // set message nonce to zero so the right one is picked up msg.sequence = 0; - let (state, (apply_ret, _)) = state.call(msg).await?; + let start = Instant::now(); + let (state, (apply_ret, _)) = state.call(msg.clone()).await?; + let latency = start.elapsed().as_secs_f64(); let ret = GasEstimate { exit_code: apply_ret.msg_receipt.exit_code, @@ -290,6 +295,14 @@ where gas_limit: apply_ret.msg_receipt.gas_used, }; + emit(MsgExec { + purpose: MsgExecPurpose::Estimate, + height: state.block_height(), + message: msg, + duration: latency, + exit_code: ret.exit_code.value(), + }); + // if the message succeeded or failed with a different error than `SYS_OUT_OF_GAS`, // immediately return as we either succeeded finding the right gas estimation, // or something non-related happened. 
diff --git a/fendermint/vm/topdown/Cargo.toml b/fendermint/vm/topdown/Cargo.toml index 42980a06e..f24db3051 100644 --- a/fendermint/vm/topdown/Cargo.toml +++ b/fendermint/vm/topdown/Cargo.toml @@ -29,11 +29,14 @@ tendermint-rpc = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +prometheus = { workspace = true } fendermint_vm_genesis = { path = "../genesis" } fendermint_vm_event = { path = "../event" } fendermint_tracing = { path = "../../tracing" } +ipc-observability = { workspace = true } + [dev-dependencies] arbitrary = { workspace = true } clap = { workspace = true } diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 91390093e..baf471baa 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -11,6 +11,8 @@ pub mod proxy; mod toggle; pub mod voting; +pub mod observe; + use async_stm::Stm; use async_trait::async_trait; use ethers::utils::hex; diff --git a/fendermint/vm/topdown/src/observe.rs b/fendermint/vm/topdown/src/observe.rs new file mode 100644 index 000000000..5a0caa66e --- /dev/null +++ b/fendermint/vm/topdown/src/observe.rs @@ -0,0 +1,240 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use ipc_observability::{ + impl_traceable, impl_traceables, lazy_static, register_metrics, serde::HexEncodableBlockHash, + Recordable, TraceLevel, Traceable, +}; +use prometheus::{ + register_histogram_vec, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, + HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, +}; + +register_metrics! 
{ + TOPDOWN_PARENT_RPC_CALL_TOTAL: IntCounterVec + = register_int_counter_vec!("topdown_parent_rpc_call_total", "Parent RPC calls", &["source", "method", "status"]); + TOPDOWN_PARENT_RPC_CALL_LATENCY_SECS: HistogramVec + = register_histogram_vec!("topdown_parent_rpc_call_latency_secs", "Parent RPC calls latency", &["source", "method", "status"]); + TOPDOWN_PARENT_FINALITY_LATEST_ACQUIRED_HEIGHT: IntGaugeVec + = register_int_gauge_vec!("topdown_parent_finality_latest_acquired_height", "Latest locally acquired parent finality", &["source"]); + TOPDOWN_PARENT_FINALITY_VOTING_LATEST_RECEIVED_HEIGHT: IntGaugeVec + = register_int_gauge_vec!("topdown_parent_finality_voting_latest_received_height", "Parent finality gossip received", &["validator"]); + TOPDOWN_PARENT_FINALITY_VOTING_LATEST_SENT_HEIGHT: IntGauge + = register_int_gauge!("topdown_parent_finality_voting_latest_sent_height", "Parent finality peer"); + TOPDOWN_PARENT_FINALITY_VOTING_QUORUM_HEIGHT: IntGauge + = register_int_gauge!( + "topdown_parent_finality_voting_quorum_height", + "Parent finality vote tally new agreement; recorded whenever the latest epoch with quorum" + ); + TOPDOWN_PARENT_FINALITY_VOTING_QUORUM_WEIGHT: IntGauge + = register_int_gauge!( + "topdown_parent_finality_voting_quorum_weight", + "Parent finality vote tally new agreement; recorded whenever the latest epoch with quorum" + ); + TOPDOWN_PARENT_FINALITY_COMMITTED_HEIGHT: IntGauge + = register_int_gauge!("topdown_parent_finality_committed_height", "Parent finality committed on chain"); +} + +impl_traceables!( + TraceLevel::Info, + "Topdown", + ParentRpcCalled<'a>, + ParentFinalityAcquired<'a>, + ParentFinalityPeerVoteReceived<'a>, + ParentFinalityPeerVoteSent, + ParentFinalityPeerQuorumReached, + ParentFinalityCommitted<'a> +); + +#[derive(Debug)] +pub struct ParentRpcCalled<'a> { + pub source: &'a str, + pub json_rpc: &'a str, + pub method: &'a str, + pub status: &'a str, + pub latency: f64, +} + +impl Recordable for ParentRpcCalled<'_> { 
+ fn record_metrics(&self) { + TOPDOWN_PARENT_RPC_CALL_TOTAL + .with_label_values(&[self.source, self.method, self.status]) + .inc(); + + TOPDOWN_PARENT_RPC_CALL_LATENCY_SECS + .with_label_values(&[self.source, self.method, self.status]) + .observe(self.latency); + } +} + +pub type BlockHeight = u64; + +#[derive(Debug)] +pub struct ParentFinalityAcquired<'a> { + pub source: &'a str, + pub is_null: bool, + pub block_height: BlockHeight, + pub block_hash: Option<HexEncodableBlockHash>, + pub commitment_hash: Option<HexEncodableBlockHash>, + pub num_msgs: usize, + pub num_validator_changes: usize, +} + +impl Recordable for ParentFinalityAcquired<'_> { + fn record_metrics(&self) { + TOPDOWN_PARENT_FINALITY_LATEST_ACQUIRED_HEIGHT + .with_label_values(&[self.source]) + .set(self.block_height as i64); + } +} + +#[derive(Debug)] +pub struct ParentFinalityPeerVoteReceived<'a> { + pub validator: &'a str, + pub block_height: BlockHeight, + pub block_hash: HexEncodableBlockHash, + pub commitment_hash: Option<HexEncodableBlockHash>, +} + +impl Recordable for ParentFinalityPeerVoteReceived<'_> { + fn record_metrics(&self) { + TOPDOWN_PARENT_FINALITY_VOTING_LATEST_RECEIVED_HEIGHT + .with_label_values(&[self.validator]) + .set(self.block_height as i64); + } +} + +#[derive(Debug)] +pub struct ParentFinalityPeerVoteSent { + pub block_height: BlockHeight, + pub block_hash: HexEncodableBlockHash, + pub commitment_hash: Option<HexEncodableBlockHash>, +} + +impl Recordable for ParentFinalityPeerVoteSent { + fn record_metrics(&self) { + TOPDOWN_PARENT_FINALITY_VOTING_LATEST_SENT_HEIGHT.set(self.block_height as i64); + } +} + +#[derive(Debug)] +pub struct ParentFinalityPeerQuorumReached { + pub block_height: BlockHeight, + pub block_hash: HexEncodableBlockHash, + pub commitment_hash: Option<HexEncodableBlockHash>, + pub weight: u64, +} + +impl Recordable for ParentFinalityPeerQuorumReached { + fn record_metrics(&self) { + TOPDOWN_PARENT_FINALITY_VOTING_QUORUM_HEIGHT.set(self.block_height as i64); + + // TODO Karel - this should be sum of weights of all validators that voted?
Ask Raul + TOPDOWN_PARENT_FINALITY_VOTING_QUORUM_WEIGHT.set(self.weight as i64); + } +} + +#[derive(Debug)] +pub struct ParentFinalityCommitted<'a> { + pub parent_height: BlockHeight, + pub block_hash: HexEncodableBlockHash, + pub local_height: Option<BlockHeight>, + pub proposer: Option<&'a str>, +} + +impl Recordable for ParentFinalityCommitted<'_> { + fn record_metrics(&self) { + TOPDOWN_PARENT_FINALITY_COMMITTED_HEIGHT.set(self.parent_height as i64); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ipc_observability::emit; + + #[test] + fn test_metrics() { + let registry = Registry::new(); + register_metrics(&registry).unwrap(); + } + + #[test] + fn test_metric_increase() { + let registry = Registry::new(); + register_metrics(&registry).unwrap(); + + // Initialize the metric values + let source = "source"; + let method = "method"; + let status = "status"; + let initial_value = TOPDOWN_PARENT_RPC_CALL_TOTAL + .with_label_values(&[source, method, status]) + .get(); + + // Emit a record to increase the metric + emit(ParentRpcCalled { + source, + json_rpc: "json_rpc", + method, + status, + latency: 0.0, + }); + + // Check that the metric value has increased by 1 + let new_value = TOPDOWN_PARENT_RPC_CALL_TOTAL + .with_label_values(&[source, method, status]) + .get(); + assert_eq!(new_value, initial_value + 1); + } + + #[test] + fn test_emit() { + emit(ParentRpcCalled { + source: "source", + json_rpc: "json_rpc", + method: "method", + status: "status", + latency: 0.0, + }); + + let hash = vec![0u8; 32]; + + emit(ParentFinalityAcquired { + source: "source", + is_null: false, + block_height: 0, + block_hash: Some(HexEncodableBlockHash(hash.clone())), + commitment_hash: Some(HexEncodableBlockHash(hash.clone())), + num_msgs: 0, + num_validator_changes: 0, + }); + + emit(ParentFinalityPeerVoteReceived { + validator: "validator", + block_height: 0, + block_hash: HexEncodableBlockHash(hash.clone()), + commitment_hash: Some(HexEncodableBlockHash(hash.clone())), + }); + +
emit(ParentFinalityPeerVoteSent { + block_height: 0, + block_hash: HexEncodableBlockHash(hash.clone()), + commitment_hash: Some(HexEncodableBlockHash(hash.clone())), + }); + + emit(ParentFinalityPeerQuorumReached { + block_height: 0, + block_hash: HexEncodableBlockHash(hash.clone()), + commitment_hash: Some(HexEncodableBlockHash(hash.clone())), + weight: 0, + }); + + emit(ParentFinalityCommitted { + parent_height: 0, + block_hash: HexEncodableBlockHash(hash.clone()), + local_height: Some(0), + proposer: Some("proposerOption"), + }); + } +} diff --git a/fendermint/vm/topdown/src/proxy.rs b/fendermint/vm/topdown/src/proxy.rs index 29fb5f93c..94a8e3177 100644 --- a/fendermint/vm/topdown/src/proxy.rs +++ b/fendermint/vm/topdown/src/proxy.rs @@ -1,15 +1,19 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT +use crate::observe::ParentRpcCalled; use crate::BlockHeight; use anyhow::anyhow; +use anyhow::Result; use async_trait::async_trait; use fvm_shared::clock::ChainEpoch; use ipc_api::cross::IpcEnvelope; use ipc_api::staking::StakingChangeRequest; use ipc_api::subnet_id::SubnetID; +use ipc_observability::emit; use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; use ipc_provider::IpcProvider; +use std::time::Instant; use tracing::instrument; /// The interface to querying state of the parent @@ -76,7 +80,6 @@ impl ParentQueryProxy for IPCProviderProxy { } /// Getting the block hash at the target height. - #[instrument(skip(self))] async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result { self.ipc_provider .get_block_hash(&self.parent_subnet, height as ChainEpoch) @@ -84,7 +87,6 @@ impl ParentQueryProxy for IPCProviderProxy { } /// Get the top down messages from the starting to the ending height. - #[instrument(skip(self))] async fn get_top_down_msgs( &self, height: BlockHeight, @@ -100,7 +102,6 @@ impl ParentQueryProxy for IPCProviderProxy { } /// Get the validator set at the specified height. 
- #[instrument(skip(self))] async fn get_validator_changes( &self, height: BlockHeight, @@ -116,3 +117,97 @@ impl ParentQueryProxy for IPCProviderProxy { }) } } + +// TODO - create a macro for this +pub struct IPCProviderProxyWithLatency { + inner: IPCProviderProxy, +} + +impl IPCProviderProxyWithLatency { + pub fn new(inner: IPCProviderProxy) -> Self { + Self { inner } + } +} + +#[async_trait] +impl ParentQueryProxy for IPCProviderProxyWithLatency { + #[instrument(skip(self))] + async fn get_chain_head_height(&self) -> anyhow::Result<BlockHeight> { + emit_event_with_latency( + &self.inner.parent_subnet.to_string(), + "chain_head", + || async { self.inner.get_chain_head_height().await }, + ) + .await + } + + #[instrument(skip(self))] + async fn get_genesis_epoch(&self) -> anyhow::Result<BlockHeight> { + emit_event_with_latency( + &self.inner.parent_subnet.to_string(), + "genesis_epoch", + || async { self.inner.get_genesis_epoch().await }, + ) + .await + } + + #[instrument(skip(self))] + async fn get_block_hash(&self, height: BlockHeight) -> anyhow::Result<GetBlockHashResult> { + emit_event_with_latency( + &self.inner.parent_subnet.to_string(), + "get_block_hash", + || async { self.inner.get_block_hash(height).await }, + ) + .await + } + + #[instrument(skip(self))] + async fn get_top_down_msgs( + &self, + height: BlockHeight, + ) -> anyhow::Result<TopDownQueryPayload<Vec<IpcEnvelope>>> { + emit_event_with_latency( + &self.inner.parent_subnet.to_string(), + "get_top_down_msgs", + || async { self.inner.get_top_down_msgs(height).await }, + ) + .await + } + + #[instrument(skip(self))] + async fn get_validator_changes( + &self, + height: BlockHeight, + ) -> anyhow::Result<TopDownQueryPayload<Vec<StakingChangeRequest>>> { + emit_event_with_latency( + &self.inner.parent_subnet.to_string(), + "get_validator_changeset", + || async { self.inner.get_validator_changes(height).await }, + ) + .await + } +} + +// TODO Karel - make it nicer. Perhaps use a macro?
+async fn emit_event_with_latency<F, Fut, T>(json_rpc: &str, method: &str, func: F) -> Result<T> +where + F: FnOnce() -> Fut, + Fut: std::future::Future<Output = Result<T>>, +{ + let start = Instant::now(); + let result = func().await; + let latency = start.elapsed().as_secs_f64(); + + emit(ParentRpcCalled { + source: "IPC Provider Proxy", + json_rpc, + method, + latency, + status: match &result { + Ok(_) => "success", + Err(_) => "error", + }, + }); + + result +} diff --git a/fendermint/vm/topdown/src/sync/mod.rs b/fendermint/vm/topdown/src/sync/mod.rs index 03d98f129..2cfe2a0f9 100644 --- a/fendermint/vm/topdown/src/sync/mod.rs +++ b/fendermint/vm/topdown/src/sync/mod.rs @@ -138,7 +138,8 @@ where atomically(|| { view_provider.set_new_finality(finality.clone(), None)?; - vote_tally.set_finalized(finality.height, finality.block_hash.clone())?; + + vote_tally.set_finalized(finality.height, finality.block_hash.clone(), None, None)?; vote_tally.set_power_table(power_table.clone())?; Ok(()) }) diff --git a/fendermint/vm/topdown/src/sync/syncer.rs b/fendermint/vm/topdown/src/sync/syncer.rs index 675d4e4c6..ee4748058 100644 --- a/fendermint/vm/topdown/src/sync/syncer.rs +++ b/fendermint/vm/topdown/src/sync/syncer.rs @@ -16,8 +16,8 @@ use libp2p::futures::TryFutureExt; use std::sync::Arc; use tracing::instrument; -use fendermint_tracing::emit; -use fendermint_vm_event::{BlockHashHex, NewParentView}; +use crate::observe::ParentFinalityAcquired; +use ipc_observability::{emit, serde::HexEncodableBlockHash}; /// Parent syncer that constantly poll parent. This struct handles lotus null blocks and deferred /// execution. For ETH based parent, it should work out of the box as well.
@@ -205,12 +205,14 @@ where }) .await?; - emit!(NewParentView { + emit(ParentFinalityAcquired { + source: "Parent syncer", is_null: true, block_height: height, - block_hash: None::, + block_hash: None, + commitment_hash: None, num_msgs: 0, - num_validator_changes: 0 + num_validator_changes: 0, }); // Null block received, no block hash for the current height being polled. @@ -256,10 +258,13 @@ where }) .await?; - emit!(NewParentView { + emit(ParentFinalityAcquired { + source: "Parent syncer", is_null: false, block_height: height, - block_hash: Some(&hex::encode(&data.0)), + block_hash: Some(HexEncodableBlockHash(data.0.clone())), + // TODO Karel, Willes - when we introduce commitment hash, we should add it here + commitment_hash: None, num_msgs: data.2.len(), num_validator_changes: data.1.len(), }); diff --git a/fendermint/vm/topdown/src/voting.rs b/fendermint/vm/topdown/src/voting.rs index e913e1887..793c2ab24 100644 --- a/fendermint/vm/topdown/src/voting.rs +++ b/fendermint/vm/topdown/src/voting.rs @@ -3,10 +3,16 @@ use async_stm::{abort, atomically_or_err, retry, Stm, StmResult, TVar}; use serde::{de::DeserializeOwned, Serialize}; +use std::fmt::Display; use std::hash::Hash; use std::{fmt::Debug, time::Duration}; +use crate::observe::{ + ParentFinalityCommitted, ParentFinalityPeerQuorumReached, ParentFinalityPeerVoteReceived, + ParentFinalityPeerVoteSent, +}; use crate::{BlockHash, BlockHeight}; +use ipc_observability::{emit, serde::HexEncodableBlockHash}; // Usign this type because it's `Hash`, unlike the normal `libsecp256k1::PublicKey`. pub use ipc_ipld_resolver::ValidatorKey; @@ -67,7 +73,7 @@ pub struct VoteTally { impl VoteTally where - K: Clone + Hash + Eq + Sync + Send + 'static + Debug, + K: Clone + Hash + Eq + Sync + Send + 'static + Debug + Display, V: AsRef<[u8]> + Clone + Hash + Eq + Sync + Send + 'static, { /// Create an uninitialized instance. 
Before blocks can be added to it @@ -210,7 +216,9 @@ where } } - let votes_for_block = votes_at_height.entry(block_hash).or_default(); + let validator_pub_key = validator_key.to_string(); + + let votes_for_block = votes_at_height.entry(block_hash.clone()).or_default(); if votes_for_block.insert(validator_key).is_some() { return Ok(false); @@ -218,6 +226,14 @@ where self.votes.write(votes)?; + emit(ParentFinalityPeerVoteReceived { + block_height, + validator: &validator_pub_key, + block_hash: HexEncodableBlockHash(block_hash.as_ref().to_vec()), + // TODO- this needs to be the commitment hash once implemented + commitment_hash: None, + }); + Ok(true) } @@ -278,6 +294,14 @@ where tracing::debug!(weight, quorum_threshold, "showdown"); if weight >= quorum_threshold { + emit(ParentFinalityPeerQuorumReached { + block_height: *block_height, + block_hash: HexEncodableBlockHash(block_hash.as_ref().to_vec()), + // TODO - just placeholder - need to use real commitment once implemented + commitment_hash: None, + weight, + }); + return Ok(Some((*block_height, block_hash.clone()))); } } @@ -288,14 +312,28 @@ where /// Call when a new finalized block is added to the ledger, to clear out all preceding blocks. /// /// After this operation the minimum item in the chain will the new finalized block. 
- pub fn set_finalized(&self, block_height: BlockHeight, block_hash: V) -> Stm<()> { + pub fn set_finalized( + &self, + parent_block_height: BlockHeight, + parent_block_hash: V, + proposer: Option<&str>, + local_block_height: Option, + ) -> Stm<()> { self.chain.update(|chain| { - let (_, mut chain) = chain.split(&block_height); - chain.insert(block_height, Some(block_hash)); + let (_, mut chain) = chain.split(&parent_block_height); + chain.insert(parent_block_height, Some(parent_block_hash.clone())); chain })?; - self.votes.update(|votes| votes.split(&block_height).1)?; + self.votes + .update(|votes| votes.split(&parent_block_height).1)?; + + emit(ParentFinalityCommitted { + local_height: local_block_height, + parent_height: parent_block_height, + block_hash: HexEncodableBlockHash(parent_block_hash.as_ref().to_vec()), + proposer, + }); Ok(()) } @@ -418,6 +456,12 @@ pub async fn publish_vote_loop( if let Err(e) = client.publish_vote(vote) { tracing::error!(error = e.to_string(), "failed to publish vote"); } + + emit(ParentFinalityPeerVoteSent { + block_height: next_height, + block_hash: HexEncodableBlockHash(next_hash.clone()), + commitment_hash: None, + }); } Err(e) => { tracing::error!(error = e.to_string(), "failed to sign vote"); diff --git a/fendermint/vm/topdown/tests/smt_voting.rs b/fendermint/vm/topdown/tests/smt_voting.rs index 4e19c0355..b605a79a1 100644 --- a/fendermint/vm/topdown/tests/smt_voting.rs +++ b/fendermint/vm/topdown/tests/smt_voting.rs @@ -13,6 +13,7 @@ //! cargo test --release -p fendermint_vm_topdown --test smt_voting //! 
``` +use core::fmt; use std::{ cmp::{max, min}, collections::BTreeMap, @@ -41,6 +42,12 @@ state_machine_test!(voting, 10000 ms, 65512 bytes, 200 steps, VotingMachine::new #[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)] pub struct VotingKey(u64); +impl fmt::Display for VotingKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "VotingKey({})", self.0) + } +} + pub type VotingError = voting::Error; pub enum VotingCommand { @@ -369,7 +376,7 @@ impl smt::StateMachine for VotingMachine { VotingCommand::BlockFinalized(block_height, block_hash) => self.atomically_ok(|| { system - .set_finalized(*block_height, block_hash.clone()) + .set_finalized(*block_height, block_hash.clone(), None, None) .map(|_| None) }), diff --git a/ipc/observability/Cargo.toml b/ipc/observability/Cargo.toml new file mode 100644 index 000000000..de39eba22 --- /dev/null +++ b/ipc/observability/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ipc-observability" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +license-file.workspace = true + +[dependencies] +lazy_static = { workspace = true } +prometheus = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } +hex = { workspace = true } diff --git a/ipc/observability/src/lib.rs b/ipc/observability/src/lib.rs new file mode 100644 index 000000000..504c256a9 --- /dev/null +++ b/ipc/observability/src/lib.rs @@ -0,0 +1,55 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +pub mod macros; +pub mod traces; +mod tracing_layers; +pub use lazy_static::lazy_static; +pub mod serde; + +use std::fmt::Debug; +use tracing::{debug, error, info, trace, warn}; + +use std::time::Instant; + +pub trait Recordable { + fn record_metrics(&self); +} + +pub trait Traceable { + fn trace_level(&self) -> TraceLevel; + fn domain(&self) -> &'static str; +} + +pub enum 
TraceLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + +pub fn emit(trace: T) +where + T: Recordable + Traceable + Debug, +{ + match trace.trace_level() { + TraceLevel::Trace => trace!(domain=trace.domain(), event = ?trace), + TraceLevel::Debug => debug!(domain=trace.domain(), event = ?trace), + TraceLevel::Info => info!(domain=trace.domain(), event = ?trace), + TraceLevel::Warn => warn!(domain=trace.domain(), event = ?trace), + TraceLevel::Error => error!(domain=trace.domain(), event = ?trace), + } + + trace.record_metrics(); +} + +pub fn measure_time(f: F) -> (T, std::time::Duration) +where + F: FnOnce() -> T, +{ + let start = Instant::now(); + let result = f(); + let duration = start.elapsed(); + (result, duration) +} diff --git a/ipc/observability/src/macros.rs b/ipc/observability/src/macros.rs new file mode 100644 index 000000000..5f87f1d23 --- /dev/null +++ b/ipc/observability/src/macros.rs @@ -0,0 +1,54 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +#[macro_export] +macro_rules! register_metrics { + ($($name:ident : $type:ty = $make:expr);* $(;)?) => { + + $( + lazy_static! { + pub static ref $name: $type = $make.unwrap(); + } + )* + + pub fn register_metrics(registry: &Registry) -> anyhow::Result<()> { + $(registry.register(Box::new($name.clone()))?;)* + Ok(()) + } + }; +} + +#[macro_export] +macro_rules! impl_traceable { + ($struct_name:ident<$lifetime:tt>, $trace_level:expr, $domain:expr) => { + impl<$lifetime> Traceable for $struct_name<$lifetime> { + fn trace_level(&self) -> TraceLevel { + $trace_level + } + + fn domain(&self) -> &'static str { + $domain + } + } + }; + ($struct_name:ident, $trace_level:expr, $domain:expr) => { + impl Traceable for $struct_name { + fn trace_level(&self) -> TraceLevel { + $trace_level + } + + fn domain(&self) -> &'static str { + $domain + } + } + }; +} + +#[macro_export] +macro_rules! 
impl_traceables { + ($trace_level:expr, $domain:expr, $($struct_name:ident$(<$lifetime:tt>)?),+) => { + $( + impl_traceable!($struct_name$(<$lifetime>)?, $trace_level, $domain); + )+ + }; +} diff --git a/ipc/observability/src/serde.rs b/ipc/observability/src/serde.rs new file mode 100644 index 000000000..9225adb0b --- /dev/null +++ b/ipc/observability/src/serde.rs @@ -0,0 +1,14 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use hex; +use std::fmt; + +/// Hex encodable block hash. +pub struct HexEncodableBlockHash(pub Vec); + +impl fmt::Debug for HexEncodableBlockHash { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +} diff --git a/ipc/observability/src/traces.rs b/ipc/observability/src/traces.rs new file mode 100644 index 000000000..ae14ad90f --- /dev/null +++ b/ipc/observability/src/traces.rs @@ -0,0 +1,134 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::num::NonZeroUsize; +use std::path::PathBuf; +use std::str::FromStr; +pub use tracing_appender::non_blocking; +pub use tracing_appender::non_blocking::WorkerGuard; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::filter::EnvFilter; +use tracing_subscriber::{fmt, layer::SubscriberExt, Layer}; + +use crate::tracing_layers::DomainEventFilterLayer; + +#[derive(Debug, Clone)] +pub enum RotationKind { + Minutely, + Hourly, + Daily, + Never, +} + +impl RotationKind { + fn to_tracing_rotation(&self) -> Rotation { + match self { + RotationKind::Minutely => Rotation::MINUTELY, + RotationKind::Hourly => Rotation::HOURLY, + RotationKind::Daily => Rotation::DAILY, + RotationKind::Never => Rotation::NEVER, + } + } +} + +impl FromStr for RotationKind { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "minutely" => Ok(RotationKind::Minutely), + "hourly" => Ok(RotationKind::Hourly), + "daily" => Ok(RotationKind::Daily), + 
"never" => Ok(RotationKind::Never), + _ => Err(format!("invalid rotation kind: {}", s)), + } + } +} + +#[derive(Default)] +pub struct FileLayerConfig { + pub enabled: bool, + pub directory: Option, + pub max_log_files: Option, + pub rotation: Option, + pub domain_filter: Option>, + pub events_filter: Option>, +} + +// Register a tracing subscriber with the given options +// Returns a guard that must be kept alive for the duration of the program (because it's non-blocking and needs to flush) +pub fn register_tracing_subscriber( + console_level_filter: EnvFilter, + file_level_filter: EnvFilter, + file_opts: FileLayerConfig, +) -> Option { + // log all traces to stderr (reserving stdout for any actual output such as from the CLI commands) + let console_layer = fmt::layer() + .with_writer(std::io::stderr) + .with_target(false) + .with_file(true) + .with_line_number(true) + .with_filter(console_level_filter); + + let (file_layer, file_guard) = if file_opts.enabled { + let (non_blocking, file_guard) = non_blocking(file_appender_from_opts(&file_opts)); + + let file_layer = fmt::layer() + .json() + .with_writer(non_blocking) + .with_span_events(fmt::format::FmtSpan::CLOSE) + .with_target(false) + .with_file(true) + .with_line_number(true) + .with_filter(file_level_filter); + + let domains = file_opts + .domain_filter + .map(|v| v.iter().map(|s| s.to_string()).collect()); + let events = file_opts + .events_filter + .map(|v| v.iter().map(|s| s.to_string()).collect()); + + let file_layer = DomainEventFilterLayer::new(domains, events, file_layer); + + (Some(file_layer), Some(file_guard)) + } else { + (None, None) + }; + + let registry = tracing_subscriber::registry() + .with(console_layer) + .with(file_layer); + + tracing::subscriber::set_global_default(registry) + .expect("Unable to set a global tracing subscriber"); + + file_guard +} + +fn file_appender_from_opts(opts: &FileLayerConfig) -> RollingFileAppender { + let directory = opts + .directory + .as_deref() + 
.expect("missing file log directory"); + let mut appender = RollingFileAppender::builder().filename_suffix("traces"); + + if let Some(max_log_files) = opts.max_log_files { + eprintln!("max log files: {}", max_log_files); + + appender = appender.max_log_files( + NonZeroUsize::new(max_log_files) + .expect("max_log_files must be greater than 0") + .into(), + ); + }; + + if let Some(rotation_kind) = &opts.rotation { + eprintln!("rotation kind: {:?}", rotation_kind); + appender = appender.rotation(rotation_kind.to_tracing_rotation()); + }; + + appender + .build(directory) + .expect("failed to create traces appender") +} diff --git a/ipc/observability/src/tracing_layers.rs b/ipc/observability/src/tracing_layers.rs new file mode 100644 index 000000000..f680fbdcb --- /dev/null +++ b/ipc/observability/src/tracing_layers.rs @@ -0,0 +1,99 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use tracing::{field::Visit, Event, Subscriber}; +use tracing_subscriber::{layer::Context, Layer}; + +const DOMAIN_FIELD: &str = "domain"; +const EVENT_FIELD: &str = "event"; + +pub struct DomainEventFilterLayer { + domains: Option>, + events: Option>, + inner: L, +} + +impl DomainEventFilterLayer { + pub fn new(domains: Option>, events: Option>, inner: L) -> Self { + DomainEventFilterLayer { + domains, + events, + inner, + } + } +} + +impl Layer for DomainEventFilterLayer +where + S: Subscriber, + L: Layer, +{ + fn on_event(&self, event: &Event, ctx: Context) { + if self.domains.is_none() && self.events.is_none() { + self.inner.on_event(event, ctx); + return; + } + + let mut visitor = EventVisitor::new(); + event.record(&mut visitor); + + if self.domains.is_some() + && !visitor.domain.map_or(false, |d| { + self.domains + .as_ref() + .unwrap() + .iter() + .any(|dom| d.contains(dom)) + }) + { + return; + } + + if self.events.is_some() + && !visitor.event.map_or(false, |e| { + self.events + .as_ref() + .unwrap() + .iter() + .any(|evt| e.contains(evt)) + }) + 
{ + return; + } + + self.inner.on_event(event, ctx); + } +} + +#[derive(Debug)] +struct EventVisitor { + domain: Option, + event: Option, +} + +impl EventVisitor { + fn new() -> Self { + EventVisitor { + domain: None, + event: None, + } + } +} + +impl Visit for EventVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() != DOMAIN_FIELD { + return; + } + + self.domain = Some(value.to_string()); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() != EVENT_FIELD { + return; + } + + self.event = Some(format!("{:?}", value)); + } +} diff --git a/ipld/resolver/src/vote_record.rs b/ipld/resolver/src/vote_record.rs index ccaa1ad2c..3678d5e47 100644 --- a/ipld/resolver/src/vote_record.rs +++ b/ipld/resolver/src/vote_record.rs @@ -5,6 +5,8 @@ use libp2p::identity::{Keypair, PublicKey}; use serde::de::{DeserializeOwned, Error}; use serde::{Deserialize, Serialize}; +use std::fmt::Display; + use crate::{ signed_record::{Record, SignedRecord}, Timestamp, @@ -15,6 +17,12 @@ use crate::{ #[derive(Debug, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)] pub struct ValidatorKey(PublicKey); +impl Display for ValidatorKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.to_peer_id().fmt(f) + } +} + impl Serialize for ValidatorKey { fn serialize(&self, serializer: S) -> Result where