diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6580fa891140..b257bf57e6f7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; -use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::scan::StreamScanAdapter; @@ -112,7 +112,7 @@ impl RegionServer { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner.handle_request(region_id, request).await } @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer { // only insert/delete will have multiple results. let mut affected_rows = 0; for result in results { - match result { - Output::AffectedRows(rows) => affected_rows += rows, - Output::Stream(_) | Output::RecordBatches(_) => { - // TODO: change the output type to only contains `affected_rows` - unreachable!() - } - } + affected_rows += result; } Ok(RegionResponse { @@ -294,7 +288,7 @@ impl RegionServerInner { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let request_type = request.request_type(); let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED .with_label_values(&[request_type]) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 7a5b31771cb1..b5926c0c2230 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -31,7 +31,7 @@ use query::QueryEngine; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; @@ -109,10 +109,9 @@ impl RegionEngine for MockRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let _ = self.sender.send((region_id, request)).await; - - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_query( diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 485761af1861..c8b9f82992c2 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_catalog::consts::FILE_ENGINE; use common_error::ext::BoxedError; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info}; use object_store::ObjectStore; @@ -26,7 +25,8 @@ use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{ - RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, + AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, + RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Mutex; @@ -59,7 +59,7 @@ impl RegionEngine for FileRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await @@ -149,7 +149,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionRequest, - ) -> EngineResult { + ) -> EngineResult { match request { RegionRequest::Create(req) => self.handle_create(region_id, req).await, RegionRequest::Drop(req) => self.handle_drop(region_id, req).await, @@ -187,7 +187,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> EngineResult { + ) -> EngineResult { ensure!( request.engine == FILE_ENGINE, UnexpectedEngineSnafu { @@ -196,7 +196,7 @@ impl EngineInner { ); if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } info!("Try to create region, region_id: {}", region_id); @@ -204,7 +204,7 @@ impl EngineInner { let _lock = self.region_mutex.lock().await; // Check again after acquiring the lock if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let res = FileRegion::create(region_id, request, &self.object_store).await; @@ -217,16 +217,16 @@ impl EngineInner { self.regions.write().unwrap().insert(region_id, region); info!("A new region is created, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_open( &self, region_id: RegionId, request: RegionOpenRequest, - ) -> EngineResult { + ) -> EngineResult { if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } info!("Try to open region, region_id: {}", region_id); @@ -234,7 +234,7 @@ impl EngineInner { let _lock = self.region_mutex.lock().await; // Check again after acquiring the lock if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let res = FileRegion::open(region_id, request, &self.object_store).await; @@ -247,14 +247,14 @@ impl EngineInner { self.regions.write().unwrap().insert(region_id, region); info!("Region opened, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_close( &self, region_id: RegionId, _request: RegionCloseRequest, - ) -> EngineResult { + ) -> EngineResult { let _lock = self.region_mutex.lock().await; let mut regions = self.regions.write().unwrap(); @@ -262,14 +262,14 @@ impl EngineInner { info!("Region closed, region_id: {}", region_id); } - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_drop( &self, region_id: RegionId, _request: RegionDropRequest, - ) -> EngineResult { + ) -> EngineResult { if !self.exists(region_id).await { return RegionNotFoundSnafu { region_id }.fail(); } @@ -291,7 +291,7 @@ impl EngineInner { let _ = self.regions.write().unwrap().remove(®ion_id); info!("Region dropped, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn get_region(&self, region_id: RegionId) -> Option { diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 6d35aaefb579..f9ee734ffc56 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,14 +13,13 @@ // limitations under the License. use api::v1::SemanticType; -use common_query::Output; use common_telemetry::tracing::warn; use mito2::engine::MitoEngine; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, + AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; @@ -137,7 +136,7 @@ impl DataRegion { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito .handle_request(region_id, RegionRequest::Put(request)) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 7abc0cfacf15..65436ef0d328 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::RwLock; @@ -108,30 +108,20 @@ impl RegionEngine for MetricEngine { METRIC_ENGINE_NAME } - /// Handles request to the region. - /// - /// Only query is not included, which is handled in `handle_query` + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), - RegionRequest::Create(create) => self - .inner - .create_region(region_id, create) - .await - .map(|_| Output::AffectedRows(0)), + RegionRequest::Create(create) => self.inner.create_region(region_id, create).await, RegionRequest::Drop(_) => todo!(), RegionRequest::Open(_) => todo!(), RegionRequest::Close(_) => todo!(), - RegionRequest::Alter(alter) => self - .inner - .alter_region(region_id, alter) - .await - .map(|_| Output::AffectedRows(0)), + RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await, RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 74ea3921f6b8..94b89d6a7685 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -14,7 +14,7 @@ use common_telemetry::{error, info}; use snafu::OptionExt; -use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest}; use store_api::storage::RegionId; use crate::engine::MetricEngineInner; @@ -28,18 +28,21 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionAlterRequest, - ) -> Result<()> { + ) -> Result { let is_altering_logical_region = self .state .read() .await .physical_regions() .contains_key(®ion_id); - if is_altering_logical_region { + + let result = if is_altering_logical_region { self.alter_physical_region(region_id, request).await } else { self.alter_logical_region(region_id, request).await - } + }; + + result.map(|_| 0) } async fn alter_logical_region( diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index f3b5ff4ef59c..ba4873abab2d 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -25,7 +25,7 @@ use object_store::util::join_dir; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; @@ -50,16 +50,18 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result<()> { + ) -> Result { Self::verify_region_create_request(&request)?; - if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { + let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { self.create_physical_region(region_id, request).await } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) { self.create_logical_region(region_id, request).await } else { MissingRegionOptionSnafu {}.fail() - } + }; + + result.map(|_| 0) } /// Initialize a physical metric region at given region id. diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 8baa37a7cfbc..6cbc8c7b3ca1 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -17,11 +17,9 @@ use std::hash::{BuildHasher, Hash, Hasher}; use ahash::RandomState; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; -use common_query::Output; use common_telemetry::{error, info}; -use datatypes::data_type::ConcreteDataType; use snafu::OptionExt; -use store_api::region_request::RegionPutRequest; +use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE}; @@ -38,7 +36,7 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let is_putting_physical_region = self .state .read() @@ -62,7 +60,7 @@ impl MetricEngineInner { &self, logical_region_id: RegionId, mut request: RegionPutRequest, - ) -> Result { + ) -> Result { let physical_region_id = *self .state .read() @@ -208,7 +206,6 @@ impl MetricEngineInner { #[cfg(test)] mod tests { - use common_recordbatch::RecordBatches; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; @@ -231,14 +228,11 @@ mod tests { // write data let logical_region_id = env.default_logical_region_id(); - let Output::AffectedRows(count) = env + let count = env .metric() .handle_request(logical_region_id, request) .await - .unwrap() - else { - panic!() - }; + .unwrap(); assert_eq!(count, 5); // read data from physical region @@ -306,13 +300,10 @@ mod tests { }); // write data - let Output::AffectedRows(count) = engine + let count = engine .handle_request(logical_region_id, request) .await - .unwrap() - else { - panic!() - }; + .unwrap(); assert_eq!(100, count); } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 3f009b667e37..94f04b17aa66 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -340,7 +340,6 @@ impl CompactionStatus { mod tests { use std::sync::Mutex; - use common_query::Output; use tokio::sync::oneshot; use super::*; @@ -371,7 +370,7 @@ mod tests { ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); // Only one file, picker won't compact it. @@ -389,7 +388,7 @@ mod tests { ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index e3bae3acfb4a..95b1eee3f1f6 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; -use common_query::Output; use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; @@ -158,7 +157,7 @@ impl Picker for TwcsPicker { if outputs.is_empty() && expired_ssts.is_empty() { // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. for waiter in waiters { - waiter.send(Ok(Output::AffectedRows(0))); + waiter.send(Ok(0)); } return None; } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 558dec2a6c68..f1a7e4455d04 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -44,14 +44,13 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -147,7 +146,11 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); @@ -220,7 +223,7 @@ impl RegionEngine for MitoEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> std::result::Result { + ) -> Result { self.inner .handle_request(region_id, request) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 397ee490984e..a072a90b5cbe 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -110,7 +110,7 @@ async fn test_region_replay() { let engine = env.reopen_engine(engine, MitoConfig::default()).await; - let open_region = engine + let rows = engine .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { @@ -121,9 +121,6 @@ async fn test_region_replay() { ) .await .unwrap(); - let Output::AffectedRows(rows) = open_region else { - unreachable!() - }; assert_eq!(0, rows); let request = ScanRequest::default(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index de35ead2772d..42aba40cc03a 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -15,7 +15,6 @@ use std::ops::Range; use api::v1::{ColumnSchema, Rows}; -use common_query::Output; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::prelude::ScalarVector; use datatypes::vectors::TimestampMillisecondVector; @@ -43,7 +42,7 @@ async fn put_and_flush( }; put_rows(engine, region_id, rows).await; - let Output::AffectedRows(rows) = engine + let rows = engine .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { @@ -51,10 +50,7 @@ async fn put_and_flush( }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } @@ -70,20 +66,16 @@ async fn delete_and_flush( rows: build_rows_for_key("a", rows.start, rows.end, 0), }; - let deleted = engine + let rows_affected = engine .handle_request( region_id, RegionRequest::Delete(RegionDeleteRequest { rows }), ) .await .unwrap(); - - let Output::AffectedRows(rows_affected) = deleted else { - unreachable!() - }; assert_eq!(row_cnt, rows_affected); - let Output::AffectedRows(rows) = engine + let rows = engine .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { @@ -91,10 +83,7 @@ async fn delete_and_flush( }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } @@ -142,7 +131,7 @@ async fn test_compaction_region() { .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) .await .unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!( diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 504123bfd9a1..99b880de7cf2 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -18,7 +18,6 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use common_query::Output; use common_telemetry::{error, info}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -214,7 +213,7 @@ impl RegionFlushTask { /// Consumes the task and notify the sender the job is success. fn on_success(self) { for sender in self.senders { - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } @@ -736,7 +735,7 @@ mod tests { .unwrap(); assert!(scheduler.region_status.is_empty()); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index da256879d00b..6756c83b64ea 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -16,7 +16,6 @@ use std::mem; use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry}; -use common_query::Output; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; @@ -57,8 +56,7 @@ impl WriteNotify { .send_mut(Err(err.clone()).context(WriteGroupSnafu)); } else { // Send success result. - self.sender - .send_mut(Ok(Output::AffectedRows(self.num_rows))); + self.sender.send_mut(Ok(self.num_rows)); } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 8ef05b8e1ffd..47241d6bf999 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -23,8 +23,6 @@ use api::helper::{ ColumnDataTypeWrapper, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; -use common_query::Output; -use common_query::Output::AffectedRows; use common_telemetry::{info, warn}; use datatypes::prelude::DataType; use prometheus::HistogramTimer; @@ -34,8 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ - RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, - RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, + RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + RegionTruncateRequest, }; use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -384,16 +383,16 @@ pub(crate) fn validate_proto_value( /// Oneshot output result sender. #[derive(Debug)] -pub(crate) struct OutputTx(Sender>); +pub(crate) struct OutputTx(Sender>); impl OutputTx { /// Creates a new output sender. - pub(crate) fn new(sender: Sender>) -> OutputTx { + pub(crate) fn new(sender: Sender>) -> OutputTx { OutputTx(sender) } /// Sends the `result`. - pub(crate) fn send(self, result: Result) { + pub(crate) fn send(self, result: Result) { // Ignores send result. let _ = self.0.send(result); } @@ -415,14 +414,14 @@ impl OptionOutputTx { } /// Sends the `result` and consumes the inner sender. - pub(crate) fn send_mut(&mut self, result: Result) { + pub(crate) fn send_mut(&mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } } /// Sends the `result` and consumes the sender. - pub(crate) fn send(mut self, result: Result) { + pub(crate) fn send(mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } @@ -434,8 +433,8 @@ impl OptionOutputTx { } } -impl From>> for OptionOutputTx { - fn from(sender: Sender>) -> Self { +impl From>> for OptionOutputTx { + fn from(sender: Sender>) -> Self { Self::new(Some(OutputTx::new(sender))) } } @@ -494,7 +493,7 @@ impl WorkerRequest { pub(crate) fn try_from_region_request( region_id: RegionId, value: RegionRequest, - ) -> Result<(WorkerRequest, Receiver>)> { + ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { RegionRequest::Put(v) => { @@ -630,7 +629,7 @@ impl FlushFinished { /// Marks the flush job as successful and observes the timer. pub(crate) fn on_success(self) { for sender in self.senders { - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } } @@ -685,7 +684,7 @@ impl CompactionFinished { COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64()); for sender in self.senders { - sender.send(Ok(AffectedRows(0))); + sender.send(Ok(0)); } info!("Successfully compacted region: {}", self.region_id); } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c0469a7ac97f..6804577d02af 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -596,13 +596,10 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec Vec) { - let Output::AffectedRows(rows) = engine + let rows = engine .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 693235e90f9f..8e6f474a1aeb 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -16,7 +16,6 @@ use std::sync::Arc; -use common_query::Output; use common_telemetry::{debug, error, info, warn}; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; @@ -54,7 +53,7 @@ impl RegionWorkerLoop { region_id, version.metadata.schema_version, request.schema_version ); // Returns if it altered. - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); return; } // Validate request. @@ -69,7 +68,7 @@ impl RegionWorkerLoop { "Ignores alter request as it alters nothing, region_id: {}, request: {:?}", region_id, request ); - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); return; } @@ -118,7 +117,7 @@ impl RegionWorkerLoop { ); // Notifies waiters. - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index c9e152baa39a..80f1bfa63235 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -14,8 +14,8 @@ //! Handling close request. -use common_query::Output; use common_telemetry::info; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -23,9 +23,12 @@ use crate::metrics::REGION_COUNT; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_close_request( + &mut self, + region_id: RegionId, + ) -> Result { let Some(region) = self.regions.get_region(region_id) else { - return Ok(Output::AffectedRows(0)); + return Ok(0); }; info!("Try to close region {}", region_id); @@ -41,6 +44,6 @@ impl RegionWorkerLoop { REGION_COUNT.dec(); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 7647866494f2..6d86ddb1887f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -16,12 +16,11 @@ use std::sync::Arc; -use common_query::Output; use common_telemetry::info; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; -use store_api::region_request::RegionCreateRequest; +use store_api::region_request::{AffectedRows, RegionCreateRequest}; use store_api::storage::RegionId; use crate::error::{InvalidMetadataSnafu, Result}; @@ -34,7 +33,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result { + ) -> Result { // Checks whether the table exists. if let Some(region) = self.regions.get_region(region_id) { // Region already exists. @@ -45,7 +44,7 @@ impl RegionWorkerLoop { &request.primary_key, )?; - return Ok(Output::AffectedRows(0)); + return Ok(0); } // Convert the request into a RegionMetadata and validate it. @@ -76,6 +75,6 @@ impl RegionWorkerLoop { // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 023daca2810b..fa0d1181a988 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,12 +16,12 @@ use std::time::Duration; -use common_query::Output; use common_telemetry::{info, warn}; use futures::TryStreamExt; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use tokio::time::sleep; @@ -34,7 +34,10 @@ const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { - pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_drop_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); @@ -86,7 +89,7 @@ impl RegionWorkerLoop { listener.on_later_drop_end(region_id, removed); }); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index a90a64e395b0..095da683a8f5 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -16,12 +16,11 @@ use std::sync::Arc; -use common_query::Output; use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::RegionOpenRequest; +use store_api::region_request::{AffectedRows, RegionOpenRequest}; use store_api::storage::RegionId; use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; @@ -35,9 +34,9 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, - ) -> Result { + ) -> Result { if self.regions.is_region_exists(region_id) { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let object_store = if let Some(storage_name) = request.options.get("storage") { self.object_store_manager @@ -82,6 +81,6 @@ impl RegionWorkerLoop { // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 5b1e9db8a90d..ecb66817b30c 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -14,9 +14,9 @@ //! Handling truncate related requests. -use common_query::Output; use common_telemetry::info; use store_api::logstore::LogStore; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -24,7 +24,10 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTrun use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_truncate_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to truncate region {}", region_id); @@ -62,6 +65,6 @@ impl RegionWorkerLoop { region_id, truncated_entry_id, truncated_sequence ); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 27c0d7a93a6c..f80c7d94ec06 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -19,13 +19,12 @@ use std::sync::Arc; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use serde::{Deserialize, Serialize}; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::RegionRequest; +use crate::region_request::{AffectedRows, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The result of setting readonly for the region. @@ -114,14 +113,12 @@ pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; - /// Handles request to the region. - /// - /// Only query is not included, which is handled in `handle_query` + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self, region_id: RegionId, request: RegionRequest, - ) -> Result; + ) -> Result; /// Handles substrait query and return a stream of record batches async fn handle_query( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 928f2abd73ee..e04382c64f74 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -28,6 +28,8 @@ use crate::metadata::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; +pub type AffectedRows = usize; + #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest),