Skip to content

Commit

Permalink
refactor(katana): separate node service task (#2413)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Sep 11, 2024
1 parent a3727ac commit 80d1827
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 76 deletions.
39 changes: 39 additions & 0 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@ mod service;
#[cfg(feature = "starknet-messaging")]
mod starknet;

use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use ::starknet::providers::ProviderError as StarknetProviderError;
use alloy_transport::TransportError;
use anyhow::Result;
use async_trait::async_trait;
use ethereum::EthereumMessaging;
use futures::StreamExt;
use katana_executor::ExecutorFactory;
use katana_primitives::chain::ChainId;
use katana_primitives::receipt::MessageToL1;
use serde::Deserialize;
Expand Down Expand Up @@ -202,3 +207,37 @@ impl MessengerMode {
}
}
}

#[allow(missing_debug_implementations)]
#[must_use = "MessagingTask does nothing unless polled"]
pub struct MessagingTask<EF: ExecutorFactory> {
messaging: MessagingService<EF>,
}

impl<EF: ExecutorFactory> MessagingTask<EF> {
pub fn new(messaging: MessagingService<EF>) -> Self {
Self { messaging }
}
}

impl<EF: ExecutorFactory> Future for MessagingTask<EF> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(outcome)) = this.messaging.poll_next_unpin(cx) {
match outcome {
MessagingOutcome::Gather { msg_count, .. } => {
info!(target: LOG_TARGET, %msg_count, "Collected messages from settlement chain.");
}

MessagingOutcome::Send { msg_count, .. } => {
info!(target: LOG_TARGET, %msg_count, "Sent messages to the settlement chain.");
}
}
}

Poll::Pending
}
}
5 changes: 0 additions & 5 deletions crates/katana/core/src/service/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use dojo_metrics::Metrics;
use metrics::Counter;

#[derive(Debug)]
pub(crate) struct ServiceMetrics {
pub(crate) block_producer: BlockProducerMetrics,
}

