Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mysql-cdc): support ssl.mode to allow configure the ssl behavior #16579

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DbzConnectorConfig {

/* MySQL configs */
public static final String MYSQL_SERVER_ID = "server.id";
public static final String MYSQL_SSL_MODE = "ssl.mode";

/* Postgres configs */
public static final String PG_SLOT_NAME = "slot.name";
Expand Down Expand Up @@ -231,8 +232,8 @@ public DbzConnectorConfig(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}

var mongodbUrl = userProps.get("mongodb.url");
var collection = userProps.get("collection.name");
var mongodbUrl = userProps.get(MongoDb.MONGO_URL);
var collection = userProps.get(MongoDb.MONGO_COLLECTION_NAME);
var connectionStr = new ConnectionString(mongodbUrl);
var connectorName =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;

public class MySqlValidator extends DatabaseValidator implements AutoCloseable {
private final Map<String, String> userProps;
Expand Down Expand Up @@ -51,9 +48,14 @@ public MySqlValidator(
var dbName = userProps.get(DbzConnectorConfig.DB_NAME);
var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName);

var user = userProps.get(DbzConnectorConfig.USER);
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);
var properties = new Properties();
properties.setProperty("user", userProps.get(DbzConnectorConfig.USER));
properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD));
properties.setProperty(
"sslMode", userProps.getOrDefault(DbzConnectorConfig.MYSQL_SSL_MODE, "DISABLED"));
properties.setProperty("allowPublicKeyRetrieval", "true");

this.jdbcConnection = DriverManager.getConnection(jdbcUrl, properties);
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
include.schema.changes=${debezium.include.schema.changes:-false}
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
# default heartbeat interval 60 seconds
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
# In sharing cdc mode, we will subscribe to multiple tables in the given database,
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-sink-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>

</dependencies>
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-source-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@

