Skip to content

Commit

Permalink
refactor(common): use ColumnDesc::named when possible (#13574)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Nov 22, 2023
1 parent dd3b5c6 commit 710a01e
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 145 deletions.
4 changes: 2 additions & 2 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ impl ColumnDesc {
}
}

pub fn named(name: String, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
ColumnDesc {
data_type,
column_id,
name,
name: name.into(),
field_descs: vec![],
type_name: String::new(),
generated_or_default_column: None,
Expand Down
49 changes: 11 additions & 38 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ pub fn is_system_schema(schema_name: &str) -> bool {

pub const ROWID_PREFIX: &str = "_row_id";

pub fn row_id_column_name() -> String {
ROWID_PREFIX.to_string()
}

pub fn is_row_id_column_name(name: &str) -> bool {
name.starts_with(ROWID_PREFIX)
}
Expand All @@ -105,57 +101,34 @@ pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Creates a row ID column (for implicit primary key). It'll always have the ID `0` for now.
pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Serial,
column_id: ROW_ID_COLUMN_ID,
name: row_id_column_name(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
}
ColumnDesc::named(ROWID_PREFIX, ROW_ID_COLUMN_ID, DataType::Serial)
}

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";

pub fn offset_column_name() -> String {
OFFSET_COLUMN_NAME.to_string()
}

pub const CDC_SOURCE_COLUMN_NUM: u32 = 4;
pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
pub fn cdc_table_name_column_name() -> String {
TABLE_NAME_COLUMN_NAME.to_string()
}

pub fn is_offset_column_name(name: &str) -> bool {
name.starts_with(OFFSET_COLUMN_NAME)
}
/// Creates an offset column for storing the upstream offset.
/// Currently used only in CDC sources.
pub fn offset_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Varchar,
column_id: ColumnId::placeholder(),
name: offset_column_name(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
}
ColumnDesc::named(
OFFSET_COLUMN_NAME,
ColumnId::placeholder(),
DataType::Varchar,
)
}

/// A column to store the upstream table name of the cdc table
pub fn cdc_table_name_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Varchar,
column_id: ColumnId::placeholder(),
name: cdc_table_name_column_name(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
}
ColumnDesc::named(
TABLE_NAME_COLUMN_NAME,
ColumnId::placeholder(),
DataType::Varchar,
)
}

