Skip to content

Commit

Permalink
feat: add rx support
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Aug 19, 2024
1 parent 94404b4 commit 1a69bc8
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 28 deletions.
123 changes: 97 additions & 26 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::cmp::max;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

Expand All @@ -10,7 +12,11 @@ use minotari_app_grpc::tari_rpc::{
GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest, HeightRequest, NewBlockTemplateRequest, PowAlgo,
SubmitBlockRequest, SubmitBlockResponse,
};
use tari_core::proof_of_work::sha3x_difficulty;
use tari_common_types::types::FixedHash;
use tari_core::blocks::BlockHeader;
use tari_core::consensus::ConsensusManager;
use tari_core::proof_of_work::{PowAlgorithm, randomx_difficulty, sha3x_difficulty};
use tari_core::proof_of_work::randomx_factory::RandomXFactory;
use tokio::sync::Mutex;
use tonic::{Code, Request, Response, Status};

Expand All @@ -36,6 +42,14 @@ where
/// Current share chain
share_chain: Arc<S>,
sync_in_progress: Arc<AtomicBool>,
random_xfactory: RandomXFactory,
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash,
block_height_difficulty_cache: Arc<Mutex<HashMap<u64, u64>>>,
// TODO: refactor into a stats service
stats_max_difficulty_since_last_success: Arc<Mutex<u64>>,
stats_accepted_by_main_chain: Arc<Mutex<u64>>,
stats_rejected_by_main_chain: Arc<Mutex<u64>>,
}

impl<S> ShaP2PoolGrpc<S>
Expand All @@ -47,18 +61,29 @@ where
p2p_client: p2p::ServiceClient,
share_chain: Arc<S>,
sync_in_progress: Arc<AtomicBool>,
random_xfactory: RandomXFactory,
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash
) -> Result<Self, Error> {
Ok(Self {
client: Arc::new(Mutex::new(util::connect_base_node(base_node_address).await?)),
p2p_client,
share_chain,
sync_in_progress,
random_xfactory,
consensus_manager,
genesis_block_hash,
block_height_difficulty_cache: Arc::new(Mutex::new(HashMap::new())),
stats_max_difficulty_since_last_success: Arc::new(Mutex::new(0)),
stats_accepted_by_main_chain: Arc::new(Mutex::new(0)),
stats_rejected_by_main_chain: Arc::new(Mutex::new(0)),
})
}

