Skip to content

Commit

Permalink
feat: remove RegionRequestHandler.handle
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored and WenyXu committed Sep 12, 2023
1 parent 95cd86e commit 094d463
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 44 deletions.
5 changes: 1 addition & 4 deletions src/client/src/region_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;

#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;

// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
Expand Down
21 changes: 2 additions & 19 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::catalog::FrontendCatalogManager;
use crate::error::{
FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, RequestQuerySnafu, Result,
};
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result};

pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
Expand All @@ -41,13 +38,6 @@ impl DistRegionRequestHandler {

#[async_trait]
impl RegionRequestHandler for DistRegionRequestHandler {
async fn handle(&self, request: RegionRequest) -> ClientResult<AffectedRows> {
self.handle_inner(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}

async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.do_get_inner(request)
.await
Expand All @@ -57,13 +47,6 @@ impl RegionRequestHandler for DistRegionRequestHandler {
}

impl DistRegionRequestHandler {
async fn handle_inner(&self, _request: RegionRequest) -> Result<AffectedRows> {
NotSupportedSnafu {
feat: "region request",
}
.fail()
}

async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let region_id = RegionId::from_u64(request.region_id);

Expand Down
11 changes: 0 additions & 11 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub(crate) struct StandaloneRegionRequestHandler {
}

impl StandaloneRegionRequestHandler {
#[allow(dead_code)]
pub fn arc(region_server: RegionServer) -> Arc<Self> {
Arc::new(Self { region_server })
}
Expand All @@ -63,16 +62,6 @@ impl StandaloneRegionRequestHandler {

#[async_trait]
impl RegionRequestHandler for StandaloneRegionRequestHandler {
async fn handle(&self, request: RegionRequest) -> ClientResult<AffectedRows> {
let response = self
.handle_inner(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)?;
check_response_header(response.header)?;
Ok(response.affected_rows)
}

async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.region_server
.handle_read(request)
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/req_convert/common/partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::region::{DeleteRequest, DeleteRequests, InsertRequest, InsertRequests};
use api::v1::region::{DeleteRequest, InsertRequest};
use api::v1::Rows;
use partition::manager::PartitionRuleManager;
use snafu::ResultExt;
Expand All @@ -33,7 +33,7 @@ impl<'a> Partitioner<'a> {
&self,
table_id: TableId,
rows: Rows,
) -> Result<InsertRequests> {
) -> Result<Vec<InsertRequest>> {
let requests = self
.partition_manager
.split_rows(table_id, rows)
Expand All @@ -45,14 +45,14 @@ impl<'a> Partitioner<'a> {
rows: Some(rows),
})
.collect();
Ok(InsertRequests { requests })
Ok(requests)
}

pub async fn partition_delete_requests(
&self,
table_id: TableId,
rows: Rows,
) -> Result<DeleteRequests> {
) -> Result<Vec<DeleteRequest>> {
let requests = self
.partition_manager
.split_rows(table_id, rows)
Expand All @@ -64,6 +64,6 @@ impl<'a> Partitioner<'a> {
rows: Some(rows),
})
.collect();
Ok(DeleteRequests { requests })
Ok(requests)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/delete/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> RowToRegion<'a> {
.partition_delete_requests(table_id, request.rows.unwrap_or_default())
.await?;

region_request.extend(requests.requests);
region_request.extend(requests);
}

Ok(RegionDeleteRequests {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/delete/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<'a> TableToRegion<'a> {
let requests = Partitioner::new(self.partition_manager)
.partition_delete_requests(self.table_info.table_id(), rows)
.await?;
Ok(requests)
Ok(RegionDeleteRequests { requests })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> RowToRegion<'a> {
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;

region_request.extend(requests.requests);
region_request.extend(requests);
}

Ok(RegionInsertRequests {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<'a> StatementToRegion<'a> {
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_info.table_id(), Rows { schema, rows })
.await?;
Ok(requests)
Ok(RegionInsertRequests { requests })
}

async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<'a> TableToRegion<'a> {
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(self.table_info.table_id(), rows)
.await?;
Ok(requests)
Ok(RegionInsertRequests { requests })
}
}

Expand Down

0 comments on commit 094d463

Please sign in to comment.