Skip to content

Commit

Permalink
feat: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
karlem committed Jul 18, 2024
1 parent 2a0b425 commit e7e7cb5
Show file tree
Hide file tree
Showing 19 changed files with 214 additions and 259 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 14 additions & 18 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use fendermint_vm_interpreter::fvm::store::ReadOnlyBlockstore;
use fendermint_vm_interpreter::fvm::{FvmApplyRet, FvmGenesisOutput, PowerUpdates};
use fendermint_vm_interpreter::signed::InvalidSignature;
use fendermint_vm_interpreter::{
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};
use fendermint_vm_message::query::FvmQueryHeight;
use fendermint_vm_snapshot::{SnapshotClient, SnapshotError};
Expand All @@ -44,7 +45,8 @@ use tendermint::abci::{request, response};
use tracing::instrument;

use crate::observe::{
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, MpoolReceived,
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, Message,
MpoolReceived,
};
use crate::AppExitCode;
use crate::BlockHeight;
Expand Down Expand Up @@ -629,23 +631,14 @@ where
Err(IllegalMessage) => invalid_check_tx(AppError::IllegalMessage, "".to_owned()),
Ok(Err(InvalidSignature(d))) => invalid_check_tx(AppError::InvalidSignature, d),
Ok(Ok(ret)) => {
mpool_received_trace.from = Some(ret.message.from);
mpool_received_trace.to = Some(ret.message.to);
mpool_received_trace.value = Some(ret.message.value.clone());
mpool_received_trace.param_len = ret.message.params.len();
mpool_received_trace.gas_limit = ret.message.gas_limit;
mpool_received_trace.fee_cap = Some(ret.message.gas_fee_cap.clone());
mpool_received_trace.premium = Some(ret.message.gas_premium.clone());

mpool_received_trace.message = Some(Message::from(&ret.message));
to_check_tx(ret)
}
},
};

