diff --git a/fendermint/vm/topdown/src/observation.rs b/fendermint/vm/topdown/src/observation.rs index 9e481029f..d59c9d9fa 100644 --- a/fendermint/vm/topdown/src/observation.rs +++ b/fendermint/vm/topdown/src/observation.rs @@ -32,13 +32,13 @@ pub struct ObservationConfig { /// The content that validators gossip among each other. #[derive(Serialize, Deserialize, Hash, Debug, Clone, Eq, PartialEq, Arbitrary)] pub struct Observation { - pub(crate) parent_height: u64, + pub(crate) parent_subnet_height: u64, /// The hash of the chain unit at that height. Usually a block hash, but could /// be another entity (e.g. tipset CID), depending on the parent chain /// and our interface to it. For example, if the parent is a Filecoin network, /// this would be a tipset CID coerced into a block hash if queried through /// the Eth API, or the tipset CID as-is if accessed through the Filecoin API. - pub(crate) parent_hash: Bytes, + pub(crate) parent_subnet_hash: Bytes, /// A rolling/cumulative commitment to topdown effects since the beginning of /// time, including the ones in this block. pub(crate) cumulative_effects_comm: Bytes, @@ -100,7 +100,7 @@ pub fn deduce_new_observation( let observation = agg.into_observation()?; tracing::info!( - height = observation.parent_height, + height = observation.parent_subnet_height, "new observation derived" ); @@ -120,6 +120,10 @@ impl CertifiedObservation { &self.observation } + pub fn observation_signature(&self) -> &RecoverableECDSASignature { + &self.observation_signature + } + pub fn ensure_valid(&self) -> anyhow::Result { let to_sign = fvm_ipld_encoding::to_vec(&self.observation)?; let (pk1, _) = self.observation_signature.recover(&to_sign)?; @@ -163,8 +167,8 @@ impl CertifiedObservation { impl Observation { pub fn new(parent_height: BlockHeight, parent_hash: Bytes, commitment: Bytes) -> Self { Self { - parent_height, - parent_hash, + parent_subnet_height: parent_height, + parent_subnet_hash: parent_hash, cumulative_effects_comm: commitment, } } @@ -175,8 +179,8 @@ impl Display for Observation { write!( f, "Observation(parent_height={}, parent_hash={}, commitment={})", - self.parent_height, - hex::encode(&self.parent_hash), + self.parent_subnet_height, + hex::encode(&self.parent_subnet_hash), hex::encode(&self.cumulative_effects_comm), ) } @@ -184,7 +188,7 @@ impl Display for Observation { impl Observation { pub fn parent_height(&self) -> BlockHeight { - self.parent_height + self.parent_subnet_height } } @@ -195,7 +199,7 @@ impl ObservationConfig { } } -struct LinearizedParentBlockView { +pub(crate) struct LinearizedParentBlockView { parent_height: u64, parent_hash: Option, cumulative_effects_comm: Bytes, @@ -211,13 +215,23 @@ impl From<&Checkpoint> for LinearizedParentBlockView { } } +impl From<&Observation> for LinearizedParentBlockView { + fn from(value: &Observation) -> Self { + LinearizedParentBlockView { + parent_height: value.parent_subnet_height, + parent_hash: Some(value.parent_subnet_hash.clone()), + cumulative_effects_comm: value.cumulative_effects_comm.clone(), + } + } +} + impl LinearizedParentBlockView { fn new_commitment(&mut self, to_append: Bytes) { let bytes = [ self.cumulative_effects_comm.as_slice(), to_append.as_slice(), ] - .concat(); + .concat(); let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes)); self.cumulative_effects_comm = cid.to_bytes(); } @@ -238,7 +252,7 @@ impl LinearizedParentBlockView { Ok(()) } - fn into_observation(self) -> Result { + pub fn into_observation(self) -> Result { let Some(hash) = self.parent_hash else { return Err(Error::CannotCommitObservationAtNullBlock( self.parent_height, @@ -250,4 +264,4 @@ impl LinearizedParentBlockView { self.cumulative_effects_comm, )) } -} +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/syncer/mod.rs b/fendermint/vm/topdown/src/syncer/mod.rs index 691307367..3def2e614 100644 --- a/fendermint/vm/topdown/src/syncer/mod.rs +++ b/fendermint/vm/topdown/src/syncer/mod.rs @@ -1,20 +1,26 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT -use crate::observation::{Observation, ObservationConfig}; -use crate::proxy::ParentQueryProxy; -use crate::syncer::poll::ParentPoll; +use crate::observation::{LinearizedParentBlockView, Observation, ObservationConfig}; use crate::syncer::store::ParentViewStore; use crate::{BlockHeight, Checkpoint}; +use anyhow::anyhow; +use async_trait::async_trait; +use ipc_api::cross::IpcEnvelope; +use ipc_api::staking::StakingChangeRequest; +use serde::Deserialize; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::select; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; pub mod error; pub mod payload; pub mod poll; pub mod store; +pub type QuorumCertContent = (Observation, Vec, Vec); + #[derive(Clone, Debug)] pub enum TopDownSyncEvent { /// The fendermint node is syncing with peers @@ -22,6 +28,7 @@ pub enum TopDownSyncEvent { NewProposal(Box), } +#[derive(Debug, Clone, Deserialize)] pub struct ParentSyncerConfig { pub request_channel_size: usize, /// The event broadcast channel buffer size @@ -31,12 +38,10 @@ pub struct ParentSyncerConfig { /// conservative and avoid other from rejecting the proposal because they don't see the /// height as final yet. pub chain_head_delay: BlockHeight, - /// Parent syncing cron period, in seconds - pub polling_interval: Duration, - /// Top down exponential back off retry base - pub exponential_back_off: Duration, - /// The max number of retries for exponential backoff before giving up - pub exponential_retry_limit: usize, + /// Parent syncing cron period, in millis + pub polling_interval_millis: Duration, + /// Max number of requests to process in the reactor loop + pub max_requests_per_loop: usize, /// Max number of un-finalized parent blocks that should be stored in the store pub max_store_blocks: BlockHeight, /// Attempts to sync as many block as possible till the finalized chain head @@ -46,26 +51,37 @@ pub struct ParentSyncerConfig { } #[derive(Clone)] -pub struct ParentSyncerReactorClient { +pub struct ParentSyncerReactorClient { tx: mpsc::Sender, + checkpoint: Arc>, + store: S, } -pub fn start_parent_syncer( - config: ParentSyncerConfig, - proxy: P, - store: S, - last_finalized: Checkpoint, -) -> anyhow::Result -where - S: ParentViewStore + Send + Sync + 'static, - P: Send + Sync + 'static + ParentQueryProxy, -{ - let (tx, mut rx) = mpsc::channel(config.request_channel_size); +impl ParentSyncerReactorClient { + pub fn new( + request_channel_size: usize, + store: S, + ) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(request_channel_size); + let checkpoint = Arc::new(Mutex::new(Checkpoint::v1(0, vec![], vec![]))); + ( + Self { + tx, + checkpoint, + store, + }, + rx, + ) + } +} +pub fn start_polling_reactor( + mut rx: mpsc::Receiver, + mut poller: P, + config: ParentSyncerConfig, +) { + let polling_interval = config.polling_interval_millis; tokio::spawn(async move { - let polling_interval = config.polling_interval; - let mut poller = ParentPoll::new(config, proxy, store, last_finalized); - loop { select! { _ = tokio::time::sleep(polling_interval) => { @@ -75,41 +91,83 @@ where } req = rx.recv() => { let Some(req) = req else { break }; - handle_request(req, &mut poller); + match req { + ParentSyncerRequest::Finalized(cp) => { + if let Err(e) = poller.finalize(cp) { + tracing::error!(err = e.to_string(), "cannot finalize syncer") + } + }, + } } } } - - tracing::warn!("parent syncer stopped") }); - Ok(ParentSyncerReactorClient { tx }) } -impl ParentSyncerReactorClient { +/// Polls the parent block view +#[async_trait] +pub trait ParentPoller { + type Store: ParentViewStore + Send + Sync + 'static + Clone; + + fn subscribe(&self) -> broadcast::Receiver; + + fn store(&self) -> Self::Store; + + /// The target block height is finalized, purge all the parent view before the target height + fn finalize(&mut self, checkpoint: Checkpoint) -> anyhow::Result<()>; + + /// Try to poll the next parent height + async fn try_poll(&mut self) -> anyhow::Result<()>; +} + +impl ParentSyncerReactorClient { + fn set_checkpoint(&self, cp: Checkpoint) { + let mut checkpoint = self.checkpoint.lock().unwrap(); + *checkpoint = cp.clone(); + } /// Marks the height as finalized. /// There is no need to wait for ack from the reactor pub async fn finalize_parent_height(&self, cp: Checkpoint) -> anyhow::Result<()> { + self.set_checkpoint(cp.clone()); self.tx.send(ParentSyncerRequest::Finalized(cp)).await?; Ok(()) } -} -enum ParentSyncerRequest { - /// A new parent height is finalized - Finalized(Checkpoint), -} + pub fn prepare_quorum_cert_content( + &self, + end_height: BlockHeight, + ) -> anyhow::Result { + let latest_checkpoint = self.checkpoint.lock().unwrap().clone(); + + let mut xnet_msgs = vec![]; + let mut validator_changes = vec![]; + let mut linear = LinearizedParentBlockView::from(&latest_checkpoint); + + let start = latest_checkpoint.target_height() + 1; + for h in start..=end_height { + let Some(v) = self.store.get(h)? else { + return Err(anyhow!("parent block view store does not have data at {h}")); + }; -fn handle_request(req: ParentSyncerRequest, poller: &mut ParentPoll) -where - S: ParentViewStore + Send + Sync + 'static, - P: Send + Sync + 'static + ParentQueryProxy, -{ - match req { - ParentSyncerRequest::Finalized(c) => { - let height = c.target_height(); - if let Err(e) = poller.finalize(c) { - tracing::error!(height, err = e.to_string(), "cannot finalize parent viewer"); + if let Err(e) = linear.append(v.clone()) { + return Err(anyhow!("parent block view cannot be appended: {e}")); + } + + if let Some(payload) = v.payload { + xnet_msgs.extend(payload.xnet_msgs); + validator_changes.extend(payload.validator_changes); } } + + let ob = linear + .into_observation() + .map_err(|e| anyhow!("cannot convert linearized parent view into observation: {e}"))?; + + Ok((ob, xnet_msgs, validator_changes)) } } + +pub enum ParentSyncerRequest { + /// A new parent height is finalized + Finalized(Checkpoint), +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/syncer/poll.rs b/fendermint/vm/topdown/src/syncer/poll.rs index 125491fae..6bce1ae60 100644 --- a/fendermint/vm/topdown/src/syncer/poll.rs +++ b/fendermint/vm/topdown/src/syncer/poll.rs @@ -7,16 +7,18 @@ use crate::proxy::ParentQueryProxy; use crate::syncer::error::Error; use crate::syncer::payload::ParentBlockView; use crate::syncer::store::ParentViewStore; -use crate::syncer::{ParentSyncerConfig, TopDownSyncEvent}; +use crate::syncer::{ParentPoller, ParentSyncerConfig, TopDownSyncEvent}; use crate::{is_null_round_str, BlockHash, BlockHeight, Checkpoint}; use anyhow::anyhow; +use async_trait::async_trait; use ipc_observability::emit; use ipc_observability::serde::HexEncodableBlockHash; use libp2p::futures::TryFutureExt; use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; use tracing::instrument; -pub(crate) struct ParentPoll { +pub struct ParentPoll { config: ParentSyncerConfig, parent_proxy: P, store: S, @@ -24,71 +26,44 @@ pub(crate) struct ParentPoll { last_finalized: Checkpoint, } -impl ParentPoll -where - S: ParentViewStore + Send + Sync + 'static, - P: Send + Sync + 'static + ParentQueryProxy, +#[async_trait] +impl ParentPoller for ParentPoll + where + S: ParentViewStore + Send + Sync + 'static + Clone, + P: Send + Sync + 'static + ParentQueryProxy, { - pub fn new(config: ParentSyncerConfig, proxy: P, store: S, last_finalized: Checkpoint) -> Self { - let (tx, _) = broadcast::channel(config.broadcast_channel_size); - Self { - config, - parent_proxy: proxy, - store, - event_broadcast: tx, - last_finalized, - } + type Store = S; + + fn subscribe(&self) -> Receiver { + self.event_broadcast.subscribe() + } + + fn store(&self) -> Self::Store { + self.store.clone() } /// The target block height is finalized, purge all the parent view before the target height - pub fn finalize(&mut self, checkpoint: Checkpoint) -> Result<(), Error> { + fn finalize(&mut self, checkpoint: Checkpoint) -> anyhow::Result<()> { let Some(min_height) = self.store.min_parent_view_height()? else { return Ok(()); }; for h in min_height..=checkpoint.target_height() { self.store.purge(h)?; } - Ok(()) - } - /// Get the latest non null block data stored - async fn latest_nonnull_data(&self) -> anyhow::Result<(BlockHeight, BlockHash)> { - let Some(latest_height) = self.store.max_parent_view_height()? else { - return Ok(( - self.last_finalized.target_height(), - self.last_finalized.target_hash().clone(), - )); - }; + self.last_finalized = checkpoint; - let start = self.last_finalized.target_height() + 1; - for h in (start..=latest_height).rev() { - let Some(p) = self.store.get(h)? else { - continue; - }; - - // if parent hash of the proposal is null, it means the - let Some(p) = p.payload else { - continue; - }; - - return Ok((h, p.parent_hash)); - } - - // this means the votes stored are all null blocks, return last committed finality - Ok(( - self.last_finalized.target_height(), - self.last_finalized.target_hash().clone(), - )) + Ok(()) } /// Insert the height into cache when we see a new non null block - pub async fn try_poll(&mut self) -> anyhow::Result<()> { + async fn try_poll(&mut self) -> anyhow::Result<()> { let Some(chain_head) = self.finalized_chain_head().await? else { return Ok(()); }; let (mut latest_height_fetched, mut first_non_null_parent_hash) = - self.latest_nonnull_data().await?; + self.latest_nonnull_data()?; tracing::debug!(chain_head, latest_height_fetched, "syncing heights"); if latest_height_fetched > chain_head { @@ -140,6 +115,53 @@ where Ok(()) } +} + +impl ParentPoll + where + S: ParentViewStore + Send + Sync + 'static, + P: Send + Sync + 'static + ParentQueryProxy, +{ + pub fn new(config: ParentSyncerConfig, proxy: P, store: S, last_finalized: Checkpoint) -> Self { + let (tx, _) = broadcast::channel(config.broadcast_channel_size); + Self { + config, + parent_proxy: proxy, + store, + event_broadcast: tx, + last_finalized, + } + } + + /// Get the latest non null block data stored + fn latest_nonnull_data(&self) -> anyhow::Result<(BlockHeight, BlockHash)> { + let Some(latest_height) = self.store.max_parent_view_height()? else { + return Ok(( + self.last_finalized.target_height(), + self.last_finalized.target_hash().clone(), + )); + }; + + let start = self.last_finalized.target_height() + 1; + for h in (start..=latest_height).rev() { + let Some(p) = self.store.get(h)? else { + continue; + }; + + // if parent hash of the proposal is null, it means the + let Some(p) = p.payload else { + continue; + }; + + return Ok((h, p.parent_hash)); + } + + // this means the votes stored are all null blocks, return last committed finality + Ok(( + self.last_finalized.target_height(), + self.last_finalized.target_hash().clone(), + )) + } fn store_full(&self) -> anyhow::Result { let Some(h) = self.store.max_parent_view_height()? else { @@ -251,8 +273,8 @@ async fn fetch_data

( height: BlockHeight, block_hash: BlockHash, ) -> Result -where - P: ParentQueryProxy + Send + Sync + 'static, + where + P: ParentQueryProxy + Send + Sync + 'static, { let changes_res = parent_proxy .get_validator_changes(height) @@ -291,4 +313,4 @@ where topdown_msgs_res.value, changes_res.value, )) -} +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/syncer/store.rs b/fendermint/vm/topdown/src/syncer/store.rs index f38b4dccb..21015b89a 100644 --- a/fendermint/vm/topdown/src/syncer/store.rs +++ b/fendermint/vm/topdown/src/syncer/store.rs @@ -4,48 +4,69 @@ use crate::syncer::error::Error; use crate::syncer::payload::ParentBlockView; use crate::{BlockHeight, SequentialKeyCache}; +use std::sync::{Arc, RwLock}; /// Stores the parent view observed of the current node pub trait ParentViewStore { /// Store a newly observed parent view - fn store(&mut self, view: ParentBlockView) -> Result<(), Error>; + fn store(&self, view: ParentBlockView) -> Result<(), Error>; /// Get the parent view at the specified height fn get(&self, height: BlockHeight) -> Result, Error>; /// Purge the parent view at the target height - fn purge(&mut self, height: BlockHeight) -> Result<(), Error>; + fn purge(&self, height: BlockHeight) -> Result<(), Error>; fn min_parent_view_height(&self) -> Result, Error>; fn max_parent_view_height(&self) -> Result, Error>; } +#[derive(Clone)] pub struct InMemoryParentViewStore { - inner: SequentialKeyCache, + inner: Arc>>, +} + +impl Default for InMemoryParentViewStore { + fn default() -> Self { + Self::new() + } +} + +impl InMemoryParentViewStore { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(SequentialKeyCache::sequential())), + } + } } impl ParentViewStore for InMemoryParentViewStore { - fn store(&mut self, view: ParentBlockView) -> Result<(), Error> { - self.inner + fn store(&self, view: ParentBlockView) -> Result<(), Error> { + let mut inner = self.inner.write().unwrap(); + inner .append(view.parent_height, view) .map_err(|_| Error::NonSequentialParentViewInsert) } fn get(&self, height: BlockHeight) -> Result, Error> { - Ok(self.inner.get_value(height).cloned()) + let inner = self.inner.read().unwrap(); + Ok(inner.get_value(height).cloned()) } - fn purge(&mut self, height: BlockHeight) -> Result<(), Error> { - self.inner.remove_key_below(height + 1); + fn purge(&self, height: BlockHeight) -> Result<(), Error> { + let mut inner = self.inner.write().unwrap(); + inner.remove_key_below(height + 1); Ok(()) } fn min_parent_view_height(&self) -> Result, Error> { - Ok(self.inner.lower_bound()) + let inner = self.inner.read().unwrap(); + Ok(inner.lower_bound()) } fn max_parent_view_height(&self) -> Result, Error> { - Ok(self.inner.upper_bound()) + let inner = self.inner.read().unwrap(); + Ok(inner.upper_bound()) } -} +} \ No newline at end of file