Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: RegionEngine::handle_request always returns affected rows #2874

Merged
merged 11 commits into from
Dec 6, 2023
2 changes: 1 addition & 1 deletion src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl CountdownTask {
let request = RegionRequest::Close(RegionCloseRequest {});
match self
.region_server
.handle_request(self.region_id, request)
.handle_execution(self.region_id, request)
.await
{
Ok(_) => return true,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl DatanodeBuilder {
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
.handle_execution(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
Expand Down
12 changes: 6 additions & 6 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl RegionHeartbeatResponseHandler {
region_dir: region_dir(&region_storage_path, region_id),
options,
});
let result = region_server.handle_request(region_id, request).await;
let result = region_server.handle_execution(region_id, request).await;

let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.to_string()).err();
Expand All @@ -77,7 +77,7 @@ impl RegionHeartbeatResponseHandler {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = region_server.handle_request(region_id, request).await;
let result = region_server.handle_execution(region_id, request).await;

match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
Expand Down Expand Up @@ -257,7 +257,7 @@ mod tests {
let builder = CreateRequestBuilder::new();
let create_req = builder.build();
region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

Expand Down Expand Up @@ -306,12 +306,12 @@ mod tests {
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

region_server
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

Expand Down
24 changes: 9 additions & 15 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ impl RegionServer {
self.inner.register_engine(engine);
}

pub async fn handle_request(
pub async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await
) -> Result<usize> {
self.inner.handle_execution(region_id, request).await
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -194,7 +194,7 @@ impl RegionServerHandler for RegionServer {
));
async move {
self_to_move
.handle_request(region_id, req)
.handle_execution(region_id, req)
.trace(span)
.await
}
Expand All @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer {
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
unreachable!()
}
}
affected_rows += result;
}

Ok(RegionResponse {
Expand Down Expand Up @@ -290,11 +284,11 @@ impl RegionServerInner {
.insert(engine_name.to_string(), engine);
}

pub async fn handle_request(
pub async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<usize> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand Down Expand Up @@ -329,7 +323,7 @@ impl RegionServerInner {
let engine_type = engine.name();

let result = engine
.handle_request(region_id, request)
.handle_execution(region_id, request)
.trace(info_span!(
"RegionEngine::handle_region_request",
engine_type
Expand Down Expand Up @@ -409,7 +403,7 @@ impl RegionServerInner {
let region_id = *region.key();
let engine = region.value();
let closed = engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await;
match closed {
Ok(_) => info!("Region {region_id} is closed"),
Expand Down
7 changes: 3 additions & 4 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ impl RegionEngine for MockRegionEngine {
"mock"
}

async fn handle_request(
async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<usize, BoxedError> {
let _ = self.sender.send((region_id, request)).await;

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_query(
Expand Down
31 changes: 15 additions & 16 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::FILE_ENGINE;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use object_store::ObjectStore;
Expand Down Expand Up @@ -55,11 +54,11 @@ impl RegionEngine for FileRegionEngine {
FILE_ENGINE
}

async fn handle_request(
async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<usize, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
Expand Down Expand Up @@ -149,7 +148,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> EngineResult<Output> {
) -> EngineResult<usize> {
match request {
RegionRequest::Create(req) => self.handle_create(region_id, req).await,
RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
Expand Down Expand Up @@ -187,7 +186,7 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> EngineResult<Output> {
) -> EngineResult<usize> {
ensure!(
request.engine == FILE_ENGINE,
UnexpectedEngineSnafu {
Expand All @@ -196,15 +195,15 @@ impl EngineInner {
);

if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to create region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::create(region_id, request, &self.object_store).await;
Expand All @@ -217,24 +216,24 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("A new region is created, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_open(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> EngineResult<Output> {
) -> EngineResult<usize> {
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

info!("Try to open region, region_id: {}", region_id);

let _lock = self.region_mutex.lock().await;
// Check again after acquiring the lock
if self.exists(region_id).await {
return Ok(Output::AffectedRows(0));
return Ok(0);
}

let res = FileRegion::open(region_id, request, &self.object_store).await;
Expand All @@ -247,29 +246,29 @@ impl EngineInner {
self.regions.write().unwrap().insert(region_id, region);

info!("Region opened, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_close(
&self,
region_id: RegionId,
_request: RegionCloseRequest,
) -> EngineResult<Output> {
) -> EngineResult<usize> {
let _lock = self.region_mutex.lock().await;

let mut regions = self.regions.write().unwrap();
if regions.remove(&region_id).is_some() {
info!("Region closed, region_id: {}", region_id);
}

Ok(Output::AffectedRows(0))
Ok(0)
}

async fn handle_drop(
&self,
region_id: RegionId,
_request: RegionDropRequest,
) -> EngineResult<Output> {
) -> EngineResult<usize> {
if !self.exists(region_id).await {
return RegionNotFoundSnafu { region_id }.fail();
}
Expand All @@ -291,7 +290,7 @@ impl EngineInner {
let _ = self.regions.write().unwrap().remove(&region_id);

info!("Region dropped, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Ok(0)
}

async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
Expand Down
7 changes: 3 additions & 4 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use api::v1::SemanticType;
use common_query::Output;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
Expand Down Expand Up @@ -125,7 +124,7 @@ impl DataRegion {
{
let _timer = MITO_DDL_DURATION.start_timer();
self.mito
.handle_request(region_id, alter_request)
.handle_execution(region_id, alter_request)
.await
.context(MitoWriteOperationSnafu)?;
}
Expand All @@ -137,10 +136,10 @@ impl DataRegion {
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
) -> Result<usize> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
.handle_execution(region_id, RegionRequest::Put(request))
.await
.context(MitoWriteOperationSnafu)
}
Expand Down
24 changes: 9 additions & 15 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,30 +108,24 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}

/// Handles request to the region.
///
/// Only query is not included, which is handled in `handle_query`
async fn handle_request(
/// Handles non-query execution to the region. Returns the count of affected rows.
async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output, BoxedError> {
) -> Result<usize, BoxedError> {
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
.create_region(region_id, create)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Create(create) => {
self.inner.create_region(region_id, create).await.map(|_| 0)
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
}
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Alter(alter) => self
.inner
.alter_region(region_id, alter)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Alter(alter) => {
self.inner.alter_region(region_id, alter).await.map(|_| 0)
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
}
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl MetricEngineInner {
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
self.mito
.handle_request(
.handle_execution(
metadata_region_id,
RegionRequest::Create(create_metadata_region_request),
)
Expand All @@ -91,7 +91,7 @@ impl MetricEngineInner {
.map(|metadata| metadata.column_schema.name.clone())
.collect::<HashSet<_>>();
self.mito
.handle_request(
.handle_execution(
data_region_id,
RegionRequest::Create(create_data_region_request),
)
Expand Down
Loading
Loading