Skip to content

Commit

Permalink
perf(operator): reuse table info from creating
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed May 15, 2024
1 parent cfae276 commit 3779f40
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 66 deletions.
88 changes: 50 additions & 38 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,12 @@ impl Inserter {
});
validate_column_count_match(&requests)?;

self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
let inserts = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}
Expand All @@ -143,17 +140,17 @@ impl Inserter {
.await?;

// check and create logical tables
self.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts =
RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx)
.convert(requests)
.await?;
let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager)
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}
Expand Down Expand Up @@ -359,16 +356,20 @@ impl Inserter {
Ok(inserts)
}

// check if tables already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
/// Creates or alter tables on demand:
/// - if table does not exist, create table by inferred CreateExpr
/// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<HashMap<String, TableId>> {
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut create_tables = vec![];
let mut alter_tables = vec![];
for req in &requests.inserts {
Expand All @@ -377,6 +378,9 @@ impl Inserter {
let table = self.get_table(catalog, schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());

// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
validate_request_with_table(req, &table)?;
let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?;
Expand All @@ -393,13 +397,19 @@ impl Inserter {
if let Some(on_physical_table) = on_physical_table {
if !create_tables.is_empty() {
// Creates logical tables in batch.
self.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;

for table in tables {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
Expand All @@ -409,7 +419,9 @@ impl Inserter {
}
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
Expand All @@ -418,7 +430,7 @@ impl Inserter {
}
}

Ok(())
Ok(table_name_to_ids)
}

async fn create_physical_table_on_demand(
Expand Down Expand Up @@ -527,7 +539,7 @@ impl Inserter {
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);

Expand All @@ -542,12 +554,12 @@ impl Inserter {
.await;

match res {
Ok(_) => {
Ok(table) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
Ok(table)
}
Err(err) => {
error!(
Expand All @@ -565,7 +577,7 @@ impl Inserter {
ctx: &QueryContextRef,
physical_table: &str,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<Vec<TableRef>> {
let create_table_exprs = create_tables
.iter()
.map(|req| {
Expand Down Expand Up @@ -593,9 +605,9 @@ impl Inserter {
.await;

match res {
Ok(_) => {
Ok(res) => {
info!("Successfully created logical tables");
Ok(())
Ok(res)
}
Err(err) => {
let failed_tables = create_table_exprs
Expand Down
42 changes: 14 additions & 28 deletions src/operator/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use snafu::OptionExt;
use table::metadata::TableId;

use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::error::{Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;

pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}

impl<'a> RowToRegion<'a> {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
table_name_to_ids,
partition_manager,
ctx,
}
}

pub async fn convert(&self, requests: RowInsertRequests) -> Result<RegionInsertRequests> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();

let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
Expand All @@ -60,19 +55,10 @@ impl<'a> RowToRegion<'a> {
})
}

async fn get_table(&self, table_name: &str) -> Result<TableRef> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.table_name_to_ids
.get(table_name)
.cloned()
.context(TableNotFoundSnafu { table_name })
}
}

0 comments on commit 3779f40

Please sign in to comment.