Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): support postgres_sink in rust #19328

Merged
merged 56 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
0cb68c0
ignore sqrt error
kwannoel Nov 26, 2024
9b2383c
add postgres sink
kwannoel Nov 8, 2024
4cbbff5
skeleton
kwannoel Nov 11, 2024
6e0a00b
stub validation logic
kwannoel Nov 14, 2024
816082d
remove more unnecessary stuff + make compile
kwannoel Nov 14, 2024
e0d174d
init postgres client
kwannoel Nov 14, 2024
6a398b4
handle flush
kwannoel Nov 15, 2024
890dcb1
remove unnecessary serde
kwannoel Nov 15, 2024
5746e81
add password for risedev.yml:full profile
kwannoel Nov 15, 2024
5953687
convert numeric with_options for connector
kwannoel Nov 15, 2024
0ebd17b
add test
kwannoel Nov 15, 2024
582a876
handle deletes
kwannoel Nov 18, 2024
5c6c6fd
handle update
kwannoel Nov 19, 2024
e2c6e77
prepared typed
kwannoel Nov 19, 2024
bcd2b70
add test
kwannoel Nov 19, 2024
0f8bbb0
add to_sql for ScalarRefImpl
kwannoel Nov 19, 2024
81fdcca
separate sql string formatters into separate functions
kwannoel Nov 19, 2024
e679b06
add tests
kwannoel Nov 19, 2024
e4cfab7
use insert, delete, merge formatters
kwannoel Nov 19, 2024
e8e7bb5
directly iterate on row
kwannoel Nov 19, 2024
0e33eb8
cf
kwannoel Nov 19, 2024
aa94edd
handle max_batch_rows
kwannoel Nov 19, 2024
5e08577
remove unused schema field
kwannoel Nov 19, 2024
5ce0024
handle append-only
kwannoel Nov 19, 2024
0e4b222
test append-only + fix bug
kwannoel Nov 19, 2024
be52c6b
handle datatype validation
kwannoel Nov 19, 2024
704a09a
fix bug in validate + increase timeout for risedev:postgres service
kwannoel Nov 19, 2024
c4e08e1
test out-of-order pg and rw table
kwannoel Nov 19, 2024
eebf885
test more error states
kwannoel Nov 19, 2024
62dcf78
test
kwannoel Nov 20, 2024
670e740
handle connection error
kwannoel Nov 20, 2024
3fd129d
fix formatting
kwannoel Nov 20, 2024
b6b0f1e
fix
kwannoel Nov 20, 2024
8e7da05
fmt
kwannoel Nov 20, 2024
d84c4c4
fix
kwannoel Nov 20, 2024
a892bd8
use on conflict do update
kwannoel Nov 21, 2024
640d107
add schema fields
kwannoel Nov 21, 2024
fbd4f08
fmt
kwannoel Nov 21, 2024
23cb74f
refactor connector create_pg_client logic
kwannoel Nov 24, 2024
5c842b0
handle ssl options
kwannoel Nov 24, 2024
b5397db
add sanity check
kwannoel Nov 24, 2024
d123d6f
use pg_client method to construct
kwannoel Nov 24, 2024
b04feb9
move table reader and functions to common
kwannoel Nov 24, 2024
7f979da
decouple from pg config
kwannoel Nov 24, 2024
70065f2
use PostgresExternalTable to validate
kwannoel Nov 24, 2024
c87cd7c
reuse scalar adapter
kwannoel Nov 24, 2024
3829119
fix update sanity check + fix non-append-only pk check
kwannoel Nov 24, 2024
8caa1f0
loosen validation
kwannoel Nov 25, 2024
e5b47f6
fix
kwannoel Nov 25, 2024
46fd3db
handle edge cases
kwannoel Nov 25, 2024
364722a
fmt + fix warn
kwannoel Nov 25, 2024
fac753a
fix unit test
kwannoel Nov 25, 2024
7e75e4c
fix test + check
kwannoel Nov 25, 2024
6f93bb0
fix compile errors for madsim + fix lint
kwannoel Nov 25, 2024
dc7f788
more madsim fix
kwannoel Nov 25, 2024
e57cbd5
fix docs
kwannoel Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
461 changes: 461 additions & 0 deletions e2e_test/sink/postgres_sink.slt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ profile:
- use: postgres
port: 8432
user: postgres
password: postgres
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
database: metadata
- use: meta-node
meta-backend: postgres
Expand Down
19 changes: 19 additions & 0 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,3 +619,22 @@ impl<'a> FromSql<'a> for JsonbVal {
matches!(*ty, Type::JSONB)
}
}

