diff --git a/Cargo.lock b/Cargo.lock
index 211143a6183a9..c58ea65709ca9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4189,7 +4189,7 @@ dependencies = [
"libc",
"log",
"rustversion",
- "windows",
+ "windows 0.48.0",
]
[[package]]
@@ -4678,7 +4678,7 @@ dependencies = [
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
- "windows",
+ "windows 0.48.0",
]
[[package]]
@@ -8715,6 +8715,7 @@ dependencies = [
"risingwave_connector",
"risingwave_expr",
"risingwave_expr_impl",
+ "risingwave_hummock_sdk",
"risingwave_pb",
"risingwave_rpc_client",
"risingwave_source",
@@ -8745,6 +8746,7 @@ dependencies = [
"hex",
"itertools 0.12.0",
"parse-display",
+ "prost 0.12.1",
"risingwave_common",
"risingwave_pb",
"tracing",
@@ -8946,6 +8948,7 @@ dependencies = [
name = "risingwave_meta_model_v2"
version = "1.5.0-alpha"
dependencies = [
+ "risingwave_hummock_sdk",
"risingwave_pb",
"sea-orm",
"serde",
@@ -8997,6 +9000,7 @@ dependencies = [
"regex",
"risingwave_common",
"risingwave_connector",
+ "risingwave_hummock_sdk",
"risingwave_meta",
"risingwave_meta_model_v2",
"risingwave_pb",
@@ -11020,16 +11024,16 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
-version = "0.29.10"
+version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a18d114d420ada3a891e6bc8e96a2023402203296a47cdd65083377dad18ba5"
+checksum = "c68492e7268037de59ae153d7efb79546cf94a18a9548235420d3d8d2436b4b1"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
- "winapi",
+ "windows 0.51.1",
]
[[package]]
@@ -12159,6 +12163,25 @@ dependencies = [
"windows-targets 0.48.5",
]
+[[package]]
+name = "windows"
+version = "0.51.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9"
+dependencies = [
+ "windows-core",
+ "windows-targets 0.48.5",
+]
+
+[[package]]
+name = "windows-core"
+version = "0.51.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
+dependencies = [
+ "windows-targets 0.48.5",
+]
+
[[package]]
name = "windows-sys"
version = "0.45.0"
@@ -12481,9 +12504,9 @@ checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd"
[[package]]
name = "xorf"
-version = "0.10.2"
+version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d36478bcf71152a2f9f6cf9bc48273333f32780c769ef90e13d464ab778db5f"
+checksum = "cf24c008fe464f5d8f58b8d16a1ab7e930bd73b2a6933ff8704c414b2bed7f92"
dependencies = [
"libm",
"rand",
diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 9ca065f856e89..13022a06d5ebe 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -35,6 +35,7 @@
'nats': ['json'],
'doris-sink': ['json'],
'starrocks-sink': ['json'],
+ 'deltalake-sink': ['json'],
}
def gen_pipeline_steps():
diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py
index 1e8e8580e9082..2f68733b26022 100755
--- a/ci/scripts/notify.py
+++ b/ci/scripts/notify.py
@@ -58,6 +58,7 @@
"vector-json": ["tao"],
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
+ "deltalake-sink": ["xinhao"],
}
def get_failed_tests(get_test_status, test_map):
diff --git a/e2e_test/batch/catalog/pg_attribute.slt.part b/e2e_test/batch/catalog/pg_attribute.slt.part
index 8bd43485c3ebe..7f96653af83a0 100644
--- a/e2e_test/batch/catalog/pg_attribute.slt.part
+++ b/e2e_test/batch/catalog/pg_attribute.slt.part
@@ -38,9 +38,7 @@ select i.relname, a.attname, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_attribute a on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
where t.relname = 'tmp' order by a.attnum;
----
-tmp_idx id2 {2,3,4,5}
-tmp_idx id3 {2,3,4,5}
-tmp_idx id4 {2,3,4,5}
+tmp_idx id2 {2}
statement ok
drop table tmp;
diff --git a/e2e_test/batch/catalog/pg_index.slt.part b/e2e_test/batch/catalog/pg_index.slt.part
index 3ebace06f207c..c42e74d60ab49 100644
--- a/e2e_test/batch/catalog/pg_index.slt.part
+++ b/e2e_test/batch/catalog/pg_index.slt.part
@@ -10,7 +10,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id2_idx';
----
-2 {2,3}
+1 {2}
statement ok
create index tmp_id2_idx_include_id1 on tmp(id2) include(id1);
@@ -21,7 +21,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id2_idx_include_id1';
----
-3 {2,3,4}
+1 {2}
statement ok
create index tmp_id1_id2_idx on tmp(id1, id2);
@@ -32,7 +32,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t
join pg_catalog.pg_class i on i.oid = ix.indexrelid
where t.relname = 'tmp' and i.relname = 'tmp_id1_id2_idx';
----
-3 {2,3,4}
+2 {1,2}
statement ok
drop table tmp;
diff --git a/e2e_test/over_window/generated/batch/main.slt.part b/e2e_test/over_window/generated/batch/main.slt.part
index 9ef787964fafe..2f9c16987df49 100644
--- a/e2e_test/over_window/generated/batch/main.slt.part
+++ b/e2e_test/over_window/generated/batch/main.slt.part
@@ -1,11 +1,27 @@
# This file is generated by `gen.py`. Do not edit it manually!
statement ok
-SET RW_IMPLICIT_FLUSH TO true;
-
-include ./basic/mod.slt.part
-include ./rank_func/mod.slt.part
-include ./expr_in_win_func/mod.slt.part
-include ./agg_in_win_func/mod.slt.part
-include ./opt_agg_then_join/mod.slt.part
-include ./with_filter/mod.slt.part
+set rw_implicit_flush = true;
+
+statement ok
+set rw_streaming_over_window_cache_policy = full;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_first_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_last_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = default;
diff --git a/e2e_test/over_window/generated/batch/run_all.slt.part b/e2e_test/over_window/generated/batch/run_all.slt.part
new file mode 100644
index 0000000000000..07a7f538a7ba4
--- /dev/null
+++ b/e2e_test/over_window/generated/batch/run_all.slt.part
@@ -0,0 +1,8 @@
+# This file is generated by `gen.py`. Do not edit it manually!
+
+include ./basic/mod.slt.part
+include ./rank_func/mod.slt.part
+include ./expr_in_win_func/mod.slt.part
+include ./agg_in_win_func/mod.slt.part
+include ./opt_agg_then_join/mod.slt.part
+include ./with_filter/mod.slt.part
diff --git a/e2e_test/over_window/generated/streaming/main.slt.part b/e2e_test/over_window/generated/streaming/main.slt.part
index 9ef787964fafe..2f9c16987df49 100644
--- a/e2e_test/over_window/generated/streaming/main.slt.part
+++ b/e2e_test/over_window/generated/streaming/main.slt.part
@@ -1,11 +1,27 @@
# This file is generated by `gen.py`. Do not edit it manually!
statement ok
-SET RW_IMPLICIT_FLUSH TO true;
-
-include ./basic/mod.slt.part
-include ./rank_func/mod.slt.part
-include ./expr_in_win_func/mod.slt.part
-include ./agg_in_win_func/mod.slt.part
-include ./opt_agg_then_join/mod.slt.part
-include ./with_filter/mod.slt.part
+set rw_implicit_flush = true;
+
+statement ok
+set rw_streaming_over_window_cache_policy = full;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_first_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_last_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = default;
diff --git a/e2e_test/over_window/generated/streaming/run_all.slt.part b/e2e_test/over_window/generated/streaming/run_all.slt.part
new file mode 100644
index 0000000000000..07a7f538a7ba4
--- /dev/null
+++ b/e2e_test/over_window/generated/streaming/run_all.slt.part
@@ -0,0 +1,8 @@
+# This file is generated by `gen.py`. Do not edit it manually!
+
+include ./basic/mod.slt.part
+include ./rank_func/mod.slt.part
+include ./expr_in_win_func/mod.slt.part
+include ./agg_in_win_func/mod.slt.part
+include ./opt_agg_then_join/mod.slt.part
+include ./with_filter/mod.slt.part
diff --git a/e2e_test/over_window/templates/main.slt.part b/e2e_test/over_window/templates/main.slt.part
index 5cd945b123ea1..26513fab2ff49 100644
--- a/e2e_test/over_window/templates/main.slt.part
+++ b/e2e_test/over_window/templates/main.slt.part
@@ -1,9 +1,25 @@
statement ok
-SET RW_IMPLICIT_FLUSH TO true;
-
-include ./basic/mod.slt.part
-include ./rank_func/mod.slt.part
-include ./expr_in_win_func/mod.slt.part
-include ./agg_in_win_func/mod.slt.part
-include ./opt_agg_then_join/mod.slt.part
-include ./with_filter/mod.slt.part
+set rw_implicit_flush = true;
+
+statement ok
+set rw_streaming_over_window_cache_policy = full;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_first_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = recent_last_n;
+
+include ./run_all.slt.part
+
+statement ok
+set rw_streaming_over_window_cache_policy = default;
diff --git a/e2e_test/over_window/templates/run_all.slt.part b/e2e_test/over_window/templates/run_all.slt.part
new file mode 100644
index 0000000000000..d939acb58fc08
--- /dev/null
+++ b/e2e_test/over_window/templates/run_all.slt.part
@@ -0,0 +1,6 @@
+include ./basic/mod.slt.part
+include ./rank_func/mod.slt.part
+include ./expr_in_win_func/mod.slt.part
+include ./agg_in_win_func/mod.slt.part
+include ./opt_agg_then_join/mod.slt.part
+include ./with_filter/mod.slt.part
diff --git a/integration_tests/deltalake-sink/README.md b/integration_tests/deltalake-sink/README.md
new file mode 100644
index 0000000000000..0dd34cab2ff70
--- /dev/null
+++ b/integration_tests/deltalake-sink/README.md
@@ -0,0 +1,25 @@
+# Demo: Sinking to Delta Lake
+
+In this demo, we will create an append-only source, and sink the data generated
+from the source into a downstream Delta Lake table stored on MinIO.
+
+1. Launch the cluster via docker compose
+```
+docker compose up -d
+```
+
+2. Create a Delta Lake table on MinIO
+```
+docker compose exec minio-0 mkdir /data/deltalake
+docker compose exec spark bash /spark-script/run-sql-file.sh create-table
+```
+
+3. Execute the SQL queries in sequence (see the example below):
+ - create_source.sql
+ - create_sink.sql
+
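+A sketch of how to run them, assuming the default docker-compose frontend
+address `localhost:4566`, database `dev`, and user `root`:
+```
+psql -h localhost -p 4566 -d dev -U root -f create_source.sql
+psql -h localhost -p 4566 -d dev -U root -f create_sink.sql
+```
+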
+4. Query the Delta Lake table. The following command lists all records in the table, ordered by id.
+```
+docker compose exec spark bash /spark-script/run-sql-file.sh query-table
+```
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/create_sink.sql b/integration_tests/deltalake-sink/create_sink.sql
new file mode 100644
index 0000000000000..c7dab6ef5dd9e
--- /dev/null
+++ b/integration_tests/deltalake-sink/create_sink.sql
@@ -0,0 +1,10 @@
+create sink delta_lake_sink from source
+with (
+ connector = 'deltalake',
+ type = 'append-only',
+ force_append_only='true',
+ location = 's3a://deltalake/delta',
+ s3.access.key = 'hummockadmin',
+ s3.secret.key = 'hummockadmin',
+ s3.endpoint = 'http://minio-0:9301'
+);
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/create_source.sql b/integration_tests/deltalake-sink/create_source.sql
new file mode 100644
index 0000000000000..d78b460512b8b
--- /dev/null
+++ b/integration_tests/deltalake-sink/create_source.sql
@@ -0,0 +1,3 @@
+CREATE table source (id int, name varchar);
+
+INSERT into source values (1, 'a'), (2, 'b'), (3, 'c');
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/docker-compose.yml b/integration_tests/deltalake-sink/docker-compose.yml
new file mode 100644
index 0000000000000..8e9a20533f25e
--- /dev/null
+++ b/integration_tests/deltalake-sink/docker-compose.yml
@@ -0,0 +1,43 @@
+---
+version: "3"
+services:
+ spark:
+ image: apache/spark:3.3.1
+ command: tail -f /dev/null
+ depends_on:
+ - minio-0
+ volumes:
+ - "./spark-script:/spark-script"
+ container_name: spark
+ risingwave-standalone:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: risingwave-standalone
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+volumes:
+ compute-node-0:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+name: risingwave-compose
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/prepare.sh b/integration_tests/deltalake-sink/prepare.sh
new file mode 100644
index 0000000000000..419edbb9b794f
--- /dev/null
+++ b/integration_tests/deltalake-sink/prepare.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+set -euo pipefail
+
+# build minio dir and create table
+docker compose exec minio-0 mkdir /data/deltalake
+docker compose exec spark bash /spark-script/run-sql-file.sh create-table
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/sink_check.py b/integration_tests/deltalake-sink/sink_check.py
new file mode 100644
index 0000000000000..b3796d2ca56ee
--- /dev/null
+++ b/integration_tests/deltalake-sink/sink_check.py
@@ -0,0 +1,30 @@
+import subprocess
+from time import sleep
+
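+# Give the sink some time to flush the inserted rows to the Delta Lake table.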
+sleep(60)
+
+query_sql = open("spark-script/query-table.sql").read()
+
+print("querying deltalake with sql: %s" % query_sql)
+
+query_output_file_name = "query_output.txt"
+
+query_output_file = open(query_output_file_name, "wb")
+
+subprocess.run(
+ ["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "query-table"],
+ check=True, stdout=query_output_file)
+query_output_file.close()
+
+with open(query_output_file_name, 'r') as file:
+ all_lines = file.readlines()
+
+last_three_lines = all_lines[-3:]
+
+print("result", last_three_lines)
+
+line1, line2, line3 = last_three_lines
+
+assert line1.strip() == '1\ta'
+assert line2.strip() == '2\tb'
+assert line3.strip() == '3\tc'
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/.gitignore b/integration_tests/deltalake-sink/spark-script/.gitignore
new file mode 100644
index 0000000000000..2af1a65665298
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/.gitignore
@@ -0,0 +1,3 @@
+derby.log
+metastore_db
+.ivy2
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/create-table.sql b/integration_tests/deltalake-sink/spark-script/create-table.sql
new file mode 100644
index 0000000000000..edd31d08f3ebc
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/create-table.sql
@@ -0,0 +1 @@
+create table delta.`s3a://deltalake/delta`(id int, name string) using delta
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/query-table.sql b/integration_tests/deltalake-sink/spark-script/query-table.sql
new file mode 100644
index 0000000000000..bdd1dea729836
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/query-table.sql
@@ -0,0 +1 @@
+SELECT * from delta.`s3a://deltalake/delta` order by id;
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/run-sql-file.sh b/integration_tests/deltalake-sink/spark-script/run-sql-file.sh
new file mode 100644
index 0000000000000..58132bcfafa0a
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/run-sql-file.sh
@@ -0,0 +1,11 @@
+set -ex
+
+/opt/spark/bin/spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2\
+ --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
+ --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
+ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
+ --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \
+ --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
+ --conf 'spark.hadoop.fs.s3a.endpoint=http://minio-0:9301' \
+ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
+ -f /spark-script/$1.sql
\ No newline at end of file
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
index 5fc94f2d87c24..6ef62ac3a9661 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
@@ -41,7 +41,7 @@ public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbS
}
private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
- int maxPollCount = 10;
+ int maxPollCount = 11; // poll for at most 10 seconds
while (!isStreamingRunning(connector, dbServerName, "streaming")) {
maxPollCount--;
if (maxPollCount == 0) {
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
index 4f4a13f3027a8..65f961dbf523a 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
@@ -14,11 +14,12 @@
package com.risingwave.connector.source.core;
+import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX;
+
import com.risingwave.connector.api.source.CdcEngine;
import com.risingwave.proto.ConnectorServiceProto;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
-import io.debezium.heartbeat.Heartbeat;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -35,7 +36,7 @@ public DbzCdcEngine(
long sourceId,
Properties config,
DebeziumEngine.CompletionCallback completionCallback) {
- var dbzHeartbeatPrefix = config.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name());
+ var dbzHeartbeatPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
var consumer =
new DbzCdcEventConsumer(
sourceId,
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
index caf8ed4b87285..d4728c333b9f0 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
@@ -18,6 +18,7 @@
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.DbzSourceUtils;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
+import io.debezium.config.CommonConnectorConfig;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -113,7 +114,8 @@ public boolean start() throws InterruptedException {
// For backfill source, we need to wait for the streaming source to start before proceeding
if (config.isBackfillSource()) {
var databaseServerName =
- config.getResolvedDebeziumProps().getProperty("database.server.name");
+ config.getResolvedDebeziumProps()
+ .getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
startOk =
DbzSourceUtils.waitForStreamingRunning(
config.getSourceType(), databaseServerName);
diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
index d98e5b710ecb2..3337aebf92c6c 100644
--- a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
+++ b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties
@@ -1,5 +1,8 @@
# Store common debezium configs shared by all connectors
-database.server.name=RW_CDC_${source.id}
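+# Debezium 2.x renamed database.server.name to topic.prefix and
+# heartbeat.topics.prefix to topic.heartbeat.prefix.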
+topic.prefix=RW_CDC_${source.id}
+topic.heartbeat.prefix=${debezium.topic.heartbeat.prefix:-RW_CDC_HeartBeat_}
+schema.history.internal=io.debezium.relational.history.MemorySchemaHistory
+offset.storage=com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore
converters=datetime
datetime.type=com.risingwave.connector.cdc.debezium.converters.DatetimeTypeConverter
# use string to preserve the precision of decimal, since currently we cannot
@@ -8,5 +11,4 @@ decimal.handling.mode=${debezium.decimal.handling.mode:-string}
interval.handling.mode=string
max.batch.size=${debezium.max.batch.size:-1024}
max.queue.size=${debezium.max.queue.size:-8192}
-
time.precision.mode=adaptive_time_microseconds
diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
index 04ee9a2dad821..9c8f4f6e0d306 100644
--- a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
+++ b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
@@ -1,7 +1,5 @@
# configs for mysql connector
connector.class=io.debezium.connector.mysql.MySqlConnector
-offset.storage=com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore
-database.history=io.debezium.relational.history.MemoryDatabaseHistory
# default snapshot mode to initial
snapshot.mode=${debezium.snapshot.mode:-initial}
database.hostname=${hostname}
@@ -17,7 +15,6 @@ database.server.id=${server.id}
database.connectionTimeZone=+00:00
# default heartbeat interval 60 seconds
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
-heartbeat.topics.prefix=${debezium.heartbeat.topics.prefix:-RW_CDC_HeartBeat_}
# In sharing cdc mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
name=${hostname}:${port}:${database.name}.${table.name:-RW_CDC_Sharing}
diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties
index 382a051a293a7..bb32c43a91c74 100644
--- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties
+++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties
@@ -1,7 +1,5 @@
# configs for postgres connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
-offset.storage=com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore
-database.history=io.debezium.relational.history.MemoryDatabaseHistory
# default snapshot mode to initial
snapshot.mode=${debezium.snapshot.mode:-initial}
database.hostname=${hostname}
@@ -17,10 +15,9 @@ plugin.name=${debezium.plugin.name:-pgoutput}
# allow to auto create publication for given tables
publication.autocreate.mode=${debezium.publication.autocreate.mode:-filtered}
publication.name=${publication.name:-rw_publication}
-# default heartbeat interval 5 mins
-heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
-heartbeat.topics.prefix=${debezium.heartbeat.topics.prefix:-RW_CDC_HeartBeat_}
-# In sharing cdc mode, we will subscribe to multiple tables in the given database,
+# default heartbeat interval 60 seconds
+heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
+# In sharing cdc source mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing}
provide.transaction.metadata=${transactional:-false}
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index bd9195e099f41..14b1c7bd65fc0 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -1,7 +1,7 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
risingwave-java-root
com.risingwave
@@ -14,7 +14,6 @@
risingwave-connector-test
- 1.17.6
1.0.0
@@ -111,20 +110,14 @@
org.testcontainers
testcontainers
- ${testcontainers.version}
- test
org.testcontainers
mysql
- ${testcontainers.version}
- test
org.testcontainers
postgresql
- ${testcontainers.version}
- test
org.testcontainers
@@ -145,11 +138,6 @@
test
-
- com.risingwave
- risingwave-source-cdc
- test
-
com.risingwave
risingwave-connector-service
diff --git a/java/connector-node/risingwave-connector-test/src/test/resources/log4j2.properties b/java/connector-node/risingwave-connector-test/src/test/resources/log4j2.properties
index 12c71029bcab6..a380d72fc3c2a 100644
--- a/java/connector-node/risingwave-connector-test/src/test/resources/log4j2.properties
+++ b/java/connector-node/risingwave-connector-test/src/test/resources/log4j2.properties
@@ -1,12 +1,10 @@
-rootLogger.level = ERROR
+rootLogger.level=ERROR
# declare the appender to use
-appenders = console
-
+appenders=console
# appender properties
-appender.console.type = Console
-appender.console.name = stdout
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %c{2}:%L - %m%n
-
-rootLogger.appenderRefs = console
-rootLogger.appenderRef.console.ref = stdout
+appender.console.type=Console
+appender.console.name=stdout
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %c{2}:%L - %m%n
+rootLogger.appenderRefs=console
+rootLogger.appenderRef.console.ref=stdout
diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java
index 0ceb4ba16e11e..cb7c71ec08546 100644
--- a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java
+++ b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/ConfigurableOffsetBackingStore.java
@@ -18,18 +18,22 @@
package com.risingwave.connector.cdc.debezium.internal;
+import io.debezium.config.Instantiator;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.*;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
@@ -81,12 +85,13 @@ public void configure(WorkerConfig config) {
}
String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
- Converter keyConverter = new JsonConverter();
+ Map<String, String> converterConfig =
+ Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+
+ Converter keyConverter = Instantiator.getInstance(JsonConverter.class.getName());
+ keyConverter.configure(converterConfig, true);
Converter valueConverter = new JsonConverter();
- keyConverter.configure(config.originals(), true);
- Map valueConfigs = new HashMap<>(conf);
- valueConfigs.put("schemas.enable", false);
- valueConverter.configure(valueConfigs, true);
+ valueConverter.configure(converterConfig, true);
OffsetStorageWriter offsetWriter =
new OffsetStorageWriter(
this,
@@ -195,4 +200,9 @@ public Future<Void> set(
return null;
});
}
+
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return null;
+    }
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java
index 25527a6c7e695..a3a1e752cd3ee 100644
--- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java
+++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java
@@ -17,6 +17,9 @@
package com.risingwave.tracing;
+// Import log4j's ParameterizedMessage, so that we can format messages with
+// SLF4J-style "{}" placeholders instead of String.format-style "%s".
+import org.apache.logging.log4j.message.ParameterizedMessage;
import org.slf4j.Logger;
import org.slf4j.Marker;
@@ -45,17 +48,27 @@ public void trace(String msg) {
@Override
public void trace(String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void trace(String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void trace(String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -76,17 +89,26 @@ public void trace(Marker marker, String msg) {
@Override
public void trace(Marker marker, String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void trace(Marker marker, String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
- public void trace(Marker marker, String format, Object... argArray) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.TRACE, String.format(format, argArray));
+ public void trace(Marker marker, String format, Object... arguments) {
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.TRACE,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -107,17 +129,26 @@ public void debug(String msg) {
@Override
public void debug(String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void debug(String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void debug(String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -138,17 +169,26 @@ public void debug(Marker marker, String msg) {
@Override
public void debug(Marker marker, String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void debug(Marker marker, String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void debug(Marker marker, String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.DEBUG, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.DEBUG,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -169,17 +209,26 @@ public void info(String msg) {
@Override
public void info(String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void info(String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void info(String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -200,17 +249,26 @@ public void info(Marker marker, String msg) {
@Override
public void info(Marker marker, String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void info(Marker marker, String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void info(Marker marker, String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.INFO, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.INFO,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -231,17 +289,26 @@ public void warn(String msg) {
@Override
public void warn(String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void warn(String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
public void warn(String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
@@ -262,17 +329,26 @@ public void warn(Marker marker, String msg) {
@Override
public void warn(Marker marker, String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void warn(Marker marker, String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void warn(Marker marker, String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.WARN, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.WARN,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -293,17 +369,26 @@ public void error(String msg) {
@Override
public void error(String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void error(String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void error(String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
@@ -324,17 +409,26 @@ public void error(Marker marker, String msg) {
@Override
public void error(Marker marker, String format, Object arg) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arg));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arg).getFormattedMessage());
}
@Override
public void error(Marker marker, String format, Object arg1, Object arg2) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arg1, arg2));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arg1, arg2).getFormattedMessage());
}
@Override
public void error(Marker marker, String format, Object... arguments) {
- TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, String.format(format, arguments));
+ TracingSlf4jImpl.event(
+ name,
+ TracingSlf4jImpl.ERROR,
+ new ParameterizedMessage(format, arguments).getFormattedMessage());
}
@Override
diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java
index 0f87988ab57dc..9354a6043a56a 100644
--- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java
+++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jImpl.java
@@ -27,6 +27,6 @@ public class TracingSlf4jImpl {
public static final int TRACE = 4;
public static void event(String name, int level, String message) {
- Binding.tracingSlf4jEvent(name, level, message);
+ Binding.tracingSlf4jEvent(Thread.currentThread().getName(), name, level, message);
}
}
diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java
index 3001a180a15de..ff490982ccbd0 100644
--- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java
+++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java
@@ -26,7 +26,8 @@ public class Binding {
}
}
- public static native void tracingSlf4jEvent(String name, int level, String message);
+ public static native void tracingSlf4jEvent(
+ String threadName, String name, int level, String message);
public static native int vnodeCount();
diff --git a/java/pom.xml b/java/pom.xml
index fca52806e878e..7e7e554abc0d4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,7 +1,7 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.risingwave
@@ -52,6 +52,7 @@
        <module>connector-node/risingwave-source-cdc</module>
        <module>connector-node/risingwave-connector-test</module>
        <module>connector-node/risingwave-connector-service</module>
+        <module>connector-node/risingwave-source-test</module>
        <module>connector-node/assembly</module>
        <module>connector-node/s3-common</module>
        <module>connector-node/risingwave-sink-mock-flink</module>
@@ -74,13 +75,14 @@
1.5.0
2.11.0
1.10.0
-        <debezium.version>1.9.7.Final</debezium.version>
+        <debezium.version>2.4.2.Final</debezium.version>
2.13.5
3.3.1
3.3.3
7.17.14
4.15.0
1.18.0
+        <testcontainers.version>1.17.6</testcontainers.version>
@@ -312,6 +314,24 @@
            <version>${spark_sql.version}</version>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
@@ -359,7 +379,7 @@
-
+
diff --git a/proto/catalog.proto b/proto/catalog.proto
index 33d56224976ea..4ebf9d0af6340 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -186,11 +186,14 @@ message Index {
// Only `InputRef` type index is supported Now.
// The index of `InputRef` is the column index of the primary table.
repeated expr.ExprNode index_item = 8;
- repeated int32 original_columns = 9;
+ reserved 9; // Deprecated repeated int32 original_columns = 9;
optional uint64 initialized_at_epoch = 10;
optional uint64 created_at_epoch = 11;
StreamJobStatus stream_job_status = 12;
+
+ // Records the prefix length of `index_item`, which is used to reconstruct the index columns originally specified by the user.
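+ // For example, for an index created with `(id2) INCLUDE (id1)`, `index_item`
+ // contains both `id2` and `id1`, while `index_columns_len` is 1.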
+ uint32 index_columns_len = 13;
}
message Function {
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index b2f20ea6412a0..e24c9ee50b288 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -70,7 +70,7 @@ impl BatchManager {
builder.worker_threads(worker_threads_num);
}
builder
- .thread_name("risingwave-batch-tasks")
+ .thread_name("rw-batch")
.enable_all()
.build()
.unwrap()
diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs
index b7693c6fa06a2..27d24096487bf 100644
--- a/src/cmd_all/src/bin/risingwave.rs
+++ b/src/cmd_all/src/bin/risingwave.rs
@@ -193,14 +193,16 @@ fn main() -> Result<()> {
fn playground(opts: PlaygroundOpts) {
let settings = risingwave_rt::LoggerSettings::new("playground")
- .with_target("risingwave_storage", Level::WARN);
+ .with_target("risingwave_storage", Level::WARN)
+ .with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap();
}
fn standalone(opts: StandaloneOpts) {
let settings = risingwave_rt::LoggerSettings::new("standalone")
- .with_target("risingwave_storage", Level::WARN);
+ .with_target("risingwave_storage", Level::WARN)
+ .with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index f47d32862cc42..02f0a9bdda539 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -93,7 +93,7 @@ speedate = "0.13.0"
static_assertions = "1"
strum = "0.25"
strum_macros = "0.25"
-sysinfo = { version = "0.29", default-features = false }
+sysinfo = { version = "0.30", default-features = false }
thiserror = "1"
thiserror-ext = { workspace = true }
tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] }
diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs
index 42d6ef1eaafcf..092baa465c7f2 100644
--- a/src/common/src/telemetry/mod.rs
+++ b/src/common/src/telemetry/mod.rs
@@ -18,7 +18,7 @@ pub mod report;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
-use sysinfo::{System, SystemExt};
+use sysinfo::System;
use thiserror_ext::AsReport;
use crate::util::env_var::env_var_is_true_or;
@@ -95,21 +95,16 @@ struct Cpu {
impl SystemData {
pub fn new() -> Self {
- let mut sys = System::new();
-
let memory = {
let total = system_memory_available_bytes();
let used = total_memory_used_bytes();
Memory { used, total }
};
- let os = {
- sys.refresh_system();
- Os {
- name: sys.name().unwrap_or_default(),
- kernel_version: sys.kernel_version().unwrap_or_default(),
- version: sys.os_version().unwrap_or_default(),
- }
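+ // sysinfo 0.30 exposes `name`, `kernel_version` and `os_version` as
+ // associated functions on `System`, so no instance or refresh is needed.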
+ let os = Os {
+ name: System::name().unwrap_or_default(),
+ kernel_version: System::kernel_version().unwrap_or_default(),
+ version: System::os_version().unwrap_or_default(),
};
let cpu = Cpu {
diff --git a/src/common/src/util/resource_util.rs b/src/common/src/util/resource_util.rs
index 3a3825a14e95e..d97182e062666 100644
--- a/src/common/src/util/resource_util.rs
+++ b/src/common/src/util/resource_util.rs
@@ -132,7 +132,7 @@ mod runtime {
}
pub mod memory {
- use sysinfo::{System, SystemExt};
+ use sysinfo::System;
use super::runtime::get_resource;
diff --git a/src/connector/src/source/data_gen_util.rs b/src/connector/src/source/data_gen_util.rs
index 7d990cf0ff8c9..001a726f93018 100644
--- a/src/connector/src/source/data_gen_util.rs
+++ b/src/connector/src/source/data_gen_util.rs
@@ -31,7 +31,7 @@ pub fn spawn_data_generation_stream(
) -> impl Stream<Item = T> + Send + 'static {
    static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
- .thread_name("risingwave-data-generation")
+ .thread_name("rw-datagen")
.enable_all()
.build()
.expect("failed to build data-generation runtime")
diff --git a/src/connector/src/source/filesystem/nd_streaming.rs b/src/connector/src/source/filesystem/nd_streaming.rs
index 428514d2d6adf..bce96fc0976d7 100644
--- a/src/connector/src/source/filesystem/nd_streaming.rs
+++ b/src/connector/src/source/filesystem/nd_streaming.rs
@@ -18,8 +18,14 @@ use futures::io::Cursor;
use futures::AsyncBufReadExt;
use futures_async_stream::try_stream;
+use crate::parser::EncodingProperties;
use crate::source::{BoxSourceStream, SourceMessage};
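+
+/// Whether the payload of the given encoding is newline-delimited (JSON or
+/// CSV), and thus must be split by `split_stream` below.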
+pub fn need_nd_streaming(encode_config: &EncodingProperties) -> bool {
+    matches!(
+        encode_config,
+        EncodingProperties::Json(_) | EncodingProperties::Csv(_)
+    )
+}
+
#[try_stream(boxed, ok = Vec, error = anyhow::Error)]
/// This function splits a byte stream by the newline separator "(\r)\n" into a message stream.
/// It can be difficult to split and compute offsets correctly when the bytes are received in
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index d2758ba9bb0ef..7efb13f409478 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -24,6 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
+use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
@@ -81,10 +82,7 @@ impl OpendalReader {
let parser =
ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?;
- let msg_stream = if matches!(
- parser,
- ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
- ) {
+ let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
parser.into_stream(nd_streaming::split_stream(data_stream))
} else {
parser.into_stream(data_stream)
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index 4d51dbc4d2b44..6cf4387ae3e92 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -36,11 +36,13 @@ use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
use crate::source::base::{SplitMetaData, SplitReader};
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::nd_streaming;
+use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::s3::S3Properties;
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta,
StreamChunkWithState,
};
+
const MAX_CHANNEL_BUFFER_SIZE: usize = 2048;
const STREAM_READER_CAPACITY: usize = 4096;
@@ -221,10 +223,7 @@ impl S3FileReader {
let parser =
ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?;
- let msg_stream = if matches!(
- parser,
- ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
- ) {
+ let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) {
parser.into_stream(nd_streaming::split_stream(data_stream))
} else {
parser.into_stream(data_stream)
diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs
index 474eeaf6f0619..0c63a96dfe683 100644
--- a/src/connector/src/source/nexmark/mod.rs
+++ b/src/connector/src/source/nexmark/mod.rs
@@ -43,11 +43,9 @@ const fn none<T>() -> Option<T> {
None
}
-pub type NexmarkProperties = Box<NexmarkPropertiesInner>;
-
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
-pub struct NexmarkPropertiesInner {
+pub struct NexmarkProperties {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
pub split_num: i32,
@@ -233,15 +231,15 @@ fn default_event_num() -> u64 {
u64::MAX
}
-impl Default for NexmarkPropertiesInner {
+impl Default for NexmarkProperties {
fn default() -> Self {
    let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
- NexmarkPropertiesInner::deserialize(v).unwrap()
+ NexmarkProperties::deserialize(v).unwrap()
}
}
-impl From<&NexmarkPropertiesInner> for NexmarkConfig {
- fn from(value: &NexmarkPropertiesInner) -> Self {
+impl From<&NexmarkProperties> for NexmarkConfig {
+ fn from(value: &NexmarkProperties) -> Self {
// 2015-07-15 00:00:00
pub const BASE_TIME: u64 = 1_436_918_400_000;
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs
index 28dc76eadeb48..c8e7b0c41d7ea 100644
--- a/src/connector/src/source/nexmark/source/reader.rs
+++ b/src/connector/src/source/nexmark/source/reader.rs
@@ -77,7 +77,7 @@ impl SplitReader for NexmarkSplitReader {
let offset = split.start_offset.unwrap_or(split_index);
let assigned_split = split;
- let mut generator = EventGenerator::new(NexmarkConfig::from(&*properties))
+ let mut generator = EventGenerator::new(NexmarkConfig::from(&properties))
.with_offset(offset)
.with_step(split_num);
// If the user doesn't specify the event type in the source definition, then the user
@@ -188,18 +188,18 @@ mod tests {
use anyhow::Result;
use super::*;
- use crate::source::nexmark::{NexmarkPropertiesInner, NexmarkSplitEnumerator};
+ use crate::source::nexmark::{NexmarkProperties, NexmarkSplitEnumerator};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
#[tokio::test]
async fn test_nexmark_split_reader() -> Result<()> {
- let props = Box::new(NexmarkPropertiesInner {
+ let props = NexmarkProperties {
split_num: 2,
min_event_gap_in_ns: 0,
table_type: Some(EventType::Bid),
max_chunk_size: 5,
..Default::default()
- });
+ };
let mut enumerator =
NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index 25e5ade6567d9..586ff624611ae 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -225,7 +225,7 @@ NatsProperties:
- name: stream
field_type: String
required: true
-NexmarkPropertiesInner:
+NexmarkProperties:
fields:
- name: nexmark.split.num
field_type: i32
diff --git a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
index 7e56078e77ed2..02a85a94953c0 100644
--- a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
+++ b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs
@@ -25,6 +25,6 @@ pub async fn list_version_deltas(
let resp = meta_client
.list_version_deltas(start_id, num_epochs, HummockEpoch::MAX)
.await?;
- println!("{:#?}", resp.version_deltas);
+ println!("{:#?}", resp);
Ok(())
}
diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs
index d599ce2327861..bc63144547c9a 100644
--- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs
+++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
+use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockEpoch;
use crate::CtlContext;
@@ -51,11 +51,13 @@ pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<(
/// added/removed for what reason (row deletion/compaction/etc.).
pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
- let mut base_version = meta_client
- .risectl_get_checkpoint_hummock_version()
- .await?
- .checkpoint_version
- .unwrap();
+ let mut base_version = HummockVersion::from_rpc_protobuf(
+ &meta_client
+ .risectl_get_checkpoint_hummock_version()
+ .await?
+ .checkpoint_version
+ .unwrap(),
+ );
println!("replay starts");
println!("base version {}", base_version.id);
let delta_fetch_size = 100;
@@ -65,10 +67,10 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
.list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
.await
.unwrap();
- if deltas.version_deltas.is_empty() {
+ if deltas.is_empty() {
break;
}
- for delta in deltas.version_deltas {
+ for delta in deltas {
if delta.prev_id != base_version.id {
eprintln!("missing delta log for version {}", base_version.id);
break;
diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
index 7ed529ec02834..3843dd65f4ad5 100644
--- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs
+++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -28,7 +28,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
-use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
@@ -75,7 +74,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
let hummock = context
.hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
.await?;
- let version = hummock.inner().get_pinned_version().version();
+ let version = hummock.inner().get_pinned_version().version().clone();
let sstable_store = hummock.sstable_store();
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
diff --git a/src/expr/core/src/sig/cast.rs b/src/expr/core/src/sig/cast.rs
deleted file mode 100644
index 73841a85e303e..0000000000000
--- a/src/expr/core/src/sig/cast.rs
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2023 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::BTreeMap;
-use std::sync::LazyLock;
-
-use parse_display::Display;
-use risingwave_common::types::DataTypeName;
-
-#[derive(Clone, Debug)]
-pub struct CastSig {
- pub from_type: DataTypeName,
- pub to_type: DataTypeName,
- pub context: CastContext,
-}
-
-/// The context a cast operation is invoked in. An implicit cast operation is allowed in a context
-/// that allows explicit casts, but not vice versa. See details in
-/// [PG](https://www.postgresql.org/docs/current/catalog-pg-cast.html).
-#[derive(Clone, Copy, Debug, Display, Eq, Ord, PartialEq, PartialOrd)]
-pub enum CastContext {
- #[display("i")]
- Implicit,
- #[display("a")]
- Assign,
- #[display("e")]
- Explicit,
-}
-
-pub type CastMap = BTreeMap<(DataTypeName, DataTypeName), CastContext>;
-
-pub fn cast_sigs() -> impl Iterator<Item = CastSig> {
- CAST_MAP
- .iter()
- .map(|((from_type, to_type), context)| CastSig {
- from_type: *from_type,
- to_type: *to_type,
- context: *context,
- })
-}
-
-pub static CAST_MAP: LazyLock<CastMap> = LazyLock::new(|| {
- use DataTypeName as T;
-
- // Implicit cast operations in PG are organized in 3 sequences, with the reverse direction being
- // assign cast operations.
- // https://github.com/postgres/postgres/blob/e0064f0ff6dfada2695330c6bc1945fa7ae813be/src/include/catalog/pg_cast.dat#L18-L20
- let mut m = BTreeMap::new();
- insert_cast_seq(
- &mut m,
- &[
- T::Int16,
- T::Int32,
- T::Int64,
- T::Decimal,
- T::Float32,
- T::Float64,
- ],
- );
- insert_cast_seq(&mut m, &[T::Date, T::Timestamp, T::Timestamptz]);
- insert_cast_seq(&mut m, &[T::Time, T::Interval]);
-
- // Casting to and from string type.
- for t in [
- T::Boolean,
- T::Int16,
- T::Int32,
- T::Int64,
- T::Int256,
- T::Decimal,
- T::Float32,
- T::Float64,
- T::Date,
- T::Timestamp,
- T::Timestamptz,
- T::Time,
- T::Interval,
- T::Jsonb,
- T::Bytea,
- ] {
- m.insert((t, T::Varchar), CastContext::Assign);
- m.insert((T::Varchar, t), CastContext::Explicit);
- }
-
- // Casting between `decimal`, `int256`, and `float` is not allowed.
- m.insert((T::Int16, T::Int256), CastContext::Implicit);
- m.insert((T::Int32, T::Int256), CastContext::Implicit);
- m.insert((T::Int64, T::Int256), CastContext::Implicit);
-
- m.insert((T::Int256, T::Float64), CastContext::Explicit);
-
- // Misc casts allowed by PG that are neither in implicit cast sequences nor from/to string.
- m.insert((T::Timestamp, T::Time), CastContext::Assign);
- m.insert((T::Timestamptz, T::Time), CastContext::Assign);
- m.insert((T::Boolean, T::Int32), CastContext::Explicit);
- m.insert((T::Int32, T::Boolean), CastContext::Explicit);
-
- // Casting from jsonb to bool / number.
- for t in [
- T::Boolean,
- T::Int16,
- T::Int32,
- T::Int64,
- T::Decimal,
- T::Float32,
- T::Float64,
- ] {
- m.insert((T::Jsonb, t), CastContext::Explicit);
- }
-
- m
-});
-
-fn insert_cast_seq(
- m: &mut BTreeMap<(DataTypeName, DataTypeName), CastContext>,
- types: &[DataTypeName],
-) {
- for (source_index, source_type) in types.iter().enumerate() {
- for (target_index, target_type) in types.iter().enumerate() {
- let cast_context = match source_index.cmp(&target_index) {
- std::cmp::Ordering::Less => CastContext::Implicit,
- // Unnecessary cast between the same type should have been removed.
- // Note that sizing cast between `NUMERIC(18, 3)` and `NUMERIC(20, 4)` or between
- // `int` and `int not null` may still be necessary. But we do not have such types
- // yet.
- std::cmp::Ordering::Equal => continue,
- std::cmp::Ordering::Greater => CastContext::Assign,
- };
- m.insert((*source_type, *target_type), cast_context);
- }
- }
-}
diff --git a/src/expr/core/src/sig/mod.rs b/src/expr/core/src/sig/mod.rs
index 738b4f6b9eaf9..32593607381c5 100644
--- a/src/expr/core/src/sig/mod.rs
+++ b/src/expr/core/src/sig/mod.rs
@@ -29,8 +29,6 @@ use crate::expr::BoxedExpression;
use crate::table_function::BoxedTableFunction;
use crate::ExprError;
-pub mod cast;
-
/// The global registry of all function signatures.
pub static FUNCTION_REGISTRY: LazyLock<FunctionRegistry> = LazyLock::new(|| unsafe {
// SAFETY: this function is called after all `#[ctor]` functions are called.
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 73df1156cd035..e279ccb38ba69 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -56,6 +56,7 @@ risingwave_common = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_expr = { workspace = true }
+risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
index 7dd59eed432c6..ebdfed20bd32b 100644
--- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
@@ -253,7 +253,7 @@
│ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at] }
│ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
- │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.original_column_ids, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at] }
+ │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at] }
│ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
│ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at] }
│ │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] }
diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs
index e5537b06717bd..4e66cea8e73ff 100644
--- a/src/frontend/src/catalog/index_catalog.rs
+++ b/src/frontend/src/catalog/index_catalog.rs
@@ -23,7 +23,6 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus};
-use super::ColumnId;
use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
use crate::expr::{Expr, ExprDisplay, ExprImpl, FunctionCall};
use crate::user::UserId;
@@ -58,7 +57,7 @@ pub struct IndexCatalog {
#[educe(Hash(ignore))]
pub function_mapping: HashMap<FunctionCall, usize>,
- pub original_columns: Vec<ColumnId>,
+ pub index_columns_len: u32,
pub created_at_epoch: Option<Epoch>,
@@ -105,13 +104,6 @@ impl IndexCatalog {
})
.collect();
- let original_columns = index_prost
- .original_columns
- .clone()
- .into_iter()
- .map(Into::into)
- .collect();
-
IndexCatalog {
id: index_prost.id.into(),
name: index_prost.name.clone(),
@@ -121,7 +113,7 @@ impl IndexCatalog {
primary_to_secondary_mapping,
secondary_to_primary_mapping,
function_mapping,
- original_columns,
+ index_columns_len: index_prost.index_columns_len,
created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
}
@@ -181,7 +173,7 @@ impl IndexCatalog {
.iter()
.map(|expr| expr.to_expr_proto())
.collect_vec(),
- original_columns: self.original_columns.iter().map(Into::into).collect_vec(),
+ index_columns_len: self.index_columns_len,
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
stream_job_status: PbStreamJobStatus::Creating.into(),
diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_index.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_index.rs
index c6ac7da458e2c..f98aacc680f6f 100644
--- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_index.rs
+++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_index.rs
@@ -47,9 +47,9 @@ pub static PG_INDEX: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
columns: &PG_INDEX_COLUMNS,
sql: "SELECT id AS indexrelid, \
primary_table_id AS indrelid, \
- ARRAY_LENGTH(original_column_ids)::smallint AS indnatts, \
+ ARRAY_LENGTH(indkey)::smallint AS indnatts, \
false AS indisunique, \
- original_column_ids AS indkey, \
+ indkey, \
ARRAY[]::smallint[] as indoption, \
NULL AS indexprs, \
NULL AS indpred, \
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
index 97269341d59f3..0c62a61cab738 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
@@ -16,7 +16,7 @@ use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
-use risingwave_pb::hummock::HummockVersion;
+use risingwave_hummock_sdk::version::HummockVersion;
use serde_json::json;
use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs
index ab9f7b3f2eb22..f8991b7a229c8 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs
@@ -28,10 +28,7 @@ pub static RW_INDEXES_COLUMNS: LazyLock<Vec<SystemCatalogColumnsDef<'_>>> = Lazy
(DataType::Int32, "id"),
(DataType::Varchar, "name"),
(DataType::Int32, "primary_table_id"),
- (
- DataType::List(Box::new(DataType::Int16)),
- "original_column_ids",
- ),
+ (DataType::List(Box::new(DataType::Int16)), "indkey"),
(DataType::Int32, "schema_id"),
(DataType::Int32, "owner"),
(DataType::Varchar, "definition"),
@@ -62,9 +59,17 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Int32(index.primary_table.id().table_id as i32)),
Some(ScalarImpl::List(ListValue::from_iter(
index
- .original_columns
+ .index_item
.iter()
- .map(|index| index.get_id() as i16 + 1),
+ .take(index.index_columns_len as usize)
+ .map(|index| {
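+ // Emit the 1-based input column index for plain columns, and 0 for
+ // expression columns, mirroring pg_index.indkey semantics.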
+ let ind = if let Some(input_ref) = index.as_input_ref() {
+ input_ref.index() + 1
+ } else {
+ 0
+ };
+ ind as i16
+ }),
))),
Some(ScalarImpl::Int32(schema.id() as i32)),
Some(ScalarImpl::Int32(index.index_table.owner as i32)),
diff --git a/src/frontend/src/expr/type_inference/cast.rs b/src/frontend/src/expr/type_inference/cast.rs
index b941732a2a720..ccd99048557ee 100644
--- a/src/frontend/src/expr/type_inference/cast.rs
+++ b/src/frontend/src/expr/type_inference/cast.rs
@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::collections::BTreeMap;
+use std::sync::LazyLock;
+
use itertools::Itertools as _;
+use parse_display::Display;
use risingwave_common::error::ErrorCode;
use risingwave_common::types::{DataType, DataTypeName};
use risingwave_common::util::iter_util::ZipEqFast;
-pub use risingwave_expr::sig::cast::*;
use crate::expr::{Expr as _, ExprImpl, InputRef, Literal};
@@ -165,6 +168,87 @@ pub fn cast_map_array() -> Vec<(DataTypeName, DataTypeName, CastContext)> {
.collect_vec()
}
+#[derive(Clone, Debug)]
+pub struct CastSig {
+ pub from_type: DataTypeName,
+ pub to_type: DataTypeName,
+ pub context: CastContext,
+}
+
+/// The context a cast operation is invoked in. An implicit cast operation is allowed in a context
+/// that allows explicit casts, but not vice versa. See details in
+/// [PG](https://www.postgresql.org/docs/current/catalog-pg-cast.html).
+#[derive(Clone, Copy, Debug, Display, Eq, Ord, PartialEq, PartialOrd)]
+pub enum CastContext {
+ #[display("i")]
+ Implicit,
+ #[display("a")]
+ Assign,
+ #[display("e")]
+ Explicit,
+}
+
+pub type CastMap = BTreeMap<(DataTypeName, DataTypeName), CastContext>;
+
+pub fn cast_sigs() -> impl Iterator<Item = CastSig> {
+ CAST_MAP
+ .iter()
+ .map(|((from_type, to_type), context)| CastSig {
+ from_type: *from_type,
+ to_type: *to_type,
+ context: *context,
+ })
+}
+
+pub static CAST_MAP: LazyLock<CastMap> = LazyLock::new(|| {
+ // cast rules:
+ // 1. implicit cast operations in PG are organized in 3 sequences,
+ // with the reverse direction being assign cast operations.
+ // https://github.com/postgres/postgres/blob/e0064f0ff6dfada2695330c6bc1945fa7ae813be/src/include/catalog/pg_cast.dat#L18-L20
+ // 1. int2 -> int4 -> int8 -> numeric -> float4 -> float8
+ // 2. date -> timestamp -> timestamptz
+ // 3. time -> interval
+ // 2. any -> varchar is assign and varchar -> any is explicit
+ // 3. jsonb -> bool/number is explicit
+ // 4. int32 <-> bool is explicit
+ // 5. timestamp/timestamptz -> time is assign
+ // 6. int2/int4/int8 -> int256 is implicit and int256 -> float8 is explicit
+ use DataTypeName::*;
+ const CAST_TABLE: &[(&str, DataTypeName)] = &[
+ // 123456789ABCDEF
+ (". e a", Boolean), // 0
+ (" .iiiiii a", Int16), // 1
+ ("ea.iiiii a", Int32), // 2
+ (" aa.iiii a", Int64), // 3
+ (" aaa.ii a", Decimal), // 4
+ (" aaaa.i a", Float32), // 5
+ (" aaaaa. a", Float64), // 6
+ (" e. a", Int256), // 7
+ (" .ii a", Date), // 8
+ (" a.ia a", Timestamp), // 9
+ (" aa.a a", Timestamptz), // A
+ (" .i a", Time), // B
+ (" a. a", Interval), // C
+ ("eeeeeee . a", Jsonb), // D
+ (" .a", Bytea), // E
+ ("eeeeeeeeeeeeeee.", Varchar), // F
+ ];
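+ // Example: reading the Int32 row against the column order above,
+ // (Int32, Boolean) is explicit ('e'), (Int32, Int16) is assign ('a'),
+ // (Int32, Int64) is implicit ('i'), and (Int32, Varchar) is assign ('a').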
+ let mut map = BTreeMap::new();
+ for (row, source) in CAST_TABLE {
+ for ((_, target), c) in CAST_TABLE.iter().zip_eq_fast(row.bytes()) {
+ let context = match c {
+ b' ' | b'.' => continue,
+ b'i' => CastContext::Implicit,
+ b'a' => CastContext::Assign,
+ b'e' => CastContext::Explicit,
+ _ => unreachable!("invalid cast table char"),
+ };
+ map.insert((*source, *target), context);
+ }
+ }
+ map
+});
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs
index 1553a84c1a09b..f2fa8bca2de4e 100644
--- a/src/frontend/src/handler/create_index.rs
+++ b/src/frontend/src/handler/create_index.rs
@@ -219,13 +219,7 @@ pub(crate) fn gen_create_index_plan(
index_table_prost.owner = session.user_id();
index_table_prost.dependent_relations = vec![table.id.table_id];
- // FIXME: why sqlalchemy need these information?
- let original_columns = index_table
- .columns
- .iter()
- .map(|x| x.column_desc.column_id.get_id())
- .collect();
-
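+ // Only the first `index_columns_len` entries of `index_item` are index key
+ // columns; any remaining entries are included (covered) columns.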
+ let index_columns_len = index_columns_ordered_expr.len() as u32;
let index_item = build_index_item(
index_table.table_desc().into(),
table.name(),
@@ -242,7 +236,7 @@ pub(crate) fn gen_create_index_plan(
index_table_id: TableId::placeholder().table_id,
primary_table_id: table.id.table_id,
index_item,
- original_columns,
+ index_columns_len,
initialized_at_epoch: None,
created_at_epoch: None,
stream_job_status: PbStreamJobStatus::Creating.into(),
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index ea382a03e2c5a..c7753dd20e01a 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use risingwave_common::system_param::reader::SystemParamsReader;
+use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::WorkerNode;
@@ -22,7 +23,7 @@ use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
- HummockSnapshot, HummockVersion, HummockVersionDelta,
+ HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
@@ -223,15 +224,12 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0
.risectl_get_checkpoint_hummock_version()
.await
- .map(|v| v.checkpoint_version.unwrap())
+ .map(|v| HummockVersion::from_rpc_protobuf(&v.checkpoint_version.unwrap()))
}
async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
// FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
- self.0
- .list_version_deltas(0, u32::MAX, u64::MAX)
- .await
- .map(|v| v.version_deltas)
+ self.0.list_version_deltas(0, u32::MAX, u64::MAX).await
}
async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index e68cb3ff80d28..ba786e2bb34a3 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -436,7 +436,7 @@ impl FrontendEnv {
Arc::new(BackgroundShutdownRuntime::from(
Builder::new_multi_thread()
.worker_threads(4)
- .thread_name("frontend-compute-threads")
+ .thread_name("rw-batch-local")
.enable_all()
.build()
.unwrap(),
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 2bfd561f14a0f..819542c431a38 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -31,6 +31,7 @@ use risingwave_common::catalog::{
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
+use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
@@ -45,7 +46,7 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
- HummockSnapshot, HummockVersion, HummockVersionDelta,
+ HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs
index d7fa7533c4738..e0771ef973084 100644
--- a/src/jni_core/src/hummock_iterator.rs
+++ b/src/jni_core/src/hummock_iterator.rs
@@ -24,6 +24,7 @@ use risingwave_common::util::select_all;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
+use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::java_binding::key_range::Bound;
@@ -84,7 +85,10 @@ impl HummockJavaBindingIterator {
let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
- let pin_version = PinnedVersion::new(read_plan.version.unwrap(), unbounded_channel().0);
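+ // The read plan carries a protobuf version; convert it to the sdk type before pinning.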
+ let pin_version = PinnedVersion::new(
+ HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
+ unbounded_channel().0,
+ );
let table_id = read_plan.table_id.into();
for vnode in read_plan.vnode_ids {
diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs
index 89b10b98b9d4c..97a9e60492844 100644
--- a/src/jni_core/src/macros.rs
+++ b/src/jni_core/src/macros.rs
@@ -442,7 +442,7 @@ macro_rules! for_all_plain_native_methods {
($macro:path $(,$args:tt)*) => {
$macro! {
{
- public static native void tracingSlf4jEvent(String name, int level, String string);
+ public static native void tracingSlf4jEvent(String threadName, String name, int level, String string);
public static native int vnodeCount();
@@ -882,7 +882,7 @@ mod tests {
// This test shows the signature of all native methods
let expected = expect_test::expect![[r#"
[
- tracingSlf4jEvent (Ljava/lang/String;ILjava/lang/String;)V,
+ tracingSlf4jEvent (Ljava/lang/String;Ljava/lang/String;ILjava/lang/String;)V,
vnodeCount ()I,
iteratorNewHummock ([B)J,
iteratorNewStreamChunk (J)J,
diff --git a/src/jni_core/src/tracing_slf4j.rs b/src/jni_core/src/tracing_slf4j.rs
index ce410b9bcb001..8f7222d11c647 100644
--- a/src/jni_core/src/tracing_slf4j.rs
+++ b/src/jni_core/src/tracing_slf4j.rs
@@ -23,11 +23,15 @@ use crate::{execute_and_catch, EnvParam};
#[no_mangle]
pub(crate) extern "system" fn Java_com_risingwave_java_binding_Binding_tracingSlf4jEvent(
env: EnvParam<'_>,
+ thread_name: JString<'_>,
class_name: JString<'_>,
level: jint,
message: JString<'_>,
) {
execute_and_catch(env, move |env| {
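+ // Decode the Java-supplied thread name so it can be attached to the tracing event below.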
+ let thread_name = env.get_string(&thread_name)?;
+ let thread_name: Cow<'_, str> = (&thread_name).into();
+
let class_name = env.get_string(&class_name)?;
let class_name: Cow<'_, str> = (&class_name).into();
@@ -39,7 +43,8 @@ pub(crate) extern "system" fn Java_com_risingwave_java_binding_Binding_tracingSl
tracing::event!(
target: "risingwave_connector_node",
$lvl,
- class = class_name.as_ref(),
+ thread = &*thread_name,
+ class = &*class_name,
"{message}",
)
};
diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml
index 1d9992da8a832..f080645fc1c6a 100644
--- a/src/meta/model_v2/Cargo.toml
+++ b/src/meta/model_v2/Cargo.toml
@@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]
[dependencies]
+risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
"sqlx-mysql",
diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
index 4a4cb601f2a06..fbee1aba46b94 100644
--- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs
+++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -395,6 +395,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json())
.col(ColumnDef::new(Actor::VnodeBitmap).json())
+ .col(ColumnDef::new(Actor::ExprContext).json().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
@@ -538,7 +539,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::StreamKey).json().not_null())
.col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
.col(ColumnDef::new(Table::Properties).json().not_null())
- .col(ColumnDef::new(Table::FragmentId).integer().not_null())
+ .col(ColumnDef::new(Table::FragmentId).integer())
.col(ColumnDef::new(Table::VnodeColIndex).integer())
.col(ColumnDef::new(Table::RowIdIndex).integer())
.col(ColumnDef::new(Table::ValueIndices).json().not_null())
@@ -562,10 +563,8 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
- .col(ColumnDef::new(Table::JobStatus).string().not_null())
- .col(ColumnDef::new(Table::CreateType).string().not_null())
.col(ColumnDef::new(Table::Description).string())
- .col(ColumnDef::new(Table::Version).json().not_null())
+ .col(ColumnDef::new(Table::Version).json())
.foreign_key(
&mut ForeignKey::create()
.name("FK_table_object_id")
@@ -623,7 +622,6 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json())
- .col(ColumnDef::new(Sink::JobStatus).string().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_sink_object_id")
@@ -671,8 +669,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json().not_null())
- .col(ColumnDef::new(Index::OriginalColumns).json().not_null())
- .col(ColumnDef::new(Index::JobStatus).string().not_null())
+ .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_index_object_id")
@@ -969,6 +966,7 @@ enum Actor {
ParallelUnitId,
UpstreamActorIds,
VnodeBitmap,
+ ExprContext,
}
#[derive(DeriveIden)]
@@ -1021,8 +1019,6 @@ enum Table {
DmlFragmentId,
Cardinality,
CleanedByWatermark,
- JobStatus,
- CreateType,
Description,
Version,
}
@@ -1060,7 +1056,6 @@ enum Sink {
DbName,
SinkFromName,
SinkFormatDesc,
- JobStatus,
}
#[derive(DeriveIden)]
@@ -1089,8 +1084,7 @@ enum Index {
IndexTableId,
PrimaryTableId,
IndexItems,
- OriginalColumns,
- JobStatus,
+ IndexColumnsLen,
}
#[derive(DeriveIden)]
diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs
index c4774b177eabc..2013a58403e62 100644
--- a/src/meta/model_v2/src/function.rs
+++ b/src/meta/model_v2/src/function.rs
@@ -74,6 +74,16 @@ impl From<Kind> for FunctionKind {
}
}
+impl From<FunctionKind> for Kind {
+ fn from(value: FunctionKind) -> Self {
+ match value {
+ FunctionKind::Scalar => Self::Scalar(Default::default()),
+ FunctionKind::Table => Self::Table(Default::default()),
+ FunctionKind::Aggregate => Self::Aggregate(Default::default()),
+ }
+ }
+}
+
impl From<PbFunction> for ActiveModel {
fn from(function: PbFunction) -> Self {
Self {
diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs
index f59f0c6f95455..ee7f5120b28a7 100644
--- a/src/meta/model_v2/src/hummock_version_delta.rs
+++ b/src/meta/model_v2/src/hummock_version_delta.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_pb::hummock::HummockVersionDelta;
+use risingwave_pb::hummock::PbHummockVersionDelta;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};
@@ -36,9 +36,9 @@ pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
-crate::derive_from_json_struct!(FullVersionDelta, HummockVersionDelta);
+crate::derive_from_json_struct!(FullVersionDelta, PbHummockVersionDelta);
-impl From<Model> for HummockVersionDelta {
+impl From<Model> for PbHummockVersionDelta {
fn from(value: Model) -> Self {
let ret = value.full_version_delta.into_inner();
assert_eq!(value.id, ret.id as i64);
diff --git a/src/meta/model_v2/src/index.rs b/src/meta/model_v2/src/index.rs
index c85a896914240..0146fe34f4050 100644
--- a/src/meta/model_v2/src/index.rs
+++ b/src/meta/model_v2/src/index.rs
@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use risingwave_pb::catalog::PbIndex;
use sea_orm::entity::prelude::*;
+use sea_orm::ActiveValue::Set;
-use crate::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId};
+use crate::{ExprNodeArray, IndexId, TableId};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "index")]
@@ -25,8 +27,7 @@ pub struct Model {
pub index_table_id: TableId,
pub primary_table_id: TableId,
pub index_items: ExprNodeArray,
- pub original_columns: I32Array,
- pub job_status: JobStatus,
+ pub index_columns_len: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -64,3 +65,16 @@ impl Related for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
+
+impl From<PbIndex> for ActiveModel {
+ fn from(pb_index: PbIndex) -> Self {
+ Self {
+ index_id: Set(pb_index.id as _),
+ name: Set(pb_index.name),
+ index_table_id: Set(pb_index.index_table_id as _),
+ primary_table_id: Set(pb_index.primary_table_id as _),
+ index_items: Set(pb_index.index_item.into()),
+ index_columns_len: Set(pb_index.index_columns_len as _),
+ }
+ }
+}
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index afbb24d24204c..ffb994d06f2e3 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -123,12 +123,27 @@ impl From<CreateType> for PbCreateType {
}
}
+impl From<PbCreateType> for CreateType {
+ fn from(create_type: PbCreateType) -> Self {
+ match create_type {
+ PbCreateType::Background => Self::Background,
+ PbCreateType::Foreground => Self::Foreground,
+ PbCreateType::Unspecified => unreachable!("Unspecified create type"),
+ }
+ }
+}
+
/// Defines a struct with a single pb field that derives `FromJsonQueryResult`, which helps map the json value stored in the database to the Pb struct.
macro_rules! derive_from_json_struct {
($struct_name:ident, $field_type:ty) => {
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct $struct_name(pub $field_type);
impl Eq for $struct_name {}
+ impl From<$field_type> for $struct_name {
+ fn from(value: $field_type) -> Self {
+ Self(value)
+ }
+ }
impl $struct_name {
pub fn into_inner(self) -> $field_type {
diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs
index 21ac172246703..7f2f6cc5f9a0b 100644
--- a/src/meta/model_v2/src/sink.rs
+++ b/src/meta/model_v2/src/sink.rs
@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_pb::catalog::PbSinkType;
+use risingwave_pb::catalog::{PbSink, PbSinkType};
use sea_orm::entity::prelude::*;
+use sea_orm::ActiveValue::Set;
use crate::{
- ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, JobStatus, Property,
- SinkFormatDesc, SinkId,
+ ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId,
};
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
@@ -41,6 +41,17 @@ impl From<SinkType> for PbSinkType {
}
}
+impl From<PbSinkType> for SinkType {
+ fn from(sink_type: PbSinkType) -> Self {
+ match sink_type {
+ PbSinkType::AppendOnly => Self::AppendOnly,
+ PbSinkType::ForceAppendOnly => Self::ForceAppendOnly,
+ PbSinkType::Upsert => Self::Upsert,
+ PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
+ }
+ }
+}
+
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "sink")]
pub struct Model {
@@ -58,7 +69,6 @@ pub struct Model {
pub db_name: String,
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
- pub job_status: JobStatus,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -94,3 +104,25 @@ impl Related for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
+
+impl From<PbSink> for ActiveModel {
+ fn from(pb_sink: PbSink) -> Self {
+ let sink_type = pb_sink.sink_type();
+
+ Self {
+ sink_id: Set(pb_sink.id as _),
+ name: Set(pb_sink.name),
+ columns: Set(pb_sink.columns.into()),
+ plan_pk: Set(pb_sink.plan_pk.into()),
+ distribution_key: Set(pb_sink.distribution_key.into()),
+ downstream_pk: Set(pb_sink.downstream_pk.into()),
+ sink_type: Set(sink_type.into()),
+ properties: Set(pb_sink.properties.into()),
+ definition: Set(pb_sink.definition),
+ connection_id: Set(pb_sink.connection_id.map(|x| x as _)),
+ db_name: Set(pb_sink.db_name),
+ sink_from_name: Set(pb_sink.sink_from_name),
+ sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())),
+ }
+ }
+}
diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs
index 9b1538c059af3..1141c6adb48ab 100644
--- a/src/meta/model_v2/src/table.rs
+++ b/src/meta/model_v2/src/table.rs
@@ -13,12 +13,14 @@
// limitations under the License.
use risingwave_pb::catalog::table::PbTableType;
-use risingwave_pb::catalog::PbHandleConflictBehavior;
+use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
+use sea_orm::ActiveValue::Set;
+use sea_orm::NotSet;
use crate::{
- Cardinality, ColumnCatalogArray, ColumnOrderArray, CreateType, FragmentId, I32Array, JobStatus,
- ObjectId, Property, SourceId, TableId, TableVersion,
+ Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, Property,
+ SourceId, TableId, TableVersion,
};
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
@@ -45,6 +47,18 @@ impl From<TableType> for PbTableType {
}
}
+impl From<PbTableType> for TableType {
+ fn from(table_type: PbTableType) -> Self {
+ match table_type {
+ PbTableType::Table => Self::Table,
+ PbTableType::MaterializedView => Self::MaterializedView,
+ PbTableType::Index => Self::Index,
+ PbTableType::Internal => Self::Internal,
+ PbTableType::Unspecified => unreachable!("Unspecified table type"),
+ }
+ }
+}
+
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum HandleConflictBehavior {
@@ -66,6 +80,19 @@ impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
}
}
+impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
+ fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
+ match handle_conflict_behavior {
+ PbHandleConflictBehavior::Overwrite => Self::Overwrite,
+ PbHandleConflictBehavior::Ignore => Self::Ignore,
+ PbHandleConflictBehavior::NoCheck => Self::NoCheck,
+ PbHandleConflictBehavior::Unspecified => {
+ unreachable!("Unspecified handle conflict behavior")
+ }
+ }
+ }
+}
+
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "table")]
pub struct Model {
@@ -81,7 +108,7 @@ pub struct Model {
pub stream_key: I32Array,
pub append_only: bool,
pub properties: Property,
- pub fragment_id: FragmentId,
+ pub fragment_id: Option<FragmentId>,
pub vnode_col_index: Option<i32>,
pub row_id_index: Option<i32>,
pub value_indices: I32Array,
@@ -93,10 +120,8 @@ pub struct Model {
pub dml_fragment_id: Option<FragmentId>,
pub cardinality: Option<Cardinality>,
pub cleaned_by_watermark: bool,
- pub job_status: JobStatus,
- pub create_type: CreateType,
pub description: Option<String>,
- pub version: TableVersion,
+ pub version: Option<TableVersion>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -156,3 +181,38 @@ impl Related for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
+
+impl From<PbTable> for ActiveModel {
+ fn from(pb_table: PbTable) -> Self {
+ let table_type = pb_table.table_type();
+ let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
+
+ Self {
+ table_id: Set(pb_table.id as _),
+ name: Set(pb_table.name),
+ optional_associated_source_id: NotSet,
+ table_type: Set(table_type.into()),
+ belongs_to_job_id: Set(None),
+ columns: Set(pb_table.columns.into()),
+ pk: Set(pb_table.pk.into()),
+ distribution_key: Set(pb_table.distribution_key.into()),
+ stream_key: Set(pb_table.stream_key.into()),
+ append_only: Set(pb_table.append_only),
+ properties: Set(pb_table.properties.into()),
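+ // Fragment ids are unknown when the catalog entry is first persisted; they
+ // are filled in once the streaming job's fragments are created.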
+ fragment_id: NotSet,
+ vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
+ row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
+ value_indices: Set(pb_table.value_indices.into()),
+ definition: Set(pb_table.definition),
+ handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
+ read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
+ watermark_indices: Set(pb_table.watermark_indices.into()),
+ dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
+ dml_fragment_id: NotSet,
+ cardinality: Set(pb_table.cardinality.map(|x| x.into())),
+ cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
+ description: Set(pb_table.description),
+ version: Set(pb_table.version.map(|v| v.into())),
+ }
+ }
+}
diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml
index b734b62106495..c9b6619565cd7 100644
--- a/src/meta/service/Cargo.toml
+++ b/src/meta/service/Cargo.toml
@@ -23,6 +23,7 @@ rand = "0.8"
regex = "1"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
+risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_pb = { workspace = true }
diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs
index 835032769266f..f5a7e5b7353b1 100644
--- a/src/meta/service/src/hummock_service.rs
+++ b/src/meta/service/src/hummock_service.rs
@@ -18,10 +18,10 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID};
+use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
-use risingwave_pb::hummock::version_update_payload::Payload;
use risingwave_pb::hummock::*;
use tonic::{Request, Response, Status, Streaming};
@@ -29,6 +29,7 @@ use crate::hummock::compaction::selector::ManualCompactionOption;
use crate::hummock::{HummockManagerRef, VacuumManagerRef};
use crate::manager::FragmentManagerRef;
use crate::RwReceiverStream;
+
pub struct HummockServiceImpl {
hummock_manager: HummockManagerRef,
vacuum_manager: VacuumManagerRef,
@@ -83,7 +84,7 @@ impl HummockManagerService for HummockServiceImpl {
let current_version = self.hummock_manager.get_current_version().await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
- current_version: Some(current_version),
+ current_version: Some(current_version.to_protobuf()),
}))
}
@@ -94,10 +95,12 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let (version, compaction_groups) = self
.hummock_manager
- .replay_version_delta(req.version_delta.unwrap())
+ .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
+ &req.version_delta.unwrap(),
+ ))
.await?;
Ok(Response::new(ReplayVersionDeltaResponse {
- version: Some(version),
+ version: Some(version.to_protobuf()),
modified_compaction_groups: compaction_groups,
}))
}
@@ -119,7 +122,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<DisableCommitEpochResponse>, Status> {
let version = self.hummock_manager.disable_commit_epoch().await;
Ok(Response::new(DisableCommitEpochResponse {
- current_version: Some(version),
+ current_version: Some(version.to_protobuf()),
}))
}
@@ -133,7 +136,12 @@ impl HummockManagerService for HummockServiceImpl {
.list_version_deltas(req.start_id, req.num_limit, req.committed_epoch_limit)
.await?;
let resp = ListVersionDeltasResponse {
- version_deltas: Some(version_deltas),
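+ // Serialize each in-memory delta back to protobuf for the RPC response.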
+ version_deltas: Some(PbHummockVersionDeltas {
+ version_deltas: version_deltas
+ .iter()
+ .map(HummockVersionDelta::to_protobuf)
+ .collect(),
+ }),
};
Ok(Response::new(resp))
}
@@ -415,15 +423,10 @@ impl HummockManagerService for HummockServiceImpl {
request: Request<PinVersionRequest>,
) -> Result<Response<PinVersionResponse>, Status> {
let req = request.into_inner();
- let payload = self.hummock_manager.pin_version(req.context_id).await?;
- match payload {
- Payload::PinnedVersion(version) => Ok(Response::new(PinVersionResponse {
- pinned_version: Some(version),
- })),
- Payload::VersionDeltas(_) => {
- unreachable!("pin_version should not return version delta")
- }
- }
+ let version = self.hummock_manager.pin_version(req.context_id).await?;
+ Ok(Response::new(PinVersionResponse {
+ pinned_version: Some(version.to_protobuf()),
+ }))
}
async fn split_compaction_group(
@@ -460,7 +463,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
- checkpoint_version: Some(checkpoint_version),
+ checkpoint_version: Some(checkpoint_version.to_protobuf()),
}))
}
diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs
index b7b63dcc6c164..1bd0be6551604 100644
--- a/src/meta/service/src/notification_service.rs
+++ b/src/meta/service/src/notification_service.rs
@@ -183,7 +183,7 @@ impl NotificationServiceImpl {
MetaSnapshot {
tables,
- hummock_version: Some(hummock_version),
+ hummock_version: Some(hummock_version.to_protobuf()),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs
index 0d3f37155a047..9d4294579d0dc 100644
--- a/src/meta/src/backup_restore/meta_snapshot_builder.rs
+++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs
@@ -19,11 +19,11 @@ use anyhow::anyhow;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};
use risingwave_backup::MetaSnapshotId;
-use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
+use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_pb::catalog::{
Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
};
-use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta, HummockVersionStats};
+use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::user::UserInfo;
@@ -46,10 +46,10 @@ impl<S: MetaStore> MetaSnapshotV1Builder<S>
{
}
}
- pub async fn build<D: Future<Output = HummockVersion>>(
+ pub async fn build(
&mut self,
id: MetaSnapshotId,
- hummock_version_builder: D,
+ hummock_version_builder: impl Future<Output = HummockVersion>,