From 1a69bc87d773ea3b2acc8dc67768a5afc07375ee Mon Sep 17 00:00:00 2001 From: stringhandler Date: Mon, 19 Aug 2024 14:51:06 +0200 Subject: [PATCH] feat: add rx support --- src/server/grpc/p2pool.rs | 123 ++++++++++++++++++++++++++++++-------- src/server/p2p/network.rs | 12 +++- src/server/server.rs | 11 ++++ 3 files changed, 118 insertions(+), 28 deletions(-) diff --git a/src/server/grpc/p2pool.rs b/src/server/grpc/p2pool.rs index fac5306e..ab561d31 100644 --- a/src/server/grpc/p2pool.rs +++ b/src/server/grpc/p2pool.rs @@ -1,6 +1,8 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::cmp::max; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -10,7 +12,11 @@ use minotari_app_grpc::tari_rpc::{ GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest, HeightRequest, NewBlockTemplateRequest, PowAlgo, SubmitBlockRequest, SubmitBlockResponse, }; -use tari_core::proof_of_work::sha3x_difficulty; +use tari_common_types::types::FixedHash; +use tari_core::blocks::BlockHeader; +use tari_core::consensus::ConsensusManager; +use tari_core::proof_of_work::{PowAlgorithm, randomx_difficulty, sha3x_difficulty}; +use tari_core::proof_of_work::randomx_factory::RandomXFactory; use tokio::sync::Mutex; use tonic::{Code, Request, Response, Status}; @@ -36,6 +42,14 @@ where /// Current share chain share_chain: Arc, sync_in_progress: Arc, + random_xfactory: RandomXFactory, + consensus_manager: ConsensusManager, + genesis_block_hash: FixedHash, + block_height_difficulty_cache: Arc>>, + // TODO: refactor into a stats service + stats_max_difficulty_since_last_success: Arc>, + stats_accepted_by_main_chain: Arc>, + stats_rejected_by_main_chain: Arc>, } impl ShaP2PoolGrpc @@ -47,18 +61,29 @@ where p2p_client: p2p::ServiceClient, share_chain: Arc, sync_in_progress: Arc, + random_xfactory: RandomXFactory, + consensus_manager: ConsensusManager, + genesis_block_hash: FixedHash ) -> Result { Ok(Self { client: Arc::new(Mutex::new(util::connect_base_node(base_node_address).await?)), p2p_client, share_chain, sync_in_progress, + random_xfactory, + consensus_manager, + genesis_block_hash, + block_height_difficulty_cache: Arc::new(Mutex::new(HashMap::new())), + stats_max_difficulty_since_last_success: Arc::new(Mutex::new(0)), + stats_accepted_by_main_chain: Arc::new(Mutex::new(0)), + stats_rejected_by_main_chain: Arc::new(Mutex::new(0)), }) } /// Submits a new block to share chain and broadcasts to the p2p network. pub async fn submit_share_chain_block(&self, block: &Block) -> Result<(), Status> { if self.sync_in_progress.load(Ordering::Relaxed) { + warn!(target: LOG_TARGET, "Share chain syncing is in progress..."); return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress...")); } @@ -85,11 +110,13 @@ where _request: Request, ) -> Result, Status> { if self.sync_in_progress.load(Ordering::Relaxed) { + warn!(target: LOG_TARGET, "Share chain syncing is in progress..."); return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress...")); } let mut pow_algo = PowAlgo::default(); - pow_algo.set_pow_algo(PowAlgos::Sha3x); + // TODO: use config + pow_algo.set_pow_algo(PowAlgos::Randomx); // request original block template to get reward let req = NewBlockTemplateRequest { @@ -105,7 +132,7 @@ where // request new block template with shares as coinbases let shares = self.share_chain.generate_shares(reward).await; - let response = self + let mut response = self .client .lock() .await @@ -118,11 +145,17 @@ where .into_inner(); // set target difficulty - let miner_data = response + let mut miner_data = response .clone() .miner_data .ok_or_else(|| Status::internal("missing miner data"))?; + if let Some(header) = &response.block { + let height = header.header.as_ref().map(|h| h.height).unwrap_or(0); + self.block_height_difficulty_cache.lock().await.insert(height, miner_data.target_difficulty); + } let target_difficulty = miner_data.target_difficulty / SHARE_COUNT; + miner_data.target_difficulty = target_difficulty; + response.miner_data = Some(miner_data); Ok(Response::new(GetNewBlockResponse { block: Some(response), @@ -137,7 +170,9 @@ where &self, request: Request, ) -> Result, Status> { + dbg!("Submit received"); if self.sync_in_progress.load(Ordering::Relaxed) { + warn!(target: LOG_TARGET, "Share chain syncing is in progress..."); return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress...")); } @@ -156,31 +191,63 @@ where // Check block's difficulty compared to the latest network one to increase the probability // to get the block accepted (and also a block with lower difficulty than latest one is invalid anyway). - let request_block_difficulty = - sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))?; - let mut network_difficulty_stream = self - .client - .lock() - .await - .get_network_difficulty(HeightRequest { - from_tip: 0, - start_height: origin_block_header.height - 1, - end_height: origin_block_header.height, - }) - .await? - .into_inner(); - let mut network_difficulty_matches = false; - while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await { - if origin_block_header.height == diff_resp.height + 1 - && request_block_difficulty.as_u64() >= diff_resp.difficulty - { - network_difficulty_matches = true; - } + let request_block_difficulty = match origin_block_header.pow.pow_algo { + PowAlgorithm::Sha3x => { + sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))? + }, + PowAlgorithm::RandomX => randomx_difficulty( + origin_block_header, + &self.random_xfactory, + &self.genesis_block_hash, + &self.consensus_manager + ).map_err(|error| Status::internal(error.to_string()))?, + }; + // TODO: Cache this so that we don't ask each time. If we have a block we should not + // waste time before submitting it, or we might lose a share + // let mut network_difficulty_stream = self + // .client + // .lock() + // .await + // .get_network_difficulty(HeightRequest { + // from_tip: 0, + // start_height: origin_block_header.height - 1, + // end_height: origin_block_header.height, + // }) + // .await? + // .into_inner(); + // let mut network_difficulty_matches = false; + // while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await { + // dbg!("Diff resp: {:?}", &diff_resp); + // if origin_block_header.height == diff_resp.height + 1 + // && request_block_difficulty.as_u64() >= diff_resp.difficulty + // { + // network_difficulty_matches = true; + // } + // } + let network_difficulty_matches = match self.block_height_difficulty_cache.lock().await.get(&(origin_block_header.height)) { + Some(difficulty) => request_block_difficulty.as_u64() >= *difficulty, + None => false, + }; + let mut max_difficulty = self.stats_max_difficulty_since_last_success.lock().await; + if *max_difficulty < request_block_difficulty.as_u64() { + *max_difficulty = request_block_difficulty.as_u64(); } + let mut accepted = self.stats_accepted_by_main_chain.lock().await; + let mut rejected = self.stats_rejected_by_main_chain.lock().await; + info!("Submit stats... max/accepted/rejected: {}/{}/{}", max_difficulty, accepted, rejected); + if !network_difficulty_matches { block.set_sent_to_main_chain(false); - self.submit_share_chain_block(&block).await?; + // Don't error if we can't submit it. + match self.submit_share_chain_block(&block).await { + Ok(_) => { + info!("🔗 Block submitted to share chain!"); + }, + Err(error) => { + warn!("Failed to submit block to share chain: {error:?}"); + }, + }; return Ok(Response::new(SubmitBlockResponse { block_hash: block.hash().to_vec(), })); @@ -188,15 +255,19 @@ where // submit block to base node let (metadata, extensions, _inner) = request.into_parts(); + info!("🔗 Submitting block to base node..."); let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload); - match self.client.lock().await.submit_block(grpc_request).await { + match self.client.lock().await.submit_block(grpc_request).await { Ok(resp) => { + *accepted += 1; + *max_difficulty = 0; info!("💰 New matching block found and sent to network!"); block.set_sent_to_main_chain(true); self.submit_share_chain_block(&block).await?; Ok(resp) }, Err(_) => { + *rejected += 1; block.set_sent_to_main_chain(false); self.submit_share_chain_block(&block).await?; Ok(Response::new(SubmitBlockResponse { diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 6505c8ea..b8b06eb4 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -428,7 +428,7 @@ where /// Note: this is a "stop-the-world" operation, many operations are skipped when synchronizing. async fn sync_share_chain(&mut self) { if self.sync_in_progress.load(Ordering::Relaxed) { - debug!("Sync already in progress..."); + warn!(target: LOG_TARGET, "Sync already in progress..."); return; } self.sync_in_progress.store(true, Ordering::Relaxed); @@ -602,6 +602,14 @@ where self.swarm.behaviour_mut().kademlia.remove_peer(&exp_peer); self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&exp_peer); } + match self.share_chain.tip_height().await { + Ok(tip) => { + info!(target: LOG_TARGET, "Share chain: {tip:?}"); + }, + Err(error) => { + error!(target: LOG_TARGET, "Failed to show tip height: {error:?}"); + }, + } // broadcast peer info debug!(target: LOG_TARGET, "Peer count: {:?}", self.peer_store.peer_count().await); @@ -681,7 +689,7 @@ where let peer_store = self.peer_store.clone(); let share_chain = self.share_chain.clone(); let share_chain_sync_tx = self.share_chain_sync_tx.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { Self::initial_share_chain_sync( in_progress, peer_store, diff --git a/src/server/server.rs b/src/server/server.rs index 9bf928d6..dce1b082 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -10,6 +10,9 @@ use std::{ use log::{error, info}; use minotari_app_grpc::tari_rpc::{base_node_server::BaseNodeServer, sha_p2_pool_server::ShaP2PoolServer}; +use tari_common::configuration::Network; +use tari_core::consensus::ConsensusManager; +use tari_core::proof_of_work::randomx_factory::RandomXFactory; use thiserror::Error; use crate::{ @@ -31,6 +34,8 @@ pub enum Error { Grpc(#[from] grpc::error::Error), #[error("Socket address parse error: {0}")] AddrParse(#[from] AddrParseError), + #[error("Consensus manager error: {0}")] + ConsensusBuilderError(#[from] tari_core::consensus::ConsensusBuilderError), } /// Server represents the server running all the necessary components for sha-p2pool. @@ -60,6 +65,9 @@ where let mut base_node_grpc_server = None; let mut p2pool_server = None; + let randomx_factory = RandomXFactory::new(1); + let consensus_manager = ConsensusManager::builder(Network::default()).build()?; + let genesis_block_hash = consensus_manager.get_genesis_block().hash().clone(); if config.mining_enabled { let base_node_grpc_service = TariBaseNodeGrpc::new(config.base_node_address.clone()) .await @@ -71,6 +79,9 @@ where p2p_service.client(), share_chain.clone(), sync_in_progress.clone(), + randomx_factory, + consensus_manager, + genesis_block_hash, ) .await .map_err(Error::Grpc)?;