Skip to content

Commit

Permalink
feat(connector): support postgres_sink in rust (#19328)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Nov 28, 2024
1 parent 42f1e1b commit 15ba09d
Show file tree
Hide file tree
Showing 14 changed files with 1,691 additions and 321 deletions.
461 changes: 461 additions & 0 deletions e2e_test/sink/postgres_sink.slt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ profile:
- use: postgres
port: 8432
user: postgres
password: postgres
database: metadata
- use: meta-node
meta-backend: postgres
Expand Down
19 changes: 19 additions & 0 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,3 +619,22 @@ impl<'a> FromSql<'a> for JsonbVal {
matches!(*ty, Type::JSONB)
}
}

impl ToSql for JsonbRef<'_> {
accepts!(JSONB);

to_sql_checked!();

fn to_sql(
&self,
_ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
out.put_u8(1);
write!(out, "{}", self.0).unwrap();
Ok(IsNull::No)
}
}
7 changes: 7 additions & 0 deletions src/common/src/types/postgres_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use postgres_types::Type as PgType;

use super::DataType;

/// `DataType` information extracted from PostgreSQL `pg_type`
Expand Down Expand Up @@ -149,4 +151,9 @@ impl DataType {
}
for_all_base_types! { impl_pg_name }
}

pub fn to_pg_type(&self) -> PgType {
let oid = self.to_oid();
PgType::from_oid(oid as u32).unwrap()
}
}
41 changes: 41 additions & 0 deletions src/common/src/types/to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::error::Error;

use bytes::BytesMut;
use postgres_types::{to_sql_checked, IsNull, ToSql, Type};
use risingwave_common::types::ScalarRefImpl;

use crate::types::ScalarImpl;

Expand Down Expand Up @@ -58,3 +59,43 @@ impl ToSql for ScalarImpl {
true
}
}

impl ToSql for ScalarRefImpl<'_> {
to_sql_checked!();

fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized,
{
match self {
ScalarRefImpl::Int16(v) => v.to_sql(ty, out),
ScalarRefImpl::Int32(v) => v.to_sql(ty, out),
ScalarRefImpl::Int64(v) => v.to_sql(ty, out),
ScalarRefImpl::Serial(v) => v.to_sql(ty, out),
ScalarRefImpl::Float32(v) => v.to_sql(ty, out),
ScalarRefImpl::Float64(v) => v.to_sql(ty, out),
ScalarRefImpl::Utf8(v) => v.to_sql(ty, out),
ScalarRefImpl::Bool(v) => v.to_sql(ty, out),
ScalarRefImpl::Decimal(v) => v.to_sql(ty, out),
ScalarRefImpl::Interval(v) => v.to_sql(ty, out),
ScalarRefImpl::Date(v) => v.to_sql(ty, out),
ScalarRefImpl::Timestamp(v) => v.to_sql(ty, out),
ScalarRefImpl::Timestamptz(v) => v.to_sql(ty, out),
ScalarRefImpl::Time(v) => v.to_sql(ty, out),
ScalarRefImpl::Bytea(v) => (&**v).to_sql(ty, out),
ScalarRefImpl::Jsonb(v) => v.to_sql(ty, out),
ScalarRefImpl::Int256(_) | ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => {
bail_not_implemented!("the postgres encoding for {ty} is unsupported")
}
ScalarRefImpl::Map(_) => todo!(),
}
}

// return true to accept all types
fn accepts(_ty: &Type) -> bool
where
Self: Sized,
{
true
}
}
5 changes: 5 additions & 0 deletions src/connector/src/connector_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ pub use common::{
};

mod iceberg;
#[cfg(not(madsim))]
mod maybe_tls_connector;
pub mod postgres;

pub use iceberg::IcebergCommon;
pub use postgres::{create_pg_client, PostgresExternalTable, SslMode};
Loading

0 comments on commit 15ba09d

Please sign in to comment.