Skip to content

Commit

Permalink
refactor: merge RegionHandleResult into RegionHandleResponse (#3721)
Browse files Browse the repository at this point in the history
* refactor: merge RegionHandleResult into RegionHandleResponse

Signed-off-by: tison <[email protected]>

* RegionResponse to api::region

Signed-off-by: tison <[email protected]>

* order

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Apr 17, 2024
1 parent 16aef70 commit 50ae4dc
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 103 deletions.
1 change: 1 addition & 0 deletions src/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod prom_store {
}
}

pub mod region;
pub mod v1;

pub use greptime_proto;
Expand Down
42 changes: 42 additions & 0 deletions src/api/src/region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_base::AffectedRows;
use greptime_proto::v1::region::RegionResponse as RegionResponseV1;

/// This result struct is derived from [RegionResponseV1]
#[derive(Debug)]
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
}

impl RegionResponse {
pub fn from_region_response(region_response: RegionResponseV1) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
}
}

/// Creates one response without extension
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
}
}
}
11 changes: 6 additions & 5 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
Expand All @@ -23,7 +24,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{Datanode, HandleResponse};
use common_meta::datanode_manager::Datanode;
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
Expand All @@ -46,7 +47,7 @@ pub struct RegionRequester {

#[async_trait]
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
async fn handle(&self, request: RegionRequest) -> MetaResult<RegionResponse> {
self.handle_inner(request).await.map_err(|err| {
if err.should_retry() {
meta_error::Error::RetryLater {
Expand Down Expand Up @@ -165,7 +166,7 @@ impl RegionRequester {
Ok(Box::pin(record_batch_stream))
}

async fn handle_inner(&self, request: RegionRequest) -> Result<HandleResponse> {
async fn handle_inner(&self, request: RegionRequest) -> Result<RegionResponse> {
let request_type = request
.body
.as_ref()
Expand Down Expand Up @@ -194,10 +195,10 @@ impl RegionRequester {

check_response_header(&response.header)?;

Ok(HandleResponse::from_region_response(response))
Ok(RegionResponse::from_region_response(response))
}

pub async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
pub async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handle_inner(request).await
}
}
Expand Down
30 changes: 3 additions & 27 deletions src/common/meta/src/datanode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

Expand All @@ -26,7 +26,7 @@ use crate::peer::Peer;
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<HandleResponse>;
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse>;

/// Handles query requests
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
Expand All @@ -42,27 +42,3 @@ pub trait DatanodeManager: Send + Sync {
}

pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;

/// This result struct is derived from [RegionResponse]
#[derive(Debug)]
pub struct HandleResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
}

impl HandleResponse {
pub fn from_region_response(region_response: RegionResponse) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
}
}

/// Creates one response without extension
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
}
}
}
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ impl State for DropDatabaseExecutor {
mod tests {
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;

use crate::datanode_manager::HandleResponse;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
Expand All @@ -144,8 +144,8 @@ mod tests {

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
Ok(HandleResponse::new(0))
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
Ok(RegionResponse::new(0))
}

async fn handle_query(
Expand Down Expand Up @@ -291,7 +291,7 @@ mod tests {

#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
Expand Down
18 changes: 9 additions & 9 deletions src/common/meta/src/ddl/test_util/datanode_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
Expand All @@ -20,14 +21,13 @@ use common_telemetry::debug;
use snafu::{ResultExt, Snafu};
use tokio::sync::mpsc;

use crate::datanode_manager::HandleResponse;
use crate::error::{self, Error, Result};
use crate::peer::Peer;
use crate::test_util::MockDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for () {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
unreachable!()
}

Expand All @@ -45,10 +45,10 @@ pub struct DatanodeWatcher(pub mpsc::Sender<(Peer, RegionRequest)>);

#[async_trait::async_trait]
impl MockDatanodeHandler for DatanodeWatcher {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
self.0.send((peer.clone(), request)).await.unwrap();
Ok(HandleResponse::new(0))
Ok(RegionResponse::new(0))
}

async fn handle_query(
Expand All @@ -65,7 +65,7 @@ pub struct RetryErrorDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
Err(Error::RetryLater {
source: BoxedError::new(
Expand All @@ -91,7 +91,7 @@ pub struct UnexpectedErrorDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
error::UnexpectedSnafu {
err_msg: "mock error",
Expand Down Expand Up @@ -135,7 +135,7 @@ impl ErrorExt for MockRequestOutdatedError {

#[async_trait::async_trait]
impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
}
Expand All @@ -154,9 +154,9 @@ pub struct NaiveDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(HandleResponse::new(0))
Ok(RegionResponse::new(0))
}

async fn handle_query(
Expand Down
9 changes: 4 additions & 5 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{
Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef, HandleResponse,
};
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
Expand All @@ -35,7 +34,7 @@ use crate::wal_options_allocator::WalOptionsAllocator;

#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse>;
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;

async fn handle_query(
&self,
Expand Down Expand Up @@ -65,7 +64,7 @@ struct MockDatanode<T> {

#[async_trait::async_trait]
impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handler.handle(&self.peer, request).await
}

Expand Down
16 changes: 8 additions & 8 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, Mutex, RwLock};

use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::region::RegionResponse;
use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_meta::datanode_manager::HandleResponse;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, OutputData};
Expand Down Expand Up @@ -129,7 +129,7 @@ impl RegionServer {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<HandleResponse> {
) -> Result<RegionResponse> {
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -218,7 +218,7 @@ impl RegionServer {

#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponse> {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
let is_parallel = matches!(
request,
region_request::Body::Inserts(_) | region_request::Body::Deletes(_)
Expand Down Expand Up @@ -276,7 +276,7 @@ impl RegionServerHandler for RegionServer {
extension.extend(result.extension);
}

Ok(RegionResponse {
Ok(RegionResponseV1 {
header: Some(ResponseHeader {
status: Some(Status {
status_code: StatusCode::Success as _,
Expand Down Expand Up @@ -465,7 +465,7 @@ impl RegionServerInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<HandleResponse> {
) -> Result<RegionResponse> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand All @@ -490,7 +490,7 @@ impl RegionServerInner {

let engine = match self.get_engine(region_id, &region_change)? {
CurrentEngine::Engine(engine) => engine,
CurrentEngine::EarlyReturn(rows) => return Ok(HandleResponse::new(rows)),
CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
};

// Sets corresponding region status to registering/deregistering before the operation.
Expand All @@ -505,7 +505,7 @@ impl RegionServerInner {
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change)
.await?;
Ok(HandleResponse {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extension: result.extension,
})
Expand Down
Loading

0 comments on commit 50ae4dc

Please sign in to comment.