Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): auto schema change for mysql cdc #17876

Merged
merged 93 commits into from
Aug 20, 2024
Merged
Changes from 1 commit
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
606b1d0
enable ddl event
StrikeW Jun 16, 2024
99c0785
split schema event to a chunk
StrikeW Jul 8, 2024
245ddbe
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 11, 2024
dbb18a5
skeleton to parse cdc message
StrikeW Jul 11, 2024
bd123ff
Merge branch 'siyuan/cdc-schema-change' of github.com:risingwavelabs/…
StrikeW Jul 11, 2024
5971ea2
minor
StrikeW Jul 11, 2024
1d60a6a
impl mysql schema event parser
StrikeW Jul 16, 2024
da9d689
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 17, 2024
7d2a408
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
e1cc54a
call frontend to generate new table plan
StrikeW Jul 22, 2024
58e1817
minor
StrikeW Jul 22, 2024
35f9b4f
launch rpc service
StrikeW Jul 22, 2024
532c31e
refactor frontend service
StrikeW Jul 23, 2024
f507196
add cdc table name to PbTable
StrikeW Jul 23, 2024
9b85887
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
33857bb
wip: rpc source parser->meta
StrikeW Jul 24, 2024
850f1d0
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
6ffeb3b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
44f736f
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 26, 2024
266354b
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
32b9377
call frontend to generate new table plan
StrikeW Jul 22, 2024
253ea41
minor
StrikeW Jul 22, 2024
e6cfb50
launch rpc service
StrikeW Jul 22, 2024
f1c41b0
refactor frontend service
StrikeW Jul 23, 2024
dcb359e
add cdc table name to PbTable
StrikeW Jul 23, 2024
0539172
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
f1fff64
wip: rpc source parser->meta
StrikeW Jul 24, 2024
5d97d73
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
ead027b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
7d1fe85
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Jul 27, 2024
1295374
wip: debuging rpc
StrikeW Jul 27, 2024
82815d1
finish auto replace workflow
StrikeW Jul 28, 2024
7be0555
register frontend rpc addr to meta
StrikeW Jul 29, 2024
78088c9
add e2e test
StrikeW Jul 29, 2024
a2b2a2b
minor
StrikeW Jul 29, 2024
bd3cbaf
minor
StrikeW Jul 29, 2024
73878f4
refine
StrikeW Jul 29, 2024
09b7979
refine parsing
StrikeW Jul 29, 2024
950a09d
add
StrikeW Jul 30, 2024
27a7017
clean
StrikeW Jul 30, 2024
158ff10
format
StrikeW Jul 30, 2024
a7c00d6
clippy
StrikeW Jul 30, 2024
7cd2788
clean code
StrikeW Jul 31, 2024
702d146
minor
StrikeW Jul 31, 2024
12778aa
clean again
StrikeW Jul 31, 2024
6452745
fix comment
StrikeW Jul 31, 2024
c50856a
minor
StrikeW Jul 31, 2024
a8f52e4
fix comments
StrikeW Aug 1, 2024
4ce2c47
Merge branch 'siyuan/cdc-schema-change' into siyuan/cdc-handle-schema…
StrikeW Aug 1, 2024
b85a604
remove fullTableName
StrikeW Aug 1, 2024
3532e74
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 1, 2024
22e67e8
minor
StrikeW Aug 1, 2024
70c9299
fix type cast
StrikeW Aug 2, 2024
2c435d9
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 2, 2024
3012b57
Merge remote-tracking branch 'origin/siyuan/cdc-handle-schema-event' …
StrikeW Aug 2, 2024
af8bca9
clean
StrikeW Aug 2, 2024
5b1d8d6
clippy
StrikeW Aug 2, 2024
da9ee1b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 2, 2024
6d18dce
fix comments
StrikeW Aug 6, 2024
d1711c9
fix check
StrikeW Aug 7, 2024
3038bf2
add license check
StrikeW Aug 7, 2024
717f1af
Merge branch 'main' into siyuan/cdc-handle-schema-event
StrikeW Aug 7, 2024
6ac586b
fix ut
StrikeW Aug 7, 2024
331409a
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Aug 7, 2024
0dee439
minor
StrikeW Aug 7, 2024
90bc49e
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 7, 2024
f17d1c2
fix sim test
StrikeW Aug 8, 2024
40ccdd4
fix sim test
StrikeW Aug 8, 2024
ed91f7e
refactor frontend rpc addr impl
StrikeW Aug 12, 2024
259c055
fix
StrikeW Aug 12, 2024
df65235
add feature guard flag
StrikeW Aug 14, 2024
766313c
retry with grpc errors
StrikeW Aug 14, 2024
df80dd6
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 14, 2024
756d840
Revert "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
219a77e
minor
StrikeW Aug 14, 2024
ddd667a
Reapply "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
deefc9a
fix
StrikeW Aug 14, 2024
08d355b
fix
StrikeW Aug 14, 2024
5d238d2
fix
StrikeW Aug 14, 2024
1b3ee00
fix
StrikeW Aug 14, 2024
864fae2
rename cdc_table_name -> cdc_table_id
StrikeW Aug 16, 2024
a7fc383
fix
StrikeW Aug 16, 2024
7609748
fix
StrikeW Aug 16, 2024
52ddfeb
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
844cbf0
fix ut
StrikeW Aug 19, 2024
b853f20
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
27fa578
minor
StrikeW Aug 19, 2024
e0fd50c
minor
StrikeW Aug 19, 2024
ec742de
fix
StrikeW Aug 20, 2024
31162c8
try fix
StrikeW Aug 20, 2024
d68aca5
init logger for test_sink_scale
StrikeW Aug 20, 2024
d003d2b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 20, 2024
533cf22
unblock ci
StrikeW Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip: debuging rpc
StrikeW committed Jul 27, 2024
commit 129537451241a677bcbd3053513d283197010f8f
Original file line number Diff line number Diff line change
@@ -167,7 +167,8 @@ var record = event.value();
switch (eventType) {
case HEARTBEAT:
{
var message = msgBuilder.build();
var message =
msgBuilder.setMsgType(CdcMessage.CdcMessageType.HEARTBEAT).build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
break;
1 change: 1 addition & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
@@ -152,6 +152,7 @@ message CdcMessage {
DATA = 1;
TRANSACTION_META = 2;
SCHEMA_CHANGE = 3;
HEARTBEAT = 4;
}

// The value of the Debezium message
3 changes: 2 additions & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ impl PlainParser {
&& let Some(data) = payload
{
match cdc_meta.msg_type {
CdcMessageType::Data => {
CdcMessageType::Data | CdcMessageType::Heartbeat => {
return self.parse_rows(key, Some(data), writer).await;
}
CdcMessageType::TransactionMeta => {
@@ -120,6 +120,7 @@ impl PlainParser {
};
}
CdcMessageType::SchemaChange => {
tracing::info!("got schema change message");
let accessor = self
.schema_change_builder
.as_mut()
2 changes: 2 additions & 0 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ pub enum CdcMessageType {
Data,
TransactionMeta,
SchemaChange,
Heartbeat,
}

impl From<cdc_message::CdcMessageType> for CdcMessageType {
@@ -32,6 +33,7 @@ impl From<cdc_message::CdcMessageType> for CdcMessageType {
cdc_message::CdcMessageType::Data => CdcMessageType::Data,
cdc_message::CdcMessageType::TransactionMeta => CdcMessageType::TransactionMeta,
cdc_message::CdcMessageType::SchemaChange => CdcMessageType::SchemaChange,
cdc_message::CdcMessageType::Heartbeat => CdcMessageType::Heartbeat,
cdc_message::CdcMessageType::Unspecified => CdcMessageType::Unknown,
}
}
2 changes: 2 additions & 0 deletions src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ impl FrontendService for FrontendServiceImpl {
request: RpcRequest<GetTableReplacePlanRequest>,
) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
let req = request.into_inner();
tracing::info!("get_table_replace_plan for table {}", req.table_name);

let table_change = req.table_change.expect("schema change message is required");
let replace_plan =
get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?;
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ mod m20240630_131430_remove_parallel_unit;
mod m20240701_060504_hummock_time_travel;
mod m20240702_080451_system_param_value;
mod m20240702_084927_unnecessary_fk;
mod m20240726_063833_auto_schema_change;

pub struct Migrator;

@@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240702_080451_system_param_value::Migration),
Box::new(m20240702_084927_unnecessary_fk::Migration),
Box::new(m20240701_060504_hummock_time_travel::Migration),
Box::new(m20240726_063833_auto_schema_change::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use sea_orm_migration::prelude::{Table as MigrationTable, *};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
MigrationTable::alter()
.table(Table::Table)
.add_column(ColumnDef::new(Table::CdcTableName).string())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
MigrationTable::alter()
.table(Table::Table)
.drop_column(Table::CdcTableName)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum Table {
Table,
CdcTableName,
}
14 changes: 13 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
@@ -939,6 +939,8 @@ impl DdlService for DdlServiceImpl {
let worker = workers
.first()
.ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

tracing::info!("get client for frontend {:?}", worker);
let client = self
.env
.frontend_client_pool()
@@ -955,11 +957,14 @@ impl DdlService for DdlServiceImpl {
for table_change in schema_change.table_changes {
let cdc_table_name = table_change.cdc_table_name.clone();

tracing::info!("auto schema change cdc table: {}", cdc_table_name);

// get the table catalog corresponding to the cdc table
let tables: Vec<Table> = self
.metadata_manager
.get_table_catalog_by_cdc_table_name(cdc_table_name)
.get_table_catalog_by_cdc_table_name(cdc_table_name.replace("\"", ""))
.await?;
tracing::info!("number of table to replace: {}", tables.len());

for table in tables {
// send a request to the frontend to get the ReplaceTablePlan
@@ -973,13 +978,20 @@ impl DdlService for DdlServiceImpl {
.await
.map_err(|err| MetaError::from(err))?;

if let Some(plan) = resp.replace_plan.as_ref() {
plan.table
.as_ref()
.inspect(|t| tracing::info!("Table to replace: {}", t.name));
}

if let Some(plan) = resp.replace_plan {
// start the schema change procedure
self.ddl_controller
.run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info(
plan,
)))
.await?;
tracing::info!("replace table {} success", table.id);
}
}
}
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
@@ -848,6 +848,8 @@ impl CommandContext {

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.info.node_map.values().map(|worker_node| async {
let worker = worker_node.clone();
tracing::info!("get client for compute {:?}", worker);
let client = self
.barrier_manager_context
.env
2 changes: 1 addition & 1 deletion src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
@@ -210,7 +210,7 @@ impl Drop for StoreLocalStatistic {
&& !self.added.load(Ordering::Relaxed)
&& self.need_report()
{
tracing::error!("local stats lost!\n{:#?}", self);
// tracing::error!("local stats lost!\n{:#?}", self);
}
}
}
2 changes: 2 additions & 0 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
@@ -130,6 +130,8 @@ impl<S: StateStore> SourceExecutor<S> {
let _ = tokio::task::spawn(async move {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here spawn a task in source executor to handle schema change from parser

let mut schema_change_rx = rx;
while let Some((schema_change, parser_tx)) = schema_change_rx.recv().await {
tracing::info!("recv a schema change envelope");

// handle schema change
if let Some(ref meta_client) = meta_client {
match meta_client