Skip to content

Commit

Permalink
Prestore block to db before insert to orphan block pool
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Mar 20, 2024
1 parent 9af7fab commit 6b2d4a0
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 96 deletions.
5 changes: 3 additions & 2 deletions chain/src/chain_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{LonelyBlock, ProcessBlockRequest, RemoteBlock, TruncateRequest, Veri
use ckb_channel::Sender;
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{self, error};
use ckb_store::ChainDB;
use ckb_types::{
core::{service::Request, BlockView},
packed::Byte32,
Expand Down Expand Up @@ -123,8 +124,8 @@ impl ChainController {
}

/// `Relayer::reconstruct_block` need this
pub fn get_orphan_block(&self, hash: &Byte32) -> Option<Arc<BlockView>> {
self.orphan_block_broker.get_block(hash)
pub fn get_orphan_block(&self, store: &ChainDB, hash: &Byte32) -> Option<Arc<BlockView>> {
self.orphan_block_broker.get_block(store, hash)
}

/// `NetRpcImpl::sync_state` rpc need this
Expand Down
28 changes: 24 additions & 4 deletions chain/src/chain_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! CKB chain service.
#![allow(missing_docs)]

use crate::{LonelyBlock, ProcessBlockRequest};
use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest};
use ckb_channel::{select, Receiver, Sender};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{self, debug, error, info, warn};
Expand All @@ -19,15 +19,15 @@ pub(crate) struct ChainService {

process_block_rx: Receiver<ProcessBlockRequest>,

lonely_block_tx: Sender<LonelyBlock>,
lonely_block_tx: Sender<LonelyBlockHash>,
}
impl ChainService {
/// Create a new ChainService instance with shared.
pub(crate) fn new(
shared: Shared,
process_block_rx: Receiver<ProcessBlockRequest>,

lonely_block_tx: Sender<LonelyBlock>,
lonely_block_tx: Sender<LonelyBlockHash>,
) -> ChainService {
ChainService {
shared,
Expand Down Expand Up @@ -133,7 +133,27 @@ impl ChainService {
.set(self.lonely_block_tx.len() as i64)
}

match self.lonely_block_tx.send(lonely_block) {
let db_txn = self.shared.store().begin_transaction();
if let Err(err) = db_txn.insert_block(lonely_block.block()) {
self.shared.remove_block_status(&block_hash);
error!(
"block {}-{} insert error {:?}",
block_number, block_hash, err
);
return;
}
if let Err(err) = db_txn.commit() {
self.shared.remove_block_status(&block_hash);
error!(
"block {}-{} commit error {:?}",
block_number, block_hash, err
);
return;
}

let lonely_block_hash: LonelyBlockHash = lonely_block.into();

match self.lonely_block_tx.send(lonely_block_hash) {
Ok(_) => {
debug!(
"processing block: {}-{}, (tip:unverified_tip):({}:{})",
Expand Down
81 changes: 38 additions & 43 deletions chain/src/consume_orphan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(missing_docs)]

use crate::utils::orphan_block_pool::OrphanBlockPool;
use crate::{LonelyBlock, LonelyBlockHash};
use crate::LonelyBlockHash;
use ckb_channel::{select, Receiver, Sender};
use ckb_error::Error;
use ckb_logger::internal::trace;
Expand All @@ -10,11 +10,11 @@ use ckb_shared::block_status::BlockStatus;
use ckb_shared::Shared;
use ckb_store::ChainStore;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::core::{BlockExt, BlockView, EpochNumber, EpochNumberWithFraction, HeaderView};
use ckb_types::core::{BlockExt, EpochNumber, HeaderView};
use ckb_types::U256;
use ckb_verification::InvalidParentError;
use std::sync::Arc;
use dashmap::mapref::entry::Entry;
use std::sync::Arc;

pub(crate) struct ConsumeDescendantProcessor {
pub shared: Shared,
Expand All @@ -25,13 +25,13 @@ pub(crate) struct ConsumeDescendantProcessor {
// for an orphan block with unknown parent. But this function is also useful in testing.
pub fn store_unverified_block(
shared: &Shared,
block: Arc<BlockView>,
block: &LonelyBlockHash,
) -> Result<(HeaderView, U256), Error> {
let (block_number, block_hash) = (block.number(), block.hash());

let parent_header = shared
.store()
.get_block_header(&block.data().header().raw().parent_hash())
.get_block_header(&block.parent_hash())
.expect("parent already store");

if let Some(ext) = shared.store().get_block_ext(&block.hash()) {
Expand Down Expand Up @@ -59,7 +59,7 @@ pub fn store_unverified_block(

let parent_ext = shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.get_block_ext(&block.parent_hash())
.expect("parent already store");

if parent_ext.verified == Some(false) {
Expand All @@ -69,39 +69,33 @@ pub fn store_unverified_block(
.into());
}

let cannon_total_difficulty =
parent_ext.total_difficulty.to_owned() + block.header().difficulty();
let cannon_total_difficulty = parent_ext.total_difficulty.to_owned() + block.difficulty();

let db_txn = Arc::new(shared.store().begin_transaction());

db_txn.insert_block(block.as_ref())?;

let next_block_epoch = shared
.consensus()
.next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader())
.expect("epoch should be stored");
let new_epoch = next_block_epoch.is_head();
let epoch = next_block_epoch.epoch();

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
db_txn.insert_block_epoch_index(&block.hash(), &epoch.last_block_hash_in_previous_epoch())?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let ext = BlockExt {
received_at: unix_time_as_millis(),
total_difficulty: cannon_total_difficulty.clone(),
total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64,
total_uncles_count: parent_ext.total_uncles_count + block.uncles_len() as u64,
verified: None,
txs_fees: vec![],
cycles: None,
txs_sizes: None,
};

db_txn.insert_block_ext(&block.header().hash(), &ext)?;
db_txn.insert_block_ext(&block.hash(), &ext)?;

db_txn.commit()?;

Expand Down Expand Up @@ -157,23 +151,21 @@ impl ConsumeDescendantProcessor {
}
}

pub(crate) fn process_descendant(&self, lonely_block: LonelyBlock) -> Result<(), Error> {
return match store_unverified_block(&self.shared, lonely_block.block().to_owned()) {
pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockHash) -> Result<(), Error> {
return match store_unverified_block(&self.shared, &lonely_block) {
Ok((_parent_header, total_difficulty)) => {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
.insert_block_status(lonely_block.hash(), BlockStatus::BLOCK_STORED);

let lonely_block_hash: LonelyBlockHash = lonely_block.into();

self.send_unverified_block(lonely_block_hash, total_difficulty);
self.send_unverified_block(lonely_block, total_difficulty);
Ok(())
}

Err(err) => {
if let Some(_invalid_parent_err) = err.downcast_ref::<InvalidParentError>() {
self.shared
.block_status_map()
.insert(lonely_block.block().hash(), BlockStatus::BLOCK_INVALID);
.insert(lonely_block.hash(), BlockStatus::BLOCK_INVALID);
}

lonely_block.execute_callback(Err(err.clone()));
Expand All @@ -182,18 +174,18 @@ impl ConsumeDescendantProcessor {
};
}

fn accept_descendants(&self, descendants: Vec<LonelyBlock>) {
fn accept_descendants(&self, descendants: Vec<LonelyBlockHash>) {
let mut has_parent_invalid_error = false;
for descendant_block in descendants {
let block_number = descendant_block.block().number();
let block_hash = descendant_block.block().hash();
let block_number = descendant_block.block_number_and_hash.number();
let block_hash = descendant_block.block_number_and_hash.hash();

if has_parent_invalid_error {
self.shared
.block_status_map()
.insert(block_hash.clone(), BlockStatus::BLOCK_INVALID);
let err = Err(InvalidParentError {
parent_hash: descendant_block.block().parent_hash(),
parent_hash: descendant_block.parent_hash(),
}
.into());

Expand Down Expand Up @@ -228,7 +220,7 @@ pub(crate) struct ConsumeOrphan {
descendant_processor: ConsumeDescendantProcessor,

orphan_blocks_broker: Arc<OrphanBlockPool>,
lonely_blocks_rx: Receiver<LonelyBlock>,
lonely_blocks_rx: Receiver<LonelyBlockHash>,

stop_rx: Receiver<()>,
}
Expand All @@ -238,7 +230,7 @@ impl ConsumeOrphan {
shared: Shared,
orphan_block_pool: Arc<OrphanBlockPool>,
unverified_blocks_tx: Sender<LonelyBlockHash>,
lonely_blocks_rx: Receiver<LonelyBlock>,
lonely_blocks_rx: Receiver<LonelyBlockHash>,
stop_rx: Receiver<()>,
) -> ConsumeOrphan {
ConsumeOrphan {
Expand All @@ -259,17 +251,17 @@ impl ConsumeOrphan {
select! {
recv(self.lonely_blocks_rx) -> msg => match msg {
Ok(lonely_block) => {
let lonely_block_epoch: EpochNumberWithFraction = lonely_block.block().epoch();
let lonely_block_epoch_number: EpochNumber = lonely_block.epoch_number();

let _trace_now = minstant::Instant::now();
self.process_lonely_block(lonely_block);
if let Some(handle) = ckb_metrics::handle() {
handle.ckb_chain_process_lonely_block_duration.observe(_trace_now.elapsed().as_secs_f64())
}

if lonely_block_epoch.number() > last_check_expired_orphans_epoch {
if lonely_block_epoch_number > last_check_expired_orphans_epoch {
self.clean_expired_orphan_blocks();
last_check_expired_orphans_epoch = lonely_block_epoch.number();
last_check_expired_orphans_epoch = lonely_block_epoch_number;
}
},
Err(err) => {
Expand Down Expand Up @@ -311,7 +303,7 @@ impl ConsumeOrphan {
continue;
}

let descendants: Vec<LonelyBlock> = self
let descendants: Vec<LonelyBlockHash> = self
.orphan_blocks_broker
.remove_blocks_by_parent(&leader_hash);
if descendants.is_empty() {
Expand All @@ -325,11 +317,11 @@ impl ConsumeOrphan {
}
}

fn process_lonely_block(&self, lonely_block: LonelyBlock) {
let parent_hash = lonely_block.block().parent_hash();
let block_hash = lonely_block.block().hash();
let block_number = lonely_block.block().number();
fn process_lonely_block(&self, lonely_block_hash: LonelyBlockHash) {
let parent_hash = lonely_block_hash.parent_hash();
let block_hash = lonely_block_hash.hash();
let block_number = lonely_block_hash.number();

{
// Is this block still verifying by ConsumeUnverified?
// If yes, skip it.
Expand All @@ -338,7 +330,7 @@ impl ConsumeOrphan {
if entry.get().eq(&BlockStatus::BLOCK_STORED) {
debug!(
"in process_lonely_block, {} is BLOCK_STORED in block_status_map, it is still verifying by ConsumeUnverified thread",
block_hash,
block_hash,
);
return;
}
Expand All @@ -354,7 +346,10 @@ impl ConsumeOrphan {
parent_hash, parent_status, block_number, block_hash,
);

if let Err(err) = self.descendant_processor.process_descendant(lonely_block) {
if let Err(err) = self
.descendant_processor
.process_descendant(lonely_block_hash)
{
error!(
"process descendant {}-{}, failed {:?}",
block_number, block_hash, err
Expand All @@ -368,14 +363,14 @@ impl ConsumeOrphan {
);
self.shared
.block_status_map()
.insert(lonely_block.block().hash(), BlockStatus::BLOCK_INVALID);
.insert(block_hash, BlockStatus::BLOCK_INVALID);
let err = Err(InvalidParentError {
parent_hash: parent_hash.clone(),
}
.into());
lonely_block.execute_callback(err);
lonely_block_hash.execute_callback(err);
} else {
self.orphan_blocks_broker.insert(lonely_block);
self.orphan_blocks_broker.insert(lonely_block_hash);
}
self.search_orphan_pool();

Expand Down
4 changes: 4 additions & 0 deletions chain/src/consume_unverified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ impl ConsumeUnverifiedBlockProcessor {
pub(crate) fn consume_unverified_blocks(&mut self, lonely_block_hash: LonelyBlockHash) {
let LonelyBlockHash {
block_number_and_hash,
parent_hash: _,
difficulty: _,
uncles_len: _,
epoch_number: _,
switch,
verify_callback,
} = lonely_block_hash;
Expand Down
4 changes: 2 additions & 2 deletions chain/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::chain_service::ChainService;
use crate::consume_unverified::ConsumeUnverifiedBlocks;
use crate::init_load_unverified::InitLoadUnverified;
use crate::utils::orphan_block_pool::OrphanBlockPool;
use crate::{ChainController, LonelyBlock, LonelyBlockHash};
use crate::{ChainController, LonelyBlockHash};
use ckb_channel::{self as channel, SendError};
use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW;
use ckb_logger::warn;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
.expect("start unverified_queue consumer thread should ok");

let (lonely_block_tx, lonely_block_rx) =
channel::bounded::<LonelyBlock>(BLOCK_DOWNLOAD_WINDOW as usize);
channel::bounded::<LonelyBlockHash>(BLOCK_DOWNLOAD_WINDOW as usize);

let (search_orphan_pool_stop_tx, search_orphan_pool_stop_rx) = ckb_channel::bounded::<()>(1);

Expand Down
Loading

0 comments on commit 6b2d4a0

Please sign in to comment.