Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Connection for Kafka source & sink #19270

Merged
merged 70 commits into from
Nov 28, 2024
Merged

feat: Connection for Kafka source & sink #19270

merged 70 commits into from
Nov 28, 2024

Conversation

tabVersion
Copy link
Contributor

@tabVersion tabVersion commented Nov 5, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

following #18975

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

introducing a new catalog CONNECTION. and have integrated with SECRET

please note that the legacy create connection to AWS PrivateLink has been deprecated in #18975.

new syntax

CREATE CONNECTION [ IF NOT EXISTS ] <connection name> with (
  type = 'kafka' / 'iceberg'
  some_atter_1 = secret <secret name>,
  some_attr_2 = '...'
);

planned support for Kafka, iceberg (by @chenzl25 ) and FS (@wcy-fdu ) at the first stage.


when creating source/sink from a connection, connector must match with the connection type & the ref key must be connection

create source s ( ... ) with (
  connector = 'kafka', # <- match with connection type
  connection = <connection name> # <- the key must be `profile` and use `connection` to mark ref to connection catalog. 
) format ... encode ...;

and the attrs defined in connection and source/table/sink cannot have overlap

create connection conn with ( type = 'kafka', a = 'a', b = 'b' );

# reject as overlap key `a` and `b`
create source s with ( connector = 'kafka', a = '1', 'b' = '2' , connection = conn ) format ... encode ... ;
  • one more thing
    • we still allow creating source/table/sink without connection and the syntax remains unchanged.

to perform connection validate, we need a new kafka ACL: DESCRIBE CLUSTER (the privilege is auth via username and password, not related with consumer group. )


Connection stores KV in the catalog and validation only takes a copy.
When building source/sink/table, we first fill the KVs in connection catalog to the with options and then start the create procedure.

accepted kafka connection props

(connection related)

  • properties.bootstrap.server (only required)
  • properties.security.protocol
  • properties.ssl.endpoint.identification.algorithm
  • properties.ssl.ca.location
  • properties.ssl.ca.pem
  • properties.ssl.certificate.location
  • properties.ssl.certificate.pem
  • properties.ssl.key.location
  • properties.ssl.key.pem
  • properties.ssl.key.password
  • properties.sasl.mechanism
  • properties.sasl.username
  • properties.sasl.password
  • properties.sasl.kerberos.service.name
  • properties.sasl.kerberos.keytab
  • properties.sasl.kerberos.principal
  • properties.sasl.kerberos.kinit.cmd
  • properties.sasl.kerberos.min.time.before.relogin
  • properties.sasl.oauthbearer.config

(private link related)

  • privatelink.targets
  • privatelink.endpoint

handle_create_connection will do the private link resolve and remove both privatelink.targets and privatelink.endpoint and insert the broker.rewrite.endpoints to the props.

so if users specify privatelink.targets and privatelink.endpoint in connection, they cannot set it again when create source/table/sink.

(aws auth related: for msk)

  • aws.region
  • endpoint
  • aws.credentials.access_key_id
  • aws.credentials.secret_access_key
  • aws.credentials.session_token
  • aws.credentials.role.arn
  • aws.credentials.role.external_id

