refactor: bind create source #16665

Merged · 8 commits · May 14, 2024
216 changes: 133 additions & 83 deletions src/frontend/src/handler/create_source.rs
@@ -54,22 +54,21 @@ use risingwave_connector::source::{
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
};
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format,
ProtobufSchema, SourceWatermark,
ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
};
use risingwave_sqlparser::parser::IncludeOption;
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
@@ -1320,63 +1319,56 @@ pub fn bind_connector_props(
}
Ok(with_properties)
}

pub async fn handle_create_source(
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source(
handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
StatementType::CREATE_SOURCE,
stmt.if_not_exists,
)? {
return Ok(resp);
}

let db_name = session.database();
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?;
full_name: ObjectName,
source_schema: ConnectorSchema,
with_properties: HashMap<String, String>,
sql_columns_defs: &[ColumnDef],
constraints: Vec<TableConstraint>,
wildcard_idx: Option<usize>,
source_watermarks: Vec<SourceWatermark>,
columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
source_info: StreamSourceInfo,
include_column_options: IncludeOption,
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = session.database();
let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;

if handler_args.with_options.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"missing WITH clause".to_string(),
)));
if !is_create_source && with_properties.is_iceberg_connector() {
return Err(
ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
);
}

let source_schema = stmt.source_schema.into_v2_with_warning();

let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());
ensure_table_constraints_supported(&constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?;

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if is_shared {
// Note: this field should be called is_shared. Check field doc for more details.
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let columns_from_sql = bind_sql_columns(&stmt.columns)?;
let columns_from_sql = bind_sql_columns(sql_columns_defs)?;

let mut columns = bind_all_columns(
&source_schema,
columns_from_resolve_source,
columns_from_sql,
&stmt.columns,
stmt.wildcard_idx,
sql_columns_defs,
wildcard_idx,
)?;

// add additional columns before binding the pk, because `format upsert` requires the key column
handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?;
handle_addition_columns(&with_properties, include_column_options, &mut columns)?;
// compatible with the behavior that adds a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
if is_create_source {
// must be called after `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);
}

let pk_names = bind_source_pk(
&source_schema,
&source_info,
@@ -1386,78 +1378,136 @@ pub async fn handle_create_source(
)
.await?;

// must behind `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);

let mut col_id_gen = ColumnIdGenerator::new_initial();
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}

if !pk_names.is_empty() {
if is_create_source && !pk_names.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
.to_owned(),
)
.into());
}
let (mut columns, pk_column_ids, row_id_index) =
bind_pk_on_relation(columns, pk_names, with_properties.connector_need_pk())?;

for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
debug_assert!(is_column_ids_dedup(&columns));

let (mut columns, pk_col_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;

let watermark_descs =
bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?;
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
// TODO(yuhao): allow multiple watermarks on source.
assert!(watermark_descs.len() <= 1);

bind_sql_column_constraints(
&session,
name.clone(),
session,
source_name.clone(),
&mut columns,
stmt.columns,
&pk_column_ids,
// TODO(st1page): pass the ref
sql_columns_defs.to_vec(),
&pk_col_ids,
)?;

check_source_schema(&with_properties, row_id_index, &columns).await?;

let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect();

let mut with_options = WithOptions::new(with_properties);
// resolve privatelink connection for Kafka source
// resolve privatelink connection for Kafka
let mut with_properties = WithOptions::new(with_properties);
let connection_id =
resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;
let definition = handler_args.normalized_sql.clone();
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?;

let definition: String = handler_args.normalized_sql.clone();

let source = PbSource {
let associated_table_id = if is_create_source {
None
} else {
Some(TableId::placeholder())
};
let source = SourceCatalog {
id: TableId::placeholder().table_id,
schema_id,
database_id,
name,
row_id_index: row_id_index.map(|idx| idx as u32),
columns: columns.iter().map(|c| c.to_protobuf()).collect_vec(),
pk_column_ids,
with_properties: with_options.into_inner().into_iter().collect(),
info: Some(source_info),
name: source_name,
columns,
pk_col_ids,
append_only: row_id_index.is_some(),
owner: session.user_id(),
info: source_info,
row_id_index,
with_properties: with_properties.into_inner().into_iter().collect(),
Review comment (Contributor Author): will try to refactor the with_properties in another PR.

watermark_descs,
associated_table_id,
definition,
connection_id,
initialized_at_epoch: None,
created_at_epoch: None,
optional_associated_table_id: None,
initialized_at_epoch: None,
version: INITIAL_SOURCE_VERSION_ID,
initialized_at_cluster_version: None,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
};
Ok((source, database_id, schema_id))
}

pub async fn handle_create_source(
handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
StatementType::CREATE_SOURCE,
stmt.if_not_exists,
)? {
return Ok(resp);
}

if handler_args.with_options.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"missing WITH clause".to_string(),
)));
}

let source_schema = stmt.source_schema.into_v2_with_warning();
let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if is_shared {
// Note: this field should be called is_shared. Check field doc for more details.
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let mut col_id_gen = ColumnIdGenerator::new_initial();

let (source_catalog, database_id, schema_id) = bind_create_source(
handler_args.clone(),
stmt.source_name,
source_schema,
with_properties,
&stmt.columns,
stmt.constraints,
stmt.wildcard_idx,
stmt.source_watermarks,
columns_from_resolve_source,
source_info,
stmt.include_column_options,
&mut col_id_gen,
true,
)
.await?;

let source = source_catalog.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;

if is_shared {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_node = LogicalSource::with_catalog(
Rc::new(SourceCatalog::from(&source)),
Rc::new(source_catalog),
SourceNodeKind::CreateSharedSource,
context.into(),
None,
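The net effect of this diff is that `handle_create_source` is split in two: source-specific plumbing (duplicate-name check, WITH-clause validation, shared/CDC source detection) stays in the handler, while everything needed to turn parsed SQL into a `SourceCatalog` moves into the reusable `bind_create_source`, gated by the `is_create_source` flag. As a rough sketch of the intended second caller, a "create table with connector" path could invoke it with the flag set to `false`. The `TableLikeStmt` type and its field names below are illustrative assumptions; only `bind_create_source` itself appears in this diff:

```rust
// Hypothetical sketch: how a "CREATE TABLE ... WITH (connector = ...)" handler
// might reuse bind_create_source. `TableLikeStmt` stands in for the real
// CREATE TABLE AST node, which is outside this diff; its fields mirror
// CreateSourceStatement.
async fn bind_table_with_connector(
    handler_args: HandlerArgs,
    stmt: TableLikeStmt,
    source_schema: ConnectorSchema,
    with_properties: HashMap<String, String>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    col_id_gen: &mut ColumnIdGenerator,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
    bind_create_source(
        handler_args,
        stmt.table_name,
        source_schema,
        with_properties,
        &stmt.columns,
        stmt.constraints,
        stmt.wildcard_idx,
        stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        stmt.include_column_options,
        col_id_gen,
        // `false` = "create table with connector": PRIMARY KEY is allowed,
        // the iceberg connector is rejected, no hidden _rw_kafka_timestamp
        // column is added, and associated_table_id is set to a placeholder.
        false,
    )
    .await
}
```

With `is_create_source = true` (as in `handle_create_source` above), each of those points is inverted, matching the `if is_create_source` / `if !is_create_source` branches in the diff.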