Skip to content

Commit

Permalink
feat(node): new observability architecture + events (#1053)
Browse files Browse the repository at this point in the history
Co-authored-by: raulk <[email protected]>
Co-authored-by: raulk <[email protected]>
  • Loading branch information
3 people authored Jul 19, 2024
1 parent 8242a75 commit 6aaa95c
Show file tree
Hide file tree
Showing 42 changed files with 1,353 additions and 488 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"ipc/provider",
"ipc/api",
"ipc/types",
"ipc/observability",

# ipld
"ipld/resolver",
Expand Down Expand Up @@ -157,7 +158,11 @@ tokio-util = { version = "0.7.8", features = ["compat"] }
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
toml = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry"] }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"json",
"registry",
] }
tracing-appender = "0.2.3"
url = { version = "2.4.1", features = ["serde"] }
zeroize = "1.6"
Expand All @@ -168,6 +173,7 @@ ipc-provider = { path = "ipc/provider" }
ipc-wallet = { path = "ipc/wallet", features = ["with-ethers"] }
ipc_ipld_resolver = { path = "ipld/resolver" }
ipc-types = { path = "ipc/types" }
ipc-observability = { path = "ipc/observability" }
ipc_actors_abis = { path = "contracts/binding" }

# Vendored for cross-compilation, see https://github.com/cross-rs/cross/wiki/Recipes#openssl
Expand Down
3 changes: 2 additions & 1 deletion fendermint/abci/examples/kvstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl Application for KVStore {
&self,
request: request::PrepareProposal,
) -> Result<response::PrepareProposal> {
let mut txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap());
let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap());
let mut txs = txs.clone();

// Enfore transaciton limit so that we don't have a problem with buffering.
txs.truncate(MAX_TXNS);
Expand Down
2 changes: 1 addition & 1 deletion fendermint/abci/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub trait Application {
&self,
request: request::PrepareProposal,
) -> AbciResult<response::PrepareProposal> {
let txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap());
let (txs, _) = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap());