tabVersion and others added 30 commits October 22, 2024 21:37
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
fix
Signed-off-by: tabversion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
e2e_test/source_inline/connection/ddl.slt Outdated Show resolved Hide resolved
src/connector/src/connector_common/connection_util.rs Outdated Show resolved Hide resolved
config.create_with_context(client_ctx).await?;
if self.inner.is_aws_msk_iam() {
#[cfg(not(madsim))]
client.poll(Duration::from_secs(10)); // note: this is a blocking call
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the madsim needs to be updated?

src/connector/src/connector_common/connection_util.rs Outdated Show resolved Hide resolved
@@ -226,6 +236,19 @@ message Subscription {
SubscriptionState subscription_state = 19;
}

message ConnectionParams {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a weak-typed implementation, as the properties are kept as a map<string,string> here.

On the contrary, we may optionally make it strong-typed by defining connection's fields as a message (such as message KafkaConnection), and meanwhile, the corresponding struct (such as struct KafkaConnectionInner) would be substituted.

I tend to prefer the latter one, but I am not sure how to handle secrets. Comments are welcomed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this idea before.
If we are going with a concrete type here, we have to accept a more complex impl when handling create source/sink/table, ie. we have to know an attr comes from either with-clause or connection catalog and make sure it won't collide. Besides, secret ref is a problem.

Current impl overcomes the prior problem by merging everything into hashmap and keeps the original path to create source/sink/table unchanged.

@@ -246,6 +269,7 @@ message Connection {
string name = 4;
oneof info {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to keep the oneof now. May remove it to improve readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still using the proto in meta backup. Removing the oneof might break compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, then let's place connection_params outside the oneof at least

@@ -132,6 +133,13 @@ pub enum MetaErrorInner {

#[error("{0} has been deprecated, please use {1} instead.")]
Deprecated(String, String),

#[error("Secret error: {0}")]
SecretError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will validate the connection on meta, which needs to fill in the secrets. So introducing Secret Error here for handling possible failure.

@@ -156,6 +161,9 @@ message SinkFormatDesc {
optional plan_common.EncodeType key_encode = 4;
// Secret used for format encode options.
map<string, secret.SecretRef> secret_refs = 5;

// ref connection for schema registry
optional uint32 connection_id = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be added to SinkParam & SinkDesc instead, side-by-side with the Sink's properties.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me clear the logic for the related proto defs

  • Sink (proto/catalog.proto): def of sink in catalog for all sink, we will read props and resolve connection dep based on this message. And the message contains SinkFormatDesc. We already have connection.id field here.
    • We do solve connection ref when creating sink, so the props in Sink.properties contains the ones from connection catalog.
  • SinkFormatDesc (proto/catalog.proto): specifying format ... encode ... options (and contained by sink catalog). We ref a connection here for schema registry.
  • SinkDesc (proto/stream_plan.proto): It defines what field we dispatch to CNs. But we have already resolved the props from connection def in meta so there is no need to change things here.
  • SinkParam (proto/connector_service.proto): seems to define the transport protocol between kernel and connector node (most ref in Java). And ditto, the step is behind the meta node so no change is needed.

Copy link
Member

@fuyufjh fuyufjh Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do solve connection ref when creating sink, so the props in Sink.properties contains the ones from connection catalog.

I see. So here we expand all the connections into concrete parameters on creation, right? This is okay to me, but in this way, I think connection_id here in SinkFormatDesc should not appear as well, because all connection-related things are already eliminated on creation.

(It seems SinkFormatDesc::connection_id was indeed not used anywhere?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the source side, the connector part and schema registry part will have independent connection, just like individual secret_refs.

we are offering syntax like

create source s(...) with (
  connector = 'kafka',
  connection = <connector connection>, ...
) format ... encode ... (
  connection = <schema registry connection / glue connection>
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know. What I mean is: should it be resolved and eliminated before converting into a proto message?

We do solve connection ref when creating sink, so the props in Sink.properties contains the ones from connection catalog.

Copy link
Contributor Author

@tabVersion tabVersion Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know. What I mean is: should it be resolved and eliminated before converting into a proto message?

We resolve the props in the frontend and the catalog is persisted in the meta. We have to reserve a field to keep the connection_id to maintain the dependency. And to keep the design simple and align with the secret ref, I think adding connection.id for each connector and schema registry is the best practice.

& We are going to support alter connection and apply the changes to all related source/sink. Eliminating the connection.id in the catalog makes the step harder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You keep both connection_id and the resolved connection arguments in these message structures, right? It's acceptable to me but a bit counter-intuitive.

And to keep the design simple and align with the secret ref,

IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.

Oh, here is a some gap on resolve.
You can ref to resolve_connection_ref_and_secret_ref, resolve here means extracting xxx = secret <secret name> from "with props" to PbSecretRef (secret_id). We will do the same op for secrets coming from both connection catalog and with props.

@@ -124,6 +124,7 @@ pub struct SinkFormatDesc {
pub options: BTreeMap<String, String>,
pub secret_refs: BTreeMap<String, PbSecretRef>,
pub key_encode: Option<SinkEncode>,
pub connection_id: Option<u32>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

@fuyufjh

This comment was marked as resolved.

@tabVersion
Copy link
Contributor Author

Please add or update some e2e test to cover this feature.

Does e2e_test/source_inline/connection/ddl.slt lack some scenes?

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM

@@ -246,6 +269,7 @@ message Connection {
string name = 4;
oneof info {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, then let's place connection_params outside the oneof at least

@@ -156,6 +161,9 @@ message SinkFormatDesc {
optional plan_common.EncodeType key_encode = 4;
// Secret used for format encode options.
map<string, secret.SecretRef> secret_refs = 5;

// ref connection for schema registry
optional uint32 connection_id = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You keep both connection_id and the resolved connection arguments in these message structures, right? It's acceptable to me but a bit counter-intuitive.

And to keep the design simple and align with the secret ref,

IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.

e2e_test/source_inline/connection/ddl.slt Outdated Show resolved Hide resolved
@fuyufjh fuyufjh changed the title feat: Connection for connector usage feat: Connection for Kafka source & sink Nov 26, 2024
Copy link
Contributor

@wcy-fdu wcy-fdu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Common part generally LGTM.

@@ -123,6 +126,8 @@ message Source {
uint32 associated_table_id = 12;
}
string definition = 13;

// ref connection for connector
optional uint32 connection_id = 14;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we just reuse Source.connection_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and same for the schema registry part and sink.

@tabVersion tabVersion added this pull request to the merge queue Nov 28, 2024
Merged via the queue into main with commit 01b703d Nov 28, 2024
33 of 34 checks passed
@tabVersion tabVersion deleted the tab/connection branch November 28, 2024 14:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants