Merge pull request #3335 from stacks-network/feat/interruptable-miner
[DRAFT] [miner] Reduce fork and orphan rate with an interruptable miner
jcnelson authored Oct 22, 2022
2 parents 56061ef + aba47e4 commit 9d89725
Showing 29 changed files with 5,335 additions and 2,309 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bitcoin-tests.yml
@@ -40,6 +40,7 @@ jobs:
test-name:
- tests::neon_integrations::miner_submit_twice
- tests::neon_integrations::microblock_integration_test
- tests::neon_integrations::microblock_fork_poison_integration_test
- tests::neon_integrations::size_check_integration_test
- tests::neon_integrations::cost_voting_integration
- tests::integrations::integration_test_get_info
@@ -59,7 +60,6 @@ jobs:
- tests::neon_integrations::antientropy_integration_test
- tests::neon_integrations::filter_low_fee_tx_integration_test
- tests::neon_integrations::filter_long_runtime_tx_integration_test
- tests::neon_integrations::mining_transactions_is_fair
- tests::neon_integrations::microblock_large_tx_integration_test_FLAKY
- tests::neon_integrations::block_large_tx_integration_test
- tests::neon_integrations::microblock_limit_hit_integration_test
25 changes: 25 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,31 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to the versioning scheme outlined in the [README.md](README.md).

## [2.05.0.5.0]

### Changed

- Walking the mempool now caches address nonces in RAM and in a temporary
mempool table created for this purpose, instead of unconditionally querying
them from the chainstate MARF. This builds upon the improvements to mempool
goodput over 2.05.0.4.0 (#3337).
- The node and miner implementation has been refactored to remove write-lock
contention that can arise when the node's chains-coordinator thread attempts to
store and process newly-discovered (or newly-mined) blocks while the node's
relayer thread attempts to mine a new block. In addition, the miner logic has
been moved to a separate thread so that it does not starve the relayer thread,
which must handle block and transaction propagation as well as block processing.
The refactored miner thread is preemptively terminated and restarted by the
arrival of new Stacks blocks or burnchain blocks, which further prevents the
miner from holding open write-locks in the underlying chainstate databases when
there is new chain data to process (which would invalidate the miner's work
anyway) (#3335). A minimal sketch of this interruption pattern follows.
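
A minimal, self-contained sketch of the interruption pattern described in the entry above: a miner thread checks a shared flag between units of work and abandons the block it is assembling as soon as the flag is set. `MinerStatus` here is a one-field stand-in (the node's actual type is richer and not shown on this page), and the transaction loop is purely illustrative.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// One-field stand-in for the node's MinerStatus (the real type is richer).
struct MinerStatus {
    blocked: bool,
}

/// Assemble a block one transaction at a time, aborting as soon as the
/// coordinator marks mining as blocked (i.e. new chain data has arrived).
fn mine_block(status: Arc<Mutex<MinerStatus>>, mempool: &[&str]) -> Option<Vec<String>> {
    let mut block = Vec::new();
    for tx in mempool {
        if status.lock().expect("mutex poisoned").blocked {
            // The work-in-progress block would be invalidated anyway; give up
            // so the chains coordinator can take the chainstate write-locks.
            return None;
        }
        block.push(tx.to_string());
    }
    Some(block)
}

fn main() {
    let status = Arc::new(Mutex::new(MinerStatus { blocked: false }));
    let miner_status = status.clone();
    let miner = thread::spawn(move || mine_block(miner_status, &["tx1", "tx2", "tx3"]));

    // In the node this is done by signal_mining_blocked() when a new Stacks or
    // burnchain block arrives; here we flip the flag directly.
    status.lock().expect("mutex poisoned").blocked = true;

    // Depending on timing, the miner either finished first or was interrupted.
    match miner.join().expect("miner thread panicked") {
        Some(block) => println!("assembled a block with {} transactions", block.len()),
        None => println!("mining interrupted; restart against the new chain tip"),
    }
}
```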

### Fixed

- Fixed `pow` documentation in Clarity (#3338).
- Backported unit tests that were omitted in the 2.05.0.3.0 release (#3348).

## [2.05.0.4.0]

### Fixed
19 changes: 10 additions & 9 deletions src/chainstate/burn/db/sortdb.rs
@@ -89,7 +89,7 @@ use crate::chainstate::stacks::index::{ClarityMarfTrieId, MARFValue};
use stacks_common::types::chainstate::StacksAddress;
use stacks_common::types::chainstate::TrieHash;
use stacks_common::types::chainstate::{
BlockHeaderHash, BurnchainHeaderHash, PoxId, SortitionId, VRFSeed,
BlockHeaderHash, BurnchainHeaderHash, PoxId, SortitionId, StacksBlockId, VRFSeed,
};

const BLOCK_HEIGHT_MAX: u64 = ((1 as u64) << 63) - 1;
@@ -1434,7 +1434,7 @@ impl<'a> SortitionHandleTx<'a> {
)?;
} else {
// see if this block builds off of a Stacks block mined on this burnchain fork
let height_opt = match SortitionDB::get_accepted_stacks_block_pointer(
let parent_height_opt = match SortitionDB::get_accepted_stacks_block_pointer(
self,
&burn_tip.consensus_hash,
parent_stacks_block_hash,
@@ -1452,18 +1452,19 @@
}
}
};
match height_opt {
Some(height) => {
match parent_height_opt {
Some(parent_height) => {
if stacks_block_height > burn_tip.canonical_stacks_tip_height {
assert!(stacks_block_height > height, "BUG: DB corruption -- block height {} <= {} means we accepted a block out-of-order", stacks_block_height, height);
assert!(stacks_block_height > parent_height, "BUG: DB corruption -- block height {} <= {} means we accepted a block out-of-order", stacks_block_height, parent_height);

// This block builds off of a parent that is _concurrent_ with the memoized canonical stacks chain pointer.
// i.e. this block will reorg the Stacks chain on the canonical burnchain fork.
// Memoize this new stacks chain tip to the canonical burn chain snapshot.
// Note that we don't have to check continuity of accepted blocks -- we already
// are guaranteed by the Stacks chain state code that Stacks blocks in a given
// Stacks fork will be marked as accepted in sequential order (i.e. at height h, h+1,
// h+2, etc., without any gaps).
debug!("Accepted Stacks block {}/{} builds on a previous canonical Stacks tip on this burnchain fork ({})", consensus_hash, stacks_block_hash, &burn_tip.burn_header_hash);
debug!("Accepted Stacks block {}/{} ({}) builds on a previous canonical Stacks tip on this burnchain fork ({})", consensus_hash, stacks_block_hash, stacks_block_height, &burn_tip.burn_header_hash);
let args: &[&dyn ToSql] = &[
consensus_hash,
stacks_block_hash,
@@ -1477,7 +1478,7 @@
// This block was mined on this fork, but its acceptance doesn't overtake
// the current stacks chain tip. Remember it so that we can process its children,
// which might do so later.
debug!("Accepted Stacks block {}/{} builds on a non-canonical Stacks tip in this burnchain fork ({})", consensus_hash, stacks_block_hash, &burn_tip.burn_header_hash);
debug!("Accepted Stacks block {}/{} ({}) builds on a non-canonical Stacks tip in this burnchain fork ({} height {})", consensus_hash, stacks_block_hash, stacks_block_height, &burn_tip.burn_header_hash, burn_tip.canonical_stacks_tip_height);
}
SortitionDB::insert_accepted_stacks_block_pointer(
self,
@@ -2475,8 +2476,8 @@ impl SortitionDB {
pub fn is_db_version_supported_in_epoch(epoch: StacksEpochId, version: &str) -> bool {
match epoch {
StacksEpochId::Epoch10 => false,
StacksEpochId::Epoch20 => (version == "1" || version == "2" || version == "3"),
StacksEpochId::Epoch2_05 => (version == "2" || version == "3" || version == "4"),
StacksEpochId::Epoch20 => version == "1" || version == "2" || version == "3",
StacksEpochId::Epoch2_05 => version == "2" || version == "3" || version == "4",
}
}

9 changes: 9 additions & 0 deletions src/chainstate/coordinator/mod.rs
@@ -19,6 +19,8 @@ use std::convert::{TryFrom, TryInto};
use std::fs;
use std::path::PathBuf;
use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use crate::burnchains::{
@@ -39,6 +41,7 @@ use crate::chainstate::stacks::{
StacksHeaderInfo,
},
events::{StacksTransactionEvent, StacksTransactionReceipt, TransactionOrigin},
miner::{signal_mining_blocked, signal_mining_ready, MinerStatus},
Error as ChainstateError, StacksBlock, TransactionPayload,
};
use crate::core::StacksEpoch;
@@ -272,6 +275,7 @@ impl<'a, T: BlockEventDispatcher, CE: CostEstimator + ?Sized, FE: FeeEstimator +
atlas_config: AtlasConfig,
cost_estimator: Option<&mut CE>,
fee_estimator: Option<&mut FE>,
miner_status: Arc<Mutex<MinerStatus>>,
) where
T: BlockEventDispatcher,
{
@@ -311,18 +315,23 @@ impl<'a, T: BlockEventDispatcher, CE: CostEstimator + ?Sized, FE: FeeEstimator +
// timeout so that we handle Ctrl-C a little gracefully
match comms.wait_on() {
CoordinatorEvents::NEW_STACKS_BLOCK => {
signal_mining_blocked(miner_status.clone());
debug!("Received new stacks block notice");
if let Err(e) = inst.handle_new_stacks_block() {
warn!("Error processing new stacks block: {:?}", e);
}
signal_mining_ready(miner_status.clone());
}
CoordinatorEvents::NEW_BURN_BLOCK => {
signal_mining_blocked(miner_status.clone());
debug!("Received new burn block notice");
if let Err(e) = inst.handle_new_burnchain_block() {
warn!("Error processing new burn block: {:?}", e);
}
signal_mining_ready(miner_status.clone());
}
CoordinatorEvents::STOP => {
signal_mining_blocked(miner_status.clone());
debug!("Received stop notice");
return;
}
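
The `signal_mining_blocked` / `signal_mining_ready` helpers imported above live in `chainstate::stacks::miner`, which is not part of this excerpt. The sketch below assumes they simply toggle shared state behind the `Arc<Mutex<MinerStatus>>` handed to the coordinator; the struct shape and helper bodies are assumptions, not the actual implementation. Bracketing each event arm with a blocked/ready pair tells the miner to stand down before the coordinator takes chainstate write-locks, and releases it again even when the handler returns an error.

```rust
use std::sync::{Arc, Mutex};

/// Assumed stand-in for the real MinerStatus; the actual struct in
/// chainstate::stacks::miner is richer than a single flag.
pub struct MinerStatus {
    blocked: bool,
}

/// Tell the miner to stand down while chain data is being stored/processed.
pub fn signal_mining_blocked(miner_status: Arc<Mutex<MinerStatus>>) {
    miner_status.lock().expect("FATAL: mutex poisoned").blocked = true;
}

/// Let the miner resume once processing is done (even if it failed).
pub fn signal_mining_ready(miner_status: Arc<Mutex<MinerStatus>>) {
    miner_status.lock().expect("FATAL: mutex poisoned").blocked = false;
}

fn main() {
    let miner_status = Arc::new(Mutex::new(MinerStatus { blocked: false }));

    // Same shape as the coordinator's event arms above: block, handle, unblock.
    signal_mining_blocked(miner_status.clone());
    let handled: Result<(), &str> = Err("simulated processing error");
    if let Err(e) = handled {
        eprintln!("Error processing new block: {:?}", e);
    }
    signal_mining_ready(miner_status.clone());

    assert!(!miner_status.lock().unwrap().blocked);
}
```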
136 changes: 101 additions & 35 deletions src/chainstate/stacks/db/blocks.rs
@@ -1501,7 +1501,7 @@ impl StacksChainState {
}
}

test_debug!(
debug!(
"Loaded microblock {}/{}-{} (parent={}, expect_seq={})",
&parent_consensus_hash,
&parent_anchored_block_hash,
@@ -1533,6 +1533,16 @@
}
}
ret.reverse();

if ret.len() > 0 {
// should start with 0
if ret[0].header.sequence != 0 {
warn!("Invalid microblock stream from {}/{} to {}: sequence does not start with 0, but with {}",
parent_consensus_hash, parent_anchored_block_hash, tip_microblock_hash, ret[0].header.sequence);

return Ok(None);
}
}
Ok(Some(ret))
}

@@ -1617,10 +1627,11 @@ impl StacksChainState {
return Ok(None);
}

let mut ret = vec![];
let mut ret: Vec<StacksMicroblock> = vec![];
let mut tip: Option<StacksMicroblock> = None;
let mut fork_poison = None;
let mut expected_sequence = start_seq;
let mut parents: HashMap<BlockHeaderHash, usize> = HashMap::new();

// load associated staging microblock data, but best-effort.
// Stop loading once we find a fork juncture.
@@ -1657,6 +1668,22 @@
break;
}

if let Some(idx) = parents.get(&mblock.header.prev_block) {
let conflict = ret[*idx].clone();
warn!(
"Microblock fork found: microblocks {} and {} share parent {}",
mblock.block_hash(),
conflict.block_hash(),
&mblock.header.prev_block
);
fork_poison = Some(TransactionPayload::PoisonMicroblock(
mblock.header,
conflict.header,
));
ret.pop(); // last microblock pushed (i.e. the tip) conflicts with mblock
break;
}

// expect forks, so expected_sequence may not always increase
expected_sequence =
cmp::min(mblock.header.sequence, expected_sequence).saturating_add(1);
Expand All @@ -1677,6 +1704,10 @@ impl StacksChainState {
}

tip = Some(mblock.clone());

let prev_block = mblock.header.prev_block.clone();
parents.insert(prev_block, ret.len());

ret.push(mblock);
}
if fork_poison.is_none() && ret.len() == 0 {
@@ -3453,6 +3484,20 @@ impl StacksChainState {
Ok(count - to_write)
}

/// Check whether or not there exists a Stacks block at or higher than a given height that is
/// unprocessed. This is used by miners to determine whether or not the block-commit they're
/// about to send is about to be invalidated
pub fn has_higher_unprocessed_blocks(conn: &DBConn, height: u64) -> Result<bool, Error> {
let sql =
"SELECT 1 FROM staging_blocks WHERE orphaned = 0 AND processed = 0 AND height >= ?1";
let args: &[&dyn ToSql] = &[&u64_to_sql(height)?];
let res = conn
.query_row(sql, args, |_r| Ok(()))
.optional()
.map(|x| x.is_some())?;
Ok(res)
}
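
To illustrate what `has_higher_unprocessed_blocks` returns, the sketch below runs the same SQL against a simplified in-memory `staging_blocks` table; it is not the node's code path (the real function operates on the chainstate DB), and the `rusqlite` dev dependency is an assumption made only for this standalone example.

```rust
// assumed dependency for this sketch: rusqlite (e.g. 0.29), default features
use rusqlite::{params, Connection, OptionalExtension, Result};

/// Mirror of the query above: is there an unprocessed, non-orphaned staging
/// block at or above `height`?
fn has_higher_unprocessed_blocks(conn: &Connection, height: u64) -> Result<bool> {
    let sql =
        "SELECT 1 FROM staging_blocks WHERE orphaned = 0 AND processed = 0 AND height >= ?1";
    let row = conn
        .query_row(sql, params![height as i64], |_r| Ok(()))
        .optional()?;
    Ok(row.is_some())
}

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE staging_blocks (height INTEGER, orphaned INTEGER, processed INTEGER);
         INSERT INTO staging_blocks VALUES (100, 0, 1), (101, 0, 0);",
    )?;
    // A miner about to send a block-commit for height 101 would see that a
    // competing, not-yet-processed block is already queued at that height.
    assert!(has_higher_unprocessed_blocks(&conn, 101)?);
    assert!(!has_higher_unprocessed_blocks(&conn, 102)?);
    Ok(())
}
```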

fn extract_signed_microblocks(
parent_anchored_block_header: &StacksBlockHeader,
microblocks: &Vec<StacksMicroblock>,
@@ -3793,6 +3838,49 @@ impl StacksChainState {
Ok(Some((block_commit.burn_fee, sortition_burns)))
}

/// Do we already have an anchored block?
pub fn has_anchored_block(
conn: &DBConn,
blocks_path: &str,
consensus_hash: &ConsensusHash,
block: &StacksBlock,
) -> Result<bool, Error> {
let index_block_hash =
StacksBlockHeader::make_index_block_hash(consensus_hash, &block.block_hash());
if StacksChainState::has_stored_block(
&conn,
blocks_path,
consensus_hash,
&block.block_hash(),
)? {
debug!(
"Block already stored and processed: {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
return Ok(true);
} else if StacksChainState::has_staging_block(conn, consensus_hash, &block.block_hash())? {
debug!(
"Block already stored (but not processed): {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
return Ok(true);
} else if StacksChainState::has_block_indexed(&blocks_path, &index_block_hash)? {
debug!(
"Block already stored to chunk store: {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
return Ok(true);
}

Ok(false)
}

/// Pre-process and store an anchored block to staging, queuing it up for
/// subsequent processing once all of its ancestors have been processed.
///
@@ -3828,43 +3916,21 @@
let mainnet = self.mainnet;
let chain_id = self.chain_id;
let blocks_path = self.blocks_path.clone();
let mut block_tx = self.db_tx_begin()?;

// already in queue or already processed?
let index_block_hash =
StacksBlockHeader::make_index_block_hash(consensus_hash, &block.block_hash());
if StacksChainState::has_stored_block(
&block_tx,
&blocks_path,
// optimistic check (before opening a tx): already in queue or already processed?
if StacksChainState::has_anchored_block(
self.db(),
&self.blocks_path,
consensus_hash,
&block.block_hash(),
)? {
debug!(
"Block already stored and processed: {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
return Ok(false);
} else if StacksChainState::has_staging_block(
&block_tx,
consensus_hash,
&block.block_hash(),
block,
)? {
debug!(
"Block already stored (but not processed): {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
return Ok(false);
} else if StacksChainState::has_block_indexed(&blocks_path, &index_block_hash)? {
debug!(
"Block already stored to chunk store: {}/{} ({})",
consensus_hash,
&block.block_hash(),
&index_block_hash
);
}

let mut block_tx = self.db_tx_begin()?;

// already in queue or already processed (within the tx; things might have changed)
if StacksChainState::has_anchored_block(&block_tx, &blocks_path, consensus_hash, block)? {
return Ok(false);
}

2 changes: 1 addition & 1 deletion src/chainstate/stacks/db/mod.rs
@@ -192,7 +192,7 @@ impl DBConfig {
pub fn supports_epoch(&self, epoch_id: StacksEpochId) -> bool {
match epoch_id {
StacksEpochId::Epoch10 => false,
StacksEpochId::Epoch20 => (self.version == "1" || self.version == "2"),
StacksEpochId::Epoch20 => self.version == "1" || self.version == "2",
StacksEpochId::Epoch2_05 => self.version == "2",
}
}