-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): add Google Pub/Sub support #16363
Conversation
DateHandlingMode::FromCe, | ||
TimestampHandlingMode::Milli, | ||
timestamptz_mode, | ||
TimeHandlingMode::Milli, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use their string form by default? It is quite confusing to see "date_of_birth": 730119, "daily_meeting_time": 39600000
.
Or another question, why not reuse SinkFormatterImpl::new
?
a0fe7dc
to
bd7907f
Compare
@tabVersion Please take a look. I hope that this can be included in the 1.9 version. |
impl GooglePubSubConfig { | ||
fn from_hashmap(values: HashMap<String, String>) -> Result<Self> { | ||
serde_json::from_value::<GooglePubSubConfig>( | ||
serde_json::to_value(values).expect("impossible"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"impossible"
is not a good expect reason. May just use unwrap()
, no much difference.
pub(crate) fn initialize_env(&self) { | ||
tracing::debug!("setting pubsub environment variables"); | ||
if let Some(emulator_host) = &self.emulator_host { | ||
std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host); | ||
} | ||
if let Some(credentials) = &self.credentials { | ||
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if first create a pubsub sink with PUBSUB_EMULATOR_HOST
and then create another without PUBSUB_EMULATOR_HOST
?
I guess we should clean existing such env vars before setting them.
Btw, could you plz add some comment to explain why we set these env vars here? Any doc links?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that this part is copied from source/google_pubsub/mod.rs
. The source and sink code can also conflict. You can fix the original code together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can also try set the emulator host and credentials via ClientConfig
instead of env var, to eliminate the possibility of env var conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, it is quite strange that we set env variable for a sink job. Can you share more on why we need this ENV?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to set the environment in ClientConfig
, we need to introduce another direct dependency (now indirect): https://docs.rs/google-cloud-gax/0.17.0/google_cloud_gax/conn/enum.Environment.html. google-cloud-pubsub
dont re-export it (still) :|
If it's ok, I'll set it up via this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem.
google-cloud-googleapis = { version = "0.12", features = ["pubsub"] } | ||
google-cloud-pubsub = "0.24" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any overlap between the deps? Shall we just keep one of them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, google-cloud-pubsub
depends on google-cloud-googleapis
but does not re-export, which is required for publishing messages.
pub(crate) fn initialize_env(&self) { | ||
tracing::debug!("setting pubsub environment variables"); | ||
if let Some(emulator_host) = &self.emulator_host { | ||
std::env::set_var("PUBSUB_EMULATOR_HOST", emulator_host); | ||
} | ||
if let Some(credentials) = &self.credentials { | ||
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS_JSON", credentials); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, it is quite strange that we set env variable for a sink job. Can you share more on why we need this ENV?
if !matches!(self.format_desc.encode, SinkEncode::Json) { | ||
return Err(SinkError::GooglePubSub(anyhow!( | ||
"Google Pub/Sub sink only support `Json` sink encode" | ||
))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please impl this in CONNECTORS_COMPATIBLE_FORMATS
(src/frontend/src/handler/create_sink.rs
)
|
||
async fn validate(&self) -> Result<()> { | ||
if !self.is_append_only { | ||
return Err(SinkError::Nats(anyhow!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why NatsError
?
support append-only mode and JSON encoding
because we have already checked the encode in `CONNECTORS_COMPATIBLE_FORMATS` (src/frontend/src/handler/create_sink.rs)
to avoid the need to set environment variables for authentication
14aeb03
to
94896a4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Is this included in 1.9 now? Seems not in the comparing changes. @lmatz @neverchanje @jetjinser PTAL, thanks! |
@WanYixian It's not cherrypicked in v1.9. I'll let users try out with the nightly version. |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add new sink Goolge Pub/Sub with only support
Plain => Json
in append-only mode.e.g.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Support new sink
google_pubsub
.