/// Submits a new block to share chain and broadcasts to the p2p network.
pub async fn submit_share_chain_block(&self, block: &Block) -> Result<(), Status> {
if self.sync_in_progress.load(Ordering::Relaxed) {
warn!(target: LOG_TARGET, "Share chain syncing is in progress...");
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

Expand All @@ -85,11 +110,13 @@ where
_request: Request<GetNewBlockRequest>,
) -> Result<Response<GetNewBlockResponse>, Status> {
if self.sync_in_progress.load(Ordering::Relaxed) {
warn!(target: LOG_TARGET, "Share chain syncing is in progress...");
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

let mut pow_algo = PowAlgo::default();
pow_algo.set_pow_algo(PowAlgos::Sha3x);
// TODO: use config
pow_algo.set_pow_algo(PowAlgos::Randomx);

// request original block template to get reward
let req = NewBlockTemplateRequest {
Expand All @@ -105,7 +132,7 @@ where
// request new block template with shares as coinbases
let shares = self.share_chain.generate_shares(reward).await;

let response = self
let mut response = self
.client
.lock()
.await
Expand All @@ -118,11 +145,17 @@ where
.into_inner();

// set target difficulty
let miner_data = response
let mut miner_data = response
.clone()
.miner_data
.ok_or_else(|| Status::internal("missing miner data"))?;
if let Some(header) = &response.block {
let height = header.header.as_ref().map(|h| h.height).unwrap_or(0);
self.block_height_difficulty_cache.lock().await.insert(height, miner_data.target_difficulty);
}
let target_difficulty = miner_data.target_difficulty / SHARE_COUNT;
miner_data.target_difficulty = target_difficulty;
response.miner_data = Some(miner_data);

Ok(Response::new(GetNewBlockResponse {
block: Some(response),
Expand All @@ -137,7 +170,9 @@ where
&self,
request: Request<SubmitBlockRequest>,
) -> Result<Response<SubmitBlockResponse>, Status> {
dbg!("Submit received");
if self.sync_in_progress.load(Ordering::Relaxed) {
warn!(target: LOG_TARGET, "Share chain syncing is in progress...");
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

Expand All @@ -156,47 +191,83 @@ where

// Check block's difficulty compared to the latest network one to increase the probability
// to get the block accepted (and also a block with lower difficulty than latest one is invalid anyway).
let request_block_difficulty =
sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))?;
let mut network_difficulty_stream = self
.client
.lock()
.await
.get_network_difficulty(HeightRequest {
from_tip: 0,
start_height: origin_block_header.height - 1,
end_height: origin_block_header.height,
})
.await?
.into_inner();
let mut network_difficulty_matches = false;
while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await {
if origin_block_header.height == diff_resp.height + 1
&& request_block_difficulty.as_u64() >= diff_resp.difficulty
{
network_difficulty_matches = true;
}
let request_block_difficulty = match origin_block_header.pow.pow_algo {
PowAlgorithm::Sha3x => {
sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))?
},
PowAlgorithm::RandomX => randomx_difficulty(
origin_block_header,
&self.random_xfactory,
&self.genesis_block_hash,
&self.consensus_manager
).map_err(|error| Status::internal(error.to_string()))?,
};
// TODO: Cache this so that we don't ask each time. If we have a block we should not
// waste time before submitting it, or we might lose a share
// let mut network_difficulty_stream = self
// .client
// .lock()
// .await
// .get_network_difficulty(HeightRequest {
// from_tip: 0,
// start_height: origin_block_header.height - 1,
// end_height: origin_block_header.height,
// })
// .await?
// .into_inner();
// let mut network_difficulty_matches = false;
// while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await {
// dbg!("Diff resp: {:?}", &diff_resp);
// if origin_block_header.height == diff_resp.height + 1
// && request_block_difficulty.as_u64() >= diff_resp.difficulty
// {
// network_difficulty_matches = true;
// }
// }
let network_difficulty_matches = match self.block_height_difficulty_cache.lock().await.get(&(origin_block_header.height)) {
Some(difficulty) => request_block_difficulty.as_u64() >= *difficulty,
None => false,
};
let mut max_difficulty = self.stats_max_difficulty_since_last_success.lock().await;
if *max_difficulty < request_block_difficulty.as_u64() {
*max_difficulty = request_block_difficulty.as_u64();
}

let mut accepted = self.stats_accepted_by_main_chain.lock().await;
let mut rejected = self.stats_rejected_by_main_chain.lock().await;
info!("Submit stats... max/accepted/rejected: {}/{}/{}", max_difficulty, accepted, rejected);

if !network_difficulty_matches {
block.set_sent_to_main_chain(false);
self.submit_share_chain_block(&block).await?;
// Don't error if we can't submit it.
match self.submit_share_chain_block(&block).await {
Ok(_) => {
info!("🔗 Block submitted to share chain!");
},
Err(error) => {
warn!("Failed to submit block to share chain: {error:?}");
},
};
return Ok(Response::new(SubmitBlockResponse {
block_hash: block.hash().to_vec(),
}));
}

// submit block to base node
let (metadata, extensions, _inner) = request.into_parts();
info!("🔗 Submitting block to base node...");
let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload);
match self.client.lock().await.submit_block(grpc_request).await {
match self.client.lock().await.submit_block(grpc_request).await {
Ok(resp) => {
*accepted += 1;
*max_difficulty = 0;
info!("💰 New matching block found and sent to network!");
block.set_sent_to_main_chain(true);
self.submit_share_chain_block(&block).await?;
Ok(resp)
},
Err(_) => {
*rejected += 1;
block.set_sent_to_main_chain(false);
self.submit_share_chain_block(&block).await?;
Ok(Response::new(SubmitBlockResponse {
Expand Down
12 changes: 10 additions & 2 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ where
/// Note: this is a "stop-the-world" operation, many operations are skipped when synchronizing.
async fn sync_share_chain(&mut self) {
if self.sync_in_progress.load(Ordering::Relaxed) {
debug!("Sync already in progress...");
warn!(target: LOG_TARGET, "Sync already in progress...");
return;
}
self.sync_in_progress.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -602,6 +602,14 @@ where
self.swarm.behaviour_mut().kademlia.remove_peer(&exp_peer);
self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&exp_peer);
}
match self.share_chain.tip_height().await {
Ok(tip) => {
info!(target: LOG_TARGET, "Share chain: {tip:?}");
},
Err(error) => {
error!(target: LOG_TARGET, "Failed to show tip height: {error:?}");
},
}

// broadcast peer info
debug!(target: LOG_TARGET, "Peer count: {:?}", self.peer_store.peer_count().await);
Expand Down Expand Up @@ -681,7 +689,7 @@ where
let peer_store = self.peer_store.clone();
let share_chain = self.share_chain.clone();
let share_chain_sync_tx = self.share_chain_sync_tx.clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
Self::initial_share_chain_sync(
in_progress,
peer_store,
Expand Down
11 changes: 11 additions & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::{

use log::{error, info};
use minotari_app_grpc::tari_rpc::{base_node_server::BaseNodeServer, sha_p2_pool_server::ShaP2PoolServer};
use tari_common::configuration::Network;
use tari_core::consensus::ConsensusManager;
use tari_core::proof_of_work::randomx_factory::RandomXFactory;
use thiserror::Error;

use crate::{
Expand All @@ -31,6 +34,8 @@ pub enum Error {
Grpc(#[from] grpc::error::Error),
#[error("Socket address parse error: {0}")]
AddrParse(#[from] AddrParseError),
#[error("Consensus manager error: {0}")]
ConsensusBuilderError(#[from] tari_core::consensus::ConsensusBuilderError),
}

/// Server represents the server running all the necessary components for sha-p2pool.
Expand Down Expand Up @@ -60,6 +65,9 @@ where

let mut base_node_grpc_server = None;
let mut p2pool_server = None;
let randomx_factory = RandomXFactory::new(1);
let consensus_manager = ConsensusManager::builder(Network::default()).build()?;
let genesis_block_hash = consensus_manager.get_genesis_block().hash().clone();
if config.mining_enabled {
let base_node_grpc_service = TariBaseNodeGrpc::new(config.base_node_address.clone())
.await
Expand All @@ -71,6 +79,9 @@ where
p2p_service.client(),
share_chain.clone(),
sync_in_progress.clone(),
randomx_factory,
consensus_manager,
genesis_block_hash,
)
.await
.map_err(Error::Grpc)?;
Expand Down

0 comments on commit 1a69bc8

Please sign in to comment.