Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce Get and WithPropertiesExt #15632

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod common;
pub use paste::paste;

mod with_options;
pub use with_options::WithPropertiesExt;

#[cfg(test)]
mod with_options_test;
Expand Down
13 changes: 1 addition & 12 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
Expand Down Expand Up @@ -331,17 +331,6 @@ impl Default for ConnectorProperties {
}

impl ConnectorProperties {
pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}

pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
with_properties
.get(UPSTREAM_SOURCE_KEY)
Expand Down
11 changes: 0 additions & 11 deletions src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::common::{
use crate::error::ConnectorResult;
use crate::source::kafka::stats::RdKafkaStats;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
use crate::source::KAFKA_CONNECTOR;

pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
pub const CONNECTION_NAME_KEY: &str = "connection.name";
Expand Down Expand Up @@ -205,16 +204,6 @@ fn get_property_required(
.map_err(Into::into)
}

#[inline(always)]
fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
const UPSTREAM_SOURCE_KEY: &str = "connector";
with_properties
.get(UPSTREAM_SOURCE_KEY)
.unwrap_or(&"".to_string())
.to_lowercase()
.eq_ignore_ascii_case(KAFKA_CONNECTOR)
}

pub fn insert_privatelink_broker_rewrite_map(
with_options: &mut BTreeMap<String, String>,
svc: Option<&PrivateLinkService>,
Expand Down
72 changes: 71 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
};

/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
///
Expand Down Expand Up @@ -55,3 +60,68 @@ impl WithOptions for std::time::Duration {}
impl WithOptions for crate::sink::kafka::CompressionCodec {}
impl WithOptions for nexmark::config::RateShape {}
impl WithOptions for nexmark::event::EventType {}

pub trait Get {
fn get(&self, key: &str) -> Option<&String>;
}

impl Get for HashMap<String, String> {
fn get(&self, key: &str) -> Option<&String> {
self.get(key)
}
}

impl Get for BTreeMap<String, String> {
fn get(&self, key: &str) -> Option<&String> {
self.get(key)
}
}

/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
pub trait WithPropertiesExt: Get {
#[inline(always)]
fn get_connector(&self) -> Option<String> {
self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
}

#[inline(always)]
fn is_kafka_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
return false;
};
connector == KAFKA_CONNECTOR
}

#[inline(always)]
fn is_cdc_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
return false;
};
connector.contains("-cdc")
}

#[inline(always)]
fn is_iceberg_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
return false;
};
connector == ICEBERG_CONNECTOR
}

fn connector_need_pk(&self) -> bool {
// Currently only iceberg connector doesn't need primary key
!self.is_iceberg_connector()
}

fn is_new_fs_connector(&self) -> bool {
self.get(UPSTREAM_SOURCE_KEY)
.map(|s| {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
})
.unwrap_or(false)
}
}

impl<T: Get> WithPropertiesExt for T {}
14 changes: 2 additions & 12 deletions src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::catalog::connection::Info;
use risingwave_pb::catalog::{connection, PbConnection};
Expand Down Expand Up @@ -65,23 +65,13 @@ impl OwnedByUserCatalog for ConnectionCatalog {
}
}

#[inline(always)]
fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
const UPSTREAM_SOURCE_KEY: &str = "connector";
with_properties
.get(UPSTREAM_SOURCE_KEY)
.unwrap_or(&"".to_string())
.to_lowercase()
.eq_ignore_ascii_case(KAFKA_CONNECTOR)
}

