From 92998eeafe9a552f08acd50f3fc03a39e98be5e2 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 24 Nov 2023 02:20:58 +0800 Subject: [PATCH] test: add tests for mysql timestamp encoding --- Cargo.lock | 112 ++++++++++------------ src/common/time/src/datetime.rs | 12 ++- src/common/time/src/timestamp.rs | 11 ++- src/servers/Cargo.toml | 4 +- src/servers/src/mysql/writer.rs | 7 +- tests-integration/Cargo.toml | 4 + tests-integration/tests/sql.rs | 156 +++++++++++++++++++++++++++++++ 7 files changed, 239 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index def2104e34be..da0b43f05d94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,17 +840,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "bigdecimal" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa" -dependencies = [ - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "bigdecimal" version = "0.4.2" @@ -1019,6 +1008,15 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "btoi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd6407f73a9b8b6162d8a2ef999fe6afd7cc15902ebf42c5cd296addf17e0ad" +dependencies = [ + "num-traits", +] + [[package]] name = "build-data" version = "0.1.5" @@ -1643,7 +1641,7 @@ name = "common-decimal" version = "0.4.3" dependencies = [ "arrow", - "bigdecimal 0.4.2", + "bigdecimal", "common-error", "common-macro", "rust_decimal", @@ -4120,6 +4118,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "keyed_priority_queue" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" +dependencies = [ + "indexmap 2.1.0", +] + [[package]] name = "lalrpop" version = "0.19.12" @@ -4166,15 +4173,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "lexical" -version = "6.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7aefb36fd43fef7003334742cbf77b243fcd36418a1d1bdd480d613a67968f6" -dependencies = [ - "lexical-core", -] - [[package]] name = "lexical-core" version = "0.8.5" @@ -4424,11 +4422,11 @@ dependencies = [ [[package]] name = "lru" -version = "0.10.1" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718e8fae447df0c7e1ba7f5189829e63fd536945c8988d61444c19039f16b670" +checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7" dependencies = [ - "hashbrown 0.13.2", + "hashbrown 0.14.2", ] [[package]] @@ -4908,8 +4906,9 @@ dependencies = [ [[package]] name = "mysql_async" -version = "0.32.1" -source = "git+https://github.com/blackbeam/mysql_async.git?rev=32c6f2a986789f97108502c2d0c755a089411b66#32c6f2a986789f97108502c2d0c755a089411b66" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6750b17ce50f8f112ef1a8394121090d47c596b56a6a17569ca680a9626e2ef2" dependencies = [ "bytes", "crossbeam", @@ -4917,15 +4916,16 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", + "keyed_priority_queue", "lazy_static", "lru", "mio", "mysql_common", "once_cell", - "pem 2.0.1", + "pem 3.0.2", "percent-encoding", "pin-project", - "priority-queue", + "rand", "rustls 0.21.9", "rustls-pemfile", "serde", @@ -4938,20 +4938,21 @@ dependencies = [ "twox-hash", "url", "webpki", - "webpki-roots 0.23.1", + "webpki-roots 0.25.3", ] [[package]] name = "mysql_common" -version = "0.30.6" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57349d5a326b437989b6ee4dc8f2f34b0cc131202748414712a8e7d98952fc8c" +checksum = "06f19e4cfa0ab5a76b627cec2d81331c49b034988eaf302c3bafeada684eadef" dependencies = [ "base64 0.21.5", - "bigdecimal 0.3.1", + "bigdecimal", "bindgen", "bitflags 2.4.1", "bitvec", + "btoi", "byteorder", "bytes", "cc", @@ -4961,7 +4962,6 @@ dependencies = [ "flate2", "frunk", "lazy_static", - "lexical", "mysql-common-derive", "num-bigint", "num-traits", @@ -4978,6 +4978,7 @@ dependencies = [ "thiserror", "time", "uuid", + "zstd 0.12.4", ] [[package]] @@ -5349,9 +5350,9 @@ dependencies = [ [[package]] name = "opensrv-mysql" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c66063eb6aca9e6b5354f91db29f7244a8e7f9c01219b3ce76a5340a78d9f6f" +checksum = "208bfa36c4b4a8d6ac90eda62e34efa66f7e692df91bd3626bc47329844a86b1" dependencies = [ "async-trait", "byteorder", @@ -5821,6 +5822,16 @@ dependencies = [ "serde", ] +[[package]] +name = "pem" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" +dependencies = [ + "base64 0.21.5", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.3.1" @@ -6218,16 +6229,6 @@ dependencies = [ "syn 2.0.39", ] -[[package]] -name = "priority-queue" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fff39edfcaec0d64e8d0da38564fad195d2d51b680940295fcc307366e101e61" -dependencies = [ - "autocfg", - "indexmap 1.9.3", -] - [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -7408,7 +7409,7 @@ checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", "ring 0.17.5", - "rustls-webpki 0.101.7", + "rustls-webpki", "sct", ] @@ -7433,16 +7434,6 @@ dependencies = [ "base64 0.21.5", ] -[[package]] -name = "rustls-webpki" -version = "0.100.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" -dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", -] - [[package]] name = "rustls-webpki" version = "0.101.7" @@ -9211,6 +9202,7 @@ dependencies = [ "itertools 0.10.5", "meta-client", "meta-srv", + "mysql_async", "num_cpus", "object-store", "once_cell", @@ -9236,6 +9228,7 @@ dependencies = [ "substrait 0.4.3", "table", "tempfile", + "time", "tokio", "tokio-postgres", "tonic 0.10.2", @@ -10477,12 +10470,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.23.1" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" -dependencies = [ - "rustls-webpki 0.100.3", -] +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "which" diff --git a/src/common/time/src/datetime.rs b/src/common/time/src/datetime.rs index 451dc57cf7ed..7f1517523072 100644 --- a/src/common/time/src/datetime.rs +++ b/src/common/time/src/datetime.rs @@ -15,10 +15,11 @@ use std::fmt::{Display, Formatter}; use std::str::FromStr; -use chrono::{LocalResult, NaiveDateTime}; +use chrono::{LocalResult, NaiveDateTime, TimeZone as ChronoTimeZone, Utc}; use serde::{Deserialize, Serialize}; use crate::error::{Error, InvalidDateStrSnafu, Result}; +use crate::timezone::TimeZone; use crate::util::{format_utc_datetime, local_datetime_to_utc}; use crate::Date; @@ -108,6 +109,15 @@ impl DateTime { NaiveDateTime::from_timestamp_millis(self.0) } + pub fn to_chrono_datetime_with_timezone(&self, tz: Option) -> Option { + let datetime = self.to_chrono_datetime(); + datetime.map(|v| match tz { + Some(TimeZone::Offset(offset)) => offset.from_utc_datetime(&v).naive_local(), + Some(TimeZone::Named(tz)) => tz.from_utc_datetime(&v).naive_local(), + None => Utc.from_utc_datetime(&v).naive_local(), + }) + } + /// Convert to [common_time::date]. pub fn to_date(&self) -> Option { self.to_chrono_datetime().map(|d| Date::from(d.date())) diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 1036b86a22f2..898e1c7b39c4 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -21,7 +21,7 @@ use std::time::Duration; use arrow::datatypes::TimeUnit as ArrowTimeUnit; use chrono::{ - DateTime, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, + DateTime, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc, }; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -252,6 +252,15 @@ impl Timestamp { NaiveDateTime::from_timestamp_opt(sec, nsec) } + pub fn to_chrono_datetime_with_timezone(&self, tz: Option) -> Option { + let datetime = self.to_chrono_datetime(); + datetime.map(|v| match tz { + Some(TimeZone::Offset(offset)) => offset.from_utc_datetime(&v).naive_local(), + Some(TimeZone::Named(tz)) => tz.from_utc_datetime(&v).naive_local(), + None => Utc.from_utc_datetime(&v).naive_local(), + }) + } + /// Convert timestamp to chrono date. pub fn to_chrono_date(&self) -> Option { self.to_chrono_datetime().map(|ndt| ndt.date()) diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index caeb4863e362..0ea4641cd5ee 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -54,7 +54,7 @@ lazy_static.workspace = true mime_guess = "2.0" once_cell.workspace = true openmetrics-parser = "0.4" -opensrv-mysql = "0.4" +opensrv-mysql = "0.5" opentelemetry-proto.workspace = true parking_lot = "0.12" pgwire = "0.16" @@ -103,7 +103,7 @@ catalog = { workspace = true, features = ["testing"] } client.workspace = true common-base.workspace = true common-test-util.workspace = true -mysql_async = { git = "https://github.com/blackbeam/mysql_async.git", rev = "32c6f2a986789f97108502c2d0c755a089411b66", default-features = false, features = [ +mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } rand.workspace = true diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 081ba16bc018..6d92fb3804e6 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -192,8 +192,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::String(v) => row_writer.write_col(v.as_utf8())?, Value::Binary(v) => row_writer.write_col(v.deref())?, Value::Date(v) => row_writer.write_col(v.to_chrono_date())?, - Value::DateTime(v) => row_writer.write_col(v.to_chrono_datetime())?, - Value::Timestamp(v) => row_writer.write_col(v.to_chrono_datetime())?, + // convert datetime and timestamp to timezone of current connection + Value::DateTime(v) => row_writer + .write_col(v.to_chrono_datetime_with_timezone(query_context.time_zone()))?, + Value::Timestamp(v) => row_writer + .write_col(v.to_chrono_datetime_with_timezone(query_context.time_zone()))?, Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?, Value::Duration(v) => row_writer.write_col(v.to_std_duration())?, Value::List(_) => { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 4c09640de424..bb613302f79d 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -36,6 +36,9 @@ frontend = { workspace = true, features = ["testing"] } futures.workspace = true meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } +mysql_async = { version = "0.33", default-features = false, features = [ + "default-rustls", +] } object-store.workspace = true once_cell.workspace = true operator.workspace = true @@ -59,6 +62,7 @@ sqlx = { version = "0.6", features = [ substrait.workspace = true table.workspace = true tempfile.workspace = true +time = "0.3" tokio.workspace = true tonic.workspace = true tower = "0.4" diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 1f1a17164d33..d3c7914b4291 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -56,6 +56,7 @@ macro_rules! sql_tests { test_mysql_auth, test_mysql_crud, test_mysql_timezone, + test_mysql_async_timestamp, test_postgres_auth, test_postgres_crud, test_postgres_parameter_inference, @@ -423,3 +424,158 @@ pub async fn test_postgres_parameter_inference(store_type: StorageType) { let _ = fe_pg_server.shutdown().await; guard.remove_all().await; } + +pub async fn test_mysql_async_timestamp(store_type: StorageType) { + use mysql_async::prelude::*; + use time::PrimitiveDateTime; + + #[derive(Debug)] + struct CpuMetric { + hostname: String, + environment: String, + usage_user: f64, + usage_system: f64, + usage_idle: f64, + ts: i64, + } + + impl CpuMetric { + fn new( + hostname: String, + environment: String, + usage_user: f64, + usage_system: f64, + usage_idle: f64, + ts: i64, + ) -> Self { + Self { + hostname, + environment, + usage_user, + usage_system, + usage_idle, + ts, + } + } + } + common_telemetry::init_default_ut_logging(); + + let (addr, mut guard, fe_mysql_server) = setup_mysql_server(store_type, "sql_timestamp").await; + let url = format!("mysql://{addr}/public"); + let opts = mysql_async::Opts::from_url(&url).unwrap(); + let mut conn = mysql_async::Conn::new(opts) + .await + .expect("create connection failure"); + + r"CREATE TABLE IF NOT EXISTS cpu_metrics ( + hostname STRING, + environment STRING, + usage_user DOUBLE, + usage_system DOUBLE, + usage_idle DOUBLE, + ts TIMESTAMP, + TIME INDEX(ts), + PRIMARY KEY(hostname, environment) +);" + .ignore(&mut conn) + .await + .expect("create table failure"); + + let metrics = vec![ + CpuMetric::new( + "host0".into(), + "test".into(), + 32f64, + 3f64, + 4f64, + 1680307200050, + ), + CpuMetric::new( + "host1".into(), + "test".into(), + 29f64, + 32f64, + 50f64, + 1680307200050, + ), + CpuMetric::new( + "host0".into(), + "test".into(), + 32f64, + 3f64, + 4f64, + 1680307260050, + ), + CpuMetric::new( + "host1".into(), + "test".into(), + 29f64, + 32f64, + 50f64, + 1680307260050, + ), + CpuMetric::new( + "host0".into(), + "test".into(), + 32f64, + 3f64, + 4f64, + 1680307320050, + ), + CpuMetric::new( + "host1".into(), + "test".into(), + 29f64, + 32f64, + 50f64, + 1680307320050, + ), + ]; + + r"INSERT INTO cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts) + VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)" + .with(metrics.iter().map(|metric| { + params! { + "hostname" => &metric.hostname, + "environment" => &metric.environment, + "usage_user" => metric.usage_user, + "usage_system" => metric.usage_system, + "usage_idle" => metric.usage_idle, + "ts" => metric.ts, + } + })) + .batch(&mut conn) + .await + .expect("insert data failure"); + + // query data + let loaded_metrics = "SELECT * FROM cpu_metrics" + .with(()) + .map( + &mut conn, + |(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): ( + String, + String, + f64, + f64, + f64, + PrimitiveDateTime, + )| { + let ts = raw_ts.assume_utc().unix_timestamp() * 1000; + CpuMetric::new( + hostname, + environment, + usage_user, + usage_system, + usage_idle, + ts, + ) + }, + ) + .await + .expect("query data failure"); + assert_eq!(loaded_metrics.len(), 6); + + let _ = fe_mysql_server.shutdown().await; + guard.remove_all().await; +}