Skip to content

Commit

Permalink
feat: use replacable cache instead
Browse files Browse the repository at this point in the history
  • Loading branch information
karlem committed Nov 29, 2024
1 parent cca6e49 commit 1e13f86
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 76 deletions.
61 changes: 45 additions & 16 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use async_trait::async_trait;
use cid::Cid;
use fendermint_abci::util::take_until_max_size;
use fendermint_abci::{AbciResult, Application};
use fendermint_crypto::PublicKey;
use fendermint_storage::{
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite,
};
Expand All @@ -22,7 +23,7 @@ use fendermint_vm_interpreter::fvm::state::{
FvmUpdatableParams,
};
use fendermint_vm_interpreter::fvm::store::ReadOnlyBlockstore;
use fendermint_vm_interpreter::fvm::{BlockGasLimit, CurrentValidators, FvmApplyRet, PowerUpdates};
use fendermint_vm_interpreter::fvm::{BlockGasLimit, FvmApplyRet, PowerUpdates};
use fendermint_vm_interpreter::genesis::{read_genesis_car, GenesisAppState};
use fendermint_vm_interpreter::signed::InvalidSignature;
use fendermint_vm_interpreter::{
Expand All @@ -46,14 +47,13 @@ use tendermint::consensus::params::{
Params as TendermintConsensusParams, ValidatorParams as TendermintValidatorParams,
};
use tendermint::evidence::{Duration as TendermintDuration, Params as TendermintEvidenceParams};

use tracing::instrument;

use crate::observe::{
BlockCommitted, BlockProposalEvaluated, BlockProposalReceived, BlockProposalSent, Message,
MpoolReceived,
};
use crate::validators::ValidatorTracker;
use crate::validators::ValidatorCache;
use crate::AppExitCode;
use crate::BlockHeight;
use crate::{tmconv::*, VERSION};
Expand Down Expand Up @@ -186,8 +186,8 @@ where
///
/// Zero means unlimited.
state_hist_size: u64,
/// Tracks the validator
validators: ValidatorTracker,
/// Caches the validators.
validators_cache: Arc<tokio::sync::Mutex<Option<ValidatorCache>>>,
}

impl<DB, SS, S, I> App<DB, SS, S, I>
Expand Down Expand Up @@ -221,7 +221,7 @@ where
snapshots,
exec_state: Arc::new(tokio::sync::Mutex::new(None)),
check_state: Arc::new(tokio::sync::Mutex::new(None)),
validators: ValidatorTracker::new(),
validators_cache: Arc::new(tokio::sync::Mutex::new(None)),
};
app.init_committed_state()?;
Ok(app)
Expand Down Expand Up @@ -433,6 +433,37 @@ where
_ => Err(anyhow!("invalid app state json")),
}
}

/// Replaces the current validators cache with a new one.
async fn replace_validators_cache(&self) -> Result<()> {
let mut state = self
.read_only_view(None)?
.ok_or_else(|| anyhow!("exec state should be present"))?;

let mut cache = self.validators_cache.lock().await;
*cache = Some(ValidatorCache::new_from_state(&mut state)?);
Ok(())
}

/// Retrieves a validator from the cache, initializing it if necessary.
async fn get_validator_from_cache(&self, id: &tendermint::account::Id) -> Result<PublicKey> {
let mut cache = self.validators_cache.lock().await;

// If cache is not initialized, update it from the state
if cache.is_none() {
let mut state = self
.read_only_view(None)?
.ok_or_else(|| anyhow!("exec state should be present"))?;

*cache = Some(ValidatorCache::new_from_state(&mut state)?);
}

// Retrieve the validator from the cache
cache
.as_ref()
.context("Validator cache is not available")?
.get_validator(id)
}
}

// NOTE: The `Application` interface doesn't allow failures at the moment. The protobuf
Expand Down Expand Up @@ -460,7 +491,7 @@ where
Message = Vec<u8>,
BeginOutput = FvmApplyRet,
DeliverOutput = BytesMessageApplyRes,
EndOutput = (PowerUpdates, BlockGasLimit, CurrentValidators),
EndOutput = (PowerUpdates, BlockGasLimit),
>,
I: CheckInterpreter<
State = FvmExecState<ReadOnlyBlockstore<SS>>,
Expand Down Expand Up @@ -500,11 +531,6 @@ where

let (validators, state_params) = read_genesis_car(genesis_bytes, &self.state_store).await?;

let validators_keys = validators.iter().map(|v| v.public_key.clone()).collect();

// Set the initial validator set to validators cache.
self.validators.set_validators(validators_keys)?;