#[derive(Metrics)]
#[metrics(scope = "block_producer")]
pub(crate) struct BlockProducerMetrics {
Expand Down
69 changes: 20 additions & 49 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! background service
// TODO: remove the messaging feature flag
// TODO: move the tasks to a separate module

use std::future::Future;
use std::pin::Pin;
Expand All @@ -14,102 +15,72 @@ use katana_primitives::FieldElement;
use tracing::{error, info};

use self::block_producer::BlockProducer;
use self::metrics::{BlockProducerMetrics, ServiceMetrics};
use self::metrics::BlockProducerMetrics;

pub mod block_producer;
#[cfg(feature = "messaging")]
pub mod messaging;
mod metrics;

#[cfg(feature = "messaging")]
use self::messaging::{MessagingOutcome, MessagingService};

pub(crate) const LOG_TARGET: &str = "node";

/// The type that drives the blockchain's state
///
/// This service is basically an endless future that continuously polls the miner which returns
/// This task is basically an endless future that continuously polls the miner which returns
/// transactions for the next block, then those transactions are handed off to the [BlockProducer]
/// to construct a new block.
#[must_use = "BlockProductionTask does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct NodeService<EF: ExecutorFactory> {
/// the pool that holds all transactions
pub(crate) pool: TxPool,
pub struct BlockProductionTask<EF: ExecutorFactory> {
/// creates new blocks
pub(crate) block_producer: Arc<BlockProducer<EF>>,
/// the miner responsible to select transactions from the `pool´
pub(crate) miner: TransactionMiner,
/// The messaging service
#[cfg(feature = "messaging")]
pub(crate) messaging: Option<MessagingService<EF>>,
/// the pool that holds all transactions
pub(crate) pool: TxPool,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
metrics: BlockProducerMetrics,
}

impl<EF: ExecutorFactory> NodeService<EF> {
impl<EF: ExecutorFactory> BlockProductionTask<EF> {
pub fn new(
pool: TxPool,
miner: TransactionMiner,
block_producer: Arc<BlockProducer<EF>>,
#[cfg(feature = "messaging")] messaging: Option<MessagingService<EF>>,
) -> Self {
let metrics = ServiceMetrics { block_producer: BlockProducerMetrics::default() };

Self {
pool,
miner,
block_producer,
metrics,
#[cfg(feature = "messaging")]
messaging,
}
Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() }
}
}

impl<EF: ExecutorFactory> Future for NodeService<EF> {
impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();

#[cfg(feature = "messaging")]
if let Some(messaging) = pin.messaging.as_mut() {
while let Poll::Ready(Some(outcome)) = messaging.poll_next_unpin(cx) {
match outcome {
MessagingOutcome::Gather { msg_count, .. } => {
info!(target: LOG_TARGET, msg_count = %msg_count, "Collected messages from settlement chain.");
}
MessagingOutcome::Send { msg_count, .. } => {
info!(target: LOG_TARGET, msg_count = %msg_count, "Sent messages to the settlement chain.");
}
}
}
}
let this = self.get_mut();

// this drives block production and feeds new sets of ready transactions to the block
// producer
loop {
while let Poll::Ready(Some(res)) = pin.block_producer.poll_next(cx) {
while let Poll::Ready(Some(res)) = this.block_producer.poll_next(cx) {
match res {
Ok(outcome) => {
info!(target: LOG_TARGET, block_number = %outcome.block_number, "Mined block.");

let metrics = &pin.metrics.block_producer;
let gas_used = outcome.stats.l1_gas_used;
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);
this.metrics.l1_gas_processed_total.increment(gas_used as u64);
this.metrics.cairo_steps_processed_total.increment(steps_used as u64);
}

Err(err) => {
error!(target: LOG_TARGET, error = %err, "Mining block.");
Err(error) => {
error!(target: LOG_TARGET, %error, "Mining block.");
}
}
}

if let Poll::Ready(pool_txs) = pin.miner.poll(&pin.pool, cx) {
if let Poll::Ready(pool_txs) = this.miner.poll(&this.pool, cx) {
// miner returned a set of transaction that we feed to the producer
pin.block_producer.queue(pool_txs);
this.block_producer.queue(pool_txs);
} else {
// no progress made
break;
Expand Down
35 changes: 16 additions & 19 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use katana_core::env::BlockContextGenerator;
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
#[cfg(feature = "messaging")]
use katana_core::service::messaging::MessagingService;
use katana_core::service::{NodeService, TransactionMiner};
use katana_core::service::messaging::{MessagingService, MessagingTask};
use katana_core::service::{BlockProductionTask, TransactionMiner};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pool::ordering::FiFo;
Expand Down Expand Up @@ -173,7 +173,7 @@ pub async fn start(
config: starknet_config,
});

// --- build block producer service
// --- build block producer

let block_producer = if sequencer_config.block_time.is_some() || sequencer_config.no_mining {
if let Some(interval) = sequencer_config.block_time {
Expand Down Expand Up @@ -210,28 +210,25 @@ pub async fn start(
info!(%addr, "Metrics endpoint started.");
}

// --- build messaging service
// --- create a TaskManager using the ambient Tokio runtime

let task_manager = TaskManager::current();

// --- build and spawn the messaging task

#[cfg(feature = "messaging")]
let messaging = if let Some(config) = sequencer_config.messaging.clone() {
MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await.ok()
} else {
None
};
if let Some(config) = sequencer_config.messaging.clone() {
let messaging = MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await?;
let task = MessagingTask::new(messaging);
task_manager.build_task().critical().name("Messaging").spawn(task);
}

let block_producer = Arc::new(block_producer);

// Create a TaskManager using the ambient Tokio runtime
let task_manager = TaskManager::current();
// --- build and spawn the block production task

// Spawn the NodeService as a critical task
task_manager.build_task().critical().name("NodeService").spawn(NodeService::new(
pool.clone(),
miner,
block_producer.clone(),
#[cfg(feature = "messaging")]
messaging,
));
let task = BlockProductionTask::new(pool.clone(), miner, block_producer.clone());
task_manager.build_task().critical().name("BlockProduction").spawn(task);

// --- spawn rpc server

Expand Down
4 changes: 2 additions & 2 deletions crates/katana/tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ impl TaskManager {
self.on_cancel.cancelled().await;
}

/// Shutdowns the manger and wait until all tasks are finished, either due to completion or
/// cancellation.
/// Shuts down the manager and wait until all currently running tasks are finished, either due
/// to completion or cancellation.
///
/// No task can be spawned on the manager after this method is called.
pub async fn shutdown(self) {
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/tasks/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ impl<'a> CriticalTaskBuilder<'a> {
let fut = AssertUnwindSafe(fut)
.catch_unwind()
.map_err(move |error| {
ct.cancel();
let error = PanickedTaskError { error };
error!(%error, task = task_name, "Critical task failed.");
ct.cancel();
error
})
.map(drop);
Expand Down

0 comments on commit 80d1827

Please sign in to comment.