Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Apr 23, 2024
1 parent 4023723 commit 9c8fb2d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 43 deletions.
15 changes: 0 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ criterion = { workspace = true, features = ["async_tokio", "async"] }
deltalake = { workspace = true, features = ["datafusion"] }
expect-test = "1"
paste = "1"
postgres = "0.19"
pretty_assertions = "1"
quote = "1"
rand = { workspace = true }
Expand Down
71 changes: 44 additions & 27 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
DataType::Varchar => {
if let Kind::Enum(_) = row.columns()[i].type_().kind() {
// enum type needs to be handled separately
let res = row.try_get::<_, Option<EnumParser>>(i);
let res = row.try_get::<_, Option<EnumString>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.value)),
Err(err) => {
Expand Down Expand Up @@ -269,7 +269,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
if let Kind::Array(item_type) = row.columns()[i].type_().kind()
&& let Kind::Enum(_) = item_type.kind()
{
let res = row.try_get::<_, Option<Vec<EnumParser>>>(i);
let res = row.try_get::<_, Option<Vec<EnumString>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
Expand Down Expand Up @@ -522,16 +522,16 @@ fn pg_numeric_to_string(val: Option<PgNumeric>) -> Option<String> {
}

#[derive(Clone, Debug)]
struct EnumParser {
struct EnumString {
value: String,
}

impl<'a> FromSql<'a> for EnumParser {
impl<'a> FromSql<'a> for EnumString {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
Ok(EnumParser {
Ok(EnumString {
value: String::from_utf8_lossy(raw).into_owned(),
})
}
Expand All @@ -541,7 +541,7 @@ impl<'a> FromSql<'a> for EnumParser {
}
}

impl ToSql for EnumParser {
impl ToSql for EnumString {
to_sql_checked!();

fn accepts(ty: &Type) -> bool {
Expand All @@ -563,69 +563,86 @@ impl ToSql for EnumParser {
Ok(IsNull::No)
} else {
Err(format!(
"EnumParser value {} is not in the enum type {:?}",
"EnumString value {} is not in the enum type {:?}",
self.value, e
)
.into())
}
}
_ => Err("EnumParser can only be used with ENUM types".into()),
_ => Err("EnumString can only be used with ENUM types".into()),
}
}
}

#[cfg(test)]
mod tests {
use postgres::{Client, NoTls};

use crate::parser::postgres::EnumParser;
use tokio_postgres::NoTls;

use crate::parser::postgres::EnumString;
const DB: &str = "postgres";
const USER: &str = "kexiang";

#[ignore]
#[test]
fn enum_parser_integration_test() {
#[tokio::test]
async fn enum_string_integration_test() {
let connect = format!(
"host=localhost port=5432 user={} password={} dbname={}",
USER, DB, DB
);
let mut dbconn = Client::connect(connect.as_str(), NoTls).unwrap();
let (client, connection) = tokio_postgres::connect(connect.as_str(), NoTls)
.await
.unwrap();

// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});

// allow type existed
let _ = dbconn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[]);
dbconn
let _ = client
.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[])
.await;
client
.execute(
"CREATE TABLE IF NOT EXISTS person(id int PRIMARY KEY, current_mood mood)",
&[],
)
.await
.unwrap();
dbconn.execute("DELETE FROM person;", &[]).unwrap();
dbconn
client.execute("DELETE FROM person;", &[]).await.unwrap();
client
.execute("INSERT INTO person VALUES (1, 'happy')", &[])
.await
.unwrap();

// test from_sql
let got: EnumParser = dbconn
let got: EnumString = client
.query_one("SELECT * FROM person", &[])
.await
.unwrap()
.get::<usize, Option<EnumParser>>(1)
.get::<usize, Option<EnumString>>(1)
.unwrap();
assert_eq!("happy", got.value.as_str());

dbconn.execute("DELETE FROM person;", &[]).unwrap();
client.execute("DELETE FROM person", &[]).await.unwrap();

// test to_sql
dbconn
.execute("INSERT INTO foobar VALUES ($1)", &[&got])
client
.execute("INSERT INTO person VALUES (2, $1)", &[&got])
.await
.unwrap();

let got_new: EnumParser = dbconn
let got_new: EnumString = client
.query_one("SELECT * FROM person", &[])
.await
.unwrap()
.get::<usize, Option<EnumParser>>(1)
.get::<usize, Option<EnumString>>(1)
.unwrap();
assert_eq!("happy", got_new.value.as_str());
dbconn.execute("DROP TABLE person", &[]).unwrap();
dbconn.execute("DROP TYPE mood", &[]).unwrap();
client.execute("DROP TABLE person", &[]).await.unwrap();
client.execute("DROP TYPE mood", &[]).await.unwrap();
}
}

0 comments on commit 9c8fb2d

Please sign in to comment.