impl ToSql for JsonbRef<'_> {
accepts!(JSONB);

to_sql_checked!();

fn to_sql(
&self,
_ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
out.put_u8(1);
write!(out, "{}", self.0).unwrap();
Ok(IsNull::No)
}
}
7 changes: 7 additions & 0 deletions src/common/src/types/postgres_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use postgres_types::Type as PgType;

use super::DataType;

/// `DataType` information extracted from PostgreSQL `pg_type`
Expand Down Expand Up @@ -149,4 +151,9 @@ impl DataType {
}
for_all_base_types! { impl_pg_name }
}

pub fn to_pg_type(&self) -> PgType {
let oid = self.to_oid();
PgType::from_oid(oid as u32).unwrap()
}
}
41 changes: 41 additions & 0 deletions src/common/src/types/to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::error::Error;

use bytes::BytesMut;
use postgres_types::{to_sql_checked, IsNull, ToSql, Type};
use risingwave_common::types::ScalarRefImpl;

use crate::types::ScalarImpl;

Expand Down Expand Up @@ -58,3 +59,43 @@ impl ToSql for ScalarImpl {
true
}
}

impl ToSql for ScalarRefImpl<'_> {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
to_sql_checked!();

fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized,
{
match self {
ScalarRefImpl::Int16(v) => v.to_sql(ty, out),
ScalarRefImpl::Int32(v) => v.to_sql(ty, out),
ScalarRefImpl::Int64(v) => v.to_sql(ty, out),
ScalarRefImpl::Serial(v) => v.to_sql(ty, out),
ScalarRefImpl::Float32(v) => v.to_sql(ty, out),
ScalarRefImpl::Float64(v) => v.to_sql(ty, out),
ScalarRefImpl::Utf8(v) => v.to_sql(ty, out),
ScalarRefImpl::Bool(v) => v.to_sql(ty, out),
ScalarRefImpl::Decimal(v) => v.to_sql(ty, out),
ScalarRefImpl::Interval(v) => v.to_sql(ty, out),
ScalarRefImpl::Date(v) => v.to_sql(ty, out),
ScalarRefImpl::Timestamp(v) => v.to_sql(ty, out),
ScalarRefImpl::Timestamptz(v) => v.to_sql(ty, out),
ScalarRefImpl::Time(v) => v.to_sql(ty, out),
ScalarRefImpl::Bytea(v) => (&**v).to_sql(ty, out),
ScalarRefImpl::Jsonb(v) => v.to_sql(ty, out),
ScalarRefImpl::Int256(_) | ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => {
bail_not_implemented!("the postgres encoding for {ty} is unsupported")
}
ScalarRefImpl::Map(_) => todo!(),
}
}

// return true to accept all types
fn accepts(_ty: &Type) -> bool
where
Self: Sized,
{
true
}
}
5 changes: 5 additions & 0 deletions src/connector/src/connector_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ pub use common::{
};

mod iceberg;
#[cfg(not(madsim))]
mod maybe_tls_connector;
pub mod postgres;

pub use iceberg::IcebergCommon;
pub use postgres::{create_pg_client, PostgresExternalTable, SslMode};
Loading
Loading