Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exec/rocksdb sort #35

Draft
wants to merge 12 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 5 additions & 8 deletions block-filter/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl BlockFilter {
header.hash()
);
let db = self.shared.store();
if db.get_block_filter_hash(&header.hash()).is_some() {
let num_hash = header.num_hash();
if db.get_block_filter_hash(num_hash.clone()).is_some() {
debug!(
"Filter data for block {:#x} already exists. Skip building.",
header.hash()
Expand All @@ -134,11 +135,11 @@ impl BlockFilter {
let parent_block_filter_hash = if header.is_genesis() {
Byte32::zero()
} else {
db.get_block_filter_hash(&header.parent_hash())
db.get_block_filter_hash(header.parent_num_hash())
.expect("parent block filter data stored")
};

let transactions = db.get_block_body(&header.hash());
let transactions = db.get_block_body_by_num_hash(num_hash.clone());
let transactions_size: usize = transactions.iter().map(|tx| tx.data().total_size()).sum();
let provider = WrappedChainDB::new(db);
let (filter_data, missing_out_points) = build_filter_data(provider, &transactions);
Expand All @@ -151,11 +152,7 @@ impl BlockFilter {
}
let db_transaction = db.begin_transaction();
db_transaction
.insert_block_filter(
&header.hash(),
&filter_data.pack(),
&parent_block_filter_hash,
)
.insert_block_filter(&num_hash, &filter_data.pack(), &parent_block_filter_hash)
.expect("insert_block_filter should be ok");
db_transaction.commit().expect("commit should be ok");
debug!("Inserted filter data for block: {}, hash: {:#x}, filter data size: {}, transactions size: {}", header.number(), header.hash(), filter_data.len(), transactions_size);
Expand Down
26 changes: 15 additions & 11 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ckb_types::{
},
packed::{Byte32, ProposalShortId},
utilities::merkle_mountain_range::ChainRootMMR,
U256,
BlockNumberAndHash, U256,
};
use ckb_verification::cache::Completed;
use ckb_verification::{BlockVerifier, InvalidParentError, NonContextualBlockTxsVerifier};
Expand Down Expand Up @@ -480,7 +480,7 @@ impl ChainService {
}
total_difficulty = cannon_total_difficulty.clone();
} else {
db_txn.insert_block_ext(&block.header().hash(), &ext)?;
db_txn.insert_block_ext(block.header().num_hash(), &ext)?;
}
db_txn.commit()?;

Expand Down Expand Up @@ -804,7 +804,7 @@ impl ChainService {

self.insert_ok_ext(
&txn,
&b.header().hash(),
b.header().num_hash(),
ext.clone(),
Some(&cache_entries),
Some(txs_sizes),
Expand All @@ -822,24 +822,28 @@ impl ChainService {
Err(err) => {
self.print_error(b, &err);
found_error = Some(err);
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(
&txn,
b.header().num_hash(),
ext.clone(),
)?;
}
}
}
Err(err) => {
found_error = Some(err);
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(&txn, b.header().num_hash(), ext.clone())?;
}
}
} else {
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(&txn, b.header().num_hash(), ext.clone())?;
}
} else {
txn.attach_block(b)?;
attach_block_cell(&txn, b)?;
mmr.push(b.digest())
.map_err(|e| InternalErrorKind::MMR.other(e))?;
self.insert_ok_ext(&txn, &b.header().hash(), ext.clone(), None, None)?;
self.insert_ok_ext(&txn, b.header().num_hash(), ext.clone(), None, None)?;
}
}

Expand Down Expand Up @@ -877,7 +881,7 @@ impl ChainService {
fn insert_ok_ext(
&self,
txn: &StoreTransaction,
hash: &Byte32,
num_hash: BlockNumberAndHash,
mut ext: BlockExt,
cache_entries: Option<&[Completed]>,
txs_sizes: Option<Vec<u64>>,
Expand All @@ -892,17 +896,17 @@ impl ChainService {
ext.cycles = Some(cycles);
}
ext.txs_sizes = txs_sizes;
txn.insert_block_ext(hash, &ext)
txn.insert_block_ext(num_hash, &ext)
}

fn insert_failure_ext(
&self,
txn: &StoreTransaction,
hash: &Byte32,
num_hash: BlockNumberAndHash,
mut ext: BlockExt,
) -> Result<(), Error> {
ext.verified = Some(false);
txn.insert_block_ext(hash, &ext)
txn.insert_block_ext(num_hash, &ext)
}

fn monitor_block_txs_verified(
Expand Down
7 changes: 5 additions & 2 deletions chain/src/tests/uncle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ fn test_get_block_body_after_inserting() {
chain_service
.process_block(Arc::new(blk.clone()), Switch::DISABLE_ALL)
.unwrap();
let len = shared.snapshot().get_block_body(&blk.hash()).len();
let len = shared
.snapshot()
.get_block_body(blk.number(), &blk.hash())
.len();
assert_eq!(len, 1, "[fork1] snapshot.get_block_body({})", blk.hash(),);
}
for blk in fork2.blocks() {
Expand All @@ -40,7 +43,7 @@ fn test_get_block_body_after_inserting() {
assert!(snapshot.get_block_header(&blk.hash()).is_some());
assert!(snapshot.get_block_uncles(&blk.hash()).is_some());
assert!(snapshot.get_block_proposal_txs_ids(&blk.hash()).is_some());
let len = snapshot.get_block_body(&blk.hash()).len();
let len = snapshot.get_block_body(blk.number(), &blk.hash()).len();
assert_eq!(len, 1, "[fork2] snapshot.get_block_body({})", blk.hash(),);
}
}
2 changes: 1 addition & 1 deletion ckb-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn run_app_inner(
matches: &ArgMatches,
) -> Result<(), ExitCode> {
let is_silent_logging = is_silent_logging(cmd);
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime();
let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(None);
let setup = Setup::from_matches(bin_name, cmd, matches)?;
let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?;

Expand Down
18 changes: 14 additions & 4 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::thread::available_parallelism;

use crate::helper::deadlock_detection;
use ckb_app_config::{ExitCode, RunArgs};
use ckb_async_runtime::Handle;
use ckb_async_runtime::{new_global_runtime, Handle};
use ckb_build_info::Version;
use ckb_launcher::Launcher;
use ckb_logger::info;
Expand All @@ -11,8 +13,11 @@ use ckb_types::core::cell::setup_system_cell_cache;
pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> {
deadlock_detection();

let rpc_threads_num = calc_rpc_threads_num(&args);
info!("ckb version: {}", version);
let mut launcher = Launcher::new(args, version, async_handle);
info!("run rpc server with {} threads", rpc_threads_num);
let (mut rpc_handle, _rpc_stop_rx, _runtime) = new_global_runtime(Some(rpc_threads_num));
let launcher = Launcher::new(args, version, async_handle, rpc_handle.clone());

let block_assembler_config = launcher.sanitize_block_assembler_config()?;
let miner_enable = block_assembler_config.is_some();
Expand Down Expand Up @@ -40,8 +45,6 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(),
Some(shared.store().db().inner()),
);

launcher.check_assume_valid_target(&shared);

let chain_controller = launcher.start_chain_service(&shared, pack.take_proposal_table());

launcher.start_block_filter(&shared);
Expand All @@ -63,7 +66,14 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(),
})
.expect("Error setting Ctrl-C handler");

rpc_handle.drop_guard();
wait_all_ckb_services_exit();

Ok(())
}

