From 967b92bbcc0a505d34be47659f38be5020f830d9 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 24 Aug 2023 15:11:20 +0800 Subject: [PATCH] fix data types --- src/connector/src/parser/mysql.rs | 45 +++++++++++++++------------- src/connector/src/source/external.rs | 6 +++- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index cb6b1fa522330..c0260a3c8d6f1 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -87,7 +87,11 @@ pub fn mysql_row_to_datums(mysql_row: &mut MysqlRow, schema: &Schema) -> Vec(i); v.map(|v| ScalarImpl::from(JsonbVal::from(v))) } - _ => { + DataType::Interval + | DataType::Struct(_) + | DataType::List(_) + | DataType::Int256 + | DataType::Serial => { // Interval, Struct, List, Int256 are not supported tracing::warn!(rw_field.name, ?rw_field.data_type, "unsupported data type, set to Null"); None @@ -104,12 +108,14 @@ mod tests { use futures::pin_mut; use mysql_async::prelude::*; - use mysql_async::Row; + use mysql_async::Row as MySqlRow; use risingwave_common::catalog::{Field, Schema}; - use risingwave_common::row::OwnedRow; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::row::{OwnedRow, Row}; + use risingwave_common::types::{DataType, ScalarImpl, ToText}; use tokio_stream::StreamExt; + use crate::parser::mysql_row_to_datums; + // manual test case #[ignore] #[tokio::test] @@ -119,34 +125,31 @@ mod tests { let t1schema = Schema::new(vec![ Field::with_name(DataType::Int32, "v1"), Field::with_name(DataType::Int32, "v2"), + Field::with_name(DataType::Timestamptz, "v3"), ]); let mut conn = pool.get_conn().await.unwrap(); - let mut result_set = conn.query_iter("SELECT * FROM `t1`").await.unwrap(); - let s = result_set.stream::().await.unwrap().unwrap(); + conn.exec_drop("SET time_zone = \"+08:00\"", ()) + .await + .unwrap(); + + let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap(); + let s = result_set.stream::().await.unwrap().unwrap(); let row_stream = s.map(|row| { // convert mysql row into OwnedRow let mut mysql_row = row.unwrap(); - let mut datums = vec![]; - - let _mysql_columns = mysql_row.columns_ref(); - for i in 0..mysql_row.len() { - let rw_field = &t1schema.fields[i]; - let datum = match rw_field.data_type { - DataType::Int32 => { - let value = mysql_row.take::(i); - value.map(ScalarImpl::from) - } - _ => None, - }; - datums.push(datum); - } + let datums = mysql_row_to_datums(&mut mysql_row, &t1schema); Ok::<_, anyhow::Error>(Some(OwnedRow::new(datums))) }); pin_mut!(row_stream); while let Some(row) = row_stream.next().await { if let Ok(ro) = row && ro.is_some() { - println!("OwnedRow {:?}", ro); + let owned_row = ro.unwrap(); + let d = owned_row.datum_at(2); + d.map(|scalar| { + let v = scalar.into_timestamptz(); + println!("timestamp: {}", v.to_text()); + }); } } } diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index c2a796bcebeb4..5ab6b2f1ec06d 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -298,7 +298,7 @@ impl MySqlExternalTableReader { })?; let database_url = format!( - "mysql://{}:{}@{}:{}/{}", + "mysql://{}:{}@{}:{}/{}?timezone=UTC", config.username, config.password, config.host, config.port, config.database ); let pool = mysql_async::Pool::from_url(database_url)?; @@ -343,6 +343,10 @@ impl MySqlExternalTableReader { .get_conn() .await .map_err(|e| ConnectorError::Connection(anyhow!(e)))?; + + // Set session timezone to UTC + conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?; + if start_pk.is_none() { let mut result_set = conn.query_iter(sql).await?; let rs_stream = result_set.stream::().await?;