Skip to content

Commit

Permalink
Make filtering parallel in get_logs. (#1512)
Browse files Browse the repository at this point in the history
* Make filtering parallel in get_logs.

* Remove lock in rocksdb.

* Fix comments.

* Fix comments.
  • Loading branch information
peilun-conflux authored and Peilun Li committed Jun 9, 2020
1 parent aeacff1 commit 5264264
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 40 deletions.
67 changes: 38 additions & 29 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,41 +756,50 @@ impl ConsensusGraph {
}

let blooms = filter.bloom_possibilities();
let mut blocks = vec![];
for epoch_number in from_epoch..(to_epoch + 1) {
if epoch_number <= inner.get_cur_era_genesis_height() {
// Blocks before (including) `cur_era_genesis` does not has
// epoch set in memory, so we should get
// the epoch set from db
let epoch_set = self
.data_man
.executed_epoch_set_hashes_from_db(epoch_number)
.expect("epoch set past checkpoint should exist");
let epoch_hash = epoch_set.last().expect("Not empty");
for hash in &epoch_set {
if self.block_matches_bloom(hash, epoch_hash, &blooms) {
blocks.push(*hash);
(from_epoch..(to_epoch + 1))
.into_par_iter()
.map(|epoch_number| {
let mut blocks = Vec::new();
if epoch_number <= inner.get_cur_era_genesis_height() {
// Blocks before (including) `cur_era_genesis` do not
// have epoch set in memory, so
// we should get the epoch set from db
let epoch_set = self
.data_man
.executed_epoch_set_hashes_from_db(epoch_number)
.expect("epoch set from past era should exist");
let epoch_hash = epoch_set.last().expect("Not empty");
for hash in &epoch_set {
if self
.block_matches_bloom(hash, epoch_hash, &blooms)
{
blocks.push(*hash)
}
}
}
} else {
// Use the epoch set maintained in memory
let epoch_hash = &inner.arena
[inner.get_pivot_block_arena_index(epoch_number)]
.hash;
for index in inner.get_ordered_executable_epoch_blocks(
inner.get_pivot_block_arena_index(epoch_number),
) {
let hash = &inner.arena[*index].hash;
if self.block_matches_bloom(hash, epoch_hash, &blooms) {
blocks.push(*hash);
} else {
// Use the epoch set maintained in memory
let epoch_hash = &inner.arena
[inner.get_pivot_block_arena_index(epoch_number)]
.hash;
for index in inner.get_ordered_executable_epoch_blocks(
inner.get_pivot_block_arena_index(epoch_number),
) {
let hash = &inner.arena[*index].hash;
if self
.block_matches_bloom(hash, epoch_hash, &blooms)
{
blocks.push(*hash);
}
}
}
}
}
blocks
blocks
})
.flatten()
.collect()
} else {
filter.block_hashes.as_ref().unwrap().clone()
};
debug!("get_logs: {} blocks after filter", block_hashes.len());

Ok(self.logs_from_blocks(
block_hashes,
Expand Down
21 changes: 10 additions & 11 deletions db/src/kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,14 @@ fn col_config(

unsafe impl Send for Database {}
unsafe impl Sync for Database {}
/// TODO Mutex around Options may not be needed
/// Key-Value database.
pub struct Database {
db: RwLock<Option<DBAndColumns>>,
config: DatabaseConfig,
path: String,
write_opts: Mutex<WriteOptions>,
read_opts: Mutex<ReadOptions>,
block_opts: Mutex<BlockBasedOptions>,
write_opts: WriteOptions,
read_opts: ReadOptions,
block_opts: BlockBasedOptions,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
// Values currently being flushed. Cleared when `flush` completes.
Expand Down Expand Up @@ -399,9 +398,9 @@ impl Database {
),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
read_opts: Mutex::new(read_opts),
write_opts: Mutex::new(write_opts),
block_opts: Mutex::new(block_opts),
read_opts,
write_opts,
block_opts,
})
}

Expand Down Expand Up @@ -458,7 +457,7 @@ impl Database {

check_for_corruption(
&self.path,
cfs.db.write_opt(&batch, &*self.write_opts.lock()),
cfs.db.write_opt(&batch, &self.write_opts),
)?;

for column in self.flushing.write().iter_mut() {
Expand Down Expand Up @@ -512,7 +511,7 @@ impl Database {

check_for_corruption(
&self.path,
cfs.db.write_opt(&batch, &*self.write_opts.lock()),
cfs.db.write_opt(&batch, &self.write_opts),
)
}
None => Err(other_io_err("Database is closed")),
Expand Down Expand Up @@ -541,7 +540,7 @@ impl Database {
.get_cf_opt(
cfs.get_cf(col as usize),
key,
&*self.read_opts.lock(),
&self.read_opts,
)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err),
Expand Down Expand Up @@ -641,7 +640,7 @@ impl Database {
let name = format!("col{}", col);
db.create_cf((
name.as_str(),
col_config(&self.config, &*self.block_opts.lock())?,
col_config(&self.config, &self.block_opts)?,
))
.map_err(other_io_err)?;
column_names.push(name);
Expand Down

0 comments on commit 5264264

Please sign in to comment.