Skip to content

Commit

Permalink
fix(frontend): dedup column ids (close #7905) (#7972)
Browse files Browse the repository at this point in the history
dedup column ids (close #7905)

Approved-By: tabVersion
  • Loading branch information
soundOfDestiny authored Feb 17, 2023
1 parent d30a96e commit 2ba6a65
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
38 changes: 37 additions & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::{
};

use super::row_id_column_desc;
use crate::catalog::Field;
use crate::catalog::{Field, ROW_ID_COLUMN_ID};
use crate::error::ErrorCode;
use crate::types::DataType;

Expand Down Expand Up @@ -51,6 +51,12 @@ impl ColumnId {
pub const fn next(self) -> Self {
Self(self.0 + 1)
}

pub fn apply_delta_if_not_row_id(&mut self, delta: i32) {
if self.0 != ROW_ID_COLUMN_ID.get_id() {
self.0 += delta;
}
}
}

impl From<i32> for ColumnId {
Expand Down Expand Up @@ -291,6 +297,36 @@ impl ColumnCatalog {
}
}

pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
columns.iter().for_each(|column| {
let column_id = column.column_id().get_id();
if column_id > max_incoming_column_id {
max_incoming_column_id = column_id;
}
});
preserved_columns.iter_mut().for_each(|column| {
column
.column_desc
.column_id
.apply_delta_if_not_row_id(max_incoming_column_id)
});

preserved_columns.extend(columns);
}

pub fn is_column_ids_dedup(columns: &[ColumnCatalog]) -> bool {
let mut column_ids = columns
.iter()
.map(|column| column.column_id().get_id())
.collect_vec();
column_ids.sort();
let original_len = column_ids.len();
column_ids.dedup();
column_ids.len() == original_len
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
Expand Down
16 changes: 12 additions & 4 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::collections::HashMap;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ROW_ID_COLUMN_ID};
use risingwave_common::catalog::{
columns_extend, is_column_ids_dedup, ColumnCatalog, ColumnDesc, ROW_ID_COLUMN_ID,
};
use risingwave_common::error::ErrorCode::{self, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -159,7 +161,8 @@ pub(crate) async fn resolve_source_schema(
)));
}

columns.extend(
columns_extend(
columns,
extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?,
);

Expand Down Expand Up @@ -188,7 +191,10 @@ pub(crate) async fn resolve_source_schema(
)));
}

columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?);
columns_extend(
columns,
extract_avro_table_schema(avro_schema, with_properties.clone()).await?,
);

StreamSourceInfo {
row_format: RowFormatType::Avro as i32,
Expand Down Expand Up @@ -294,7 +300,7 @@ pub(crate) async fn resolve_source_schema(
let _ = full_columns.remove(index);
}

columns.extend(full_columns);
columns_extend(columns, full_columns);
StreamSourceInfo {
row_format: RowFormatType::DebeziumAvro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
Expand Down Expand Up @@ -394,6 +400,8 @@ pub async fn handle_create_source(
)
.await?;

debug_assert!(is_column_ids_dedup(&columns));

let watermark_descs =
bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?;
// TODO(yuhao): allow multiple watermark on source.
Expand Down

0 comments on commit 2ba6a65

Please sign in to comment.