connection dep for sink
tabversion committed Nov 15, 2024
1 parent 1da1d1a commit 403868e
Showing 6 changed files with 116 additions and 48 deletions.
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -487,6 +487,7 @@ fn mock_from_legacy_type(
options: Default::default(),
secret_refs: Default::default(),
key_encode: None,
connection_id: None,
}))
} else {
SinkFormatDesc::from_legacy_type(connector, r#type)
6 changes: 5 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
@@ -124,6 +124,7 @@ pub struct SinkFormatDesc {
pub options: BTreeMap<String, String>,
pub secret_refs: BTreeMap<String, PbSecretRef>,
pub key_encode: Option<SinkEncode>,
pub connection_id: Option<u32>,
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
@@ -188,6 +189,7 @@ impl SinkFormatDesc {
options: Default::default(),
secret_refs: Default::default(),
key_encode: None,
connection_id: None,
}))
}

@@ -223,7 +225,7 @@ impl SinkFormatDesc {
options,
key_encode,
secret_refs: self.secret_refs.clone(),
connection_id: None,
connection_id: self.connection_id,
}
}

@@ -236,6 +238,7 @@ impl SinkFormatDesc {
options: Default::default(),
secret_refs: Default::default(),
key_encode: None,
connection_id: None,
}
}
}
@@ -300,6 +303,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
options: value.options,
key_encode,
secret_refs: value.secret_refs,
connection_id: value.connection_id,
})
}
}
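
Taken together, this file's changes thread the new `connection_id` field through `SinkFormatDesc`, its constructors, `to_proto`, and the `TryFrom<PbSinkFormatDesc>` conversion. Below is a minimal round-trip sketch using simplified stand-in types; the real structs carry more fields and the protobuf side is generated code, so treat this only as an illustration of the new field's flow.

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the generated protobuf message.
#[derive(Debug, Default, Clone, PartialEq)]
struct PbSinkFormatDesc {
    options: BTreeMap<String, String>,
    connection_id: Option<u32>,
}

// Simplified stand-in for the catalog struct touched by this diff.
#[derive(Debug, Default, Clone, PartialEq)]
struct SinkFormatDesc {
    options: BTreeMap<String, String>,
    // New in this commit: the connection the format/encode clause depends on.
    connection_id: Option<u32>,
}

impl SinkFormatDesc {
    // `to_proto` now forwards the stored id instead of hard-coding `connection_id: None`.
    fn to_proto(&self) -> PbSinkFormatDesc {
        PbSinkFormatDesc {
            options: self.options.clone(),
            connection_id: self.connection_id,
        }
    }
}

impl From<PbSinkFormatDesc> for SinkFormatDesc {
    // Mirrors the TryFrom<PbSinkFormatDesc> arm in the diff, minus error handling.
    fn from(value: PbSinkFormatDesc) -> Self {
        Self {
            options: value.options,
            connection_id: value.connection_id,
        }
    }
}

fn main() {
    let desc = SinkFormatDesc {
        connection_id: Some(42),
        ..Default::default()
    };
    // The id survives a catalog -> proto -> catalog round trip.
    assert_eq!(SinkFormatDesc::from(desc.to_proto()), desc);
    println!("round-trip ok");
}
```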
54 changes: 43 additions & 11 deletions src/frontend/src/handler/create_sink.rs
@@ -18,11 +18,13 @@ use std::sync::{Arc, LazyLock};
use anyhow::Context;
use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
@@ -32,6 +34,8 @@ use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
@@ -57,7 +61,9 @@ use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::util::{
check_connector_match_connection_type, ensure_connection_type_allowed, SourceSchemaCompatExt,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::{
generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject,
@@ -66,9 +72,24 @@ use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> = LazyLock::new(|| {
hashset! {
PbConnectionType::Kafka,
PbConnectionType::Iceberg,
}
});

static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
LazyLock::new(|| {
hashset! {
PbConnectionType::Unspecified,
PbConnectionType::SchemaRegistry,
}
});

// used to store result of `gen_sink_plan`
pub struct SinkPlanContext {
pub query: Box<Query>,
@@ -92,7 +113,15 @@ pub async fn gen_sink_plan(
let mut with_options = handler_args.with_options.clone();

resolve_privatelink_in_with_option(&mut with_options)?;
let mut resolved_with_options = resolve_secret_ref_in_with_options(with_options, session)?;
let (mut resolved_with_options, connection_type, connector_conn_ref) =
resolve_connection_ref_and_secret_ref(with_options, session)?;
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?;

// if not using connection, we don't need to check connector match connection type
if !matches!(connection_type, PbConnectionType::Unspecified) {
let connector = resolved_with_options.get_connector().unwrap();
check_connector_match_connection_type(connector.as_str(), &connection_type)?;
}

let partition_info = get_partition_compute_info(&resolved_with_options).await?;

@@ -260,7 +289,7 @@ pub async fn gen_sink_plan(
SchemaId::new(sink_schema_id),
DatabaseId::new(sink_database_id),
UserId::new(session.user_id()),
None, // deprecated: private link connection id
connector_conn_ref.map(ConnectionId::from),
dependent_relations.into_iter().collect_vec(),
);

@@ -852,11 +881,13 @@ fn bind_sink_format_desc(
}
}

let (mut options, secret_refs) = resolve_secret_ref_in_with_options(
WithOptions::try_from(value.row_options.as_slice())?,
session,
)?
.into_parts();
let (props, connection_type_flag, schema_registry_conn_ref) =
resolve_connection_ref_and_secret_ref(
WithOptions::try_from(value.row_options.as_slice())?,
session,
)?;
ensure_connection_type_allowed(connection_type_flag, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
let (mut options, secret_refs) = props.into_parts();

options
.entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
Expand All @@ -868,6 +899,7 @@ fn bind_sink_format_desc(
options,
secret_refs,
key_encode,
connection_id: schema_registry_conn_ref,
})
}
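
In `gen_sink_plan` and `bind_sink_format_desc`, the flow is now: resolve the connection reference together with secret refs, check the resolved connection type against an allowlist, and, when a connection is actually referenced, check that the connector in the WITH clause matches the connection's type; the same pattern is reused in create_source.rs below. Here is a self-contained sketch of that validation logic — `ConnType`, the allowlist contents (including admitting `Unspecified`), and the connector names are simplified assumptions rather than the diff's exact definitions.

```rust
use std::collections::HashSet;
use std::sync::LazyLock;

// Stand-in for PbConnectionType.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ConnType {
    Unspecified,
    Kafka,
    Iceberg,
}

// Mirrors the ALLOWED_CONNECTION_CONNECTOR idea. Unspecified is admitted in this
// sketch so that sinks created without a connection reference still pass (assumption).
static ALLOWED_FOR_CONNECTOR: LazyLock<HashSet<ConnType>> = LazyLock::new(|| {
    HashSet::from([ConnType::Unspecified, ConnType::Kafka, ConnType::Iceberg])
});

// Analogue of ensure_connection_type_allowed from util.rs.
fn ensure_allowed(ty: ConnType, allowed: &HashSet<ConnType>) -> Result<(), String> {
    if !allowed.contains(&ty) {
        return Err(format!(
            "connection type {ty:?} is not allowed, allowed types: {allowed:?}"
        ));
    }
    Ok(())
}

// Analogues of connection_type_to_connector / check_connector_match_connection_type.
fn connector_for(ty: ConnType) -> &'static str {
    match ty {
        ConnType::Kafka => "kafka",
        ConnType::Iceberg => "iceberg",
        ConnType::Unspecified => unreachable!(),
    }
}

fn check_connector_matches(connector: &str, ty: ConnType) -> Result<(), String> {
    if connector != connector_for(ty) {
        return Err(format!(
            "connector {connector} and connection type {ty:?} are not compatible"
        ));
    }
    Ok(())
}

// Roughly what the new gen_sink_plan path does with the resolved triple.
fn validate(connector: &str, ty: ConnType) -> Result<(), String> {
    ensure_allowed(ty, &ALLOWED_FOR_CONNECTOR)?;
    // No connection reference in the WITH clause -> nothing more to check.
    if ty != ConnType::Unspecified {
        check_connector_matches(connector, ty)?;
    }
    Ok(())
}

fn main() {
    assert!(validate("kafka", ConnType::Kafka).is_ok());
    assert!(validate("kafka", ConnType::Unspecified).is_ok());
    assert!(validate("iceberg", ConnType::Kafka).is_err());
    println!("validation checks behave as expected");
}
```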

53 changes: 19 additions & 34 deletions src/frontend/src/handler/create_source.rs
@@ -81,7 +81,9 @@ use crate::handler::create_table::{
bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::util::{
check_connector_match_connection_type, ensure_connection_type_allowed, SourceSchemaCompatExt,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
@@ -311,7 +313,7 @@ pub(crate) async fn bind_columns_from_source(

let options_with_secret = match with_properties {
Either::Left(options) => {
let (sec_resolve_props, connection_type) =
let (sec_resolve_props, connection_type, _) =
resolve_connection_ref_and_secret_ref(options.clone(), session)?;
if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
Expand All @@ -328,16 +330,12 @@ pub(crate) async fn bind_columns_from_source(
let is_kafka: bool = options_with_secret.is_kafka_connector();

// todo: need to resolve connection ref for schema registry
let (sec_resolve_props, connection_type) = resolve_connection_ref_and_secret_ref(
WithOptions::try_from(format_encode.row_options())?,
session,
)?;
if !ALLOWED_CONNECTION_SCHEMA_REGISTRY.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, ALLOWED_CONNECTION_SCHEMA_REGISTRY
))));
}
let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
resolve_connection_ref_and_secret_ref(
WithOptions::try_from(format_encode.row_options())?,
session,
)?;
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;

let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
// Need real secret to access the schema registry
@@ -372,6 +370,7 @@ pub(crate) async fn bind_columns_from_source(
row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
format_encode_options,
format_encode_secret_refs,
connection_id: schema_registry_conn_ref,
..Default::default()
};

@@ -1601,20 +1600,14 @@ pub async fn bind_create_source_or_table_with_connector(
let mut with_properties = with_properties;
resolve_privatelink_in_with_option(&mut with_properties)?;

let connector = with_properties.get_connector().unwrap();
let (with_properties, connection_type) =
let (with_properties, connection_type, connector_conn_ref) =
resolve_connection_ref_and_secret_ref(with_properties, session)?;
if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, ALLOWED_CONNECTION_CONNECTOR
))));
}
if !connector.eq(connection_type_to_connector(&connection_type)) {
return Err(RwError::from(ProtocolError(format!(
"connector {} and connection type {:?} are not compatible",
connector, connection_type
))));
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?;

// if not using connection, we don't need to check connector match connection type
if !matches!(connection_type, PbConnectionType::Unspecified) {
let connector = with_properties.get_connector().unwrap();
check_connector_match_connection_type(connector.as_str(), &connection_type)?;
}

let pk_names = bind_source_pk(
@@ -1689,7 +1682,7 @@ pub async fn bind_create_source_or_table_with_connector(
watermark_descs,
associated_table_id,
definition,
connection_id: None, // deprecated: private link connection id
connection_id: connector_conn_ref,
created_at_epoch: None,
initialized_at_epoch: None,
version: INITIAL_SOURCE_VERSION_ID,
@@ -1826,14 +1819,6 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
}
}

fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str {
match connection_type {
PbConnectionType::Kafka => KAFKA_CONNECTOR,
PbConnectionType::Iceberg => ICEBERG_CONNECTOR,
_ => unreachable!(),
}
}

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
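
After this change, both handlers carry two possible connection dependencies out of binding: the connector connection referenced in the WITH clause (`connector_conn_ref`) and the schema-registry connection referenced in the FORMAT ... ENCODE options (`schema_registry_conn_ref`), each stored in its own `connection_id` field on the created catalog objects. The following is a conceptual sketch, not code from the diff, of how such ids could be gathered into a dependency list.

```rust
// Hypothetical helper: the diff stores the two ids in separate catalog fields;
// collecting them into one list is only an illustration of the dependency idea.
fn collect_connection_deps(
    connector_conn_ref: Option<u32>,
    schema_registry_conn_ref: Option<u32>,
) -> Vec<u32> {
    connector_conn_ref
        .into_iter()
        .chain(schema_registry_conn_ref)
        .collect()
}

fn main() {
    // A Kafka sink using connection id 7 for the broker and 9 for the schema registry.
    assert_eq!(collect_connection_deps(Some(7), Some(9)), vec![7, 9]);
    // A sink with no connection references depends on nothing.
    assert!(collect_connection_deps(None, None).is_empty());
}
```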
41 changes: 40 additions & 1 deletion src/frontend/src/handler/util.rs
@@ -34,14 +34,19 @@ use risingwave_common::types::{
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_sqlparser::ast::{
CompatibleFormatEncode, Expr, FormatEncodeOptions, Ident, ObjectName, OrderByExpr, Query,
Select, SelectItem, SetExpr, TableFactor, TableWithJoins,
};
use thiserror_ext::AsReport;

use crate::error::{ErrorCode, Result as RwResult};
use crate::error::ErrorCode::ProtocolError;
use crate::error::{ErrorCode, Result as RwResult, RwError};
use crate::session::{current, SessionImpl};
use crate::HashSet;

pin_project! {
/// Wrapper struct that converts a stream of DataChunk to a stream of RowSet based on formatting
@@ -271,6 +276,40 @@ pub fn convert_interval_to_u64_seconds(interval: &String) -> RwResult<u64> {
Ok(seconds)
}

pub fn ensure_connection_type_allowed(
connection_type: PbConnectionType,
allowed_types: &HashSet<PbConnectionType>,
) -> RwResult<()> {
if !allowed_types.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, allowed_types
))));
}
Ok(())
}

fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str {
match connection_type {
PbConnectionType::Kafka => KAFKA_CONNECTOR,
PbConnectionType::Iceberg => ICEBERG_CONNECTOR,
_ => unreachable!(),
}
}

pub fn check_connector_match_connection_type(
connector: &str,
connection_type: &PbConnectionType,
) -> RwResult<()> {
if !connector.eq(connection_type_to_connector(connection_type)) {
return Err(RwError::from(ProtocolError(format!(
"connector {} and connection type {:?} are not compatible",
connector, connection_type
))));
}
Ok(())
}

#[cfg(test)]
mod tests {
use postgres_types::{ToSql, Type};
9 changes: 8 additions & 1 deletion src/frontend/src/utils/with_options.rs
@@ -186,10 +186,11 @@ impl WithOptions {
pub(crate) fn resolve_connection_ref_and_secret_ref(
with_options: WithOptions,
session: &SessionImpl,
) -> RwResult<(WithOptionsSecResolved, PbConnectionType)> {
) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
let db_name: &str = session.database();
let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();

let mut connection_id = None;
let mut connection_params = None;
for connection_ref in connection_refs.values() {
// at most one connection ref in the map
@@ -202,6 +203,7 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
let connection_catalog =
session.get_connection_by_name(schema_name, &connection_name)?;
if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
connection_id = Some(connection_catalog.id);
Some(params.clone())
} else {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
@@ -232,6 +234,7 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
};

let mut connection_type = PbConnectionType::Unspecified;
let connection_params_none_flag = connection_params.is_none();
if let Some(connection_params) = connection_params {
connection_type = connection_params.connection_type();
for (k, v) in connection_params.properties {
@@ -252,10 +255,14 @@
}
}
}
debug_assert!(
matches!(connection_type, PbConnectionType::Unspecified) && connection_params_none_flag
);

Ok((
WithOptionsSecResolved::new(options, inner_secret_refs),
connection_type,
connection_id,
))
}
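
`resolve_connection_ref_and_secret_ref` now returns a triple — the resolved WITH options, the connection type, and the catalog id of the referenced connection (if any) — so callers can both validate the type and record the dependency. Below is a sketch of the resolution core and the invariant it appears to aim for (that the type stays `Unspecified` exactly when no connection reference was supplied); the types here are placeholders and the asserted invariant is an assumption about intent, not a quote from the diff.

```rust
// Simplified model of the resolution result: if a connection reference was found,
// we get its type and catalog id; otherwise the type stays Unspecified.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ConnType {
    Unspecified,
    Kafka,
}

struct ConnectionParams {
    id: u32,
    connection_type: ConnType,
}

// Sketch of the core of resolve_connection_ref_and_secret_ref: the third element
// of the returned triple is the connection id recorded as a dependency.
fn resolve(params: Option<ConnectionParams>) -> (ConnType, Option<u32>) {
    let mut connection_type = ConnType::Unspecified;
    let mut connection_id = None;
    if let Some(params) = params {
        connection_type = params.connection_type;
        connection_id = Some(params.id);
    }
    // Assumed invariant: the type can only remain Unspecified when no
    // connection reference was supplied at all.
    debug_assert!((connection_type == ConnType::Unspecified) == connection_id.is_none());
    (connection_type, connection_id)
}

fn main() {
    assert_eq!(resolve(None), (ConnType::Unspecified, None));
    let kafka = ConnectionParams { id: 7, connection_type: ConnType::Kafka };
    assert_eq!(resolve(Some(kafka)), (ConnType::Kafka, Some(7)));
}
```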

