
feat(connector): support postgres_sink in rust #19328

Merged · 56 commits from kwannoel/postgres-sink into main · Nov 28, 2024
Conversation

@kwannoel (Contributor) commented Nov 11, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In this PR we add native postgres_sink support in rust. This is expected to improve performance and give us greater control over the pg connections. I will follow up with benchmarks and optimizations in a separate PR.

This PR supports the upsert and append-only sink types.
We use INSERT ... ON CONFLICT ... DO UPDATE to handle the update branch of upsert; a sketch of the generated statement follows below.
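
As a rough illustration, the generated statement could be built like this (a minimal sketch, not the PR's actual code; the helper name and quoting rules are hypothetical):

```rust
// Hypothetical sketch of generating the upsert statement; the real sink's
// helpers and identifier quoting may differ.
fn upsert_sql(table: &str, columns: &[&str], pk: &[&str]) -> String {
    let placeholders: Vec<String> =
        (1..=columns.len()).map(|i| format!("${i}")).collect();
    // On conflict, overwrite every non-PK column from EXCLUDED.
    let updates: Vec<String> = columns
        .iter()
        .filter(|c| !pk.contains(c))
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    format!(
        "INSERT INTO {table} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}",
        columns.join(", "),
        placeholders.join(", "),
        pk.join(", "),
        updates.join(", "),
    )
}
```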

There is some logic to highlight:

  1. We convert ScalarImpl into ScalarAdapter, for compatibility with postgres types like Uuid and Array.
  2. We map pg types from sea_schema (used for table schema discovery) to tokio_postgres::Type (used for query execution); see the sketch after this list.

Remaining work:

  • Check type mapping.
  • Sink tests.
  • Type validation tests.
  • Benchmark & optimize (future PR).
  • Support switching a jdbc sink to pg. Not sure if it's totally feasible, but it would allow a more seamless upgrade.
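
A minimal sketch of the point-2 mapping (import paths assumed; the real mapping covers many more types and reports errors for unsupported ones):

```rust
// Sketch only: map sea_schema's discovery types to tokio_postgres
// execution types. Paths and variant coverage are assumptions.
use sea_schema::postgres::def::Type as SeaType;
use tokio_postgres::types::Type as PgType;

fn sea_to_pg(ty: &SeaType) -> Option<PgType> {
    Some(match ty {
        SeaType::SmallInt => PgType::INT2,
        SeaType::Integer => PgType::INT4,
        SeaType::BigInt => PgType::INT8,
        SeaType::Real => PgType::FLOAT4,
        SeaType::DoublePrecision => PgType::FLOAT8,
        SeaType::Varchar(_) | SeaType::Text => PgType::VARCHAR,
        SeaType::Uuid => PgType::UUID,
        _ => return None, // unsupported here; real code surfaces an error
    })
}
```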

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features. See Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

TODO: Will fill this in before merging.

risedev.yml (review thread resolved)
@kwannoel kwannoel marked this pull request as ready for review November 19, 2024 15:44
@graphite-app graphite-app bot requested review from a team November 19, 2024 16:23
@kwannoel kwannoel force-pushed the kwannoel/postgres-sink branch from 4b4eaf6 to 7f746f9 on November 20, 2024 01:11
@hzxa21 hzxa21 self-requested a review November 20, 2024 03:31
@graphite-app graphite-app bot requested a review from a team November 20, 2024 04:36
src/connector/src/sink/postgres.rs (review threads resolved)
match op {
    Op::Insert => {
        self.client
            .execute_raw(&self.insert_statement, row.iter())
@StrikeW (Contributor) commented Nov 20, 2024:
It seems we insert each row of a chunk one by one; performance could be sub-optimal compared to a bulk insert, e.g. INSERT INTO t VALUES (a1,b1,c1),(a2,b2,c2),...
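
(Editorial illustration of the suggestion: a bulk statement with numbered placeholders could be built like this; `bulk_insert_sql` is hypothetical and not part of this PR.)

```rust
// Hypothetical sketch: build a multi-row VALUES clause with numbered
// placeholders ($1..$n), one group per row.
fn bulk_insert_sql(table: &str, n_cols: usize, n_rows: usize) -> String {
    let rows: Vec<String> = (0..n_rows)
        .map(|r| {
            let ps: Vec<String> = (1..=n_cols)
                .map(|c| format!("${}", r * n_cols + c))
                .collect();
            format!("({})", ps.join(", "))
        })
        .collect();
    format!("INSERT INTO {table} VALUES {}", rows.join(", "))
}
```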

@kwannoel (Contributor, Author) replied:

I'll optimize it later when I'm benchmarking. The entire chunk may have an interleaving of inserts, deletes, and updates, so I'm not sure there will be a clear performance benefit.

Does the jdbc sink batch inserts?

@StrikeW (Contributor) replied Nov 27, 2024, quoting the above:

We have some improvements to support batch DML to the downstream.


src/connector/src/sink/postgres.rs (review threads resolved)
Comment on lines -315 to -384
let mut pg_config = tokio_postgres::Config::new();
pg_config
    .user(&config.username)
    .password(&config.password)
    .host(&config.host)
    .port(config.port.parse::<u16>().unwrap())
    .dbname(&config.database);

let (_verify_ca, verify_hostname) = match config.ssl_mode {
    SslMode::VerifyCa => (true, false),
    SslMode::VerifyFull => (true, true),
    _ => (false, false),
};

#[cfg(not(madsim))]
let connector = match config.ssl_mode {
    SslMode::Disabled => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable);
        MaybeMakeTlsConnector::NoTls(NoTls)
    }
    SslMode::Preferred => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
        match SslConnector::builder(SslMethod::tls()) {
            Ok(mut builder) => {
                // disable certificate verification for `prefer`
                builder.set_verify(SslVerifyMode::NONE);
                MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
            }
            Err(e) => {
                tracing::warn!(error = %e.as_report(), "SSL connector error");
                MaybeMakeTlsConnector::NoTls(NoTls)
            }
        }
    }
    SslMode::Required => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
        let mut builder = SslConnector::builder(SslMethod::tls())?;
        // disable certificate verification for `require`
        builder.set_verify(SslVerifyMode::NONE);
        MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
    }

    SslMode::VerifyCa | SslMode::VerifyFull => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
        let mut builder = SslConnector::builder(SslMethod::tls())?;
        if let Some(ssl_root_cert) = config.ssl_root_cert {
            builder.set_ca_file(ssl_root_cert).map_err(|e| {
                anyhow!(format!("bad ssl root cert error: {}", e.to_report_string()))
            })?;
        }
        let mut connector = MakeTlsConnector::new(builder.build());
        if !verify_hostname {
            connector.set_callback(|config, _| {
                config.set_verify_hostname(false);
                Ok(())
            });
        }
        MaybeMakeTlsConnector::Tls(connector)
    }
};
#[cfg(madsim)]
let connector = NoTls;

let (client, connection) = pg_config.connect(connector).await?;

tokio::spawn(async move {
    if let Err(e) = connection.await {
        tracing::error!(error = %e.as_report(), "postgres connection error");
    }
});
@kwannoel (Contributor, Author): Refactored into connector_common::create_pg_client.
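
(The call shape after the refactor might look roughly like this; the argument list is an assumption inferred from the config fields the deleted block read, not the actual signature:)

```rust
// Hypothetical sketch; the real connector_common::create_pg_client
// signature may differ.
let client = connector_common::create_pg_client(
    &config.username,
    &config.password,
    &config.host,
    &config.port,
    &config.database,
    &config.ssl_mode,
    &config.ssl_root_cert,
)
.await?;
```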

Comment on lines -81 to -263
            pk_names,
        })
    }

    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
        &self.column_descs
    }

    pub fn pk_names(&self) -> &Vec<String> {
        &self.pk_names
    }
}

fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
    let dtype = match col_type {
        ColumnType::SmallInt | ColumnType::SmallSerial => DataType::Int16,
        ColumnType::Integer | ColumnType::Serial => DataType::Int32,
        ColumnType::BigInt | ColumnType::BigSerial => DataType::Int64,
        ColumnType::Money | ColumnType::Decimal(_) | ColumnType::Numeric(_) => DataType::Decimal,
        ColumnType::Real => DataType::Float32,
        ColumnType::DoublePrecision => DataType::Float64,
        ColumnType::Varchar(_) | ColumnType::Char(_) | ColumnType::Text => DataType::Varchar,
        ColumnType::Bytea => DataType::Bytea,
        ColumnType::Timestamp(_) => DataType::Timestamp,
        ColumnType::TimestampWithTimeZone(_) => DataType::Timestamptz,
        ColumnType::Date => DataType::Date,
        ColumnType::Time(_) | ColumnType::TimeWithTimeZone(_) => DataType::Time,
        ColumnType::Interval(_) => DataType::Interval,
        ColumnType::Boolean => DataType::Boolean,
        ColumnType::Point => DataType::Struct(StructType::new(vec![
            ("x", DataType::Float32),
            ("y", DataType::Float32),
        ])),
        ColumnType::Uuid => DataType::Varchar,
        ColumnType::Xml => DataType::Varchar,
        ColumnType::Json => DataType::Jsonb,
        ColumnType::JsonBinary => DataType::Jsonb,
        ColumnType::Array(def) => {
            let item_type = match def.col_type.as_ref() {
                Some(ty) => type_to_rw_type(ty.as_ref())?,
                None => {
                    return Err(anyhow!("ARRAY type missing element type").into());
                }
            };

            DataType::List(Box::new(item_type))
        }
        ColumnType::PgLsn => DataType::Int64,
        ColumnType::Cidr
        | ColumnType::Inet
        | ColumnType::MacAddr
        | ColumnType::MacAddr8
        | ColumnType::Int4Range
        | ColumnType::Int8Range
        | ColumnType::NumRange
        | ColumnType::TsRange
        | ColumnType::TsTzRange
        | ColumnType::DateRange
        | ColumnType::Enum(_) => DataType::Varchar,

        ColumnType::Line => {
            return Err(anyhow!("LINE type not supported").into());
        }
        ColumnType::Lseg => {
            return Err(anyhow!("LSEG type not supported").into());
        }
        ColumnType::Box => {
            return Err(anyhow!("BOX type not supported").into());
        }
        ColumnType::Path => {
            return Err(anyhow!("PATH type not supported").into());
        }
        ColumnType::Polygon => {
            return Err(anyhow!("POLYGON type not supported").into());
        }
        ColumnType::Circle => {
            return Err(anyhow!("CIRCLE type not supported").into());
        }
        ColumnType::Bit(_) => {
            return Err(anyhow!("BIT type not supported").into());
        }
        ColumnType::VarBit(_) => {
            return Err(anyhow!("VARBIT type not supported").into());
        }
        ColumnType::TsVector => {
            return Err(anyhow!("TSVECTOR type not supported").into());
        }
        ColumnType::TsQuery => {
            return Err(anyhow!("TSQUERY type not supported").into());
        }
        ColumnType::Unknown(name) => {
            // NOTES: user-defined enum type is classified as `Unknown`
            tracing::warn!("Unknown Postgres data type: {name}, map to varchar");
            DataType::Varchar
        }
    };

    Ok(dtype)
}

@kwannoel (Contributor, Author): Refactored into connector_common::postgres.

Comment on lines -266 to -303
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SslMode {
    #[serde(alias = "disable")]
    Disabled,
    #[serde(alias = "prefer")]
    Preferred,
    #[serde(alias = "require")]
    Required,
    /// Verify that the server is trustworthy by checking the certificate chain
    /// up to the root certificate stored on the client.
    #[serde(alias = "verify-ca")]
    VerifyCa,
    /// Besides verifying the certificate, also verify that the server host name
    /// matches the name stored in the server certificate.
    #[serde(alias = "verify-full")]
    VerifyFull,
}

impl Default for SslMode {
    fn default() -> Self {
        // default to `disabled` for backward compatibility
        Self::Disabled
    }
}

impl fmt::Display for SslMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            SslMode::Disabled => "disabled",
            SslMode::Preferred => "preferred",
            SslMode::Required => "required",
            SslMode::VerifyCa => "verify-ca",
            SslMode::VerifyFull => "verify-full",
        })
    }
}

@kwannoel (Contributor, Author): Refactored into connector_common so we can reuse it for the pg sink.

use crate::parser::scalar_adapter::{validate_pg_type_to_rw_type, ScalarAdapter};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

@kwannoel (Contributor, Author): Main logic for the postgres sink.

Comment on lines +41 to +56
macro_rules! rw_row_to_pg_values {
    ($row:expr, $statement:expr) => {
        $row.iter().enumerate().map(|(i, d)| {
            d.and_then(|d| {
                let ty = &$statement.params()[i];
                match ScalarAdapter::from_scalar(d, ty) {
                    Ok(scalar) => Some(scalar),
                    Err(e) => {
                        tracing::error!(error=%e.as_report(), scalar=?d, "Failed to convert scalar to pg value");
                        None
                    }
                }
            })
        })
    };
}
@kwannoel (Contributor, Author): I use a macro rather than a function here. The borrow checker does not allow returning a reference to a value instantiated within the function itself, which is what happens when we call row.iter().
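
(A simplified, deliberately non-compiling sketch of the issue, with stand-in types:)

```rust
// The iterator returned by `row.iter()` borrows `row`, which is owned by
// the function frame and dropped on return, so this cannot compile:
fn to_pg_values(row: Vec<Option<String>>) -> impl Iterator<Item = Option<usize>> {
    row.iter().map(|d| d.as_ref().map(|s| s.len()))
    // error[E0515]: cannot return value referencing function parameter `row`
}
// A macro expands at the call site instead, so the borrow of `row` lives
// in the caller's scope for as long as the iterator is used.
```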


// Rewrite schema types for serialization
let schema_types = {
    let pg_table = PostgresExternalTable::connect(
@kwannoel (Contributor, Author): We called this once inside validate, but validate only does a check; we can't store the pg_table obtained there. So we call it again to get the pg_table and, from it, the pg types. This type info tells us how to handle our dynamic values: for instance, the same value can be serialized as varchar or uuid depending on the type info.
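
(A hedged sketch of the idea using tokio_postgres types; the helper and its return strings are illustrative:)

```rust
use tokio_postgres::types::Type;

// Hypothetical helper: the discovered pg type steers how the same
// dynamic value gets serialized before binding.
fn bind_strategy(ty: &Type) -> &'static str {
    if *ty == Type::UUID {
        "parse the string into a Uuid before binding"
    } else if *ty == Type::VARCHAR || *ty == Type::TEXT {
        "bind the string as-is"
    } else {
        "convert according to the discovered column type"
    }
}
```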

SeaType::Array(_) => bail!("nested array type is not supported"),
SeaType::Unknown(name) => {
    // Treat as enum type
    Ok(PgType::new(
@kwannoel (Contributor, Author): We test this path in our e2e tests.
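
(For context, a hedged sketch of constructing such an enum type via the public PgType::new and Kind::Enum; the placeholder OID and schema are assumptions:)

```rust
use tokio_postgres::types::{Kind, Type};

// Sketch: build a custom enum Type so text values can bind to a
// user-defined enum column. OID 0 and schema "public" are placeholders.
fn enum_pg_type(name: &str, variants: Vec<String>) -> Type {
    Type::new(name.to_owned(), 0, Kind::Enum(variants), "public".to_owned())
}
```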


@kwannoel kwannoel requested a review from BugenZhao November 25, 2024 15:03
@kwannoel kwannoel force-pushed the kwannoel/postgres-sink branch from a7e6997 to e57cbd5 on November 26, 2024 05:06
@kwannoel kwannoel changed the base branch from main to kwannoel/sqlsmith-ignore-sqrt-err November 26, 2024 05:06
Base automatically changed from kwannoel/sqlsmith-ignore-sqrt-err to main November 26, 2024 07:45
@chenzl25 (Contributor) left a comment:

LGTM, thanks!

Comment on lines +322 to +329
match rw_type {
    DataType::Varchar => matches!(pg_type, DataType::Decimal | DataType::Int256),
    DataType::List(box DataType::Varchar) => {
        matches!(
            pg_type,
            DataType::List(box (DataType::Decimal | DataType::Int256))
        )
    }
@chenzl25 (Contributor):
Is the same check implemented in the Java version?

@kwannoel (Contributor, Author) replied Nov 26, 2024:
I don't think the Java version validates type matching, only column names. I added this extra part:

public void validate(
        TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
    JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
    String jdbcUrl = config.getJdbcUrl();
    String tableName = config.getTableName();
    String schemaName = config.getSchemaName();
    Set<String> jdbcColumns = new HashSet<>();
    Set<String> jdbcPks = new HashSet<>();
    Set<String> jdbcTableNames = new HashSet<>();
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
            ResultSet tableNamesResultSet =
                    conn.getMetaData().getTables(null, schemaName, "%", null);
            ResultSet columnResultSet =
                    conn.getMetaData().getColumns(null, schemaName, tableName, null);
            ResultSet pkResultSet =
                    conn.getMetaData().getPrimaryKeys(null, schemaName, tableName); ) {
        while (tableNamesResultSet.next()) {
            jdbcTableNames.add(tableNamesResultSet.getString("TABLE_NAME"));
        }
        while (columnResultSet.next()) {
            jdbcColumns.add(columnResultSet.getString("COLUMN_NAME"));
        }
        while (pkResultSet.next()) {
            jdbcPks.add(pkResultSet.getString("COLUMN_NAME"));
        }
    } catch (SQLException e) {
        LOG.error("failed to connect to target database. jdbcUrl: {}", jdbcUrl, e);
        throw Status.INVALID_ARGUMENT
                .withDescription(
                        "failed to connect to target database: "
                                + e.getSQLState()
                                + ": "
                                + e.getMessage())
                .asRuntimeException();
    }
    if (!jdbcTableNames.contains(tableName)) {
        throw Status.INVALID_ARGUMENT
                .withDescription("table not found: " + tableName)
                .asRuntimeException();
    }
    // Check that all columns in tableSchema exist in the JDBC table.
    for (String sinkColumn : tableSchema.getColumnNames()) {
        if (!jdbcColumns.contains(sinkColumn)) {
            LOG.error("column not found: {}", sinkColumn);
            throw Status.FAILED_PRECONDITION
                    .withDescription(
                            "table schema does not match, column not found: " + sinkColumn)
                    .asRuntimeException();
        }
    }
    if (sinkType == SinkType.SINK_TYPE_UPSERT) {
        // For upsert JDBC sink, the primary key defined on the table must match the one in
        // config and cannot be empty
        var pkInWith = new HashSet<>(tableSchema.getPrimaryKeys());
        if (jdbcPks.isEmpty() || !jdbcPks.equals(pkInWith)) {
            throw Status.INVALID_ARGUMENT
                    .withDescription(
                            "JDBC table has no primary key or the primary key doesn't match the 'primary_key' option in the WITH clause")
                    .asRuntimeException();
        }
        if (tableSchema.getPrimaryKeys().isEmpty()) {
            throw Status.INVALID_ARGUMENT
                    .withDescription("Must specify downstream primary key for upsert JDBC sink")
                    .asRuntimeException();
        }
    }
}

cc @StrikeW could you confirm if my understanding is correct?

@StrikeW (Contributor) replied:
The data type validation is indeed a missing piece of the JDBC sink.

@kwannoel (Contributor, Author) commented:

@wenym1 mentioned to use LogSinker

@StrikeW (Contributor) left a comment:
LGTM. I think we can add a session variable that switches a jdbc sink prefixed with jdbc:postgres to the rust implementation, once we are confident about its performance and stability.


@kwannoel (Contributor, Author) replied, quoting the above:

I don't get this; won't users just create a native postgres sink with connector='postgres'? Do you mean you want to support changing existing sinks from jdbc to postgres? I don't think we should support that.

@StrikeW (Contributor) commented Nov 27, 2024, replying to the above:

It depends on whether we plan to totally deprecate the jdbc sink in the future. For example, once we also have a native rust mysql sink, we could proxy existing jdbc sinks to the native rust implementations in a future major release.

@kwannoel (Contributor, Author), following up on "@wenym1 mentioned to use LogSinker":

Use this instead of SinkWriter. Do it in a future PR.

@kwannoel kwannoel added this pull request to the merge queue Nov 28, 2024
Merged via the queue into main with commit 15ba09d Nov 28, 2024
38 of 39 checks passed
@kwannoel kwannoel deleted the kwannoel/postgres-sink branch November 28, 2024 01:59