From 173cdb19dfcd04f38a03544e8f444b38daa4815a Mon Sep 17 00:00:00 2001 From: Kariy Date: Fri, 29 Sep 2023 00:52:16 +0700 Subject: [PATCH] refactor(katana-core): refactor service module commit-id:aac1f033 --- Cargo.lock | 6 +- crates/katana/core/src/backend/mod.rs | 6 +- crates/katana/core/src/sequencer.rs | 3 +- .../{service.rs => service/block_producer.rs} | 106 +---------------- crates/katana/core/src/service/mod.rs | 107 ++++++++++++++++++ 5 files changed, 121 insertions(+), 107 deletions(-) rename crates/katana/core/src/{service.rs => service/block_producer.rs} (82%) create mode 100644 crates/katana/core/src/service/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 57bc48b09b..3725ba9e77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4822,7 +4822,7 @@ dependencies = [ "regex-cache", "serde", "serde_derive", - "strum", + "strum 0.24.1", "thiserror", ] @@ -6488,7 +6488,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros", + "strum_macros 0.24.3", ] [[package]] @@ -6520,7 +6520,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.32", + "syn 2.0.37", ] [[package]] diff --git a/crates/katana/core/src/backend/mod.rs b/crates/katana/core/src/backend/mod.rs index 5b7ab19a17..a57690686a 100644 --- a/crates/katana/core/src/backend/mod.rs +++ b/crates/katana/core/src/backend/mod.rs @@ -48,7 +48,7 @@ use crate::env::{BlockContextGenerator, Env}; use crate::execution::{ExecutionOutcome, MaybeInvalidExecutedTransaction, TransactionExecutor}; use crate::fork::db::ForkedDb; use crate::sequencer_error::SequencerError; -use crate::service::MinedBlockOutcome; +use crate::service::block_producer::MinedBlockOutcome; use crate::utils::{convert_state_diff_to_rpc_state_diff, get_current_timestamp}; pub struct ExternalFunctionCall { @@ -218,8 +218,8 @@ impl Backend { /// Mines a new block based on the provided execution outcome. /// This method should only be called by the - /// [IntervalBlockProducer](crate::service::IntervalBlockProducer) when the node is running in - /// `interval` mining mode. + /// [IntervalBlockProducer](crate::service::block_producer::IntervalBlockProducer) when the node + /// is running in `interval` mining mode. pub async fn mine_pending_block( &self, execution_outcome: ExecutionOutcome, diff --git a/crates/katana/core/src/sequencer.rs b/crates/katana/core/src/sequencer.rs index 8011022257..4e0d87d55c 100644 --- a/crates/katana/core/src/sequencer.rs +++ b/crates/katana/core/src/sequencer.rs @@ -28,7 +28,8 @@ use crate::db::{AsStateRefDb, StateExtRef, StateRefDb}; use crate::execution::{MaybeInvalidExecutedTransaction, PendingState}; use crate::pool::TransactionPool; use crate::sequencer_error::SequencerError; -use crate::service::{BlockProducer, BlockProducerMode, NodeService, TransactionMiner}; +use crate::service::block_producer::{BlockProducer, BlockProducerMode}; +use crate::service::{NodeService, TransactionMiner}; use crate::utils::event::{ContinuationToken, ContinuationTokenError}; type SequencerResult = Result; diff --git a/crates/katana/core/src/service.rs b/crates/katana/core/src/service/block_producer.rs similarity index 82% rename from crates/katana/core/src/service.rs rename to crates/katana/core/src/service/block_producer.rs index 94407c6573..807e449866 100644 --- a/crates/katana/core/src/service.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -1,7 +1,3 @@ -// Code adapted from Foundry's Anvil - -//! background service - use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::pin::Pin; @@ -10,11 +6,9 @@ use std::task::{Context, Poll}; use std::time::Duration; use blockifier::state::state_api::{State, StateReader}; -use futures::channel::mpsc::Receiver; -use futures::stream::{Fuse, Stream, StreamExt}; +use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use parking_lot::RwLock; -use starknet::core::types::FieldElement; use tokio::time::{interval_at, Instant, Interval}; use tracing::trace; @@ -26,56 +20,10 @@ use crate::execution::{ create_execution_outcome, ExecutedTransaction, ExecutionOutcome, MaybeInvalidExecutedTransaction, PendingState, TransactionExecutor, }; -use crate::pool::TransactionPool; - -/// The type that drives the blockchain's state -/// -/// This service is basically an endless future that continuously polls the miner which returns -/// transactions for the next block, then those transactions are handed off to the [BlockProducer] -/// to construct a new block. -pub struct NodeService { - /// the pool that holds all transactions - pool: Arc, - /// creates new blocks - block_producer: BlockProducer, - /// the miner responsible to select transactions from the `pool´ - miner: TransactionMiner, -} - -impl NodeService { - pub fn new( - pool: Arc, - miner: TransactionMiner, - block_producer: BlockProducer, - ) -> Self { - Self { pool, block_producer, miner } - } -} - -impl Future for NodeService { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let pin = self.get_mut(); - // this drives block production and feeds new sets of ready transactions to the block - // producer - loop { - while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) { - trace!(target: "node", "mined block {}", outcome.block_number); - } - - if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) { - // miner returned a set of transaction that we feed to the producer - pin.block_producer.queue(transactions); - } else { - // no progress made - break; - } - } - - Poll::Pending - } +pub struct MinedBlockOutcome { + pub block_number: u64, + pub transactions: Vec, } type ServiceFuture = Pin + Send + Sync>>; @@ -122,7 +70,7 @@ impl BlockProducer { } } - fn queue(&self, transactions: Vec) { + pub(super) fn queue(&self, transactions: Vec) { let mut mode = self.inner.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.queued.push_back(transactions), @@ -439,6 +387,7 @@ impl Stream for InstantBlockProducer { // poll the mining future if let Some(mut mining) = pin.block_mining.take() { + println!("ohayo"); if let Poll::Ready(outcome) = mining.poll_unpin(cx) { return Poll::Ready(Some(outcome)); } else { @@ -449,46 +398,3 @@ impl Stream for InstantBlockProducer { Poll::Pending } } - -pub struct MinedBlockOutcome { - pub block_number: u64, - pub transactions: Vec, -} - -/// The type which takes the transaction from the pool and feeds them to the block producer. -pub struct TransactionMiner { - /// stores whether there are pending transacions (if known) - has_pending_txs: Option, - /// Receives hashes of transactions that are ready from the pool - rx: Fuse>, -} - -impl TransactionMiner { - pub fn new(rx: Receiver) -> Self { - Self { rx: rx.fuse(), has_pending_txs: None } - } - - fn poll( - &mut self, - pool: &Arc, - cx: &mut Context<'_>, - ) -> Poll> { - // drain the notification stream - while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) { - self.has_pending_txs = Some(true); - } - - if self.has_pending_txs == Some(false) { - return Poll::Pending; - } - - // take all the transactions from the pool - let transactions = pool.get_transactions(); - - if transactions.is_empty() { - return Poll::Pending; - } - - Poll::Ready(transactions) - } -} diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs new file mode 100644 index 0000000000..3460640f9c --- /dev/null +++ b/crates/katana/core/src/service/mod.rs @@ -0,0 +1,107 @@ +// Code adapted from Foundry's Anvil + +//! background service + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::channel::mpsc::Receiver; +use futures::stream::{Fuse, Stream, StreamExt}; +use starknet::core::types::FieldElement; +use tracing::trace; + +use self::block_producer::BlockProducer; +use crate::backend::storage::transaction::Transaction; +use crate::pool::TransactionPool; + +pub mod block_producer; + +/// The type that drives the blockchain's state +/// +/// This service is basically an endless future that continuously polls the miner which returns +/// transactions for the next block, then those transactions are handed off to the [BlockProducer] +/// to construct a new block. +pub struct NodeService { + /// the pool that holds all transactions + pool: Arc, + /// creates new blocks + block_producer: BlockProducer, + /// the miner responsible to select transactions from the `pool´ + miner: TransactionMiner, +} + +impl NodeService { + pub fn new( + pool: Arc, + miner: TransactionMiner, + block_producer: BlockProducer, + ) -> Self { + Self { pool, block_producer, miner } + } +} + +impl Future for NodeService { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pin = self.get_mut(); + + // this drives block production and feeds new sets of ready transactions to the block + // producer + loop { + while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) { + trace!(target: "node", "mined block {}", outcome.block_number); + } + + if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) { + // miner returned a set of transaction that we feed to the producer + pin.block_producer.queue(transactions); + } else { + // no progress made + break; + } + } + + Poll::Pending + } +} + +/// The type which takes the transaction from the pool and feeds them to the block producer. +pub struct TransactionMiner { + /// stores whether there are pending transacions (if known) + has_pending_txs: Option, + /// Receives hashes of transactions that are ready from the pool + rx: Fuse>, +} + +impl TransactionMiner { + pub fn new(rx: Receiver) -> Self { + Self { rx: rx.fuse(), has_pending_txs: None } + } + + fn poll( + &mut self, + pool: &Arc, + cx: &mut Context<'_>, + ) -> Poll> { + // drain the notification stream + while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) { + self.has_pending_txs = Some(true); + } + + if self.has_pending_txs == Some(false) { + return Poll::Pending; + } + + // take all the transactions from the pool + let transactions = pool.get_transactions(); + + if transactions.is_empty() { + return Poll::Pending; + } + + Poll::Ready(transactions) + } +}