if response.code.is_ok() {
mpool_received_trace.accept = true;
} else {
mpool_received_trace.accept = false;
mpool_received_trace.accept = response.code.is_ok();
if !mpool_received_trace.accept {
mpool_received_trace.reason = Some(format!("{:?} - {}", response.code, response.info));
}

Expand Down Expand Up @@ -698,7 +691,10 @@ where
let size_txs = txs.iter().map(|tx| tx.len()).sum::<usize>();
let num_txs = txs.len();

let process_result = self.interpreter.process(self.chain_env.clone(), txs).await;
let process_result = self
.interpreter
.process(self.chain_env.clone(), txs)
.await?;

emit(BlockProposalReceived {
height: request.height.value(),
Expand All @@ -719,10 +715,10 @@ where
};

let process_proposal = match process_result {
Ok(_) => response::ProcessProposal::Accept,
Err(e) => {
ProcessResult::Accepted => response::ProcessProposal::Accept,
ProcessResult::Rejected(reason) => {
proposal_evaluated.accept = false;
proposal_evaluated.reason = Some(e);
proposal_evaluated.reason = Some(reason);
response::ProcessProposal::Reject
}
};
Expand Down
20 changes: 18 additions & 2 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use fendermint_vm_snapshot::{SnapshotManager, SnapshotParams};
use fendermint_vm_topdown::observe::register_metrics as register_topdown_metrics;
use fendermint_vm_topdown::proxy::{IPCProviderProxy, IPCProviderProxyWithLatency};
use fendermint_vm_topdown::sync::launch_polling_syncer;
use fendermint_vm_topdown::voting::{publish_vote_loop, VoteTally};
use fendermint_vm_topdown::voting::{publish_vote_loop, Error as VoteError, VoteTally};
use fendermint_vm_topdown::{CachedFinalityProvider, IPCParentFinality, Toggle};
use fvm_shared::address::{current_network, Address, Network};
use ipc_ipld_resolver::{Event as ResolverEvent, VoteRecord};
Expand Down Expand Up @@ -539,14 +539,30 @@ async fn dispatch_vote(
tracing::debug!("ignoring vote; topdown disabled");
return;
}
let _res = atomically_or_err(|| {
let res = atomically_or_err(|| {
parent_finality_votes.add_vote(
vote.public_key.clone(),
f.height,
f.block_hash.clone(),
)
})
.await;

match res {
Err(e @ VoteError::Equivocation(_, _, _, _)) => {
tracing::warn!(error = e.to_string(), "failed to handle vote");
}
Err(e @ (
VoteError::Uninitialized // early vote, we're not ready yet
| VoteError::UnpoweredValidator(_) // maybe arrived too early or too late, or spam
| VoteError::UnexpectedBlock(_, _) // won't happen here
)) => {
tracing::debug!(error = e.to_string(), "failed to handle vote");
}
_ => {
tracing::debug!("vote handled");
}
};
}
}
}
41 changes: 33 additions & 8 deletions fendermint/app/src/observe.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use fendermint_vm_message::conv::from_eth;
use fvm_shared::address::Address;
use fvm_shared::econ::TokenAmount;

use fendermint_vm_interpreter::errors::ProcessError;
use fendermint_vm_interpreter::fvm::FvmMessage;
use tendermint::account::Id;

use ipc_observability::{
Expand Down Expand Up @@ -113,25 +115,48 @@ impl Recordable for BlockCommitted {
}
}

#[derive(Debug)]
pub struct Message {
pub from: Address,
pub to: Address,
pub value: TokenAmount,
pub gas_limit: u64,
pub fee_cap: TokenAmount,
pub premium: TokenAmount,
}

impl From<&FvmMessage> for Message {
fn from(fvm_message: &FvmMessage) -> Self {
Message {
from: fvm_message.from,
to: fvm_message.to,
value: fvm_message.value.clone(),
gas_limit: fvm_message.gas_limit,
fee_cap: fvm_message.gas_fee_cap.clone(),
premium: fvm_message.gas_premium.clone(),
}
}
}

#[derive(Debug, Default)]
pub struct MpoolReceived {
// TODO - add cid later on
// pub message_cid: &'a str,
pub from: Option<Address>,
pub to: Option<Address>,
pub value: Option<TokenAmount>,
pub param_len: usize,
pub gas_limit: u64,
pub fee_cap: Option<TokenAmount>,
pub premium: Option<TokenAmount>,
pub message: Option<Message>,
pub accept: bool,
pub reason: Option<String>,
}

impl Recordable for MpoolReceived {
fn record_metrics(&self) {
let from = self
.message
.as_ref()
.map(|m| m.from.to_string())
.unwrap_or("".to_string());

MPOOL_RECEIVED
.with_label_values(&[&self.accept.to_string(), self.from.map_or("", |_| "")])
.with_label_values(&[&self.accept.to_string(), &from])
.inc();
}
}
Expand Down
1 change: 1 addition & 0 deletions fendermint/vm/interpreter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tendermint-rpc = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
prometheus = { workspace = true }
strum = { workspace = true }

cid = { workspace = true }
fvm = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions fendermint/vm/interpreter/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{
chain::{ChainMessageApplyRet, ChainMessageCheckRes},
errors::ProcessError,
fvm::{FvmQuery, FvmQueryRet},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};

pub type BytesMessageApplyRes = Result<ChainMessageApplyRet, IpldError>;
Expand Down Expand Up @@ -130,13 +131,15 @@ where
&self,
state: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<bool, ProcessError> {
) -> anyhow::Result<ProcessResult> {
if msgs.len() > self.max_msgs {
tracing::warn!(
block_msgs = msgs.len(),
"rejecting block: too many messages"
);
return Err(ProcessError::TooManyMessages(msgs.len()));
return Ok(ProcessResult::Rejected(ProcessError::TooManyMessages(
msgs.len(),
)));
}

let mut chain_msgs = Vec::new();
Expand All @@ -157,7 +160,9 @@ where
"failed to decode message in proposal as ChainMessage"
);
if self.reject_malformed_proposal {
return Err(ProcessError::FailedToDecodeMessage(e.to_string()));
return Ok(ProcessResult::Rejected(
ProcessError::FailedToDecodeMessage(e.to_string()),
));
}
}
Ok(msg) => chain_msgs.push(msg),
Expand Down
13 changes: 8 additions & 5 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
fvm::state::FvmExecState,
fvm::FvmMessage,
signed::{SignedMessageApplyRes, SignedMessageCheckRes, SyntheticMessage, VerifiableMessage},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProcessResult, ProposalInterpreter,
QueryInterpreter,
};
use anyhow::{bail, Context};
use async_stm::atomically;
Expand Down Expand Up @@ -180,7 +181,7 @@ where
&self,
env: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<bool, ProcessError> {
) -> anyhow::Result<ProcessResult> {
for msg in msgs {
match msg {
ChainMessage::Ipc(IpcMessage::BottomUpExec(msg)) => {
Expand All @@ -199,7 +200,7 @@ where
.await;

if !is_resolved {
return Err(ProcessError::CheckpointNotResolved);
return Ok(ProcessResult::Rejected(ProcessError::CheckpointNotResolved));
}
}
ChainMessage::Ipc(IpcMessage::TopDownExec(ParentFinality {
Expand All @@ -213,13 +214,15 @@ where
let is_final =
atomically(|| env.parent_finality_provider.check_proposal(&prop)).await;
if !is_final {
return Err(ProcessError::ParentFinalityNotAvailable);
return Ok(ProcessResult::Rejected(
ProcessError::ParentFinalityNotAvailable,
));
}
}
_ => {}
};
}
Ok(true)
Ok(ProcessResult::Accepted)
}
}

Expand Down
2 changes: 2 additions & 0 deletions fendermint/vm/interpreter/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ pub enum ProcessError {
TooManyMessages(usize),
#[error("failed to decode message in proposal as ChainMessage: {0}")]
FailedToDecodeMessage(String),
#[error("")]
Empty,
}
26 changes: 11 additions & 15 deletions fendermint/vm/interpreter/src/fvm/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use async_trait::async_trait;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::RawBytes;
use fvm_shared::{address::Address, error::ExitCode};
use ipc_observability::emit;
use ipc_observability::{emit, measure_time};

use crate::CheckInterpreter;

use super::{
observe::MsgExecCheck,
state::{ElapsedExecution, FvmExecState},
observe::{MsgExec, MsgExecPurpose},
state::FvmExecState,
store::ReadOnlyBlockstore,
FvmMessage, FvmMessageInterpreter,
};
Expand Down Expand Up @@ -116,20 +116,16 @@ where
} else if self.exec_in_check {
// Instead of modifying just the partial state, we will execute the call in earnest.
// This is required for fully supporting the Ethereum API "pending" queries, if that's needed.
let (apply_ret, _, latency) =
ElapsedExecution::new(&mut state).execute_explicit(msg.clone())?;

emit(MsgExecCheck {
let (execution_result, latency) =
measure_time(|| state.execute_explicit(msg.clone()));

let (apply_ret, _) = execution_result?;

emit(MsgExec {
purpose: MsgExecPurpose::Check,
height: state.block_height(),
from: msg.from.to_string().as_str(),
to: msg.to.to_string().as_str(),
value: msg.value.to_string().as_str(),
method_num: msg.method_num,
gas_limit: msg.gas_limit,
gas_price: msg.gas_premium.to_string().as_str(),
// TODO Karel - this should be the serialized params
params: msg.params.clone().bytes(),
nonce: msg.sequence,
message: msg.clone(),
duration: latency.as_secs_f64(),
exit_code: apply_ret.msg_receipt.exit_code.value(),
});
Expand Down
Loading

0 comments on commit e7e7cb5

Please sign in to comment.