Skip to content

Commit

Permalink
extend vcc endpoint to pass sp blue score, group up transactions by m…
Browse files Browse the repository at this point in the history
…erging blocks
  • Loading branch information
biryukovmaxim committed Dec 24, 2024
1 parent 8fe4663 commit 5ffbf31
Show file tree
Hide file tree
Showing 20 changed files with 188 additions and 100 deletions.
8 changes: 6 additions & 2 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use kaspa_consensus_core::{
use kaspa_utils::sync::rwlock::*;
use std::{ops::Deref, sync::Arc};

pub use tokio::task::spawn_blocking;

use crate::BlockProcessingBatch;
use kaspa_consensus_core::header::CompactHeaderData;
pub use tokio::task::spawn_blocking;

#[allow(dead_code)]
#[derive(Clone)]
Expand Down Expand Up @@ -358,6 +358,10 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(move |c| c.get_block(hash)).await
}

pub async fn async_get_compact_header_data(&self, hash: Hash) -> ConsensusResult<CompactHeaderData> {
self.clone().spawn_blocking(move |c| c.get_compact_header_data(hash)).await
}

pub async fn async_get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
self.clone().spawn_blocking(move |c| c.get_block_even_if_header_only(hash)).await
}
Expand Down
16 changes: 10 additions & 6 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use futures_util::future::BoxFuture;
use kaspa_muhash::MuHash;
use std::sync::Arc;

use crate::{
acceptance_data::AcceptanceData,
api::args::{TransactionValidationArgs, TransactionValidationBatchArgs},
Expand All @@ -22,7 +18,11 @@ use crate::{
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
BlockHashSet, BlueWorkType, ChainPath,
};
use consensus_core::header::CompactHeaderData;
use futures_util::future::BoxFuture;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use std::sync::Arc;

pub use self::stats::{BlockCount, ConsensusStats};

Expand Down Expand Up @@ -141,7 +141,7 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// source refers to the earliest block from which the current node has full header & block data
/// source refers to the earliest block from which the current node has full header & block data
fn get_source(&self) -> Hash {
unimplemented!()
}
Expand All @@ -159,7 +159,7 @@ pub trait ConsensusApi: Send + Sync {

/// Gets the virtual chain paths from `low` to the `sink` hash, or until `chain_path_added_limit` is reached
///
/// Note:
/// Note:
/// 1) `chain_path_added_limit` will populate removed fully, and then the added chain path, up to `chain_path_added_limit` amount of hashes.
/// 1.1) use `None to impose no limit with optimized backward chain iteration, for better performance in cases where batching is not required.
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
Expand Down Expand Up @@ -275,6 +275,10 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn get_compact_header_data(&self, hash: Hash) -> ConsensusResult<CompactHeaderData> {
unimplemented!()
}

fn get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
unimplemented!()
}
Expand Down
16 changes: 16 additions & 0 deletions consensus/core/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ impl MemSizeEstimator for Header {
}
}

#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CompactHeaderData {
pub daa_score: u64,
pub timestamp: u64,
pub bits: u32,
pub blue_score: u64,
}

impl MemSizeEstimator for CompactHeaderData {}

impl From<&Header> for CompactHeaderData {
fn from(header: &Header) -> Self {
Self { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits, blue_score: header.blue_score }
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 5 additions & 1 deletion consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl NotificationTrait for Notification {
removed_chain_block_hashes: payload.removed_chain_block_hashes.clone(),
added_chain_block_hashes: payload.added_chain_block_hashes.clone(),
added_chain_blocks_acceptance_data: Arc::new(vec![]),
added_chain_block_blue_scores: Arc::new(vec![]),
}));
}
}
Expand Down Expand Up @@ -107,15 +108,18 @@ impl BlockAddedNotification {
pub struct VirtualChainChangedNotification {
pub added_chain_block_hashes: Arc<Vec<Hash>>,
pub removed_chain_block_hashes: Arc<Vec<Hash>>,
pub added_chain_block_blue_scores: Arc<Vec<u64>>,

pub added_chain_blocks_acceptance_data: Arc<Vec<Arc<AcceptanceData>>>,
}
impl VirtualChainChangedNotification {
pub fn new(
added_chain_block_hashes: Arc<Vec<Hash>>,
removed_chain_block_hashes: Arc<Vec<Hash>>,
added_chain_blocks_acceptance_data: Arc<Vec<Arc<AcceptanceData>>>,
added_chain_block_blue_scores: Arc<Vec<u64>>,
) -> Self {
Self { added_chain_block_hashes, removed_chain_block_hashes, added_chain_blocks_acceptance_data }
Self { added_chain_block_hashes, removed_chain_block_hashes, added_chain_block_blue_scores, added_chain_blocks_acceptance_data }
}
}

Expand Down
16 changes: 13 additions & 3 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
acceptance_data::AcceptanceDataStoreReader,
block_transactions::BlockTransactionsStoreReader,
ghostdag::{GhostdagData, GhostdagStoreReader},
headers::{CompactHeaderData, HeaderStoreReader},
headers::HeaderStoreReader,
headers_selected_tip::HeadersSelectedTipStoreReader,
past_pruning_points::PastPruningPointsStoreReader,
pruning::PruningStoreReader,
Expand Down Expand Up @@ -60,14 +60,20 @@ use kaspa_consensus_core::{
pruning::PruningImportError,
tx::TxResult,
},
header::Header,
header::{
Header,
CompactHeaderData
},
merkle::calc_hash_merkle_root,
muhash::MuHashExtensions,
network::NetworkType,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList, PruningProofMetadata},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
BlockHashSet, BlueWorkType, ChainPath, HashMapCustomHasher,
BlockHashSet,
BlueWorkType,
ChainPath,
HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;

