diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md
new file mode 100644
index 0000000000000..b022c9ef09cf8
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/README.md
@@ -0,0 +1,65 @@
+# Demo: Sinking to Cassandra/Scylladb
+
+In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.
+
+1. Set the compose profile accordingly:
+Demo with Apache Cassandra:
+```
+export COMPOSE_PROFILES=cassandra
+```
+
+Demo with Scylladb
+```
+export COMPOSE_PROFILES=scylladb
+```
+
+2. Launch the cluster:
+
+```sh
+docker-compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink.
+
+
+3. Create the Cassandra table via cqlsh:
+
+Login to cqlsh
+```sh
+# cqlsh into cassandra
+docker compose exec cassandra cqlsh
+# cqlsh into scylladb
+docker compose exec scylladb cqlsh
+```
+
+Run the following queries to create keyspace and table.
+```sql
+CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+use demo;
+CREATE table demo_bhv_table(
+ user_id int primary key,
+ target_id text,
+ event_timestamp timestamp,
+);
+```
+
+3. Execute the SQL queries in sequence:
+
+- create_source.sql
+- create_mv.sql
+- create_sink.sql
+
+4. Execute a simple query to check the sink results via csqlsh:
+
+Login to cqlsh
+```sh
+# cqlsh into cassandra
+docker compose exec cassandra cqlsh
+# cqlsh into scylladb
+docker compose exec scylladb cqlsh
+```
+
+Run the following query
+```sql
+select user_id, count(*) from my_keyspace.demo_test group by user_id;
+```
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_mv.sql b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+ user_id,
+ target_id,
+ event_timestamp
+FROM
+ user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
new file mode 100644
index 0000000000000..724e784694c2f
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE SINK bhv_cassandra_sink
+FROM
+ bhv_mv WITH (
+ connector = 'cassandra',
+ type = 'append-only',
+ force_append_only='true',
+ cassandra.url = 'cassandra:9042',
+ cassandra.keyspace = 'demo',
+ cassandra.table = 'demo_bhv_table',
+ cassandra.datacenter = 'datacenter1',
+);
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
@@ -0,0 +1,18 @@
+CREATE table user_behaviors (
+ user_id int,
+ target_id VARCHAR,
+ target_type VARCHAR,
+ event_timestamp TIMESTAMP,
+ behavior_type VARCHAR,
+ parent_target_type VARCHAR,
+ parent_target_id VARCHAR,
+ PRIMARY KEY(user_id)
+) WITH (
+ connector = 'datagen',
+ fields.user_id.kind = 'sequence',
+ fields.user_id.start = '1',
+ fields.user_id.end = '1000',
+ fields.user_name.kind = 'random',
+ fields.user_name.length = '10',
+ datagen.rows.per.second = '10'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
new file mode 100644
index 0000000000000..27b77f850f882
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
@@ -0,0 +1,82 @@
+---
+version: "3"
+services:
+ cassandra:
+ image: cassandra:4.0
+ ports:
+ - 9042:9042
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+ profiles:
+ - cassandra
+ scylladb:
+ image: scylladb/scylla:5.1
+ ports:
+ - 9042:9042
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+ profiles:
+ - scylladb
+ compactor-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compactor-0
+ compute-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compute-node-0
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ frontend-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: frontend-node-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ meta-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: meta-node-0
+ connector-node:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: connector-node
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+ message_queue:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: message_queue
+ datagen:
+ build: ../datagen
+ depends_on: [message_queue]
+ command:
+ - /bin/sh
+ - -c
+ - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
+ restart: always
+ container_name: datagen
+volumes:
+ compute-node-0:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml
index 327c7628dbb59..751ba2f410e0b 100644
--- a/java/connector-node/assembly/assembly.xml
+++ b/java/connector-node/assembly/assembly.xml
@@ -40,6 +40,7 @@
*:risingwave-sink-es-7
+ *:risingwave-sink-cassandra
*:risingwave-sink-jdbc
*:risingwave-sink-iceberg
*:risingwave-sink-deltalake
diff --git a/java/connector-node/assembly/pom.xml b/java/connector-node/assembly/pom.xml
index 5c518601bc8e7..6812bac5b63e6 100644
--- a/java/connector-node/assembly/pom.xml
+++ b/java/connector-node/assembly/pom.xml
@@ -33,6 +33,10 @@
com.risingwave.java
risingwave-sink-es-7
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+
com.risingwave.java
risingwave-sink-jdbc
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index fccd84d24b9af..4e2dbe1d6ec96 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -99,5 +99,10 @@
risingwave-sink-es-7
provided
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ provided
+
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index 6c0f779e287aa..ab3ac84346fa6 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -43,6 +43,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new DeltaLakeSinkFactory();
case "elasticsearch-7":
return new EsSink7Factory();
+ case "cassandra":
+ return new CassandraFactory();
default:
throw UNIMPLEMENTED
.withDescription("unknown sink type: " + sinkName)
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index d630eb183c856..7f8f6f1bc49cc 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -174,5 +174,10 @@
risingwave-sink-es-7
test
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ test
+
diff --git a/java/connector-node/risingwave-sink-cassandra/pom.xml b/java/connector-node/risingwave-sink-cassandra/pom.xml
new file mode 100644
index 0000000000000..e51faa9691cf9
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/pom.xml
@@ -0,0 +1,60 @@
+
+
+
+ java-parent
+ com.risingwave.java
+ 1.0-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ risingwave-sink-cassandra
+ 1.0-SNAPSHOT
+ risingwave-sink-cassandra
+
+
+
+ com.risingwave.java
+ proto
+
+
+ com.risingwave.java
+ connector-api
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.commons
+ commons-text
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
+
+
+
+ com.datastax.oss
+ java-driver-core
+ ${datastax.version}
+
+
+
+
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
new file mode 100644
index 0000000000000..a993a8988e1ee
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.risingwave.connector.api.sink.CommonSinkConfig;
+
+public class CassandraConfig extends CommonSinkConfig {
+ /** Required */
+ private String type;
+ /** Required */
+ private String url;
+
+ /** Required */
+ private String keyspace;
+
+ /** Required */
+ private String table;
+
+ /** Required */
+ private String datacenter;
+
+ @JsonProperty(value = "cassandra.username")
+ private String username;
+
+ @JsonProperty(value = "cassandra.password")
+ private String password;
+
+ @JsonCreator
+ public CassandraConfig(
+ @JsonProperty(value = "cassandra.url") String url,
+ @JsonProperty(value = "cassandra.keyspace") String keyspace,
+ @JsonProperty(value = "cassandra.table") String table,
+ @JsonProperty(value = "cassandra.datacenter") String datacenter,
+ @JsonProperty(value = "type") String type) {
+ this.url = url;
+ this.keyspace = keyspace;
+ this.table = table;
+ this.datacenter = datacenter;
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDatacenter() {
+ return datacenter;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public CassandraConfig withUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public CassandraConfig withPassword(String password) {
+ this.password = password;
+ return this;
+ }
+}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
new file mode 100644
index 0000000000000..f9fb5bee020e2
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.*;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.risingwave.connector.api.TableSchema;
+import com.risingwave.connector.api.sink.SinkFactory;
+import com.risingwave.connector.api.sink.SinkWriter;
+import com.risingwave.connector.api.sink.SinkWriterV1;
+import com.risingwave.proto.Catalog.SinkType;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraFactory implements SinkFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraFactory.class);
+
+ public SinkWriter createWriter(TableSchema tableSchema, Map tableProperties) {
+ ObjectMapper mapper = new ObjectMapper();
+ CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
+ return new SinkWriterV1.Adapter(new CassandraSink(tableSchema, config));
+ }
+
+ @Override
+ public void validate(
+ TableSchema tableSchema, Map tableProperties, SinkType sinkType) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
+ CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
+
+ // 1. check url
+ String url = config.getUrl();
+ String[] hostPort = url.split(":");
+ if (hostPort.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid cassandraURL: expected `host:port`, got " + url);
+ }
+ // 2. check connection
+ CqlSessionBuilder sessionBuilder =
+ CqlSession.builder()
+ .addContactPoint(
+ new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
+ .withKeyspace(config.getKeyspace())
+ .withLocalDatacenter(config.getDatacenter());
+ if (config.getUsername() != null && config.getPassword() != null) {
+ sessionBuilder =
+ sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
+ }
+ CqlSession session = sessionBuilder.build();
+
+ TableMetadata tableMetadata =
+ session.getMetadata()
+ .getKeyspace(config.getKeyspace())
+ .get()
+ .getTable(config.getTable())
+ .get();
+ CassandraUtil.checkSchema(tableSchema.getColumnDescs(), tableMetadata.getColumns());
+
+ if (session.isClosed()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ // 3. close client
+ session.close();
+ switch (sinkType) {
+ case UPSERT:
+ CassandraUtil.checkPrimaryKey(
+ tableMetadata.getPrimaryKey(), tableSchema.getPrimaryKeys());
+ break;
+ case APPEND_ONLY:
+ case FORCE_APPEND_ONLY:
+ break;
+ default:
+ throw Status.INTERNAL.asRuntimeException();
+ }
+ }
+}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
new file mode 100644
index 0000000000000..af422dd2f5bdb
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.*;
+import com.risingwave.connector.api.TableSchema;
+import com.risingwave.connector.api.sink.SinkRow;
+import com.risingwave.connector.api.sink.SinkWriterBase;
+import com.risingwave.proto.Data;
+import com.risingwave.proto.Data.DataType.TypeName;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSink extends SinkWriterBase {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
+ private final CqlSession session;
+ private final List updateRowCache = new ArrayList<>(1);
+ private final HashMap stmtMap;
+ private final List nonKeyColumns;
+ private final BatchStatementBuilder batchBuilder;
+ private final CassandraConfig config;
+
+ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
+ super(tableSchema);
+ String url = config.getUrl();
+ String[] hostPort = url.split(":");
+ if (hostPort.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid cassandraURL: expected `host:port`, got " + url);
+ }
+ // check connection
+ CqlSessionBuilder sessionBuilder =
+ CqlSession.builder()
+ .addContactPoint(
+ new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
+ .withKeyspace(config.getKeyspace())
+ .withLocalDatacenter(config.getDatacenter());
+ if (config.getUsername() != null && config.getPassword() != null) {
+ sessionBuilder =
+ sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
+ }
+ this.session = sessionBuilder.build();
+ if (session.isClosed()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+
+ this.config = config;
+ this.batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED);
+
+ // fetch non-pk columns for prepared statements
+ nonKeyColumns =
+ Arrays.stream(tableSchema.getColumnNames())
+ // cassandra does not allow SET on primary keys
+ .filter(c -> !tableSchema.getPrimaryKeys().contains(c))
+ .collect(Collectors.toList());
+
+ this.stmtMap = new HashMap<>();
+ // prepare statement for insert
+ this.stmtMap.put(
+ "insert", session.prepare(createInsertStatement(config.getTable(), tableSchema)));
+ if (config.getType().equals("upsert")) {
+ // prepare statement for update-insert/update-delete
+ this.stmtMap.put(
+ "update",
+ session.prepare(createUpdateStatement(config.getTable(), tableSchema)));
+
+ // prepare the delete statement
+ this.stmtMap.put(
+ "delete",
+ session.prepare(createDeleteStatement(config.getTable(), tableSchema)));
+ }
+ }
+
+ @Override
+ public void write(Iterator rows) {
+ if (this.config.getType().equals("append-only")) {
+ write_append_only(rows);
+ } else {
+ write_upsert(rows);
+ }
+ }
+
+ private void write_append_only(Iterator rows) {
+ while (rows.hasNext()) {
+ SinkRow row = rows.next();
+ Data.Op op = row.getOp();
+ switch (op) {
+ case INSERT:
+ batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
+ break;
+ case UPDATE_DELETE:
+ break;
+ case UPDATE_INSERT:
+ break;
+ case DELETE:
+ break;
+ default:
+ throw Status.INTERNAL
+ .withDescription("Unknown operation: " + op)
+ .asRuntimeException();
+ }
+ }
+ }
+
+ private void write_upsert(Iterator rows) {
+ while (rows.hasNext()) {
+ SinkRow row = rows.next();
+ Data.Op op = row.getOp();
+ switch (op) {
+ case INSERT:
+ batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
+ break;
+ case UPDATE_DELETE:
+ updateRowCache.clear();
+ updateRowCache.add(row);
+ break;
+ case UPDATE_INSERT:
+ SinkRow old = updateRowCache.remove(0);
+ if (old == null) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("UPDATE_INSERT without UPDATE_DELETE")
+ .asRuntimeException();
+ }
+ batchBuilder.addStatement(
+ bindUpdateInsertStatement(this.stmtMap.get("update"), old, row));
+ break;
+ case DELETE:
+ batchBuilder.addStatement(bindDeleteStatement(this.stmtMap.get("delete"), row));
+ break;
+ default:
+ throw Status.INTERNAL
+ .withDescription("Unknown operation: " + op)
+ .asRuntimeException();
+ }
+ }
+ }
+
+ @Override
+ public void sync() {
+ try {
+ session.execute(batchBuilder.build());
+ batchBuilder.clearStatements();
+ } catch (Exception e) {
+ throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
+ }
+ }
+
+ @Override
+ public void drop() {
+ session.close();
+ }
+
+ private String createInsertStatement(String tableName, TableSchema tableSchema) {
+ String[] columnNames = tableSchema.getColumnNames();
+ String columnNamesString = String.join(", ", columnNames);
+ String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
+ return String.format(
+ "INSERT INTO %s (%s) VALUES (%s)",
+ tableName, columnNamesString, placeholdersString);
+ }
+
+ private String createUpdateStatement(String tableName, TableSchema tableSchema) {
+ List primaryKeys = tableSchema.getPrimaryKeys();
+ String setClause = // cassandra does not allow SET on primary keys
+ nonKeyColumns.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(", "));
+ String whereClause =
+ primaryKeys.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(" AND "));
+ return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
+ }
+
+ private static String createDeleteStatement(String tableName, TableSchema tableSchema) {
+ List primaryKeys = tableSchema.getPrimaryKeys();
+ String whereClause =
+ primaryKeys.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(" AND "));
+ return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
+ }
+
+ private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) {
+ TableSchema schema = getTableSchema();
+ return stmt.bind(
+ IntStream.range(0, row.size())
+ .mapToObj(
+ (index) ->
+ CassandraUtil.convertRow(
+ row.get(index),
+ schema.getColumnDescs()
+ .get(index)
+ .getDataType()
+ .getTypeName()))
+ .toArray());
+ }
+
+ private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) {
+ TableSchema schema = getTableSchema();
+ Map columnDescs = schema.getColumnTypes();
+ return stmt.bind(
+ getTableSchema().getPrimaryKeys().stream()
+ .map(
+ key ->
+ CassandraUtil.convertRow(
+ schema.getFromRow(key, row), columnDescs.get(key)))
+ .toArray());
+ }
+
+ private BoundStatement bindUpdateInsertStatement(
+ PreparedStatement stmt, SinkRow updateRow, SinkRow insertRow) {
+ TableSchema schema = getTableSchema();
+ int numKeys = schema.getPrimaryKeys().size();
+ int numNonKeys = updateRow.size() - numKeys;
+ Map columnDescs = schema.getColumnTypes();
+ Object[] values = new Object[numNonKeys + numKeys];
+
+ // bind "SET" clause
+ Iterator nonKeyIter = nonKeyColumns.iterator();
+ for (int i = 0; i < numNonKeys; i++) {
+ String name = nonKeyIter.next();
+ values[i] =
+ CassandraUtil.convertRow(
+ schema.getFromRow(name, insertRow), columnDescs.get(name));
+ }
+
+ // bind "WHERE" clause
+ Iterator keyIter = schema.getPrimaryKeys().iterator();
+ for (int i = 0; i < numKeys; i++) {
+ String name = keyIter.next();
+ values[numNonKeys + i] =
+ CassandraUtil.convertRow(
+ schema.getFromRow(name, updateRow), columnDescs.get(name));
+ }
+ return stmt.bind(values);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
new file mode 100644
index 0000000000000..fbe9c1c04a28b
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.data.CqlDuration;
+import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
+import com.risingwave.connector.api.ColumnDesc;
+import com.risingwave.proto.Data.DataType;
+import com.risingwave.proto.Data.DataType.TypeName;
+import io.grpc.Status;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CassandraUtil {
+ private static int getCorrespondingCassandraType(DataType dataType) {
+ switch (dataType.getTypeName()) {
+ case INT16:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.SMALLINT;
+ case INT32:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.INT;
+ case INT64:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BIGINT;
+ case FLOAT:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.FLOAT;
+ case DOUBLE:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DOUBLE;
+ case BOOLEAN:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BOOLEAN;
+ case VARCHAR:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.VARCHAR;
+ case DECIMAL:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DECIMAL;
+ case TIMESTAMP:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
+ case TIMESTAMPTZ:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
+ case DATE:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DATE;
+ case TIME:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIME;
+ case BYTEA:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BLOB;
+ case LIST:
+ case STRUCT:
+ throw Status.UNIMPLEMENTED
+ .withDescription(String.format("not support %s now", dataType))
+ .asRuntimeException();
+ case INTERVAL:
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DURATION;
+ default:
+ throw Status.INVALID_ARGUMENT
+ .withDescription("unspecified type" + dataType)
+ .asRuntimeException();
+ }
+ }
+
+ public static void checkSchema(
+ List columnDescs,
+ Map cassandraColumnDescMap) {
+ if (columnDescs.size() != cassandraColumnDescMap.size()) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("Don't match in the number of columns in the table")
+ .asRuntimeException();
+ }
+ for (ColumnDesc columnDesc : columnDescs) {
+ CqlIdentifier cql = CqlIdentifier.fromInternal(columnDesc.getName());
+ if (!cassandraColumnDescMap.containsKey(cql)) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Don't match in the name, rw is %s cassandra can't find it",
+ columnDesc.getName()))
+ .asRuntimeException();
+ }
+ if (cassandraColumnDescMap.get(cql).getType().getProtocolCode()
+ != getCorrespondingCassandraType(columnDesc.getDataType())) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Don't match in the type, name is %s, cassandra is %s, rw is %s",
+ columnDesc.getName(),
+ cassandraColumnDescMap.get(cql),
+ columnDesc.getDataType().getTypeName()))
+ .asRuntimeException();
+ }
+ }
+ }
+
+ public static void checkPrimaryKey(
+ List cassandraColumnMetadatas, List columnMetadatas) {
+ if (cassandraColumnMetadatas.size() != columnMetadatas.size()) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("Primary key len don't match")
+ .asRuntimeException();
+ }
+ Set cassandraColumnsSet =
+ cassandraColumnMetadatas.stream()
+ .map((a) -> a.getName().toString())
+ .collect(Collectors.toSet());
+ for (String columnMetadata : columnMetadatas) {
+ if (!cassandraColumnsSet.contains(columnMetadata)) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Primary key don't match. RisingWave Primary key is %s, don't find it in cassandra",
+ columnMetadata))
+ .asRuntimeException();
+ }
+ }
+ }
+
+ public static Object convertRow(Object value, TypeName typeName) {
+ switch (typeName) {
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case VARCHAR:
+ case DECIMAL:
+ return value;
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ return ((Timestamp) value).toInstant();
+ case DATE:
+ return ((Date) value).toLocalDate();
+ case TIME:
+ return ((Time) value).toLocalTime();
+ case INTERVAL:
+ return CqlDuration.from((String) value);
+ case BYTEA:
+ return ByteBuffer.wrap((byte[]) value);
+ case LIST:
+ case STRUCT:
+ throw Status.UNIMPLEMENTED
+ .withDescription(String.format("not support %s now", typeName))
+ .asRuntimeException();
+ default:
+ throw Status.INVALID_ARGUMENT
+ .withDescription("unspecified type" + typeName)
+ .asRuntimeException();
+ }
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index dfeef31e83b90..04694ed98bb04 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -9,7 +9,6 @@
4.0.0
-
risingwave-sink-es-7
1.0-SNAPSHOT
risingwave-sink-es-7
diff --git a/java/pom.xml b/java/pom.xml
index 401db2aed123a..e7c790e37a79f 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -18,6 +18,7 @@
connector-node/risingwave-sink-iceberg
connector-node/risingwave-sink-deltalake
connector-node/risingwave-sink-es-7
+ connector-node/risingwave-sink-cassandra
connector-node/risingwave-sink-jdbc
connector-node/risingwave-source-cdc
connector-node/risingwave-connector-test
@@ -45,6 +46,7 @@
3.3.1
3.3.3
7.17.10
+ 4.15.0
@@ -225,6 +227,11 @@
risingwave-sink-es-7
${module.version}
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ ${module.version}
+
com.risingwave.java
risingwave-sink-jdbc
diff --git a/risedev.yml b/risedev.yml
index f17deec0f260e..35c6ae1043b91 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,14 +20,14 @@ profile:
# config-path: src/config/example.toml
steps:
# If you want to use the local s3 storage, enable the following line
- # - use: minio
+ - use: minio
# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3
# bucket: test-bucket
# If you want to create CDC source table, uncomment the following line
- # - use: connector-node
+ - use: connector-node
# if you want to enable etcd backend, uncomment the following lines.
# - use: etcd
@@ -43,7 +43,7 @@ profile:
- use: frontend
# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
- # - use: compactor
+ - use: compactor
# If you want to create source from Kafka, uncomment the following lines
# Note that kafka depends on zookeeper, so zookeeper must be started beforehand.
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index dcaaeb0389d95..14537b229ee63 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -49,8 +49,13 @@ use crate::sink::{
};
use crate::ConnectorParams;
-pub const VALID_REMOTE_SINKS: [&str; 4] =
- ["jdbc", REMOTE_ICEBERG_SINK, "deltalake", "elasticsearch-7"];
+pub const VALID_REMOTE_SINKS: [&str; 5] = [
+ "jdbc",
+ REMOTE_ICEBERG_SINK,
+ "deltalake",
+ "elasticsearch-7",
+ "cassandra",
+];
pub fn is_valid_remote_sink(connector_type: &str) -> bool {
VALID_REMOTE_SINKS.contains(&connector_type)