Skip to content

Commit

Permalink
wrap affected rows into RegionResponse
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Dec 6, 2023
1 parent 0abf5e7 commit a6e734d
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 41 deletions.
6 changes: 3 additions & 3 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl RegionServer {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -288,7 +288,7 @@ impl RegionServerInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -109,7 +109,7 @@ impl RegionEngine for MockRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let _ = self.sender.send((region_id, request)).await;
Ok(0)
}
Expand Down
15 changes: 8 additions & 7 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -58,7 +59,7 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
Expand Down Expand Up @@ -148,7 +149,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> EngineResult<usize> {
) -> EngineResult<AffectedRows> {
match request {
RegionRequest::Create(req) => self.handle_create(region_id, req).await,
RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
Expand Down Expand Up @@ -186,7 +187,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> EngineResult<usize> {
) -> EngineResult<AffectedRows> {
ensure!(
request.engine == FILE_ENGINE,
UnexpectedEngineSnafu {
Expand Down Expand Up @@ -223,7 +224,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> EngineResult<usize> {
) -> EngineResult<AffectedRows> {
if self.exists(region_id).await {
return Ok(0);
}
Expand Down Expand Up @@ -253,7 +254,7 @@ impl EngineInner {
&self,
region_id: RegionId,
_request: RegionCloseRequest,
) -> EngineResult<usize> {
) -> EngineResult<AffectedRows> {
let _lock = self.region_mutex.lock().await;

let mut regions = self.regions.write().unwrap();
Expand All @@ -268,7 +269,7 @@ impl EngineInner {
&self,
region_id: RegionId,
_request: RegionDropRequest,
) -> EngineResult<usize> {
) -> EngineResult<AffectedRows> {
if !self.exists(region_id).await {
return RegionNotFoundSnafu { region_id }.fail();
}
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -136,7 +136,7 @@ impl DataRegion {
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -113,7 +113,7 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
Expand Down
6 changes: 3 additions & 3 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
use common_telemetry::{error, info};
use snafu::OptionExt;
use store_api::region_request::RegionPutRequest;
use store_api::region_request::{AffectedRows, RegionPutRequest};
use store_api::storage::{RegionId, TableId};

use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE};
Expand All @@ -36,7 +36,7 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
let is_putting_physical_region = self
.state
.read()
Expand All @@ -60,7 +60,7 @@ impl MetricEngineInner {
&self,
logical_region_id: RegionId,
mut request: RegionPutRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
let physical_region_id = *self
.state
.read()
Expand Down
10 changes: 7 additions & 3 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
Expand Down Expand Up @@ -146,7 +146,11 @@ impl EngineInner {
}

/// Handles [RegionRequest] and return its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<usize> {
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let _timer = HANDLE_REQUEST_ELAPSED
.with_label_values(&[request.type_name()])
.start_timer();
Expand Down Expand Up @@ -219,7 +223,7 @@ impl RegionEngine for MitoEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
Expand Down
21 changes: 11 additions & 10 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest,
RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
Expand Down Expand Up @@ -382,16 +383,16 @@ pub(crate) fn validate_proto_value(

/// Oneshot output result sender.
#[derive(Debug)]
pub(crate) struct OutputTx(Sender<Result<usize>>);
pub(crate) struct OutputTx(Sender<Result<AffectedRows>>);

impl OutputTx {
/// Creates a new output sender.
pub(crate) fn new(sender: Sender<Result<usize>>) -> OutputTx {
pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
OutputTx(sender)
}

/// Sends the `result`.
pub(crate) fn send(self, result: Result<usize>) {
pub(crate) fn send(self, result: Result<AffectedRows>) {
// Ignores send result.
let _ = self.0.send(result);
}
Expand All @@ -413,14 +414,14 @@ impl OptionOutputTx {
}

/// Sends the `result` and consumes the inner sender.
pub(crate) fn send_mut(&mut self, result: Result<usize>) {
pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
if let Some(sender) = self.0.take() {
sender.send(result);
}
}

/// Sends the `result` and consumes the sender.
pub(crate) fn send(mut self, result: Result<usize>) {
pub(crate) fn send(mut self, result: Result<AffectedRows>) {
if let Some(sender) = self.0.take() {
sender.send(result);
}
Expand All @@ -432,8 +433,8 @@ impl OptionOutputTx {
}
}

impl From<Sender<Result<usize>>> for OptionOutputTx {
fn from(sender: Sender<Result<usize>>) -> Self {
impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
fn from(sender: Sender<Result<AffectedRows>>) -> Self {
Self::new(Some(OutputTx::new(sender)))
}
}
Expand Down Expand Up @@ -492,7 +493,7 @@ impl WorkerRequest {
pub(crate) fn try_from_region_request(
region_id: RegionId,
value: RegionRequest,
) -> Result<(WorkerRequest, Receiver<Result<usize>>)> {
) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
let (sender, receiver) = oneshot::channel();
let worker_request = match value {
RegionRequest::Put(v) => {
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker/handle_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
//! Handling close request.
use common_telemetry::info;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::REGION_COUNT;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result<usize> {
pub(crate) async fn handle_close_request(
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let Some(region) = self.regions.get_region(region_id) else {
return Ok(0);
};
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/worker/handle_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_telemetry::info;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
use store_api::region_request::RegionCreateRequest;
use store_api::region_request::{AffectedRows, RegionCreateRequest};
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, Result};
Expand All @@ -33,7 +33,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
// Checks whether the table exists.
if let Some(region) = self.regions.get_region(region_id) {
// Region already exists.
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker/handle_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use tokio::time::sleep;

Expand All @@ -33,7 +34,10 @@ const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288)

impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result<usize> {
pub(crate) async fn handle_drop_request(
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;

info!("Try to drop region: {}", region_id);
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_telemetry::info;
use object_store::util::join_path;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionOpenRequest;
use store_api::region_request::{AffectedRows, RegionOpenRequest};
use store_api::storage::RegionId;

use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result};
Expand All @@ -34,7 +34,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<usize> {
) -> Result<AffectedRows> {
if self.regions.is_region_exists(region_id) {
return Ok(0);
}
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker/handle_truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result<usize> {
pub(crate) async fn handle_truncate_request(
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;

info!("Try to truncate region {}", region_id);
Expand Down
4 changes: 2 additions & 2 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};

use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::region_request::{AffectedRows, RegionRequest};
use crate::storage::{RegionId, ScanRequest};

/// The result of setting readonly for the region.
Expand Down Expand Up @@ -118,7 +118,7 @@ pub trait RegionEngine: Send + Sync {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<usize, BoxedError>;
) -> Result<AffectedRows, BoxedError>;

/// Handles substrait query and return a stream of record batches
async fn handle_query(
Expand Down
2 changes: 2 additions & 0 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::metadata::{
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};

pub type AffectedRows = usize;

#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
Put(RegionPutRequest),
Expand Down

0 comments on commit a6e734d

Please sign in to comment.