diff --git a/Cargo.lock b/Cargo.lock
index 8b2d83d9157c7..fffd5b98e4807 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -187,7 +187,7 @@ dependencies = [
"serde_json",
"snap",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"thiserror",
"typed-builder 0.16.2",
"uuid",
@@ -212,7 +212,7 @@ dependencies = [
"serde",
"serde_json",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"thiserror",
"typed-builder 0.18.0",
"uuid",
@@ -2222,7 +2222,7 @@ checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686"
dependencies = [
"crossterm 0.27.0",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"unicode-width",
]
@@ -3077,7 +3077,7 @@ dependencies = [
"datafusion-common",
"sqlparser",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -8689,7 +8689,7 @@ dependencies = [
"risingwave_rt",
"shell-words",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"task_stats_alloc",
"tempfile",
"tikv-jemallocator",
@@ -8787,7 +8787,7 @@ dependencies = [
"speedate",
"static_assertions",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"sysinfo",
"tempfile",
"thiserror",
@@ -9030,7 +9030,7 @@ dependencies = [
"serde_yaml",
"simd-json",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"syn 1.0.109",
"tempfile",
"thiserror",
@@ -11171,7 +11171,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "242f76c50fd18cbf098607090ade73a08d39cfd84ea835f3796a2c855223b19b"
dependencies = [
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -11563,7 +11563,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -11579,6 +11579,19 @@ dependencies = [
"syn 2.0.48",
]
+[[package]]
+name = "strum_macros"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.48",
+]
+
[[package]]
name = "subprocess"
version = "0.2.9"
diff --git a/README.md b/README.md
index e1cd266da5552..44443cfab8282 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@
-### 🌊Reimagine Stream Processing.
+### 🌊 Reimagine Stream Processing.
@@ -40,12 +40,6 @@
>
-
-
-
-RisingWave is a distributed SQL streaming database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
+RisingWave is a Postgres-compatible streaming database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
![RisingWave](https://github.com/risingwavelabs/risingwave-docs/blob/main/docs/images/new_archi_grey.png)
@@ -129,8 +123,9 @@ Within your data stack, RisingWave can assist with:
* Processing and transforming event streaming data in real time
* Offloading event-driven queries (e.g., materialized views, triggers) from operational databases
* Performing real-time ETL (Extract, Transform, Load)
+* Supporting real-time feature stores
-RisingWave is extensively utilized in real-time applications such as monitoring, alerting, dashboard reporting, ML feature engineering, among others. It has already been adopted in fields such as financial trading, manufacturing, new media, logistics, gaming, and more. Check out [customer stories](https://www.risingwave.com/use-cases/).
+RisingWave is extensively utilized in real-time applications such as monitoring, alerting, dashboard reporting, machine learning, among others. It has already been adopted in fields such as financial trading, manufacturing, new media, logistics, gaming, and more. Check out [customer stories](https://www.risingwave.com/use-cases/).
## Community
diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 8451290a93c93..b8bc655b60bb1 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -51,6 +51,8 @@ def gen_pipeline_steps():
command: ci/scripts/integration-tests.sh -c {test_case} -f {test_format}
timeout_in_minutes: 30
retry: *auto-retry
+ concurrency: 10
+ concurrency_group: 'integration-test/run'
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
diff --git a/e2e_test/batch/basic/case_when_optimization.slt.part b/e2e_test/batch/basic/case_when_optimization.slt.part
new file mode 100644
index 0000000000000..7e01be030a911
--- /dev/null
+++ b/e2e_test/batch/basic/case_when_optimization.slt.part
@@ -0,0 +1,242 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+CREATE TABLE t1 (c1 INT, c2 INT, c3 INT);
+
+statement ok
+INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5),
+ (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9), (10, 10, 10),
+ (11, 11, 11), (12, 12, 12), (13, 13, 13), (14, 14, 14), (15, 15, 15),
+ (16, 16, 16), (17, 17, 17), (18, 18, 18), (19, 19, 19), (20, 20, 20),
+ (21, 21, 21), (22, 22, 22), (23, 23, 23), (24, 24, 24), (25, 25, 25),
+ (26, 26, 26), (27, 27, 27), (28, 28, 28), (29, 29, 29), (30, 30, 30),
+ (31, 31, 31), (32, 32, 32), (33, 33, 33), (34, 34, 34), (35, 35, 35),
+ (36, 36, 36), (37, 37, 37), (38, 38, 38), (39, 39, 39), (40, 40, 40),
+ (41, 41, 41), (42, 42, 42), (43, 43, 43), (44, 44, 44), (45, 45, 45),
+ (46, 46, 46), (47, 47, 47), (48, 48, 48), (49, 49, 49), (50, 50, 50),
+ (51, 51, 51), (52, 52, 52), (53, 53, 53), (54, 54, 54), (55, 55, 55),
+ (56, 56, 56), (57, 57, 57), (58, 58, 58), (59, 59, 59), (60, 60, 60),
+ (61, 61, 61), (62, 62, 62), (63, 63, 63), (64, 64, 64), (65, 65, 65),
+ (66, 66, 66), (67, 67, 67), (68, 68, 68), (69, 69, 69), (70, 70, 70),
+ (71, 71, 71), (72, 72, 72), (73, 73, 73), (74, 74, 74), (75, 75, 75),
+ (76, 76, 76), (77, 77, 77), (78, 78, 78), (79, 79, 79), (80, 80, 80),
+ (81, 81, 81), (82, 82, 82), (83, 83, 83), (84, 84, 84), (85, 85, 85),
+ (86, 86, 86), (87, 87, 87), (88, 88, 88), (89, 89, 89), (90, 90, 90),
+ (91, 91, 91), (92, 92, 92), (93, 93, 93), (94, 94, 94), (95, 95, 95),
+ (96, 96, 96), (97, 97, 97), (98, 98, 98), (99, 99, 99), (100, 100, 100);
+
+
+# 101 arms case-when expression, with optimizable pattern
+query I
+SELECT
+ CASE c1
+ WHEN 1 THEN 'one'
+ WHEN 2 THEN 'two'
+ WHEN 3 THEN 'three'
+ WHEN 4 THEN 'four'
+ WHEN 5 THEN 'five'
+ WHEN 6 THEN 'six'
+ WHEN 7 THEN 'seven'
+ WHEN 8 THEN 'eight'
+ WHEN 9 THEN 'nine'
+ WHEN 10 THEN 'ten'
+ WHEN 11 THEN 'eleven'
+ WHEN 12 THEN 'twelve'
+ WHEN 13 THEN 'thirteen'
+ WHEN 14 THEN 'fourteen'
+ WHEN 15 THEN 'fifteen'
+ WHEN 16 THEN 'sixteen'
+ WHEN 17 THEN 'seventeen'
+ WHEN 18 THEN 'eighteen'
+ WHEN 19 THEN 'nineteen'
+ WHEN 20 THEN 'twenty'
+ WHEN 21 THEN 'twenty-one'
+ WHEN 22 THEN 'twenty-two'
+ WHEN 23 THEN 'twenty-three'
+ WHEN 24 THEN 'twenty-four'
+ WHEN 25 THEN 'twenty-five'
+ WHEN 26 THEN 'twenty-six'
+ WHEN 27 THEN 'twenty-seven'
+ WHEN 28 THEN 'twenty-eight'
+ WHEN 29 THEN 'twenty-nine'
+ WHEN 30 THEN 'thirty'
+ WHEN 31 THEN 'thirty-one'
+ WHEN 32 THEN 'thirty-two'
+ WHEN 33 THEN 'thirty-three'
+ WHEN 34 THEN 'thirty-four'
+ WHEN 35 THEN 'thirty-five'
+ WHEN 36 THEN 'thirty-six'
+ WHEN 37 THEN 'thirty-seven'
+ WHEN 38 THEN 'thirty-eight'
+ WHEN 39 THEN 'thirty-nine'
+ WHEN 40 THEN 'forty'
+ WHEN 41 THEN 'forty-one'
+ WHEN 42 THEN 'forty-two'
+ WHEN 43 THEN 'forty-three'
+ WHEN 44 THEN 'forty-four'
+ WHEN 45 THEN 'forty-five'
+ WHEN 46 THEN 'forty-six'
+ WHEN 47 THEN 'forty-seven'
+ WHEN 48 THEN 'forty-eight'
+ WHEN 49 THEN 'forty-nine'
+ WHEN 50 THEN 'fifty'
+ WHEN 51 THEN 'fifty-one'
+ WHEN 52 THEN 'fifty-two'
+ WHEN 53 THEN 'fifty-three'
+ WHEN 54 THEN 'fifty-four'
+ WHEN 55 THEN 'fifty-five'
+ WHEN 56 THEN 'fifty-six'
+ WHEN 57 THEN 'fifty-seven'
+ WHEN 58 THEN 'fifty-eight'
+ WHEN 59 THEN 'fifty-nine'
+ WHEN 60 THEN 'sixty'
+ WHEN 61 THEN 'sixty-one'
+ WHEN 62 THEN 'sixty-two'
+ WHEN 63 THEN 'sixty-three'
+ WHEN 64 THEN 'sixty-four'
+ WHEN 65 THEN 'sixty-five'
+ WHEN 66 THEN 'sixty-six'
+ WHEN 67 THEN 'sixty-seven'
+ WHEN 68 THEN 'sixty-eight'
+ WHEN 69 THEN 'sixty-nine'
+ WHEN 70 THEN 'seventy'
+ WHEN 71 THEN 'seventy-one'
+ WHEN 72 THEN 'seventy-two'
+ WHEN 73 THEN 'seventy-three'
+ WHEN 74 THEN 'seventy-four'
+ WHEN 75 THEN 'seventy-five'
+ WHEN 76 THEN 'seventy-six'
+ WHEN 77 THEN 'seventy-seven'
+ WHEN 78 THEN 'seventy-eight'
+ WHEN 79 THEN 'seventy-nine'
+ WHEN 80 THEN 'eighty'
+ WHEN 81 THEN 'eighty-one'
+ WHEN 82 THEN 'eighty-two'
+ WHEN 83 THEN 'eighty-three'
+ WHEN 84 THEN 'eighty-four'
+ WHEN 85 THEN 'eighty-five'
+ WHEN 86 THEN 'eighty-six'
+ WHEN 87 THEN 'eighty-seven'
+ WHEN 88 THEN 'eighty-eight'
+ WHEN 89 THEN 'eighty-nine'
+ WHEN 90 THEN 'ninety'
+ WHEN 91 THEN 'ninety-one'
+ WHEN 92 THEN 'ninety-two'
+ WHEN 93 THEN 'ninety-three'
+ WHEN 94 THEN 'ninety-four'
+ WHEN 95 THEN 'ninety-five'
+ WHEN 96 THEN 'ninety-six'
+ WHEN 97 THEN 'ninety-seven'
+ WHEN 98 THEN 'ninety-eight'
+ WHEN 99 THEN 'ninety-nine'
+ WHEN 100 THEN 'one hundred'
+ ELSE
+ '114514'
+ END
+FROM t1
+ORDER BY c1 ASC;
+----
+one
+two
+three
+four
+five
+six
+seven
+eight
+nine
+ten
+eleven
+twelve
+thirteen
+fourteen
+fifteen
+sixteen
+seventeen
+eighteen
+nineteen
+twenty
+twenty-one
+twenty-two
+twenty-three
+twenty-four
+twenty-five
+twenty-six
+twenty-seven
+twenty-eight
+twenty-nine
+thirty
+thirty-one
+thirty-two
+thirty-three
+thirty-four
+thirty-five
+thirty-six
+thirty-seven
+thirty-eight
+thirty-nine
+forty
+forty-one
+forty-two
+forty-three
+forty-four
+forty-five
+forty-six
+forty-seven
+forty-eight
+forty-nine
+fifty
+fifty-one
+fifty-two
+fifty-three
+fifty-four
+fifty-five
+fifty-six
+fifty-seven
+fifty-eight
+fifty-nine
+sixty
+sixty-one
+sixty-two
+sixty-three
+sixty-four
+sixty-five
+sixty-six
+sixty-seven
+sixty-eight
+sixty-nine
+seventy
+seventy-one
+seventy-two
+seventy-three
+seventy-four
+seventy-five
+seventy-six
+seventy-seven
+seventy-eight
+seventy-nine
+eighty
+eighty-one
+eighty-two
+eighty-three
+eighty-four
+eighty-five
+eighty-six
+eighty-seven
+eighty-eight
+eighty-nine
+ninety
+ninety-one
+ninety-two
+ninety-three
+ninety-four
+ninety-five
+ninety-six
+ninety-seven
+ninety-eight
+ninety-nine
+one hundred
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/proto/expr.proto b/proto/expr.proto
index f62ee2936d115..7ab48a405d139 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -82,6 +82,9 @@ message ExprNode {
LTRIM = 210;
RTRIM = 211;
CASE = 212;
+ // Optimize case-when expression to constant lookup
+ // when arms are in a large scale with simple form
+ CONSTANT_LOOKUP = 624;
// ROUND(numeric, integer) -> numeric
ROUND_DIGIT = 213;
// ROUND(numeric) -> numeric
diff --git a/risedev.yml b/risedev.yml
index 7be1334deb4b4..93c7c0c1e90d0 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -570,7 +570,7 @@ profile:
steps:
- use: minio
api-requests-max: 30
- api-requests-deadline: 2s
+ api-requests-deadline: 3s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
@@ -809,6 +809,28 @@ profile:
- use: frontend
- use: compactor
+ ci-3cn-1fe-with-recovery:
+ config-path: src/config/ci-recovery.toml
+ steps:
+ - use: minio
+ - use: etcd
+ unsafe-no-fsync: true
+ - use: meta-node
+ - use: compute-node
+ port: 5687
+ exporter-port: 1222
+ enable-tiered-cache: true
+ - use: compute-node
+ port: 5688
+ exporter-port: 1223
+ enable-tiered-cache: true
+ - use: compute-node
+ port: 5689
+ exporter-port: 1224
+ enable-tiered-cache: true
+ - use: frontend
+ - use: compactor
+
ci-1cn-1fe-kafka-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml
index e5aa9e3c68d04..f5a08e6c4b688 100644
--- a/src/cmd_all/Cargo.toml
+++ b/src/cmd_all/Cargo.toml
@@ -35,7 +35,7 @@ risingwave_meta_node = { workspace = true }
risingwave_rt = { workspace = true }
shell-words = "1.1.0"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
tempfile = "3"
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs
index a51fb03120313..24b90ad613da4 100644
--- a/src/cmd_all/src/standalone.rs
+++ b/src/cmd_all/src/standalone.rs
@@ -300,7 +300,6 @@ mod test {
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
- mem_table_spill_threshold: 4194304,
parallelism: 10,
role: Both,
metrics_level: None,
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index 78de81f426bfe..30ecaa87a8d1a 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -93,7 +93,7 @@ smallbitset = "0.7.1"
speedate = "0.13.0"
static_assertions = "1"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
sysinfo = { version = "0.30", default-features = false }
thiserror = "1"
thiserror-ext = { workspace = true }
diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs
index bc18858d678ec..af6b54b057c82 100644
--- a/src/common/src/types/datetime.rs
+++ b/src/common/src/types/datetime.rs
@@ -536,6 +536,12 @@ impl Timestamp {
self.0.timestamp_nanos_opt().unwrap()
}
+ pub fn with_millis(timestamp_millis: i64) -> Result {
+ let secs = timestamp_millis.div_euclid(1_000);
+ let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
+ Self::with_secs_nsecs(secs, nsecs as u32)
+ }
+
pub fn with_micros(timestamp_micros: i64) -> Result {
let secs = timestamp_micros.div_euclid(1_000_000);
let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index 342448066dd06..b3125b76052a6 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -92,10 +92,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
pub total_memory_bytes: usize,
- /// Spill threshold for mem table.
- #[clap(long, env = "RW_MEM_TABLE_SPILL_THRESHOLD", default_value_t = default_mem_table_spill_threshold())]
- pub mem_table_spill_threshold: usize,
-
/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
@@ -231,10 +227,6 @@ fn default_total_memory_bytes() -> usize {
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}
-fn default_mem_table_spill_threshold() -> usize {
- (4 << 20) as usize
-}
-
fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 2bf42bf38d81b..e8582d1f3e294 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -119,7 +119,7 @@ serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.13.3"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
index 7cc9cf14c1f84..a6c5c6fbef5d1 100644
--- a/src/connector/src/parser/avro/util.rs
+++ b/src/connector/src/parser/avro/util.rs
@@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result {
DataType::Decimal
}
Schema::Date => DataType::Date,
+ Schema::LocalTimestampMillis => DataType::Timestamp,
+ Schema::LocalTimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs
index 02cdcb4de4ca5..af5658331270d 100644
--- a/src/connector/src/parser/unified/avro.rs
+++ b/src/connector/src/parser/unified/avro.rs
@@ -23,10 +23,11 @@ use chrono::Datelike;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
-use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
-use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
+use risingwave_common::types::{
+ DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
+};
use risingwave_common::util::iter_util::ZipEqFast;
use super::{Access, AccessError, AccessResult};
@@ -181,19 +182,27 @@ impl<'a> AvroParseOptions<'a> {
}
(Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(),
// ---- Timestamp -----
- (Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => {
- i64_to_timestamp(*ms).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => {
+ Timestamp::with_millis(*ms)
+ .map_err(|_| create_error())?
+ .into()
}
- (Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => {
- i64_to_timestamp(*us).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => {
+ Timestamp::with_micros(*us)
+ .map_err(|_| create_error())?
+ .into()
}
// ---- TimestampTz -----
- (Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => {
- i64_to_timestamptz(*ms).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => {
+ Timestamptz::from_millis(*ms)
+ .ok_or(AccessError::Other(anyhow!(
+ "timestamptz with milliseconds {ms} * 1000 is out of range",
+ )))?
+ .into()
}
- (Some(DataType::Timestamptz), Value::TimestampMicros(us)) => {
- i64_to_timestamptz(*us).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => {
+ Timestamptz::from_micros(*us).into()
}
// ---- Interval -----
@@ -424,7 +433,7 @@ pub(crate) fn unix_epoch_days() -> i32 {
mod tests {
use apache_avro::Decimal as AvroDecimal;
use risingwave_common::error::{ErrorCode, RwError};
- use risingwave_common::types::{Decimal, Timestamp};
+ use risingwave_common::types::{Decimal, Timestamptz};
use super::*;
@@ -486,24 +495,24 @@ mod tests {
}
#[test]
- fn test_avro_timestamp_micros() {
- let v1 = Value::TimestampMicros(1620000000000);
- let v2 = Value::TimestampMillis(1620000000);
+ fn test_avro_timestamptz_micros() {
+ let v1 = Value::TimestampMicros(1620000000000000);
+ let v2 = Value::TimestampMillis(1620000000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
- let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
- let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
+ let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
+ let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
assert_eq!(
datum1,
- Some(ScalarImpl::Timestamp(Timestamp::new(
- "2021-05-03T00:00:00".parse().unwrap()
- )))
+ Some(ScalarImpl::Timestamptz(
+ Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
+ ))
);
assert_eq!(
datum2,
- Some(ScalarImpl::Timestamp(Timestamp::new(
- "2021-05-03T00:00:00".parse().unwrap()
- )))
+ Some(ScalarImpl::Timestamptz(
+ Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
+ ))
);
}
diff --git a/src/expr/impl/src/scalar/array_positions.rs b/src/expr/impl/src/scalar/array_positions.rs
index 5218dbce6780f..cbae53c001439 100644
--- a/src/expr/impl/src/scalar/array_positions.rs
+++ b/src/expr/impl/src/scalar/array_positions.rs
@@ -66,10 +66,7 @@ use risingwave_expr::{function, ExprError, Result};
/// 2
/// ```
#[function("array_position(anyarray, any) -> int4")]
-fn array_position(
- array: Option>,
- element: Option>,
-) -> Result