fn calc_rpc_threads_num(args: &RunArgs) -> usize {
let system_parallelism: usize = available_parallelism().unwrap().into();
let default_num = usize::max(system_parallelism, 1);
args.config.rpc.threads.unwrap_or(default_num)
}
20 changes: 10 additions & 10 deletions db-migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_channel::Receiver;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_db_schema::COLUMN_META;
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
Expand Down Expand Up @@ -79,7 +79,7 @@ impl MigrationWorker {
pb
};
if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
db.put_default(MIGRATION_VERSION_KEY, task.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, task.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})
Expand Down Expand Up @@ -117,7 +117,7 @@ impl Migrations {
/// Requires upgrade the executable binary.
pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl Migrations {
/// Check if the migrations will consume a lot of time.
pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand All @@ -178,7 +178,7 @@ impl Migrations {
/// Check if all the pending migrations will be executed in background.
pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand All @@ -198,7 +198,7 @@ impl Migrations {
}

fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
if let Ok(v) = db.get_pinned(COLUMN_META::NAME, COLUMN_META::META_TIP_HEADER_KEY) {
if v.is_some() {
return true;
}
Expand All @@ -207,7 +207,7 @@ impl Migrations {
}

fn is_non_empty_db(&self, db: &RocksDB) -> bool {
if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
if let Ok(v) = db.get_pinned(COLUMN_META::NAME, COLUMN_META::META_TIP_HEADER_KEY) {
if v.is_some() {
return true;
}
Expand All @@ -232,7 +232,7 @@ impl Migrations {
pb
};
db = m.migrate(db, Arc::new(pb))?;
db.put_default(MIGRATION_VERSION_KEY, m.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, m.version())
.map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
}
mpb.join_and_clear().expect("MultiProgress join");
Expand Down Expand Up @@ -273,7 +273,7 @@ impl Migrations {

fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
let raw = db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.map_err(|err| {
internal_error(format!("failed to get the version of database: {err}"))
})?;
Expand All @@ -289,7 +289,7 @@ impl Migrations {
if db_version.is_none() {
if let Some(m) = self.migrations.values().last() {
info!("Init database version {}", m.version());
db.put_default(MIGRATION_VERSION_KEY, m.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, m.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})?;
Expand Down
12 changes: 6 additions & 6 deletions db-migration/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ckb_app_config::DBConfig;
use ckb_db::ReadOnlyDB;
use ckb_db::RocksDB;
use ckb_db_schema::MIGRATION_VERSION_KEY;
use ckb_db_schema::COLUMN_META;
use ckb_error::Error;
use indicatif::ProgressBar;
use std::sync::Arc;
Expand All @@ -26,7 +26,7 @@ fn test_default_migration() {
let r = migrations.migrate(db, false).unwrap();
assert_eq!(
b"20191116225943".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand All @@ -41,7 +41,7 @@ fn test_default_migration() {
.unwrap();
assert_eq!(
b"20191127101121".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -117,7 +117,7 @@ fn test_customized_migration() {
);
assert_eq!(
VERSION.as_bytes(),
db.get_pinned_default(MIGRATION_VERSION_KEY)
db.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -209,7 +209,7 @@ fn test_background_migration() {
let r = migrations.migrate(db, false).unwrap();
assert_eq!(
b"20191116225943".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -248,7 +248,7 @@ fn test_background_migration() {
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(
b"20241127101122".to_vec(),
db.get_pinned_default(MIGRATION_VERSION_KEY)
db.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down
1 change: 1 addition & 0 deletions db-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ repository = "https://github.com/nervosnetwork/ckb"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ckb-types = { version = "0.117.0-pre", path = "../util/types" }
Loading