Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: RegionEngine::handle_request always returns affected rows #2874

Merged
merged 11 commits into from
Dec 6, 2023
14 changes: 4 additions & 10 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl RegionServer {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer {
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
unreachable!()
}
}
affected_rows += result;
}

Ok(RegionResponse {
Expand Down Expand Up @@ -294,7 +288,7 @@ impl RegionServerInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand Down
7 changes: 3 additions & 4 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -109,10 +109,9 @@ impl RegionEngine for MockRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let _ = self.sender.send((region_id, request)).await;

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_query(
Expand Down
32 changes: 16 additions & 16 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::FILE_ENGINE;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
Expand Down Expand Up @@ -149,7 +149,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
match request {
RegionRequest::Create(req) => self.handle_create(region_id, req).await,
RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
Expand Down Expand Up @@ -187,7 +187,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
ensure!(
request.engine == FILE_ENGINE,
UnexpectedEngineSnafu {
Expand All @@ -196,15 +196,15 @@ impl EngineInner {
);

if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to create region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::create(region_id, request, &self.object_store).await;
Expand All @@ -217,24 +217,24 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("A new region is created, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_open(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to open region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::open(region_id, request, &self.object_store).await;
Expand All @@ -247,29 +247,29 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("Region opened, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_close(
&self,
region_id: RegionId,
_request: RegionCloseRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
let _lock = self.region_mutex.lock().await;

let mut regions = self.regions.write().unwrap();
if regions.remove(&region_id).is_some() {
info!("Region closed, region_id: {}", region_id);
}

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_drop(
&self,
region_id: RegionId,
_request: RegionDropRequest,
) -> EngineResult<Output> {
) -> EngineResult<AffectedRows> {
if !self.exists(region_id).await {
return RegionNotFoundSnafu { region_id }.fail();
}
Expand All @@ -291,7 +291,7 @@ impl EngineInner {
let _ = self.regions.write().unwrap().remove(&region_id);

info!("Region dropped, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
Expand Down
5 changes: 2 additions & 3 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

use api::v1::SemanticType;
use common_query::Output;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -137,7 +136,7 @@ impl DataRegion {
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
) -> Result<AffectedRows> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
Expand Down
20 changes: 5 additions & 15 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -108,30 +108,20 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}

/// Handles request to the region.
///
/// Only query is not included, which is handled in `handle_query`
/// Handles non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<AffectedRows, BoxedError> {
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
.create_region(region_id, create)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Alter(alter) => self
.inner
.alter_region(region_id, alter)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
Expand Down
11 changes: 7 additions & 4 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_telemetry::{error, info};
use snafu::OptionExt;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
Expand All @@ -28,18 +28,21 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
) -> Result<AffectedRows> {
let is_altering_logical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);
if is_altering_logical_region {

let result = if is_altering_logical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
}
};

result.map(|_| 0)
}

async fn alter_logical_region(
Expand Down
10 changes: 6 additions & 4 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use object_store::util::join_dir;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;

Expand All @@ -50,16 +50,18 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
) -> Result<AffectedRows> {
Self::verify_region_create_request(&request)?;

if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
self.create_logical_region(region_id, request).await
} else {
MissingRegionOptionSnafu {}.fail()
}
};

result.map(|_| 0)
}

/// Initialize a physical metric region at given region id.
Expand Down
Loading
Loading