Expand Down Expand Up @@ -884,6 +890,10 @@ impl ConsensusApi for Consensus {
})
}

fn get_compact_header_data(&self, hash: Hash) -> ConsensusResult<CompactHeaderData> {
self.headers_store.get_compact_header_data(hash).unwrap_option().ok_or(ConsensusError::BlockNotFound(hash))
}

fn get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
let Some(status) = self.statuses_store.read().get(hash).unwrap_option().filter(|&status| status.has_block_header()) else {
return Err(ConsensusError::HeaderNotFound(hash));
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
daa::DbDaaStore,
depth::DbDepthStore,
ghostdag::{CompactGhostdagData, DbGhostdagStore},
headers::{CompactHeaderData, DbHeadersStore},
headers::DbHeadersStore,
headers_selected_tip::DbHeadersSelectedTipStore,
past_pruning_points::DbPastPruningPointsStore,
pruning::DbPruningStore,
Expand All @@ -27,6 +27,7 @@ use crate::{

use super::cache_policy_builder::CachePolicyBuilder as PolicyBuilder;
use itertools::Itertools;
use kaspa_consensus_core::header::CompactHeaderData;
use kaspa_consensus_core::{blockstatus::BlockStatus, BlockHashSet};
use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
Expand Down
17 changes: 1 addition & 16 deletions consensus/src/model/stores/headers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use kaspa_consensus_core::header::CompactHeaderData;
use kaspa_consensus_core::{header::Header, BlockHasher, BlockLevel};
use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess};
use kaspa_database::prelude::{CachePolicy, DB};
Expand Down Expand Up @@ -38,22 +39,6 @@ pub trait HeaderStore: HeaderStoreReader {
fn delete(&self, hash: Hash) -> Result<(), StoreError>;
}

#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct CompactHeaderData {
pub daa_score: u64,
pub timestamp: u64,
pub bits: u32,
pub blue_score: u64,
}

impl MemSizeEstimator for CompactHeaderData {}

impl From<&Header> for CompactHeaderData {
fn from(header: &Header) -> Self {
Self { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits, blue_score: header.blue_score }
}
}

