Skip to content

Commit

Permalink
refactor(katana-core): refactor service module
Browse files Browse the repository at this point in the history
commit-id:aac1f033
  • Loading branch information
kariy committed Sep 28, 2023
1 parent 503a775 commit 173cdb1
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 107 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/katana/core/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::env::{BlockContextGenerator, Env};
use crate::execution::{ExecutionOutcome, MaybeInvalidExecutedTransaction, TransactionExecutor};
use crate::fork::db::ForkedDb;
use crate::sequencer_error::SequencerError;
use crate::service::MinedBlockOutcome;
use crate::service::block_producer::MinedBlockOutcome;
use crate::utils::{convert_state_diff_to_rpc_state_diff, get_current_timestamp};

pub struct ExternalFunctionCall {
Expand Down Expand Up @@ -218,8 +218,8 @@ impl Backend {

/// Mines a new block based on the provided execution outcome.
/// This method should only be called by the
/// [IntervalBlockProducer](crate::service::IntervalBlockProducer) when the node is running in
/// `interval` mining mode.
/// [IntervalBlockProducer](crate::service::block_producer::IntervalBlockProducer) when the node
/// is running in `interval` mining mode.
pub async fn mine_pending_block(
&self,
execution_outcome: ExecutionOutcome,
Expand Down
3 changes: 2 additions & 1 deletion crates/katana/core/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::db::{AsStateRefDb, StateExtRef, StateRefDb};
use crate::execution::{MaybeInvalidExecutedTransaction, PendingState};
use crate::pool::TransactionPool;
use crate::sequencer_error::SequencerError;
use crate::service::{BlockProducer, BlockProducerMode, NodeService, TransactionMiner};
use crate::service::block_producer::{BlockProducer, BlockProducerMode};
use crate::service::{NodeService, TransactionMiner};
use crate::utils::event::{ContinuationToken, ContinuationTokenError};

type SequencerResult<T> = Result<T, SequencerError>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
// Code adapted from Foundry's Anvil

//! background service
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
Expand All @@ -10,11 +6,9 @@ use std::task::{Context, Poll};
use std::time::Duration;

use blockifier::state::state_api::{State, StateReader};
use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
use parking_lot::RwLock;
use starknet::core::types::FieldElement;
use tokio::time::{interval_at, Instant, Interval};
use tracing::trace;

Expand All @@ -26,56 +20,10 @@ use crate::execution::{
create_execution_outcome, ExecutedTransaction, ExecutionOutcome,
MaybeInvalidExecutedTransaction, PendingState, TransactionExecutor,
};
use crate::pool::TransactionPool;

/// The type that drives the blockchain's state
///
/// This service is basically an endless future that continuously polls the miner which returns
/// transactions for the next block, then those transactions are handed off to the [BlockProducer]
/// to construct a new block.
pub struct NodeService {
/// the pool that holds all transactions
pool: Arc<TransactionPool>,
/// creates new blocks
block_producer: BlockProducer,
/// the miner responsible to select transactions from the `pool´
miner: TransactionMiner,
}

impl NodeService {
pub fn new(
pool: Arc<TransactionPool>,
miner: TransactionMiner,
block_producer: BlockProducer,
) -> Self {
Self { pool, block_producer, miner }
}
}

impl Future for NodeService {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();

// this drives block production and feeds new sets of ready transactions to the block
// producer
loop {
while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
trace!(target: "node", "mined block {}", outcome.block_number);
}

if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
// miner returned a set of transaction that we feed to the producer
pin.block_producer.queue(transactions);
} else {
// no progress made
break;
}
}

Poll::Pending
}
pub struct MinedBlockOutcome {
pub block_number: u64,
pub transactions: Vec<MaybeInvalidExecutedTransaction>,
}

type ServiceFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
Expand Down Expand Up @@ -122,7 +70,7 @@ impl BlockProducer {
}
}

