feat: consolidate Insert request related partitioning and distributed processing operations into Inserter (#2346)

* refactor: RegionRequest as param of RegionRequestHandler.handle

Signed-off-by: Zhenchi <[email protected]>

* feat: partition insert & delete reqs for both standalone and distributed mode

Signed-off-by: Zhenchi <[email protected]>

* chore: nit change

Signed-off-by: Zhenchi <[email protected]>

* fix: wrong function name

Signed-off-by: Zhenchi <[email protected]>

* feat: do request in inserter & deleter

Signed-off-by: Zhenchi <[email protected]>

* feat: remove RegionRequestHandler.handle

Signed-off-by: Zhenchi <[email protected]>

* refactor: rename table_creator

Signed-off-by: Zhenchi <[email protected]>

* chore: nit change

Signed-off-by: Zhenchi <[email protected]>

* refactor: address comments

Signed-off-by: Zhenchi <[email protected]>

* chore: nit change

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
zhongzc authored and waynexia committed Sep 12, 2023
1 parent c6bb859 commit 0d9ead0
Showing 31 changed files with 890 additions and 1,264 deletions.
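
Before the per-file diffs, a minimal sketch of what the consolidation means for callers, assuming only the new constructor signature shown in src/frontend/src/delete.rs below; the bootstrap helper and the crate::delete module path are illustrative assumptions, not code from this commit. The Inserter presumably takes the same shape, though its diff is not among the files shown here.

```rust
use std::sync::Arc;

use catalog::CatalogManagerRef;
use common_meta::datanode_manager::DatanodeManagerRef;
use partition::manager::PartitionRuleManagerRef;

use crate::delete::{Deleter, DeleterRef};

/// Hypothetical bootstrap helper: after this commit the Deleter owns the
/// catalog, partition, and datanode handles itself instead of delegating
/// writes to a RegionRequestHandler.
fn build_deleter(
    catalog_manager: CatalogManagerRef,
    partition_manager: PartitionRuleManagerRef,
    datanode_manager: DatanodeManagerRef,
) -> DeleterRef {
    Arc::new(Deleter::new(
        catalog_manager,
        partition_manager,
        datanode_manager,
    ))
}
```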
10 changes: 1 addition & 9 deletions src/client/src/region_handler.rs
@@ -14,22 +14,14 @@

use std::sync::Arc;

use api::v1::region::{region_request, QueryRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;

use crate::error::Result;

#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<AffectedRows>;

// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
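
With handle removed, RegionRequestHandler is query-only; inserts and deletes now flow through Inserter and Deleter instead. A hedged sketch of a caller under that assumption (the stream_query helper is illustrative, not code from this commit):

```rust
use api::v1::region::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;
use crate::region_handler::RegionRequestHandler;

/// Illustrative helper: the trait now only serves the query path;
/// region writes no longer go through RegionRequestHandler at all.
async fn stream_query(
    handler: &dyn RegionRequestHandler,
    request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
    handler.do_get(request).await
}
```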
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl_manager.rs
@@ -49,7 +49,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
}

impl DdlManager {
@@ -58,14 +58,14 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_creator,
table_meta_allocator,
}
}

@@ -333,7 +333,7 @@ async fn handle_create_table_task(
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let (table_id, region_routes) = ddl_manager
.table_creator
.table_meta_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,
144 changes: 112 additions & 32 deletions src/frontend/src/delete.rs
@@ -12,37 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::{iter, mem};

use api::v1::region::region_request;
use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
use api::v1::{DeleteRequests, RowDeleteRequests};
use catalog::CatalogManagerRef;
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use futures_util::future;
use metrics::counter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::DeleteRequest as TableDeleteRequest;
use table::TableRef;

use crate::error::{
CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu,
Result, TableNotFoundSnafu,
CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::delete::{ColumnToRow, RowToRegion};
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};

pub(crate) struct Deleter<'a> {
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
pub struct Deleter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}

impl<'a> Deleter<'a> {
pub type DeleterRef = Arc<Deleter>;

impl Deleter {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
catalog_manager,
region_request_handler,
partition_manager,
datanode_manager,
}
}

@@ -67,31 +79,99 @@ impl<'a> Deleter<'a> {
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_row_count_match(&requests)?;
validate_column_count_match(&requests)?;

let requests = self.trim_columns(requests, &ctx).await?;
let region_request = RowToRegion::new(self.catalog_manager, &ctx)
.convert(requests)
.await?;
let region_request = region_request::Body::Deletes(region_request);
let deletes = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;

let affected_rows = self
.region_request_handler
.handle(region_request, ctx)
.await
.context(RequestDatanodeSnafu)?;
let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
Ok(Output::AffectedRows(affected_rows as _))
}

pub async fn handle_table_delete(
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
let table = self.get_table(catalog, schema, table).await?;
let table_info = table.table_info();

let deletes = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(deletes, ctx.trace_id(), 0).await
}
}

impl<'a> Deleter<'a> {
impl Deleter {
async fn do_request(
&self,
requests: RegionDeleteRequests,
trace_id: u64,
span_id: u64,
) -> Result<AffectedRows> {
let header = RegionRequestHeader { trace_id, span_id };
let request_factory = RegionRequestFactory::new(header);

let tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, deletes)| {
let request = request_factory.build_delete(deletes);
let datanode_manager = self.datanode_manager.clone();
common_runtime::spawn_write(async move {
datanode_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestDeletesSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows);
Ok(affected_rows)
}

async fn group_requests_by_peer(
&self,
requests: RegionDeleteRequests,
) -> Result<HashMap<Peer, RegionDeleteRequests>> {
let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();

for req in requests.requests {
let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.await
.context(FindRegionLeaderSnafu)?;
deletes.entry(peer).or_default().requests.push(req);
}

Ok(deletes)
}

async fn trim_columns(
&self,
mut requests: RowDeleteRequests,
ctx: &QueryContextRef,
) -> Result<RowDeleteRequests> {
for req in &mut requests.deletes {
let table = self.get_table(req, ctx).await?;
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;

let rows = req.rows.as_mut().unwrap();
@@ -142,25 +222,25 @@ impl<'a> Deleter<'a> {
Ok(key_column_names)
}

async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result<TableRef> {
async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
self.catalog_manager
.table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
.table(catalog, schema, table)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: req.table_name.clone(),
table_name: common_catalog::format_full_table_name(catalog, schema, table),
})
}
}

fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> {
fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
for request in &requests.deletes {
let rows = request.rows.as_ref().unwrap();
let column_count = rows.schema.len();
ensure!(
rows.rows.iter().all(|r| r.values.len() == column_count),
InvalidDeleteRequestSnafu {
reason: "row count mismatch"
reason: "column count mismatch"
}
)
}
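
The heart of the new write path is do_request plus group_requests_by_peer: bucket region requests by the peer that currently leads each region, send one batched request per datanode, and sum the affected rows that come back. A simplified, self-contained sketch of that grouping step, with stand-in types replacing Peer, RegionDeleteRequests, and PartitionRuleManager::find_region_leader:

```rust
use std::collections::HashMap;

/// Stand-in for common_meta::peer::Peer (the datanode leading a region).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Peer(String);

/// Stand-in for one region-level delete request.
#[derive(Debug)]
struct RegionDelete {
    region_id: u64,
}

/// Mirrors Deleter::group_requests_by_peer above: each request is routed to
/// the current leader of its region, so every datanode receives exactly one
/// batched request. `leader_of` stands in for find_region_leader.
fn group_by_peer(
    requests: Vec<RegionDelete>,
    leader_of: impl Fn(u64) -> Peer,
) -> HashMap<Peer, Vec<RegionDelete>> {
    let mut grouped: HashMap<Peer, Vec<RegionDelete>> = HashMap::new();
    for req in requests {
        let peer = leader_of(req.region_id);
        grouped.entry(peer).or_default().push(req);
    }
    grouped
}

fn main() {
    // Pretend even regions are led by datanode-0 and odd regions by datanode-1.
    let leader_of = |region_id: u64| Peer(format!("datanode-{}", region_id % 2));
    let requests = vec![
        RegionDelete { region_id: 0 },
        RegionDelete { region_id: 1 },
        RegionDelete { region_id: 2 },
    ];
    let grouped = group_by_peer(requests, leader_of);
    // The real do_request then spawns one write task per peer, awaits them
    // with try_join_all, and sums the returned affected-row counts.
    assert_eq!(grouped.len(), 2);
    assert_eq!(grouped[&Peer("datanode-0".to_string())].len(), 2);
}
```

Because the Deleter owns the partition and datanode handles directly, the same path serves both standalone and distributed mode, as the commit message notes.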
15 changes: 13 additions & 2 deletions src/frontend/src/error.rs
@@ -258,6 +258,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to find leader for region, source: {}", source))]
FindRegionLeader {
source: partition::error::Error,
location: Location,
},

#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
@@ -683,6 +689,9 @@ pub enum Error {
column,
))]
ColumnNoneDefaultValue { column: String, location: Location },

#[snafu(display("Invalid region request, reason: {}", reason))]
InvalidRegionRequest { reason: String },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -761,7 +770,8 @@ impl ErrorExt for Error {
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. }
| Error::MissingInsertBody { .. } => StatusCode::Internal,
| Error::MissingInsertBody { .. }
| Error::InvalidRegionRequest { .. } => StatusCode::Internal,

Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
@@ -808,7 +818,8 @@ impl ErrorExt for Error {
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. } => source.status_code(),
| Error::SplitDelete { source, .. }
| Error::FindRegionLeader { source, .. } => source.status_code(),

Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