/// A DB + cache implementation of `HeaderStore` trait, with concurrency support.
#[derive(Clone)]
pub struct DbHeadersStore {
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,15 @@ impl VirtualStateProcessor {
// check for subscriptions before the heavy lifting
let added_chain_blocks_acceptance_data =
chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec();

let added_chain_block_blue_scores =
chain_path.added.iter().copied().map(|added| self.headers_store.get_blue_score(added).unwrap()).collect_vec();
self.notification_root
.notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new(
chain_path.added.into(),
chain_path.removed.into(),
Arc::new(added_chain_blocks_acceptance_data),
Arc::new(added_chain_block_blue_scores),
)))
.expect("expecting an open unbounded channel");
}
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/processes/parents_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ mod tests {
use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
use kaspa_hashes::Hash;
use parking_lot::RwLock;
use kaspa_consensus_core::header::CompactHeaderData;

struct HeaderStoreMock {
map: RwLock<BlockHashMap<HeaderWithBlockLevel>>,
Expand Down Expand Up @@ -252,7 +253,7 @@ mod tests {
fn get_compact_header_data(
&self,
hash: kaspa_hashes::Hash,
) -> Result<crate::model::stores::headers::CompactHeaderData, StoreError> {
) -> Result<CompactHeaderData, StoreError> {
unimplemented!()
}

Expand Down
6 changes: 3 additions & 3 deletions rpc/core/src/api/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum Notification {
#[display(fmt = "BlockAdded notification: block hash {}", "_0.block.header.hash")]
BlockAdded(BlockAddedNotification),

#[display(fmt = "VirtualChainChanged notification: {} removed blocks, {} added blocks, {} accepted transactions", "_0.removed_chain_block_hashes.len()", "_0.added_chain_block_hashes.len()", "_0.accepted_transaction_ids.len()")]
#[display(fmt = "VirtualChainChanged notification: {} removed blocks, {} added blocks, {} accepted transactions", "_0.removed_chain_block_hashes.len()", "_0.added_chain_block_hashes.len()", "_0.added_acceptance_data.len()")]
VirtualChainChanged(VirtualChainChangedNotification),

#[display(fmt = "FinalityConflict notification: violating block hash {}", "_0.violating_block_hash")]
Expand Down Expand Up @@ -84,11 +84,11 @@ impl NotificationTrait for Notification {
match subscription.active() {
true => {
if let Notification::VirtualChainChanged(ref payload) = self {
if !subscription.include_accepted_transaction_ids() && !payload.accepted_transaction_ids.is_empty() {
if !subscription.include_accepted_transaction_ids() && !payload.added_acceptance_data.is_empty() {
return Some(Notification::VirtualChainChanged(VirtualChainChangedNotification {
removed_chain_block_hashes: payload.removed_chain_block_hashes.clone(),
added_chain_block_hashes: payload.added_chain_block_hashes.clone(),
accepted_transaction_ids: Arc::new(vec![]),
added_acceptance_data: Arc::new(vec![]),
}));
}
}
Expand Down
28 changes: 15 additions & 13 deletions rpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
use crate::{
convert::utxo::utxo_set_into_rpc, BlockAddedNotification, FinalityConflictNotification, FinalityConflictResolvedNotification,
NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptedTransactionIds,
SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptanceData,
RpcMergesetBlockAcceptanceData, SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification,
VirtualDaaScoreChangedNotification,
};
use kaspa_consensus_core::acceptance_data::{AcceptedTxEntry, MergesetBlockAcceptanceData};
use kaspa_consensus_notify::notification as consensus_notify;
use kaspa_index_core::notification as index_notify;
use std::sync::Arc;

// ----------------------------------------------------------------------------
// consensus_core to rpc_core
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -49,19 +50,20 @@ impl From<&consensus_notify::VirtualChainChangedNotification> for VirtualChainCh
// If acceptance data array is empty, it means that the subscription was set to not
// include accepted_transaction_ids. Otherwise, we expect acceptance data to correlate
// with the added chain block hashes
accepted_transaction_ids: Arc::new(if item.added_chain_blocks_acceptance_data.is_empty() {
added_acceptance_data: Arc::new(if item.added_chain_blocks_acceptance_data.is_empty() {
vec![]
} else {
item.added_chain_block_hashes
item.added_chain_blocks_acceptance_data
.iter()
.zip(item.added_chain_blocks_acceptance_data.iter())
.map(|(hash, acceptance_data)| RpcAcceptedTransactionIds {
accepting_block_hash: hash.to_owned(),
// We collect accepted tx ids from all mergeset blocks
accepted_transaction_ids: acceptance_data
.iter()
.flat_map(|x| x.accepted_transactions.iter().map(|tx| tx.transaction_id))
.collect(),
.zip(item.added_chain_block_blue_scores.iter())
.map(|(acceptance_data, &accepting_blue_score)| RpcAcceptanceData {
accepting_blue_score,
mergeset_block_acceptance_data: acceptance_data.iter().map(|MergesetBlockAcceptanceData{ block_hash, accepted_transactions }| {
let mut accepted_transactions:Vec<_> = accepted_transactions.to_vec();
accepted_transactions.sort_unstable_by_key(|entry| entry.index_within_block);
RpcMergesetBlockAcceptanceData{ merged_block_hash: *block_hash, accepted_transaction_ids: accepted_transactions.into_iter().map(|AcceptedTxEntry{ transaction_id, .. }| transaction_id).collect() }
}).collect()

})
.collect()
}),
Expand Down
Loading

0 comments on commit 5ffbf31

Please sign in to comment.