pub(crate) fn resolve_private_link_connection(
connection: &Arc<ConnectionCatalog>,
properties: &mut BTreeMap<String, String>,
) -> Result<()> {
#[allow(irrefutable_let_patterns)]
if let connection::Info::PrivateLinkService(svc) = &connection.info {
if !is_kafka_connector(properties) {
if properties.is_kafka_connector() {
return Err(RwError::from(anyhow!(
"Private link is only supported for Kafka connector"
)));
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
Expand All @@ -28,7 +29,6 @@ use risingwave_sqlparser::parser::Parser;

use super::alter_table_column::schema_has_schema_registry;
use super::create_source::{bind_columns_from_source, validate_compatibility};
use super::util::is_cdc_connector;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn refresh_sr_and_get_columns_diff(
.collect();
validate_compatibility(connector_schema, &mut with_properties)?;

if is_cdc_connector(&with_properties) {
if with_properties.is_cdc_connector() {
bail_not_implemented!("altering a cdc source is not supported");
}

Expand Down
23 changes: 11 additions & 12 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use risingwave_connector::source::{
KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
};
Expand All @@ -75,10 +76,7 @@ use crate::handler::create_table::{
bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names,
ensure_table_constraints_supported, ColumnIdGenerator,
};
use crate::handler::util::{
connector_need_pk, get_connector, is_cdc_connector, is_iceberg_connector, is_kafka_connector,
SourceSchemaCompatExt,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
Expand Down Expand Up @@ -298,7 +296,7 @@ pub(crate) async fn bind_columns_from_source(
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

let is_kafka: bool = is_kafka_connector(with_properties);
let is_kafka: bool = with_properties.is_kafka_connector();
let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
let mut format_encode_options_to_consume = format_encode_options.clone();

Expand Down Expand Up @@ -447,7 +445,7 @@ pub(crate) async fn bind_columns_from_source(
.await?
}
(Format::None, Encode::None) => {
if is_iceberg_connector(with_properties) {
if with_properties.is_iceberg_connector() {
Some(
extract_iceberg_columns(with_properties)
.await
Expand Down Expand Up @@ -533,7 +531,7 @@ pub fn handle_addition_columns(
mut additional_columns: IncludeOption,
columns: &mut Vec<ColumnCatalog>,
) -> Result<()> {
let connector_name = get_connector(with_properties).unwrap(); // there must be a connector in source
let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source

if COMPATIBLE_ADDITIONAL_COLUMNS
.get(connector_name.as_str())
Expand Down Expand Up @@ -878,7 +876,7 @@ fn check_and_add_timestamp_column(
with_properties: &HashMap<String, String>,
columns: &mut Vec<ColumnCatalog>,
) {
if is_kafka_connector(with_properties) {
if with_properties.is_kafka_connector() {
if columns.iter().any(|col| {
matches!(
col.column_desc.additional_column.column_type,
Expand Down Expand Up @@ -1024,7 +1022,8 @@ pub fn validate_compatibility(
source_schema: &ConnectorSchema,
props: &mut HashMap<String, String>,
) -> Result<()> {
let connector = get_connector(props)
let connector = props
.get_connector()
.ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;

let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
Expand Down Expand Up @@ -1102,7 +1101,7 @@ pub(super) async fn check_source_schema(
row_id_index: Option<usize>,
columns: &[ColumnCatalog],
) -> Result<()> {
let Some(connector) = get_connector(props) else {
let Some(connector) = props.get_connector() else {
return Ok(());
};

Expand Down Expand Up @@ -1306,7 +1305,7 @@ pub async fn handle_create_source(
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if is_cdc_connector(&with_properties) {
let create_cdc_source_job = if with_properties.is_cdc_connector() {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
Expand Down Expand Up @@ -1362,7 +1361,7 @@ pub async fn handle_create_source(
.into());
}
let (mut columns, pk_column_ids, row_id_index) =
bind_pk_on_relation(columns, pk_names, connector_need_pk(&with_properties))?;
bind_pk_on_relation(columns, pk_names, with_properties.connector_need_pk())?;

debug_assert!(is_column_ids_dedup(&columns));

Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use risingwave_common::catalog::{
};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source;
use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_connector::{source, WithPropertiesExt};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
Expand Down Expand Up @@ -61,7 +61,6 @@ use crate::handler::create_source::{
bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark,
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::util::is_iceberg_connector;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
Expand Down Expand Up @@ -514,7 +513,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
c.column_desc.column_id = col_id_gen.generate(c.name())
}

if is_iceberg_connector(&with_properties) {
if with_properties.is_iceberg_connector() {
return Err(
ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
);
Expand Down
41 changes: 0 additions & 41 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand All @@ -31,12 +30,9 @@ use risingwave_common::catalog::Field;
use risingwave_common::row::Row as _;
use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_sqlparser::ast::{CompatibleSourceSchema, ConnectorSchema};

use crate::error::{ErrorCode, Result as RwResult};
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::session::{current, SessionImpl};

pin_project! {
Expand Down Expand Up @@ -180,43 +176,6 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor {
)
}

pub fn connector_need_pk(with_properties: &HashMap<String, String>) -> bool {
// Currently only iceberg connector doesn't need primary key
!is_iceberg_connector(with_properties)
}

#[inline(always)]
pub fn get_connector(with_properties: &HashMap<String, String>) -> Option<String> {
with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.to_lowercase())
}

#[inline(always)]
pub fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool {
let Some(connector) = get_connector(with_properties) else {
return false;
};

connector == KAFKA_CONNECTOR
}

#[inline(always)]
pub fn is_cdc_connector(with_properties: &HashMap<String, String>) -> bool {
let Some(connector) = get_connector(with_properties) else {
return false;
};
connector.contains("-cdc")
}

#[inline(always)]
pub fn is_iceberg_connector(with_properties: &HashMap<String, String>) -> bool {
let Some(connector) = get_connector(with_properties) else {
return false;
};
connector == ICEBERG_CONNECTOR
}

#[easy_ext::ext(SourceSchemaCompatExt)]
impl CompatibleSourceSchema {
/// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated.
Expand Down
Loading
Loading