feat(connector): support postgres_sink in rust #19328
Conversation
Force-pushed from 4b4eaf6 to 7f746f9 (compare).
src/connector/src/sink/postgres.rs (outdated)
match op {
    Op::Insert => {
        self.client
            .execute_raw(&self.insert_statement, row.iter())
It seems we insert each row of a chunk one by one; performance could be sub-optimal compared to a bulk insert, e.g. INSERT INTO t VALUES (a1, b1, c1), (a2, b2, c2), ...
Optimize it later when I'm doing benchmarking. The entire chunk may have an interleaving of insert, delete, and update; I'm not sure if there will be a clear performance benefit. Does the jdbc sink batch insert?
We have some improvements planned to support batch DML to the downstream.
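For reference, the multi-row statement suggested above could be assembled roughly like this. A minimal sketch with hypothetical names, not this PR's implementation:

fn build_bulk_insert_sql(table: &str, columns: &[&str], n_rows: usize) -> String {
    let mut placeholder = 0;
    // One parenthesized group of `$n` placeholders per row.
    let rows: Vec<String> = (0..n_rows)
        .map(|_| {
            let row: Vec<String> = (0..columns.len())
                .map(|_| {
                    placeholder += 1;
                    format!("${placeholder}")
                })
                .collect();
            format!("({})", row.join(", "))
        })
        .collect();
    format!(
        "INSERT INTO {table} ({}) VALUES {}",
        columns.join(", "),
        rows.join(", ")
    )
}

For example, build_bulk_insert_sql("t", &["a", "b", "c"], 2) produces INSERT INTO t (a, b, c) VALUES ($1, $2, $3), ($4, $5, $6).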
let mut pg_config = tokio_postgres::Config::new();
pg_config
    .user(&config.username)
    .password(&config.password)
    .host(&config.host)
    .port(config.port.parse::<u16>().unwrap())
    .dbname(&config.database);

let (_verify_ca, verify_hostname) = match config.ssl_mode {
    SslMode::VerifyCa => (true, false),
    SslMode::VerifyFull => (true, true),
    _ => (false, false),
};

#[cfg(not(madsim))]
let connector = match config.ssl_mode {
    SslMode::Disabled => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable);
        MaybeMakeTlsConnector::NoTls(NoTls)
    }
    SslMode::Preferred => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer);
        match SslConnector::builder(SslMethod::tls()) {
            Ok(mut builder) => {
                // disable certificate verification for `prefer`
                builder.set_verify(SslVerifyMode::NONE);
                MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
            }
            Err(e) => {
                tracing::warn!(error = %e.as_report(), "SSL connector error");
                MaybeMakeTlsConnector::NoTls(NoTls)
            }
        }
    }
    SslMode::Required => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
        let mut builder = SslConnector::builder(SslMethod::tls())?;
        // disable certificate verification for `require`
        builder.set_verify(SslVerifyMode::NONE);
        MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
    }
    SslMode::VerifyCa | SslMode::VerifyFull => {
        pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
        let mut builder = SslConnector::builder(SslMethod::tls())?;
        if let Some(ssl_root_cert) = config.ssl_root_cert {
            builder.set_ca_file(ssl_root_cert).map_err(|e| {
                anyhow!(format!("bad ssl root cert error: {}", e.to_report_string()))
            })?;
        }
        let mut connector = MakeTlsConnector::new(builder.build());
        if !verify_hostname {
            connector.set_callback(|config, _| {
                config.set_verify_hostname(false);
                Ok(())
            });
        }
        MaybeMakeTlsConnector::Tls(connector)
    }
};
#[cfg(madsim)]
let connector = NoTls;

let (client, connection) = pg_config.connect(connector).await?;

tokio::spawn(async move {
    if let Err(e) = connection.await {
        tracing::error!(error = %e.as_report(), "postgres connection error");
    }
});
Refactored into connector_common::create_pg_client.
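A hypothetical call site after the refactor. The helper name comes from the comment above, but its exact parameter list is an assumption based on the config fields used in the original block:

// Hypothetical usage; the signature is assumed, not confirmed by this PR.
let client = connector_common::create_pg_client(
    &config.username,
    &config.password,
    &config.host,
    &config.port,
    &config.database,
    &config.ssl_mode,
    &config.ssl_root_cert,
)
.await?;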
            pk_names,
        })
    }

    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
        &self.column_descs
    }

    pub fn pk_names(&self) -> &Vec<String> {
        &self.pk_names
    }
}

fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
    let dtype = match col_type {
        ColumnType::SmallInt | ColumnType::SmallSerial => DataType::Int16,
        ColumnType::Integer | ColumnType::Serial => DataType::Int32,
        ColumnType::BigInt | ColumnType::BigSerial => DataType::Int64,
        ColumnType::Money | ColumnType::Decimal(_) | ColumnType::Numeric(_) => DataType::Decimal,
        ColumnType::Real => DataType::Float32,
        ColumnType::DoublePrecision => DataType::Float64,
        ColumnType::Varchar(_) | ColumnType::Char(_) | ColumnType::Text => DataType::Varchar,
        ColumnType::Bytea => DataType::Bytea,
        ColumnType::Timestamp(_) => DataType::Timestamp,
        ColumnType::TimestampWithTimeZone(_) => DataType::Timestamptz,
        ColumnType::Date => DataType::Date,
        ColumnType::Time(_) | ColumnType::TimeWithTimeZone(_) => DataType::Time,
        ColumnType::Interval(_) => DataType::Interval,
        ColumnType::Boolean => DataType::Boolean,
        ColumnType::Point => DataType::Struct(StructType::new(vec![
            ("x", DataType::Float32),
            ("y", DataType::Float32),
        ])),
        ColumnType::Uuid => DataType::Varchar,
        ColumnType::Xml => DataType::Varchar,
        ColumnType::Json => DataType::Jsonb,
        ColumnType::JsonBinary => DataType::Jsonb,
        ColumnType::Array(def) => {
            let item_type = match def.col_type.as_ref() {
                Some(ty) => type_to_rw_type(ty.as_ref())?,
                None => {
                    return Err(anyhow!("ARRAY type missing element type").into());
                }
            };

            DataType::List(Box::new(item_type))
        }
        ColumnType::PgLsn => DataType::Int64,
        ColumnType::Cidr
        | ColumnType::Inet
        | ColumnType::MacAddr
        | ColumnType::MacAddr8
        | ColumnType::Int4Range
        | ColumnType::Int8Range
        | ColumnType::NumRange
        | ColumnType::TsRange
        | ColumnType::TsTzRange
        | ColumnType::DateRange
        | ColumnType::Enum(_) => DataType::Varchar,

        ColumnType::Line => {
            return Err(anyhow!("LINE type not supported").into());
        }
        ColumnType::Lseg => {
            return Err(anyhow!("LSEG type not supported").into());
        }
        ColumnType::Box => {
            return Err(anyhow!("BOX type not supported").into());
        }
        ColumnType::Path => {
            return Err(anyhow!("PATH type not supported").into());
        }
        ColumnType::Polygon => {
            return Err(anyhow!("POLYGON type not supported").into());
        }
        ColumnType::Circle => {
            return Err(anyhow!("CIRCLE type not supported").into());
        }
        ColumnType::Bit(_) => {
            return Err(anyhow!("BIT type not supported").into());
        }
        ColumnType::VarBit(_) => {
            return Err(anyhow!("VARBIT type not supported").into());
        }
        ColumnType::TsVector => {
            return Err(anyhow!("TSVECTOR type not supported").into());
        }
        ColumnType::TsQuery => {
            return Err(anyhow!("TSQUERY type not supported").into());
        }
        ColumnType::Unknown(name) => {
            // NOTE: user-defined enum types are classified as `Unknown`
            tracing::warn!("Unknown Postgres data type: {name}, map to varchar");
            DataType::Varchar
        }
    };

    Ok(dtype)
}
Refactored into connector_common::postgres.
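A quick illustration of the mapping above, written as a hypothetical unit test (assumes type_to_rw_type and the sea_schema / RisingWave types are in scope):

#[test]
fn maps_basic_pg_types() {
    // Plain scalar types map one-to-one...
    assert_eq!(type_to_rw_type(&ColumnType::Integer).unwrap(), DataType::Int32);
    assert_eq!(type_to_rw_type(&ColumnType::Json).unwrap(), DataType::Jsonb);
    // ...while unsupported geometric types surface an error.
    assert!(type_to_rw_type(&ColumnType::Line).is_err());
}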
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SslMode {
    #[serde(alias = "disable")]
    Disabled,
    #[serde(alias = "prefer")]
    Preferred,
    #[serde(alias = "require")]
    Required,
    /// Verify that the server is trustworthy by checking the certificate chain
    /// up to the root certificate stored on the client.
    #[serde(alias = "verify-ca")]
    VerifyCa,
    /// Besides verifying the certificate, also verify that the server host name
    /// matches the name stored in the server certificate.
    #[serde(alias = "verify-full")]
    VerifyFull,
}

impl Default for SslMode {
    fn default() -> Self {
        // default to `disabled` for backward compatibility
        Self::Disabled
    }
}

impl fmt::Display for SslMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            SslMode::Disabled => "disabled",
            SslMode::Preferred => "preferred",
            SslMode::Required => "required",
            SslMode::VerifyCa => "verify-ca",
            SslMode::VerifyFull => "verify-full",
        })
    }
}
Refactored into connector_common so we can reuse it for the pg sink.
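Thanks to the serde aliases, both the lowercase variant name and the postgres-style spelling deserialize to the same mode. A hypothetical test, assuming serde_json as the deserializer:

#[test]
fn ssl_mode_accepts_both_spellings() {
    let a: SslMode = serde_json::from_str("\"verifyfull\"").unwrap();
    let b: SslMode = serde_json::from_str("\"verify-full\"").unwrap();
    assert_eq!(a, SslMode::VerifyFull);
    assert_eq!(a, b);
}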
use crate::parser::scalar_adapter::{validate_pg_type_to_rw_type, ScalarAdapter};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};
Main logic for postgres sink.
macro_rules! rw_row_to_pg_values {
    ($row:expr, $statement:expr) => {
        $row.iter().enumerate().map(|(i, d)| {
            d.and_then(|d| {
                let ty = &$statement.params()[i];
                match ScalarAdapter::from_scalar(d, ty) {
                    Ok(scalar) => Some(scalar),
                    Err(e) => {
                        tracing::error!(error=%e.as_report(), scalar=?d, "Failed to convert scalar to pg value");
                        None
                    }
                }
            })
        })
    };
}
I use a macro rather than a function call here. The borrow checker does not support returning a reference to a value instantiated within the function itself, which is what happens when we call row.iter().
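A rough analogue of the borrow-checker complaint, with hypothetical names (kept as comments since it intentionally does not compile):

// Hypothetical function version of the macro:
//
// fn as_params(row: &RowData) -> impl Iterator<Item = &Param> {
//     let adapted: Vec<Param> = row.iter().map(adapt).collect();
//     adapted.iter()
//     // error[E0515]: cannot return value referencing local variable `adapted`
// }
//
// The macro sidesteps this by expanding the `.iter().map(...)` chain inline
// at the call site, so every borrow stays rooted in the caller's scope and
// the lazy iterator can be passed straight to `execute_raw`.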
// Rewrite schema types for serialization
let schema_types = {
    let pg_table = PostgresExternalTable::connect(
We called this once inside validate, but validate just does a check; we can't store the pg_table obtained there. We have to call it again so we can get the pg_table and, from it, the pg_types. This type info tells us how to handle our dynamic values. For instance, the same value can be serialized to varchar or uuid, depending on the type info.
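To illustrate why the pg-side type matters, a hedged sketch (not this PR's code) of how the same RisingWave varchar needs different handling per downstream type:

use tokio_postgres::types::Type;

// Illustrative only: decide how to bind a RW varchar based on the type
// discovered from the external table.
fn bind_hint(ty: &Type) -> &'static str {
    if *ty == Type::UUID {
        "parse the string into a uuid before binding"
    } else if *ty == Type::TEXT || *ty == Type::VARCHAR {
        "bind the string as-is"
    } else {
        "fall back to the default conversion"
    }
}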
SeaType::Array(_) => bail!("nested array type is not supported"),
SeaType::Unknown(name) => {
    // Treat as enum type
    Ok(PgType::new(
We test this path in our e2e tests.
Force-pushed from a7e6997 to e57cbd5 (compare).
LGTM, thanks!
match rw_type {
    DataType::Varchar => matches!(pg_type, DataType::Decimal | DataType::Int256),
    DataType::List(box DataType::Varchar) => {
        matches!(
            pg_type,
            DataType::List(box (DataType::Decimal | DataType::Int256))
        )
    }
Is the same check implemented in the Java version?
I don't think the Java version validates type matching, only column names. I added this extra part.
Lines 46 to 119 in 5d6aaf4
public void validate(
        TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
    JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
    String jdbcUrl = config.getJdbcUrl();
    String tableName = config.getTableName();
    String schemaName = config.getSchemaName();
    Set<String> jdbcColumns = new HashSet<>();
    Set<String> jdbcPks = new HashSet<>();
    Set<String> jdbcTableNames = new HashSet<>();
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
            ResultSet tableNamesResultSet =
                    conn.getMetaData().getTables(null, schemaName, "%", null);
            ResultSet columnResultSet =
                    conn.getMetaData().getColumns(null, schemaName, tableName, null);
            ResultSet pkResultSet =
                    conn.getMetaData().getPrimaryKeys(null, schemaName, tableName); ) {
        while (tableNamesResultSet.next()) {
            jdbcTableNames.add(tableNamesResultSet.getString("TABLE_NAME"));
        }
        while (columnResultSet.next()) {
            jdbcColumns.add(columnResultSet.getString("COLUMN_NAME"));
        }
        while (pkResultSet.next()) {
            jdbcPks.add(pkResultSet.getString("COLUMN_NAME"));
        }
    } catch (SQLException e) {
        LOG.error("failed to connect to target database. jdbcUrl: {}", jdbcUrl, e);
        throw Status.INVALID_ARGUMENT
                .withDescription(
                        "failed to connect to target database: "
                                + e.getSQLState()
                                + ": "
                                + e.getMessage())
                .asRuntimeException();
    }
    if (!jdbcTableNames.contains(tableName)) {
        throw Status.INVALID_ARGUMENT
                .withDescription("table not found: " + tableName)
                .asRuntimeException();
    }
    // Check that all columns in tableSchema exist in the JDBC table.
    for (String sinkColumn : tableSchema.getColumnNames()) {
        if (!jdbcColumns.contains(sinkColumn)) {
            LOG.error("column not found: {}", sinkColumn);
            throw Status.FAILED_PRECONDITION
                    .withDescription(
                            "table schema does not match, column not found: " + sinkColumn)
                    .asRuntimeException();
        }
    }
    if (sinkType == SinkType.SINK_TYPE_UPSERT) {
        // For upsert JDBC sink, the primary key defined on the table must match the one in
        // config and cannot be empty
        var pkInWith = new HashSet<>(tableSchema.getPrimaryKeys());
        if (jdbcPks.isEmpty() || !jdbcPks.equals(pkInWith)) {
            throw Status.INVALID_ARGUMENT
                    .withDescription(
                            "JDBC table has no primary key or the primary key doesn't match the 'primary_key' option in the WITH clause")
                    .asRuntimeException();
        }
        if (tableSchema.getPrimaryKeys().isEmpty()) {
            throw Status.INVALID_ARGUMENT
                    .withDescription("Must specify downstream primary key for upsert JDBC sink")
                    .asRuntimeException();
        }
    }
}
cc @StrikeW could you confirm if my understanding is correct?
The data type validation is indeed a missing piece of the JDBC sink.
@wenym1 mentioned using LogSinker.
LGTM. I think we can add a session variable that switches a jdbc sink prefixed with jdbc:postgres to the rust implementation once we are confident about its performance and stability.
> The data type validation is indeed a missing piece of the JDBC sink.
Don't get this, won't users just create a native postgres sink with ...
Depends on whether we plan to totally deprecate the jdbc sink in the future. For example, when we also have a native rust mysql sink, we can proxy the existing jdbc sink to the native rust implementation in a major release.
Use this instead of SinkWriter. Do it in a future PR.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In this PR we support a native postgres_sink in rust. This is expected to bring performance improvements and give us greater control over the pg connections. Will follow up with benchmarks + optimizations in a separate PR.

We support the upsert and append-only sink types in this PR.

We use insert on conflict do update to handle the update branch in upsert (see the sketch after this description).

There's some logic to highlight:
- We build a ScalarAdapter from each ScalarImpl. This is for compatibility with postgres types like Uuid and Array.
- We map column types from sea_schema (which we use for table schema discovery) to tokio_postgres::Type (which we use for query execution); this type info determines how each value is serialized via ScalarAdapter.
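For illustration, the upsert statement described above has roughly this shape. A minimal sketch with hypothetical names; the actual code derives the statement from the sink schema:

fn build_upsert_sql(table: &str, columns: &[&str], pk: &[&str]) -> String {
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();
    // On conflict, overwrite every non-pk column with the proposed row's value.
    let updates: Vec<String> = columns
        .iter()
        .copied()
        .filter(|c| !pk.contains(c))
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    format!(
        "INSERT INTO {table} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}",
        columns.join(", "),
        placeholders.join(", "),
        pk.join(", "),
        updates.join(", ")
    )
}

For example, build_upsert_sql("t", &["id", "v"], &["id"]) produces INSERT INTO t (id, v) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v.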
Checklist: ./risedev check (or alias, ./risedev c)

Documentation
Release note
TODO: Will fill this in before merging.