Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add Google Pub/Sub support #16363

Merged
merged 10 commits into from
May 8, 2024
Merged

Conversation

jetjinser
Copy link
Contributor

@jetjinser jetjinser commented Apr 17, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Add new sink Goolge Pub/Sub with only support Plain => Json in append-only mode.

e.g.

CREATE TABLE IF NOT EXISTS personnel (id integer, name varchar);

CREATE SINK ggps_sink
FROM
  personnel
WITH
(
    connector = 'google_pubsub',
    endpoint = 'localhost:8900',
    emulator_host = 'localhost:8900',
    project_id = 'demo',
    topic = 'test',
    type = 'append-only',
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true',
);

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

Support new sink google_pubsub.

@jetjinser jetjinser added the user-facing-changes Contains changes that are visible to users label Apr 19, 2024
@jetjinser jetjinser marked this pull request as ready for review April 19, 2024 08:46
@jetjinser jetjinser requested a review from a team as a code owner April 19, 2024 08:46
Comment on lines 217 to 220
DateHandlingMode::FromCe,
TimestampHandlingMode::Milli,
timestamptz_mode,
TimeHandlingMode::Milli,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use their string form by default? It is quite confusing to see "date_of_birth": 730119, "daily_meeting_time": 39600000.

Or another question, why not reuse SinkFormatterImpl::new?

@jetjinser jetjinser force-pushed the jinser/google-pubsub-sink branch from a0fe7dc to bd7907f Compare April 19, 2024 09:07
@jetjinser jetjinser requested a review from xiangjinwu April 23, 2024 00:40
@neverchanje neverchanje linked an issue Apr 23, 2024 that may be closed by this pull request
@neverchanje neverchanje requested a review from tabVersion April 24, 2024 09:30
@neverchanje
Copy link
Contributor

@tabVersion Please take a look. I hope that this can be included in the 1.9 version.

@stdrc stdrc self-requested a review April 29, 2024 05:28
impl GooglePubSubConfig {
fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
serde_json::from_value::<GooglePubSubConfig>(
serde_json::to_value(values).expect("impossible"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"impossible" is not a good expect reason. May just use unwrap(), no much difference.

Comment on lines 81 to 82
pub(crate) fn initialize_env(&self) {
tracing::debug!("setting pubsub environment variables");
if let Some(emulator_host) = &self.emulator_host {
std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host);
}
if let Some(credentials) = &self.credentials {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if first create a pubsub sink with PUBSUB_EMULATOR_HOST and then create another without PUBSUB_EMULATOR_HOST?

I guess we should clean existing such env vars before setting them.

Btw, could you plz add some comment to explain why we set these env vars here? Any doc links?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that this part is copied from source/google_pubsub/mod.rs. The source and sink code can also conflict. You can fix the original code together.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can also try set the emulator host and credentials via ClientConfig instead of env var, to eliminate the possibility of env var conflict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, it is quite strange that we set env variable for a sink job. Can you share more on why we need this ENV?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to set the environment in ClientConfig, we need to introduce another direct dependency (now indirect): https://docs.rs/google-cloud-gax/0.17.0/google_cloud_gax/conn/enum.Environment.html. google-cloud-pubsub dont re-export it (still) :|
If it's ok, I'll set it up via this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem.

Comment on lines +61 to 64
google-cloud-googleapis = { version = "0.12", features = ["pubsub"] }
google-cloud-pubsub = "0.24"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any overlap between the deps? Shall we just keep one of them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, google-cloud-pubsub depends on google-cloud-googleapis but does not re-export, which is required for publishing messages.

Comment on lines 81 to 82
pub(crate) fn initialize_env(&self) {
tracing::debug!("setting pubsub environment variables");
if let Some(emulator_host) = &self.emulator_host {
std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host);
}
if let Some(credentials) = &self.credentials {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, it is quite strange that we set env variable for a sink job. Can you share more on why we need this ENV?

Comment on lines 124 to 116
if !matches!(self.format_desc.encode, SinkEncode::Json) {
return Err(SinkError::GooglePubSub(anyhow!(
"Google Pub/Sub sink only support `Json` sink encode"
)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please impl this in CONNECTORS_COMPATIBLE_FORMATS (src/frontend/src/handler/create_sink.rs)


async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::Nats(anyhow!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why NatsError?

@jetjinser jetjinser force-pushed the jinser/google-pubsub-sink branch from 14aeb03 to 94896a4 Compare April 29, 2024 13:35
Copy link
Member

@stdrc stdrc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jetjinser jetjinser added this pull request to the merge queue May 7, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks May 7, 2024
@jetjinser jetjinser added this pull request to the merge queue May 7, 2024
Merged via the queue into main with commit a4adc5e May 8, 2024
27 of 28 checks passed
@jetjinser jetjinser deleted the jinser/google-pubsub-sink branch May 8, 2024 00:30
@WanYixian
Copy link
Contributor

@tabVersion Please take a look. I hope that this can be included in the 1.9 version.

Is this included in 1.9 now? Seems not in the comparing changes.

@lmatz @neverchanje @jetjinser PTAL, thanks!

@neverchanje
Copy link
Contributor

neverchanje commented May 22, 2024

@WanYixian It's not cherrypicked in v1.9. I'll let users try out with the nightly version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sink to BigQuery with google pubsub sink
6 participants