diff --git a/src/chain_sync/mod.rs b/src/chain_sync/mod.rs index e9a0eb8da661..de79dadaafac 100644 --- a/src/chain_sync/mod.rs +++ b/src/chain_sync/mod.rs @@ -10,6 +10,8 @@ mod sync_state; mod tipset_syncer; mod validation; +pub use validation::TipsetValidator; + pub use self::{ bad_block_cache::BadBlockCache, chain_muxer::{ChainMuxer, SyncConfig}, diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index f7dac3165554..00ce90367056 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -267,7 +267,7 @@ pub(super) async fn start( info!("Using network :: {}", get_actual_chain_name(&network_name)); display_chain_logo(&config.chain); - let (tipset_sink, tipset_stream) = flume::bounded(20); + let (tipset_sender, tipset_receiver) = flume::bounded(20); // if bootstrap peers are not set, set them let config = if config.network.bootstrap_peers.is_empty() { @@ -328,8 +328,8 @@ pub(super) async fn start( network_send.clone(), network_rx, Arc::new(Tipset::from(&genesis_header)), - tipset_sink, - tipset_stream, + tipset_sender.clone(), + tipset_receiver, opts.stateless, )?; let bad_blocks = chain_muxer.bad_blocks_cloned(); @@ -383,6 +383,7 @@ pub(super) async fn start( beacon, chain_store: rpc_chain_store, shutdown: shutdown_send, + tipset_send: tipset_sender, }, rpc_address, ) diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 51e4e3a6e98e..e6b27b562ddb 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -1,16 +1,18 @@ // Copyright 2019-2024 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::blocks::GossipBlock; +use crate::blocks::{Block, FullTipset, GossipBlock, Tipset}; use crate::libp2p::{IdentTopic, NetworkMessage, PUBSUB_BLOCK_STR}; use crate::lotus_json::{lotus_json_with_self, LotusJson}; use crate::rpc::{ApiVersion, Ctx, Permission, RpcMethod, ServerError}; +use anyhow::Context as _; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::to_vec; use nonempty::{nonempty, NonEmpty}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::sync::Arc; // Make sure to add any new methods here. macro_rules! for_each_method { @@ -22,6 +24,8 @@ macro_rules! for_each_method { }; } +use crate::chain; +use crate::chain_sync::TipsetValidator; pub(crate) use for_each_method; pub enum SyncCheckBad {} @@ -96,6 +100,26 @@ impl RpcMethod<1> for SyncSubmitBlock { ) -> Result { let encoded_message = to_vec(&block_msg)?; let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, ctx.network_name); + let (bls_messages, secp_messages) = + chain::store::block_messages(&ctx.chain_store.db, &block_msg.header)?; + let block = Block { + header: block_msg.header.clone(), + bls_messages, + secp_messages, + }; + let ts = FullTipset::from(block); + let genesis_ts = Arc::new(Tipset::from(ctx.chain_store.genesis_block_header())); + + TipsetValidator(&ts) + .validate( + ctx.chain_store.clone(), + ctx.bad_blocks.clone(), + genesis_ts, + ctx.state_manager.chain_config().block_delay_secs as u64, + ) + .context("failed to validate the tipset")?; + + ctx.tipset_send.send(Arc::new(ts.into_tipset()))?; ctx.network_send.send(NetworkMessage::PubsubMessage { topic: IdentTopic::new(pubsub_block_str), @@ -146,6 +170,7 @@ mod tests { }])); let (network_send, network_rx) = flume::bounded(5); + let (tipset_send, _) = flume::bounded(5); let mut services = JoinSet::new(); let db = Arc::new(MemoryDB::default()); let chain_config = Arc::new(ChainConfig::default()); @@ -208,6 +233,7 @@ mod tests { chain_store: cs_for_chain.clone(), beacon, shutdown: mpsc::channel(1).0, // dummy for tests + tipset_send, }); (state, network_rx) } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f8c41ba91622..2d2504e89f1b 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -121,6 +121,7 @@ use crate::rpc::auth_layer::AuthLayer; use crate::rpc::channel::RpcModule as FilRpcModule; pub use crate::rpc::channel::CANCEL_METHOD_NAME; +use crate::blocks::Tipset; use fvm_ipld_blockstore::Blockstore; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; @@ -148,6 +149,7 @@ pub struct RPCState { pub sync_state: Arc>, pub network_send: flume::Sender, pub network_name: String, + pub tipset_send: flume::Sender>, pub start_time: chrono::DateTime, pub beacon: Arc, pub shutdown: mpsc::Sender<()>, @@ -310,6 +312,7 @@ mod tests { .get_beacon_schedule(genesis.timestamp), ); let (network_send, _) = flume::bounded(0); + let (tipset_send, _) = flume::bounded(1); let network_name = get_network_name_from_genesis(genesis, &state_manager).unwrap(); let message_pool = MessagePool::new( MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), @@ -332,6 +335,7 @@ mod tests { chain_store, beacon, shutdown: mpsc::channel(1).0, // dummy for tests + tipset_send, } } } diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index c3efa7dd5570..36dadc8e6f9f 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -1087,6 +1087,7 @@ async fn start_offline_server( .get_beacon_schedule(chain_store.genesis_block_header().timestamp), ); let (network_send, _) = flume::bounded(5); + let (tipset_send, _) = flume::bounded(5); let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?; let message_pool = MessagePool::new( MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), @@ -1122,6 +1123,7 @@ async fn start_offline_server( chain_store, beacon, shutdown, + tipset_send, }; rpc_state.sync_state.write().set_stage(SyncStage::Idle); start_offline_rpc(rpc_state, rpc_port, shutdown_recv).await?;