Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: consolidate Insert request related partitioning and distributed processing operations into Inserter #2346

10 changes: 1 addition & 9 deletions src/client/src/region_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,14 @@

use std::sync::Arc;

use api::v1::region::{region_request, QueryRequest};
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use common_meta::datanode_manager::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;

use crate::error::Result;

#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
async fn handle(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<AffectedRows>;

// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
Expand Down
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
}

impl DdlManager {
Expand All @@ -58,14 +58,14 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_creator,
table_meta_allocator,
}
}

Expand Down Expand Up @@ -333,7 +333,7 @@ async fn handle_create_table_task(
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let (table_id, region_routes) = ddl_manager
.table_creator
.table_meta_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,
Expand Down
144 changes: 112 additions & 32 deletions src/frontend/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::{iter, mem};

use api::v1::region::region_request;
use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
use api::v1::{DeleteRequests, RowDeleteRequests};
use catalog::CatalogManagerRef;
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use futures_util::future;
use metrics::counter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::DeleteRequest as TableDeleteRequest;
use table::TableRef;

use crate::error::{
CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu,
Result, TableNotFoundSnafu,
CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::delete::{ColumnToRow, RowToRegion};
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};

pub(crate) struct Deleter<'a> {
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
pub struct Deleter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}

impl<'a> Deleter<'a> {
pub type DeleterRef = Arc<Deleter>;

impl Deleter {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
region_request_handler: &'a dyn RegionRequestHandler,
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
catalog_manager,
region_request_handler,
partition_manager,
datanode_manager,
}
}

Expand All @@ -67,31 +79,99 @@ impl<'a> Deleter<'a> {
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_row_count_match(&requests)?;
validate_column_count_match(&requests)?;

let requests = self.trim_columns(requests, &ctx).await?;
let region_request = RowToRegion::new(self.catalog_manager, &ctx)
.convert(requests)
.await?;
let region_request = region_request::Body::Deletes(region_request);
let deletes = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;

let affected_rows = self
.region_request_handler
.handle(region_request, ctx)
.await
.context(RequestDatanodeSnafu)?;
let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
Ok(Output::AffectedRows(affected_rows as _))
}

pub async fn handle_table_delete(
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
let table = self.get_table(catalog, schema, table).await?;
let table_info = table.table_info();

let deletes = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(deletes, ctx.trace_id(), 0).await
}
}

impl<'a> Deleter<'a> {
impl Deleter {
async fn do_request(
&self,
requests: RegionDeleteRequests,
trace_id: u64,
span_id: u64,
) -> Result<AffectedRows> {
let header = RegionRequestHeader { trace_id, span_id };
let request_factory = RegionRequestFactory::new(header);

let tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, deletes)| {
let request = request_factory.build_delete(deletes);
let datanode_manager = self.datanode_manager.clone();
common_runtime::spawn_write(async move {
datanode_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestDeletesSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

let affected_rows = results.into_iter().sum::<Result<u64>>()?;
counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows);
Ok(affected_rows)
}

async fn group_requests_by_peer(
&self,
requests: RegionDeleteRequests,
) -> Result<HashMap<Peer, RegionDeleteRequests>> {
let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();

for req in requests.requests {
let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.await
.context(FindRegionLeaderSnafu)?;
deletes.entry(peer).or_default().requests.push(req);
}

Ok(deletes)
}

async fn trim_columns(
&self,
mut requests: RowDeleteRequests,
ctx: &QueryContextRef,
) -> Result<RowDeleteRequests> {
for req in &mut requests.deletes {
let table = self.get_table(req, ctx).await?;
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;

let rows = req.rows.as_mut().unwrap();
Expand Down Expand Up @@ -142,25 +222,25 @@ impl<'a> Deleter<'a> {
Ok(key_column_names)
}

async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result<TableRef> {
async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
self.catalog_manager
.table(ctx.current_catalog(), ctx.current_schema(), &req.table_name)
.table(catalog, schema, table)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: req.table_name.clone(),
table_name: common_catalog::format_full_table_name(catalog, schema, table),
})
}
}

fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> {
fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
for request in &requests.deletes {
let rows = request.rows.as_ref().unwrap();
let column_count = rows.schema.len();
ensure!(
rows.rows.iter().all(|r| r.values.len() == column_count),
InvalidDeleteRequestSnafu {
reason: "row count mismatch"
reason: "column count mismatch"
}
)
}
Expand Down
15 changes: 13 additions & 2 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to find leader for region, source: {}", source))]
FindRegionLeader {
source: partition::error::Error,
location: Location,
},

#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
Expand Down Expand Up @@ -683,6 +689,9 @@ pub enum Error {
column,
))]
ColumnNoneDefaultValue { column: String, location: Location },

#[snafu(display("Invalid region request, reason: {}", reason))]
InvalidRegionRequest { reason: String },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -761,7 +770,8 @@ impl ErrorExt for Error {
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. }
| Error::MissingInsertBody { .. } => StatusCode::Internal,
| Error::MissingInsertBody { .. }
| Error::InvalidRegionRequest { .. } => StatusCode::Internal,

Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
Expand Down Expand Up @@ -808,7 +818,8 @@ impl ErrorExt for Error {
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. } => source.status_code(),
| Error::SplitDelete { source, .. }
| Error::FindRegionLeader { source, .. } => source.status_code(),

Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,

Expand Down
Loading