Skip to content

Commit

Permalink
unlimited orphan
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Feb 9, 2024
1 parent bf6d481 commit 1114553
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 132 deletions.
15 changes: 13 additions & 2 deletions chain/src/chain_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(missing_docs)]

use crate::consume_orphan::ConsumeOrphan;
use crate::{LonelyBlock, ProcessBlockRequest};
use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest};
use ckb_channel::{select, Receiver};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{self, debug, error, info, warn};
Expand Down Expand Up @@ -126,7 +126,18 @@ impl ChainService {
return;
}
}
self.consume_orphan.process_lonely_block(lonely_block);
let db_txn = self.shared.store().begin_transaction();
if let Err(err) = db_txn.insert_block(&lonely_block.block()) {
error!("insert block failed: {:?}", err);
return;
}
if let Err(err) = db_txn.commit() {
error!("commit block failed: {:?}", err);
return;
}

let lonely_block_hash: LonelyBlockHash = lonely_block.into();
self.consume_orphan.process_lonely_block(lonely_block_hash);

debug!(
"processing block: {}-{}, (tip:unverified_tip):({}:{})",
Expand Down
145 changes: 37 additions & 108 deletions chain/src/consume_orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,6 @@ use ckb_types::U256;
use ckb_verification::InvalidParentError;
use std::sync::Arc;

// Store the an unverified block to the database. We may usually do this
// for an orphan block with unknown parent. But this function is also useful in testing.
pub fn store_unverified_block(
shared: &Shared,
block: Arc<BlockView>,
) -> Result<(HeaderView, U256), Error> {
let (block_number, block_hash) = (block.number(), block.hash());

let parent_header = shared
.store()
.get_block_header(&block.data().header().raw().parent_hash())
.expect("parent already store");

if let Some(ext) = shared.store().get_block_ext(&block.hash()) {
debug!("block {}-{} has stored BlockExt", block_number, block_hash);
return Ok((parent_header, ext.total_difficulty));
}

trace!("begin accept block: {}-{}", block.number(), block.hash());

let parent_ext = shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.expect("parent already store");

if parent_ext.verified == Some(false) {
return Err(InvalidParentError {
parent_hash: parent_header.hash(),
}
.into());
}

let cannon_total_difficulty =
parent_ext.total_difficulty.to_owned() + block.header().difficulty();

let db_txn = Arc::new(shared.store().begin_transaction());

db_txn.insert_block(block.as_ref())?;

let next_block_epoch = shared
.consensus()
.next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader())
.expect("epoch should be stored");
let new_epoch = next_block_epoch.is_head();
let epoch = next_block_epoch.epoch();

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let ext = BlockExt {
received_at: unix_time_as_millis(),
total_difficulty: cannon_total_difficulty.clone(),
total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64,
verified: None,
txs_fees: vec![],
cycles: None,
txs_sizes: None,
};

db_txn.insert_block_ext(&block.header().hash(), &ext)?;

db_txn.commit()?;

Ok((parent_header, cannon_total_difficulty))
}

pub(crate) struct ConsumeOrphan {
shared: Shared,

Expand Down Expand Up @@ -117,7 +46,7 @@ impl ConsumeOrphan {
continue;
}

let descendants: Vec<LonelyBlock> = self
let descendants: Vec<LonelyBlockHash> = self
.orphan_blocks_broker
.remove_blocks_by_parent(&leader_hash);
if descendants.is_empty() {
Expand All @@ -131,7 +60,7 @@ impl ConsumeOrphan {
}
}

pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlock) {
pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlockHash) {
let parent_hash = lonely_block.block().parent_hash();
let parent_status = self
.shared
Expand Down Expand Up @@ -164,7 +93,7 @@ impl ConsumeOrphan {
.set(self.orphan_blocks_broker.len() as i64)
});
}
fn send_unverified_block(&self, lonely_block: LonelyBlockHash, total_difficulty: U256) {
fn send_unverified_block(&self, lonely_block: LonelyBlockHash) {
let block_number = lonely_block.block_number_and_hash.number();
let block_hash = lonely_block.block_number_and_hash.hash();
if let Some(metrics) = ckb_metrics::handle() {
Expand All @@ -185,12 +114,11 @@ impl ConsumeOrphan {
return;
}
};

if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) {
if lonely_block.block_number_and_hash.number() > self.shared.snapshot().tip_number() {
self.shared.set_unverified_tip(ckb_shared::HeaderIndex::new(
block_number,
block_hash.clone(),
total_difficulty,
0.into(),
));
self.shared
.get_unverified_index()
Expand All @@ -205,42 +133,43 @@ impl ConsumeOrphan {
block_hash.clone(),
block_number.saturating_sub(self.shared.snapshot().tip_number())
)
} else {
debug!(
"received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}",
block_number,
block_hash,
self.shared.get_unverified_tip().number(),
self.shared.get_unverified_tip().hash(),
);
}
}

pub(crate) fn process_descendant(&self, lonely_block: LonelyBlock) {
match store_unverified_block(&self.shared, lonely_block.block().to_owned()) {
Ok((_parent_header, total_difficulty)) => {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
self.shared.remove_header_view(&lonely_block.block().hash());

let lonely_block_hash: LonelyBlockHash = lonely_block.into();

self.send_unverified_block(lonely_block_hash, total_difficulty)
}

Err(err) => {
error!(
"accept block {} failed: {}",
lonely_block.block().hash(),
err
);
// if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) {
// } else {
// debug!(
// "received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}",
// block_number,
// block_hash,
// self.shared.get_unverified_tip().number(),
// self.shared.get_unverified_tip().hash(),
// );
// }
}

lonely_block.execute_callback(Err(err));
}
}
pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockHash) {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
self.shared.remove_header_view(&lonely_block.block().hash());

self.send_unverified_block(lonely_block)
// match store_unverified_block(&self.shared, lonely_block.block().to_owned()) {
// Ok((_parent_header, total_difficulty)) => {
// }
//
// Err(err) => {
// error!(
// "accept block {} failed: {}",
// lonely_block.block().hash(),
// err
// );
//
// lonely_block.execute_callback(Err(err));
// }
// }
}