Ok(response::PrepareProposal { txs })
}
Expand Down
4 changes: 2 additions & 2 deletions fendermint/abci/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/// Take the first transactions until the first one that would exceed the maximum limit.
///
/// The function does not skip or reorder transaction even if a later one would stay within the limit.
pub fn take_until_max_size<T: AsRef<[u8]>>(txs: Vec<T>, max_tx_bytes: usize) -> Vec<T> {
pub fn take_until_max_size<T: AsRef<[u8]>>(txs: Vec<T>, max_tx_bytes: usize) -> (Vec<T>, usize) {
let mut size: usize = 0;
let mut out = Vec::new();
for tx in txs {
Expand All @@ -15,5 +15,5 @@ pub fn take_until_max_size<T: AsRef<[u8]>>(txs: Vec<T>, max_tx_bytes: usize) ->
size += bz.len();
out.push(tx);
}
out
(out, size)
}
2 changes: 2 additions & 0 deletions fendermint/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ fendermint_vm_resolver = { path = "../vm/resolver" }
fendermint_vm_snapshot = { path = "../vm/snapshot" }
fendermint_vm_topdown = { path = "../vm/topdown" }


fvm = { workspace = true }
fvm_ipld_blockstore = { workspace = true }
fvm_ipld_car = { workspace = true }
Expand All @@ -72,6 +73,7 @@ fvm_shared = { workspace = true }
ipc-api = { workspace = true }
ipc-provider = { workspace = true }
ipc_ipld_resolver = { workspace = true }
ipc-observability = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions fendermint/app/options/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ fvm_shared = { workspace = true }
ipc-api = { workspace = true }
ipc-types = { workspace = true }
url = { workspace = true }
ipc-observability = { workspace = true }

fendermint_vm_genesis = { path = "../../vm/genesis" }
fendermint_vm_actor_interface = { path = "../../vm/actor_interface" }
Expand Down
53 changes: 48 additions & 5 deletions fendermint/app/options/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use clap::{Args, Parser, Subcommand};
use config::ConfigArgs;
use debug::DebugArgs;
use fvm_shared::address::Network;
use ipc_observability::traces::FileLayerConfig;
use lazy_static::lazy_static;
use tracing_subscriber::EnvFilter;

Expand All @@ -27,7 +28,7 @@ pub mod run;
mod log;
mod parse;

use log::{parse_log_level, LogLevel};
use log::{parse_log_level, parse_rotation_kind, LogLevel, RotationKind};
use parse::parse_network;

lazy_static! {
Expand Down Expand Up @@ -102,13 +103,44 @@ pub struct Options {
#[arg(long, env = "FM_CONFIG_DIR")]
config_dir: Option<PathBuf>,

// TODO Karel - move all FM_LOG_FILE* flags to a configuration file instead

// Enable logging to a file.
#[arg(long, env = "FM_LOG_FILE_ENABLED")]
pub log_file_enabled: Option<bool>,

/// Set a custom directory for ipc log files.
#[arg(long, env = "FM_LOG_DIR")]
#[arg(long, env = "FM_LOG_FILE_DIR")]
pub log_dir: Option<PathBuf>,

/// Set a custom prefix for ipc log files.
#[arg(long, env = "FM_LOG_FILE_PREFIX")]
pub log_file_prefix: Option<String>,
#[arg(long, env = "FM_LOG_FILE_MAX_FILES")]
pub max_log_files: Option<usize>,

#[arg(
long,
default_value = "daily",
value_enum,
env = "FM_LOG_FILE_ROTATION",
help = "The kind of rotation to use for log files. Options: minutely, hourly, daily, never.",
value_parser = parse_rotation_kind,
)]
pub log_files_rotation: Option<RotationKind>,

#[arg(
long,
env = "FM_LOG_FILE_DOMAINS_FILTER",
help = "Filter log events by domains. Only events from the specified domains will be logged. Comma separated.",
value_delimiter = ','
)]
pub domains_filter: Option<Vec<String>>,

#[arg(
long,
env = "FM_LOG_FILE_EVENTS_FILTER",
help = "Filter log events by name. Only events with the specified names will be logged. Comma separated.",
value_delimiter = ','
)]
pub events_filter: Option<Vec<String>>,

/// Optionally override the default configuration.
#[arg(short, long, default_value = "dev")]
Expand Down Expand Up @@ -162,6 +194,17 @@ impl Options {
}
}

pub fn log_file_config(&self) -> FileLayerConfig {
FileLayerConfig {
enabled: self.log_file_enabled.unwrap_or(false),
directory: self.log_dir.clone(),
max_log_files: self.max_log_files,
rotation: self.log_files_rotation.clone(),
domain_filter: self.domains_filter.clone(),
events_filter: self.events_filter.clone(),
}
}

/// Path to the configuration directories.
///
/// If not specified then returns the default under the home directory.
Expand Down
12 changes: 12 additions & 0 deletions fendermint/app/options/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use clap::{builder::PossibleValue, ValueEnum};
use lazy_static::lazy_static;
use tracing_subscriber::EnvFilter;

pub use ipc_observability::traces::RotationKind;

/// Standard log levels, or something we can pass to <https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html>
///
/// To be fair all of these could be handled by the `EnvFilter`, even `off`,
Expand Down Expand Up @@ -79,3 +81,13 @@ pub fn parse_log_level(s: &str) -> Result<LogLevel, String> {
Ok(LogLevel::Filter(s.to_string()))
}
}

