diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 2255734e85f4..b257bf57e6f7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; -use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::scan::StreamScanAdapter; @@ -112,7 +112,7 @@ impl RegionServer { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner.handle_request(region_id, request).await } @@ -288,7 +288,7 @@ impl RegionServerInner { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let request_type = request.request_type(); let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED .with_label_values(&[request_type]) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 0c0a55d28ac9..b5926c0c2230 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -31,7 +31,7 @@ use query::QueryEngine; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; @@ -109,7 +109,7 @@ impl RegionEngine for MockRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let _ = self.sender.send((region_id, request)).await; Ok(0) } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 5608fbd9af19..c8b9f82992c2 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -25,7 +25,8 @@ use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{ - RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, + AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, + RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Mutex; @@ -58,7 +59,7 @@ impl RegionEngine for FileRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await @@ -148,7 +149,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionRequest, - ) -> EngineResult { + ) -> EngineResult { match request { RegionRequest::Create(req) => self.handle_create(region_id, req).await, RegionRequest::Drop(req) => self.handle_drop(region_id, req).await, @@ -186,7 +187,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> EngineResult { + ) -> EngineResult { ensure!( request.engine == FILE_ENGINE, UnexpectedEngineSnafu { @@ -223,7 +224,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionOpenRequest, - ) -> EngineResult { + ) -> EngineResult { if self.exists(region_id).await { return Ok(0); } @@ -253,7 +254,7 @@ impl EngineInner { &self, region_id: RegionId, _request: RegionCloseRequest, - ) -> EngineResult { + ) -> EngineResult { let _lock = self.region_mutex.lock().await; let mut regions = self.regions.write().unwrap(); @@ -268,7 +269,7 @@ impl EngineInner { &self, region_id: RegionId, _request: RegionDropRequest, - ) -> EngineResult { + ) -> EngineResult { if !self.exists(region_id).await { return RegionNotFoundSnafu { region_id }.fail(); } diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 3b40e115f6e5..f9ee734ffc56 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -19,7 +19,7 @@ use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, + AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; @@ -136,7 +136,7 @@ impl DataRegion { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito .handle_request(region_id, RegionRequest::Put(request)) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 330592c4fa44..17aa7e5a334c 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::RwLock; @@ -113,7 +113,7 @@ impl RegionEngine for MetricEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index f37b0b5e4526..6cbc8c7b3ca1 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -19,7 +19,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_telemetry::{error, info}; use snafu::OptionExt; -use store_api::region_request::RegionPutRequest; +use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE}; @@ -36,7 +36,7 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let is_putting_physical_region = self .state .read() @@ -60,7 +60,7 @@ impl MetricEngineInner { &self, logical_region_id: RegionId, mut request: RegionPutRequest, - ) -> Result { + ) -> Result { let physical_region_id = *self .state .read() diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 58b0dec4fc56..f1a7e4455d04 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -50,7 +50,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -146,7 +146,11 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); @@ -219,7 +223,7 @@ impl RegionEngine for MitoEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c603ea2c3ee0..47241d6bf999 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,8 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ - RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, - RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, + RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + RegionTruncateRequest, }; use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -382,16 +383,16 @@ pub(crate) fn validate_proto_value( /// Oneshot output result sender. #[derive(Debug)] -pub(crate) struct OutputTx(Sender>); +pub(crate) struct OutputTx(Sender>); impl OutputTx { /// Creates a new output sender. - pub(crate) fn new(sender: Sender>) -> OutputTx { + pub(crate) fn new(sender: Sender>) -> OutputTx { OutputTx(sender) } /// Sends the `result`. - pub(crate) fn send(self, result: Result) { + pub(crate) fn send(self, result: Result) { // Ignores send result. let _ = self.0.send(result); } @@ -413,14 +414,14 @@ impl OptionOutputTx { } /// Sends the `result` and consumes the inner sender. - pub(crate) fn send_mut(&mut self, result: Result) { + pub(crate) fn send_mut(&mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } } /// Sends the `result` and consumes the sender. - pub(crate) fn send(mut self, result: Result) { + pub(crate) fn send(mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } @@ -432,8 +433,8 @@ impl OptionOutputTx { } } -impl From>> for OptionOutputTx { - fn from(sender: Sender>) -> Self { +impl From>> for OptionOutputTx { + fn from(sender: Sender>) -> Self { Self::new(Some(OutputTx::new(sender))) } } @@ -492,7 +493,7 @@ impl WorkerRequest { pub(crate) fn try_from_region_request( region_id: RegionId, value: RegionRequest, - ) -> Result<(WorkerRequest, Receiver>)> { + ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { RegionRequest::Put(v) => { diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 1ae6bbdb5adb..80f1bfa63235 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -15,6 +15,7 @@ //! Handling close request. use common_telemetry::info; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -22,7 +23,10 @@ use crate::metrics::REGION_COUNT; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_close_request( + &mut self, + region_id: RegionId, + ) -> Result { let Some(region) = self.regions.get_region(region_id) else { return Ok(0); }; diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 5eebfe9bf28f..6d86ddb1887f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -20,7 +20,7 @@ use common_telemetry::info; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; -use store_api::region_request::RegionCreateRequest; +use store_api::region_request::{AffectedRows, RegionCreateRequest}; use store_api::storage::RegionId; use crate::error::{InvalidMetadataSnafu, Result}; @@ -33,7 +33,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result { + ) -> Result { // Checks whether the table exists. if let Some(region) = self.regions.get_region(region_id) { // Region already exists. diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index e03f9df42922..fa0d1181a988 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -21,6 +21,7 @@ use futures::TryStreamExt; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use tokio::time::sleep; @@ -33,7 +34,10 @@ const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { - pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_drop_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index dd9447ab7a80..095da683a8f5 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -20,7 +20,7 @@ use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::RegionOpenRequest; +use store_api::region_request::{AffectedRows, RegionOpenRequest}; use store_api::storage::RegionId; use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; @@ -34,7 +34,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, - ) -> Result { + ) -> Result { if self.regions.is_region_exists(region_id) { return Ok(0); } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 4749efa2455e..ecb66817b30c 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -16,6 +16,7 @@ use common_telemetry::info; use store_api::logstore::LogStore; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -23,7 +24,10 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTrun use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_truncate_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to truncate region {}", region_id); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 1936ca74e144..f80c7d94ec06 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::RegionRequest; +use crate::region_request::{AffectedRows, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The result of setting readonly for the region. @@ -118,7 +118,7 @@ pub trait RegionEngine: Send + Sync { &self, region_id: RegionId, request: RegionRequest, - ) -> Result; + ) -> Result; /// Handles substrait query and return a stream of record batches async fn handle_query( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 928f2abd73ee..e04382c64f74 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -28,6 +28,8 @@ use crate::metadata::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; +pub type AffectedRows = usize; + #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest),