diff --git a/Cargo.lock b/Cargo.lock index 50088a3be1ad1..a11aae52a904f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,6 +1197,24 @@ dependencies = [ "paste", ] +[[package]] +name = "aws-msk-iam-sasl-signer" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7036b8409ffe698dfdc5ae08722999d960092aeb738026ea99c3071c94831668" +dependencies = [ + "aws-config", + "aws-credential-types", + "aws-sdk-sts", + "aws-sigv4", + "aws-types", + "base64 0.22.0", + "chrono", + "futures", + "thiserror", + "url", +] + [[package]] name = "aws-runtime" version = "1.0.1" @@ -7055,9 +7073,9 @@ dependencies = [ [[package]] name = "madsim-rdkafka" -version = "0.3.4+0.34.0" +version = "0.4.1+0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0992d46c19414bda24bad935ea223ad025dcd7e64f38f0acee388efa8ff5319b" +checksum = "053a71e7138773932b4195c70503129a3795e3ace59c7e7786fd64acfdc12b6f" dependencies = [ "async-channel", "async-trait", @@ -10464,6 +10482,7 @@ dependencies = [ "await-tree", "aws-config", "aws-credential-types", + "aws-msk-iam-sasl-signer", "aws-sdk-kinesis", "aws-sdk-s3", "aws-smithy-http", @@ -10474,6 +10493,7 @@ dependencies = [ "base64 0.22.0", "byteorder", "bytes", + "cfg-or-panic", "chrono", "clickhouse", "criterion", diff --git a/Cargo.toml b/Cargo.toml index ce1a66c94bdaa..9fd680c796e8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,7 +118,7 @@ axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain etcd-client = { package = "madsim-etcd-client", version = "0.4" } futures-async-stream = "0.2.9" hytra = "0.1" -rdkafka = { package = "madsim-rdkafka", version = "0.3.4", features = [ +rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [ "cmake-build", ] } hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8e0a16c95f8bb..c8953e1c46d84 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -33,6 +33,7 @@ auto_impl = "1" await-tree = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } +aws-msk-iam-sasl-signer = "1.0.0" aws-sdk-kinesis = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-http = { workspace = true } @@ -43,6 +44,7 @@ aws-types = { workspace = true } base64 = "0.22" byteorder = "1" bytes = { version = "1", features = ["serde"] } +cfg-or-panic = "0.2" chrono = { version = "0.4", default-features = false, features = [ "clock", "std", diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 302b68dd664a1..7a5e3ba5f8b2b 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -44,6 +44,8 @@ use crate::source::nats::source::NatsOffset; pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints"; pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets"; +const AWS_MSK_IAM_AUTH: &str = "AWS_MSK_IAM"; + #[derive(Debug, Clone, Deserialize)] pub struct AwsPrivateLinkItem { pub az_id: Option, @@ -65,9 +67,9 @@ pub struct AwsAuthProps { pub access_key: Option, pub secret_key: Option, pub session_token: Option, + /// IAM role pub arn: Option, - /// This field was added for kinesis. Not sure if it's useful for other connectors. - /// Please ignore it in the documentation for now. 
+ /// external ID in IAM role trust policy pub external_id: Option, pub profile: Option, } @@ -181,7 +183,7 @@ pub struct KafkaCommon { #[serde(rename = "properties.ssl.key.password")] ssl_key_password: Option, - /// SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM and GSSAPI. + /// SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. #[serde(rename = "properties.sasl.mechanism")] sasl_mechanism: Option, @@ -286,6 +288,13 @@ impl RdKafkaPropertiesCommon { impl KafkaCommon { pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { + // AWS_MSK_IAM + if self.is_aws_msk_iam() { + config.set("security.protocol", "SASL_SSL"); + config.set("sasl.mechanism", "OAUTHBEARER"); + return; + } + // Security protocol if let Some(security_protocol) = self.security_protocol.as_ref() { config.set("security.protocol", security_protocol); @@ -356,6 +365,16 @@ impl KafkaCommon { // Currently, we only support unsecured OAUTH. config.set("enable.sasl.oauthbearer.unsecure.jwt", "true"); } + + pub(crate) fn is_aws_msk_iam(&self) -> bool { + if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref() + && sasl_mechanism == AWS_MSK_IAM_AUTH + { + true + } else { + false + } + } } #[derive(Clone, Debug, Deserialize, WithOptions)] diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index f15173a4aabf6..8313ea20dafa7 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,7 +35,9 @@ use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; -use crate::connector_common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon}; +use crate::connector_common::{ + AwsAuthProps, KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, +}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -43,7 +45,9 @@ use crate::sink::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, }; use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam}; -use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext}; +use crate::source::kafka::{ + KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext, +}; use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl, @@ -244,6 +248,9 @@ pub struct KafkaConfig { #[serde(flatten)] pub privatelink_common: KafkaPrivateLinkCommon, + + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, } impl KafkaConfig { @@ -274,6 +281,7 @@ impl From for KafkaProperties { rdkafka_properties_common: val.rdkafka_properties_common, rdkafka_properties_consumer: Default::default(), privatelink_common: val.privatelink_common, + aws_auth_props: val.aws_auth_props, unknown_fields: Default::default(), } } @@ -393,7 +401,7 @@ const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2; const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000; struct KafkaPayloadWriter<'a> { - inner: &'a FutureProducer, + inner: &'a FutureProducer, add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>, config: &'a KafkaConfig, } @@ -402,13 +410,13 @@ pub type KafkaSinkDeliveryFuture = impl TryFuture + pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, - inner: FutureProducer, + inner: FutureProducer, 
config: KafkaConfig, } impl KafkaSinkWriter { async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result { - let inner: FutureProducer = { + let inner: FutureProducer = { let mut c = ClientConfig::new(); // KafkaConfig configuration @@ -419,13 +427,16 @@ impl KafkaSinkWriter { c.set("bootstrap.servers", &config.common.brokers); // Create the producer context, will be used to create the producer - let producer_ctx = PrivateLinkProducerContext::new( - config.privatelink_common.broker_rewrite_map.clone(), - // fixme: enable kafka native metrics for sink + let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone(); + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, None, None, - )?; - + config.aws_auth_props.clone(), + config.common.is_aws_msk_iam(), + ) + .await?; + let producer_ctx = RwProducerContext::new(ctx_common); // Generate the producer c.create_with_context(producer_ctx).await? }; diff --git a/src/connector/src/source/kafka/client_context.rs b/src/connector/src/source/kafka/client_context.rs new file mode 100644 index 0000000000000..6a01da356ff51 --- /dev/null +++ b/src/connector/src/source/kafka/client_context.rs @@ -0,0 +1,227 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; +use std::thread; + +use anyhow::anyhow; +use aws_config::Region; +use aws_sdk_s3::config::SharedCredentialsProvider; +use rdkafka::client::{BrokerAddr, OAuthToken}; +use rdkafka::consumer::ConsumerContext; +use rdkafka::message::DeliveryResult; +use rdkafka::producer::ProducerContext; +use rdkafka::{ClientContext, Statistics}; + +use super::private_link::{BrokerAddrRewriter, PrivateLinkContextRole}; +use super::stats::RdKafkaStats; +use crate::connector_common::AwsAuthProps; +use crate::error::ConnectorResult; + +struct IamAuthEnv { + credentials_provider: SharedCredentialsProvider, + region: Region, + // XXX(runji): madsim does not support `Handle` for now + #[cfg(not(madsim))] + rt: tokio::runtime::Handle, +} + +pub struct KafkaContextCommon { + // For VPC PrivateLink support + addr_rewriter: BrokerAddrRewriter, + + // identifier is required when reporting metrics as a label, usually it is compose by connector + // format (source or sink) and corresponding id (source_id or sink_id) + // identifier and metrics should be set at the same time + identifier: Option, + metrics: Option>, + + /// Credential and region for AWS MSK + auth: Option, +} + +impl KafkaContextCommon { + pub async fn new( + broker_rewrite_map: Option>, + identifier: Option, + metrics: Option>, + auth: AwsAuthProps, + is_aws_msk_iam: bool, + ) -> ConnectorResult { + let addr_rewriter = + BrokerAddrRewriter::new(PrivateLinkContextRole::Consumer, broker_rewrite_map)?; + let auth = if is_aws_msk_iam { + let config = auth.build_config().await?; + let credentials_provider = config + .credentials_provider() + .ok_or_else(|| anyhow!("missing aws credentials_provider"))?; + let region = config + .region() + .ok_or_else(|| anyhow!("missing aws region"))? + .clone(); + Some(IamAuthEnv { + credentials_provider, + region, + #[cfg(not(madsim))] + rt: tokio::runtime::Handle::current(), + }) + } else { + None + }; + Ok(Self { + addr_rewriter, + identifier, + metrics, + auth, + }) + } +} + +impl KafkaContextCommon { + fn stats(&self, statistics: Statistics) { + if let Some(metrics) = &self.metrics + && let Some(id) = &self.identifier + { + metrics.report(id.as_str(), &statistics); + } + } + + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + self.addr_rewriter.rewrite_broker_addr(addr) + } + + // XXX(runji): oauth is ignored in simulation + #[cfg_or_panic::cfg_or_panic(not(madsim))] + fn generate_oauth_token( + &self, + _oauthbearer_config: Option<&str>, + ) -> Result> { + use aws_msk_iam_sasl_signer::generate_auth_token_from_credentials_provider; + use tokio::time::{timeout, Duration}; + + if let Some(IamAuthEnv { + credentials_provider, + region, + rt, + }) = &self.auth + { + let region = region.clone(); + let credentials_provider = credentials_provider.clone(); + let rt = rt.clone(); + let (token, expiration_time_ms) = { + let handle = thread::spawn(move || { + rt.block_on(async { + timeout( + Duration::from_secs(10), + generate_auth_token_from_credentials_provider( + region, + credentials_provider, + ), + ) + .await + }) + }); + handle.join().unwrap()?? 
+ }; + Ok(OAuthToken { + token, + principal_name: "".to_string(), + lifetime_ms: expiration_time_ms, + }) + } else { + Err("must provide AWS IAM credential".into()) + } + } + + fn enable_refresh_oauth_token(&self) -> bool { + self.auth.is_some() + } +} + +pub type BoxConsumerContext = Box; + +/// Kafka consumer context used for private link, IAM auth, and metrics +pub struct RwConsumerContext { + common: KafkaContextCommon, +} + +impl RwConsumerContext { + pub fn new(common: KafkaContextCommon) -> Self { + Self { common } + } +} + +impl ClientContext for RwConsumerContext { + /// this func serves as a callback when `poll` is completed. + fn stats(&self, statistics: Statistics) { + self.common.stats(statistics); + } + + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + self.common.rewrite_broker_addr(addr) + } + + fn generate_oauth_token( + &self, + oauthbearer_config: Option<&str>, + ) -> Result> { + self.common.generate_oauth_token(oauthbearer_config) + } + + fn enable_refresh_oauth_token(&self) -> bool { + self.common.enable_refresh_oauth_token() + } +} + +// required by the trait bound of BaseConsumer +impl ConsumerContext for RwConsumerContext {} + +/// Kafka producer context used for private link, IAM auth, and metrics +pub struct RwProducerContext { + common: KafkaContextCommon, +} + +impl RwProducerContext { + pub fn new(common: KafkaContextCommon) -> Self { + Self { common } + } +} + +impl ClientContext for RwProducerContext { + fn stats(&self, statistics: Statistics) { + self.common.stats(statistics); + } + + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + self.common.rewrite_broker_addr(addr) + } + + fn generate_oauth_token( + &self, + oauthbearer_config: Option<&str>, + ) -> Result> { + self.common.generate_oauth_token(oauthbearer_config) + } + + fn enable_refresh_oauth_token(&self) -> bool { + self.common.enable_refresh_oauth_token() + } +} + +impl ProducerContext for RwProducerContext { + type DeliveryOpaque = (); + + fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {} +} diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 697fe421a5fb2..40b952f08216c 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -25,7 +25,9 @@ use risingwave_common::bail; use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; -use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL}; +use crate::source::kafka::{ + KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, +}; use crate::source::SourceEnumeratorContextRef; #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -40,7 +42,7 @@ pub struct KafkaSplitEnumerator { context: SourceEnumeratorContextRef, broker_address: String, topic: String, - client: BaseConsumer, + client: BaseConsumer, start_offset: KafkaEnumeratorOffset, // maybe used in the future for batch processing @@ -90,10 +92,30 @@ impl SplitEnumerator for KafkaSplitEnumerator { } // don't need kafka metrics from enumerator - let client_ctx = PrivateLinkConsumerContext::new(broker_rewrite_map, None, None)?; - let client: BaseConsumer = + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, + None, + None, + properties.aws_auth_props, + common_props.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = 
config.create_with_context(client_ctx).await?; + // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call + // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either + // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval + // of an initial token to occur. + // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf + if common_props.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + Ok(Self { context, broker_address, diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 91d4ccce5ca88..0f16108bb5c82 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -17,16 +17,17 @@ use std::collections::HashMap; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use crate::connector_common::KafkaPrivateLinkCommon; +use crate::connector_common::{AwsAuthProps, KafkaPrivateLinkCommon}; +mod client_context; pub mod enumerator; pub mod private_link; pub mod source; pub mod split; pub mod stats; +pub use client_context::*; pub use enumerator::*; -pub use private_link::*; pub use source::*; pub use split::*; use with_options::WithOptions; @@ -137,6 +138,9 @@ pub struct KafkaProperties { #[serde(flatten)] pub privatelink_common: KafkaPrivateLinkCommon, + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 6187078ae24fb..348749ba3f113 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -14,14 +14,10 @@ use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; -use std::sync::Arc; use anyhow::{anyhow, Context}; use itertools::Itertools; use rdkafka::client::BrokerAddr; -use rdkafka::consumer::ConsumerContext; -use rdkafka::producer::{DeliveryResult, ProducerContext}; -use rdkafka::{ClientContext, Statistics}; use risingwave_common::bail; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; @@ -31,14 +27,13 @@ use crate::connector_common::{ AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, }; use crate::error::ConnectorResult; -use crate::source::kafka::stats::RdKafkaStats; use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint"; pub const CONNECTION_NAME_KEY: &str = "connection.name"; #[derive(Debug)] -enum PrivateLinkContextRole { +pub(super) enum PrivateLinkContextRole { Consumer, Producer, } @@ -52,13 +47,13 @@ impl std::fmt::Display for PrivateLinkContextRole { } } -struct BrokerAddrRewriter { +pub(super) struct BrokerAddrRewriter { role: PrivateLinkContextRole, rewrite_map: BTreeMap, } impl BrokerAddrRewriter { - fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + pub(super) fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { let rewrote_addr = match self.rewrite_map.get(&addr) { None => addr, Some(new_addr) => new_addr.clone(), @@ -95,94 +90,6 @@ impl BrokerAddrRewriter { } } -pub struct PrivateLinkConsumerContext { - inner: BrokerAddrRewriter, - - // identifier 
is required when reporting metrics as a label, usually it is compose by connector - // format (source or sink) and corresponding id (source_id or sink_id) - // identifier and metrics should be set at the same time - identifier: Option, - metrics: Option>, -} - -impl PrivateLinkConsumerContext { - pub fn new( - broker_rewrite_map: Option>, - identifier: Option, - metrics: Option>, - ) -> ConnectorResult { - let inner = BrokerAddrRewriter::new(PrivateLinkContextRole::Consumer, broker_rewrite_map)?; - Ok(Self { - inner, - identifier, - metrics, - }) - } -} - -impl ClientContext for PrivateLinkConsumerContext { - /// this func serves as a callback when `poll` is completed. - fn stats(&self, statistics: Statistics) { - if let Some(metrics) = &self.metrics - && let Some(id) = &self.identifier - { - metrics.report(id.as_str(), &statistics); - } - } - - fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { - self.inner.rewrite_broker_addr(addr) - } -} - -// required by the trait bound of BaseConsumer -impl ConsumerContext for PrivateLinkConsumerContext {} - -pub struct PrivateLinkProducerContext { - inner: BrokerAddrRewriter, - - // identifier is required when reporting metrics as a label, usually it is compose by connector - // format (source or sink) and corresponding id (source_id or sink_id) - // identifier and metrics should be set at the same time - identifier: Option, - metrics: Option>, -} - -impl PrivateLinkProducerContext { - pub fn new( - broker_rewrite_map: Option>, - identifier: Option, - metrics: Option>, - ) -> ConnectorResult { - let inner = BrokerAddrRewriter::new(PrivateLinkContextRole::Producer, broker_rewrite_map)?; - Ok(Self { - inner, - identifier, - metrics, - }) - } -} - -impl ClientContext for PrivateLinkProducerContext { - fn stats(&self, statistics: Statistics) { - if let Some(metrics) = &self.metrics - && let Some(id) = &self.identifier - { - metrics.report(id.as_str(), &statistics); - } - } - - fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { - self.inner.rewrite_broker_addr(addr) - } -} - -impl ProducerContext for PrivateLinkProducerContext { - type DeliveryOpaque = (); - - fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {} -} - #[inline(always)] fn kafka_props_broker_key(with_properties: &BTreeMap) -> &str { if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) { diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 73c24dd5f810d..a28ecb67b0f98 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -31,7 +31,7 @@ use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; use crate::source::kafka::{ - KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, + KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::{ into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId, @@ -39,7 +39,7 @@ use crate::source::{ }; pub struct KafkaSplitReader { - consumer: StreamConsumer, + consumer: StreamConsumer, offsets: HashMap, Option)>, bytes_per_second: usize, max_num_messages: usize, @@ -78,7 +78,7 @@ impl SplitReader for KafkaSplitReader { format!("rw-consumer-{}", source_ctx.fragment_id), ); - let client_ctx = PrivateLinkConsumerContext::new( + let ctx_common = KafkaContextCommon::new( broker_rewrite_map, Some(format!( 
"fragment-{}-source-{}-actor-{}", @@ -87,8 +87,13 @@ impl SplitReader for KafkaSplitReader { // thread consumer will keep polling in the background, we don't need to call `poll` // explicitly Some(source_ctx.metrics.rdkafka_native_metric.clone()), - )?; - let consumer: StreamConsumer = config + properties.aws_auth_props, + properties.common.is_aws_msk_iam(), + ) + .await?; + + let client_ctx = RwConsumerContext::new(ctx_common); + let consumer: StreamConsumer = config .set_log_level(RDKafkaLogLevel::Info) .create_with_context(client_ctx) .await diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index aa6d83a143b3a..219d7bc8a7337 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -39,10 +39,11 @@ BigQueryConfig: required: false - name: arn field_type: String + comments: IAM role required: false - name: external_id field_type: String - comments: This field was added for kinesis. Not sure if it's useful for other connectors. Please ignore it in the documentation for now. + comments: external ID in IAM role trust policy required: false - name: profile field_type: String @@ -230,7 +231,7 @@ KafkaConfig: required: false - name: properties.sasl.mechanism field_type: String - comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM and GSSAPI. + comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. required: false - name: properties.sasl.username field_type: String @@ -351,6 +352,33 @@ KafkaConfig: field_type: HashMap comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. required: false + - name: region + field_type: String + required: false + - name: endpoint + field_type: String + required: false + alias: endpoint_url + - name: access_key + field_type: String + required: false + - name: secret_key + field_type: String + required: false + - name: session_token + field_type: String + required: false + - name: arn + field_type: String + comments: IAM role + required: false + - name: external_id + field_type: String + comments: external ID in IAM role trust policy + required: false + - name: profile + field_type: String + required: false KinesisSinkConfig: fields: - name: stream @@ -535,10 +563,11 @@ PulsarConfig: required: false - name: arn field_type: String + comments: IAM role required: false - name: external_id field_type: String - comments: This field was added for kinesis. Not sure if it's useful for other connectors. Please ignore it in the documentation for now. + comments: external ID in IAM role trust policy required: false - name: profile field_type: String diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 2ea0e5f3488ee..c62db228eeb02 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -135,7 +135,7 @@ KafkaProperties: required: false - name: properties.sasl.mechanism field_type: String - comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM and GSSAPI. + comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. required: false - name: properties.sasl.username field_type: String @@ -214,6 +214,33 @@ KafkaProperties: field_type: HashMap comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. 
required: false + - name: region + field_type: String + required: false + - name: endpoint + field_type: String + required: false + alias: endpoint_url + - name: access_key + field_type: String + required: false + - name: secret_key + field_type: String + required: false + - name: session_token + field_type: String + required: false + - name: arn + field_type: String + comments: IAM role + required: false + - name: external_id + field_type: String + comments: external ID in IAM role trust policy + required: false + - name: profile + field_type: String + required: false KinesisProperties: fields: - name: scan.startup.mode @@ -638,10 +665,11 @@ PulsarProperties: required: false - name: arn field_type: String + comments: IAM role required: false - name: external_id field_type: String - comments: This field was added for kinesis. Not sure if it's useful for other connectors. Please ignore it in the documentation for now. + comments: external ID in IAM role trust policy required: false - name: profile field_type: String diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index c4c1b2ad0aace..803c1de034ebb 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -718,7 +718,7 @@ fn derive_default_column_project_for_sink( // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly. // The missing columns will be filled with default value (`null` if not explicitly defined). - // Otherwhise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table. + // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table. 
#[allow(clippy::collapsible_else_if)] if user_specified_columns { if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) { diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index dc3e66bdfbac2..5526e92d7be67 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::num::NonZeroU32; -use risingwave_connector::source::kafka::{ +use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY, }; use risingwave_connector::WithPropertiesExt; diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index 47ebbc63a2dc1..3d53c4b767804 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -28,7 +28,7 @@ pub struct Model { pub retention_seconds: i64, pub definition: String, pub subscription_state: i32, - pub dependent_table_id: u32, + pub dependent_table_id: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -59,7 +59,7 @@ impl From for ActiveModel { retention_seconds: Set(pb_subscription.retention_seconds as _), definition: Set(pb_subscription.definition), subscription_state: Set(pb_subscription.subscription_state), - dependent_table_id: Set(pb_subscription.dependent_table_id), + dependent_table_id: Set(pb_subscription.dependent_table_id as _), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index ff90dd33297d2..7e9f20f7557d6 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -235,7 +235,7 @@ impl From> for PbSubscription { ), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, - dependent_table_id: value.0.dependent_table_id, + dependent_table_id: value.0.dependent_table_id as _, subscription_state: PbSubscriptionState::Init as _, } } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 2f32cb7c0716d..125ae9b9d33bb 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -1023,7 +1023,7 @@ fn get_attempt_timeout_by_type(config: &ObjectStoreConfig, operation_type: Opera } OperationType::Metadata => config.retry.metadata_attempt_timeout_ms, OperationType::Delete => config.retry.delete_attempt_timeout_ms, - OperationType::DeleteObjects => config.retry.delete_attempt_timeout_ms, + OperationType::DeleteObjects => config.retry.delete_objects_attempt_timeout_ms, OperationType::List => config.retry.list_attempt_timeout_ms, } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 05217fd53a723..0751eb7aec140 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -180,12 +180,6 @@ pub struct Parser { /// Since we cannot distinguish `>>` and double `>`, so use `angle_brackets_num` to store the /// number of `<` to match `>` in sql like `struct>`. angle_brackets_num: i32, - /// It's important that already in named Array or not. so use this field check in or not. - /// Consider 0 is you're not in named Array. if more than 0 is you're in named Array - array_depth: usize, - /// We cannot know current array should be keep named or not, so by using this field store - /// every depth of array that should be keep named or not. 
- array_named_stack: Vec, } impl Parser { @@ -195,8 +189,6 @@ impl Parser { tokens, index: 0, angle_brackets_num: 0, - array_depth: 0, - array_named_stack: Vec::new(), } } @@ -312,25 +304,6 @@ impl Parser { Ok(Statement::Analyze { table_name }) } - /// Check is enter array expression. - pub fn peek_array_depth(&self) -> usize { - self.array_depth - } - - /// When enter specify ARRAY prefix expression. - pub fn increase_array_depth(&mut self, num: usize) { - self.array_depth += num; - } - - /// When exit specify ARRAY prefix expression. - pub fn decrease_array_depth(&mut self, num: usize) { - self.array_depth -= num; - } - - pub fn is_in_array(&self) -> bool { - self.peek_array_depth() > 0 - } - /// Tries to parse a wildcard expression. If it is not a wildcard, parses an expression. /// /// A wildcard expression either means: @@ -596,10 +569,6 @@ impl Parser { expr: Box::new(self.parse_subexpr(Precedence::UnaryNot)?), }), Keyword::ROW => self.parse_row_expr(), - Keyword::ARRAY if self.peek_token() == Token::LBracket => { - self.expect_token(&Token::LBracket)?; - self.parse_array_expr(true) - } Keyword::ARRAY if self.peek_token() == Token::LParen => { // similar to `exists(subquery)` self.expect_token(&Token::LParen)?; @@ -607,6 +576,7 @@ impl Parser { self.expect_token(&Token::RParen)?; Ok(exists_node) } + Keyword::ARRAY if self.peek_token() == Token::LBracket => self.parse_array_expr(), // `LEFT` and `RIGHT` are reserved as identifier but okay as function Keyword::LEFT | Keyword::RIGHT => { self.parse_function(ObjectName(vec![w.to_ident()?])) @@ -669,8 +639,6 @@ impl Parser { }, }, // End of Token::Word - Token::LBracket if self.is_in_array() => self.parse_array_expr(false), - tok @ Token::Minus | tok @ Token::Plus => { let op = if tok == Token::Plus { UnaryOperator::Plus @@ -1242,46 +1210,50 @@ impl Parser { } /// Parses an array expression `[ex1, ex2, ..]` - /// if `named` is `true`, came from an expression like `ARRAY[ex1, ex2]` - pub fn parse_array_expr(&mut self, named: bool) -> Result { - self.increase_array_depth(1); - if self.array_named_stack.len() < self.peek_array_depth() { - self.array_named_stack.push(named); - } else if let Err(parse_err) = self.check_same_named_array(named) { - Err(parse_err)? - } - - if self.peek_token() == Token::RBracket { - let _ = self.next_token(); // consume ] - self.decrease_array_depth(1); - Ok(Expr::Array(Array { - elem: vec![], - named, - })) - } else { - let exprs = self.parse_comma_separated(Parser::parse_expr)?; - self.expect_token(&Token::RBracket)?; - if self.array_named_stack.len() > self.peek_array_depth() { - self.array_named_stack.pop(); - } - self.decrease_array_depth(1); - Ok(Expr::Array(Array { elem: exprs, named })) - } + pub fn parse_array_expr(&mut self) -> Result { + let mut expected_depth = None; + let exprs = self.parse_array_inner(0, &mut expected_depth)?; + Ok(Expr::Array(Array { + elem: exprs, + // Top-level array is named. + named: true, + })) } - fn check_same_named_array(&mut self, current_named: bool) -> Result<(), ParserError> { - let previous_named = self.array_named_stack.last().unwrap(); - if current_named != *previous_named { - // for '[' - self.prev_token(); - if current_named { - // for keyword 'array' - self.prev_token(); - } - parser_err!(format!("syntax error at or near {}", self.peek_token()))? 
+ fn parse_array_inner( + &mut self, + depth: usize, + expected_depth: &mut Option, + ) -> Result, ParserError> { + self.expect_token(&Token::LBracket)?; + if let Some(expected_depth) = *expected_depth + && depth > expected_depth + { + return self.expected("]", self.peek_token()); + } + let exprs = if self.peek_token() == Token::LBracket { + self.parse_comma_separated(|parser| { + let exprs = parser.parse_array_inner(depth + 1, expected_depth)?; + Ok(Expr::Array(Array { + elem: exprs, + named: false, + })) + })? } else { - Ok(()) - } + if let Some(expected_depth) = *expected_depth { + if depth < expected_depth { + return self.expected("[", self.peek_token()); + } + } else { + *expected_depth = Some(depth); + } + if self.consume_token(&Token::RBracket) { + return Ok(vec![]); + } + self.parse_comma_separated(Self::parse_expr)? + }; + self.expect_token(&Token::RBracket)?; + Ok(exprs) } // This function parses date/time fields for interval qualifiers. diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index d70625c33d770..55e4eefe0e406 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -1119,41 +1119,19 @@ fn parse_array() { ); let sql = "SELECT ARRAY[ARRAY[1, 2], [3, 4]]"; - assert_eq!( - parse_sql_statements(sql), - Err(ParserError::ParserError( - "syntax error at or near [ at line:1, column:28".to_string() - )) - ); + assert!(parse_sql_statements(sql).is_err()); let sql = "SELECT ARRAY[ARRAY[], []]"; - assert_eq!( - parse_sql_statements(sql), - Err(ParserError::ParserError( - "syntax error at or near [ at line:1, column:24".to_string() - )) - ); + assert!(parse_sql_statements(sql).is_err()); let sql = "SELECT ARRAY[[1, 2], ARRAY[3, 4]]"; - assert_eq!( - parse_sql_statements(sql), - Err(ParserError::ParserError( - "syntax error at or near ARRAY at line:1, column:27".to_string() - )) - ); + assert!(parse_sql_statements(sql).is_err()); let sql = "SELECT ARRAY[[], ARRAY[]]"; - assert_eq!( - parse_sql_statements(sql), - Err(ParserError::ParserError( - "syntax error at or near ARRAY at line:1, column:23".to_string() - )) - ); + assert!(parse_sql_statements(sql).is_err()); let sql = "SELECT [[1, 2], [3, 4]]"; - let res = parse_sql_statements(sql); - let err_msg = "Expected an expression:, found: ["; - assert!(format!("{}", res.unwrap_err()).contains(err_msg)); + assert!(parse_sql_statements(sql).is_err()); } #[test] diff --git a/src/sqlparser/tests/testdata/array.yaml b/src/sqlparser/tests/testdata/array.yaml index aa655652b2b7f..9af94c041fdcb 100644 --- a/src/sqlparser/tests/testdata/array.yaml +++ b/src/sqlparser/tests/testdata/array.yaml @@ -25,3 +25,41 @@ formatted_sql: SELECT (CAST(ARRAY[ARRAY[2, 3]] AS INT[][]))[1][2] - input: SELECT ARRAY[] formatted_sql: SELECT ARRAY[] +- input: SELECT ARRAY[[1,2],[3,4]] + formatted_sql: SELECT ARRAY[[1, 2], [3, 4]] +- input: SELECT ARRAY[ARRAY[1,2],ARRAY[3,4]] + formatted_sql: SELECT ARRAY[ARRAY[1, 2], ARRAY[3, 4]] +- input: SELECT ARRAY[[],[]] + formatted_sql: SELECT ARRAY[[], []] +- input: SELECT ARRAY[ARRAY[],[]] + error_msg: |- + sql parser error: Expected an expression:, found: [ at line:1, column:23 + Near "SELECT ARRAY[ARRAY[],[" +- input: SELECT ARRAY[[],ARRAY[]] + error_msg: |- + sql parser error: Expected [, found: ARRAY at line:1, column:22 + Near "SELECT ARRAY[[]," +- input: SELECT ARRAY[[1,2],3] + error_msg: |- + sql parser error: Expected [, found: 3 at line:1, column:21 + Near "SELECT ARRAY[[1,2]," +- input: SELECT ARRAY[1,[2,3]] + 
error_msg: |- + sql parser error: Expected an expression:, found: [ at line:1, column:17 + Near "SELECT ARRAY[1,[" +- input: SELECT ARRAY[ARRAY[1,2],[3,4]] + error_msg: |- + sql parser error: Expected an expression:, found: [ at line:1, column:26 + Near "ARRAY[ARRAY[1,2],[" +- input: SELECT ARRAY[[1,2],ARRAY[3,4]] + error_msg: |- + sql parser error: Expected [, found: ARRAY at line:1, column:25 + Near "SELECT ARRAY[[1,2]," +- input: SELECT ARRAY[[1,2],[3] || [4]] + error_msg: |- + sql parser error: Expected ], found: || at line:1, column:25 + Near "[[1,2],[3]" +- input: SELECT [1,2] + error_msg: |- + sql parser error: Expected an expression:, found: [ at line:1, column:9 + Near "SELECT ["
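Usage sketch for the new AWS_MSK_IAM support: a minimal, untested example assuming the flattened `AwsAuthProps` option names (`region`, `access_key`, `secret_key`, `arn`, `external_id`, `profile`) listed in `with_options_source.yaml` above are accepted directly in the WITH clause, and assuming the usual `properties.bootstrap.server` broker option; broker endpoint, topic, and credential values are placeholders. With `properties.sasl.mechanism` set to `AWS_MSK_IAM`, `set_security_properties` forces `security.protocol = SASL_SSL` and `sasl.mechanism = OAUTHBEARER`, and the OAuth token is minted from the configured AWS credentials.

    CREATE SOURCE msk_source (id INT, name VARCHAR)
    WITH (
        connector = 'kafka',
        topic = 'my-topic',                                              -- placeholder
        properties.bootstrap.server = 'b-1.example.kafka.us-east-1.amazonaws.com:9098',  -- placeholder; MSK IAM listeners typically use port 9098
        properties.sasl.mechanism = 'AWS_MSK_IAM',
        region = 'us-east-1',
        access_key = 'AKIAXXXXXXXX',                                     -- placeholder; omit to fall back to `profile` or instance credentials
        secret_key = 'xxxxxxxx',                                         -- placeholder
        arn = 'arn:aws:iam::123456789012:role/msk-access',               -- optional IAM role to assume
        external_id = 'my-external-id'                                   -- optional external ID in the role trust policy
    ) FORMAT PLAIN ENCODE JSON;

Since `KafkaConfig` flattens the same `aws_auth_props`, an equivalent `CREATE SINK ... WITH (connector = 'kafka', ...)` should accept the same set of options.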