Skip to content

Commit

Permalink
Merge branch 'main' into yiming/dag-uploader
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jun 26, 2024
2 parents b0aad6d + 4e22820 commit 372dfc4
Show file tree
Hide file tree
Showing 34 changed files with 1,015 additions and 211 deletions.
135 changes: 133 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
# TODO: switch `iceberg` and `iceberg-catalog-rest` back to the upstream release
# once apache/iceberg-rust#411 is merged; until then we pin the risingwavelabs fork.
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
arrow-array = "50"
arrow-arith = "50"
arrow-cast = "50"
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message InjectBarrierRequest {
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
}

message BarrierCompleteResponse {
Expand Down
4 changes: 4 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ google-cloud-gax = "0.17.0"
google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
google-cloud-pubsub = "0.25"
http = "0.2"
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
icelake = { workspace = true }
indexmap = { version = "2.2.6", features = ["serde"] }
itertools = { workspace = true }
Expand All @@ -90,6 +92,7 @@ opendal = { version = "0.47", features = [
] }
openssl = "0.10"
parking_lot = { workspace = true }
parquet = { workspace = true }
paste = "1"
pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
postgres-openssl = "0.5.0"
Expand Down Expand Up @@ -160,6 +163,7 @@ tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { workspace = true }
tracing = "0.1"
typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = { version = "1", features = ["v4", "fast-rng"] }
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def_anyhow_newtype! {
async_nats::jetstream::context::CreateStreamError => "Nats error",
async_nats::jetstream::stream::ConsumerError => "Nats error",
icelake::Error => "Iceberg error",
iceberg::Error => "IcebergV2 error",
redis::RedisError => "Redis error",
arrow_schema::ArrowError => "Arrow error",
arrow_schema_iceberg::ArrowError => "Arrow error",
Expand Down
76 changes: 55 additions & 21 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::apiv1;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::Publisher;
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -42,6 +46,20 @@ use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, Sink
use crate::dispatch_sink_formatter_str_key_impl;

pub const PUBSUB_SINK: &str = "google_pubsub";
/// Limit handed to `into_log_sinker` when the Pub/Sub sink writer is turned into a log sinker.
// NOTE(review): presumably the maximum number of pending delivery futures that may be
// buffered before the writer has to wait — confirm against `into_log_sinker`.
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

/// Combines the publish awaiters of one batch into a single delivery future.
///
/// Each awaiter is resolved via `Awaiter::get`; its successful payload is discarded and a
/// failure is wrapped, with the context "Google Pub/Sub sink error", into
/// `SinkError::GooglePubSub`. The per-message futures are joined with `try_join_all`, so the
/// returned future yields `Ok(())` only if every awaiter succeeded and otherwise yields the
/// error reported by `try_join_all`. The result is boxed to obtain the opaque
/// `GooglePubSubSinkDeliveryFuture` type.
fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
    // One future per published message, each normalised to `Result<(), SinkError>`.
    let per_message = awaiter.into_iter().map(|pending| {
        pending.get().map(
            |outcome| match outcome.context("Google Pub/Sub sink error") {
                Ok(_) => Ok(()),
                Err(err) => Err(SinkError::GooglePubSub(err)),
            },
        )
    });

    // Collapse the `Vec<()>` produced by the join into a plain unit value.
    try_join_all(per_message)
        .map_ok(|_: Vec<()>| ())
        .boxed()
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand Down Expand Up @@ -130,7 +148,7 @@ impl Sink for GooglePubSubSink {
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(usize::MAX))
.into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
}
}

Expand All @@ -157,10 +175,15 @@ impl TryFrom<SinkParam> for GooglePubSubSink {
}
}

struct GooglePubSubPayloadWriter {
publisher: Publisher,
struct GooglePubSubPayloadWriter<'w> {
publisher: &'w mut Publisher,
message_vec: Vec<PubsubMessage>,
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

/// Opaque delivery-future type of the Google Pub/Sub sink.
///
/// Any value of this type is a `TryFuture` resolving to `Ok(())` or a `SinkError`, and is
/// `Unpin + 'static`. It is the return type of `may_delivery_future` and is used as
/// `AsyncTruncateSinkWriter::DeliveryFuture` for `GooglePubSubSinkWriter`.
pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down Expand Up @@ -231,35 +254,51 @@ impl GooglePubSubSinkWriter {
.await?;

let publisher = topic.new_publisher(None);
let payload_writer = GooglePubSubPayloadWriter { publisher };

Ok(Self {
payload_writer,
formatter,
publisher,
})
}
}

pub struct GooglePubSubSinkWriter {
payload_writer: GooglePubSubPayloadWriter,
formatter: SinkFormatterImpl,
publisher: Publisher,
}

impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
type DeliveryFuture = GooglePubSubSinkDeliveryFuture;

async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
dispatch_sink_formatter_str_key_impl!(
&self.formatter,
formatter,
self.payload_writer.write_chunk(chunk, formatter).await
)
let mut payload_writer = GooglePubSubPayloadWriter {
publisher: &mut self.publisher,
message_vec: Vec::with_capacity(chunk.cardinality()),
add_future,
};
dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
payload_writer.write_chunk(chunk, formatter).await
})?;
payload_writer.finish().await
}
}

impl FormattedSink for GooglePubSubPayloadWriter {
impl<'w> GooglePubSubPayloadWriter<'w> {
    /// Publishes every message buffered in `message_vec` and registers their delivery future.
    ///
    /// The buffer is taken (leaving `message_vec` empty), sent with
    /// `Publisher::publish_bulk`, and the resulting awaiters are merged by
    /// `may_delivery_future` into one future that is passed to
    /// `add_future.add_future_may_await`. An error returned by that call is propagated;
    /// otherwise `Ok(())` is returned.
    pub async fn finish(&mut self) -> Result<()> {
        // Move the batch out so the writer keeps an empty, reusable buffer.
        let batch = std::mem::take(&mut self.message_vec);
        let pending_acks = self.publisher.publish_bulk(batch).await;

        let delivery = may_delivery_future(pending_acks);
        self.add_future.add_future_may_await(delivery).await?;
        Ok(())
    }
}

impl<'w> FormattedSink for GooglePubSubPayloadWriter<'w> {
type K = String;
type V = Vec<u8>;

Expand All @@ -272,13 +311,8 @@ impl FormattedSink for GooglePubSubPayloadWriter {
ordering_key,
..Default::default()
};
let awaiter = self.publisher.publish(msg).await;
awaiter
.get()
.await
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
self.message_vec.push(msg);
Ok(())
}
None => Err(SinkError::GooglePubSub(anyhow!(
"Google Pub/Sub sink error: missing value to publish"
Expand Down
Loading

0 comments on commit 372dfc4

Please sign in to comment.