From 35e690a54a4b2c431d0f6bca5e4f5b15f2e74d9b Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 29 Apr 2024 21:32:23 +0800 Subject: [PATCH 01/10] feat(sink): add Google Pub/Sub sink support append-only mode and JSON encoding --- src/connector/Cargo.toml | 2 +- src/connector/src/sink/google_pubsub.rs | 327 ++++++++++++++++++++++++ src/connector/src/sink/mod.rs | 8 + src/frontend/src/handler/create_sink.rs | 8 +- 4 files changed, 342 insertions(+), 3 deletions(-) create mode 100644 src/connector/src/sink/google_pubsub.rs diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 81fa19f9643a2..e0b986b912202 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -60,7 +60,7 @@ gcp-bigquery-client = "0.18.0" glob = "0.3" google-cloud-bigquery = { version = "0.8.0", features = ["auth"] } google-cloud-gax = "0.17.0" -google-cloud-googleapis = "0.12.0" +google-cloud-googleapis = { version = "0.12", features = ["pubsub"] } google-cloud-pubsub = "0.24" http = "0.2" icelake = { workspace = true } diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs new file mode 100644 index 0000000000000..76e153bd353ab --- /dev/null +++ b/src/connector/src/sink/google_pubsub.rs @@ -0,0 +1,327 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::usize; + +use anyhow::{anyhow, Context}; +use google_cloud_googleapis::pubsub::v1::PubsubMessage; +use google_cloud_pubsub::client::{Client, ClientConfig}; +use google_cloud_pubsub::publisher::Publisher; +use google_cloud_pubsub::topic::Topic; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::catalog::Schema; +use risingwave_common::row::Row; +use risingwave_common::session_config::sink_decouple::SinkDecouple; +use serde_derive::Deserialize; +use serde_with::serde_as; +use tonic::Status; +use with_options::WithOptions; + +use super::catalog::desc::SinkDesc; +use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; +use super::encoder::{ + AvroEncoder, DateHandlingMode, JsonEncoder, RowEncoder, SerTo, TimeHandlingMode, + TimestampHandlingMode, TimestamptzHandlingMode, +}; +use super::log_store::DeliveryFutureManagerAddFuture; +use super::writer::{ + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, +}; +use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; + +pub const PUBSUB_SINK: &str = "google_pubsub"; + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct GooglePubSubConfig { + /// The Google Pub/Sub Project ID + pub project_id: String, + + /// Specifies the Pub/Sub topic to publish messages + pub topic: String, + + /// The Google Pub/Sub endpoint URL + pub endpoint: String, + + // accept "append-only" + pub r#type: String, + + /// use the connector with a pubsub emulator + /// + // #[serde(rename = "pubsub.emulator_host")] + pub emulator_host: Option, + + /// A JSON string containing the service account credentials for authorization, + /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. + /// The provided account credential must have the + /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles) + pub credentials: Option, +} + +impl GooglePubSubConfig { + fn from_hashmap(values: HashMap) -> Result { + serde_json::from_value::( + serde_json::to_value(values).expect("impossible"), + ) + .map_err(|e| SinkError::Config(anyhow!(e))) + } + + pub(crate) fn initialize_env(&self) { + tracing::debug!("setting pubsub environment variables"); + if let Some(emulator_host) = &self.emulator_host { + std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host); + } + if let Some(credentials) = &self.credentials { + std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials); + } + } +} + +// === + +#[derive(Clone, Debug)] +pub struct GooglePubSubSink { + pub config: GooglePubSubConfig, + schema: Schema, + format_desc: SinkFormatDesc, + name: String, + is_append_only: bool, +} + +impl Sink for GooglePubSubSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = AsyncTruncateLogSinkerOf; + + const SINK_NAME: &'static str = PUBSUB_SINK; + + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + match user_specified { + SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Disable => Ok(false), + SinkDecouple::Enable => Ok(true), + } + } + + async fn validate(&self) -> Result<()> { + if !self.is_append_only { + return Err(SinkError::Nats(anyhow!( + "Google Pub/Sub sink only support append-only mode" + ))); + } + if !matches!(self.format_desc.encode, SinkEncode::Json) { + return Err(SinkError::GooglePubSub(anyhow!( + "Google Pub/Sub sink only support `Json` sink encode" + ))); + } + + let conf = &self.config; + if matches!((&conf.emulator_host, &conf.credentials), (None, None)) { + return Err(SinkError::GooglePubSub(anyhow!( + "Configure at least one of `emulator_host` and `credentials` in the Google Pub/Sub sink" + ))); + } + + Ok(()) + } + + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { + Ok( + GooglePubSubSinkWriter::new( + self.config.clone(), + self.schema.clone(), + &self.format_desc, + ) + .await? + .into_log_sinker(usize::MAX), + ) + } +} + +impl TryFrom for GooglePubSubSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = GooglePubSubConfig::from_hashmap(param.properties)?; + Ok(Self { + config, + schema, + name: param.sink_name, + format_desc: param + .format_desc + .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, + is_append_only: param.sink_type.is_append_only(), + }) + } +} + +// === + +struct GooglePubSubPayloadWriter { + topic: Topic, + publisher: Publisher, +} + +impl GooglePubSubSinkWriter { + pub async fn new( + config: GooglePubSubConfig, + schema: Schema, + format_desc: &SinkFormatDesc, + ) -> Result { + config.initialize_env(); + + let client_config = ClientConfig { + endpoint: config.endpoint, + project_id: Some(config.project_id), + ..Default::default() + } + .with_auth() + .await + .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?; + let client = Client::new(client_config) + .await + .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?; + + let topic = async { + let topic = client.topic(&config.topic); + if !topic.exists(None).await? { + topic.create(None, None).await?; + } + Ok(topic) + } + .await + .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?; + + let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; + let encoder = match format_desc.format { + SinkFormat::AppendOnly => match format_desc.encode { + SinkEncode::Json => RowEncoderWrapper::Json(JsonEncoder::new( + schema, + None, + DateHandlingMode::FromCe, + TimestampHandlingMode::Milli, + timestamptz_mode, + TimeHandlingMode::Milli, + )), + // TODO: support append-only Avro + // note: update `CONNECTORS_COMPATIBLE_FORMATS` + // in src/frontend/src/handler/create_sink.rs + // SinkEncode::Avro => { }, + _ => { + return Err(SinkError::Config(anyhow!( + "Google Pub/Sub sink encode unsupported: {:?}", + format_desc.encode, + ))) + } + }, + _ => { + return Err(SinkError::Config(anyhow!( + "Google Pub/Sub sink only support append-only mode" + ))) + } + }; + + let publisher = topic.new_publisher(None); + let payload_writer = GooglePubSubPayloadWriter { topic, publisher }; + + Ok(Self { + payload_writer, + encoder, + }) + } +} + +pub struct GooglePubSubSinkWriter { + payload_writer: GooglePubSubPayloadWriter, + encoder: RowEncoderWrapper, +} + +impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter { + async fn write_chunk<'a>( + &'a mut self, + chunk: StreamChunk, + _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + ) -> Result<()> { + self.payload_writer.write_chunk(chunk, &self.encoder).await + } +} + +impl GooglePubSubPayloadWriter { + async fn write_chunk(&mut self, chunk: StreamChunk, encoder: &RowEncoderWrapper) -> Result<()> { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + + let data = encoder.encode(row)?; + + let msg = PubsubMessage { + data, + ..Default::default() + }; + let awaiter = self.publisher.publish(msg).await; + _ = awaiter + .get() + .await + .context("Google Pub/Sub sink error") + .map_err(SinkError::GooglePubSub)?; + } + + Ok(()) + } +} + +// === + +pub enum RowEncoderWrapper { + Json(JsonEncoder), + Avro(AvroEncoder), +} + +impl RowEncoder for RowEncoderWrapper { + type Output = Vec; + + fn encode_cols( + &self, + row: impl Row, + col_indices: impl Iterator, + ) -> Result { + match self { + RowEncoderWrapper::Json(json) => json.encode_cols(row, col_indices)?.ser_to(), + RowEncoderWrapper::Avro(avro) => avro.encode_cols(row, col_indices)?.ser_to(), + } + } + + fn schema(&self) -> &Schema { + match self { + RowEncoderWrapper::Json(json) => json.schema(), + RowEncoderWrapper::Avro(avro) => avro.schema(), + } + } + + fn col_indices(&self) -> Option<&[usize]> { + match self { + RowEncoderWrapper::Json(json) => json.col_indices(), + RowEncoderWrapper::Avro(avro) => avro.col_indices(), + } + } + + fn encode(&self, row: impl Row) -> Result { + match self { + RowEncoderWrapper::Json(json) => json.encode(row)?.ser_to(), + RowEncoderWrapper::Avro(avro) => avro.encode(row)?.ser_to(), + } + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c430b4303f1e9..c0eadf144b43d 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -23,6 +23,7 @@ pub mod doris_starrocks_connector; pub mod elasticsearch; pub mod encoder; pub mod formatter; +pub mod google_pubsub; pub mod iceberg; pub mod kafka; pub mod kinesis; @@ -86,6 +87,7 @@ macro_rules! for_all_sinks { { ClickHouse, $crate::sink::clickhouse::ClickHouseSink }, { Iceberg, $crate::sink::iceberg::IcebergSink }, { Mqtt, $crate::sink::mqtt::MqttSink }, + { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink }, { Nats, $crate::sink::nats::NatsSink }, { Jdbc, $crate::sink::remote::JdbcSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, @@ -525,6 +527,12 @@ pub enum SinkError { #[backtrace] anyhow::Error, ), + #[error("Google Pub/Sub error: {0}")] + GooglePubSub( + #[source] + #[backtrace] + anyhow::Error, + ), #[error("Doris/Starrocks connect error: {0}")] DorisStarrocksConnect( #[source] diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index bed409de178f1..d384e30a83440 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -736,6 +736,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { + use risingwave_connector::sink::google_pubsub::GooglePubSubSink; use risingwave_connector::sink::kafka::KafkaSink; use risingwave_connector::sink::kinesis::KinesisSink; use risingwave_connector::sink::mqtt::MqttSink; @@ -744,6 +745,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( + Format::Plain => vec![Encode::Json], + ), KafkaSink::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Json, Encode::Protobuf], Format::Upsert => vec![Encode::Json, Encode::Avro], @@ -763,8 +767,8 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), RedisSink::SINK_NAME => hashmap!( - Format::Plain => vec![Encode::Json,Encode::Template], - Format::Upsert => vec![Encode::Json,Encode::Template], + Format::Plain => vec![Encode::Json, Encode::Template], + Format::Upsert => vec![Encode::Json, Encode::Template], ), )) }); From 25272cd279d1e3342c08591c81266e9ef132d015 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Wed, 17 Apr 2024 17:23:51 +0800 Subject: [PATCH 02/10] feat(sink): make google_pubsub sink config more consistent --- src/connector/src/sink/google_pubsub.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 76e153bd353ab..ac3159b973e5f 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -47,27 +47,31 @@ pub const PUBSUB_SINK: &str = "google_pubsub"; #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct GooglePubSubConfig { /// The Google Pub/Sub Project ID + #[serde(rename = "pubsub.project_id")] pub project_id: String, /// Specifies the Pub/Sub topic to publish messages + #[serde(rename = "pubsub.topic")] pub topic: String, /// The Google Pub/Sub endpoint URL + #[serde(rename = "pubsub.endpoint")] pub endpoint: String, - // accept "append-only" - pub r#type: String, - /// use the connector with a pubsub emulator /// - // #[serde(rename = "pubsub.emulator_host")] + #[serde(rename = "pubsub.emulator_host")] pub emulator_host: Option, /// A JSON string containing the service account credentials for authorization, /// see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. /// The provided account credential must have the /// `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles) + #[serde(rename = "pubsub.credentials")] pub credentials: Option, + + // accept "append-only" + pub r#type: String, } impl GooglePubSubConfig { From 986a7a113c06902c20dace56f0f0c0486d1b4065 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 22 Apr 2024 08:03:44 +0800 Subject: [PATCH 03/10] test: update `with_options_sink` test for google pubsub sink --- src/connector/with_options_sink.yaml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index e99be370aec86..aa6d83a143b3a 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -113,6 +113,31 @@ DorisConfig: - name: r#type field_type: String required: true +GooglePubSubConfig: + fields: + - name: pubsub.project_id + field_type: String + comments: The Google Pub/Sub Project ID + required: true + - name: pubsub.topic + field_type: String + comments: Specifies the Pub/Sub topic to publish messages + required: true + - name: pubsub.endpoint + field_type: String + comments: The Google Pub/Sub endpoint URL + required: true + - name: pubsub.emulator_host + field_type: String + comments: use the connector with a pubsub emulator + required: false + - name: pubsub.credentials + field_type: String + comments: A JSON string containing the service account credentials for authorization, see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. The provided account credential must have the `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles) + required: false + - name: r#type + field_type: String + required: true IcebergConfig: fields: - name: connector From c02bfb03a29a6615d96329b1ff9a600614c320ac Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 22 Apr 2024 21:19:01 +0800 Subject: [PATCH 04/10] feat(sink): use formatter for google pubsub sink --- src/connector/src/sink/google_pubsub.rs | 194 +++++++++--------------- 1 file changed, 73 insertions(+), 121 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index ac3159b973e5f..bbe06e34aab49 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -19,10 +19,8 @@ use anyhow::{anyhow, Context}; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::publisher::Publisher; -use google_cloud_pubsub::topic::Topic; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::row::Row; use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_with::serde_as; @@ -30,16 +28,14 @@ use tonic::Status; use with_options::WithOptions; use super::catalog::desc::SinkDesc; -use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; -use super::encoder::{ - AvroEncoder, DateHandlingMode, JsonEncoder, RowEncoder, SerTo, TimeHandlingMode, - TimestampHandlingMode, TimestamptzHandlingMode, -}; +use super::catalog::{SinkEncode, SinkFormatDesc}; +use super::formatter::SinkFormatterImpl; use super::log_store::DeliveryFutureManagerAddFuture; use super::writer::{ - AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, + AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, }; use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::dispatch_sink_formatter_str_key_impl; pub const PUBSUB_SINK: &str = "google_pubsub"; @@ -98,10 +94,13 @@ impl GooglePubSubConfig { #[derive(Clone, Debug)] pub struct GooglePubSubSink { pub config: GooglePubSubConfig, + is_append_only: bool, + schema: Schema, + pk_indices: Vec, format_desc: SinkFormatDesc, - name: String, - is_append_only: bool, + db_name: String, + sink_from_name: String, } impl Sink for GooglePubSubSink { @@ -133,7 +132,7 @@ impl Sink for GooglePubSubSink { let conf = &self.config; if matches!((&conf.emulator_host, &conf.credentials), (None, None)) { return Err(SinkError::GooglePubSub(anyhow!( - "Configure at least one of `emulator_host` and `credentials` in the Google Pub/Sub sink" + "Configure at least one of `pubsub.emulator_host` and `pubsub.credentials` in the Google Pub/Sub sink" ))); } @@ -141,15 +140,16 @@ impl Sink for GooglePubSubSink { } async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { - Ok( - GooglePubSubSinkWriter::new( - self.config.clone(), - self.schema.clone(), - &self.format_desc, - ) - .await? - .into_log_sinker(usize::MAX), + Ok(GooglePubSubSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + &self.format_desc, + self.db_name.clone(), + self.sink_from_name.clone(), ) + .await? + .into_log_sinker(usize::MAX)) } } @@ -159,14 +159,19 @@ impl TryFrom for GooglePubSubSink { fn try_from(param: SinkParam) -> std::result::Result { let schema = param.schema(); let config = GooglePubSubConfig::from_hashmap(param.properties)?; + + let format_desc = param + .format_desc + .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?; Ok(Self { config, - schema, - name: param.sink_name, - format_desc: param - .format_desc - .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, is_append_only: param.sink_type.is_append_only(), + + schema, + pk_indices: param.downstream_pk, + format_desc, + db_name: param.db_name, + sink_from_name: param.sink_from_name, }) } } @@ -174,7 +179,6 @@ impl TryFrom for GooglePubSubSink { // === struct GooglePubSubPayloadWriter { - topic: Topic, publisher: Publisher, } @@ -182,7 +186,10 @@ impl GooglePubSubSinkWriter { pub async fn new( config: GooglePubSubConfig, schema: Schema, + pk_indices: Vec, format_desc: &SinkFormatDesc, + db_name: String, + sink_from_name: String, ) -> Result { config.initialize_env(); @@ -208,48 +215,29 @@ impl GooglePubSubSinkWriter { .await .map_err(|e: Status| SinkError::GooglePubSub(anyhow!(e)))?; - let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; - let encoder = match format_desc.format { - SinkFormat::AppendOnly => match format_desc.encode { - SinkEncode::Json => RowEncoderWrapper::Json(JsonEncoder::new( - schema, - None, - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - timestamptz_mode, - TimeHandlingMode::Milli, - )), - // TODO: support append-only Avro - // note: update `CONNECTORS_COMPATIBLE_FORMATS` - // in src/frontend/src/handler/create_sink.rs - // SinkEncode::Avro => { }, - _ => { - return Err(SinkError::Config(anyhow!( - "Google Pub/Sub sink encode unsupported: {:?}", - format_desc.encode, - ))) - } - }, - _ => { - return Err(SinkError::Config(anyhow!( - "Google Pub/Sub sink only support append-only mode" - ))) - } - }; + let formatter = SinkFormatterImpl::new( + format_desc, + schema, + pk_indices, + db_name, + sink_from_name, + topic.fully_qualified_name(), + ) + .await?; let publisher = topic.new_publisher(None); - let payload_writer = GooglePubSubPayloadWriter { topic, publisher }; + let payload_writer = GooglePubSubPayloadWriter { publisher }; Ok(Self { payload_writer, - encoder, + formatter, }) } } pub struct GooglePubSubSinkWriter { payload_writer: GooglePubSubPayloadWriter, - encoder: RowEncoderWrapper, + formatter: SinkFormatterImpl, } impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter { @@ -258,74 +246,38 @@ impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter { chunk: StreamChunk, _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - self.payload_writer.write_chunk(chunk, &self.encoder).await + dispatch_sink_formatter_str_key_impl!( + &self.formatter, + formatter, + self.payload_writer.write_chunk(chunk, formatter).await + ) } } -impl GooglePubSubPayloadWriter { - async fn write_chunk(&mut self, chunk: StreamChunk, encoder: &RowEncoderWrapper) -> Result<()> { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; +impl FormattedSink for GooglePubSubPayloadWriter { + type K = String; + type V = Vec; + + async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { + let ordering_key = k.unwrap_or_default(); + match v { + Some(data) => { + let msg = PubsubMessage { + data, + ordering_key, + ..Default::default() + }; + let awaiter = self.publisher.publish(msg).await; + awaiter + .get() + .await + .context("Google Pub/Sub sink error") + .map_err(SinkError::GooglePubSub) + .map(|_| ()) } - - let data = encoder.encode(row)?; - - let msg = PubsubMessage { - data, - ..Default::default() - }; - let awaiter = self.publisher.publish(msg).await; - _ = awaiter - .get() - .await - .context("Google Pub/Sub sink error") - .map_err(SinkError::GooglePubSub)?; - } - - Ok(()) - } -} - -// === - -pub enum RowEncoderWrapper { - Json(JsonEncoder), - Avro(AvroEncoder), -} - -impl RowEncoder for RowEncoderWrapper { - type Output = Vec; - - fn encode_cols( - &self, - row: impl Row, - col_indices: impl Iterator, - ) -> Result { - match self { - RowEncoderWrapper::Json(json) => json.encode_cols(row, col_indices)?.ser_to(), - RowEncoderWrapper::Avro(avro) => avro.encode_cols(row, col_indices)?.ser_to(), - } - } - - fn schema(&self) -> &Schema { - match self { - RowEncoderWrapper::Json(json) => json.schema(), - RowEncoderWrapper::Avro(avro) => avro.schema(), - } - } - - fn col_indices(&self) -> Option<&[usize]> { - match self { - RowEncoderWrapper::Json(json) => json.col_indices(), - RowEncoderWrapper::Avro(avro) => avro.col_indices(), - } - } - - fn encode(&self, row: impl Row) -> Result { - match self { - RowEncoderWrapper::Json(json) => json.encode(row)?.ser_to(), - RowEncoderWrapper::Avro(avro) => avro.encode(row)?.ser_to(), + None => Err(SinkError::GooglePubSub(anyhow!( + "Google Pub/Sub sink error: missing value to publish" + ))), } } } From aa853128ed56d5fe4fa54587b95061423a11710b Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Tue, 23 Apr 2024 09:17:51 +0800 Subject: [PATCH 05/10] chore: remove unnecessary comments --- src/connector/src/sink/google_pubsub.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index bbe06e34aab49..c71cdd4da6b44 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -89,8 +89,6 @@ impl GooglePubSubConfig { } } -// === - #[derive(Clone, Debug)] pub struct GooglePubSubSink { pub config: GooglePubSubConfig, @@ -176,8 +174,6 @@ impl TryFrom for GooglePubSubSink { } } -// === - struct GooglePubSubPayloadWriter { publisher: Publisher, } From 164a18bb4d29ef2d29e310d517d524343072654e Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 29 Apr 2024 15:29:37 +0800 Subject: [PATCH 06/10] fix: change error type to GooglePubSub for Google Pub/Sub sink --- src/connector/src/sink/google_pubsub.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index c71cdd4da6b44..02862f8a05591 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -117,7 +117,7 @@ impl Sink for GooglePubSubSink { async fn validate(&self) -> Result<()> { if !self.is_append_only { - return Err(SinkError::Nats(anyhow!( + return Err(SinkError::GooglePubSub(anyhow!( "Google Pub/Sub sink only support append-only mode" ))); } From aeeb127789a5523b8812d6899f3260e6024d90fa Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 29 Apr 2024 15:30:02 +0800 Subject: [PATCH 07/10] refactor: remove encode check for google pubsub sink because we have already checked the encode in `CONNECTORS_COMPATIBLE_FORMATS` (src/frontend/src/handler/create_sink.rs) --- src/connector/src/sink/google_pubsub.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 02862f8a05591..f22867bedda28 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -121,11 +121,6 @@ impl Sink for GooglePubSubSink { "Google Pub/Sub sink only support append-only mode" ))); } - if !matches!(self.format_desc.encode, SinkEncode::Json) { - return Err(SinkError::GooglePubSub(anyhow!( - "Google Pub/Sub sink only support `Json` sink encode" - ))); - } let conf = &self.config; if matches!((&conf.emulator_host, &conf.credentials), (None, None)) { From 1a4df2cd3f78189cf371afd8708fe57cb6bf9e45 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 29 Apr 2024 15:51:08 +0800 Subject: [PATCH 08/10] refactor: use unwrap instead of expect --- src/connector/src/sink/google_pubsub.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index f22867bedda28..47c7fa1d06b9d 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -72,10 +72,8 @@ pub struct GooglePubSubConfig { impl GooglePubSubConfig { fn from_hashmap(values: HashMap) -> Result { - serde_json::from_value::( - serde_json::to_value(values).expect("impossible"), - ) - .map_err(|e| SinkError::Config(anyhow!(e))) + serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e))) } pub(crate) fn initialize_env(&self) { From 94896a432f3e5a9a70e609dc1624442aa26bec06 Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Mon, 29 Apr 2024 17:29:41 +0800 Subject: [PATCH 09/10] refactor: use google-cloud-gax for authentication to avoid the need to set environment variables for authentication --- src/connector/src/sink/google_pubsub.rs | 53 +++++++++++++++++-------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 47c7fa1d06b9d..4518a27b15f5f 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -16,7 +16,12 @@ use std::collections::HashMap; use std::usize; use anyhow::{anyhow, Context}; +use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; +use google_cloud_pubsub::apiv1; +use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile; +use google_cloud_pubsub::client::google_cloud_auth::project; +use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider; use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::publisher::Publisher; use risingwave_common::array::StreamChunk; @@ -28,7 +33,7 @@ use tonic::Status; use with_options::WithOptions; use super::catalog::desc::SinkDesc; -use super::catalog::{SinkEncode, SinkFormatDesc}; +use super::catalog::SinkFormatDesc; use super::formatter::SinkFormatterImpl; use super::log_store::DeliveryFutureManagerAddFuture; use super::writer::{ @@ -75,16 +80,6 @@ impl GooglePubSubConfig { serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e))) } - - pub(crate) fn initialize_env(&self) { - tracing::debug!("setting pubsub environment variables"); - if let Some(emulator_host) = &self.emulator_host { - std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host); - } - if let Some(credentials) = &self.credentials { - std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials); - } - } } #[derive(Clone, Debug)] @@ -180,16 +175,42 @@ impl GooglePubSubSinkWriter { db_name: String, sink_from_name: String, ) -> Result { - config.initialize_env(); + let environment = if let Some(ref cred) = config.credentials { + let auth_config = project::Config { + audience: Some(apiv1::conn_pool::AUDIENCE), + scopes: Some(&apiv1::conn_pool::SCOPES), + sub: None, + }; + let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| { + SinkError::GooglePubSub(anyhow!( + "Failed to create Google Cloud Pub/Sub credentials file: {}", + e + )) + })?; + let provider = + DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file)) + .await + .map_err(|e| { + SinkError::GooglePubSub(anyhow!( + "Failed to create Google Cloud Pub/Sub token source provider: {}", + e + )) + })?; + Environment::GoogleCloud(Box::new(provider)) + } else if let Some(emu_host) = config.emulator_host { + Environment::Emulator(emu_host) + } else { + return Err(SinkError::GooglePubSub(anyhow!( + "Missing emulator_host or credentials in Google Pub/Sub sink" + ))); + }; let client_config = ClientConfig { endpoint: config.endpoint, project_id: Some(config.project_id), + environment, ..Default::default() - } - .with_auth() - .await - .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?; + }; let client = Client::new(client_config) .await .map_err(|e| SinkError::GooglePubSub(anyhow!(e)))?; From 00ca0335441b38379d909dbacbed536c222f2ade Mon Sep 17 00:00:00 2001 From: Jinser Kafka Date: Wed, 8 May 2024 03:52:22 +0800 Subject: [PATCH 10/10] refactor: use `context` to add context to errors --- src/connector/src/sink/google_pubsub.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 4518a27b15f5f..6fffbc50570fa 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -182,19 +182,19 @@ impl GooglePubSubSinkWriter { sub: None, }; let cred_file = CredentialsFile::new_from_str(cred).await.map_err(|e| { - SinkError::GooglePubSub(anyhow!( - "Failed to create Google Cloud Pub/Sub credentials file: {}", - e - )) + SinkError::GooglePubSub( + anyhow!(e).context("Failed to create Google Cloud Pub/Sub credentials file"), + ) })?; let provider = DefaultTokenSourceProvider::new_with_credentials(auth_config, Box::new(cred_file)) .await .map_err(|e| { - SinkError::GooglePubSub(anyhow!( - "Failed to create Google Cloud Pub/Sub token source provider: {}", - e - )) + SinkError::GooglePubSub( + anyhow!(e).context( + "Failed to create Google Cloud Pub/Sub token source provider", + ), + ) })?; Environment::GoogleCloud(Box::new(provider)) } else if let Some(emu_host) = config.emulator_host {