Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support async for pubsub and nats #17358

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::apiv1;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::Publisher;
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -42,6 +44,16 @@ use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, Sink
use crate::dispatch_sink_formatter_str_key_impl;

/// Name string of the Google Pub/Sub sink.
pub const PUBSUB_SINK: &str = "google_pubsub";
// NOTE(review): presumably the cap on buffered, not-yet-resolved delivery futures;
// per the review discussion the number was taken from the Pulsar and Kinesis sinks
// rather than from a Pub/Sub limit — confirm.
/// Value passed to `into_log_sinker` when the Pub/Sub log sinker is created.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for the random value.


/// Wraps a publish `Awaiter` into the boxed delivery future used by this sink.
///
/// The returned future resolves the awaiter through `get()`. A failure gets the
/// "Google Pub/Sub sink error" context and is wrapped in
/// `SinkError::GooglePubSub`; the success value is discarded so the future
/// yields `()`.
fn may_delivery_future(awaiter: Awaiter) -> GooglePubSubSinkDeliveryFuture {
Box::pin(awaiter.get().map(|result| {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
}))
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand Down Expand Up @@ -130,7 +142,7 @@ impl Sink for GooglePubSubSink {
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(usize::MAX))
.into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
}
}

Expand All @@ -157,10 +169,14 @@ impl TryFrom<SinkParam> for GooglePubSubSink {
}
}

// NOTE(review): this span is rendered from a diff without +/- markers. The first
// two lines look like the pre-change definition (owned `Publisher`); the
// lifetime-parameterised form below them looks like the post-change one — confirm.
/// Writer that publishes formatted payloads through a `Publisher` and, in the
/// lifetime-parameterised form, also carries the `add_future` handle used to
/// register delivery futures.
struct GooglePubSubPayloadWriter {
publisher: Publisher,
struct GooglePubSubPayloadWriter<'w> {
publisher: &'w mut Publisher,
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

/// Delivery future type of the Google Pub/Sub sink writer.
///
/// It is an opaque `Unpin + 'static` future that yields `()` on success and a
/// `SinkError` on failure; `may_delivery_future` produces values of this type.
pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down Expand Up @@ -231,35 +247,38 @@ impl GooglePubSubSinkWriter {
.await?;

let publisher = topic.new_publisher(None);
let payload_writer = GooglePubSubPayloadWriter { publisher };

Ok(Self {
payload_writer,
formatter,
publisher,
})
}
}

// NOTE(review): rendered from a diff without +/- markers. `payload_writer` looks
// like the pre-change field and `publisher` like the post-change one — confirm.
/// Sink writer for Google Pub/Sub: holds the row formatter and the means of
/// publishing formatted messages.
pub struct GooglePubSubSinkWriter {
payload_writer: GooglePubSubPayloadWriter,
formatter: SinkFormatterImpl,
publisher: Publisher,
}

// NOTE(review): rendered from a diff without +/- markers. The `_add_future`
// parameter and the first macro invocation look like pre-change lines; the
// `add_future` parameter and the block-form macro invocation look like the
// post-change lines — confirm against the repository.
impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
type DeliveryFuture = GooglePubSubSinkDeliveryFuture;

/// Writes `chunk` using the formatter selected by
/// `dispatch_sink_formatter_str_key_impl!`.
///
/// In the block form, a `GooglePubSubPayloadWriter` is built for this call: it
/// borrows `self.publisher` mutably and takes `add_future`, then its
/// `write_chunk` result is returned.
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
dispatch_sink_formatter_str_key_impl!(
&self.formatter,
formatter,
self.payload_writer.write_chunk(chunk, formatter).await
)
dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
let mut payload_writer = GooglePubSubPayloadWriter {
publisher: &mut self.publisher,
add_future,
};
payload_writer.write_chunk(chunk, formatter).await
})
}
}