/// The local system catalog reader in the frontend node.
Expand Down
10 changes: 1 addition & 9 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,7 @@ async fn test_table_materialize() -> StreamResult<()> {
let column_descs = all_column_ids
.iter()
.zip_eq_fast(all_schema.fields.iter().cloned())
.map(|(column_id, field)| ColumnDesc {
data_type: field.data_type,
column_id: *column_id,
name: field.name,
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
})
.map(|(column_id, field)| ColumnDesc::named(field.name, *column_id, field.data_type))
.collect_vec();
let (barrier_tx, barrier_rx) = unbounded_channel();
let vnodes = Bitmap::from_bytes(&[0b11111111]);
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ mod tests {
sink_id: Default::default(),
properties: Default::default(),
columns: vec![
ColumnDesc::named("v1".into(), ColumnId::new(1), DataType::Int32),
ColumnDesc::named("v2".into(), ColumnId::new(2), DataType::Decimal),
ColumnDesc::named("v3".into(), ColumnId::new(3), DataType::Varchar),
ColumnDesc::named("v4".into(), ColumnId::new(4), DataType::Date),
ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
],
downstream_pk: vec![0],
sink_type: SinkType::AppendOnly,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/nexmark/source/combined_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub use nexmark::event::EventType;
use nexmark::event::{Auction, Bid, Event, Person};
use risingwave_common::array::StructValue;
use risingwave_common::catalog::row_id_column_name;
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, ScalarImpl, StructType, Timestamp};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -108,7 +108,7 @@ pub fn get_event_data_types_with_names(

if let Some(row_id_index) = row_id_index {
// _row_id
fields.insert(row_id_index, (row_id_column_name(), DataType::Serial));
fields.insert(row_id_index, (ROWID_PREFIX.into(), DataType::Serial));
}

fields
Expand Down
15 changes: 5 additions & 10 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,16 +570,11 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
let field_descs = if let AstDataType::Struct(defs) = &column_def.data_type {
defs.iter()
.map(|f| {
Ok(ColumnDesc {
data_type: bind_data_type(&f.data_type)?,
// Literals don't have `column_id`.
column_id: ColumnId::new(0),
name: f.name.real_value(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
})
Ok(ColumnDesc::named(
f.name.real_value(),
ColumnId::new(0), // Literals don't have `column_id`.
bind_data_type(&f.data_type)?,
))
})
.collect::<Result<Vec<_>>>()?
} else {
Expand Down
9 changes: 2 additions & 7 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,7 @@ fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
mod tests {
use std::collections::HashMap;

use risingwave_common::catalog::{
row_id_column_name, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
};
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX};
use risingwave_common::types::DataType;

use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -333,10 +331,7 @@ mod tests {
// Check the old columns and IDs are not changed.
assert_eq!(columns["i"], altered_columns["i"]);
assert_eq!(columns["r"], altered_columns["r"]);
assert_eq!(
columns[row_id_column_name().as_str()],
altered_columns[row_id_column_name().as_str()]
);
assert_eq!(columns[ROWID_PREFIX], altered_columns[ROWID_PREFIX]);

// Check the version is updated.
assert_eq!(
Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,7 @@ pub mod tests {
use std::collections::HashMap;

use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
use risingwave_common::catalog::{
row_id_column_name, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
};
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX};
use risingwave_common::types::DataType;

use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -295,9 +293,8 @@ pub mod tests {
vec![DataType::Varchar, DataType::Varchar],
vec!["address".to_string(), "zipcode".to_string()],
);
let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Serial,
ROWID_PREFIX => DataType::Serial,
"country" => DataType::new_struct(
vec![DataType::Varchar,city_type,DataType::Varchar],
vec!["address".to_string(), "city".to_string(), "zipcode".to_string()],
Expand Down
74 changes: 19 additions & 55 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,7 @@ async fn extract_json_table_schema(
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Jsonb,
column_id: ColumnId::placeholder(),
name: "payload".to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
},
column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb),
is_hidden: false,
},
ColumnCatalog::offset_column(),
Expand Down Expand Up @@ -621,27 +613,11 @@ pub(crate) fn bind_all_columns(
(Format::DebeziumMongo, Encode::Json) => {
let mut columns = vec![
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Varchar,
column_id: 0.into(),
name: "_id".to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
},
column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Jsonb,
column_id: 0.into(),
name: "payload".to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
},
column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
is_hidden: false,
},
];
Expand Down Expand Up @@ -828,15 +804,11 @@ fn check_and_add_timestamp_column(
) {
if is_kafka_connector(with_properties) {
let kafka_timestamp_column = ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Timestamptz,
column_id: ColumnId::placeholder(),
name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
},
column_desc: ColumnDesc::named(
KAFKA_TIMESTAMP_COLUMN_NAME,
ColumnId::placeholder(),
DataType::Timestamptz,
),

is_hidden: true,
};
Expand All @@ -846,15 +818,11 @@ fn check_and_add_timestamp_column(

fn add_default_key_column(columns: &mut Vec<ColumnCatalog>) {
let column = ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Bytea,
column_id: ColumnId::new(columns.len() as i32),
name: DEFAULT_KEY_COLUMN_NAME.to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
},
column_desc: ColumnDesc::named(
DEFAULT_KEY_COLUMN_NAME,
(columns.len() as i32).into(),
DataType::Bytea,
),
is_hidden: false,
};
columns.push(column);
Expand Down Expand Up @@ -1279,8 +1247,8 @@ pub mod tests {
use std::collections::HashMap;

use risingwave_common::catalog::{
cdc_table_name_column_name, offset_column_name, row_id_column_name, DEFAULT_DATABASE_NAME,
DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
};
use risingwave_common::types::DataType;

Expand Down Expand Up @@ -1319,9 +1287,8 @@ pub mod tests {
vec![DataType::Varchar, DataType::Varchar],
vec!["address".to_string(), "zipcode".to_string()],
);
let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Serial,
ROWID_PREFIX => DataType::Serial,
DEFAULT_KEY_COLUMN_NAME => DataType::Bytea,
"id" => DataType::Int32,
"zipcode" => DataType::Int64,
Expand Down Expand Up @@ -1363,14 +1330,11 @@ pub mod tests {
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>();

let row_id_col_name = row_id_column_name();
let offset_col_name = offset_column_name();
let table_name_col_name = cdc_table_name_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Serial,
ROWID_PREFIX => DataType::Serial,
"payload" => DataType::Jsonb,
offset_col_name.as_str() => DataType::Varchar,
table_name_col_name.as_str() => DataType::Varchar,
OFFSET_COLUMN_NAME => DataType::Varchar,
TABLE_NAME_COLUMN_NAME => DataType::Varchar,
};
assert_eq!(columns, expected_columns);
}
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ mod tests {
use std::collections::HashMap;

use risingwave_common::catalog::{
row_id_column_name, Field, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
Field, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX,
};
use risingwave_common::types::DataType;

Expand Down Expand Up @@ -1151,9 +1151,8 @@ mod tests {
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>();

let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Serial,
ROWID_PREFIX => DataType::Serial,
"v1" => DataType::Int16,
"v2" => DataType::new_struct(
vec![DataType::Int64,DataType::Float64,DataType::Float64],
Expand Down
16 changes: 6 additions & 10 deletions src/source/src/source_desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl SourceDescBuilder {
pub mod test_utils {
use std::collections::HashMap;

use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::catalog::{ColumnDesc, Schema};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::ColumnCatalog;

Expand All @@ -190,15 +190,11 @@ pub mod test_utils {
.enumerate()
.map(|(i, f)| ColumnCatalog {
column_desc: Some(
ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
}
ColumnDesc::named(
f.name.clone(),
(i as i32).into(), // use column index as column id
f.data_type.clone(),
)
.to_protobuf(),
),
is_hidden: false,
Expand Down

0 comments on commit 710a01e

Please sign in to comment.