<!-- database dependencies -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.zendesk</groupId>
Expand Down
11 changes: 3 additions & 8 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<flink.version>1.18.0</flink.version>
<testcontainers.version>1.17.6</testcontainers.version>
<postgresql.version>42.5.5</postgresql.version>
<mysql.connector.java.version>8.0.28</mysql.connector.java.version>
<mysql.connector.java.version>8.0.33</mysql.connector.java.version>
<mongodb.driver.sync.version>4.11.1</mongodb.driver.sync.version>
<sqlite.version>3.45.0.0</sqlite.version>
<aws.version>2.21.42</aws.version>
Expand Down Expand Up @@ -178,8 +178,8 @@
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.connector.java.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -360,11 +360,6 @@
<artifactId>apache-client</artifactId>
<version>${aws.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
Expand Down
46 changes: 27 additions & 19 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ pub enum ExternalTableReaderImpl {

#[derive(Debug)]
pub struct MySqlExternalTableReader {
config: ExternalTableConfig,
rw_schema: Schema,
field_names: String,
// use mutex to provide shared mutable access to the connection
Expand All @@ -250,7 +249,7 @@ pub struct ExternalTableConfig {
#[serde(rename = "table.name")]
pub table: String,
/// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres.
/// Choices include `disable`, `prefer`, and `require`.
/// Choices include `disabled`, `preferred`, and `required`.
Copy link
Contributor

@neverchanje neverchanje May 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an FYI: 😄

disable/prefer/require actually uses the naming convention from PG, although it may look like a grammar mistake. I guess the intention was to give shorter names to these options.

Screenshot 2024-05-05 at 19 20 29

Copy link
Contributor Author

@StrikeW StrikeW May 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know. The ssl.mode is just a interface to the end users, and mysql uses disabled, preferred, and required which have same semantic as PG. So I decide to unify the naming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change...

/// This field is optional.
#[serde(rename = "ssl.mode", default = "Default::default")]
pub sslmode: SslMode,
Expand All @@ -259,24 +258,24 @@ pub struct ExternalTableConfig {
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SslMode {
Disable,
Prefer,
Require,
Disabled,
Preferred,
Required,
}

impl Default for SslMode {
fn default() -> Self {
// default to `disable` for backward compatibility
Self::Disable
// default to `disabled` for backward compatibility
Self::Disabled
}
}

impl fmt::Display for SslMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
SslMode::Disable => "disable",
SslMode::Prefer => "prefer",
SslMode::Require => "require",
SslMode::Disabled => "disabled",
SslMode::Preferred => "preferred",
SslMode::Required => "required",
})
}
}
Expand Down Expand Up @@ -320,19 +319,29 @@ impl MySqlExternalTableReader {
with_properties: HashMap<String, String>,
rw_schema: Schema,
) -> ConnectorResult<Self> {
tracing::debug!(?rw_schema, "create mysql external table reader");

let config = serde_json::from_value::<ExternalTableConfig>(
serde_json::to_value(with_properties).unwrap(),
)
.context("failed to extract mysql connector properties")?;

let database_url = format!(
"mysql://{}:{}@{}:{}/{}",
config.username, config.password, config.host, config.port, config.database
);
let opts = mysql_async::Opts::from_url(&database_url).map_err(mysql_async::Error::Url)?;
let conn = mysql_async::Conn::new(opts).await?;
let mut opts_builder = mysql_async::OptsBuilder::default()
.user(Some(config.username))
.pass(Some(config.password))
.ip_or_hostname(config.host)
.tcp_port(config.port.parse::<u16>().unwrap())
.db_name(Some(config.database));

opts_builder = match config.sslmode {
SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
SslMode::Required => {
let ssl_without_verify = mysql_async::SslOpts::default()
.with_danger_accept_invalid_certs(true)
.with_danger_skip_domain_validation(true);
opts_builder.ssl_opts(Some(ssl_without_verify))
}
};

let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?;

let field_names = rw_schema
.fields
Expand All @@ -342,7 +351,6 @@ impl MySqlExternalTableReader {
.join(",");

Ok(Self {
config,
rw_schema,
field_names,
conn: tokio::sync::Mutex::new(conn),
Expand Down
49 changes: 27 additions & 22 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,36 @@ impl PostgresExternalTableReader {
)
.context("failed to extract postgres connector properties")?;

let database_url = format!(
"postgresql://{}:{}@{}:{}/{}?sslmode={}",
config.username,
config.password,
config.host,
config.port,
config.database,
config.sslmode
);
let mut pg_config = tokio_postgres::Config::new();
pg_config
.user(&config.username)
.password(&config.password)
.host(&config.host)
.port(config.port.parse::<u16>().unwrap())
.dbname(&config.database);

#[cfg(not(madsim))]
let connector = match config.sslmode {
SslMode::Disable => MaybeMakeTlsConnector::NoTls(NoTls),
SslMode::Prefer => match SslConnector::builder(SslMethod::tls()) {
Ok(mut builder) => {
// disable certificate verification for `prefer`
builder.set_verify(SslVerifyMode::NONE);
MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "SSL connector error");
MaybeMakeTlsConnector::NoTls(NoTls)
SslMode::Disabled => {
pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable);
MaybeMakeTlsConnector::NoTls(NoTls)
}
SslMode::Preferred => {
pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
match SslConnector::builder(SslMethod::tls()) {
Ok(mut builder) => {
// disable certificate verification for `prefer`
builder.set_verify(SslVerifyMode::NONE);
MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "SSL connector error");
MaybeMakeTlsConnector::NoTls(NoTls)
}
}
},
SslMode::Require => {
}
SslMode::Required => {
pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
let mut builder = SslConnector::builder(SslMethod::tls())?;
// disable certificate verification for `require`
builder.set_verify(SslVerifyMode::NONE);
Expand All @@ -166,7 +171,7 @@ impl PostgresExternalTableReader {
#[cfg(madsim)]
let connector = NoTls;

let (client, connection) = tokio_postgres::connect(&database_url, connector).await?;
let (client, connection) = pg_config.connect(connector).await?;

tokio::spawn(async move {
if let Err(e) = connection.await {
Expand Down
Loading