fn accept_descendants(&self, descendants: Vec<LonelyBlock>) {
fn accept_descendants(&self, descendants: Vec<LonelyBlockHash>) {
for descendant_block in descendants {
self.process_descendant(descendant_block);
}
Expand Down
2 changes: 1 addition & 1 deletion chain/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;

const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize;
const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 200) as usize;

pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));
Expand Down
40 changes: 20 additions & 20 deletions chain/src/utils/orphan_block_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]
use crate::LonelyBlock;
use crate::LonelyBlockHash;
use ckb_logger::debug;
use ckb_types::core::{BlockView, EpochNumber};
use ckb_types::core::EpochNumber;
use ckb_types::packed;
use ckb_util::{parking_lot::RwLock, shrink_to_fit};
use std::collections::{HashMap, HashSet, VecDeque};
Expand All @@ -15,7 +15,7 @@ const EXPIRED_EPOCH: u64 = 6;
#[derive(Default)]
struct InnerPool {
// Group by blocks in the pool by the parent hash.
blocks: HashMap<ParentHash, HashMap<packed::Byte32, LonelyBlock>>,
blocks: HashMap<ParentHash, HashMap<packed::Byte32, LonelyBlockHash>>,
// The map tells the parent hash when given the hash of a block in the pool.
//
// The block is in the orphan pool if and only if the block hash exists as a key in this map.
Expand All @@ -33,7 +33,7 @@ impl InnerPool {
}
}

fn insert(&mut self, lonely_block: LonelyBlock) {
fn insert(&mut self, lonely_block: LonelyBlockHash) {
let hash = lonely_block.block().header().hash();
let parent_hash = lonely_block.block().data().header().raw().parent_hash();
self.blocks
Expand All @@ -53,7 +53,7 @@ impl InnerPool {
self.parents.insert(hash, parent_hash);
}

pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec<LonelyBlock> {
pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec<LonelyBlockHash> {
// try remove leaders first
if !self.leaders.remove(parent_hash) {
return Vec::new();
Expand All @@ -62,7 +62,7 @@ impl InnerPool {
let mut queue: VecDeque<packed::Byte32> = VecDeque::new();
queue.push_back(parent_hash.to_owned());

let mut removed: Vec<LonelyBlock> = Vec::new();
let mut removed: Vec<LonelyBlockHash> = Vec::new();
while let Some(parent_hash) = queue.pop_front() {
if let Some(orphaned) = self.blocks.remove(&parent_hash) {
let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip();
Expand All @@ -87,15 +87,15 @@ impl InnerPool {
removed
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
self.parents.get(hash).and_then(|parent_hash| {
self.blocks.get(parent_hash).and_then(|blocks| {
blocks
.get(hash)
.map(|lonely_block| Arc::clone(lonely_block.block()))
})
})
}
// pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
// self.parents.get(hash).and_then(|parent_hash| {
// self.blocks.get(parent_hash).and_then(|blocks| {
// blocks
// .get(hash)
// .map(|lonely_block| Arc::clone(lonely_block.block()))
// })
// })
// }

pub fn contains_block(&self, hash: &packed::Byte32) -> bool {
self.parents.contains_key(hash)
Expand Down Expand Up @@ -148,17 +148,17 @@ impl OrphanBlockPool {
}

/// Insert orphaned block, for which we have already requested its parent block
pub fn insert(&self, lonely_block: LonelyBlock) {
pub fn insert(&self, lonely_block: LonelyBlockHash) {
self.inner.write().insert(lonely_block);
}

pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<LonelyBlock> {
pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<LonelyBlockHash> {
self.inner.write().remove_blocks_by_parent(parent_hash)
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
self.inner.read().get_block(hash)
}
// pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
// self.inner.read().get_block(hash)
// }

pub fn contains_block(&self, hash: &packed::Byte32) -> bool {
self.inner.read().contains_block(hash)
Expand Down
3 changes: 2 additions & 1 deletion sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ impl BlockFetcher {
IBDState::Out => last_common.number() + 1,
}
};
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
// let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
let mut end = best_known.number();
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
state.read_inflight_blocks().peer_can_fetch_count(self.peer),
Expand Down

0 comments on commit 1114553

Please sign in to comment.