feat(cdc): add event log and fill table catalog for auto schema change (
StrikeW authored Aug 22, 2024
1 parent c37ca31 commit 4a5bc4c
Showing 29 changed files with 358 additions and 65 deletions.
1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -339,6 +339,7 @@ message Table {
string name = 4;
repeated plan_common.ColumnCatalog columns = 5;
repeated common.ColumnOrder pk = 6;
// For cdc table created from a cdc source, here records the source id.
repeated uint32 dependent_relations = 8;
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
1 change: 1 addition & 0 deletions proto/ddl_service.proto
@@ -456,6 +456,7 @@ message TableSchemaChange {
TableChangeType change_type = 1;
string cdc_table_id = 2;
repeated plan_common.ColumnCatalog columns = 3;
string upstream_ddl = 4;
}

message SchemaChangeEnvelope {
6 changes: 6 additions & 0 deletions proto/meta.proto
@@ -731,6 +731,11 @@ message EventLog {
common.HostAddress host_addr = 3;
string panic_info = 4;
}
message EventAutoSchemaChangeFail {
uint32 table_id = 1;
string cdc_table_id = 2;
string upstream_ddl = 3;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
@@ -743,6 +748,7 @@
EventInjectBarrierFail inject_barrier_fail = 7;
EventCollectBarrierFail collect_barrier_fail = 8;
EventLog.EventWorkerNodePanic worker_node_panic = 9;
EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 10;
}
}

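For context, here is a minimal self-contained sketch of how the three fields of the new event relate. The struct merely mirrors the proto message above and the field comments are interpretations; the real code would use the prost-generated type, and the example values are made up.

// Mirror of the new EventAutoSchemaChangeFail proto message, for illustration
// only; the real code uses the prost-generated type. Example values are made up.
#[derive(Debug)]
struct EventAutoSchemaChangeFail {
    /// RisingWave table id of the CDC table whose automatic schema change failed.
    table_id: u32,
    /// "<source_id>.<upstream db>.<upstream table>" (see `build_cdc_table_id` below).
    cdc_table_id: String,
    /// The upstream DDL statement that triggered the schema change.
    upstream_ddl: String,
}

fn main() {
    let event = EventAutoSchemaChangeFail {
        table_id: 1001,
        cdc_table_id: "0.mydb.test".to_string(),
        upstream_ddl: "ALTER TABLE test add column v2 varchar(32)".to_string(),
    };
    println!("{event:?}");
}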
4 changes: 3 additions & 1 deletion src/connector/src/parser/debezium/schema_change.rs
@@ -68,6 +68,7 @@ pub struct TableSchemaChange {
pub(crate) cdc_table_id: String,
pub(crate) columns: Vec<ColumnCatalog>,
pub(crate) change_type: TableChangeType,
pub(crate) upstream_ddl: String,
}

impl SchemaChangeEnvelope {
@@ -89,14 +90,15 @@
change_type: table_change.change_type.to_proto() as _,
cdc_table_id: table_change.cdc_table_id.clone(),
columns,
upstream_ddl: table_change.upstream_ddl.clone(),
}
})
.collect();

PbSchemaChangeEnvelope { table_changes }
}

pub fn table_names(&self) -> Vec<String> {
pub fn table_ids(&self) -> Vec<String> {
self.table_changes
.iter()
.map(|table_change| table_change.cdc_table_id.clone())
11 changes: 8 additions & 3 deletions src/connector/src/parser/plain_parser.rs
@@ -125,7 +125,11 @@ impl PlainParser {
.generate_accessor(data)
.await?;

return match parse_schema_change(&accessor, &self.source_ctx.connector_props) {
return match parse_schema_change(
&accessor,
self.source_ctx.source_id.into(),
&self.source_ctx.connector_props,
) {
Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
Err(err) => Err(err)?,
};
@@ -482,7 +486,7 @@ mod tests {
.unwrap();
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_1", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"mydb.test".to_string(),
0,
@@ -508,7 +512,7 @@
SchemaChangeEnvelope {
table_changes: [
TableSchemaChange {
cdc_table_id: "mydb.test",
cdc_table_id: "0.mydb.test",
columns: [
ColumnCatalog {
column_desc: ColumnDesc {
@@ -560,6 +564,7 @@
},
],
change_type: Alter,
upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
},
],
},
20 changes: 19 additions & 1 deletion src/connector/src/parser/unified/debezium.rs
@@ -16,6 +16,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::{
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
ToOwnedDatum,
};
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::plan_common::additional_column::ColumnType;
@@ -25,6 +26,7 @@ use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation
use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange};
use crate::parser::schema_change::TableChangeType;
use crate::parser::TransactionControl;
use crate::source::cdc::build_cdc_table_id;
use crate::source::cdc::external::mysql::{mysql_type_to_rw_type, type_name_to_mysql_type};
use crate::source::{ConnectorProperties, SourceColumnDesc};

@@ -70,6 +72,7 @@ pub struct DebeziumChangeEvent<A> {
const BEFORE: &str = "before";
const AFTER: &str = "after";

const UPSTREAM_DDL: &str = "ddl";
const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
@@ -152,12 +155,23 @@ macro_rules! jsonb_access_field {
};
}

/// Parse the schema change message from Debezium.
/// The layout of MySQL schema change message can refer to
/// <https://debezium.io/documentation/reference/2.6/connectors/mysql.html#mysql-schema-change-topic>
pub fn parse_schema_change(
accessor: &impl Access,
source_id: u32,
connector_props: &ConnectorProperties,
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];

let upstream_ddl = accessor
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
.to_owned_datum()
.unwrap()
.as_utf8()
.to_string();

if let Some(ScalarRefImpl::List(table_changes)) = accessor
.access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
.to_datum_ref()
@@ -210,8 +224,11 @@ pub fn parse_schema_change(
column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type));
}
}

// concatenate the source_id to the cdc_table_id
let cdc_table_id = build_cdc_table_id(source_id, id.replace('"', "").as_str());
schema_changes.push(TableSchemaChange {
cdc_table_id: id.replace('"', ""), // remove the double quotes
cdc_table_id,
columns: column_descs
.into_iter()
.map(|column_desc| ColumnCatalog {
@@ -220,6 +237,7 @@
})
.collect_vec(),
change_type: ty.as_str().into(),
upstream_ddl: upstream_ddl.clone(),
});
}

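As a standalone illustration (not part of the diff), the sketch below shows the two payload fields the updated `parse_schema_change` reads from a Debezium schema-change message: the top-level `ddl` string that now lands in `upstream_ddl`, and the quoted table id inside `tableChanges` that is stripped of double quotes before being prefixed with the source id. It uses `serde_json` instead of the connector's `Access` abstraction, and the payload is trimmed down from the parser test above.

use serde_json::json;

fn main() {
    // Trimmed-down schema change payload, following the shape in the parser test.
    let payload = json!({
        "ddl": "ALTER TABLE test add column v2 varchar(32)",
        "tableChanges": [ { "id": "\"mydb\".\"test\"", "type": "ALTER" } ]
    });

    // The top-level `ddl` field is what now populates `upstream_ddl`.
    let upstream_ddl = payload["ddl"].as_str().unwrap();

    for change in payload["tableChanges"].as_array().unwrap() {
        // Strip the double quotes from the fully qualified table name,
        // as the parser does before building the cdc table id.
        let table = change["id"].as_str().unwrap().replace('"', "");
        let ty = change["type"].as_str().unwrap();
        println!("{} on {}: {}", ty, table, upstream_ddl);
    }
}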
5 changes: 5 additions & 0 deletions src/connector/src/source/cdc/mod.rs
@@ -53,6 +53,11 @@ pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;

/// Build a unique CDC table identifier from a source ID and external table name
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
format!("{}.{}", source_id, external_table_name)
}

pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
const CDC_CONNECTOR_NAME: &'static str;
fn source_type() -> CdcSourceType;
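A minimal usage sketch of the new helper; the values are chosen to match the updated parser test above, where source id 0 and upstream table `mydb.test` yield the cdc table id `0.mydb.test`.

// Usage sketch of the new helper (duplicated here so the snippet stands alone).
fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    format!("{}.{}", source_id, external_table_name)
}

fn main() {
    assert_eq!(build_cdc_table_id(0, "mydb.test"), "0.mydb.test");
}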
@@ -62,6 +62,7 @@ fn event_type(e: &Event) -> String {
Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL",
Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
Event::WorkerNodePanic(_) => "WORKER_NODE_PANIC",
Event::AutoSchemaChangeFail(_) => "AUTO_SCHEMA_CHANGE_FAIL",
}
.into()
}
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_table.rs
@@ -29,6 +29,7 @@ use risingwave_common::catalog::{
use risingwave_common::license::Feature;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
@@ -834,6 +835,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
vec![],
);

let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
let materialize = plan_root.gen_table_plan(
context,
resolved_table_name,
@@ -848,7 +850,7 @@
Some(col_id_gen.into_version()),
true,
None,
Some(external_table_name),
Some(cdc_table_id),
)?;

let mut table = materialize.table().to_prost(schema_id, database_id);
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/mod.rs
@@ -637,7 +637,7 @@ impl PlanRoot {
version: Option<TableVersion>,
with_external_source: bool,
retention_seconds: Option<NonZeroU32>,
cdc_table_name: Option<String>,
cdc_table_id: Option<String>,
) -> Result<StreamMaterialize> {
assert_eq!(self.phase, PlanPhase::Logical);
assert_eq!(self.plan.convention(), Convention::Logical);
@@ -868,7 +868,7 @@ impl PlanRoot {
row_id_index,
version,
retention_seconds,
cdc_table_name,
cdc_table_id,
)
}

4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -133,7 +133,7 @@ impl StreamMaterialize {
row_id_index: Option<usize>,
version: Option<TableVersion>,
retention_seconds: Option<NonZeroU32>,
cdc_table_name: Option<String>,
cdc_table_id: Option<String>,
) -> Result<Self> {
let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?;

@@ -154,7 +154,7 @@
CreateType::Foreground,
)?;

table.cdc_table_id = cdc_table_name;
table.cdc_table_id = cdc_table_id;

Ok(Self::new(input, table))
}
10 changes: 10 additions & 0 deletions src/meta/model_v2/src/object_dependency.rs
@@ -44,6 +44,16 @@ pub enum Relation {
on_delete = "Cascade"
)]
Object1,

// To join source on the oid column
#[sea_orm(
belongs_to = "super::source::Entity",
from = "Column::Oid",
to = "super::source::Column::SourceId",
on_update = "NoAction",
on_delete = "Cascade"
)]
Source,
}

impl ActiveModelBehavior for ActiveModel {}
10 changes: 10 additions & 0 deletions src/meta/model_v2/src/table.rs
@@ -177,6 +177,16 @@ pub enum Relation {
on_delete = "NoAction"
)]
Source,

// To join object_dependency on the used_by column
#[sea_orm(
belongs_to = "super::object_dependency::Entity",
from = "Column::TableId",
to = "super::object_dependency::Column::UsedBy",
on_update = "NoAction",
on_delete = "Cascade"
)]
ObjectDependency,
}

impl Related<super::object::Entity> for Entity {
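A hedged sketch of what the two new SeaORM relations enable: joining `table` through `object_dependency` to `source`, for example to look up the tables created from a given CDC source. The function name, crate path, and query shape below are assumptions for illustration, not the actual meta-service code.

use risingwave_meta_model_v2::{object_dependency, source, table}; // crate path assumed
use sea_orm::{
    ColumnTrait, DatabaseConnection, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect,
    RelationTrait,
};

// Hypothetical helper: list the tables whose object_dependency row points at
// the given CDC source, using the relations added in this commit.
async fn tables_of_cdc_source(
    db: &DatabaseConnection,
    source_id: i32,
) -> Result<Vec<table::Model>, DbErr> {
    table::Entity::find()
        // table.table_id = object_dependency.used_by
        .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
        // object_dependency.oid = source.source_id
        .join(JoinType::InnerJoin, object_dependency::Relation::Source.def())
        .filter(source::Column::SourceId.eq(source_id))
        .all(db)
        .await
}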
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
@@ -430,7 +430,7 @@ pub async fn start_service_as_election_leader(
.await
.unwrap(),
);
let catalog_controller = Arc::new(CatalogController::new(env.clone()));
let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
MetadataManager::new_v2(cluster_controller, catalog_controller)
}
};