pub fn parse_rotation_kind(s: &str) -> Result<RotationKind, String> {
match s {
"minutely" => Ok(RotationKind::Minutely),
"hourly" => Ok(RotationKind::Hourly),
"daily" => Ok(RotationKind::Daily),
"never" => Ok(RotationKind::Never),
_ => Err(format!("invalid rotation kind: {}", s)),
}
}
59 changes: 47 additions & 12 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use fendermint_abci::{AbciResult, Application};
use fendermint_storage::{
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite,
};
use fendermint_tracing::emit;
use fendermint_vm_core::Timestamp;
use fendermint_vm_interpreter::bytes::{
BytesMessageApplyRes, BytesMessageCheckRes, BytesMessageQuery, BytesMessageQueryRes,
Expand All @@ -37,13 +36,17 @@ use fvm_shared::chainid::ChainID;
use fvm_shared::clock::ChainEpoch;
use fvm_shared::econ::TokenAmount;
use fvm_shared::version::NetworkVersion;
use ipc_observability::{emit, serde::HexEncodableBlockHash};
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use tendermint::abci::request::CheckTxKind;
use tendermint::abci::{request, response};
use tracing::instrument;

use crate::events::{NewBlock, ProposalProcessed};
use crate::observe::{
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, Message,
MpoolReceived,
};
use crate::AppExitCode;
use crate::BlockHeight;
use crate::{tmconv::*, VERSION};
Expand Down Expand Up @@ -619,15 +622,26 @@ where
// Update the check state.
*guard = Some(state);

let mut mpool_received_trace = MpoolReceived::default();

let response = match result {
Err(e) => invalid_check_tx(AppError::InvalidEncoding, e.description),
Ok(result) => match result {
Err(IllegalMessage) => invalid_check_tx(AppError::IllegalMessage, "".to_owned()),
Ok(Err(InvalidSignature(d))) => invalid_check_tx(AppError::InvalidSignature, d),
Ok(Ok(ret)) => to_check_tx(ret),
Ok(Ok(ret)) => {
mpool_received_trace.message = Some(Message::from(&ret.message));
to_check_tx(ret)
}
},
};

mpool_received_trace.accept = response.code.is_ok();
if !mpool_received_trace.accept {
mpool_received_trace.reason = Some(format!("{:?} - {}", response.code, response.info));
}

emit(mpool_received_trace);
Ok(response)
}

Expand All @@ -650,7 +664,14 @@ where
.context("failed to prepare proposal")?;

let txs = txs.into_iter().map(bytes::Bytes::from).collect();
let txs = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap());
let (txs, size) = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap());

emit(BlockProposalSent {
validator: &request.proposer_address,
height: request.height.value(),
tx_count: txs.len(),
size,
});

Ok(response::PrepareProposal { txs })
}
Expand All @@ -666,6 +687,7 @@ where
"process proposal"
);
let txs: Vec<_> = request.txs.into_iter().map(|tx| tx.to_vec()).collect();
let size_txs = txs.iter().map(|tx| tx.len()).sum::<usize>();
let num_txs = txs.len();

let accept = self
Expand All @@ -674,12 +696,22 @@ where
.await
.context("failed to process proposal")?;

emit!(ProposalProcessed {
is_accepted: accept,
block_height: request.height.value(),
block_hash: request.hash.to_string().as_str(),
num_txs,
proposer: request.proposer_address.to_string().as_str()
emit(BlockProposalReceived {
height: request.height.value(),
hash: HexEncodableBlockHash(request.hash.into()),
size: size_txs,
tx_count: num_txs,
validator: &request.proposer_address,
});

emit(BlockProposalEvaluated {
height: request.height.value(),
hash: HexEncodableBlockHash(request.hash.into()),
size: size_txs,
tx_count: num_txs,
validator: &request.proposer_address,
accept,
reason: None,
});

if accept {
Expand Down Expand Up @@ -867,12 +899,15 @@ where
// Commit app state to the datastore.
self.set_committed_state(state)?;

emit!(NewBlock { block_height });

// Reset check state.
let mut guard = self.check_state.lock().await;
*guard = None;

emit(BlockCommitted {
height: block_height,
app_hash: HexEncodableBlockHash(app_hash.clone().into()),
});

Ok(response::Commit {
data: app_hash.into(),
retain_height: retain_height.try_into().expect("height is valid"),
Expand Down
Loading

0 comments on commit 6aaa95c

Please sign in to comment.