diff --git a/Cargo.lock b/Cargo.lock
index c3af1be34d497..fffd5b98e4807 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -187,7 +187,7 @@ dependencies = [
"serde_json",
"snap",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"thiserror",
"typed-builder 0.16.2",
"uuid",
@@ -212,7 +212,7 @@ dependencies = [
"serde",
"serde_json",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"thiserror",
"typed-builder 0.18.0",
"uuid",
@@ -2222,7 +2222,7 @@ checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686"
dependencies = [
"crossterm 0.27.0",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
"unicode-width",
]
@@ -3077,7 +3077,7 @@ dependencies = [
"datafusion-common",
"sqlparser",
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -7997,6 +7997,7 @@ dependencies = [
"futures-io",
"futures-timer",
"log",
+ "lz4",
"native-tls",
"nom",
"oauth2",
@@ -8015,6 +8016,7 @@ dependencies = [
"tracing",
"url",
"uuid",
+ "zstd 0.12.4",
]
[[package]]
@@ -8687,7 +8689,7 @@ dependencies = [
"risingwave_rt",
"shell-words",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"task_stats_alloc",
"tempfile",
"tikv-jemallocator",
@@ -8785,7 +8787,7 @@ dependencies = [
"speedate",
"static_assertions",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"sysinfo",
"tempfile",
"thiserror",
@@ -9028,7 +9030,7 @@ dependencies = [
"serde_yaml",
"simd-json",
"strum",
- "strum_macros",
+ "strum_macros 0.26.1",
"syn 1.0.109",
"tempfile",
"thiserror",
@@ -9060,10 +9062,12 @@ dependencies = [
"clap",
"comfy-table",
"futures",
+ "hex",
"inquire",
"itertools 0.12.0",
"madsim-etcd-client",
"madsim-tokio",
+ "prost 0.12.1",
"regex",
"risingwave_common",
"risingwave_connector",
@@ -11167,7 +11171,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "242f76c50fd18cbf098607090ade73a08d39cfd84ea835f3796a2c855223b19b"
dependencies = [
"strum",
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -11559,7 +11563,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
- "strum_macros",
+ "strum_macros 0.25.3",
]
[[package]]
@@ -11575,6 +11579,19 @@ dependencies = [
"syn 2.0.48",
]
+[[package]]
+name = "strum_macros"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.48",
+]
+
[[package]]
name = "subprocess"
version = "0.2.9"
diff --git a/README.md b/README.md
index e1cd266da5552..44443cfab8282 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@
-### 🌊Reimagine Stream Processing.
+### 🌊 Reimagine Stream Processing.
@@ -40,12 +40,6 @@
>
-
-
-
-RisingWave is a distributed SQL streaming database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
+RisingWave is a Postgres-compatible streaming database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
![RisingWave](https://github.com/risingwavelabs/risingwave-docs/blob/main/docs/images/new_archi_grey.png)
@@ -129,8 +123,9 @@ Within your data stack, RisingWave can assist with:
* Processing and transforming event streaming data in real time
* Offloading event-driven queries (e.g., materialized views, triggers) from operational databases
* Performing real-time ETL (Extract, Transform, Load)
+* Supporting real-time feature stores
-RisingWave is extensively utilized in real-time applications such as monitoring, alerting, dashboard reporting, ML feature engineering, among others. It has already been adopted in fields such as financial trading, manufacturing, new media, logistics, gaming, and more. Check out [customer stories](https://www.risingwave.com/use-cases/).
+RisingWave is extensively utilized in real-time applications such as monitoring, alerting, dashboard reporting, machine learning, among others. It has already been adopted in fields such as financial trading, manufacturing, new media, logistics, gaming, and more. Check out [customer stories](https://www.risingwave.com/use-cases/).
## Community
diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 8451290a93c93..b8bc655b60bb1 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -51,6 +51,8 @@ def gen_pipeline_steps():
command: ci/scripts/integration-tests.sh -c {test_case} -f {test_format}
timeout_in_minutes: 30
retry: *auto-retry
+ concurrency: 10
+ concurrency_group: 'integration-test/run'
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
diff --git a/docker/docker-compose-distributed.yml b/docker/docker-compose-distributed.yml
index 8cb1e87651325..71b133e0c21f8 100644
--- a/docker/docker-compose-distributed.yml
+++ b/docker/docker-compose-distributed.yml
@@ -213,6 +213,8 @@ services:
- "0.0.0.0:5691"
- "--prometheus-host"
- "0.0.0.0:1250"
+ - "--prometheus-endpoint"
+ - "http://prometheus-0:9500"
- "--backend"
- etcd
- "--etcd-endpoints"
diff --git a/docker/docker-compose-with-azblob.yml b/docker/docker-compose-with-azblob.yml
index a2628b60e4742..6779919fceb1c 100644
--- a/docker/docker-compose-with-azblob.yml
+++ b/docker/docker-compose-with-azblob.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose-with-gcs.yml b/docker/docker-compose-with-gcs.yml
index b036373994d69..85180ad0dfc7c 100644
--- a/docker/docker-compose-with-gcs.yml
+++ b/docker/docker-compose-with-gcs.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose-with-hdfs.yml b/docker/docker-compose-with-hdfs.yml
index b0ed75296504a..74e21bbe3bdd5 100644
--- a/docker/docker-compose-with-hdfs.yml
+++ b/docker/docker-compose-with-hdfs.yml
@@ -206,6 +206,8 @@ services:
- "0.0.0.0:5691"
- "--prometheus-host"
- "0.0.0.0:1250"
+ - "--prometheus-endpoint"
+ - "http://prometheus-0:9500"
- "--backend"
- etcd
- "--etcd-endpoints"
diff --git a/docker/docker-compose-with-local-fs.yml b/docker/docker-compose-with-local-fs.yml
index e0bcafe088a47..8ffc0992747a3 100644
--- a/docker/docker-compose-with-local-fs.yml
+++ b/docker/docker-compose-with-local-fs.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose-with-obs.yml b/docker/docker-compose-with-obs.yml
index 33f00a5aac2b1..66121628ba18a 100644
--- a/docker/docker-compose-with-obs.yml
+++ b/docker/docker-compose-with-obs.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose-with-oss.yml b/docker/docker-compose-with-oss.yml
index 8978d48f11276..812a6fb5eaa07 100644
--- a/docker/docker-compose-with-oss.yml
+++ b/docker/docker-compose-with-oss.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose-with-s3.yml b/docker/docker-compose-with-s3.yml
index b22af9e8c2ca7..4ce500f21ee5b 100644
--- a/docker/docker-compose-with-s3.yml
+++ b/docker/docker-compose-with-s3.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index f4530ed24a051..e12b86b0d48ca 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -10,6 +10,7 @@ services:
--advertise-addr 0.0.0.0:5690 \
--dashboard-host 0.0.0.0:5691 \
--prometheus-host 0.0.0.0:1250 \
+ --prometheus-endpoint http://prometheus-0:9500 \
--connector-rpc-endpoint 0.0.0.0:50051 \
--backend etcd \
--etcd-endpoints etcd-0:2388 \
diff --git a/e2e_test/batch/basic/case_when_optimization.slt.part b/e2e_test/batch/basic/case_when_optimization.slt.part
new file mode 100644
index 0000000000000..7e01be030a911
--- /dev/null
+++ b/e2e_test/batch/basic/case_when_optimization.slt.part
@@ -0,0 +1,242 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+CREATE TABLE t1 (c1 INT, c2 INT, c3 INT);
+
+statement ok
+INSERT INTO t1 VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5),
+ (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9), (10, 10, 10),
+ (11, 11, 11), (12, 12, 12), (13, 13, 13), (14, 14, 14), (15, 15, 15),
+ (16, 16, 16), (17, 17, 17), (18, 18, 18), (19, 19, 19), (20, 20, 20),
+ (21, 21, 21), (22, 22, 22), (23, 23, 23), (24, 24, 24), (25, 25, 25),
+ (26, 26, 26), (27, 27, 27), (28, 28, 28), (29, 29, 29), (30, 30, 30),
+ (31, 31, 31), (32, 32, 32), (33, 33, 33), (34, 34, 34), (35, 35, 35),
+ (36, 36, 36), (37, 37, 37), (38, 38, 38), (39, 39, 39), (40, 40, 40),
+ (41, 41, 41), (42, 42, 42), (43, 43, 43), (44, 44, 44), (45, 45, 45),
+ (46, 46, 46), (47, 47, 47), (48, 48, 48), (49, 49, 49), (50, 50, 50),
+ (51, 51, 51), (52, 52, 52), (53, 53, 53), (54, 54, 54), (55, 55, 55),
+ (56, 56, 56), (57, 57, 57), (58, 58, 58), (59, 59, 59), (60, 60, 60),
+ (61, 61, 61), (62, 62, 62), (63, 63, 63), (64, 64, 64), (65, 65, 65),
+ (66, 66, 66), (67, 67, 67), (68, 68, 68), (69, 69, 69), (70, 70, 70),
+ (71, 71, 71), (72, 72, 72), (73, 73, 73), (74, 74, 74), (75, 75, 75),
+ (76, 76, 76), (77, 77, 77), (78, 78, 78), (79, 79, 79), (80, 80, 80),
+ (81, 81, 81), (82, 82, 82), (83, 83, 83), (84, 84, 84), (85, 85, 85),
+ (86, 86, 86), (87, 87, 87), (88, 88, 88), (89, 89, 89), (90, 90, 90),
+ (91, 91, 91), (92, 92, 92), (93, 93, 93), (94, 94, 94), (95, 95, 95),
+ (96, 96, 96), (97, 97, 97), (98, 98, 98), (99, 99, 99), (100, 100, 100);
+
+
+# 101-arm case-when expression (100 WHEN arms plus ELSE), with an optimizable pattern
+query I
+SELECT
+ CASE c1
+ WHEN 1 THEN 'one'
+ WHEN 2 THEN 'two'
+ WHEN 3 THEN 'three'
+ WHEN 4 THEN 'four'
+ WHEN 5 THEN 'five'
+ WHEN 6 THEN 'six'
+ WHEN 7 THEN 'seven'
+ WHEN 8 THEN 'eight'
+ WHEN 9 THEN 'nine'
+ WHEN 10 THEN 'ten'
+ WHEN 11 THEN 'eleven'
+ WHEN 12 THEN 'twelve'
+ WHEN 13 THEN 'thirteen'
+ WHEN 14 THEN 'fourteen'
+ WHEN 15 THEN 'fifteen'
+ WHEN 16 THEN 'sixteen'
+ WHEN 17 THEN 'seventeen'
+ WHEN 18 THEN 'eighteen'
+ WHEN 19 THEN 'nineteen'
+ WHEN 20 THEN 'twenty'
+ WHEN 21 THEN 'twenty-one'
+ WHEN 22 THEN 'twenty-two'
+ WHEN 23 THEN 'twenty-three'
+ WHEN 24 THEN 'twenty-four'
+ WHEN 25 THEN 'twenty-five'
+ WHEN 26 THEN 'twenty-six'
+ WHEN 27 THEN 'twenty-seven'
+ WHEN 28 THEN 'twenty-eight'
+ WHEN 29 THEN 'twenty-nine'
+ WHEN 30 THEN 'thirty'
+ WHEN 31 THEN 'thirty-one'
+ WHEN 32 THEN 'thirty-two'
+ WHEN 33 THEN 'thirty-three'
+ WHEN 34 THEN 'thirty-four'
+ WHEN 35 THEN 'thirty-five'
+ WHEN 36 THEN 'thirty-six'
+ WHEN 37 THEN 'thirty-seven'
+ WHEN 38 THEN 'thirty-eight'
+ WHEN 39 THEN 'thirty-nine'
+ WHEN 40 THEN 'forty'
+ WHEN 41 THEN 'forty-one'
+ WHEN 42 THEN 'forty-two'
+ WHEN 43 THEN 'forty-three'
+ WHEN 44 THEN 'forty-four'
+ WHEN 45 THEN 'forty-five'
+ WHEN 46 THEN 'forty-six'
+ WHEN 47 THEN 'forty-seven'
+ WHEN 48 THEN 'forty-eight'
+ WHEN 49 THEN 'forty-nine'
+ WHEN 50 THEN 'fifty'
+ WHEN 51 THEN 'fifty-one'
+ WHEN 52 THEN 'fifty-two'
+ WHEN 53 THEN 'fifty-three'
+ WHEN 54 THEN 'fifty-four'
+ WHEN 55 THEN 'fifty-five'
+ WHEN 56 THEN 'fifty-six'
+ WHEN 57 THEN 'fifty-seven'
+ WHEN 58 THEN 'fifty-eight'
+ WHEN 59 THEN 'fifty-nine'
+ WHEN 60 THEN 'sixty'
+ WHEN 61 THEN 'sixty-one'
+ WHEN 62 THEN 'sixty-two'
+ WHEN 63 THEN 'sixty-three'
+ WHEN 64 THEN 'sixty-four'
+ WHEN 65 THEN 'sixty-five'
+ WHEN 66 THEN 'sixty-six'
+ WHEN 67 THEN 'sixty-seven'
+ WHEN 68 THEN 'sixty-eight'
+ WHEN 69 THEN 'sixty-nine'
+ WHEN 70 THEN 'seventy'
+ WHEN 71 THEN 'seventy-one'
+ WHEN 72 THEN 'seventy-two'
+ WHEN 73 THEN 'seventy-three'
+ WHEN 74 THEN 'seventy-four'
+ WHEN 75 THEN 'seventy-five'
+ WHEN 76 THEN 'seventy-six'
+ WHEN 77 THEN 'seventy-seven'
+ WHEN 78 THEN 'seventy-eight'
+ WHEN 79 THEN 'seventy-nine'
+ WHEN 80 THEN 'eighty'
+ WHEN 81 THEN 'eighty-one'
+ WHEN 82 THEN 'eighty-two'
+ WHEN 83 THEN 'eighty-three'
+ WHEN 84 THEN 'eighty-four'
+ WHEN 85 THEN 'eighty-five'
+ WHEN 86 THEN 'eighty-six'
+ WHEN 87 THEN 'eighty-seven'
+ WHEN 88 THEN 'eighty-eight'
+ WHEN 89 THEN 'eighty-nine'
+ WHEN 90 THEN 'ninety'
+ WHEN 91 THEN 'ninety-one'
+ WHEN 92 THEN 'ninety-two'
+ WHEN 93 THEN 'ninety-three'
+ WHEN 94 THEN 'ninety-four'
+ WHEN 95 THEN 'ninety-five'
+ WHEN 96 THEN 'ninety-six'
+ WHEN 97 THEN 'ninety-seven'
+ WHEN 98 THEN 'ninety-eight'
+ WHEN 99 THEN 'ninety-nine'
+ WHEN 100 THEN 'one hundred'
+ ELSE
+ '114514'
+ END
+FROM t1
+ORDER BY c1 ASC;
+----
+one
+two
+three
+four
+five
+six
+seven
+eight
+nine
+ten
+eleven
+twelve
+thirteen
+fourteen
+fifteen
+sixteen
+seventeen
+eighteen
+nineteen
+twenty
+twenty-one
+twenty-two
+twenty-three
+twenty-four
+twenty-five
+twenty-six
+twenty-seven
+twenty-eight
+twenty-nine
+thirty
+thirty-one
+thirty-two
+thirty-three
+thirty-four
+thirty-five
+thirty-six
+thirty-seven
+thirty-eight
+thirty-nine
+forty
+forty-one
+forty-two
+forty-three
+forty-four
+forty-five
+forty-six
+forty-seven
+forty-eight
+forty-nine
+fifty
+fifty-one
+fifty-two
+fifty-three
+fifty-four
+fifty-five
+fifty-six
+fifty-seven
+fifty-eight
+fifty-nine
+sixty
+sixty-one
+sixty-two
+sixty-three
+sixty-four
+sixty-five
+sixty-six
+sixty-seven
+sixty-eight
+sixty-nine
+seventy
+seventy-one
+seventy-two
+seventy-three
+seventy-four
+seventy-five
+seventy-six
+seventy-seven
+seventy-eight
+seventy-nine
+eighty
+eighty-one
+eighty-two
+eighty-three
+eighty-four
+eighty-five
+eighty-six
+eighty-seven
+eighty-eight
+eighty-nine
+ninety
+ninety-one
+ninety-two
+ninety-three
+ninety-four
+ninety-five
+ninety-six
+ninety-seven
+ninety-eight
+ninety-nine
+one hundred
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/proto/expr.proto b/proto/expr.proto
index f62ee2936d115..7ab48a405d139 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -82,6 +82,9 @@ message ExprNode {
LTRIM = 210;
RTRIM = 211;
CASE = 212;
+  // Optimize a case-when expression into a constant lookup
+  // when it has many arms of a simple, constant form.
+ CONSTANT_LOOKUP = 624;
// ROUND(numeric, integer) -> numeric
ROUND_DIGIT = 213;
// ROUND(numeric) -> numeric
diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto
index 7c7769da6b7ff..aa9880917d725 100644
--- a/proto/monitor_service.proto
+++ b/proto/monitor_service.proto
@@ -48,10 +48,25 @@ message AnalyzeHeapResponse {
bytes result = 1;
}
+// Back pressure
+message GetBackPressureRequest {}
+
+message BackPressureInfo {
+ uint32 actor_id = 1;
+ uint32 fragment_id = 2;
+ uint32 downstream_fragment_id = 3;
+ double value = 4;
+}
+
+message GetBackPressureResponse {
+ repeated BackPressureInfo back_pressure_infos = 1;
+}
+
service MonitorService {
rpc StackTrace(StackTraceRequest) returns (StackTraceResponse);
rpc Profiling(ProfilingRequest) returns (ProfilingResponse);
rpc HeapProfiling(HeapProfilingRequest) returns (HeapProfilingResponse);
rpc ListHeapProfiling(ListHeapProfilingRequest) returns (ListHeapProfilingResponse);
rpc AnalyzeHeap(AnalyzeHeapRequest) returns (AnalyzeHeapResponse);
+ rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse);
}
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index a632445aef7da..462f5ff0256a6 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -103,20 +103,6 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}
-// Back pressure
-message GetBackPressureRequest {}
-
-message BackPressureInfo {
- uint32 actor_id = 1;
- uint32 fragment_id = 2;
- uint32 downstream_fragment_id = 3;
- double value = 4;
-}
-
-message GetBackPressureResponse {
- repeated BackPressureInfo back_pressure_infos = 1;
-}
-
service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
@@ -126,7 +112,6 @@ service StreamService {
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
- rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse);
}
// TODO: Lifecycle management for actors.
diff --git a/risedev.yml b/risedev.yml
index 7be1334deb4b4..93c7c0c1e90d0 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -570,7 +570,7 @@ profile:
steps:
- use: minio
api-requests-max: 30
- api-requests-deadline: 2s
+ api-requests-deadline: 3s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
@@ -809,6 +809,28 @@ profile:
- use: frontend
- use: compactor
+ ci-3cn-1fe-with-recovery:
+ config-path: src/config/ci-recovery.toml
+ steps:
+ - use: minio
+ - use: etcd
+ unsafe-no-fsync: true
+ - use: meta-node
+ - use: compute-node
+ port: 5687
+ exporter-port: 1222
+ enable-tiered-cache: true
+ - use: compute-node
+ port: 5688
+ exporter-port: 1223
+ enable-tiered-cache: true
+ - use: compute-node
+ port: 5689
+ exporter-port: 1224
+ enable-tiered-cache: true
+ - use: frontend
+ - use: compactor
+
ci-1cn-1fe-kafka-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml
index e5aa9e3c68d04..f5a08e6c4b688 100644
--- a/src/cmd_all/Cargo.toml
+++ b/src/cmd_all/Cargo.toml
@@ -35,7 +35,7 @@ risingwave_meta_node = { workspace = true }
risingwave_rt = { workspace = true }
shell-words = "1.1.0"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
tempfile = "3"
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs
index a51fb03120313..24b90ad613da4 100644
--- a/src/cmd_all/src/standalone.rs
+++ b/src/cmd_all/src/standalone.rs
@@ -300,7 +300,6 @@ mod test {
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
- mem_table_spill_threshold: 4194304,
parallelism: 10,
role: Both,
metrics_level: None,
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index 78de81f426bfe..30ecaa87a8d1a 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -93,7 +93,7 @@ smallbitset = "0.7.1"
speedate = "0.13.0"
static_assertions = "1"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
sysinfo = { version = "0.30", default-features = false }
thiserror = "1"
thiserror-ext = { workspace = true }
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index b8c40cdb9442e..7f7cb500d1f47 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -406,6 +406,12 @@ pub struct MetaDeveloperConfig {
/// in the meta node.
#[serde(default = "default::developer::meta_cached_traces_memory_limit_bytes")]
pub cached_traces_memory_limit_bytes: usize,
+
+ /// Compaction picker config
+ #[serde(default = "default::developer::enable_trivial_move")]
+ pub enable_trivial_move: bool,
+ #[serde(default = "default::developer::enable_check_task_level_overlap")]
+ pub enable_check_task_level_overlap: bool,
}
/// The section `[server]` in `risingwave.toml`.
@@ -958,7 +964,23 @@ impl SystemConfig {
};
}
- for_all_params!(fields)
+ let mut system_params = for_all_params!(fields);
+
+ // Initialize backup_storage_url and backup_storage_directory if not set.
+ if let Some(state_store) = &system_params.state_store
+ && let Some(data_directory) = &system_params.data_directory
+ && let Some(hummock_state_store) = state_store.strip_prefix("hummock+")
+ {
+ if system_params.backup_storage_url.is_none() {
+ system_params.backup_storage_url = Some(hummock_state_store.to_owned());
+ tracing::info!("initialize backup_storage_url based on state_store");
+ }
+ if system_params.backup_storage_directory.is_none() {
+ system_params.backup_storage_directory = Some(format!("{data_directory}/backup"));
+ tracing::info!("initialize backup_storage_directory based on data_directory");
+ }
+ }
+ system_params
}
}
@@ -1406,6 +1428,14 @@ pub mod default {
pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
64 << 20 // 64MB
}
+
+ pub fn enable_trivial_move() -> bool {
+ true
+ }
+
+ pub fn enable_check_task_level_overlap() -> bool {
+ false
+ }
}
pub use crate::system_param::default as system;
diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs
index a55b236f4b310..278390887dd51 100644
--- a/src/common/src/system_param/mod.rs
+++ b/src/common/src/system_param/mod.rs
@@ -82,8 +82,8 @@ macro_rules! for_all_params {
{ bloom_false_positive, f64, Some(0.001_f64), false, "False positive probability of bloom filter.", },
{ state_store, String, None, false, "", },
{ data_directory, String, None, false, "Remote directory for storing data and metadata objects.", },
- { backup_storage_url, String, Some("memory".to_string()), true, "Remote storage url for storing snapshots.", },
- { backup_storage_directory, String, Some("backup".to_string()), true, "Remote directory for storing snapshots.", },
+ { backup_storage_url, String, None, true, "Remote storage url for storing snapshots.", },
+ { backup_storage_directory, String, None, true, "Remote directory for storing snapshots.", },
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false, "", },
@@ -374,6 +374,8 @@ macro_rules! impl_system_params_for_test {
};
ret.data_directory = Some("hummock_001".to_string());
ret.state_store = Some("hummock+memory".to_string());
+ ret.backup_storage_url = Some("memory".into());
+ ret.backup_storage_directory = Some("backup".into());
ret
}
};
@@ -398,13 +400,17 @@ impl ValidateOnSet for OverrideValidateOnSet {
Self::expect_range(*v, 1..)
}
- fn backup_storage_directory(_v: &String) -> Result<()> {
- // TODO
+ fn backup_storage_directory(v: &String) -> Result<()> {
+ if v.trim().is_empty() {
+ return Err("backup_storage_directory cannot be empty".into());
+ }
Ok(())
}
- fn backup_storage_url(_v: &String) -> Result<()> {
- // TODO
+ fn backup_storage_url(v: &String) -> Result<()> {
+ if v.trim().is_empty() {
+ return Err("backup_storage_url cannot be empty".into());
+ }
Ok(())
}
}
diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs
index bc18858d678ec..af6b54b057c82 100644
--- a/src/common/src/types/datetime.rs
+++ b/src/common/src/types/datetime.rs
@@ -536,6 +536,12 @@ impl Timestamp {
self.0.timestamp_nanos_opt().unwrap()
}
+ pub fn with_millis(timestamp_millis: i64) -> Result {
+ let secs = timestamp_millis.div_euclid(1_000);
+ let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
+ Self::with_secs_nsecs(secs, nsecs as u32)
+ }
+
pub fn with_micros(timestamp_micros: i64) -> Result {
let secs = timestamp_micros.div_euclid(1_000_000);
let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index 342448066dd06..b3125b76052a6 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -92,10 +92,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
pub total_memory_bytes: usize,
- /// Spill threshold for mem table.
- #[clap(long, env = "RW_MEM_TABLE_SPILL_THRESHOLD", default_value_t = default_mem_table_spill_threshold())]
- pub mem_table_spill_threshold: usize,
-
/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
@@ -231,10 +227,6 @@ fn default_total_memory_bytes() -> usize {
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}
-fn default_mem_table_spill_threshold() -> usize {
- (4 << 20) as usize
-}
-
fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}
diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs
index 82c254e58d62d..b832b7827adda 100644
--- a/src/compute/src/rpc/service/monitor_service.rs
+++ b/src/compute/src/rpc/service/monitor_service.rs
@@ -19,15 +19,18 @@ use std::sync::Arc;
use std::time::Duration;
use itertools::Itertools;
-use risingwave_common::config::ServerConfig;
+use prometheus::core::Collector;
+use risingwave_common::config::{MetricLevel, ServerConfig};
use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
- AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
- ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
- StackTraceRequest, StackTraceResponse,
+ AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest,
+ GetBackPressureResponse, HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
+ ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest,
+ StackTraceResponse,
};
use risingwave_rpc_client::error::ToTonicStatus;
+use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};
@@ -229,6 +232,39 @@ impl MonitorService for MonitorServiceImpl {
let file = fs::read(Path::new(&collapsed_path_str))?;
Ok(Response::new(AnalyzeHeapResponse { result: file }))
}
+
+ #[cfg_attr(coverage, coverage(off))]
+ async fn get_back_pressure(
+ &self,
+ _request: Request,
+ ) -> Result, Status> {
+ let metric_family = global_streaming_metrics(MetricLevel::Info)
+ .actor_output_buffer_blocking_duration_ns
+ .collect();
+ let metrics = metric_family.get(0).unwrap().get_metric();
+ let mut back_pressure_infos: Vec = Vec::new();
+ for label_pairs in metrics {
+ let mut back_pressure_info = BackPressureInfo::default();
+ for label_pair in label_pairs.get_label() {
+ if label_pair.get_name() == "actor_id" {
+ back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap();
+ }
+ if label_pair.get_name() == "fragment_id" {
+ back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap();
+ }
+ if label_pair.get_name() == "downstream_fragment_id" {
+ back_pressure_info.downstream_fragment_id =
+ label_pair.get_value().parse::().unwrap();
+ }
+ }
+ back_pressure_info.value = label_pairs.get_counter().get_value();
+ back_pressure_infos.push(back_pressure_info);
+ }
+
+ Ok(Response::new(GetBackPressureResponse {
+ back_pressure_infos,
+ }))
+ }
}
pub use grpc_middleware::*;
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index 78572f8f09727..2640b505b7873 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -16,8 +16,6 @@ use std::sync::Arc;
use await_tree::InstrumentAwait;
use itertools::Itertools;
-use prometheus::core::Collector;
-use risingwave_common::config::MetricLevel;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
@@ -25,7 +23,6 @@ use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
-use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
@@ -241,37 +238,4 @@ impl StreamService for StreamServiceImpl {
Ok(Response::new(WaitEpochCommitResponse { status: None }))
}
-
- #[cfg_attr(coverage, coverage(off))]
- async fn get_back_pressure(
- &self,
- _request: Request,
- ) -> Result, Status> {
- let metric_family = global_streaming_metrics(MetricLevel::Info)
- .actor_output_buffer_blocking_duration_ns
- .collect();
- let metrics = metric_family.get(0).unwrap().get_metric();
- let mut back_pressure_infos: Vec = Vec::new();
- for label_pairs in metrics {
- let mut back_pressure_info = BackPressureInfo::default();
- for label_pair in label_pairs.get_label() {
- if label_pair.get_name() == "actor_id" {
- back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap();
- }
- if label_pair.get_name() == "fragment_id" {
- back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap();
- }
- if label_pair.get_name() == "downstream_fragment_id" {
- back_pressure_info.downstream_fragment_id =
- label_pair.get_value().parse::().unwrap();
- }
- }
- back_pressure_info.value = label_pairs.get_counter().get_value();
- back_pressure_infos.push(back_pressure_info);
- }
-
- Ok(Response::new(GetBackPressureResponse {
- back_pressure_infos,
- }))
- }
}
diff --git a/src/config/ci-meta-backup-test.toml b/src/config/ci-meta-backup-test.toml
index 9559390360e33..c15230262655c 100644
--- a/src/config/ci-meta-backup-test.toml
+++ b/src/config/ci-meta-backup-test.toml
@@ -4,7 +4,6 @@ collect_gc_watermark_spin_interval_sec = 1
vacuum_interval_sec = 10
[system]
-backup_storage_url = "minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001"
backup_storage_directory = "backup"
[server]
diff --git a/src/config/example.toml b/src/config/example.toml
index 914cfe63889d4..34937858db526 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -70,6 +70,8 @@ enable_emergency_picker = true
[meta.developer]
meta_cached_traces_num = 256
meta_cached_traces_memory_limit_bytes = 134217728
+meta_enable_trivial_move = true
+meta_enable_check_task_level_overlap = false
[batch]
enable_barrier_read = false
@@ -192,8 +194,6 @@ sstable_size_mb = 256
parallel_compact_size_mb = 512
block_size_kb = 64
bloom_false_positive = 0.001
-backup_storage_url = "memory"
-backup_storage_directory = "backup"
max_concurrent_creating_streaming_jobs = 1
pause_on_next_bootstrap = false
wasm_storage_url = "fs://.risingwave/data"
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 5231a55ea0583..e8582d1f3e294 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -92,6 +92,8 @@ pulsar = { version = "6.1", default-features = false, features = [
"tokio-runtime",
"telemetry",
"auth-oauth2",
+ "lz4",
+ "zstd",
] }
rdkafka = { workspace = true, features = [
"cmake-build",
@@ -117,7 +119,7 @@ serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.13.3"
strum = "0.25"
-strum_macros = "0.25"
+strum_macros = "0.26"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
index 7cc9cf14c1f84..a6c5c6fbef5d1 100644
--- a/src/connector/src/parser/avro/util.rs
+++ b/src/connector/src/parser/avro/util.rs
@@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result {
DataType::Decimal
}
Schema::Date => DataType::Date,
+ Schema::LocalTimestampMillis => DataType::Timestamp,
+ Schema::LocalTimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs
index 02cdcb4de4ca5..af5658331270d 100644
--- a/src/connector/src/parser/unified/avro.rs
+++ b/src/connector/src/parser/unified/avro.rs
@@ -23,10 +23,11 @@ use chrono::Datelike;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
-use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
-use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
+use risingwave_common::types::{
+ DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
+};
use risingwave_common::util::iter_util::ZipEqFast;
use super::{Access, AccessError, AccessResult};
@@ -181,19 +182,27 @@ impl<'a> AvroParseOptions<'a> {
}
(Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(),
// ---- Timestamp -----
- (Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => {
- i64_to_timestamp(*ms).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => {
+ Timestamp::with_millis(*ms)
+ .map_err(|_| create_error())?
+ .into()
}
- (Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => {
- i64_to_timestamp(*us).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => {
+ Timestamp::with_micros(*us)
+ .map_err(|_| create_error())?
+ .into()
}
// ---- TimestampTz -----
- (Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => {
- i64_to_timestamptz(*ms).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => {
+ Timestamptz::from_millis(*ms)
+ .ok_or(AccessError::Other(anyhow!(
+ "timestamptz with milliseconds {ms} * 1000 is out of range",
+ )))?
+ .into()
}
- (Some(DataType::Timestamptz), Value::TimestampMicros(us)) => {
- i64_to_timestamptz(*us).map_err(|_| create_error())?.into()
+ (Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => {
+ Timestamptz::from_micros(*us).into()
}
// ---- Interval -----
@@ -424,7 +433,7 @@ pub(crate) fn unix_epoch_days() -> i32 {
mod tests {
use apache_avro::Decimal as AvroDecimal;
use risingwave_common::error::{ErrorCode, RwError};
- use risingwave_common::types::{Decimal, Timestamp};
+ use risingwave_common::types::{Decimal, Timestamptz};
use super::*;
@@ -486,24 +495,24 @@ mod tests {
}
#[test]
- fn test_avro_timestamp_micros() {
- let v1 = Value::TimestampMicros(1620000000000);
- let v2 = Value::TimestampMillis(1620000000);
+ fn test_avro_timestamptz_micros() {
+ let v1 = Value::TimestampMicros(1620000000000000);
+ let v2 = Value::TimestampMillis(1620000000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
- let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
- let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
+ let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
+ let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
assert_eq!(
datum1,
- Some(ScalarImpl::Timestamp(Timestamp::new(
- "2021-05-03T00:00:00".parse().unwrap()
- )))
+ Some(ScalarImpl::Timestamptz(
+ Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
+ ))
);
assert_eq!(
datum2,
- Some(ScalarImpl::Timestamp(Timestamp::new(
- "2021-05-03T00:00:00".parse().unwrap()
- )))
+ Some(ScalarImpl::Timestamptz(
+ Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
+ ))
);
}
diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml
index 5632846c64851..db8e96a7df27d 100644
--- a/src/ctl/Cargo.toml
+++ b/src/ctl/Cargo.toml
@@ -21,8 +21,10 @@ clap = { version = "4", features = ["derive"] }
comfy-table = "7"
etcd-client = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
+hex = "0.4"
inquire = "0.6.2"
itertools = "0.12"
+prost = { workspace = true }
regex = "1.10.0"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs
index c09b93e948143..b2ae1c22f66cf 100644
--- a/src/ctl/src/cmd_impl/hummock/validate_version.rs
+++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs
@@ -12,9 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::cmp::Ordering;
+
+use chrono::offset::Utc;
+use chrono::DateTime;
+use itertools::Itertools;
+use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
+use risingwave_hummock_sdk::key::{FullKey, UserKey};
+use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
+use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
+use risingwave_object_store::object::ObjectStoreRef;
+use risingwave_pb::hummock::group_delta::DeltaType;
+use risingwave_pb::hummock::{HummockVersionArchive, SstableInfo};
use risingwave_rpc_client::HummockMetaClient;
+use risingwave_storage::hummock::value::HummockValue;
+use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
+use risingwave_storage::monitor::StoreLocalStatistic;
+use crate::common::HummockServiceOpts;
use crate::CtlContext;
pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
@@ -30,3 +46,177 @@ pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
Ok(())
}
+
+async fn get_archive(
+ archive_id: HummockVersionId,
+ data_dir: &str,
+ archive_object_store: ObjectStoreRef,
+) -> anyhow::Result {
+ use prost::Message;
+ let archive_dir = version_archive_dir(data_dir);
+ let archive_path = format!("{archive_dir}/{archive_id}");
+ let archive_bytes = archive_object_store.read(&archive_path, ..).await?;
+ let archive: HummockVersionArchive = HummockVersionArchive::decode(archive_bytes)?;
+ Ok(archive)
+}
+
+pub async fn print_user_key_in_archive(
+ context: &CtlContext,
+ archive_ids: Vec,
+ data_dir: String,
+ user_key: String,
+) -> anyhow::Result<()> {
+ let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
+ panic!("cannot decode user key {} into raw bytes", user_key);
+ });
+ let user_key = UserKey::decode(&user_key_bytes);
+ println!("user key: {user_key:?}");
+
+ let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
+ let hummock = context.hummock_store(hummock_opts).await?;
+ let sstable_store = hummock.sstable_store();
+ let archive_object_store = sstable_store.store();
+ for archive_id in archive_ids.into_iter().sorted() {
+ println!("search archive {archive_id}");
+ let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
+ let mut base_version =
+ HummockVersion::from_persisted_protobuf(archive.version.as_ref().unwrap());
+ print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
+ for delta in &archive.version_deltas {
+ base_version.apply_version_delta(&HummockVersionDelta::from_persisted_protobuf(delta));
+ print_user_key_in_version(sstable_store.clone(), &base_version, &user_key).await?;
+ }
+ }
+ Ok(())
+}
+
+async fn print_user_key_in_version(
+ sstable_store: SstableStoreRef,
+ version: &HummockVersion,
+ target_key: &UserKey<&[u8]>,
+) -> anyhow::Result<()> {
+ println!("print key {:?} in version {}", target_key, version.id);
+ for cg in version.levels.values() {
+ for level in cg
+ .l0
+ .as_ref()
+ .unwrap()
+ .sub_levels
+ .iter()
+ .rev()
+ .chain(cg.levels.iter())
+ {
+ for sstable_info in &level.table_infos {
+ use risingwave_hummock_sdk::key_range::KeyRange;
+ let key_range: KeyRange = sstable_info.key_range.as_ref().unwrap().into();
+ let left_user_key = FullKey::decode(&key_range.left);
+ let right_user_key = FullKey::decode(&key_range.right);
+ if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
+ continue;
+ }
+ print_user_key_in_sst(sstable_store.clone(), sstable_info, target_key).await?;
+ }
+ }
+ }
+ Ok(())
+}
+
+async fn print_user_key_in_sst(
+ sstable_store: SstableStoreRef,
+ sst: &SstableInfo,
+ user_key: &UserKey<&[u8]>,
+) -> anyhow::Result<()> {
+ // The implementation is mostly the same as `sst_dump`, with additional filter by `user_key`.
+ let mut dummy = StoreLocalStatistic::default();
+ let sst_metadata = sstable_store.sstable(sst, &mut dummy).await?;
+ dummy.ignore();
+ let data_path = sstable_store.get_sst_data_path(sst_metadata.id);
+ let mut is_first = true;
+ for block_meta in &sst_metadata.meta.block_metas {
+ let range =
+ block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
+ let block_data = sstable_store.store().read(&data_path, range).await?;
+ let block = Box::new(Block::decode(block_data, block_meta.uncompressed_size as _).unwrap());
+ let holder = BlockHolder::from_owned_block(block);
+ let mut block_iter = BlockIterator::new(holder);
+ block_iter.seek_to_first();
+ while block_iter.is_valid() {
+ let full_key = block_iter.key();
+ if full_key.user_key.cmp(user_key) != Ordering::Equal {
+ block_iter.next();
+ continue;
+ }
+ let full_val = block_iter.value();
+ let hummock_val = HummockValue::from_slice(full_val)?;
+ let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
+ let date_time = DateTime::::from(epoch.as_system_time());
+ if is_first {
+ is_first = false;
+ println!("\t\tSST id: {}, object id: {}", sst.sst_id, sst.object_id);
+ }
+ println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
+ println!(
+ "\t\t value: {:?}, len={}",
+ hummock_val,
+ hummock_val.encoded_len()
+ );
+ println!(
+ "\t\t epoch: {} offset = {} ({})",
+ epoch,
+ full_key.epoch_with_gap.offset(),
+ date_time
+ );
+ println!();
+ block_iter.next();
+ }
+ }
+ Ok(())
+}
+
+pub async fn print_version_delta_in_archive(
+ context: &CtlContext,
+ archive_ids: Vec,
+ data_dir: String,
+ sst_id: HummockSstableObjectId,
+) -> anyhow::Result<()> {
+ let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
+ let hummock = context.hummock_store(hummock_opts).await?;
+ let sstable_store = hummock.sstable_store();
+ let archive_object_store = sstable_store.store();
+ for archive_id in archive_ids.into_iter().sorted() {
+ println!("search archive {archive_id}");
+ let archive = get_archive(archive_id, &data_dir, archive_object_store.clone()).await?;
+ for delta in &archive.version_deltas {
+ let mut is_first = true;
+ for (cg_id, deltas) in &delta.group_deltas {
+ for d in &deltas.group_deltas {
+ let d = d.delta_type.as_ref().unwrap();
+ if match_delta(d, sst_id) {
+ if is_first {
+ is_first = false;
+ println!("delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}, safe_epoch {}", delta.id, delta.prev_id, delta.max_committed_epoch, delta.trivial_move, delta.safe_epoch);
+ }
+ println!("compaction group id {cg_id}");
+ print_delta(d);
+ }
+ }
+ }
+ }
+ }
+ Ok(())
+}
+
+fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
+ let DeltaType::IntraLevel(delta) = delta else {
+ return false;
+ };
+ delta
+ .inserted_table_infos
+ .iter()
+ .any(|sst| sst.sst_id == sst_id)
+ || delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
+}
+
+fn print_delta(delta: &DeltaType) {
+ println!("{:?}", delta);
+}
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 936a24a8d4bb6..715460deb27ac 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -266,11 +266,32 @@ enum HummockCommands {
ValidateVersion,
/// Rebuild table stats
RebuildTableStats,
-
CancelCompactTask {
#[clap(short, long)]
task_id: u64,
},
+ PrintUserKeyInArchive {
+ /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
+ #[clap(long, value_delimiter = ',')]
+ archive_ids: Vec,
+ /// The data directory of Hummock storage, where SSTable objects can be found.
+ #[clap(long)]
+ data_dir: String,
+ /// KVs that are matched with the user key are printed.
+ #[clap(long)]
+ user_key: String,
+ },
+ PrintVersionDeltaInArchive {
+ /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
+ #[clap(long, value_delimiter = ',')]
+ archive_ids: Vec,
+ /// The data directory of Hummock storage, where SSTable objects can be found.
+ #[clap(long)]
+ data_dir: String,
+ /// Version deltas that are related to the SST id are printed.
+ #[clap(long)]
+ sst_id: u64,
+ },
}
#[derive(Subcommand)]
@@ -667,6 +688,27 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
}
+ Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
+ archive_ids,
+ data_dir,
+ sst_id,
+ }) => {
+ cmd_impl::hummock::print_version_delta_in_archive(
+ context,
+ archive_ids,
+ data_dir,
+ sst_id,
+ )
+ .await?;
+ }
+ Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
+ archive_ids,
+ data_dir,
+ user_key,
+ }) => {
+ cmd_impl::hummock::print_user_key_in_archive(context, archive_ids, data_dir, user_key)
+ .await?;
+ }
Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
cmd_impl::table::scan(context, mv_name, data_dir).await?
}
diff --git a/src/expr/impl/src/scalar/array_positions.rs b/src/expr/impl/src/scalar/array_positions.rs
index 5218dbce6780f..cbae53c001439 100644
--- a/src/expr/impl/src/scalar/array_positions.rs
+++ b/src/expr/impl/src/scalar/array_positions.rs
@@ -66,10 +66,7 @@ use risingwave_expr::{function, ExprError, Result};
/// 2
/// ```
#[function("array_position(anyarray, any) -> int4")]
-fn array_position(
- array: Option>,
- element: Option>,
-) -> Result