Skip to content

Commit

Permalink
rename cdc_source_job -> has_streaming_job
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 9, 2024
1 parent 1e34118 commit 0b4e940
Show file tree
Hide file tree
Showing 19 changed files with 861 additions and 25 deletions.
13 changes: 10 additions & 3 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,16 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
bool cdc_source_job = 13;
// Whether the stream source has a streaming job.
// This is related to [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
// Currently, the following sources have streaming jobs:
// - Direct CDC sources (mysql & postgresql)
// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
bool has_streaming_job = 13;
// Only used when `has_streaming_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
bool is_distributed = 15;
reserved "cdc_source_job"; // deprecated
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ enum TableJobType {
// table streaming jobs except the `SHARED_CDC_SOURCE` type
TABLE_JOB_TYPE_GENERAL = 1;
// table streaming job sharing a CDC source job
TABLE_JOB_TYPE_SHARED_CDC_SOURCE = 2;
TABLE_JOB_TYPE_SOURCE = 2;
}

message CreateTableRequest {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ message StreamNode {
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceNode source_backfill = 141;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ pub fn visit_stream_node_tables_inner<F>(
always!(source.state_table, "FsFetch");
}
}
NodeBody::SourceBackfill(node) => {
always!(
node.source_inner.as_mut().unwrap().state_table,
"SourceBackfill"
)
}

// Sink
NodeBody::Sink(node) => {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_multi_table_shared = info.cdc_source_job;
self.is_multi_table_shared = info.has_streaming_job;
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl From<&SourceCatalog> for BoundSource {
}
}

impl BoundSource {
    /// Reports whether this bound source can be backfilled.
    ///
    /// The answer is read directly from the `has_streaming_job` flag stored
    /// in the source catalog's info.
    pub fn can_backfill(&self) -> bool {
        let info = &self.catalog.info;
        info.has_streaming_job
    }
}

impl Binder {
/// Binds table or source, or logical view according to what we get from the catalog.
pub fn bind_relation_by_name_inner(
Expand Down
17 changes: 9 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ fn bind_columns_from_source_for_cdc(
row_encode: row_encode_to_prost(&source_schema.row_encode) as i32,
format_encode_options,
use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
cdc_source_job: true,
has_streaming_job: true,
..Default::default()
};
if !format_encode_options_to_consume.is_empty() {
Expand Down Expand Up @@ -1122,18 +1122,22 @@ pub async fn handle_create_source(
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if is_cdc_connector(&with_properties) {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
};
let has_streaming_job = create_cdc_source_job || is_kafka_connector(&with_properties);

let (columns_from_resolve_source, source_info) = if create_cdc_source_job {
let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if has_streaming_job {
source_info.has_streaming_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let columns_from_sql = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_all_columns(
Expand Down Expand Up @@ -1226,21 +1230,18 @@ pub async fn handle_create_source(

let catalog_writer = session.catalog_writer()?;

if create_cdc_source_job {
// create a streaming job for the cdc source, which will mark as *singleton* in the Fragmenter
if has_streaming_job {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
// cdc source is an append-only source in plain json format
let source_node = LogicalSource::new(
Some(Rc::new(SourceCatalog::from(&source))),
columns.clone(),
row_id_index,
false,
false, // Do not gen RowID. Gen RowID after backfill node instead.
false,
context.into(),
)?;

// generate stream graph for cdc source job
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
let mut graph = build_graph(stream_plan);
graph.parallelism =
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ impl ToStream for LogicalSource {
{
plan_prefix = Some(self.rewrite_new_s3_plan()?);
}

// TODO: after SourceBackfill is added, generated columns/row id should no longer be put here; they should be put after the backfill node instead.
plan = if self.core.for_table {
dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), plan_prefix)
} else {
Expand Down
Loading

0 comments on commit 0b4e940

Please sign in to comment.