feat(cdc): auto schema change for mysql cdc (#17876)
StrikeW authored Aug 20, 2024
1 parent 190875c commit 26a5721
Showing 71 changed files with 1,086 additions and 127 deletions.
12 changes: 1 addition & 11 deletions e2e_test/error_ui/simple/main.slt
@@ -19,18 +19,8 @@ Caused by these errors (recent errors listed first):
2: invalid IPv4 address


statement error
statement error Failed to run the query
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to check UDF signature
2: failed to send requests to UDF service
3: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} }
4: transport error
5: connection error
6: connection reset


statement error
69 changes: 69 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
@@ -0,0 +1,69 @@
control substitution on

system ok
mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

system ok
mysql -e "
USE mytest;
DROP TABLE IF EXISTS customers;
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
custinfo JSON
);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
"

statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'root',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5701',
auto.schema.change = 'true'
);

statement ok
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


system ok
mysql -e "
USE mytest;
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
ALTER TABLE customers ADD COLUMN v2 double(5,2);
"

sleep 3s

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


statement ok
drop source mysql_source cascade;
@@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*}
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
include.schema.changes=${debezium.include.schema.changes:-false}
include.schema.changes=${auto.schema.change:-false}
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
5 changes: 5 additions & 0 deletions proto/catalog.proto
@@ -411,6 +411,11 @@ message Table {
// conflict" operations.
optional uint32 version_column_index = 38;

// The unique identifier of the upstream table if it is a CDC table.
// It will be used in auto schema change to get the Table that is mapped to the
// upstream table.
optional string cdc_table_id = 39;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
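The new `cdc_table_id` gives auto schema change a key for finding the RisingWave tables that follow a given upstream table. A minimal sketch of such a lookup, assuming the `database.table` form produced by the Debezium parser later in this commit; the index type and helpers below are illustrative, not the actual catalog code:

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the catalog table id.
type TableId = u32;

/// Hypothetical index from `cdc_table_id` (e.g. "mytest.customers") to the
/// RisingWave tables created with `CREATE TABLE ... FROM source TABLE '...'`.
#[derive(Default)]
struct CdcTableIndex {
    by_upstream: HashMap<String, Vec<TableId>>,
}

impl CdcTableIndex {
    /// Record a CDC table when it is created.
    fn register(&mut self, cdc_table_id: &str, table_id: TableId) {
        self.by_upstream
            .entry(cdc_table_id.to_string())
            .or_default()
            .push(table_id);
    }

    /// All tables affected by a schema change on the given upstream table.
    fn affected_tables(&self, cdc_table_id: &str) -> &[TableId] {
        self.by_upstream
            .get(cdc_table_id)
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }
}

fn main() {
    let mut index = CdcTableIndex::default();
    index.register("mytest.customers", 1001);
    assert_eq!(index.affected_tables("mytest.customers"), &[1001]);
}
```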
2 changes: 2 additions & 0 deletions proto/common.proto
@@ -54,6 +54,8 @@ message WorkerNode {
bool is_streaming = 1;
bool is_serving = 2;
bool is_unschedulable = 3;
// This is used for the frontend node to register its RPC address
string internal_rpc_host_addr = 4;
}
message Resource {
string rw_version = 1;
9 changes: 8 additions & 1 deletion proto/ddl_service.proto
@@ -454,14 +454,20 @@ message TableSchemaChange {
}

TableChangeType change_type = 1;
string cdc_table_name = 2;
string cdc_table_id = 2;
repeated plan_common.ColumnCatalog columns = 3;
}

message SchemaChangeEnvelope {
repeated TableSchemaChange table_changes = 1;
}

message AutoSchemaChangeRequest {
SchemaChangeEnvelope schema_change = 1;
}

message AutoSchemaChangeResponse {}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
@@ -500,4 +506,5 @@ service DdlService {
rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
}
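Taken together, the new messages outline the control flow for an automatic schema change: the source side ships a `SchemaChangeEnvelope` to the new `AutoSchemaChange` RPC, and the receiver then has to turn each `TableSchemaChange` into a replacement plan for every RisingWave table that follows that upstream table. The sketch below is only a rough outline of such a handler; `tables_by_cdc_table_id`, `request_replace_plan`, and `apply_replace_plan` are placeholder stand-ins, not the real meta-service API:

```rust
/// Simplified stand-ins for the proto messages; the real types are generated
/// from ddl_service.proto / frontend_service.proto.
#[derive(Debug, Clone)]
struct TableSchemaChange {
    cdc_table_id: String,  // e.g. "mytest.customers"
    columns: Vec<String>,  // full column list after the upstream DDL
}

#[derive(Debug)]
struct ReplacePlan {
    table_id: u32,
    new_columns: Vec<String>,
}

/// Placeholder: the real system would consult the catalog by `cdc_table_id`.
fn tables_by_cdc_table_id(cdc_table_id: &str) -> Vec<(u32, String)> {
    vec![(1001, format!("rw_{}", cdc_table_id.replace('.', "_")))]
}

/// Placeholder for asking the frontend (`GetTableReplacePlan`) to plan the change.
fn request_replace_plan(table_id: u32, change: &TableSchemaChange) -> ReplacePlan {
    ReplacePlan { table_id, new_columns: change.columns.clone() }
}

/// Placeholder for committing the plan (replace-table DDL).
fn apply_replace_plan(plan: ReplacePlan) {
    println!("replacing table {} with columns {:?}", plan.table_id, plan.new_columns);
}

/// Rough outline of an `AutoSchemaChange` handler: fan each upstream change
/// out to every RisingWave table that follows that upstream table.
fn handle_auto_schema_change(table_changes: Vec<TableSchemaChange>) {
    for change in table_changes {
        for (table_id, table_name) in tables_by_cdc_table_id(&change.cdc_table_id) {
            println!("schema change on {} affects {}", change.cdc_table_id, table_name);
            let plan = request_replace_plan(table_id, &change);
            apply_replace_plan(plan);
        }
    }
}

fn main() {
    handle_auto_schema_change(vec![TableSchemaChange {
        cdc_table_id: "mytest.customers".into(),
        columns: vec!["id".into(), "modified".into(), "custinfo".into(), "v1".into(), "v2".into()],
    }]);
}
```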
23 changes: 23 additions & 0 deletions proto/frontend_service.proto
@@ -0,0 +1,23 @@
syntax = "proto3";

package frontend_service;

import "ddl_service.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message GetTableReplacePlanRequest {
uint32 database_id = 1;
uint32 owner = 2;
string table_name = 3;
ddl_service.TableSchemaChange table_change = 4;
}

message GetTableReplacePlanResponse {
ddl_service.ReplaceTablePlan replace_plan = 1;
}

service FrontendService {
rpc GetTableReplacePlan(GetTableReplacePlanRequest) returns (GetTableReplacePlanResponse);
}
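With the frontend now registering an `internal_rpc_host_addr`, another node can reach it over gRPC and ask it to plan the table replacement for a schema change. A rough client-side sketch, assuming standard tonic/prost codegen and a `risingwave_pb::frontend_service` module path; the address (borrowed from the standalone test later in this diff), ids, and field values are placeholders:

```rust
// Assumed codegen layout; the client and message names follow directly from
// frontend_service.proto, but the module path is an assumption.
use risingwave_pb::ddl_service::TableSchemaChange;
use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The frontend's registered internal RPC address would be used here.
    let mut client = FrontendServiceClient::connect("http://127.0.0.1:6786").await?;

    let request = GetTableReplacePlanRequest {
        database_id: 1,
        owner: 1,
        table_name: "rw_customers".to_string(),
        table_change: Some(TableSchemaChange {
            cdc_table_id: "mytest.customers".to_string(),
            // change_type / columns omitted for brevity; prost fills defaults.
            ..Default::default()
        }),
    };

    let response = client.get_table_replace_plan(request).await?;
    println!("replace plan: {:?}", response.into_inner().replace_plan);
    Ok(())
}
```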
2 changes: 2 additions & 0 deletions proto/meta.proto
@@ -312,6 +312,8 @@ message AddWorkerNodeRequest {
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// This is used for the frontend node to register its RPC address
string internal_rpc_host_addr = 5;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -174,6 +174,7 @@ impl SourceExecutor {
rate_limit: None,
},
ConnectorProperties::default(),
None,
));
let stream = self
.source
2 changes: 2 additions & 0 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -430,6 +430,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(1),
..Default::default()
@@ -444,6 +445,7 @@
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(2),
..Default::default()
2 changes: 1 addition & 1 deletion src/cmd_all/src/standalone.rs
@@ -505,7 +505,7 @@ mod test {
],
),
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
frontend_rpc_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
8 changes: 8 additions & 0 deletions src/common/src/config.rs
@@ -1027,6 +1027,10 @@ pub struct StreamingDeveloperConfig {
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
pub exchange_connection_pool_size: Option<u16>,

/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,
}

/// The subsections `[batch.developer]`.
@@ -1903,6 +1907,10 @@ pub mod default {
pub fn enable_actor_tokio_metrics() -> bool {
false
}

pub fn stream_enable_auto_schema_change() -> bool {
true
}
}

pub use crate::system_param::default as system;
7 changes: 4 additions & 3 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -213,6 +213,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
@@ -231,7 +232,7 @@
let worker_1 = WorkerNode {
id: 1,
parallelism: 1,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

@@ -246,7 +247,7 @@
let worker_2 = WorkerNode {
id: 2,
parallelism: 50,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

@@ -265,7 +266,7 @@
let worker_3 = WorkerNode {
id: 3,
parallelism: 60,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};
let re_pu_mapping_2 = place_vnode(
1 change: 1 addition & 0 deletions src/compute/src/server.rs
@@ -127,6 +127,7 @@ pub async fn compute_node_serve(
is_streaming: opts.role.for_streaming(),
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
},
&config.meta,
)
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -124,6 +124,7 @@ stream_enable_arrangement_backfill = true
stream_high_join_amplification_threshold = 2048
stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true

[storage]
share_buffers_sync_parallelism = 1
15 changes: 13 additions & 2 deletions src/connector/src/parser/debezium/schema_change.rs
@@ -65,12 +65,16 @@ impl From<&str> for TableChangeType {

#[derive(Debug)]
pub struct TableSchemaChange {
pub(crate) cdc_table_name: String,
pub(crate) cdc_table_id: String,
pub(crate) columns: Vec<ColumnCatalog>,
pub(crate) change_type: TableChangeType,
}

impl SchemaChangeEnvelope {
pub fn is_empty(&self) -> bool {
self.table_changes.is_empty()
}

pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope {
let table_changes = self
.table_changes
@@ -83,12 +87,19 @@ impl SchemaChangeEnvelope {
.collect();
PbTableSchemaChange {
change_type: table_change.change_type.to_proto() as _,
cdc_table_name: table_change.cdc_table_name.clone(),
cdc_table_id: table_change.cdc_table_id.clone(),
columns,
}
})
.collect();

PbSchemaChangeEnvelope { table_changes }
}

pub fn table_names(&self) -> Vec<String> {
self.table_changes
.iter()
.map(|table_change| table_change.cdc_table_id.clone())
.collect()
}
}
22 changes: 20 additions & 2 deletions src/connector/src/parser/mod.rs
@@ -836,8 +836,26 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}
},

Ok(ParseResult::SchemaChange(_)) => {
// TODO
Ok(ParseResult::SchemaChange(schema_change)) => {
if schema_change.is_empty() {
continue;
}

let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
// we bubble up the schema change event to the source executor via channel,
// and wait for the source executor to finish the schema change process before
// parsing the following messages.
if let Some(ref tx) = parser.source_ctx().schema_change_tx {
tx.send((schema_change, oneshot_tx))
.await
.expect("send schema change to executor");
match oneshot_rx.await {
Ok(()) => {}
Err(e) => {
tracing::error!(error = %e.as_report(), "failed to wait for schema change");
}
}
}
}
}
}
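The hunk above bubbles the schema-change event up to the source executor over a channel and blocks the parser on a oneshot acknowledgement until the change has been handled. A self-contained sketch of that send-and-wait handshake, with illustrative names in place of the real parser and executor types:

```rust
use tokio::sync::{mpsc, oneshot};

/// Stand-in for the parsed schema-change event.
#[derive(Debug)]
struct SchemaChangeEnvelope {
    cdc_table_id: String,
}

#[tokio::main]
async fn main() {
    // Channel carrying the event plus a oneshot sender used to ack it.
    let (tx, mut rx) = mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(4);

    // "Source executor" side: apply the change, then ack so parsing can resume.
    let executor = tokio::spawn(async move {
        while let Some((event, ack)) = rx.recv().await {
            println!("applying schema change for {}", event.cdc_table_id);
            // ... replace-table handling would happen here ...
            let _ = ack.send(());
        }
    });

    // "Parser" side: bubble the event up and wait for the ack before
    // parsing any further messages.
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send((
        SchemaChangeEnvelope { cdc_table_id: "mytest.customers".into() },
        ack_tx,
    ))
    .await
    .expect("send schema change to executor");
    ack_rx.await.expect("executor dropped the ack");

    drop(tx); // close the channel so the executor task exits
    executor.await.unwrap();
}
```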
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
@@ -508,7 +508,7 @@ mod tests {
SchemaChangeEnvelope {
table_changes: [
TableSchemaChange {
cdc_table_name: "mydb.test",
cdc_table_id: "mydb.test",
columns: [
ColumnCatalog {
column_desc: ColumnDesc {
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/debezium.rs
@@ -211,7 +211,7 @@ pub fn parse_schema_change(
}
}
schema_changes.push(TableSchemaChange {
cdc_table_name: id.replace('"', ""), // remove the double quotes
cdc_table_id: id.replace('"', ""), // remove the double quotes
columns: column_descs
.into_iter()
.map(|column_desc| ColumnCatalog {