
Commit

refactor: introduce Get and WithPropertiesExt (#15632)
xxchan authored Mar 12, 2024
1 parent eada37a commit d527ba8
Showing 12 changed files with 99 additions and 116 deletions.
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -57,6 +57,7 @@ pub mod mqtt_common;
 pub use paste::paste;
 
 mod with_options;
+pub use with_options::WithPropertiesExt;
 
 #[cfg(test)]
 mod with_options_test;

13 changes: 1 addition & 12 deletions src/connector/src/source/base.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -331,17 +331,6 @@ impl Default for ConnectorProperties {
 }
 
 impl ConnectorProperties {
-    pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
-        with_properties
-            .get(UPSTREAM_SOURCE_KEY)
-            .map(|s| {
-                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
-                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
-                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
-            })
-            .unwrap_or(false)
-    }
-
     pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
         with_properties
             .get(UPSTREAM_SOURCE_KEY)

11 changes: 0 additions & 11 deletions src/connector/src/source/kafka/private_link.rs
@@ -33,7 +33,6 @@ use crate::common::{
 use crate::error::ConnectorResult;
 use crate::source::kafka::stats::RdKafkaStats;
 use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
-use crate::source::KAFKA_CONNECTOR;
 
 pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
 pub const CONNECTION_NAME_KEY: &str = "connection.name";
@@ -205,16 +204,6 @@ fn get_property_required(
         .map_err(Into::into)
 }
 
-#[inline(always)]
-fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
-    const UPSTREAM_SOURCE_KEY: &str = "connector";
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .unwrap_or(&"".to_string())
-        .to_lowercase()
-        .eq_ignore_ascii_case(KAFKA_CONNECTOR)
-}
-
 pub fn insert_privatelink_broker_rewrite_map(
     with_options: &mut BTreeMap<String, String>,
     svc: Option<&PrivateLinkService>,

72 changes: 71 additions & 1 deletion src/connector/src/with_options.rs
@@ -12,7 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
+
+use crate::source::iceberg::ICEBERG_CONNECTOR;
+use crate::source::{
+    GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
+};
 
 /// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
 ///
@@ -56,3 +61,68 @@ impl WithOptions for crate::mqtt_common::QualityOfService {}
 impl WithOptions for crate::sink::kafka::CompressionCodec {}
 impl WithOptions for nexmark::config::RateShape {}
 impl WithOptions for nexmark::event::EventType {}
+
+pub trait Get {
+    fn get(&self, key: &str) -> Option<&String>;
+}
+
+impl Get for HashMap<String, String> {
+    fn get(&self, key: &str) -> Option<&String> {
+        self.get(key)
+    }
+}
+
+impl Get for BTreeMap<String, String> {
+    fn get(&self, key: &str) -> Option<&String> {
+        self.get(key)
+    }
+}
+
+/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
+pub trait WithPropertiesExt: Get {
+    #[inline(always)]
+    fn get_connector(&self) -> Option<String> {
+        self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
+    }
+
+    #[inline(always)]
+    fn is_kafka_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector == KAFKA_CONNECTOR
+    }
+
+    #[inline(always)]
+    fn is_cdc_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector.contains("-cdc")
+    }
+
+    #[inline(always)]
+    fn is_iceberg_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector == ICEBERG_CONNECTOR
+    }
+
+    fn connector_need_pk(&self) -> bool {
+        // Currently only iceberg connector doesn't need primary key
+        !self.is_iceberg_connector()
+    }
+
+    fn is_new_fs_connector(&self) -> bool {
+        self.get(UPSTREAM_SOURCE_KEY)
+            .map(|s| {
+                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
+                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
+                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
+            })
+            .unwrap_or(false)
+    }
+}
+
+impl<T: Get> WithPropertiesExt for T {}

14 changes: 2 additions & 12 deletions src/frontend/src/catalog/connection_catalog.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 
 use anyhow::anyhow;
 use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
