Skip to content

Commit

Permalink
update syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptoAtwill committed Nov 28, 2024
1 parent ab177b2 commit 7434eba
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 118 deletions.
38 changes: 26 additions & 12 deletions fendermint/vm/topdown/src/observation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ pub struct ObservationConfig {
/// The content that validators gossip among each other.
#[derive(Serialize, Deserialize, Hash, Debug, Clone, Eq, PartialEq, Arbitrary)]
pub struct Observation {
pub(crate) parent_height: u64,
pub(crate) parent_subnet_height: u64,
/// The hash of the chain unit at that height. Usually a block hash, but could
/// be another entity (e.g. tipset CID), depending on the parent chain
/// and our interface to it. For example, if the parent is a Filecoin network,
/// this would be a tipset CID coerced into a block hash if queried through
/// the Eth API, or the tipset CID as-is if accessed through the Filecoin API.
pub(crate) parent_hash: Bytes,
pub(crate) parent_subnet_hash: Bytes,
/// A rolling/cumulative commitment to topdown effects since the beginning of
/// time, including the ones in this block.
pub(crate) cumulative_effects_comm: Bytes,
Expand Down Expand Up @@ -100,7 +100,7 @@ pub fn deduce_new_observation<S: ParentViewStore>(

let observation = agg.into_observation()?;
tracing::info!(
height = observation.parent_height,
height = observation.parent_subnet_height,
"new observation derived"
);

Expand All @@ -120,6 +120,10 @@ impl CertifiedObservation {
&self.observation
}

pub fn observation_signature(&self) -> &RecoverableECDSASignature {
&self.observation_signature
}

pub fn ensure_valid(&self) -> anyhow::Result<ValidatorKey> {
let to_sign = fvm_ipld_encoding::to_vec(&self.observation)?;
let (pk1, _) = self.observation_signature.recover(&to_sign)?;
Expand Down Expand Up @@ -163,8 +167,8 @@ impl CertifiedObservation {
impl Observation {
pub fn new(parent_height: BlockHeight, parent_hash: Bytes, commitment: Bytes) -> Self {
Self {
parent_height,
parent_hash,
parent_subnet_height: parent_height,
parent_subnet_hash: parent_hash,
cumulative_effects_comm: commitment,
}
}
Expand All @@ -175,16 +179,16 @@ impl Display for Observation {
write!(
f,
"Observation(parent_height={}, parent_hash={}, commitment={})",
self.parent_height,
hex::encode(&self.parent_hash),
self.parent_subnet_height,
hex::encode(&self.parent_subnet_hash),
hex::encode(&self.cumulative_effects_comm),
)
}
}

impl Observation {
pub fn parent_height(&self) -> BlockHeight {
self.parent_height
self.parent_subnet_height
}
}

Expand All @@ -195,7 +199,7 @@ impl ObservationConfig {
}
}

struct LinearizedParentBlockView {
pub(crate) struct LinearizedParentBlockView {
parent_height: u64,
parent_hash: Option<BlockHash>,
cumulative_effects_comm: Bytes,
Expand All @@ -211,13 +215,23 @@ impl From<&Checkpoint> for LinearizedParentBlockView {
}
}

impl From<&Observation> for LinearizedParentBlockView {
fn from(value: &Observation) -> Self {
LinearizedParentBlockView {
parent_height: value.parent_subnet_height,
parent_hash: Some(value.parent_subnet_hash.clone()),
cumulative_effects_comm: value.cumulative_effects_comm.clone(),
}
}
}

impl LinearizedParentBlockView {
fn new_commitment(&mut self, to_append: Bytes) {
let bytes = [
self.cumulative_effects_comm.as_slice(),
to_append.as_slice(),
]
.concat();
.concat();
let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes));
self.cumulative_effects_comm = cid.to_bytes();
}
Expand All @@ -238,7 +252,7 @@ impl LinearizedParentBlockView {
Ok(())
}

fn into_observation(self) -> Result<Observation, Error> {
pub fn into_observation(self) -> Result<Observation, Error> {
let Some(hash) = self.parent_hash else {
return Err(Error::CannotCommitObservationAtNullBlock(
self.parent_height,
Expand All @@ -250,4 +264,4 @@ impl LinearizedParentBlockView {
self.cumulative_effects_comm,
))
}
}
}
148 changes: 103 additions & 45 deletions fendermint/vm/topdown/src/syncer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::observation::{Observation, ObservationConfig};
use crate::proxy::ParentQueryProxy;
use crate::syncer::poll::ParentPoll;
use crate::observation::{LinearizedParentBlockView, Observation, ObservationConfig};
use crate::syncer::store::ParentViewStore;
use crate::{BlockHeight, Checkpoint};
use anyhow::anyhow;
use async_trait::async_trait;
use ipc_api::cross::IpcEnvelope;
use ipc_api::staking::StakingChangeRequest;
use serde::Deserialize;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};

pub mod error;
pub mod payload;
pub mod poll;
pub mod store;

pub type QuorumCertContent = (Observation, Vec<IpcEnvelope>, Vec<StakingChangeRequest>);

#[derive(Clone, Debug)]
pub enum TopDownSyncEvent {
/// The fendermint node is syncing with peers
NodeSyncing,
NewProposal(Box<Observation>),
}

#[derive(Debug, Clone, Deserialize)]
pub struct ParentSyncerConfig {
pub request_channel_size: usize,
/// The event broadcast channel buffer size
Expand All @@ -31,12 +38,10 @@ pub struct ParentSyncerConfig {
/// conservative and avoid other from rejecting the proposal because they don't see the
/// height as final yet.
pub chain_head_delay: BlockHeight,
/// Parent syncing cron period, in seconds
pub polling_interval: Duration,
/// Top down exponential back off retry base
pub exponential_back_off: Duration,
/// The max number of retries for exponential backoff before giving up
pub exponential_retry_limit: usize,
/// Parent syncing cron period, in millis
pub polling_interval_millis: Duration,
/// Max number of requests to process in the reactor loop
pub max_requests_per_loop: usize,
/// Max number of un-finalized parent blocks that should be stored in the store
pub max_store_blocks: BlockHeight,
/// Attempts to sync as many block as possible till the finalized chain head
Expand All @@ -46,26 +51,37 @@ pub struct ParentSyncerConfig {
}

#[derive(Clone)]
pub struct ParentSyncerReactorClient {
pub struct ParentSyncerReactorClient<S> {
tx: mpsc::Sender<ParentSyncerRequest>,
checkpoint: Arc<Mutex<Checkpoint>>,
store: S,
}

pub fn start_parent_syncer<P, S>(
config: ParentSyncerConfig,
proxy: P,
store: S,
last_finalized: Checkpoint,
) -> anyhow::Result<ParentSyncerReactorClient>
where
S: ParentViewStore + Send + Sync + 'static,
P: Send + Sync + 'static + ParentQueryProxy,
{
let (tx, mut rx) = mpsc::channel(config.request_channel_size);
impl<S: ParentViewStore + Send + Sync> ParentSyncerReactorClient<S> {
pub fn new(
request_channel_size: usize,
store: S,
) -> (Self, mpsc::Receiver<ParentSyncerRequest>) {
let (tx, rx) = mpsc::channel(request_channel_size);
let checkpoint = Arc::new(Mutex::new(Checkpoint::v1(0, vec![], vec![])));
(
Self {
tx,
checkpoint,
store,
},
rx,
)
}
}

pub fn start_polling_reactor<P: ParentPoller + Send + Sync + 'static>(
mut rx: mpsc::Receiver<ParentSyncerRequest>,
mut poller: P,
config: ParentSyncerConfig,
) {
let polling_interval = config.polling_interval_millis;
tokio::spawn(async move {
let polling_interval = config.polling_interval;
let mut poller = ParentPoll::new(config, proxy, store, last_finalized);

loop {
select! {
_ = tokio::time::sleep(polling_interval) => {
Expand All @@ -75,41 +91,83 @@ where
}
req = rx.recv() => {
let Some(req) = req else { break };
handle_request(req, &mut poller);
match req {
ParentSyncerRequest::Finalized(cp) => {
if let Err(e) = poller.finalize(cp) {
tracing::error!(err = e.to_string(), "cannot finalize syncer")
}
},
}
}
}
}

tracing::warn!("parent syncer stopped")
});
Ok(ParentSyncerReactorClient { tx })
}

impl ParentSyncerReactorClient {
/// Polls the parent block view
#[async_trait]
pub trait ParentPoller {
type Store: ParentViewStore + Send + Sync + 'static + Clone;

fn subscribe(&self) -> broadcast::Receiver<TopDownSyncEvent>;

fn store(&self) -> Self::Store;

/// The target block height is finalized, purge all the parent view before the target height
fn finalize(&mut self, checkpoint: Checkpoint) -> anyhow::Result<()>;

/// Try to poll the next parent height
async fn try_poll(&mut self) -> anyhow::Result<()>;
}

impl<S: ParentViewStore + Send + Sync + 'static> ParentSyncerReactorClient<S> {
fn set_checkpoint(&self, cp: Checkpoint) {
let mut checkpoint = self.checkpoint.lock().unwrap();
*checkpoint = cp.clone();
}
/// Marks the height as finalized.
/// There is no need to wait for ack from the reactor
pub async fn finalize_parent_height(&self, cp: Checkpoint) -> anyhow::Result<()> {
self.set_checkpoint(cp.clone());
self.tx.send(ParentSyncerRequest::Finalized(cp)).await?;
Ok(())
}
}

enum ParentSyncerRequest {
/// A new parent height is finalized
Finalized(Checkpoint),
}
pub fn prepare_quorum_cert_content(
&self,
end_height: BlockHeight,
) -> anyhow::Result<QuorumCertContent> {
let latest_checkpoint = self.checkpoint.lock().unwrap().clone();

let mut xnet_msgs = vec![];
let mut validator_changes = vec![];
let mut linear = LinearizedParentBlockView::from(&latest_checkpoint);

let start = latest_checkpoint.target_height() + 1;
for h in start..=end_height {
let Some(v) = self.store.get(h)? else {
return Err(anyhow!("parent block view store does not have data at {h}"));
};

fn handle_request<P, S>(req: ParentSyncerRequest, poller: &mut ParentPoll<P, S>)
where
S: ParentViewStore + Send + Sync + 'static,
P: Send + Sync + 'static + ParentQueryProxy,
{
match req {
ParentSyncerRequest::Finalized(c) => {
let height = c.target_height();
if let Err(e) = poller.finalize(c) {
tracing::error!(height, err = e.to_string(), "cannot finalize parent viewer");
if let Err(e) = linear.append(v.clone()) {
return Err(anyhow!("parent block view cannot be appended: {e}"));
}

if let Some(payload) = v.payload {
xnet_msgs.extend(payload.xnet_msgs);
validator_changes.extend(payload.validator_changes);
}
}

let ob = linear
.into_observation()
.map_err(|e| anyhow!("cannot convert linearized parent view into observation: {e}"))?;

Ok((ob, xnet_msgs, validator_changes))
}
}

pub enum ParentSyncerRequest {
/// A new parent height is finalized
Finalized(Checkpoint),
}
Loading

0 comments on commit 7434eba

Please sign in to comment.