diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 2731cb0b4876c..98fdd758090fc 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::{ }; use super::row_id_column_desc; -use crate::catalog::Field; +use crate::catalog::{Field, ROW_ID_COLUMN_ID}; use crate::error::ErrorCode; use crate::types::DataType; @@ -51,6 +51,12 @@ impl ColumnId { pub const fn next(self) -> Self { Self(self.0 + 1) } + + pub fn apply_delta_if_not_row_id(&mut self, delta: i32) { + if self.0 != ROW_ID_COLUMN_ID.get_id() { + self.0 += delta; + } + } } impl From for ColumnId { @@ -291,6 +297,36 @@ impl ColumnCatalog { } } +pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec) { + debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0); + let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id(); + columns.iter().for_each(|column| { + let column_id = column.column_id().get_id(); + if column_id > max_incoming_column_id { + max_incoming_column_id = column_id; + } + }); + preserved_columns.iter_mut().for_each(|column| { + column + .column_desc + .column_id + .apply_delta_if_not_row_id(max_incoming_column_id) + }); + + preserved_columns.extend(columns); +} + +pub fn is_column_ids_dedup(columns: &[ColumnCatalog]) -> bool { + let mut column_ids = columns + .iter() + .map(|column| column.column_id().get_id()) + .collect_vec(); + column_ids.sort(); + let original_len = column_ids.len(); + column_ids.dedup(); + column_ids.len() == original_len +} + #[cfg(test)] pub mod tests { use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a3f33d168e1c8..d1dbca29fbc2a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -16,7 +16,9 @@ use std::collections::HashMap; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ROW_ID_COLUMN_ID}; +use risingwave_common::catalog::{ + columns_extend, is_column_ids_dedup, ColumnCatalog, ColumnDesc, ROW_ID_COLUMN_ID, +}; use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::DataType; @@ -159,7 +161,8 @@ pub(crate) async fn resolve_source_schema( ))); } - columns.extend( + columns_extend( + columns, extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?, ); @@ -188,7 +191,10 @@ pub(crate) async fn resolve_source_schema( ))); } - columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?); + columns_extend( + columns, + extract_avro_table_schema(avro_schema, with_properties.clone()).await?, + ); StreamSourceInfo { row_format: RowFormatType::Avro as i32, @@ -294,7 +300,7 @@ pub(crate) async fn resolve_source_schema( let _ = full_columns.remove(index); } - columns.extend(full_columns); + columns_extend(columns, full_columns); StreamSourceInfo { row_format: RowFormatType::DebeziumAvro as i32, row_schema_location: avro_schema.row_schema_location.0.clone(), @@ -394,6 +400,8 @@ pub async fn handle_create_source( ) .await?; + debug_assert!(is_column_ids_dedup(&columns)); + let watermark_descs = bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?; // TODO(yuhao): allow multiple watermark on source.