Commit
Deadlock fix; added comments about locking order. Fixed duplicate requests.
bayk committed Dec 5, 2024
1 parent fb5d02f commit 07bc880
Showing 14 changed files with 131 additions and 113 deletions.
83 changes: 72 additions & 11 deletions chain/src/chain.rs
@@ -178,11 +178,11 @@ impl OrphanBlockPool {
/// maintains locking for the pipeline to avoid conflicting processing.
pub struct Chain {
db_root: String,
store: Arc<store::ChainStore>,
store: Arc<store::ChainStore>, // Lock order (with children): 3
adapter: Arc<dyn ChainAdapter + Send + Sync>,
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>, // Lock order (with children): 2
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>, // Lock order (with children): 1
pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
pibd_desegmenter: Arc<RwLock<Option<Desegmenter>>>,
reset_pibd_desegmenter: Arc<RwLock<bool>>,
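The numbered comments above establish a single global acquisition order for the chain's shared locks: `header_pmmr` (1) before `txhashset` (2) before `store` (3) and its batches. A minimal standalone sketch, using std types rather than MWC code, of why a single agreed order prevents this kind of deadlock:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let header_pmmr = Arc::new(RwLock::new(0u64)); // lock order 1
    let txhashset = Arc::new(RwLock::new(0u64)); // lock order 2

    let (p1, t1) = (header_pmmr.clone(), txhashset.clone());
    let a = thread::spawn(move || {
        let _h = p1.write().unwrap(); // take 1 first
        let _t = t1.write().unwrap(); // then 2
    });

    let b = thread::spawn(move || {
        // Same global order as thread A: 1 before 2. Reversing these two
        // lines is the classic lock-order inversion that can deadlock:
        // each thread ends up holding the lock the other one is waiting for.
        let _h = header_pmmr.write().unwrap();
        let _t = txhashset.write().unwrap();
    });

    a.join().unwrap();
    b.join().unwrap();
}
```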
@@ -387,12 +387,14 @@ impl Chain {
}

/// Return our shared header MMR handle.
pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
/// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
pub fn get_header_pmmr_for_test(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
self.header_pmmr.clone()
}

/// Return our shared txhashset instance.
pub fn txhashset(&self) -> Arc<RwLock<TxHashSet>> {
/// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
pub fn get_txhashset_for_test(&self) -> Arc<RwLock<TxHashSet>> {
self.txhashset.clone()
}

@@ -402,7 +404,8 @@ impl Chain {
}

/// Shared store instance.
pub fn store(&self) -> Arc<store::ChainStore> {
/// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
pub fn get_store_for_tests(&self) -> Arc<store::ChainStore> {
self.store.clone()
}

@@ -1041,7 +1044,8 @@ impl Chain {
/// Do we need to do the check here? We are doing the check for every tx regardless of the kernel version.
pub fn replay_attack_check(&self, tx: &Transaction) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
let batch_read = self.store.batch_read()?;
txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
pipe::check_against_spent_output(&tx.body, None, None, ext, batch)?;
Ok(())
})
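The same pattern recurs in `set_prev_root_only` and `get_locator_hashes` below: the caller takes the `header_pmmr` write lock first, then opens the read batch itself and hands it to `header_extending_readonly` by value (see the txhashset.rs hunk further down), so the lock order is fixed at every call site and the helper no longer opens a second batch via `self.store()`. A simplified sketch of the calling convention, with stand-in types rather than the real mwc-node API:

```rust
use std::sync::RwLock;

struct Batch; // stand-in for store::Batch<'_>
struct HeaderExtension; // stand-in for txhashset::HeaderExtension<'_>

// Takes the batch by value: exactly one batch exists per call, created by
// the caller while it already holds the header_pmmr lock.
fn header_extending_readonly<T>(
    _pmmr: &mut u32, // stand-in for PMMRHandle<BlockHeader>
    batch: Batch,
    inner: impl FnOnce(&mut HeaderExtension, &Batch) -> Result<T, ()>,
) -> Result<T, ()> {
    let mut ext = HeaderExtension;
    inner(&mut ext, &batch)
}

fn main() -> Result<(), ()> {
    let header_pmmr = RwLock::new(0u32);
    // 1) lock the MMR handle, 2) open the read batch, 3) run the closure.
    let mut pmmr = header_pmmr.write().unwrap();
    let batch_read = Batch; // real code: self.store.batch_read()?
    header_extending_readonly(&mut pmmr, batch_read, |_ext, _batch| Ok(()))
}
```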
@@ -1079,8 +1083,9 @@ impl Chain {
/// Sets prev_root on a brand new block header by applying the previous header to the header MMR.
pub fn set_prev_root_only(&self, header: &mut BlockHeader) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
let batch_read = self.store.batch_read()?;
let prev_root =
txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
let prev_header = batch.get_previous_header(header)?;
self.rewind_and_apply_header_fork(&prev_header, ext, batch)?;
ext.root()
@@ -1301,7 +1306,7 @@ impl Chain {

Ok(Segmenter::new(
Arc::new(RwLock::new(segm_header_pmmr_backend)),
self.txhashset(),
self.txhashset.clone(),
bitmap_snapshot,
header.clone(),
))
@@ -1360,7 +1365,7 @@ impl Chain {
);

Ok(Desegmenter::new(
self.txhashset(),
self.txhashset.clone(),
self.header_pmmr.clone(),
archive_header.clone(),
bitmap_root_hash,
@@ -2100,7 +2105,8 @@ impl Chain {
/// Note: This is based on the provided sync_head to support syncing against a fork.
pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let mut header_pmmr = self.header_pmmr.write();
txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
let batch_read = self.store.batch_read()?;
txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
let header = batch.get_block_header(&sync_head.hash())?;
self.rewind_and_apply_header_fork(&header, ext, batch)?;

@@ -2128,6 +2134,61 @@ impl Chain {
.block_exists(h)
.map_err(|e| Error::StoreErr(e, "chain block exists".to_owned()))
}

/// Locate headers from the main chain.
pub fn locate_headers(
&self,
locator: &[Hash],
block_header_num: u32,
) -> Result<Vec<mwc_core::core::BlockHeader>, crate::Error> {
debug!("locator: {:?}", locator);

let header = match self.find_common_header(locator) {
Some(header) => header,
None => return Ok(vec![]),
};

let max_height = self.header_head()?.height;

let header_pmmr = self.header_pmmr.read();

// Looks like we know one; get as many following headers as allowed.
let hh = header.height;
let mut headers = vec![];
for h in (hh + 1)..=(hh + (block_header_num as u64)) {
if h > max_height {
break;
}

if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
let header = self.get_block_header(&hash)?;
headers.push(header);
} else {
error!("Failed to locate headers.");
break;
}
}
debug!("returning headers: {}", headers.len());
Ok(headers)
}

// Find the first locator hash that refers to a known header on our main chain.
fn find_common_header(&self, locator: &[Hash]) -> Option<BlockHeader> {
let header_pmmr = self.header_pmmr.read();

for hash in locator {
if let Ok(header) = self.get_block_header(&hash) {
if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
if let Ok(header_at_height) = self.get_block_header(&hash_at_height) {
if header.hash() == header_at_height.hash() {
return Some(header);
}
}
}
}
}
None
}
}
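Together these two methods, moved here from servers/src/common/adapters.rs, implement the standard block-locator handshake: `find_common_header` scans the peer's locator for the first hash that sits on our main chain (a header counts only if the hash at its height on our chain matches it), and `locate_headers` then returns up to `block_header_num` headers following that point. A sketch of how the requesting side typically builds such a locator (the exponential spacing is an assumption based on the usual bitcoin-style scheme, not code from this commit):

```rust
// Heights walk back from the tip with exponentially growing steps, so two
// peers can find a common ancestor in O(log n) hashes even across deep forks.
fn get_locator_heights(tip_height: u64) -> Vec<u64> {
    let mut heights = Vec::new();
    let mut height = tip_height;
    let mut step = 1u64;
    loop {
        heights.push(height);
        if height == 0 {
            break;
        }
        if heights.len() >= 10 {
            step *= 2; // dense near the tip, sparse far back
        }
        height = height.saturating_sub(step);
    }
    heights
}

fn main() {
    // A tip at height 10_000 is covered by roughly 25 locator entries
    // instead of 10_001 hashes.
    let heights = get_locator_heights(10_000);
    println!("{} locator heights: {:?}", heights.len(), heights);
}
```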

fn setup_head(
2 changes: 1 addition & 1 deletion chain/src/pibd_params.rs
@@ -221,7 +221,7 @@ impl PibdParams {
match self.cpu_num {
1 => 2,
2 => 4,
_ => 8,
_ => 6,
}
}
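For machines with more than two cores the cap drops from 8 to 6. A hypothetical illustration of how a CPU-count-based cap like this typically bounds in-flight PIBD segment requests (the function and names are assumptions, not mwc-node code):

```rust
struct PibdParams {
    cpu_num: usize,
}

impl PibdParams {
    // Mirrors the changed match arm: many-core machines now get 6, not 8.
    fn max_parallel_requests(&self) -> usize {
        match self.cpu_num {
            1 => 2,
            2 => 4,
            _ => 6,
        }
    }
}

fn main() {
    let params = PibdParams {
        cpu_num: std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1),
    };
    // Request segments until the cap is reached, then wait for replies.
    let mut in_flight = 0;
    for _segment in 0..20 {
        if in_flight >= params.max_parallel_requests() {
            break; // real code would wait for a response first
        }
        in_flight += 1; // real code: send_segment_request(segment)
    }
    println!("segment requests in flight: {}", in_flight);
}
```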

11 changes: 8 additions & 3 deletions chain/src/txhashset/desegmenter.rs
@@ -44,6 +44,7 @@ use tokio::runtime::Builder;
use tokio::task;

/// Desegmenter for rebuilding a txhashset from PIBD segments
/// Note!!! header_pmmr, txhashset & store are from the Chain. The same locking rules apply.
pub struct Desegmenter {
txhashset: Arc<RwLock<TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
@@ -293,10 +294,14 @@ impl Desegmenter {
// Check NRD relative height rules for full kernel history.
{
info!("desegmenter validation: validating kernel history");
let txhashset = self.txhashset.read();
Chain::validate_kernel_history(&self.archive_header, &txhashset)?;

// Note, locking order is: header_pmmr->txhashset->batch !!!
{
// validate_kernel_history is a long operation, which is why we lock txhashset twice.
let txhashset = self.txhashset.read();
Chain::validate_kernel_history(&self.archive_header, &txhashset)?;
}
let header_pmmr = self.header_pmmr.read();
let txhashset = self.txhashset.read();
let batch = self.store.batch_read()?;
txhashset.verify_kernel_pos_index(
&self.genesis,
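The reworked block scopes the first `txhashset` read guard so it is dropped before the full `header_pmmr` -> `txhashset` -> batch sequence is acquired, keeping the long validation from pinning the lock across the rest of the sequence. A standalone sketch of the scoping pattern with std locks:

```rust
use std::sync::RwLock;

fn validate_kernel_history(_txhashset: &u64) {
    // stands in for the long-running, read-only validation
}

fn main() {
    let header_pmmr = RwLock::new(0u64); // lock order 1
    let txhashset = RwLock::new(0u64); // lock order 2

    {
        // First guard lives only for the long validation, then is dropped,
        // so writers are not blocked for the whole sequence below.
        let ts = txhashset.read().unwrap();
        validate_kernel_history(&ts);
    } // <- txhashset released here

    // Re-acquire everything in the documented order: 1, then 2 (a store
    // batch would come last in the real code).
    let _pmmr = header_pmmr.read().unwrap();
    let _ts = txhashset.read().unwrap();
}
```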
1 change: 1 addition & 0 deletions chain/src/txhashset/segmenter.rs
@@ -26,6 +26,7 @@ use mwc_core::core::pmmr::{ReadonlyPMMR, VecBackend};
use std::{sync::Arc, time::Instant};

/// Segmenter for generating PIBD segments.
/// Note!!! header_pmmr, txhashset & store are from the Chain. The same locking rules apply.
#[derive(Clone)]
pub struct Segmenter {
// Every 512th header (HEADERS_PER_BATCH) must be here; we don't need all header hashes.
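A hypothetical sketch of the sparse-hash idea in the comment above; the struct and method are illustrative, not the Segmenter's real fields:

```rust
const HEADERS_PER_BATCH: u64 = 512;

// Illustrative only: hashes[i] holds the hash of the header at height i * 512.
struct SparseHeaderHashes {
    hashes: Vec<[u8; 32]>,
}

impl SparseHeaderHashes {
    // Checkpoint at or below `height`, the anchor for a requested batch.
    fn checkpoint_for(&self, height: u64) -> Option<&[u8; 32]> {
        self.hashes.get((height / HEADERS_PER_BATCH) as usize)
    }
}

fn main() {
    // Four checkpoints cover heights 0..=2047 with 4 hashes instead of 2048.
    let sparse = SparseHeaderHashes {
        hashes: vec![[0u8; 32]; 4],
    };
    assert!(sparse.checkpoint_for(1000).is_some()); // anchored at height 512
    assert!(sparse.checkpoint_for(3000).is_none()); // beyond the stored range
}
```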
8 changes: 3 additions & 5 deletions chain/src/txhashset/txhashset.rs
@@ -866,25 +866,23 @@ where
/// to allow headers to be validated before we receive the full block data.
pub fn header_extending_readonly<'a, F, T>(
handle: &'a mut PMMRHandle<BlockHeader>,
store: &ChainStore,
batch_read: Batch<'_>,
inner: F,
) -> Result<T, Error>
where
F: FnOnce(&mut HeaderExtension<'_>, &Batch<'_>) -> Result<T, Error>,
{
let batch = store.batch_read()?;

let head = match handle.head_hash() {
Ok(hash) => {
let header = batch.get_block_header(&hash)?;
let header = batch_read.get_block_header(&hash)?;
Tip::from_header(&header)
}
Err(_) => Tip::default(),
};

let pmmr = PMMR::at(&mut handle.backend, handle.size);
let mut extension = HeaderExtension::new(pmmr, head);
let res = inner(&mut extension, &batch);
let res = inner(&mut extension, &batch_read);

handle.backend.discard();

6 changes: 3 additions & 3 deletions chain/tests/process_block_cut_through.rs
@@ -181,9 +181,9 @@ fn process_block_cut_through() -> Result<(), chain::Error> {
// Now exercise the internal call to pipe::process_block() directly so we can introspect the error
// without it being wrapped as above.
{
let store = chain.store();
let header_pmmr = chain.header_pmmr();
let txhashset = chain.txhashset();
let store = chain.get_store_for_tests();
let header_pmmr = chain.get_header_pmmr_for_test();
let txhashset = chain.get_txhashset_for_test();

let mut header_pmmr = header_pmmr.write();
let mut txhashset = txhashset.write();
2 changes: 1 addition & 1 deletion chain/tests/store_indices.rs
@@ -50,7 +50,7 @@ fn test_store_indices() {

{
// Start a new batch and delete the block.
let store = chain.store();
let store = chain.get_store_for_tests();
let batch = store.batch_write().unwrap();
assert!(batch.delete_block(&block_hash).is_ok());

2 changes: 1 addition & 1 deletion chain/tests/test_block_known.rs
@@ -61,7 +61,7 @@ fn check_known() {
// reset chain head to earlier state
{
let chain = init_chain(chain_dir, genesis.clone());
let store = chain.store();
let store = chain.get_store_for_tests();
let batch = store.batch_write().unwrap();
let head_header = chain.head_header().unwrap();
let prev = batch.get_previous_header(&head_header).unwrap();
4 changes: 2 additions & 2 deletions chain/tests/test_pibd_copy.rs
@@ -229,7 +229,7 @@ impl DesegmenterRequestor {
{
let max_height = src_chain.header_head().unwrap().height;

let header_pmmr = src_chain.header_pmmr();
let header_pmmr = src_chain.get_header_pmmr_for_test();
let header_pmmr = header_pmmr.read();

let header = src_chain.get_block_header(&target_hash).unwrap();
@@ -343,7 +343,7 @@ impl DesegmenterRequestor {
}

pub fn check_roots(&self, archive_header_height: u64) {
let roots = self.chain.txhashset().read().roots().unwrap();
let roots = self.chain.get_txhashset_for_test().read().roots().unwrap();
let archive_header = self
.chain
.get_header_by_height(archive_header_height)
52 changes: 1 addition & 51 deletions servers/src/common/adapters.rs
@@ -418,38 +418,7 @@ where
}

fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
debug!("locator: {:?}", locator);

let header = match self.find_common_header(locator) {
Some(header) => header,
None => return Ok(vec![]),
};

let max_height = self.chain().header_head()?.height;

let header_pmmr = self.chain().header_pmmr();
let header_pmmr = header_pmmr.read();

// looks like we know one, getting as many following headers as allowed
let hh = header.height;
let mut headers = vec![];
for h in (hh + 1)..=(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
if h > max_height {
break;
}

if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
let header = self.chain().get_block_header(&hash)?;
headers.push(header);
} else {
error!("Failed to locate headers successfully.");
break;
}
}

debug!("returning headers: {}", headers.len());

Ok(headers)
self.chain().locate_headers(locator, p2p::MAX_BLOCK_HEADERS)
}

/// Gets a full block by its hash.
@@ -815,25 +784,6 @@ where
.expect("Failed to upgrade weak ref to our chain.")
}

// Find the first locator hash that refers to a known header on our main chain.
fn find_common_header(&self, locator: &[Hash]) -> Option<BlockHeader> {
let header_pmmr = self.chain().header_pmmr();
let header_pmmr = header_pmmr.read();

for hash in locator {
if let Ok(header) = self.chain().get_block_header(&hash) {
if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
if let Ok(header_at_height) = self.chain().get_block_header(&hash_at_height) {
if header.hash() == header_at_height.hash() {
return Some(header);
}
}
}
}
}
None
}

// pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block
fn process_block(