impl FormattedSink for GooglePubSubPayloadWriter {
impl<'w> FormattedSink for GooglePubSubPayloadWriter<'w> {
type K = String;
type V = Vec<u8>;

Expand All @@ -273,12 +292,10 @@ impl FormattedSink for GooglePubSubPayloadWriter {
..Default::default()
};
let awaiter = self.publisher.publish(msg).await;
awaiter
.get()
.await
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
self.add_future
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
.add_future_may_await(may_delivery_future(awaiter))
.await?;
Ok(())
}
None => Err(SinkError::GooglePubSub(anyhow!(
"Google Pub/Sub sink error: missing value to publish"
Expand Down
57 changes: 34 additions & 23 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::fmt::Debug;
use core::future::IntoFuture;
use std::collections::BTreeMap;

use anyhow::{anyhow, Context as _};
use async_nats::jetstream::context::Context;
use futures::prelude::TryFuture;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -38,6 +41,7 @@ use crate::sink::writer::{
use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY};

/// Name string of the NATS sink.
pub const NATS_SINK: &str = "nats";
// NOTE(review): presumably the cap on buffered, not-yet-resolved delivery futures;
// per the review discussion the number was taken from the Pulsar and Kinesis sinks
// rather than from a NATS limit — confirm.
/// Value passed to `into_log_sinker` when the NATS log sinker is created.
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an arbitrary value, or a maximum queue size mentioned in the NATS docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It follows the value used by the Pulsar and Kinesis sinks.


#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand All @@ -64,6 +68,8 @@ pub struct NatsSinkWriter {
json_encoder: JsonEncoder,
}

/// Delivery future type of `NatsSinkWriter`: an opaque `Unpin + 'static` future
/// that yields `()` on success and a `SinkError` on failure.
pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

/// Basic data types for use with the nats interface
impl NatsConfig {
pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
Expand Down Expand Up @@ -122,7 +128,7 @@ impl Sink for NatsSink {
Ok(
NatsSinkWriter::new(self.config.clone(), self.schema.clone())
.await?
.into_log_sinker(usize::MAX),
.into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
)
}
}
Expand All @@ -148,34 +154,39 @@ impl NatsSinkWriter {
),
})
}

// NOTE(review): rendered from a diff without +/- markers; this method looks like
// the pre-change implementation that the new `write_chunk` body replaces — confirm.
/// Publishes every JSON-encoded row of `chunk` to the configured subject.
///
/// The whole batch runs inside `Retry::spawn` with a jittered exponential
/// backoff (base 100 ms, limited by `take(3)`), so a failed attempt re-encodes
/// the chunk and publishes all rows again from the start.
///
/// # Errors
/// Returns `SinkError::Nats` with the "nats sink error" context when publishing
/// still fails after the retries.
///
/// # Panics
/// Panics if `chunk_to_json` returns an error, because the result is unwrapped.
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
Retry::spawn(
ExponentialBackoff::from_millis(100).map(jitter).take(3),
|| async {
let data = chunk_to_json(chunk.clone(), &self.json_encoder).unwrap();
for item in data {
self.context
.publish(self.config.common.subject.clone(), item.into())
.await
.context("nats sink error")
.map_err(SinkError::Nats)?;
}
Ok::<_, SinkError>(())
},
)
.await
.context("nats sink error")
.map_err(SinkError::Nats)
}
}

impl AsyncTruncateSinkWriter for NatsSinkWriter {
    type DeliveryFuture = NatsSinkDeliveryFuture;

    /// Encodes `chunk` as JSON rows and publishes each row to the configured
    /// NATS subject.
    ///
    /// Each publish call is retried with a jittered exponential backoff (base
    /// 100 ms, limited by `take(3)`). A successful publish yields an
    /// acknowledgement future; instead of awaiting it here, it is converted into
    /// a delivery future and registered through `add_future`.
    ///
    /// # Errors
    /// Returns `SinkError::Nats` when a publish still fails after the retries,
    /// and propagates any error returned by `add_future_may_await`. A failed
    /// acknowledgement surfaces later, from the registered delivery future, also
    /// as `SinkError::Nats`.
    ///
    /// # Panics
    /// Panics if `chunk_to_json` returns an error, because the result is
    /// unwrapped.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        // NOTE(review): an encoding failure panics here; consider propagating it
        // once the error type of `chunk_to_json` is confirmed.
        let mut data = chunk_to_json(chunk, &self.json_encoder).unwrap();
        for item in &mut data {
            // The retry closure may run more than once, so the payload is cloned
            // on every attempt.
            let publish_ack_future = Retry::spawn(
                ExponentialBackoff::from_millis(100).map(jitter).take(3),
                || async {
                    self.context
                        .publish(self.config.common.subject.clone(), item.clone().into())
                        .await
                        .context("nats sink error")
                        .map_err(SinkError::Nats)
                },
            )
            .await
            .context("nats sink error")
            .map_err(SinkError::Nats)?;
            // Acknowledgement failures belong to the NATS sink and must be
            // reported as such, not as Google Pub/Sub errors.
            let future = publish_ack_future.into_future().map(|result| {
                result
                    .context("nats sink error")
                    .map_err(SinkError::Nats)
                    .map(|_| ())
            });
            add_future.add_future_may_await(future).await?;
        }
        Ok(())
    }
}
Loading