fn queue(&self, transactions: Vec<Transaction>) {
pub(super) fn queue(&self, transactions: Vec<Transaction>) {
let mut mode = self.inner.write();
match &mut *mode {
BlockProducerMode::Instant(producer) => producer.queued.push_back(transactions),
Expand Down Expand Up @@ -439,6 +387,7 @@ impl Stream for InstantBlockProducer {

// poll the mining future
if let Some(mut mining) = pin.block_mining.take() {
println!("ohayo");
if let Poll::Ready(outcome) = mining.poll_unpin(cx) {
return Poll::Ready(Some(outcome));
} else {
Expand All @@ -449,46 +398,3 @@ impl Stream for InstantBlockProducer {
Poll::Pending
}
}

pub struct MinedBlockOutcome {
pub block_number: u64,
pub transactions: Vec<MaybeInvalidExecutedTransaction>,
}

/// The type which takes the transaction from the pool and feeds them to the block producer.
pub struct TransactionMiner {
/// stores whether there are pending transacions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready from the pool
rx: Fuse<Receiver<FieldElement>>,
}

impl TransactionMiner {
pub fn new(rx: Receiver<FieldElement>) -> Self {
Self { rx: rx.fuse(), has_pending_txs: None }
}

fn poll(
&mut self,
pool: &Arc<TransactionPool>,
cx: &mut Context<'_>,
) -> Poll<Vec<Transaction>> {
// drain the notification stream
while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}

if self.has_pending_txs == Some(false) {
return Poll::Pending;
}

// take all the transactions from the pool
let transactions = pool.get_transactions();

if transactions.is_empty() {
return Poll::Pending;
}

Poll::Ready(transactions)
}
}
107 changes: 107 additions & 0 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Code adapted from Foundry's Anvil

//! background service
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, Stream, StreamExt};
use starknet::core::types::FieldElement;
use tracing::trace;

use self::block_producer::BlockProducer;
use crate::backend::storage::transaction::Transaction;
use crate::pool::TransactionPool;

pub mod block_producer;

/// The type that drives the blockchain's state
///
/// This service is basically an endless future that continuously polls the miner which returns
/// transactions for the next block, then those transactions are handed off to the [BlockProducer]
/// to construct a new block.
pub struct NodeService {
/// the pool that holds all transactions
pool: Arc<TransactionPool>,
/// creates new blocks
block_producer: BlockProducer,
/// the miner responsible to select transactions from the `pool´
miner: TransactionMiner,
}

impl NodeService {
pub fn new(
pool: Arc<TransactionPool>,
miner: TransactionMiner,
block_producer: BlockProducer,
) -> Self {
Self { pool, block_producer, miner }
}
}

impl Future for NodeService {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();

// this drives block production and feeds new sets of ready transactions to the block
// producer
loop {
while let Poll::Ready(Some(outcome)) = pin.block_producer.poll_next_unpin(cx) {
trace!(target: "node", "mined block {}", outcome.block_number);
}

if let Poll::Ready(transactions) = pin.miner.poll(&pin.pool, cx) {
// miner returned a set of transaction that we feed to the producer
pin.block_producer.queue(transactions);
} else {
// no progress made
break;
}
}

Poll::Pending
}
}

/// The type which takes the transaction from the pool and feeds them to the block producer.
pub struct TransactionMiner {
/// stores whether there are pending transacions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready from the pool
rx: Fuse<Receiver<FieldElement>>,
}

impl TransactionMiner {
pub fn new(rx: Receiver<FieldElement>) -> Self {
Self { rx: rx.fuse(), has_pending_txs: None }
}

fn poll(
&mut self,
pool: &Arc<TransactionPool>,
cx: &mut Context<'_>,
) -> Poll<Vec<Transaction>> {
// drain the notification stream
while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}

if self.has_pending_txs == Some(false) {
return Poll::Pending;
}

// take all the transactions from the pool
let transactions = pool.get_transactions();

if transactions.is_empty() {
return Poll::Pending;
}

Poll::Ready(transactions)
}
}

0 comments on commit 173cdb1

Please sign in to comment.