Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 18, 2024
1 parent 01beaa7 commit 295121d
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 31 deletions.
6 changes: 2 additions & 4 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ system ok
echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

sleep 5s

statement ok
create source s
WITH (
Expand All @@ -32,11 +30,11 @@ system ok
echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

sleep 5s

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test

sleep 2s

query ?
select * from s
----
Expand Down
19 changes: 10 additions & 9 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,16 @@ pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<C
preserved_columns.extend(columns);
}

pub fn is_column_ids_distinct(columns: &[ColumnCatalog]) -> bool {
let mut column_ids = columns
.iter()
.map(|column| column.column_id().get_id())
.collect_vec();
column_ids.sort();
let original_len = column_ids.len();
column_ids.dedup();
column_ids.len() == original_len
pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
debug_assert!(
columns
.iter()
.map(|c| c.column_id())
.duplicates()
.next()
.is_none(),
"duplicate ColumnId found in source catalog. Columns: {columns:#?}"
);
}

#[cfg(test)]
Expand Down
16 changes: 2 additions & 14 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use either::Either;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
Expand Down Expand Up @@ -221,19 +221,7 @@ impl Binder {
source_catalog: &SourceCatalog,
as_of: Option<AsOf>,
) -> (Relation, Vec<(bool, Field)>) {
if cfg!(debug_assertions) {
let columns = &source_catalog.columns;
assert!(
columns
.iter()
.map(|c| c.column_id())
.duplicates()
.next()
.is_none(),
"duplicate ColumnId found in source catalog. Columns: {:?}",
columns
);
}
debug_assert_column_ids_distinct(&source_catalog.columns);
self.included_relations.insert(source_catalog.id.into());
(
Relation::Source(Box::new(BoundSource {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl TableType {
}
}

/// The version of a table, used by schema change. See [`PbTableVersion`].
/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
pub version_id: TableVersionId,
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul
Ok(stmt.to_string())
}

/// FIXME: perhapts we should use sth like `ColumnIdGenerator::new_alter`,
/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`.
/// (But for now this isn't a large problem, since drop column is not allowed for source yet..)
///
/// Besides, the logic of column id handling is a mess.
/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end;
/// In other places, we create column id ad-hoc.
pub fn max_column_id(columns: &Vec<ColumnCatalog>) -> ColumnId {
// XXX: should we check the column IDs of struct fields here?
columns
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub async fn refresh_sr_and_get_columns_diff(
};

let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
// The newly resolved columns' column IDs also starts from 0. They cannot be used directly.
// The newly resolved columns' column IDs also starts from 1. They cannot be used directly.
let mut next_col_id = max_column_id(&original_source.columns).next();
for col in &mut added_columns {
col.column_desc.column_id = next_col_id;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
is_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -1417,7 +1417,7 @@ pub async fn bind_create_source(
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
debug_assert!(is_column_ids_distinct(&columns));
debug_assert_column_ids_distinct(&columns);

let must_need_pk = if is_create_source {
with_properties.connector_need_pk()
Expand Down

0 comments on commit 295121d

Please sign in to comment.