let validators =
to_validator_updates(validators).context("failed to convert validators")?;

Expand Down Expand Up @@ -774,8 +800,8 @@ where
state_params.timestamp = to_timestamp(request.header.time);

let validator = self
.validators
.get_public_key(&request.header.proposer_address)?;
.get_validator_from_cache(&request.header.proposer_address)
.await?;

let state = FvmExecState::new(db, self.multi_engine.as_ref(), block_height, state_params)
.context("error creating new state")?
Expand Down Expand Up @@ -835,7 +861,7 @@ where
tracing::debug!(height = request.height, "end block");

// End the interpreter for this block.
let (power_updates, new_block_gas_limit, current_validators) = self
let (power_updates, new_block_gas_limit) = self
.modify_exec_state(|s| self.interpreter.end(s))
.await
.context("end failed")?;
Expand All @@ -844,7 +870,10 @@ where
let validator_updates =
to_validator_updates(power_updates.0).context("failed to convert validator updates")?;

self.validators.set_validators(current_validators)?;
// Replace the validator cache if the validator set has changed.
if !validator_updates.is_empty() {
self.replace_validators_cache().await?;
}

let consensus_param_updates = self
.update_block_gas_limit(new_block_gas_limit as i64)
Expand Down
75 changes: 36 additions & 39 deletions fendermint/app/src/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,50 @@

//! Tracks the validator ID from Tendermint to their corresponding public key.
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Ok, Result};
use fendermint_crypto::PublicKey;
use fendermint_vm_genesis::ValidatorKey;
use fendermint_vm_genesis::{Power, Validator};
use fendermint_vm_interpreter::fvm::state::ipc::GatewayCaller;
use fendermint_vm_interpreter::fvm::state::FvmExecState;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone)]
pub(crate) struct ValidatorTracker {
validator_mapping: Arc<RwLock<HashMap<tendermint::account::Id, PublicKey>>>,
}
use tendermint::account::Id as TendermintId;
use tendermint::PublicKey as TendermintPubKey;