-use risingwave_connector::source::KAFKA_CONNECTOR;
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
 use risingwave_pb::catalog::connection::Info;
 use risingwave_pb::catalog::{connection, PbConnection};
@@ -65,23 +65,13 @@ impl OwnedByUserCatalog for ConnectionCatalog {
     }
 }
 
-#[inline(always)]
-fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
-    const UPSTREAM_SOURCE_KEY: &str = "connector";
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .unwrap_or(&"".to_string())
-        .to_lowercase()
-        .eq_ignore_ascii_case(KAFKA_CONNECTOR)
-}
-
 pub(crate) fn resolve_private_link_connection(
     connection: &Arc<ConnectionCatalog>,
     properties: &mut BTreeMap<String, String>,
 ) -> Result<()> {
     #[allow(irrefutable_let_patterns)]
     if let connection::Info::PrivateLinkService(svc) = &connection.info {
-        if !is_kafka_connector(properties) {
+        if !properties.is_kafka_connector() {
             return Err(RwError::from(anyhow!(
                 "Private link is only supported for Kafka connector"
             )));

4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
@@ -18,6 +18,7 @@ use itertools::Itertools;
 use pgwire::pg_response::StatementType;
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::ColumnCatalog;
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::StreamSourceInfo;
 use risingwave_pb::plan_common::{EncodeType, FormatType};
 use risingwave_sqlparser::ast::{
@@ -28,7 +29,6 @@ use risingwave_sqlparser::parser::Parser;
 
 use super::alter_table_column::schema_has_schema_registry;
 use super::create_source::{bind_columns_from_source, validate_compatibility};
-use super::util::is_cdc_connector;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
 use crate::catalog::source_catalog::SourceCatalog;
@@ -152,7 +152,7 @@ pub async fn refresh_sr_and_get_columns_diff(
         .collect();
     validate_compatibility(connector_schema, &mut with_properties)?;
 
-    if is_cdc_connector(&with_properties) {
+    if with_properties.is_cdc_connector() {
         bail_not_implemented!("altering a cdc source is not supported");
     }
 

23 changes: 11 additions & 12 deletions src/frontend/src/handler/create_source.rs
@@ -52,6 +52,7 @@ use risingwave_connector::source::{
     KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
     POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
 };
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::{
     PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
 };
@@ -75,10 +76,7 @@ use crate::handler::create_table::{
     bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names,
     ensure_table_constraints_supported, ColumnIdGenerator,
 };
-use crate::handler::util::{
-    connector_need_pk, get_connector, is_cdc_connector, is_iceberg_connector, is_kafka_connector,
-    SourceSchemaCompatExt,
-};
+use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
 use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
@@ -298,7 +296,7 @@ pub(crate) async fn bind_columns_from_source(
     const KEY_MESSAGE_NAME_KEY: &str = "key.message";
     const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
 
-    let is_kafka: bool = is_kafka_connector(with_properties);
+    let is_kafka: bool = with_properties.is_kafka_connector();
     let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
     let mut format_encode_options_to_consume = format_encode_options.clone();
 
@@ -447,7 +445,7 @@
             .await?
         }
         (Format::None, Encode::None) => {
-            if is_iceberg_connector(with_properties) {
+            if with_properties.is_iceberg_connector() {
                 Some(
                     extract_iceberg_columns(with_properties)
                         .await
@@ -533,7 +531,7 @@ pub fn handle_addition_columns(
     mut additional_columns: IncludeOption,
     columns: &mut Vec<ColumnCatalog>,
 ) -> Result<()> {
-    let connector_name = get_connector(with_properties).unwrap(); // there must be a connector in source
+    let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source
 
     if COMPATIBLE_ADDITIONAL_COLUMNS
         .get(connector_name.as_str())
@@ -878,7 +876,7 @@ fn check_and_add_timestamp_column(
     with_properties: &HashMap<String, String>,
     columns: &mut Vec<ColumnCatalog>,
 ) {
-    if is_kafka_connector(with_properties) {
+    if with_properties.is_kafka_connector() {
         if columns.iter().any(|col| {
             matches!(
                 col.column_desc.additional_column.column_type,
@@ -1027,7 +1025,8 @@ pub fn validate_compatibility(
     source_schema: &ConnectorSchema,
     props: &mut HashMap<String, String>,
 ) -> Result<()> {
-    let connector = get_connector(props)
+    let connector = props
+        .get_connector()
         .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
 
     let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
@@ -1105,7 +1104,7 @@ pub(super) async fn check_source_schema(
     row_id_index: Option<usize>,
     columns: &[ColumnCatalog],
 ) -> Result<()> {
-    let Some(connector) = get_connector(props) else {
+    let Some(connector) = props.get_connector() else {
         return Ok(());
     };
 
@@ -1309,7 +1308,7 @@ pub async fn handle_create_source(
     let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;
 
     // gated the feature with a session variable
-    let create_cdc_source_job = if is_cdc_connector(&with_properties) {
+    let create_cdc_source_job = if with_properties.is_cdc_connector() {
         CdcTableType::from_properties(&with_properties).can_backfill()
     } else {
         false
@@ -1365,7 +1364,7 @@ pub async fn handle_create_source(
             .into());
     }
     let (mut columns, pk_column_ids, row_id_index) =
-        bind_pk_on_relation(columns, pk_names, connector_need_pk(&with_properties))?;
+        bind_pk_on_relation(columns, pk_names, with_properties.connector_need_pk())?;
 
     debug_assert!(is_column_ids_dedup(&columns));
 

5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_table.rs
@@ -29,11 +29,11 @@ use risingwave_common::catalog::{
 };
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_common::util::value_encoding::DatumToProtoExt;
-use risingwave_connector::source;
 use risingwave_connector::source::cdc::external::{
     DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
 use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
+use risingwave_connector::{source, WithPropertiesExt};
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
@@ -61,7 +61,6 @@ use crate::handler::create_source::{
     bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark,
     check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
 };
-use crate::handler::util::is_iceberg_connector;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
 use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
@@ -514,7 +513,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
         c.column_desc.column_id = col_id_gen.generate(c.name())
     }
 
-    if is_iceberg_connector(&with_properties) {
+    if with_properties.is_iceberg_connector() {
         return Err(
             ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
         );

41 changes: 0 additions & 41 deletions src/frontend/src/handler/util.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -31,12 +30,9 @@ use risingwave_common::catalog::Field;
 use risingwave_common::row::Row as _;
 use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
-use risingwave_connector::source::KAFKA_CONNECTOR;
 use risingwave_sqlparser::ast::{CompatibleSourceSchema, ConnectorSchema};
 
 use crate::error::{ErrorCode, Result as RwResult};
-use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
 use crate::session::{current, SessionImpl};
 
 pin_project! {
@@ -180,43 +176,6 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor {
     )
 }
 
-pub fn connector_need_pk(with_properties: &HashMap<String, String>) -> bool {
-    // Currently only iceberg connector doesn't need primary key
-    !is_iceberg_connector(with_properties)
-}
-
-#[inline(always)]
-pub fn get_connector(with_properties: &HashMap<String, String>) -> Option<String> {
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .map(|s| s.to_lowercase())
-}
-
-#[inline(always)]
-pub fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-
-    connector == KAFKA_CONNECTOR
-}
-
-#[inline(always)]
-pub fn is_cdc_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-    connector.contains("-cdc")
-}
-
-#[inline(always)]
-pub fn is_iceberg_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-    connector == ICEBERG_CONNECTOR
-}
-
 #[easy_ext::ext(SourceSchemaCompatExt)]
 impl CompatibleSourceSchema {
     /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated.

… 3 more changed files (9 additions, 22 deletions) not shown.
