refactor create source
st1page committed May 9, 2024
1 parent cb8a92a commit e219475
Showing 1 changed file with 132 additions and 84 deletions.
216 changes: 132 additions & 84 deletions src/frontend/src/handler/create_source.rs
@@ -54,22 +54,21 @@ use risingwave_connector::source::{
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
};
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format,
ProtobufSchema, SourceWatermark,
ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
};
use risingwave_sqlparser::parser::IncludeOption;
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
@@ -1320,63 +1319,56 @@ pub fn bind_connector_props(
}
Ok(with_properties)
}

pub async fn handle_create_source(
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source(
handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
StatementType::CREATE_SOURCE,
stmt.if_not_exists,
)? {
return Ok(resp);
}

let db_name = session.database();
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?;
full_name: ObjectName,
source_schema: ConnectorSchema,
with_properties: HashMap<String, String>,
sql_columns_defs: &[ColumnDef],
constraints: Vec<TableConstraint>,
wildcard_idx: Option<usize>,
source_watermarks: Vec<SourceWatermark>,
columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
source_info: StreamSourceInfo,
include_column_options: IncludeOption,
mut col_id_gen: ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = session.database();
let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;

if handler_args.with_options.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"missing WITH clause".to_string(),
)));
if !is_create_source && with_properties.is_iceberg_connector() {
return Err(
ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
);
}

let source_schema = stmt.source_schema.into_v2_with_warning();
ensure_table_constraints_supported(&constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?;

let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if is_shared {
// Note: this field should be called is_shared. Check field doc for more details.
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let columns_from_sql = bind_sql_columns(&stmt.columns)?;
let columns_from_sql = bind_sql_columns(sql_columns_defs)?;

let mut columns = bind_all_columns(
&source_schema,
columns_from_resolve_source,
columns_from_sql,
&stmt.columns,
stmt.wildcard_idx,
sql_columns_defs,
wildcard_idx,
)?;

// add additional columns before bind pk, because `format upsert` requires the key column
handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?;
handle_addition_columns(&with_properties, include_column_options, &mut columns)?;
// compatible with the behavior that adds a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
if is_create_source {
// must be behind `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);
}

let pk_names = bind_source_pk(
&source_schema,
&source_info,
@@ -1386,78 +1378,134 @@ pub async fn handle_create_source(
)
.await?;

// must be behind `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);

let mut col_id_gen = ColumnIdGenerator::new_initial();
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}

if !pk_names.is_empty() {
if is_create_source && !pk_names.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
.to_owned(),
)
.into());
}
let (mut columns, pk_column_ids, row_id_index) =
bind_pk_on_relation(columns, pk_names, with_properties.connector_need_pk())?;
let (mut columns, pk_col_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;

for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
debug_assert!(is_column_ids_dedup(&columns));

let watermark_descs =
bind_source_watermark(&session, name.clone(), stmt.source_watermarks, &columns)?;
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
// TODO(yuhao): allow multiple watermarks on source.
assert!(watermark_descs.len() <= 1);

bind_sql_column_constraints(
&session,
name.clone(),
session,
source_name.clone(),
&mut columns,
stmt.columns,
&pk_column_ids,
// TODO(st1page): pass the ref
sql_columns_defs.to_vec(),
&pk_col_ids,
)?;

check_source_schema(&with_properties, row_id_index, &columns).await?;

let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect();

let mut with_options = WithOptions::new(with_properties);
// resolve privatelink connection for Kafka source
// resolve privatelink connection for Kafka
let mut with_properties = WithOptions::new(with_properties);
let connection_id =
resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;
let definition = handler_args.normalized_sql.clone();
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?;

let source = PbSource {
let definition: String = handler_args.normalized_sql.clone();

let associated_table_id = if is_create_source {
None
} else {
Some(TableId::placeholder())
};
let source = SourceCatalog {
id: TableId::placeholder().table_id,
schema_id,
database_id,
name,
row_id_index: row_id_index.map(|idx| idx as u32),
columns: columns.iter().map(|c| c.to_protobuf()).collect_vec(),
pk_column_ids,
with_properties: with_options.into_inner().into_iter().collect(),
info: Some(source_info),
name: source_name,
columns,
pk_col_ids,
append_only: row_id_index.is_some(),
owner: session.user_id(),
info: source_info,
row_id_index,
with_properties: with_properties.into_inner().into_iter().collect(),
watermark_descs,
associated_table_id,
definition,
connection_id,
initialized_at_epoch: None,
created_at_epoch: None,
optional_associated_table_id: None,
initialized_at_epoch: None,
version: INITIAL_SOURCE_VERSION_ID,
initialized_at_cluster_version: None,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
};
Ok((source, database_id, schema_id))
}

pub async fn handle_create_source(
handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
StatementType::CREATE_SOURCE,
stmt.if_not_exists,
)? {
return Ok(resp);
}

if handler_args.with_options.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"missing WITH clause".to_string(),
)));
}

let source_schema = stmt.source_schema.into_v2_with_warning();
let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
} else {
bind_columns_from_source(&session, &source_schema, &with_properties).await?
};
if is_shared {
// Note: this field should be called is_shared. Check field doc for more details.
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let col_id_gen = ColumnIdGenerator::new_initial();

let (source_catalog, database_id, schema_id) = bind_create_source(
handler_args.clone(),
stmt.source_name,
source_schema,
with_properties,
&stmt.columns,
stmt.constraints,
stmt.wildcard_idx,
stmt.source_watermarks,
columns_from_resolve_source,
source_info,
stmt.include_column_options,
col_id_gen,
true,
)
.await?;

let source = source_catalog.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;

if is_shared {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_node = LogicalSource::with_catalog(
Rc::new(SourceCatalog::from(&source)),
Rc::new(source_catalog),
SourceNodeKind::CreateSharedSource,
context.into(),
None,
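
For context, the refactor introduces an `is_create_source` flag so the same binder can also serve `CREATE TABLE ... WITH (connector = ...)`. Below is a minimal, hypothetical sketch of such a caller; the table-side call site is not part of this commit, and names like `table_name` and `column_defs` are placeholders for illustration only:

// Hypothetical caller on the "create table with connector" side (not shown in this diff).
// Passing `is_create_source = false` keeps the table-specific behavior: no hidden
// `_rw_kafka_timestamp` column is added, PRIMARY KEY constraints are accepted, and
// `associated_table_id` is set to a placeholder `TableId`.
let (source_catalog, database_id, schema_id) = bind_create_source(
    handler_args.clone(),
    table_name,                  // ObjectName of the table being created (assumed)
    source_schema,               // ConnectorSchema resolved from the statement
    with_properties,             // connector WITH options as HashMap<String, String>
    &column_defs,                // SQL column definitions (assumed)
    constraints,                 // table constraints; PRIMARY KEY is allowed on this path
    wildcard_idx,
    source_watermarks,
    columns_from_resolve_source, // columns resolved from the external schema, if any
    source_info,
    include_column_options,
    col_id_gen,
    false,                       // "create table with connector"
)
.await?;
let source = source_catalog.to_prost(schema_id, database_id);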
