diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 978de69c1ba9..6c7c5d8fcf5d 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -162,9 +162,11 @@ impl CreateLogicalTablesProcedure { manager.create_logic_tables_metadata(tables_data).await?; } - info!("Created {num_tables} tables metadata for physical table {physical_table_id}"); + let table_ids = self.creator.data.real_table_ids(); - Ok(Status::done_with_output(self.creator.data.real_table_ids())) + info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}"); + + Ok(Status::done_with_output(table_ids)) } fn create_region_request_builder( diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index f4c455a122d8..a9a455f878ba 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -203,7 +203,7 @@ impl TryFrom for SubmitDdlTaskResponse { fn try_from(resp: PbSubmitDdlTaskResponse) -> Result { let table_id = resp.table_id.map(|t| t.id); - let table_ids = resp.table_ids.iter().map(|t| t.id).collect(); + let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect(); Ok(Self { key: resp.key, table_id, @@ -219,6 +219,11 @@ impl From for PbSubmitDdlTaskResponse { table_id: val .table_id .map(|table_id| api::v1::meta::TableId { id: table_id }), + table_ids: val + .table_ids + .into_iter() + .map(|id| api::v1::meta::TableId { id }) + .collect(), ..Default::default() } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index bffff45806c3..07523c348215 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -252,20 +252,36 @@ impl Inserter { on_physical_table: Option, statement_executor: &StatementExecutor, ) -> Result<()> { - // TODO(jeremy): create and alter in batch? (from `handle_metric_row_inserts`) + let mut create_tables = vec![]; for req in &requests.inserts { let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); let table = self.get_table(catalog, schema, &req.table_name).await?; match table { Some(table) => { + // TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`) validate_request_with_table(req, &table)?; self.alter_table_on_demand(req, table, ctx, statement_executor) .await? } None => { - self.create_table(req, ctx, &on_physical_table, statement_executor) - .await? + create_tables.push(req); + } + } + } + if !create_tables.is_empty() { + if let Some(on_physical_table) = on_physical_table { + // Creates logical tables in batch. + self.create_logical_tables( + create_tables, + ctx, + &on_physical_table, + statement_executor, + ) + .await?; + } else { + for req in create_tables { + self.create_table(req, ctx, statement_executor).await?; } } } @@ -403,7 +419,6 @@ impl Inserter { &self, req: &RowInsertRequest, ctx: &QueryContextRef, - on_physical_table: &Option, statement_executor: &StatementExecutor, ) -> Result<()> { let table_ref = @@ -412,15 +427,7 @@ impl Inserter { let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; - if let Some(physical_table) = on_physical_table { - create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); - create_table_expr.table_options.insert( - LOGICAL_TABLE_METADATA_KEY.to_string(), - physical_table.clone(), - ); - } - - info!("Table `{table_ref}` does not exist, try creating table",); + info!("Table `{table_ref}` does not exist, try creating table"); // TODO(weny): multiple regions table. let res = statement_executor @@ -444,6 +451,65 @@ impl Inserter { } } } + + async fn create_logical_tables( + &self, + create_tables: Vec<&RowInsertRequest>, + ctx: &QueryContextRef, + physical_table: &str, + statement_executor: &StatementExecutor, + ) -> Result<()> { + let create_table_exprs = create_tables + .iter() + .map(|req| { + let table_ref = TableReference::full( + ctx.current_catalog(), + ctx.current_schema(), + &req.table_name, + ); + + info!("Logical table `{table_ref}` does not exist, try creating table"); + + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?; + + create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); + create_table_expr.table_options.insert( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_table.to_string(), + ); + + Ok(create_table_expr) + }) + .collect::>>()?; + + let res = statement_executor + .create_logical_tables(&create_table_exprs) + .await; + + match res { + Ok(_) => { + info!("Successfully created logical tables"); + Ok(()) + } + Err(err) => { + let failed_tables = create_table_exprs + .into_iter() + .map(|expr| { + format!( + "{}.{}.{}", + expr.catalog_name, expr.schema_name, expr.table_name + ) + }) + .collect::>(); + error!( + "Failed to create logical tables {:?}: {}", + failed_tables, err + ); + Err(err) + } + } + } } fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {