Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(operator): reuse table info from table creation #3945

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 50 additions & 38 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,12 @@ impl Inserter {
});
validate_column_count_match(&requests)?;

self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
let inserts = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}
Expand All @@ -143,17 +140,17 @@ impl Inserter {
.await?;

// check and create logical tables
self.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts =
RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx)
.convert(requests)
.await?;
let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager)
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}
Expand Down Expand Up @@ -359,16 +356,20 @@ impl Inserter {
Ok(inserts)
}

// check if tables already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
/// Creates or alters tables on demand:
/// - if a table does not exist, create it from the inferred `CreateExpr`
/// - if a table exists, check whether its schema matches; if any new column is found, alter the table with the inferred `AlterExpr`
///
/// Returns a mapping from table name to table id for the tables involved in the requests.
/// This mapping is used in the conversion of `RowToRegion`.
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<HashMap<String, TableId>> {
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut create_tables = vec![];
let mut alter_tables = vec![];
for req in &requests.inserts {
Expand All @@ -377,6 +378,9 @@ impl Inserter {
let table = self.get_table(catalog, schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());

// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
validate_request_with_table(req, &table)?;
let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?;
Expand All @@ -393,13 +397,19 @@ impl Inserter {
if let Some(on_physical_table) = on_physical_table {
if !create_tables.is_empty() {
// Creates logical tables in batch.
self.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;

for table in tables {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
Expand All @@ -409,7 +419,9 @@ impl Inserter {
}
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
Expand All @@ -418,7 +430,7 @@ impl Inserter {
}
}

Ok(())
Ok(table_name_to_ids)
}

async fn create_physical_table_on_demand(
Expand Down Expand Up @@ -527,7 +539,7 @@ impl Inserter {
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);

Expand All @@ -542,12 +554,12 @@ impl Inserter {
.await;

match res {
Ok(_) => {
Ok(table) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
Ok(table)
}
Err(err) => {
error!(
Expand All @@ -565,7 +577,7 @@ impl Inserter {
ctx: &QueryContextRef,
physical_table: &str,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<Vec<TableRef>> {
let create_table_exprs = create_tables
.iter()
.map(|req| {
Expand Down Expand Up @@ -593,9 +605,9 @@ impl Inserter {
.await;

match res {
Ok(_) => {
Ok(res) => {
info!("Successfully created logical tables");
Ok(())
Ok(res)
}
Err(err) => {
let failed_tables = create_table_exprs
Expand Down
42 changes: 14 additions & 28 deletions src/operator/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use snafu::OptionExt;
use table::metadata::TableId;

use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::error::{Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;

pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}

impl<'a> RowToRegion<'a> {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
table_name_to_ids,
partition_manager,
ctx,
}
}

pub async fn convert(&self, requests: RowInsertRequests) -> Result<RegionInsertRequests> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();

let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
Expand All @@ -60,19 +55,10 @@ impl<'a> RowToRegion<'a> {
})
}

async fn get_table(&self, table_name: &str) -> Result<TableRef> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})
/// Looks up the id recorded for `table_name` in the name-to-id mapping held by this converter.
///
/// Returns a `TableNotFound` error carrying `table_name` when the mapping has no entry for it.
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
    // Copy the id out of the map first so the borrow of `self` ends before the error is built.
    let maybe_table_id = self.table_name_to_ids.get(table_name).cloned();
    maybe_table_id.context(TableNotFoundSnafu { table_name })
}
}