Skip to content

Commit

Permalink
Merge branch 'main' into refactor/actor-aware-only-barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Jan 16, 2024
2 parents d9bea59 + 1a70c3a commit 24669d3
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 45 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [
icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [
"prometheus",
] }
arrow-array = "49"
Expand Down
22 changes: 4 additions & 18 deletions e2e_test/source/basic/pubsub.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
statement error
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5981',
pubsub.split_count = 3
pubsub.emulator_host = 'invalid_host:5981'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
Expand All @@ -25,25 +23,14 @@ statement error
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-not-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
) FORMAT PLAIN ENCODE JSON;

# fail with invalid split count
statement error
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 0
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

# fail if both start_offset and start_snapshot are provided
Expand All @@ -52,7 +39,6 @@ CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 2,
pubsub.start_offset = "121212",
pubsub.start_snapshot = "snapshot-that-doesnt-exist"
) FORMAT PLAIN ENCODE JSON;
Expand Down
9 changes: 2 additions & 7 deletions src/connector/src/source/google_pubsub/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ impl SplitEnumerator for PubsubSplitEnumerator {
properties: Self::Properties,
_context: SourceEnumeratorContextRef,
) -> anyhow::Result<PubsubSplitEnumerator> {
let split_count = properties.split_count;
let subscription = properties.subscription.to_owned();

if split_count < 1 {
bail!("split_count must be >= 1")
}

if properties.credentials.is_none() && properties.emulator_host.is_none() {
bail!("credentials must be set if not using the pubsub emulator")
}
Expand Down Expand Up @@ -87,7 +82,7 @@ impl SplitEnumerator for PubsubSplitEnumerator {
}
(None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
(Some(_), Some(_)) => {
bail!("specify atmost one of start_offset or start_snapshot")
bail!("specify at most one of start_offset or start_snapshot")
}
};

Expand All @@ -99,7 +94,7 @@ impl SplitEnumerator for PubsubSplitEnumerator {

Ok(Self {
subscription,
split_count,
split_count: 1,
})
}

Expand Down
13 changes: 3 additions & 10 deletions src/connector/src/source/google_pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod source;
pub mod split;

pub use enumerator::*;
use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
pub use split::*;
use with_options::WithOptions;
Expand All @@ -30,13 +29,8 @@ use crate::source::SourceProperties;

pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PubsubProperties {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "pubsub.split_count")]
pub split_count: u32,

/// pubsub subscription to consume messages from.
/// The subscription should be configured with the `retain-on-ack` property to enable
/// message recovery within risingwave.
Expand All @@ -48,19 +42,19 @@ pub struct PubsubProperties {
#[serde(rename = "pubsub.emulator_host")]
pub emulator_host: Option<String>,

/// credentials JSON object encoded with base64
/// `credentials` is a JSON string containing the service account credentials.
/// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
/// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
#[serde(rename = "pubsub.credentials")]
pub credentials: Option<String>,

/// `start_offset` is a numeric timestamp, ideallly the publish timestamp of a message
/// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
/// in the subscription. If present, the connector will attempt to seek the subscription
/// to the timestamp and start consuming from there. Note that the seek operation is
/// subject to limitations around the message retention policy of the subscription. See
/// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
/// more details.
#[serde(rename = "pubsub.start_offset")]
#[serde(rename = "pubsub.start_offset.nanos")]
pub start_offset: Option<String>,

/// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
Expand Down Expand Up @@ -127,7 +121,6 @@ mod tests {
let default_properties = PubsubProperties {
credentials: None,
emulator_host: None,
split_count: 1,
start_offset: None,
start_snapshot: None,
subscription: String::from("test-subscription"),
Expand Down
9 changes: 3 additions & 6 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ PosixFsProperties:
default: Default::default
PubsubProperties:
fields:
- name: pubsub.split_count
field_type: u32
required: true
- name: pubsub.subscription
field_type: String
comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave.
Expand All @@ -484,11 +481,11 @@ PubsubProperties:
required: false
- name: pubsub.credentials
field_type: String
comments: credentials JSON object encoded with base64 See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).'
required: false
- name: pubsub.start_offset
- name: pubsub.start_offset.nanos
field_type: String
comments: '`start_offset` is a numeric timestamp, ideallly the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
required: false
- name: pubsub.start_snapshot
field_type: String
Expand Down

0 comments on commit 24669d3

Please sign in to comment.