impl ValidatorTracker {
pub fn new() -> Self {
Self {
validator_mapping: Arc::new(RwLock::new(HashMap::new())),
}
}
use fvm_ipld_blockstore::Blockstore;

/// Get the public key of the validator by ID.
/// Note that the ID is expected to be a validator.
pub fn get_public_key(&self, id: &tendermint::account::Id) -> Result<PublicKey> {
let keys = self
.validator_mapping
.read()
.map_err(|_| anyhow!("Failed to acquire read lock"))?;

keys.get(id)
.copied()
.ok_or_else(|| anyhow!("Validator not found: {:?}", id))
}
#[derive(Clone)]
pub(crate) struct ValidatorCache {
map: HashMap<TendermintId, PublicKey>,
}

/// Sets the validator keys mapping.
pub fn set_validators(&self, validators: Vec<ValidatorKey>) -> Result<()> {
let mut cache = self
.validator_mapping
.write()
.map_err(|_| anyhow!("Failed to acquire write lock to update validators"))?;
impl ValidatorCache {
pub fn new_from_state<SS>(state: &mut FvmExecState<SS>) -> Result<Self>
where
SS: Blockstore + Clone + 'static,
{
let gateway = GatewayCaller::default();
let (_, validators) = gateway.current_power_table(state)?;

cache.clear();
let map = validators
.iter()
.map(validator_to_map_entry)
.collect::<Result<HashMap<_, _>, _>>()?;

validators.into_iter().try_for_each(|validator_key| {
let tendermint_pub_key = tendermint::PublicKey::try_from(validator_key.clone())
.map_err(|_| anyhow!("Failed to convert ValidatorKey to Tendermint public key"))?;
Ok(Self { map })
}

let tendermint_id = tendermint::account::Id::from(tendermint_pub_key);
cache.insert(tendermint_id, *validator_key.public_key());
Ok(())
})
pub fn get_validator(&self, id: &tendermint::account::Id) -> Result<PublicKey> {
self.map
.get(id)
.cloned()
.ok_or_else(|| anyhow!("validator not found"))
}
}

fn validator_to_map_entry(v: &Validator<Power>) -> Result<(TendermintId, PublicKey)> {
let tendermint_pub_key: TendermintPubKey = TendermintPubKey::try_from(v.public_key.clone())?;
let id = TendermintId::from(tendermint_pub_key);
let key = *v.public_key.public_key();
Ok((id, key))
}
4 changes: 2 additions & 2 deletions fendermint/testing/contract-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{future::Future, sync::Arc};

use fendermint_crypto::PublicKey;
use fendermint_vm_genesis::Genesis;
use fendermint_vm_interpreter::fvm::{BlockGasLimit, CurrentValidators, PowerUpdates};
use fendermint_vm_interpreter::fvm::{BlockGasLimit, PowerUpdates};
use fendermint_vm_interpreter::genesis::{create_test_genesis_state, GenesisOutput};
use fendermint_vm_interpreter::{
fvm::{
Expand Down Expand Up @@ -67,7 +67,7 @@ where
Message = FvmMessage,
BeginOutput = FvmApplyRet,
DeliverOutput = FvmApplyRet,
EndOutput = (PowerUpdates, BlockGasLimit, CurrentValidators),
EndOutput = (PowerUpdates, BlockGasLimit),
>,
{
pub async fn new(interpreter: I, genesis: Genesis) -> anyhow::Result<Self> {
Expand Down
4 changes: 2 additions & 2 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT
use crate::fvm::state::ipc::GatewayCaller;
use crate::fvm::store::ReadOnlyBlockstore;
use crate::fvm::{topdown, BlockGasLimit, CurrentValidators, FvmApplyRet, PowerUpdates};
use crate::fvm::{topdown, BlockGasLimit, FvmApplyRet, PowerUpdates};
use crate::selector::{GasLimitSelector, MessageSelector};
use crate::{
fvm::state::FvmExecState,
Expand Down Expand Up @@ -246,7 +246,7 @@ where
Message = VerifiableMessage,
DeliverOutput = SignedMessageApplyRes,
State = FvmExecState<DB>,
EndOutput = (PowerUpdates, BlockGasLimit, CurrentValidators),
EndOutput = (PowerUpdates, BlockGasLimit),
>,
{
// The state consists of the resolver pool, which this interpreter needs, and the rest of the
Expand Down
18 changes: 3 additions & 15 deletions fendermint/vm/interpreter/src/fvm/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use anyhow::Context;
use async_trait::async_trait;
use std::collections::HashMap;

use fendermint_crypto::PublicKey;
use fendermint_vm_actor_interface::{chainmetadata, cron, system};
use fendermint_vm_genesis::ValidatorKey;
use fvm::executor::ApplyRet;
use fvm_ipld_blockstore::Blockstore;
use fvm_shared::{address::Address, ActorID, MethodNum, BLOCK_GAS_LIMIT};
Expand All @@ -20,7 +18,7 @@ use super::{
checkpoint::{self, PowerUpdates},
observe::{CheckpointFinalized, MsgExec, MsgExecPurpose},
state::FvmExecState,
BlockGasLimit, CurrentValidators, FvmMessage, FvmMessageInterpreter,
BlockGasLimit, FvmMessage, FvmMessageInterpreter,
};

/// The return value extended with some things from the message that
Expand Down Expand Up @@ -50,7 +48,7 @@ where
/// Return validator power updates and the next base fee.
/// Currently ignoring events as there aren't any emitted by the smart contract,
/// but keep in mind that if there were, those would have to be propagated.
type EndOutput = (PowerUpdates, BlockGasLimit, CurrentValidators);
type EndOutput = (PowerUpdates, BlockGasLimit);

async fn begin(
&self,
Expand Down Expand Up @@ -254,17 +252,7 @@ where
PowerUpdates::default()
};

let current_validators = self
.gateway
.current_membership(&mut state)
.context("failed to update membership")?
.validators
.iter()
.filter_map(|v| PublicKey::parse_slice(&v.metadata, None).ok())
.map(ValidatorKey)
.collect();

let ret = (updates, next_gas_market.block_gas_limit, current_validators);
let ret = (updates, next_gas_market.block_gas_limit);
Ok((state, ret))
}
}
2 changes: 0 additions & 2 deletions fendermint/vm/interpreter/src/fvm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use check::FvmCheckRet;
pub use checkpoint::PowerUpdates;
pub use exec::FvmApplyRet;
use fendermint_crypto::{PublicKey, SecretKey};
use fendermint_vm_genesis::ValidatorKey;
pub use fendermint_vm_message::query::FvmQuery;
use fvm_ipld_blockstore::Blockstore;
use fvm_shared::address::Address;
Expand All @@ -35,7 +34,6 @@ use self::{state::ipc::GatewayCaller, upgrades::UpgradeScheduler};
pub type FvmMessage = fvm_shared::message::Message;
pub type BaseFee = fvm_shared::econ::TokenAmount;
pub type BlockGasLimit = u64;
pub type CurrentValidators = Vec<ValidatorKey>;

#[derive(Clone)]
pub struct ValidatorContext<C> {
Expand Down

0 comments on commit 1e13f86

Please sign in to comment.