Skip to content

Commit

Permalink
[Fix] SyncSubmitBlock - queue block for syncing. (#4278)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruseinov authored Apr 30, 2024
1 parent 36d094f commit 4095dde
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod sync_state;
mod tipset_syncer;
mod validation;

pub use validation::TipsetValidator;

pub use self::{
bad_block_cache::BadBlockCache,
chain_muxer::{ChainMuxer, SyncConfig},
Expand Down
7 changes: 4 additions & 3 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub(super) async fn start(

info!("Using network :: {}", get_actual_chain_name(&network_name));
display_chain_logo(&config.chain);
let (tipset_sink, tipset_stream) = flume::bounded(20);
let (tipset_sender, tipset_receiver) = flume::bounded(20);

// if bootstrap peers are not set, set them
let config = if config.network.bootstrap_peers.is_empty() {
Expand Down Expand Up @@ -328,8 +328,8 @@ pub(super) async fn start(
network_send.clone(),
network_rx,
Arc::new(Tipset::from(&genesis_header)),
tipset_sink,
tipset_stream,
tipset_sender.clone(),
tipset_receiver,
opts.stateless,
)?;
let bad_blocks = chain_muxer.bad_blocks_cloned();
Expand Down Expand Up @@ -383,6 +383,7 @@ pub(super) async fn start(
beacon,
chain_store: rpc_chain_store,
shutdown: shutdown_send,
tipset_send: tipset_sender,
},
rpc_address,
)
Expand Down
28 changes: 27 additions & 1 deletion src/rpc/methods/sync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::blocks::GossipBlock;
use crate::blocks::{Block, FullTipset, GossipBlock, Tipset};
use crate::libp2p::{IdentTopic, NetworkMessage, PUBSUB_BLOCK_STR};
use crate::lotus_json::{lotus_json_with_self, LotusJson};
use crate::rpc::{ApiVersion, Ctx, Permission, RpcMethod, ServerError};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use nonempty::{nonempty, NonEmpty};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

// Make sure to add any new methods here.
macro_rules! for_each_method {
Expand All @@ -22,6 +24,8 @@ macro_rules! for_each_method {
};
}

use crate::chain;
use crate::chain_sync::TipsetValidator;
pub(crate) use for_each_method;

pub enum SyncCheckBad {}
Expand Down Expand Up @@ -96,6 +100,26 @@ impl RpcMethod<1> for SyncSubmitBlock {
) -> Result<Self::Ok, ServerError> {
let encoded_message = to_vec(&block_msg)?;
let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, ctx.network_name);
let (bls_messages, secp_messages) =
chain::store::block_messages(&ctx.chain_store.db, &block_msg.header)?;
let block = Block {
header: block_msg.header.clone(),
bls_messages,
secp_messages,
};
let ts = FullTipset::from(block);
let genesis_ts = Arc::new(Tipset::from(ctx.chain_store.genesis_block_header()));

TipsetValidator(&ts)
.validate(
ctx.chain_store.clone(),
ctx.bad_blocks.clone(),
genesis_ts,
ctx.state_manager.chain_config().block_delay_secs as u64,
)
.context("failed to validate the tipset")?;

ctx.tipset_send.send(Arc::new(ts.into_tipset()))?;

ctx.network_send.send(NetworkMessage::PubsubMessage {
topic: IdentTopic::new(pubsub_block_str),
Expand Down Expand Up @@ -146,6 +170,7 @@ mod tests {
}]));

let (network_send, network_rx) = flume::bounded(5);
let (tipset_send, _) = flume::bounded(5);
let mut services = JoinSet::new();
let db = Arc::new(MemoryDB::default());
let chain_config = Arc::new(ChainConfig::default());
Expand Down Expand Up @@ -208,6 +233,7 @@ mod tests {
chain_store: cs_for_chain.clone(),
beacon,
shutdown: mpsc::channel(1).0, // dummy for tests
tipset_send,
});
(state, network_rx)
}
Expand Down
4 changes: 4 additions & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ use crate::rpc::auth_layer::AuthLayer;
use crate::rpc::channel::RpcModule as FilRpcModule;
pub use crate::rpc::channel::CANCEL_METHOD_NAME;

use crate::blocks::Tipset;
use fvm_ipld_blockstore::Blockstore;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
Expand Down Expand Up @@ -148,6 +149,7 @@ pub struct RPCState<DB> {
pub sync_state: Arc<parking_lot::RwLock<crate::chain_sync::SyncState>>,
pub network_send: flume::Sender<crate::libp2p::NetworkMessage>,
pub network_name: String,
pub tipset_send: flume::Sender<Arc<Tipset>>,
pub start_time: chrono::DateTime<chrono::Utc>,
pub beacon: Arc<crate::beacon::BeaconSchedule>,
pub shutdown: mpsc::Sender<()>,
Expand Down Expand Up @@ -310,6 +312,7 @@ mod tests {
.get_beacon_schedule(genesis.timestamp),
);
let (network_send, _) = flume::bounded(0);
let (tipset_send, _) = flume::bounded(1);
let network_name = get_network_name_from_genesis(genesis, &state_manager).unwrap();
let message_pool = MessagePool::new(
MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
Expand All @@ -332,6 +335,7 @@ mod tests {
chain_store,
beacon,
shutdown: mpsc::channel(1).0, // dummy for tests
tipset_send,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ async fn start_offline_server(
.get_beacon_schedule(chain_store.genesis_block_header().timestamp),
);
let (network_send, _) = flume::bounded(5);
let (tipset_send, _) = flume::bounded(5);
let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?;
let message_pool = MessagePool::new(
MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
Expand Down Expand Up @@ -1122,6 +1123,7 @@ async fn start_offline_server(
chain_store,
beacon,
shutdown,
tipset_send,
};
rpc_state.sync_state.write().set_stage(SyncStage::Idle);
start_offline_rpc(rpc_state, rpc_port, shutdown_recv).await?;
Expand Down

0 comments on commit 4095dde

Please sign in to comment.