From 80d18277d84a50c015decbbc9958fc2e4b90a072 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 11 Sep 2024 14:16:27 -0400 Subject: [PATCH] refactor(katana): separate node service task (#2413) --- .../katana/core/src/service/messaging/mod.rs | 39 +++++++++++ crates/katana/core/src/service/metrics.rs | 5 -- crates/katana/core/src/service/mod.rs | 69 ++++++------------- crates/katana/node/src/lib.rs | 35 +++++----- crates/katana/tasks/src/manager.rs | 4 +- crates/katana/tasks/src/task.rs | 2 +- 6 files changed, 78 insertions(+), 76 deletions(-) diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index 7c40028b9b..cd064f44be 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -37,13 +37,18 @@ mod service; #[cfg(feature = "starknet-messaging")] mod starknet; +use std::future::Future; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use ::starknet::providers::ProviderError as StarknetProviderError; use alloy_transport::TransportError; use anyhow::Result; use async_trait::async_trait; use ethereum::EthereumMessaging; +use futures::StreamExt; +use katana_executor::ExecutorFactory; use katana_primitives::chain::ChainId; use katana_primitives::receipt::MessageToL1; use serde::Deserialize; @@ -202,3 +207,37 @@ impl MessengerMode { } } } + +#[allow(missing_debug_implementations)] +#[must_use = "MessagingTask does nothing unless polled"] +pub struct MessagingTask { + messaging: MessagingService, +} + +impl MessagingTask { + pub fn new(messaging: MessagingService) -> Self { + Self { messaging } + } +} + +impl Future for MessagingTask { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while let Poll::Ready(Some(outcome)) = this.messaging.poll_next_unpin(cx) { + match outcome { + MessagingOutcome::Gather { msg_count, .. } => { + info!(target: LOG_TARGET, %msg_count, "Collected messages from settlement chain."); + } + + MessagingOutcome::Send { msg_count, .. } => { + info!(target: LOG_TARGET, %msg_count, "Sent messages to the settlement chain."); + } + } + } + + Poll::Pending + } +} diff --git a/crates/katana/core/src/service/metrics.rs b/crates/katana/core/src/service/metrics.rs index e773a1630f..c6ce43373e 100644 --- a/crates/katana/core/src/service/metrics.rs +++ b/crates/katana/core/src/service/metrics.rs @@ -1,11 +1,6 @@ use dojo_metrics::Metrics; use metrics::Counter; -#[derive(Debug)] -pub(crate) struct ServiceMetrics { - pub(crate) block_producer: BlockProducerMetrics, -} - #[derive(Metrics)] #[metrics(scope = "block_producer")] pub(crate) struct BlockProducerMetrics { diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index 0dce5669cd..be12368f45 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -1,4 +1,5 @@ -//! background service +// TODO: remove the messaging feature flag +// TODO: move the tasks to a separate module use std::future::Future; use std::pin::Pin; @@ -14,102 +15,72 @@ use katana_primitives::FieldElement; use tracing::{error, info}; use self::block_producer::BlockProducer; -use self::metrics::{BlockProducerMetrics, ServiceMetrics}; +use self::metrics::BlockProducerMetrics; pub mod block_producer; #[cfg(feature = "messaging")] pub mod messaging; mod metrics; -#[cfg(feature = "messaging")] -use self::messaging::{MessagingOutcome, MessagingService}; - pub(crate) const LOG_TARGET: &str = "node"; /// The type that drives the blockchain's state /// -/// This service is basically an endless future that continuously polls the miner which returns +/// This task is basically an endless future that continuously polls the miner which returns /// transactions for the next block, then those transactions are handed off to the [BlockProducer] /// to construct a new block. +#[must_use = "BlockProductionTask does nothing unless polled"] #[allow(missing_debug_implementations)] -pub struct NodeService { - /// the pool that holds all transactions - pub(crate) pool: TxPool, +pub struct BlockProductionTask { /// creates new blocks pub(crate) block_producer: Arc>, /// the miner responsible to select transactions from the `pool´ pub(crate) miner: TransactionMiner, - /// The messaging service - #[cfg(feature = "messaging")] - pub(crate) messaging: Option>, + /// the pool that holds all transactions + pub(crate) pool: TxPool, /// Metrics for recording the service operations - metrics: ServiceMetrics, + metrics: BlockProducerMetrics, } -impl NodeService { +impl BlockProductionTask { pub fn new( pool: TxPool, miner: TransactionMiner, block_producer: Arc>, - #[cfg(feature = "messaging")] messaging: Option>, ) -> Self { - let metrics = ServiceMetrics { block_producer: BlockProducerMetrics::default() }; - - Self { - pool, - miner, - block_producer, - metrics, - #[cfg(feature = "messaging")] - messaging, - } + Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() } } } -impl Future for NodeService { +impl Future for BlockProductionTask { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let pin = self.get_mut(); - - #[cfg(feature = "messaging")] - if let Some(messaging) = pin.messaging.as_mut() { - while let Poll::Ready(Some(outcome)) = messaging.poll_next_unpin(cx) { - match outcome { - MessagingOutcome::Gather { msg_count, .. } => { - info!(target: LOG_TARGET, msg_count = %msg_count, "Collected messages from settlement chain."); - } - MessagingOutcome::Send { msg_count, .. } => { - info!(target: LOG_TARGET, msg_count = %msg_count, "Sent messages to the settlement chain."); - } - } - } - } + let this = self.get_mut(); // this drives block production and feeds new sets of ready transactions to the block // producer loop { - while let Poll::Ready(Some(res)) = pin.block_producer.poll_next(cx) { + while let Poll::Ready(Some(res)) = this.block_producer.poll_next(cx) { match res { Ok(outcome) => { info!(target: LOG_TARGET, block_number = %outcome.block_number, "Mined block."); - let metrics = &pin.metrics.block_producer; let gas_used = outcome.stats.l1_gas_used; let steps_used = outcome.stats.cairo_steps_used; - metrics.l1_gas_processed_total.increment(gas_used as u64); - metrics.cairo_steps_processed_total.increment(steps_used as u64); + this.metrics.l1_gas_processed_total.increment(gas_used as u64); + this.metrics.cairo_steps_processed_total.increment(steps_used as u64); } - Err(err) => { - error!(target: LOG_TARGET, error = %err, "Mining block."); + Err(error) => { + error!(target: LOG_TARGET, %error, "Mining block."); } } } - if let Poll::Ready(pool_txs) = pin.miner.poll(&pin.pool, cx) { + if let Poll::Ready(pool_txs) = this.miner.poll(&this.pool, cx) { // miner returned a set of transaction that we feed to the producer - pin.block_producer.queue(pool_txs); + this.block_producer.queue(pool_txs); } else { // no progress made break; diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index bf0bdb4272..55dab949f0 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -19,8 +19,8 @@ use katana_core::env::BlockContextGenerator; use katana_core::sequencer::SequencerConfig; use katana_core::service::block_producer::BlockProducer; #[cfg(feature = "messaging")] -use katana_core::service::messaging::MessagingService; -use katana_core::service::{NodeService, TransactionMiner}; +use katana_core::service::messaging::{MessagingService, MessagingTask}; +use katana_core::service::{BlockProductionTask, TransactionMiner}; use katana_executor::implementation::blockifier::BlockifierFactory; use katana_executor::{ExecutorFactory, SimulationFlag}; use katana_pool::ordering::FiFo; @@ -173,7 +173,7 @@ pub async fn start( config: starknet_config, }); - // --- build block producer service + // --- build block producer let block_producer = if sequencer_config.block_time.is_some() || sequencer_config.no_mining { if let Some(interval) = sequencer_config.block_time { @@ -210,28 +210,25 @@ pub async fn start( info!(%addr, "Metrics endpoint started."); } - // --- build messaging service + // --- create a TaskManager using the ambient Tokio runtime + + let task_manager = TaskManager::current(); + + // --- build and spawn the messaging task #[cfg(feature = "messaging")] - let messaging = if let Some(config) = sequencer_config.messaging.clone() { - MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await.ok() - } else { - None - }; + if let Some(config) = sequencer_config.messaging.clone() { + let messaging = MessagingService::new(config, pool.clone(), Arc::clone(&backend)).await?; + let task = MessagingTask::new(messaging); + task_manager.build_task().critical().name("Messaging").spawn(task); + } let block_producer = Arc::new(block_producer); - // Create a TaskManager using the ambient Tokio runtime - let task_manager = TaskManager::current(); + // --- build and spawn the block production task - // Spawn the NodeService as a critical task - task_manager.build_task().critical().name("NodeService").spawn(NodeService::new( - pool.clone(), - miner, - block_producer.clone(), - #[cfg(feature = "messaging")] - messaging, - )); + let task = BlockProductionTask::new(pool.clone(), miner, block_producer.clone()); + task_manager.build_task().critical().name("BlockProduction").spawn(task); // --- spawn rpc server diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs index 42681079d1..f79a7657e8 100644 --- a/crates/katana/tasks/src/manager.rs +++ b/crates/katana/tasks/src/manager.rs @@ -47,8 +47,8 @@ impl TaskManager { self.on_cancel.cancelled().await; } - /// Shutdowns the manger and wait until all tasks are finished, either due to completion or - /// cancellation. + /// Shuts down the manager and wait until all currently running tasks are finished, either due + /// to completion or cancellation. /// /// No task can be spawned on the manager after this method is called. pub async fn shutdown(self) { diff --git a/crates/katana/tasks/src/task.rs b/crates/katana/tasks/src/task.rs index 09d8462779..1e5cdb813a 100644 --- a/crates/katana/tasks/src/task.rs +++ b/crates/katana/tasks/src/task.rs @@ -131,9 +131,9 @@ impl<'a> CriticalTaskBuilder<'a> { let fut = AssertUnwindSafe(fut) .catch_unwind() .map_err(move |error| { - ct.cancel(); let error = PanickedTaskError { error }; error!(%error, task = task_name, "Critical task failed."); + ct.cancel(); error }) .map(drop);