Skip to content

Commit

Permalink
refactor: RegionEngine::handle_request always returns affected rows (#…
Browse files Browse the repository at this point in the history
…2874)

* refactor: RegionEngine::handle_request -> handle_execution

Signed-off-by: tison <[email protected]>

* propagate refactor

Signed-off-by: tison <[email protected]>

* revert spell change

Signed-off-by: tison <[email protected]>

* propagate refactor

Signed-off-by: tison <[email protected]>

* cargo clippy

Signed-off-by: tison <[email protected]>

* propagate refactor

Signed-off-by: tison <[email protected]>

* cargo fmt

Signed-off-by: tison <[email protected]>

* more name clarification

Signed-off-by: tison <[email protected]>

* revert rename

Signed-off-by: tison <[email protected]>

* wrap affected rows into RegionResponse

Signed-off-by: tison <[email protected]>

* flatten return AffectedRows

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Dec 6, 2023
1 parent 1141dbe commit f74715c
Show file tree
Hide file tree
Showing 25 changed files with 122 additions and 165 deletions.
14 changes: 4 additions & 10 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl RegionServer {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer {
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
unreachable!()
}
}
affected_rows += result;
}

Ok(RegionResponse {
Expand Down Expand Up @@ -294,7 +288,7 @@ impl RegionServerInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand Down
7 changes: 3 additions & 4 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -109,10 +109,9 @@ impl RegionEngine for MockRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let _ = self.sender.send((region_id, request)).await;

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_query(
Expand Down
32 changes: 16 additions & 16 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::FILE_ENGINE;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
Expand Down Expand Up @@ -149,7 +149,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
match request {
RegionRequest::Create(req) => self.handle_create(region_id, req).await,
RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
Expand Down Expand Up @@ -187,7 +187,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
ensure!(
request.engine == FILE_ENGINE,
UnexpectedEngineSnafu {
Expand All @@ -196,15 +196,15 @@ impl EngineInner {
);

if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to create region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::create(region_id, request, &self.object_store).await;
Expand All @@ -217,24 +217,24 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("A new region is created, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_open(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to open region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::open(region_id, request, &self.object_store).await;
Expand All @@ -247,29 +247,29 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("Region opened, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_close(
&self,
region_id: RegionId,
_request: RegionCloseRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
let _lock = self.region_mutex.lock().await;

let mut regions = self.regions.write().unwrap();
if regions.remove(&region_id).is_some() {
info!("Region closed, region_id: {}", region_id);
}

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_drop(
&self,
region_id: RegionId,
_request: RegionDropRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
if !self.exists(region_id).await {
return RegionNotFoundSnafu { region_id }.fail();
}
Expand All @@ -291,7 +291,7 @@ impl EngineInner {
let _ = self.regions.write().unwrap().remove(&region_id);

info!("Region dropped, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
Expand Down
5 changes: 2 additions & 3 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

use api::v1::SemanticType;
use common_query::Output;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -137,7 +136,7 @@ impl DataRegion {
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
Expand Down
20 changes: 5 additions & 15 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -108,30 +108,20 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}

/// Handles request to the region.
///
/// Only query is not included, which is handled in `handle_query`
/// Handles non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
.create_region(region_id, create)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Alter(alter) => self
.inner
.alter_region(region_id, alter)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
Expand Down
11 changes: 7 additions & 4 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_telemetry::{error, info};
use snafu::OptionExt;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
Expand All @@ -28,18 +28,21 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
) -> Result<AffectedRows> {
let is_altering_logical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
if is_altering_logical_region {

let result = if is_altering_logical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
}
};

result.map(|_| 0)
}

async fn alter_logical_region(
Expand Down
10 changes: 6 additions & 4 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use object_store::util::join_dir;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;

Expand All @@ -50,16 +50,18 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
) -> Result<AffectedRows> {
Self::verify_region_create_request(&request)?;

if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
self.create_logical_region(region_id, request).await
} else {
MissingRegionOptionSnafu {}.fail()
}
};

result.map(|_| 0)
}

/// Initialize a physical metric region at given region id.
Expand Down
Loading

0 comments on commit f74715c

Please sign in to comment.