Skip to content

Commit

Permalink
fix data types
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 24, 2023
1 parent 8cb12ed commit 967b92b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
45 changes: 24 additions & 21 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ pub fn mysql_row_to_datums(mysql_row: &mut MysqlRow, schema: &Schema) -> Vec<Dat
let v = mysql_row.take::<serde_json::Value, _>(i);
v.map(|v| ScalarImpl::from(JsonbVal::from(v)))
}
_ => {
DataType::Interval
| DataType::Struct(_)
| DataType::List(_)
| DataType::Int256
| DataType::Serial => {
// Interval, Struct, List, Int256 are not supported
tracing::warn!(rw_field.name, ?rw_field.data_type, "unsupported data type, set to Null");
None
Expand All @@ -104,12 +108,14 @@ mod tests {

use futures::pin_mut;
use mysql_async::prelude::*;
use mysql_async::Row;
use mysql_async::Row as MySqlRow;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarImpl, ToText};
use tokio_stream::StreamExt;

use crate::parser::mysql_row_to_datums;

// manual test case
#[ignore]
#[tokio::test]
Expand All @@ -119,34 +125,31 @@ mod tests {
let t1schema = Schema::new(vec![
Field::with_name(DataType::Int32, "v1"),
Field::with_name(DataType::Int32, "v2"),
Field::with_name(DataType::Timestamptz, "v3"),
]);

let mut conn = pool.get_conn().await.unwrap();
let mut result_set = conn.query_iter("SELECT * FROM `t1`").await.unwrap();
let s = result_set.stream::<Row>().await.unwrap().unwrap();
conn.exec_drop("SET time_zone = \"+08:00\"", ())
.await
.unwrap();

let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap();
let s = result_set.stream::<MySqlRow>().await.unwrap().unwrap();
let row_stream = s.map(|row| {
// convert mysql row into OwnedRow
let mut mysql_row = row.unwrap();
let mut datums = vec![];

let _mysql_columns = mysql_row.columns_ref();
for i in 0..mysql_row.len() {
let rw_field = &t1schema.fields[i];
let datum = match rw_field.data_type {
DataType::Int32 => {
let value = mysql_row.take::<i32, _>(i);
value.map(ScalarImpl::from)
}
_ => None,
};
datums.push(datum);
}
let datums = mysql_row_to_datums(&mut mysql_row, &t1schema);
Ok::<_, anyhow::Error>(Some(OwnedRow::new(datums)))
});
pin_mut!(row_stream);
while let Some(row) = row_stream.next().await {
if let Ok(ro) = row && ro.is_some() {
println!("OwnedRow {:?}", ro);
let owned_row = ro.unwrap();
let d = owned_row.datum_at(2);
d.map(|scalar| {
let v = scalar.into_timestamptz();
println!("timestamp: {}", v.to_text());
});
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/source/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl MySqlExternalTableReader {
})?;

let database_url = format!(
"mysql://{}:{}@{}:{}/{}",
"mysql://{}:{}@{}:{}/{}?timezone=UTC",
config.username, config.password, config.host, config.port, config.database
);
let pool = mysql_async::Pool::from_url(database_url)?;
Expand Down Expand Up @@ -343,6 +343,10 @@ impl MySqlExternalTableReader {
.get_conn()
.await
.map_err(|e| ConnectorError::Connection(anyhow!(e)))?;

// Set session timezone to UTC
conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;

if start_pk.is_none() {
let mut result_set = conn.query_iter(sql).await?;
let rs_stream = result_set.stream::<mysql_async::Row>().await?;
Expand Down

0 comments on commit 967b92b

Please sign in to comment.