From 57a865c0e658c5ea93eb8989a19cc105ded4feef Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Mon, 4 Nov 2024 15:10:47 -0500 Subject: [PATCH] feat: exit on block production error --- crates/katana/core/src/service/block_producer.rs | 3 --- crates/katana/core/src/service/mod.rs | 4 +++- crates/katana/pipeline/src/stage/sequencing.rs | 4 ++-- crates/katana/storage/db/src/mdbx/tx.rs | 4 +++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 07ef33aa62..848ae60789 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -34,9 +34,6 @@ pub enum BlockProductionError { #[error(transparent)] Provider(#[from] ProviderError), - #[error("block mining task cancelled")] - BlockMiningTaskCancelled, - #[error("transaction execution task cancelled")] ExecutionTaskCancelled, diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index 7fddc571f7..0d368ff9de 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -5,6 +5,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use block_producer::BlockProductionError; use futures::channel::mpsc::Receiver; use futures::stream::{Fuse, Stream, StreamExt}; use katana_executor::ExecutorFactory; @@ -47,7 +48,7 @@ impl BlockProductionTask { } impl Future for BlockProductionTask { - type Output = (); + type Output = Result<(), BlockProductionError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -68,6 +69,7 @@ impl Future for BlockProductionTask { Err(error) => { error!(target: LOG_TARGET, %error, "Mining block."); + return Poll::Ready(Err(error)); } } } diff --git a/crates/katana/pipeline/src/stage/sequencing.rs b/crates/katana/pipeline/src/stage/sequencing.rs index 6eae240112..ad351b3344 100644 --- a/crates/katana/pipeline/src/stage/sequencing.rs +++ b/crates/katana/pipeline/src/stage/sequencing.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use futures::future; use katana_core::backend::Backend; -use katana_core::service::block_producer::BlockProducer; +use katana_core::service::block_producer::{BlockProducer, BlockProductionError}; use katana_core::service::messaging::{MessagingConfig, MessagingService, MessagingTask}; use katana_core::service::{BlockProductionTask, TransactionMiner}; use katana_executor::ExecutorFactory; @@ -52,7 +52,7 @@ impl Sequencing { } } - fn run_block_production(&self) -> TaskHandle<()> { + fn run_block_production(&self) -> TaskHandle> { let pool = self.pool.clone(); let miner = TransactionMiner::new(pool.add_listener()); let block_producer = self.block_producer.clone(); diff --git a/crates/katana/storage/db/src/mdbx/tx.rs b/crates/katana/storage/db/src/mdbx/tx.rs index 6c31545cdc..da902c9f5d 100644 --- a/crates/katana/storage/db/src/mdbx/tx.rs +++ b/crates/katana/storage/db/src/mdbx/tx.rs @@ -118,7 +118,9 @@ impl DbTxMut for Tx { fn put(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); let value = value.compress(); - self.inner.put(self.get_dbi::()?, key, value, WriteFlags::UPSERT).unwrap(); + self.inner.put(self.get_dbi::()?, &key, value, WriteFlags::UPSERT).map_err(|error| { + DatabaseError::Write { error, table: T::NAME, key: Box::from(key.as_ref()) } + })?; Ok(()) }