Commit cefa584
feat: add kafka backfill frontend
xxchan committed Mar 5, 2024
1 parent 5f403fd commit cefa584
Showing 41 changed files with 1,129 additions and 252 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37

# chore: cargo +nightly fmt (#13162) (format let-chains)
c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f
13 changes: 10 additions & 3 deletions proto/catalog.proto
@@ -62,9 +62,16 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
bool cdc_source_job = 13;
// Whether the stream source has a streaming job.
// This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
// Currently, the following sources have streaming jobs:
// - Direct CDC sources (mysql & postgresql)
// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
bool has_streaming_job = 13;
// Only used when `has_streaming_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
bool is_distributed = 15;
reserved "cdc_source_job"; // deprecated
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
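For orientation, here is a minimal sketch (not part of the diff) of how the frontend populates the two new fields, condensed from the `handle_create_source` change further down; the function name is hypothetical, while the field and variable names come from this commit.

```rust
use risingwave_pb::catalog::StreamSourceInfo;

// Sketch only, mirroring `handle_create_source`: MQ sources such as Kafka get a
// distributed streaming source job, while backfill-capable CDC sources keep a
// singleton one (the planner derives `requires_singleton` from `is_distributed`).
fn fill_streaming_job_flags(
    source_info: &mut StreamSourceInfo,
    has_streaming_job: bool,
    create_cdc_source_job: bool,
) {
    if has_streaming_job {
        source_info.has_streaming_job = true;
        source_info.is_distributed = !create_cdc_source_job;
    }
}
```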
3 changes: 2 additions & 1 deletion src/common/src/monitor/connection.rs
@@ -34,6 +34,7 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, warn};
@@ -549,7 +550,7 @@ impl<L> tonic::transport::server::Router<L> {
config.tcp_nodelay,
config.keepalive_duration,
)
.unwrap();
.unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report()));
let incoming = MonitoredConnection::new(
incoming,
MonitorNewConnectionImpl {
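As a side note on the new import: `thiserror_ext::AsReport` formats an error together with its source chain, which is why the panic message uses `err.as_report()` rather than a bare `{}`. A rough, self-contained illustration (the function name is hypothetical):

```rust
use thiserror_ext::AsReport;

// Illustration only: `as_report()` renders the error with its full source chain.
fn describe_bind_failure(listen_addr: &str, err: std::io::Error) -> String {
    format!("failed to connect to {listen_addr}: {}", err.as_report())
}
```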
26 changes: 26 additions & 0 deletions src/common/src/util/iter_util.rs
@@ -54,3 +54,29 @@
{
a.into_iter().zip_eq_fast(b)
}

pub trait IntoIteratorExt
where
for<'a> &'a Self: IntoIterator,
{
/// Shorter version of `self.iter().map(f).collect()`.
fn map_collect<A, B, F, BCollection>(&self, f: F) -> BCollection
where
F: FnMut(&A) -> B,
for<'a> &'a Self: IntoIterator<Item = &'a A>,
BCollection: FromIterator<B>,
{
self.into_iter().map(f).collect()
}

/// Shorter version of `self.iter().map(f).collect_vec()`.
fn map_to_vec<A, B, F>(&self, f: F) -> Vec<B>
where
F: FnMut(&A) -> B,
for<'a> &'a Self: IntoIterator<Item = &'a A>,
{
self.map_collect(f)
}
}

impl<T> IntoIteratorExt for T where for<'a> &'a Self: IntoIterator {}
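A short usage sketch of the new helpers (the import path is inferred from the file location; the example itself is not part of the diff):

```rust
use std::collections::HashSet;

use risingwave_common::util::iter_util::IntoIteratorExt;

fn main() {
    let names: Vec<String> = vec!["kafka".into(), "pulsar".into()];

    // Shorthand for `names.iter().map(|s| s.len()).collect_vec()`.
    let lens: Vec<usize> = names.map_to_vec(|s| s.len());

    // `map_collect` works with any `FromIterator` target.
    let unique_lens: HashSet<usize> = names.map_collect(|s| s.len());

    assert_eq!(lens, vec![5, 6]);
    assert_eq!(unique_lens.len(), 2);
}
```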
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(source.state_table, "FsFetch");
}
}
NodeBody::SourceBackfill(node) => {
always!(node.state_table, "SourceBackfill")
}

// Sink
NodeBody::Sink(node) => {
61 changes: 61 additions & 0 deletions src/connector/src/parser/additional_columns.rs
@@ -185,6 +185,67 @@ pub fn build_additional_column_catalog(
Ok(catalog)
}

pub fn add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
) -> ([bool; 2], [ColumnCatalog; 2]) {
let mut columns_exist = [false; 2];
let mut last_column_id = columns
.iter()
.map(|c| c.column_desc.column_id)
.max()
.unwrap_or(ColumnId::placeholder());

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
.get(&*connector_name)
.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_catalog(
last_column_id,
&connector_name,
col_type,
None,
None,
None,
false,
)
.unwrap(),
)
} else {
None
}
})
.collect()
};
assert_eq!(additional_columns.len(), 2);

// Check if partition/file/offset columns are included explicitly.
for col in columns {
use risingwave_pb::plan_common::additional_column::ColumnType;
match col.column_desc.additional_column {
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
} => {
columns_exist[0] = true;
}
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
} => {
columns_exist[1] = true;
}
_ => (),
}
}

(columns_exist, additional_columns.try_into().unwrap())
}

fn build_header_catalog(
column_id: ColumnId,
col_name: &str,
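A hedged sketch of what the extracted helper returns for a Kafka source that does not declare partition/offset columns explicitly. The import paths and the hidden-column naming (e.g. `_rw_kafka_partition`, produced by `build_additional_column_catalog`) are assumptions, not part of this diff:

```rust
use risingwave_common::catalog::ColumnCatalog;
use risingwave_connector::parser::additional_columns::add_partition_offset_cols;

// Sketch only: for "kafka", both the partition and the offset column types are
// compatible, so two hidden column catalogs are returned; `columns_exist` reports
// whether the user already declared them (assumed not here).
fn kafka_hidden_cols(user_columns: &[ColumnCatalog]) -> [ColumnCatalog; 2] {
    let (columns_exist, additional_columns) = add_partition_offset_cols(user_columns, "kafka");
    assert_eq!(columns_exist, [false, false]);
    additional_columns
}
```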
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
@@ -77,8 +77,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
fn init_from_pb_source(&mut self, _source: &PbSource) {}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

@@ -443,10 +445,12 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kinesis(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
}
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -144,7 +144,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_multi_table_shared = info.cdc_source_job;
self.is_multi_table_shared = info.has_streaming_job;
}
}

74 changes: 12 additions & 62 deletions src/connector/src/source/reader/desc.rs
@@ -15,21 +15,18 @@
use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::PbStreamSourceInfo;
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::{AdditionalColumn, PbColumnCatalog};
use risingwave_pb::plan_common::PbColumnCatalog;

#[expect(deprecated)]
use super::fs_reader::FsSourceReader;
use super::reader::SourceReader;
use crate::error::ConnectorResult;
use crate::parser::additional_columns::{
build_additional_column_catalog, COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
COMPATIBLE_ADDITIONAL_COLUMNS,
};
use crate::parser::additional_columns::add_partition_offset_cols;
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::monitor::SourceMetrics;
use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
@@ -94,65 +91,18 @@ impl SourceDescBuilder {
/// This function builds `SourceColumnDesc` from `ColumnCatalog`, and handles the creation
/// of hidden columns like partition/file and offset that are not specified by the user.
pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
let mut columns_exist = [false; 2];
let mut last_column_id = self
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id.into())
.max()
.unwrap_or(ColumnId::placeholder());
let connector_name = self
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.to_lowercase())
.unwrap();

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
.get(&*connector_name)
.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_catalog(
last_column_id,
&connector_name,
col_type,
None,
None,
None,
false,
)
.unwrap()
.to_protobuf(),
)
} else {
None
}
})
.collect()
};
assert_eq!(additional_columns.len(), 2);

// Check if partition/file/offset columns are included explicitly.
for col in &self.columns {
match col.column_desc.as_ref().unwrap().get_additional_column() {
Ok(AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
}) => {
columns_exist[0] = true;
}
Ok(AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
}) => {
columns_exist[1] = true;
}
_ => (),
}
}
let columns = self
.columns
.iter()
.map(|c| ColumnCatalog::from(c.clone()))
.collect_vec();
let (columns_exist, additional_columns) =
add_partition_offset_cols(&columns, &connector_name);

let mut columns: Vec<_> = self
.columns
@@ -163,7 +113,7 @@
for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
if !existed {
columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(
&ColumnDesc::from(c.column_desc.as_ref().unwrap()),
&c.column_desc,
));
}
}
6 changes: 6 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
@@ -58,6 +58,12 @@ impl From<&SourceCatalog> for BoundSource {
}
}

impl BoundSource {
pub fn can_backfill(&self) -> bool {
self.catalog.info.has_streaming_job
}
}

impl Binder {
/// Binds table or source, or logical view according to what we get from the catalog.
pub fn bind_relation_by_name_inner(
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
@@ -21,6 +21,7 @@ use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};

use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
use crate::catalog::TableId;
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::user::UserId;

/// This struct `SourceCatalog` is used in frontend.
@@ -83,6 +84,13 @@ impl SourceCatalog {
pub fn version(&self) -> SourceVersionId {
self.version
}

pub fn connector_name(&self) -> String {
self.with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.to_lowercase())
.unwrap()
}
}

impl From<&PbSource> for SourceCatalog {
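A minimal sketch of how the new accessor might be used when the frontend needs to branch on the connector; the `is_kafka_source` helper and the `"kafka"` literal are illustrative assumptions:

```rust
use crate::catalog::source_catalog::SourceCatalog;

// Sketch only: `connector_name()` lower-cases the `connector` WITH-option, so a
// plain comparison against a well-known connector name suffices.
fn is_kafka_source(source: &SourceCatalog) -> bool {
    source.connector_name() == "kafka"
}
```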
15 changes: 8 additions & 7 deletions src/frontend/src/handler/create_source.rs
@@ -508,7 +508,7 @@ fn bind_columns_from_source_for_cdc(
row_encode: row_encode_to_prost(&source_schema.row_encode) as i32,
format_encode_options,
use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
cdc_source_job: true,
has_streaming_job: true,
..Default::default()
};
if !format_encode_options_to_consume.is_empty() {
@@ -1305,18 +1305,22 @@ pub async fn handle_create_source(
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if is_cdc_connector(&with_properties) {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
};
let has_streaming_job = create_cdc_source_job || is_kafka_connector(&with_properties);

let (columns_from_resolve_source, source_info) = if create_cdc_source_job {
let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if has_streaming_job {
source_info.has_streaming_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let columns_from_sql = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_all_columns(
@@ -1413,18 +1417,15 @@

let catalog_writer = session.catalog_writer()?;

if create_cdc_source_job {
// create a streaming job for the cdc source, which will mark as *singleton* in the Fragmenter
if has_streaming_job {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
// cdc source is an append-only source in plain json format
let source_node = LogicalSource::with_catalog(
Rc::new(SourceCatalog::from(&source)),
SourceNodeKind::CreateSourceWithStreamjob,
context.into(),
)?;

// generate stream graph for cdc source job
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
let mut graph = build_graph(stream_plan)?;
graph.parallelism =
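To keep the new gating in one place, a condensed restatement of which sources now get a streaming job; the helper is illustrative, but the logic matches the diff above:

```rust
// Sketch only: backfill-capable CDC sources and Kafka sources get a streaming
// source job; only the non-CDC (Kafka) job is distributed, the CDC job stays
// singleton. Returns (has_streaming_job, is_distributed).
fn streaming_job_flags(is_cdc: bool, cdc_can_backfill: bool, is_kafka: bool) -> (bool, bool) {
    let create_cdc_source_job = is_cdc && cdc_can_backfill;
    let has_streaming_job = create_cdc_source_job || is_kafka;
    let is_distributed = has_streaming_job && !create_cdc_source_job;
    (has_streaming_job, is_distributed)
}
```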
(The remaining changed files are not shown in this view.)
