From 0644ea33db6e14dacf6921c1e29fa21114ec4886 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 22 Aug 2023 12:33:41 +0800
Subject: [PATCH 1/8] support cassandra
---
java/connector-node/assembly/assembly.xml | 1 +
java/connector-node/assembly/pom.xml | 4 +
.../risingwave-connector-service/pom.xml | 5 +
.../com/risingwave/connector/SinkUtils.java | 2 +
.../risingwave-connector-test/pom.xml | 5 +
.../risingwave-sink-cassandra/pom.xml | 60 +++++
.../risingwave/connector/CassandraConfig.java | 95 +++++++
.../connector/CassandraFactory.java | 65 +++++
.../risingwave/connector/CassandraSink.java | 237 ++++++++++++++++++
.../risingwave-sink-es-7/pom.xml | 1 -
java/pom.xml | 7 +
risedev.yml | 2 +-
src/connector/src/sink/remote.rs | 2 +-
13 files changed, 483 insertions(+), 3 deletions(-)
create mode 100644 java/connector-node/risingwave-sink-cassandra/pom.xml
create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml
index 327c7628dbb59..751ba2f410e0b 100644
--- a/java/connector-node/assembly/assembly.xml
+++ b/java/connector-node/assembly/assembly.xml
@@ -40,6 +40,7 @@
*:risingwave-sink-es-7
+ *:risingwave-sink-cassandra
*:risingwave-sink-jdbc
*:risingwave-sink-iceberg
*:risingwave-sink-deltalake
diff --git a/java/connector-node/assembly/pom.xml b/java/connector-node/assembly/pom.xml
index 5c518601bc8e7..6812bac5b63e6 100644
--- a/java/connector-node/assembly/pom.xml
+++ b/java/connector-node/assembly/pom.xml
@@ -33,6 +33,10 @@
com.risingwave.java
risingwave-sink-es-7
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+
com.risingwave.java
risingwave-sink-jdbc
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index fccd84d24b9af..4e2dbe1d6ec96 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -99,5 +99,10 @@
risingwave-sink-es-7
provided
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ provided
+
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index b48cff9bacdd4..abc2be7bcbaab 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -43,6 +43,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new DeltaLakeSinkFactory();
case "elasticsearch-7":
return new EsSink7Factory();
+ case "cassandra":
+ return new CassandraFactory();
default:
throw UNIMPLEMENTED
.withDescription("unknown sink type: " + sinkName)
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index d630eb183c856..7f8f6f1bc49cc 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -174,5 +174,10 @@
risingwave-sink-es-7
test
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ test
+
diff --git a/java/connector-node/risingwave-sink-cassandra/pom.xml b/java/connector-node/risingwave-sink-cassandra/pom.xml
new file mode 100644
index 0000000000000..e51faa9691cf9
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/pom.xml
@@ -0,0 +1,60 @@
+
+
+
+ java-parent
+ com.risingwave.java
+ 1.0-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ risingwave-sink-cassandra
+ 1.0-SNAPSHOT
+ risingwave-sink-cassandra
+
+
+
+ com.risingwave.java
+ proto
+
+
+ com.risingwave.java
+ connector-api
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.commons
+ commons-text
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
+
+
+
+ com.datastax.oss
+ java-driver-core
+ ${datastax.version}
+
+
+
+
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
new file mode 100644
index 0000000000000..5b946d43a8015
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.risingwave.connector.api.sink.CommonSinkConfig;
+
+public class CassandraConfig extends CommonSinkConfig {
+ /** Required */
+ private String type;
+ /** Required */
+ private String url;
+
+ /** Required */
+ private String keyspace;
+
+ /** Required */
+ private String table;
+
+ /** Required */
+ private String datacenter;
+
+ @JsonProperty(value = "username")
+ private String username;
+
+ @JsonProperty(value = "password")
+ private String password;
+
+ @JsonCreator
+ public CassandraConfig(
+ @JsonProperty(value = "url") String url,
+ @JsonProperty(value = "keyspace") String keyspace,
+ @JsonProperty(value = "table") String table,
+ @JsonProperty(value = "datacenter") String datacenter,
+ @JsonProperty(value = "type") String type) {
+ this.url = url;
+ this.keyspace = keyspace;
+ this.table = table;
+ this.datacenter = datacenter;
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDatacenter() {
+ return datacenter;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public CassandraConfig withUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public CassandraConfig withPassword(String password) {
+ this.password = password;
+ return this;
+ }
+}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
new file mode 100644
index 0000000000000..ca52c3e40cd59
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -0,0 +1,65 @@
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.*;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.risingwave.connector.api.TableSchema;
+import com.risingwave.connector.api.sink.SinkFactory;
+import com.risingwave.connector.api.sink.SinkWriter;
+import com.risingwave.connector.api.sink.SinkWriterV1;
+import com.risingwave.proto.Catalog.SinkType;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraFactory implements SinkFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraFactory.class);
+
+ public SinkWriter createWriter(TableSchema tableSchema, Map tableProperties) {
+ ObjectMapper mapper = new ObjectMapper();
+ CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
+ return new SinkWriterV1.Adapter(new CassandraSink(tableSchema, config));
+ }
+
+ @Override
+ public void validate(
+ TableSchema tableSchema, Map tableProperties, SinkType sinkType) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
+ CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
+
+ // 1. check url
+ String url = config.getUrl();
+ String[] hostPort = url.split(":");
+ if (hostPort.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid cassandraURL: expected `host:port`, got " + url);
+ }
+ for (String s : hostPort) {
+ System.out.println(s);
+ }
+ // 2. check connection
+ CqlSessionBuilder sessionBuilder =
+ CqlSession.builder()
+ .addContactPoint(
+ new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
+ .withKeyspace(config.getKeyspace())
+ .withLocalDatacenter(config.getDatacenter());
+ if (config.getUsername() != null && config.getPassword() != null) {
+ sessionBuilder =
+ sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
+ }
+ CqlSession session = sessionBuilder.build();
+ if (session.isClosed()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ // 3. close client
+ session.close();
+ }
+}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
new file mode 100644
index 0000000000000..2034a3a95d287
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -0,0 +1,237 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.*;
+import com.risingwave.connector.api.TableSchema;
+import com.risingwave.connector.api.sink.SinkRow;
+import com.risingwave.connector.api.sink.SinkWriterBase;
+import com.risingwave.proto.Data;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSink extends SinkWriterBase {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
+ private final CqlSession session;
+ private final List updateRowCache = new ArrayList<>(1);
+ private final PreparedStatement insertStmt;
+ private final PreparedStatement updateStmt;
+ private final PreparedStatement deleteStmt;
+ private final List nonKeyColumns;
+ private final BatchStatementBuilder batchBuilder;
+ private final CassandraConfig config;
+ private final List primaryKeyIndexes;
+
+ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
+ super(tableSchema);
+ String url = config.getUrl();
+ String[] hostPort = url.split(":");
+ if (hostPort.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid cassandraURL: expected `host:port`, got " + url);
+ }
+ // 2. check connection
+ CqlSessionBuilder sessionBuilder =
+ CqlSession.builder()
+ .addContactPoint(
+ new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
+ .withKeyspace(config.getKeyspace())
+ .withLocalDatacenter(config.getDatacenter());
+ if (config.getUsername() != null && config.getPassword() != null) {
+ sessionBuilder =
+ sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
+ }
+ this.session = sessionBuilder.build();
+ if (session.isClosed()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+
+ this.config = config;
+
+ primaryKeyIndexes = new ArrayList();
+ for (String primaryKey : tableSchema.getPrimaryKeys()) {
+ primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
+ }
+
+ this.batchBuilder =
+ BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace());
+
+ // fetch non-pk columns for prepared statements
+ nonKeyColumns =
+ Arrays.stream(tableSchema.getColumnNames())
+ // cassandra does not allow SET on primary keys
+ .filter(c -> !tableSchema.getPrimaryKeys().contains(c))
+ .collect(Collectors.toList());
+
+ // prepare statement for insert
+ this.insertStmt = session.prepare(createInsertStatement(config.getTable(), tableSchema));
+
+ // prepare statement for update-insert/update-delete
+ this.updateStmt = session.prepare(createUpdateStatement(config.getTable(), tableSchema));
+
+ // prepare the delete statement
+ this.deleteStmt = session.prepare(createDeleteStatement(config.getTable(), tableSchema));
+ }
+
+ @Override
+ public void write(Iterator rows) {
+        if ("append-only".equals(this.config.getType())) {
+ write_apend_only(rows);
+ } else {
+ write_upsert(rows);
+ }
+ }
+
+ private void write_apend_only(Iterator rows) {
+ while (rows.hasNext()) {
+ SinkRow row = rows.next();
+ Data.Op op = row.getOp();
+ switch (op) {
+ case INSERT:
+ batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
+ break;
+ case UPDATE_DELETE:
+ break;
+ case UPDATE_INSERT:
+ break;
+ case DELETE:
+ break;
+ default:
+ throw Status.INTERNAL
+ .withDescription("Unknown operation: " + op)
+ .asRuntimeException();
+ }
+ }
+ }
+
+ private void write_upsert(Iterator rows) {
+ while (rows.hasNext()) {
+ SinkRow row = rows.next();
+ Data.Op op = row.getOp();
+ switch (op) {
+ case INSERT:
+ batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
+ break;
+ case UPDATE_DELETE:
+ updateRowCache.clear();
+ updateRowCache.add(row);
+ break;
+ case UPDATE_INSERT:
+ SinkRow old = updateRowCache.remove(0);
+ if (old == null) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("UPDATE_INSERT without UPDATE_DELETE")
+ .asRuntimeException();
+ }
+ batchBuilder.addStatement(bindUpdateInsertStatement(updateStmt, old, row));
+ break;
+ case DELETE:
+ batchBuilder.addStatement(bindDeleteStatement(deleteStmt, row));
+ break;
+ default:
+ throw Status.INTERNAL
+ .withDescription("Unknown operation: " + op)
+ .asRuntimeException();
+ }
+ }
+ }
+
+ @Override
+ public void sync() {
+ try {
+ session.execute(batchBuilder.build());
+ batchBuilder.clearStatements();
+ } catch (Exception e) {
+ throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
+ }
+ }
+
+ @Override
+ public void drop() {
+ session.close();
+ }
+
+ private String createInsertStatement(String tableName, TableSchema tableSchema) {
+ String[] columnNames = tableSchema.getColumnNames();
+ String columnNamesString = String.join(", ", columnNames);
+ String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
+ return String.format(
+ "INSERT INTO %s (%s) VALUES (%s)",
+ tableName, columnNamesString, placeholdersString);
+ }
+
+ private String createUpdateStatement(String tableName, TableSchema tableSchema) {
+ List primaryKeys = tableSchema.getPrimaryKeys();
+ String setClause = // cassandra does not allow SET on primary keys
+ nonKeyColumns.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(", "));
+ String whereClause =
+ primaryKeys.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(" AND "));
+ return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
+ }
+
+ private static String createDeleteStatement(String tableName, TableSchema tableSchema) {
+ List primaryKeys = tableSchema.getPrimaryKeys();
+ String whereClause =
+ primaryKeys.stream()
+ .map(columnName -> columnName + " = ?")
+ .collect(Collectors.joining(" AND "));
+ return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
+ }
+
+ private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) {
+ return stmt.bind(IntStream.range(0, row.size()).mapToObj(row::get).toArray());
+ }
+
+ private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) {
+ return stmt.bind(
+ getTableSchema().getPrimaryKeys().stream()
+ .map(key -> getTableSchema().getFromRow(key, row))
+ .toArray());
+ }
+
+ private BoundStatement bindUpdateInsertStatement(
+ PreparedStatement stmt, SinkRow updateRow, SinkRow insertRow) {
+ TableSchema schema = getTableSchema();
+ int numKeys = schema.getPrimaryKeys().size();
+ int numNonKeys = updateRow.size() - numKeys;
+ Object[] values = new Object[numNonKeys + numKeys];
+
+ // bind "SET" clause
+ Iterator nonKeyIter = nonKeyColumns.iterator();
+ for (int i = 0; i < numNonKeys; i++) {
+ values[i] = schema.getFromRow(nonKeyIter.next(), insertRow);
+ }
+
+ // bind "WHERE" clause
+ Iterator keyIter = schema.getPrimaryKeys().iterator();
+ for (int i = 0; i < numKeys; i++) {
+ values[numNonKeys + i] = schema.getFromRow(keyIter.next(), updateRow);
+ }
+ return stmt.bind(values);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index dfeef31e83b90..04694ed98bb04 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -9,7 +9,6 @@
4.0.0
-
risingwave-sink-es-7
1.0-SNAPSHOT
risingwave-sink-es-7
diff --git a/java/pom.xml b/java/pom.xml
index 401db2aed123a..e7c790e37a79f 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -18,6 +18,7 @@
connector-node/risingwave-sink-iceberg
connector-node/risingwave-sink-deltalake
connector-node/risingwave-sink-es-7
+ connector-node/risingwave-sink-cassandra
connector-node/risingwave-sink-jdbc
connector-node/risingwave-source-cdc
connector-node/risingwave-connector-test
@@ -45,6 +46,7 @@
3.3.1
3.3.3
7.17.10
+ 4.15.0
@@ -225,6 +227,11 @@
risingwave-sink-es-7
${module.version}
+
+ com.risingwave.java
+ risingwave-sink-cassandra
+ ${module.version}
+
com.risingwave.java
risingwave-sink-jdbc
diff --git a/risedev.yml b/risedev.yml
index 6bcaef69be027..50e4dcefaa433 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -27,7 +27,7 @@ profile:
# bucket: test-bucket
# If you want to create CDC source table, uncomment the following line
- # - use: connector-node
+ - use: connector-node
# if you want to enable etcd backend, uncomment the following lines.
# - use: etcd
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index dfffb2c87b0ce..633f34d09c9eb 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -48,7 +48,7 @@ use crate::sink::{
};
use crate::ConnectorParams;
-pub const VALID_REMOTE_SINKS: [&str; 4] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7"];
+pub const VALID_REMOTE_SINKS: [&str; 5] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7","cassandra"];
pub fn is_valid_remote_sink(connector_type: &str) -> bool {
VALID_REMOTE_SINKS.contains(&connector_type)
From 6117f25657a1e30e2f34c8cae62886bdf354f9a9 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Thu, 24 Aug 2023 17:31:41 +0800
Subject: [PATCH 2/8] support cassandra sink
---
integration_tests/cassandra-sink/README.md | 35 +++++
.../cassandra-sql/cassandra-query.sql | 1 +
.../cassandra-sql/create_cassandra_table.sql | 7 +
.../cassandra-sql/run-sql-file.sh | 3 +
.../cassandra-sink/create_mv.sql | 7 +
.../cassandra-sink/create_sink.sql | 11 ++
.../cassandra-sink/create_source.sql | 21 +++
.../cassandra-sink/docker-compose.yml | 71 ++++++++++
.../risingwave/connector/CassandraConfig.java | 12 +-
.../connector/CassandraFactory.java | 33 ++++-
.../risingwave/connector/CassandraSink.java | 74 +++++++----
.../risingwave/connector/CassandraUtil.java | 121 ++++++++++++++++++
src/connector/src/sink/remote.rs | 8 +-
13 files changed, 369 insertions(+), 35 deletions(-)
create mode 100644 integration_tests/cassandra-sink/README.md
create mode 100644 integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql
create mode 100644 integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
create mode 100644 integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
create mode 100644 integration_tests/cassandra-sink/create_mv.sql
create mode 100644 integration_tests/cassandra-sink/create_sink.sql
create mode 100644 integration_tests/cassandra-sink/create_source.sql
create mode 100644 integration_tests/cassandra-sink/docker-compose.yml
create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md
new file mode 100644
index 0000000000000..6a76914bcbf30
--- /dev/null
+++ b/integration_tests/cassandra-sink/README.md
@@ -0,0 +1,35 @@
+# Demo: Sinking to Cassandra
+
+In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.
+
+1. Launch the cluster:
+
+```sh
+docker compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a Cassandra instance for the sink.
+
+
+2. Create the Cassandra table:
+
+```sh
+docker compose exec cassandra bash /var/lib/cassandra-sink/run-sql-file.sh create_cassandra_table
+```
+
+3. Execute the SQL queries in sequence:
+
+- create_source.sql
+- create_mv.sql
+- create_sink.sql
+
+4. Execute a simple query:
+
+```sh
+docker compose exec cassandra bash /var/lib/cassandra-sink/run-sql-file.sh cassandra-query
+
+```
+
+```sql
+select user_id, count(*) from my_keyspace.demo_test group by user_id;
+```
diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql
new file mode 100644
index 0000000000000..004d9fdd1c00e
--- /dev/null
+++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql
@@ -0,0 +1 @@
+select user_id, count(*) from my_keyspace.demo_test group by user_id
diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
new file mode 100644
index 0000000000000..058fd57c80a63
--- /dev/null
+++ b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
@@ -0,0 +1,7 @@
+CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+use my_keyspace;
+CREATE table demo_test(
+ user_id text primary key,
+ target_id text,
+ event_timestamp timestamp,
+);
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
new file mode 100644
index 0000000000000..1061ecd65bfa0
--- /dev/null
+++ b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
@@ -0,0 +1,3 @@
+set -ex
+
+cqlsh < /var/lib/cassandra-sink/$1.sql
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_mv.sql b/integration_tests/cassandra-sink/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/cassandra-sink/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+ user_id,
+ target_id,
+ event_timestamp
+FROM
+ user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql
new file mode 100644
index 0000000000000..8fbc7c7e63905
--- /dev/null
+++ b/integration_tests/cassandra-sink/create_sink.sql
@@ -0,0 +1,11 @@
+CREATE SINK bhv_cassandra_sink
+FROM
+ bhv_mv WITH (
+ connector = 'cassandra',
+ type = 'append-only',
+ force_append_only='true',
+    cassandra.url = '127.0.0.1:9042',
+    cassandra.keyspace = 'my_keyspace',
+ cassandra.table = 'demo_test',
+ cassandra.datacenter = 'datacenter1',
+)
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-sink/create_source.sql
new file mode 100644
index 0000000000000..e704eb785ec06
--- /dev/null
+++ b/integration_tests/cassandra-sink/create_source.sql
@@ -0,0 +1,21 @@
+CREATE table user_behaviors (
+ user_id VARCHAR,
+ target_id VARCHAR,
+ target_type VARCHAR,
+ event_timestamp TIMESTAMP,
+ behavior_type VARCHAR,
+ parent_target_type VARCHAR,
+ parent_target_id VARCHAR,
+ PRIMARY KEY(user_id)
+) WITH (
+ connector = 'datagen',
+ fields.seq_id.kind = 'sequence',
+ fields.seq_id.start = '1',
+ fields.seq_id.end = '10000000',
+ fields.user_id.kind = 'random',
+ fields.user_id.min = '1',
+ fields.user_id.max = '10000000',
+ fields.user_name.kind = 'random',
+ fields.user_name.length = '10',
+ datagen.rows.per.second = '20000'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-sink/docker-compose.yml
new file mode 100644
index 0000000000000..db9b761502c3e
--- /dev/null
+++ b/integration_tests/cassandra-sink/docker-compose.yml
@@ -0,0 +1,71 @@
+---
+version: "3"
+services:
+  # NOTE(review): removed duplicate nested "services:" key — confirm child indentation still parses
+ cassandra:
+ image: cassandra:4.0
+ ports:
+ - 9042:9042
+ volumes:
+ - ~/cassandra-sink:/var/lib/cassandra-sink
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+ compactor-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compactor-0
+ compute-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compute-node-0
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ frontend-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: frontend-node-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ meta-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: meta-node-0
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+ message_queue:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: message_queue
+ datagen:
+ build: ../datagen
+ depends_on: [message_queue]
+ command:
+ - /bin/sh
+ - -c
+ - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
+ restart: always
+ container_name: datagen
+volumes:
+ compute-node-0:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
index 5b946d43a8015..a993a8988e1ee 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -35,18 +35,18 @@ public class CassandraConfig extends CommonSinkConfig {
/** Required */
private String datacenter;
- @JsonProperty(value = "username")
+ @JsonProperty(value = "cassandra.username")
private String username;
- @JsonProperty(value = "password")
+ @JsonProperty(value = "cassandra.password")
private String password;
@JsonCreator
public CassandraConfig(
- @JsonProperty(value = "url") String url,
- @JsonProperty(value = "keyspace") String keyspace,
- @JsonProperty(value = "table") String table,
- @JsonProperty(value = "datacenter") String datacenter,
+ @JsonProperty(value = "cassandra.url") String url,
+ @JsonProperty(value = "cassandra.keyspace") String keyspace,
+ @JsonProperty(value = "cassandra.table") String table,
+ @JsonProperty(value = "cassandra.datacenter") String datacenter,
@JsonProperty(value = "type") String type) {
this.url = url;
this.keyspace = keyspace;
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
index ca52c3e40cd59..467ff41a8941a 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -5,6 +5,7 @@
import com.datastax.oss.driver.api.core.cql.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.risingwave.connector.api.ColumnDesc;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
@@ -12,6 +13,8 @@
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +42,6 @@ public void validate(
throw new IllegalArgumentException(
"Invalid cassandraURL: expected `host:port`, got " + url);
}
- for (String s : hostPort) {
- System.out.println(s);
- }
// 2. check connection
CqlSessionBuilder sessionBuilder =
CqlSession.builder()
@@ -54,6 +54,19 @@ public void validate(
sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
}
CqlSession session = sessionBuilder.build();
+
+ String cql =
+ String.format(
+ "SELECT column_name , type FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name = '%s';",
+ config.getKeyspace(), config.getTable());
+
+ HashMap cassandraColumnDescMap = new HashMap<>();
+ for (Row i : session.execute(cql)) {
+ cassandraColumnDescMap.put(i.getString(0), i.getString(1));
+ }
+ List columnDescs = tableSchema.getColumnDescs();
+ CassandraUtil.checkSchema(columnDescs, cassandraColumnDescMap);
+
if (session.isClosed()) {
throw Status.INVALID_ARGUMENT
.withDescription("Cannot connect to " + config.getUrl())
@@ -61,5 +74,19 @@ public void validate(
}
// 3. close client
session.close();
+ switch (sinkType) {
+ case UPSERT:
+ if (tableSchema.getPrimaryKeys().isEmpty()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("please define primary key for upsert cassandra sink")
+ .asRuntimeException();
+ }
+ break;
+ case APPEND_ONLY:
+ case FORCE_APPEND_ONLY:
+ break;
+ default:
+ throw Status.INTERNAL.asRuntimeException();
+ }
}
}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
index 2034a3a95d287..fb33debccc782 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -21,6 +21,7 @@
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import com.risingwave.proto.Data;
+import com.risingwave.proto.Data.DataType.TypeName;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.*;
@@ -33,13 +34,10 @@ public class CassandraSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
private final CqlSession session;
private final List updateRowCache = new ArrayList<>(1);
- private final PreparedStatement insertStmt;
- private final PreparedStatement updateStmt;
- private final PreparedStatement deleteStmt;
+ private final HashMap stmtMap;
private final List nonKeyColumns;
private final BatchStatementBuilder batchBuilder;
private final CassandraConfig config;
- private final List primaryKeyIndexes;
public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
super(tableSchema);
@@ -68,12 +66,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
}
this.config = config;
-
- primaryKeyIndexes = new ArrayList();
- for (String primaryKey : tableSchema.getPrimaryKeys()) {
- primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
- }
-
this.batchBuilder =
BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace());
@@ -84,19 +76,27 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
.filter(c -> !tableSchema.getPrimaryKeys().contains(c))
.collect(Collectors.toList());
+ this.stmtMap = new HashMap<>();
// prepare statement for insert
- this.insertStmt = session.prepare(createInsertStatement(config.getTable(), tableSchema));
+ this.stmtMap.put(
+ "insert", session.prepare(createInsertStatement(config.getTable(), tableSchema)));
+ if (config.getType().equals("upsert")) {
+ // prepare statement for update-insert/update-delete
+ this.stmtMap.put(
+ "update",
+ session.prepare(createUpdateStatement(config.getTable(), tableSchema)));
- // prepare statement for update-insert/update-delete
- this.updateStmt = session.prepare(createUpdateStatement(config.getTable(), tableSchema));
-
- // prepare the delete statement
- this.deleteStmt = session.prepare(createDeleteStatement(config.getTable(), tableSchema));
+ // prepare the delete statement
+ this.stmtMap.put(
+ "delete",
+ session.prepare(createDeleteStatement(config.getTable(), tableSchema)));
+ }
}
@Override
public void write(Iterator rows) {
- if (this.config.getType() == "append-only") {
+ System.out.println(this.config.getType());
+ if (this.config.getType().equals("append-only")) {
write_apend_only(rows);
} else {
write_upsert(rows);
@@ -109,7 +109,7 @@ private void write_apend_only(Iterator rows) {
Data.Op op = row.getOp();
switch (op) {
case INSERT:
- batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
+ batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
break;
case UPDATE_DELETE:
break;
@@ -131,7 +131,7 @@ private void write_upsert(Iterator rows) {
Data.Op op = row.getOp();
switch (op) {
case INSERT:
- batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
+ batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
break;
case UPDATE_DELETE:
updateRowCache.clear();
@@ -144,10 +144,11 @@ private void write_upsert(Iterator rows) {
.withDescription("UPDATE_INSERT without UPDATE_DELETE")
.asRuntimeException();
}
- batchBuilder.addStatement(bindUpdateInsertStatement(updateStmt, old, row));
+ batchBuilder.addStatement(
+ bindUpdateInsertStatement(this.stmtMap.get("update"), old, row));
break;
case DELETE:
- batchBuilder.addStatement(bindDeleteStatement(deleteStmt, row));
+ batchBuilder.addStatement(bindDeleteStatement(this.stmtMap.get("delete"), row));
break;
default:
throw Status.INTERNAL
@@ -204,13 +205,29 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
}
private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) {
- return stmt.bind(IntStream.range(0, row.size()).mapToObj(row::get).toArray());
+ TableSchema schema = getTableSchema();
+ return stmt.bind(
+ IntStream.range(0, row.size())
+ .mapToObj(
+ (index) ->
+ CassandraUtil.convertRow(
+ row.get(index),
+ schema.getColumnDescs()
+ .get(index)
+ .getDataType()
+ .getTypeName()))
+ .toArray());
}
private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) {
+ TableSchema schema = getTableSchema();
+ Map columnDescs = schema.getColumnTypes();
return stmt.bind(
getTableSchema().getPrimaryKeys().stream()
- .map(key -> getTableSchema().getFromRow(key, row))
+ .map(
+ key ->
+ CassandraUtil.convertRow(
+ schema.getFromRow(key, row), columnDescs.get(key)))
.toArray());
}
@@ -219,18 +236,25 @@ private BoundStatement bindUpdateInsertStatement(
TableSchema schema = getTableSchema();
int numKeys = schema.getPrimaryKeys().size();
int numNonKeys = updateRow.size() - numKeys;
+ Map columnDescs = schema.getColumnTypes();
Object[] values = new Object[numNonKeys + numKeys];
// bind "SET" clause
Iterator nonKeyIter = nonKeyColumns.iterator();
for (int i = 0; i < numNonKeys; i++) {
- values[i] = schema.getFromRow(nonKeyIter.next(), insertRow);
+ String name = nonKeyIter.next();
+ values[i] =
+ CassandraUtil.convertRow(
+ schema.getFromRow(name, insertRow), columnDescs.get(name));
}
// bind "WHERE" clause
Iterator keyIter = schema.getPrimaryKeys().iterator();
for (int i = 0; i < numKeys; i++) {
- values[numNonKeys + i] = schema.getFromRow(keyIter.next(), updateRow);
+ String name = keyIter.next();
+ values[numNonKeys + i] =
+ CassandraUtil.convertRow(
+ schema.getFromRow(name, updateRow), columnDescs.get(name));
}
return stmt.bind(values);
}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
new file mode 100644
index 0000000000000..65f47855cead9
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -0,0 +1,121 @@
+package com.risingwave.connector;
+
+import com.datastax.oss.driver.api.core.data.CqlDuration;
+import com.risingwave.connector.api.ColumnDesc;
+import com.risingwave.proto.Data.DataType;
+import com.risingwave.proto.Data.DataType.TypeName;
+import io.grpc.Status;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+public class CassandraUtil {
+ private static String getCorrespondingCassandraType(DataType dataType) {
+ switch (dataType.getTypeName()) {
+ case INT16:
+ return "smallint";
+ case INT32:
+ return "int";
+ case INT64:
+ return "bigint";
+ case FLOAT:
+ return "float";
+ case DOUBLE:
+ return "double";
+ case BOOLEAN:
+ return "boolean";
+ case VARCHAR:
+ return "text";
+ case DECIMAL:
+ return "decimal";
+ case TIMESTAMP:
+ return "timestamp";
+ case TIMESTAMPTZ:
+ return "timestamp";
+ case DATE:
+ return "date";
+ case TIME:
+ return "time";
+ case BYTEA:
+ return "blob";
+ case LIST:
+ case STRUCT:
+ throw Status.UNIMPLEMENTED
+ .withDescription(String.format("not support %s now", dataType))
+ .asRuntimeException();
+ case INTERVAL:
+ return "duration";
+ default:
+ throw Status.INVALID_ARGUMENT
+ .withDescription("unspecified type" + dataType)
+ .asRuntimeException();
+ }
+ }
+
+ public static void checkSchema(
+ List columnDescs, Map cassandraColumnDescMap) {
+ if (columnDescs.size() != cassandraColumnDescMap.size()) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("Don't match in the number of columns in the table")
+ .asRuntimeException();
+ }
+ for (ColumnDesc columnDesc : columnDescs) {
+ if (!cassandraColumnDescMap.containsKey(columnDesc.getName())) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Don't match in the name, rw is %s cassandra can't find it",
+ columnDesc.getName()))
+ .asRuntimeException();
+ }
+ if (!cassandraColumnDescMap
+ .get(columnDesc.getName())
+ .equals(getCorrespondingCassandraType(columnDesc.getDataType()))) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Don't match in the type, name is %s, cassandra is %s, rw is %s",
+ columnDesc.getName(),
+ cassandraColumnDescMap.get(columnDesc.getName()),
+ getCorrespondingCassandraType(columnDesc.getDataType())))
+ .asRuntimeException();
+ }
+ }
+ }
+
+ public static Object convertRow(Object value, TypeName typeName) {
+ switch (typeName) {
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case VARCHAR:
+ case DECIMAL:
+ return value;
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ return ((Timestamp) value).toInstant();
+ case DATE:
+ return ((Date) value).toLocalDate();
+ case TIME:
+ return ((Time) value).toLocalTime();
+ case INTERVAL:
+ return CqlDuration.from((String) value);
+ case BYTEA:
+ return ByteBuffer.wrap((byte[]) value);
+ case STRUCT:
+ throw Status.UNIMPLEMENTED
+ .withDescription(String.format("not support %s now", typeName))
+ .asRuntimeException();
+ default:
+ throw Status.INVALID_ARGUMENT
+ .withDescription("unspecified type" + typeName)
+ .asRuntimeException();
+ }
+ }
+}
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 633f34d09c9eb..e211f51bec42f 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -48,7 +48,13 @@ use crate::sink::{
};
use crate::ConnectorParams;
-pub const VALID_REMOTE_SINKS: [&str; 5] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7","cassandra"];
+pub const VALID_REMOTE_SINKS: [&str; 5] = [
+ "jdbc",
+ "iceberg",
+ "deltalake",
+ "elasticsearch-7",
+ "cassandra",
+];
pub fn is_valid_remote_sink(connector_type: &str) -> bool {
VALID_REMOTE_SINKS.contains(&connector_type)
From e329893e913d40c7529e5427a7fc78ad2aba5239 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Thu, 24 Aug 2023 18:39:41 +0800
Subject: [PATCH 3/8] add cassandra sink
fmt
fmt
---
integration_tests/cassandra-sink/README.md | 6 +++---
.../cassandra-sink/cassandra-sql/run-sql-file.sh | 2 +-
integration_tests/cassandra-sink/create_sink.sql | 2 +-
integration_tests/cassandra-sink/docker-compose.yml | 7 +++++--
risedev.yml | 2 +-
5 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md
index 6a76914bcbf30..d49ce9545fac9 100644
--- a/integration_tests/cassandra-sink/README.md
+++ b/integration_tests/cassandra-sink/README.md
@@ -8,13 +8,13 @@ In this demo, we want to showcase how RisingWave is able to sink data to Cassand
docker compose up -d
```
-The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a clichouse for sink.
+The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a Cassandra instance as the sink target.
2. Create the Cassandra table:
```sh
-docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh create_cassandra_table
+docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table
```
3. Execute the SQL queries in sequence:
@@ -26,7 +26,7 @@ docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh creat
4. Execute a simple query:
```sh
-docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh cassandra_query
+docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query
```
diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
index 1061ecd65bfa0..053f94398559d 100644
--- a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
+++ b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
@@ -1,3 +1,3 @@
set -ex
-cqlsh < /var/lib/cassandra-sink/$1.sql
\ No newline at end of file
+cqlsh < /opt/cassandra/cassandra-sql/$1.sql
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql
index 8fbc7c7e63905..56fab31689efd 100644
--- a/integration_tests/cassandra-sink/create_sink.sql
+++ b/integration_tests/cassandra-sink/create_sink.sql
@@ -8,4 +8,4 @@ FROM
cassandra.keyspace = 'mykeyspace',
cassandra.table = 'demo_test',
cassandra.datacenter = 'datacenter1',
-)
\ No newline at end of file
+);
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-sink/docker-compose.yml
index db9b761502c3e..6c4bb61c013af 100644
--- a/integration_tests/cassandra-sink/docker-compose.yml
+++ b/integration_tests/cassandra-sink/docker-compose.yml
@@ -1,13 +1,12 @@
---
version: "3"
services:
- services:
cassandra:
image: cassandra:4.0
ports:
- 9042:9042
volumes:
- - ~/cassandra-sink:/var/lib/cassandra-sink
+ - ./cassandra-sql:/opt/cassandra/cassandra-sql
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
compactor-0:
@@ -34,6 +33,10 @@ services:
extends:
file: ../../docker/docker-compose.yml
service: meta-node-0
+ connector-node:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: connector-node
minio-0:
extends:
file: ../../docker/docker-compose.yml
diff --git a/risedev.yml b/risedev.yml
index 50e4dcefaa433..6bcaef69be027 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -27,7 +27,7 @@ profile:
# bucket: test-bucket
# If you want to create CDC source table, uncomment the following line
- - use: connector-node
+ # - use: connector-node
# if you want to enable etcd backend, uncomment the following lines.
# - use: etcd
From 1f6eaa52581a27db895aa6648fd4187e8bfe8d5c Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Thu, 24 Aug 2023 18:57:34 +0800
Subject: [PATCH 4/8] fmt
fmt
---
.../connector/CassandraFactory.java | 16 ++++++++++
.../risingwave/connector/CassandraSink.java | 32 ++++++++++---------
.../risingwave/connector/CassandraUtil.java | 16 ++++++++++
src/connector/src/sink/remote.rs | 9 ++++--
4 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
index 467ff41a8941a..6c06c16c3bf4f 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import com.datastax.oss.driver.api.core.CqlSession;
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
index fb33debccc782..49141514d1a2c 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -1,16 +1,18 @@
-// Copyright 2023 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.risingwave.connector;
@@ -97,13 +99,13 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
public void write(Iterator rows) {
System.out.println(this.config.getType());
if (this.config.getType().equals("append-only")) {
- write_apend_only(rows);
+ write_append_only(rows);
} else {
write_upsert(rows);
}
}
- private void write_apend_only(Iterator rows) {
+ private void write_append_only(Iterator rows) {
while (rows.hasNext()) {
SinkRow row = rows.next();
Data.Op op = row.getOp();
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
index 65f47855cead9..ead056bfdba22 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import com.datastax.oss.driver.api.core.data.CqlDuration;
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index cfea73b9f4f63..14537b229ee63 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -49,8 +49,13 @@ use crate::sink::{
};
use crate::ConnectorParams;
-pub const VALID_REMOTE_SINKS: [&str; 5] =
- ["jdbc", REMOTE_ICEBERG_SINK, "deltalake", "elasticsearch-7", "cassandra"];
+pub const VALID_REMOTE_SINKS: [&str; 5] = [
+ "jdbc",
+ REMOTE_ICEBERG_SINK,
+ "deltalake",
+ "elasticsearch-7",
+ "cassandra",
+];
pub fn is_valid_remote_sink(connector_type: &str) -> bool {
VALID_REMOTE_SINKS.contains(&connector_type)
From f88ead4e8344fe51cf0b64113123fe0eae038fdb Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 25 Aug 2023 15:14:00 +0800
Subject: [PATCH 5/8] fmt
fmt
fmt
fmt
fmt
fmt
fmt
---
integration_tests/cassandra-sink/README.md | 2 +-
.../{cassandra-query.sql => cassandra_query.sql} | 2 +-
.../cassandra-sql/create_cassandra_table.sql | 2 +-
integration_tests/cassandra-sink/create_sink.sql | 4 ++--
integration_tests/cassandra-sink/create_source.sql | 13 +++++--------
.../com/risingwave/connector/CassandraSink.java | 3 +--
.../com/risingwave/connector/CassandraUtil.java | 1 +
7 files changed, 12 insertions(+), 15 deletions(-)
rename integration_tests/cassandra-sink/cassandra-sql/{cassandra-query.sql => cassandra_query.sql} (91%)
diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md
index d49ce9545fac9..ac957bc5f7dde 100644
--- a/integration_tests/cassandra-sink/README.md
+++ b/integration_tests/cassandra-sink/README.md
@@ -31,5 +31,5 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh
```
```sql
-select user_id, count(*) from default.demo_test group by user_id
+select user_id, count(*) from my_keyspace.demo_test group by user_id
```
diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql
similarity index 91%
rename from integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql
rename to integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql
index 004d9fdd1c00e..a18cc472173ab 100644
--- a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql
+++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql
@@ -1 +1 @@
-select user_id, count(*) from my_keyspace.demo_test group by user_id
+select user_id, count(*) from my_keyspace.demo_test group by user_id;
diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
index 058fd57c80a63..d858e7c47ae69 100644
--- a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
+++ b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
@@ -1,7 +1,7 @@
CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use my_keyspace;
CREATE table demo_test(
- user_id text primary key,
+ user_id int primary key,
target_id text,
event_timestamp timestamp,
);
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql
index 56fab31689efd..0105775301205 100644
--- a/integration_tests/cassandra-sink/create_sink.sql
+++ b/integration_tests/cassandra-sink/create_sink.sql
@@ -1,10 +1,10 @@
-CREATE SINK bhv_iceberg_sink
+CREATE SINK bhv_cassandra_sink
FROM
bhv_mv WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
- cassandra.url = 'http://127.0.0.1:9042',
+ cassandra.url = 'cassandra:9042',
cassandra.keyspace = 'mykeyspace',
cassandra.table = 'demo_test',
cassandra.datacenter = 'datacenter1',
diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-sink/create_source.sql
index e704eb785ec06..acb48fecf4e54 100644
--- a/integration_tests/cassandra-sink/create_source.sql
+++ b/integration_tests/cassandra-sink/create_source.sql
@@ -1,5 +1,5 @@
CREATE table user_behaviors (
- user_id VARCHAR,
+ user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
@@ -9,13 +9,10 @@ CREATE table user_behaviors (
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
- fields.seq_id.kind = 'sequence',
- fields.seq_id.start = '1',
- fields.seq_id.end = '10000000',
- fields.user_id.kind = 'random',
- fields.user_id.min = '1',
- fields.user_id.max = '10000000',
+ fields.user_id.kind = 'sequence',
+ fields.user_id.start = '1',
+ fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
- datagen.rows.per.second = '20000'
+ datagen.rows.per.second = '500'
) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
index 49141514d1a2c..40e23d4537c2d 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -49,7 +49,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
throw new IllegalArgumentException(
"Invalid cassandraURL: expected `host:port`, got " + url);
}
- // 2. check connection
+ // check connection
CqlSessionBuilder sessionBuilder =
CqlSession.builder()
.addContactPoint(
@@ -97,7 +97,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
@Override
public void write(Iterator rows) {
- System.out.println(this.config.getType());
if (this.config.getType().equals("append-only")) {
write_append_only(rows);
} else {
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
index ead056bfdba22..f6f68012c25c6 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -124,6 +124,7 @@ public static Object convertRow(Object value, TypeName typeName) {
return CqlDuration.from((String) value);
case BYTEA:
return ByteBuffer.wrap((byte[]) value);
+ case LIST:
case STRUCT:
throw Status.UNIMPLEMENTED
.withDescription(String.format("not support %s now", typeName))
From b0d35722668eb13170508f7e9be91e0e16d18760 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Wed, 6 Sep 2023 15:14:23 +0800
Subject: [PATCH 6/8] support ScyllaDB
---
.../README.md | 15 +++-
.../cassandra-sql/cassandra_query.sql | 0
.../cassandra-sql/create_cassandra_table.sql | 0
.../cassandra-sql/run-sql-file-syclladb.sh | 3 +
.../cassandra-sql/run-sql-file.sh | 0
.../create_mv.sql | 0
.../create_sink.sql | 2 +-
.../create_source.sql | 0
.../docker-compose-cassandra.yml} | 0
.../docker-compose-syclladb.yml | 74 +++++++++++++++++++
.../connector/CassandraFactory.java | 29 +++-----
.../risingwave/connector/CassandraSink.java | 3 +-
.../risingwave/connector/CassandraUtil.java | 72 ++++++++++++------
risedev.yml | 6 +-
14 files changed, 155 insertions(+), 49 deletions(-)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/README.md (64%)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/cassandra_query.sql (100%)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/create_cassandra_table.sql (100%)
create mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/run-sql-file.sh (100%)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_mv.sql (100%)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_sink.sql (86%)
rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_source.sql (100%)
rename integration_tests/{cassandra-sink/docker-compose.yml => cassandra-and-syclladb-sink/docker-compose-cassandra.yml} (100%)
create mode 100644 integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md
similarity index 64%
rename from integration_tests/cassandra-sink/README.md
rename to integration_tests/cassandra-and-syclladb-sink/README.md
index ac957bc5f7dde..6c54752755822 100644
--- a/integration_tests/cassandra-sink/README.md
+++ b/integration_tests/cassandra-and-syclladb-sink/README.md
@@ -5,7 +5,11 @@ In this demo, we want to showcase how RisingWave is able to sink data to Cassand
1. Launch the cluster:
```sh
-docker compose up -d
+docker-compose -f docker-compose-cassandra.yml up -d
+```
+If you use ScyllaDB instead:
+```sh
+docker-compose -f docker-compose-syclladb.yml up -d
```
The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink.
@@ -16,6 +20,10 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data
```sh
docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table
```
+If you use ScyllaDB instead:
+```sh
+docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh create_cassandra_table
+```
3. Execute the SQL queries in sequence:
@@ -27,7 +35,10 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh
```sh
docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query
-
+```
+If you use ScyllaDB instead:
+```sh
+docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh cassandra_query
```
```sql
diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql
similarity index 100%
rename from integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql
rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql
diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql
similarity index 100%
rename from integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql
rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql
diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
new file mode 100644
index 0000000000000..4941c6adaf16b
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
@@ -0,0 +1,3 @@
+set -ex
+
+cqlsh < /opt/scylladb/scylladb-sql/$1.sql
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh
similarity index 100%
rename from integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh
rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh
diff --git a/integration_tests/cassandra-sink/create_mv.sql b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql
similarity index 100%
rename from integration_tests/cassandra-sink/create_mv.sql
rename to integration_tests/cassandra-and-syclladb-sink/create_mv.sql
diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
similarity index 86%
rename from integration_tests/cassandra-sink/create_sink.sql
rename to integration_tests/cassandra-and-syclladb-sink/create_sink.sql
index 0105775301205..a556b5bcde5f5 100644
--- a/integration_tests/cassandra-sink/create_sink.sql
+++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
@@ -5,7 +5,7 @@ FROM
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra:9042',
- cassandra.keyspace = 'mykeyspace',
+ cassandra.keyspace = 'my_keyspace',
cassandra.table = 'demo_test',
cassandra.datacenter = 'datacenter1',
);
\ No newline at end of file
diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
similarity index 100%
rename from integration_tests/cassandra-sink/create_source.sql
rename to integration_tests/cassandra-and-syclladb-sink/create_source.sql
diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml
similarity index 100%
rename from integration_tests/cassandra-sink/docker-compose.yml
rename to integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
new file mode 100644
index 0000000000000..a133f53d8c384
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
@@ -0,0 +1,74 @@
+---
+version: "3"
+services:
+ syclladb:
+ image: scylladb/scylla:5.1
+ ports:
+ - 9042:9042
+ volumes:
+ - ./scylladb-sql:/opt/scylladb/scylladb-sql
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+ compactor-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compactor-0
+ compute-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: compute-node-0
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ frontend-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: frontend-node-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ meta-node-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: meta-node-0
+ connector-node:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: connector-node
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+ message_queue:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: message_queue
+ datagen:
+ build: ../datagen
+ depends_on: [message_queue]
+ command:
+ - /bin/sh
+ - -c
+ - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
+ restart: always
+ container_name: datagen
+volumes:
+ compute-node-0:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
index 6c06c16c3bf4f..f9fb5bee020e2 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -19,9 +19,9 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.risingwave.connector.api.ColumnDesc;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
@@ -29,8 +29,6 @@
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,17 +69,13 @@ public void validate(
}
CqlSession session = sessionBuilder.build();
- String cql =
- String.format(
- "SELECT column_name , type FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name = '%s';",
- config.getKeyspace(), config.getTable());
-
- HashMap cassandraColumnDescMap = new HashMap<>();
- for (Row i : session.execute(cql)) {
- cassandraColumnDescMap.put(i.getString(0), i.getString(1));
- }
- List columnDescs = tableSchema.getColumnDescs();
- CassandraUtil.checkSchema(columnDescs, cassandraColumnDescMap);
+ TableMetadata tableMetadata =
+ session.getMetadata()
+ .getKeyspace(config.getKeyspace())
+ .get()
+ .getTable(config.getTable())
+ .get();
+ CassandraUtil.checkSchema(tableSchema.getColumnDescs(), tableMetadata.getColumns());
if (session.isClosed()) {
throw Status.INVALID_ARGUMENT
@@ -92,11 +86,8 @@ public void validate(
session.close();
switch (sinkType) {
case UPSERT:
- if (tableSchema.getPrimaryKeys().isEmpty()) {
- throw Status.INVALID_ARGUMENT
- .withDescription("please define primary key for upsert cassandra sink")
- .asRuntimeException();
- }
+ CassandraUtil.checkPrimaryKey(
+ tableMetadata.getPrimaryKey(), tableSchema.getPrimaryKeys());
break;
case APPEND_ONLY:
case FORCE_APPEND_ONLY:
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
index 40e23d4537c2d..af422dd2f5bdb 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -68,8 +68,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
}
this.config = config;
- this.batchBuilder =
- BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace());
+ this.batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED);
// fetch non-pk columns for prepared statements
nonKeyColumns =
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
index f6f68012c25c6..fbe9c1c04a28b 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -16,7 +16,9 @@
package com.risingwave.connector;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.data.CqlDuration;
+import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.risingwave.connector.api.ColumnDesc;
import com.risingwave.proto.Data.DataType;
import com.risingwave.proto.Data.DataType.TypeName;
@@ -27,43 +29,45 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class CassandraUtil {
- private static String getCorrespondingCassandraType(DataType dataType) {
+ private static int getCorrespondingCassandraType(DataType dataType) {
switch (dataType.getTypeName()) {
case INT16:
- return "smallint";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.SMALLINT;
case INT32:
- return "int";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.INT;
case INT64:
- return "bigint";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BIGINT;
case FLOAT:
- return "float";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.FLOAT;
case DOUBLE:
- return "double";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DOUBLE;
case BOOLEAN:
- return "boolean";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BOOLEAN;
case VARCHAR:
- return "text";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.VARCHAR;
case DECIMAL:
- return "decimal";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DECIMAL;
case TIMESTAMP:
- return "timestamp";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
case TIMESTAMPTZ:
- return "timestamp";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
case DATE:
- return "date";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DATE;
case TIME:
- return "time";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIME;
case BYTEA:
- return "blob";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BLOB;
case LIST:
case STRUCT:
throw Status.UNIMPLEMENTED
.withDescription(String.format("not support %s now", dataType))
.asRuntimeException();
case INTERVAL:
- return "duration";
+ return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DURATION;
default:
throw Status.INVALID_ARGUMENT
.withDescription("unspecified type" + dataType)
@@ -72,14 +76,16 @@ private static String getCorrespondingCassandraType(DataType dataType) {
}
public static void checkSchema(
- List columnDescs, Map cassandraColumnDescMap) {
+ List columnDescs,
+ Map cassandraColumnDescMap) {
if (columnDescs.size() != cassandraColumnDescMap.size()) {
throw Status.FAILED_PRECONDITION
.withDescription("Don't match in the number of columns in the table")
.asRuntimeException();
}
for (ColumnDesc columnDesc : columnDescs) {
- if (!cassandraColumnDescMap.containsKey(columnDesc.getName())) {
+ CqlIdentifier cql = CqlIdentifier.fromInternal(columnDesc.getName());
+ if (!cassandraColumnDescMap.containsKey(cql)) {
throw Status.FAILED_PRECONDITION
.withDescription(
String.format(
@@ -87,16 +93,38 @@ public static void checkSchema(
columnDesc.getName()))
.asRuntimeException();
}
- if (!cassandraColumnDescMap
- .get(columnDesc.getName())
- .equals(getCorrespondingCassandraType(columnDesc.getDataType()))) {
+ if (cassandraColumnDescMap.get(cql).getType().getProtocolCode()
+ != getCorrespondingCassandraType(columnDesc.getDataType())) {
throw Status.FAILED_PRECONDITION
.withDescription(
String.format(
"Don't match in the type, name is %s, cassandra is %s, rw is %s",
columnDesc.getName(),
- cassandraColumnDescMap.get(columnDesc.getName()),
- getCorrespondingCassandraType(columnDesc.getDataType())))
+ cassandraColumnDescMap.get(cql),
+ columnDesc.getDataType().getTypeName()))
+ .asRuntimeException();
+ }
+ }
+ }
+
+ public static void checkPrimaryKey(
+ List cassandraColumnMetadatas, List columnMetadatas) {
+ if (cassandraColumnMetadatas.size() != columnMetadatas.size()) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription("Primary key len don't match")
+ .asRuntimeException();
+ }
+ Set cassandraColumnsSet =
+ cassandraColumnMetadatas.stream()
+ .map((a) -> a.getName().toString())
+ .collect(Collectors.toSet());
+ for (String columnMetadata : columnMetadatas) {
+ if (!cassandraColumnsSet.contains(columnMetadata)) {
+ throw Status.FAILED_PRECONDITION
+ .withDescription(
+ String.format(
+ "Primary key don't match. RisingWave Primary key is %s, don't find it in cassandra",
+ columnMetadata))
.asRuntimeException();
}
}
diff --git a/risedev.yml b/risedev.yml
index f17deec0f260e..35c6ae1043b91 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,14 +20,14 @@ profile:
# config-path: src/config/example.toml
steps:
# If you want to use the local s3 storage, enable the following line
- # - use: minio
+ - use: minio
# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3
# bucket: test-bucket
# If you want to create CDC source table, uncomment the following line
- # - use: connector-node
+ - use: connector-node
# if you want to enable etcd backend, uncomment the following lines.
# - use: etcd
@@ -43,7 +43,7 @@ profile:
- use: frontend
# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
- # - use: compactor
+ - use: compactor
# If you want to create source from Kafka, uncomment the following lines
# Note that kafka depends on zookeeper, so zookeeper must be started beforehand.
From 6134de41b3afc2c08be6f59217d51b6f7438b34a Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Wed, 6 Sep 2023 19:06:37 +0800
Subject: [PATCH 7/8] Make the demo more initiative and some fixes
---
.../cassandra-and-syclladb-sink/README.md | 53 ++++++++-----
.../cassandra-sql/cassandra_query.sql | 1 -
.../cassandra-sql/create_cassandra_table.sql | 7 --
.../cassandra-sql/run-sql-file-syclladb.sh | 3 -
.../cassandra-sql/run-sql-file.sh | 3 -
.../create_sink.sql | 4 +-
.../create_source.sql | 2 +-
.../docker-compose-syclladb.yml | 74 -------------------
...mpose-cassandra.yml => docker-compose.yml} | 14 +++-
9 files changed, 52 insertions(+), 109 deletions(-)
delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql
delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql
delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh
delete mode 100644 integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
rename integration_tests/cassandra-and-syclladb-sink/{docker-compose-cassandra.yml => docker-compose.yml} (85%)
diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md
index 6c54752755822..071fc88d0df46 100644
--- a/integration_tests/cassandra-and-syclladb-sink/README.md
+++ b/integration_tests/cassandra-and-syclladb-sink/README.md
@@ -2,27 +2,45 @@
In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.
-1. Launch the cluster:
+1. Set the compose profile accordingly:
+Demo with Apache Cassandra:
+```
+export COMPOSE_PROFILES=cassandra
+```
-```sh
-docker-compose -f docker-compose-cassandra.yml up -d
+Demo with ScyllaDB:
```
-If we use syclladb.
+export COMPOSE_PROFILES=scylladb
+```
+
+2. Launch the cluster:
+
```sh
-docker-compose -f docker-compose-syclladb.yml up -d
+docker-compose up -d
```
The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink.
-2. Create the Cassandra table:
+3. Create the Cassandra table via cqlsh:
+Log in to cqlsh:
```sh
-docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table
+# cqlsh into cassandra
+docker compose exec cassandra cqlsh
+# cqlsh into scylladb
+docker compose exec scylladb cqlsh
```
-If we use syclladb.
-```sh
-docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh create_cassandra_table
+
+Run the following queries to create the keyspace and table:
+```sql
+CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+use demo;
+CREATE table demo_bhv_table(
+ user_id int primary key,
+ target_id text,
+ event_timestamp timestamp,
+);
```
3. Execute the SQL queries in sequence:
@@ -31,16 +49,17 @@ docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclla
- create_mv.sql
- create_sink.sql
-4. Execute a simple query:
+4. Execute a simple query to check the sink results via cqlsh:
+Log in to cqlsh:
```sh
-docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query
-```
-If we use syclladb.
-```sh
-docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh cassandra_query
+# cqlsh into cassandra
+docker compose exec cassandra cqlsh
+# cqlsh into scylladb
+docker compose exec scylladb cqlsh
```
+Run the following query:
```sql
-select user_id, count(*) from my_keyspace.demo_test group by user_id
+select user_id, count(*) from my_keyspace.demo_test group by user_id;
```
diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql
deleted file mode 100644
index a18cc472173ab..0000000000000
--- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql
+++ /dev/null
@@ -1 +0,0 @@
-select user_id, count(*) from my_keyspace.demo_test group by user_id;
diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql
deleted file mode 100644
index d858e7c47ae69..0000000000000
--- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql
+++ /dev/null
@@ -1,7 +0,0 @@
-CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
-use my_keyspace;
-CREATE table demo_test(
- user_id int primary key,
- target_id text,
- event_timestamp timestamp,
-);
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
deleted file mode 100644
index 4941c6adaf16b..0000000000000
--- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-set -ex
-
-cqlsh < /opt/scylladb/scylladb-sql/$1.sql
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh
deleted file mode 100644
index 053f94398559d..0000000000000
--- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-set -ex
-
-cqlsh < /opt/cassandra/cassandra-sql/$1.sql
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
index a556b5bcde5f5..724e784694c2f 100644
--- a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
+++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
@@ -5,7 +5,7 @@ FROM
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra:9042',
- cassandra.keyspace = 'my_keyspace',
- cassandra.table = 'demo_test',
+ cassandra.keyspace = 'demo',
+ cassandra.table = 'demo_bhv_table',
cassandra.datacenter = 'datacenter1',
);
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
index acb48fecf4e54..c28c10f3616da 100644
--- a/integration_tests/cassandra-and-syclladb-sink/create_source.sql
+++ b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
@@ -14,5 +14,5 @@ CREATE table user_behaviors (
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
- datagen.rows.per.second = '500'
+ datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
deleted file mode 100644
index a133f53d8c384..0000000000000
--- a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml
+++ /dev/null
@@ -1,74 +0,0 @@
----
-version: "3"
-services:
- syclladb:
- image: scylladb/scylla:5.1
- ports:
- - 9042:9042
- volumes:
- - ./scylladb-sql:/opt/scylladb/scylladb-sql
- environment:
- - CASSANDRA_CLUSTER_NAME=cloudinfra
- compactor-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: compactor-0
- compute-node-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: compute-node-0
- etcd-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: etcd-0
- frontend-node-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: frontend-node-0
- grafana-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: grafana-0
- meta-node-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: meta-node-0
- connector-node:
- extends:
- file: ../../docker/docker-compose.yml
- service: connector-node
- minio-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: minio-0
- prometheus-0:
- extends:
- file: ../../docker/docker-compose.yml
- service: prometheus-0
- message_queue:
- extends:
- file: ../../docker/docker-compose.yml
- service: message_queue
- datagen:
- build: ../datagen
- depends_on: [message_queue]
- command:
- - /bin/sh
- - -c
- - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
- restart: always
- container_name: datagen
-volumes:
- compute-node-0:
- external: false
- etcd-0:
- external: false
- grafana-0:
- external: false
- minio-0:
- external: false
- prometheus-0:
- external: false
- message_queue:
- external: false
-name: risingwave-compose
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
similarity index 85%
rename from integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml
rename to integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
index 6c4bb61c013af..07c2f9fe0228e 100644
--- a/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml
+++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
@@ -6,9 +6,21 @@ services:
ports:
- 9042:9042
volumes:
- - ./cassandra-sql:/opt/cassandra/cassandra-sql
+ - ./cassandra-sql:/cassandra-sql
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
+ profiles:
+ - cassandra
+ scylladb:
+ image: scylladb/scylla:5.1
+ ports:
+ - 9042:9042
+ volumes:
+ - ./cassandra-sql:/cassandra-sql
+ environment:
+ - CASSANDRA_CLUSTER_NAME=cloudinfra
+ profiles:
+ - scylladb
compactor-0:
extends:
file: ../../docker/docker-compose.yml
From 4db9182dd4cd10096269063a6a35d9762c791d51 Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Wed, 6 Sep 2023 19:08:13 +0800
Subject: [PATCH 8/8] minor fix
---
integration_tests/cassandra-and-syclladb-sink/README.md | 2 +-
.../cassandra-and-syclladb-sink/docker-compose.yml | 4 ----
2 files changed, 1 insertion(+), 5 deletions(-)
diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md
index 071fc88d0df46..b022c9ef09cf8 100644
--- a/integration_tests/cassandra-and-syclladb-sink/README.md
+++ b/integration_tests/cassandra-and-syclladb-sink/README.md
@@ -1,4 +1,4 @@
-# Demo: Sinking to Cassandra
+# Demo: Sinking to Cassandra/ScyllaDB
In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
index 07c2f9fe0228e..27b77f850f882 100644
--- a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
+++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
@@ -5,8 +5,6 @@ services:
image: cassandra:4.0
ports:
- 9042:9042
- volumes:
- - ./cassandra-sql:/cassandra-sql
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles:
@@ -15,8 +13,6 @@ services:
image: scylladb/scylla:5.1
ports:
- 9042:9042
- volumes:
- - ./cassandra-sql:/cassandra-sql
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles: