Skip to content

Commit

Permalink
feat(meta): support replace source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 10, 2024
1 parent f31e762 commit ffb51b8
Show file tree
Hide file tree
Showing 22 changed files with 377 additions and 164 deletions.
44 changes: 28 additions & 16 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ message CreateSinkRequest {
catalog.Sink sink = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
// It is used to provide a replace plan for the downstream table in `create sink into table` requests.
optional ReplaceTablePlan affected_table_change = 3;
optional ReplaceJobPlan affected_table_change = 3;

Check failure on line 88 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "3" with name "affected_table_change" on message "CreateSinkRequest" changed type from "ddl_service.ReplaceTablePlan" to "ddl_service.ReplaceJobPlan".
// The list of object IDs that this sink depends on.
repeated uint32 dependencies = 4;
}
Expand All @@ -98,7 +98,7 @@ message CreateSinkResponse {
message DropSinkRequest {
uint32 sink_id = 1;
bool cascade = 2;
optional ReplaceTablePlan affected_table_change = 3;
optional ReplaceJobPlan affected_table_change = 3;

Check failure on line 101 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "3" with name "affected_table_change" on message "DropSinkRequest" changed type from "ddl_service.ReplaceTablePlan" to "ddl_service.ReplaceJobPlan".
}

message DropSinkResponse {
Expand Down Expand Up @@ -353,26 +353,38 @@ message DropIndexResponse {
WaitVersion version = 2;
}

// The plan for replacing an existing streaming job (table or source) in place.
// This is the post-change definition; the former `ReplaceTablePlan` message was
// restructured into this message with a `replace_job` oneof.
message ReplaceJobPlan {
  // The new materialization plan, where all schema are updated.
  stream_plan.StreamFragmentGraph fragment_graph = 1;
  // The mapping from the old columns to the new columns of the table.
  // If no column modifications occur (such as for sinking into table), this will be None.
  catalog.ColIndexMapping table_col_index_mapping = 2;

  // Replace an existing table, e.g. for `create sink into table` requests
  // or table schema changes.
  message ReplaceTable {
    // The new table catalog, with the correct (old) table ID and a new version.
    // If the new version does not match the subsequent version in the meta service's
    // catalog, this request will be rejected.
    catalog.Table table = 1;
    // Source catalog of table's associated source
    catalog.Source source = 2;
    TableJobType job_type = 3;
  }

  // Replace an existing source with a new catalog.
  message ReplaceSource {
    catalog.Source source = 1;
  }

  // Exactly one kind of job is being replaced by this plan.
  oneof replace_job {
    ReplaceTable replace_table = 3;
    ReplaceSource replace_source = 4;
  }
}

// Request for the `ReplaceJobPlan` RPC, carrying the full replacement plan.
message ReplaceJobPlanRequest {
  ReplaceJobPlan plan = 1;
}

message ReplaceTablePlanResponse {
message ReplaceJobPlanResponse {
common.Status status = 1;
// The new global catalog version.
WaitVersion version = 2;
Expand Down Expand Up @@ -543,7 +555,7 @@ service DdlService {
rpc DropIndex(DropIndexRequest) returns (DropIndexResponse);
rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse);
rpc DropFunction(DropFunctionRequest) returns (DropFunctionResponse);
rpc ReplaceTablePlan(ReplaceTablePlanRequest) returns (ReplaceTablePlanResponse);
rpc ReplaceJobPlan(ReplaceJobPlanRequest) returns (ReplaceJobPlanResponse);
rpc GetTable(GetTableRequest) returns (GetTableResponse);
rpc GetDdlProgress(GetDdlProgressRequest) returns (GetDdlProgressResponse);
rpc CreateConnection(CreateConnectionRequest) returns (CreateConnectionResponse);
Expand Down
2 changes: 1 addition & 1 deletion proto/frontend_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ message GetTableReplacePlanRequest {
}

message GetTableReplacePlanResponse {
ddl_service.ReplaceTablePlan replace_plan = 1;
ddl_service.ReplaceJobPlan replace_plan = 1;

Check failure on line 18 in proto/frontend_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "replace_plan" on message "GetTableReplacePlanResponse" changed type from "ddl_service.ReplaceTablePlan" to "ddl_service.ReplaceJobPlan".
}

service FrontendService {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ message UpdateMutation {
// Added upstream actors.
repeated uint32 added_upstream_actor_id = 3;
// Removed upstream actors.
// Note: this is empty for replace job.
repeated uint32 removed_upstream_actor_id = 4;
}
// Dispatcher updates.
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
Expand Down Expand Up @@ -116,7 +116,7 @@ pub trait CatalogWriter: Send + Sync {
&self,
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<PbReplaceTablePlan>,
affected_table_change: Option<PbReplaceJobPlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()>;

Expand Down Expand Up @@ -161,7 +161,7 @@ pub trait CatalogWriter: Send + Sync {
&self,
sink_id: u32,
cascade: bool,
affected_table_change: Option<PbReplaceTablePlan>,
affected_table_change: Option<PbReplaceJobPlan>,
) -> Result<()>;

async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
Expand Down Expand Up @@ -329,7 +329,7 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
sink: PbSink,
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
affected_table_change: Option<ReplaceJobPlan>,
dependencies: HashSet<ObjectId>,
) -> Result<()> {
let version = self
Expand Down Expand Up @@ -425,7 +425,7 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
sink_id: u32,
cascade: bool,
affected_table_change: Option<ReplaceTablePlan>,
affected_table_change: Option<ReplaceJobPlan>,
) -> Result<()> {
let version = self
.meta_client
Expand Down
14 changes: 9 additions & 5 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_connector::sink::{
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableJobType};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
Expand Down Expand Up @@ -521,12 +521,16 @@ pub async fn handle_create_sink(
// for new creating sink, we don't have a unique identity because the sink id is not generated yet.
hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

target_table_replace_plan = Some(ReplaceTablePlan {
source,
table: Some(table),
target_table_replace_plan = Some(ReplaceJobPlan {
replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
replace_job_plan::ReplaceTable {
table: Some(table),
source,
job_type: TableJobType::General as _,
},
)),
fragment_graph: Some(graph),
table_col_index_mapping: None,
job_type: TableJobType::General as _,
});
}

Expand Down
14 changes: 9 additions & 5 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashSet;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableJobType};
use risingwave_sqlparser::ast::ObjectName;

use super::RwPgResponse;
Expand Down Expand Up @@ -94,12 +94,16 @@ pub async fn handle_drop_sink(
)?;
}

affected_table_change = Some(ReplaceTablePlan {
source,
table: Some(table),
affected_table_change = Some(ReplaceJobPlan {
replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
replace_job_plan::ReplaceTable {
table: Some(table),
source,
job_type: TableJobType::General as _,
},
)),
fragment_graph: Some(graph),
table_col_index_mapping: None,
job_type: TableJobType::General as _,
});
}

Expand Down
16 changes: 10 additions & 6 deletions src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use itertools::Itertools;
use pgwire::pg_server::{BoxedError, SessionManager};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableSchemaChange};
use risingwave_pb::ddl_service::{replace_job_plan, ReplaceJobPlan, TableSchemaChange};
use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
use risingwave_rpc_client::error::ToTonicStatus;
Expand Down Expand Up @@ -81,7 +81,7 @@ async fn get_new_table_plan(
table_name: String,
database_id: u32,
owner: u32,
) -> Result<ReplaceTablePlan, AutoSchemaChangeError> {
) -> Result<ReplaceJobPlan, AutoSchemaChangeError> {
tracing::info!("get_new_table_plan for table {}", table_name);

let session_mgr = SESSION_MANAGER
Expand Down Expand Up @@ -109,11 +109,15 @@ async fn get_new_table_plan(
)
.await?;

Ok(ReplaceTablePlan {
table: Some(table),
Ok(ReplaceJobPlan {
replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
replace_job_plan::ReplaceTable {
table: Some(table),
source: None, // none for cdc table
job_type: job_type as _,
},
)),
fragment_graph: Some(graph),
table_col_index_mapping: Some(col_index_mapping.to_protobuf()),
source: None, // none for cdc table
job_type: job_type as _,
})
}
6 changes: 3 additions & 3 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, DdlProgress, PbTableJobType, ReplaceTablePlan, TableJobType,
create_connection_request, DdlProgress, PbTableJobType, ReplaceJobPlan, TableJobType,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -343,7 +343,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
sink: PbSink,
graph: StreamFragmentGraph,
_affected_table_change: Option<ReplaceTablePlan>,
_affected_table_change: Option<ReplaceJobPlan>,
_dependencies: HashSet<ObjectId>,
) -> Result<()> {
self.create_sink_inner(sink, graph)
Expand Down Expand Up @@ -485,7 +485,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
sink_id: u32,
cascade: bool,
_target_table_change: Option<ReplaceTablePlan>,
_target_table_change: Option<ReplaceJobPlan>,
) -> Result<()> {
if cascade {
return Err(ErrorCode::NotSupported(
Expand Down
Loading

0 comments on commit ffb51b8

Please sign in to comment.