From 0644ea33db6e14dacf6921c1e29fa21114ec4886 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 22 Aug 2023 12:33:41 +0800 Subject: [PATCH 1/8] support cassandra --- java/connector-node/assembly/assembly.xml | 1 + java/connector-node/assembly/pom.xml | 4 + .../risingwave-connector-service/pom.xml | 5 + .../com/risingwave/connector/SinkUtils.java | 2 + .../risingwave-connector-test/pom.xml | 5 + .../risingwave-sink-cassandra/pom.xml | 60 +++++ .../risingwave/connector/CassandraConfig.java | 95 +++++++ .../connector/CassandraFactory.java | 65 +++++ .../risingwave/connector/CassandraSink.java | 237 ++++++++++++++++++ .../risingwave-sink-es-7/pom.xml | 1 - java/pom.xml | 7 + risedev.yml | 2 +- src/connector/src/sink/remote.rs | 2 +- 13 files changed, 483 insertions(+), 3 deletions(-) create mode 100644 java/connector-node/risingwave-sink-cassandra/pom.xml create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml index 327c7628dbb59..751ba2f410e0b 100644 --- a/java/connector-node/assembly/assembly.xml +++ b/java/connector-node/assembly/assembly.xml @@ -40,6 +40,7 @@ *:risingwave-sink-es-7 + *:risingwave-sink-cassandra *:risingwave-sink-jdbc *:risingwave-sink-iceberg *:risingwave-sink-deltalake diff --git a/java/connector-node/assembly/pom.xml b/java/connector-node/assembly/pom.xml index 5c518601bc8e7..6812bac5b63e6 100644 --- a/java/connector-node/assembly/pom.xml +++ b/java/connector-node/assembly/pom.xml @@ -33,6 +33,10 @@ com.risingwave.java risingwave-sink-es-7 + + com.risingwave.java + risingwave-sink-cassandra + com.risingwave.java 
risingwave-sink-jdbc diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml index fccd84d24b9af..4e2dbe1d6ec96 100644 --- a/java/connector-node/risingwave-connector-service/pom.xml +++ b/java/connector-node/risingwave-connector-service/pom.xml @@ -99,5 +99,10 @@ risingwave-sink-es-7 provided + + com.risingwave.java + risingwave-sink-cassandra + provided + diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index b48cff9bacdd4..abc2be7bcbaab 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -43,6 +43,8 @@ public static SinkFactory getSinkFactory(String sinkName) { return new DeltaLakeSinkFactory(); case "elasticsearch-7": return new EsSink7Factory(); + case "cassandra": + return new CassandraFactory(); default: throw UNIMPLEMENTED .withDescription("unknown sink type: " + sinkName) diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml index d630eb183c856..7f8f6f1bc49cc 100644 --- a/java/connector-node/risingwave-connector-test/pom.xml +++ b/java/connector-node/risingwave-connector-test/pom.xml @@ -174,5 +174,10 @@ risingwave-sink-es-7 test + + com.risingwave.java + risingwave-sink-cassandra + test + diff --git a/java/connector-node/risingwave-sink-cassandra/pom.xml b/java/connector-node/risingwave-sink-cassandra/pom.xml new file mode 100644 index 0000000000000..e51faa9691cf9 --- /dev/null +++ b/java/connector-node/risingwave-sink-cassandra/pom.xml @@ -0,0 +1,60 @@ + + + + java-parent + com.risingwave.java + 1.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + risingwave-sink-cassandra + 
1.0-SNAPSHOT + risingwave-sink-cassandra + + + + com.risingwave.java + proto + + + com.risingwave.java + connector-api + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-core + + + org.apache.commons + commons-text + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + + + com.datastax.oss + java-driver-core + ${datastax.version} + + + + \ No newline at end of file diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java new file mode 100644 index 0000000000000..5b946d43a8015 --- /dev/null +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -0,0 +1,95 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.risingwave.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.risingwave.connector.api.sink.CommonSinkConfig; + +public class CassandraConfig extends CommonSinkConfig { + /** Required */ + private String type; + /** Required */ + private String url; + + /** Required */ + private String keyspace; + + /** Required */ + private String table; + + /** Required */ + private String datacenter; + + @JsonProperty(value = "username") + private String username; + + @JsonProperty(value = "password") + private String password; + + @JsonCreator + public CassandraConfig( + @JsonProperty(value = "url") String url, + @JsonProperty(value = "keyspace") String keyspace, + @JsonProperty(value = "table") String table, + @JsonProperty(value = "datacenter") String datacenter, + @JsonProperty(value = "type") String type) { + this.url = url; + this.keyspace = keyspace; + this.table = table; + this.datacenter = datacenter; + this.type = type; + } + + public String getType() { + return type; + } + + public String getUrl() { + return url; + } + + public String getKeyspace() { + return keyspace; + } + + public String getTable() { + return table; + } + + public String getDatacenter() { + return datacenter; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public CassandraConfig withUsername(String username) { + this.username = username; + return this; + } + + public CassandraConfig withPassword(String password) { + this.password = password; + return this; + } +} diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java new file mode 100644 index 0000000000000..ca52c3e40cd59 --- /dev/null +++ 
package com.risingwave.connector;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for the Cassandra sink: validates the user-supplied options and creates
 * {@link CassandraSink} writers wrapped in the V1 adapter.
 */
public class CassandraFactory implements SinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraFactory.class);

    @Override
    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
        ObjectMapper mapper = new ObjectMapper();
        CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
        return new SinkWriterV1.Adapter(new CassandraSink(tableSchema, config));
    }

    /**
     * Validates the sink options before the sink is created.
     *
     * <p>Checks that the URL is a bare {@code host:port} pair and that a CQL session can
     * actually be established against the configured keyspace/datacenter.
     *
     * @throws IllegalArgumentException if the URL is not of the form {@code host:port}
     * @throws io.grpc.StatusRuntimeException if the connection cannot be established
     */
    @Override
    public void validate(
            TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
        ObjectMapper mapper = new ObjectMapper();
        // Fail fast on missing required WITH options instead of silently leaving nulls.
        mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
        CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);

        // 1. Check the URL shape. The driver wants an explicit host + port, no scheme.
        String url = config.getUrl();
        String[] hostPort = url.split(":");
        if (hostPort.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid cassandraURL: expected `host:port`, got " + url);
        }

        // 2. Check connectivity. try-with-resources guarantees the probe session is
        // closed even when the check below throws.
        CqlSessionBuilder sessionBuilder =
                CqlSession.builder()
                        .addContactPoint(
                                new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                        .withKeyspace(config.getKeyspace())
                        .withLocalDatacenter(config.getDatacenter());
        if (config.getUsername() != null && config.getPassword() != null) {
            sessionBuilder =
                    sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
        }
        try (CqlSession session = sessionBuilder.build()) {
            if (session.isClosed()) {
                throw Status.INVALID_ARGUMENT
                        .withDescription("Cannot connect to " + config.getUrl())
                        .asRuntimeException();
            }
        }
    }
}
package com.risingwave.connector;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that batches RisingWave changelog rows into Cassandra statements.
 *
 * <p>Rows received via {@link #write(Iterator)} are accumulated into a logged batch and
 * flushed to the cluster on {@link #sync()}.
 */
public class CassandraSink extends SinkWriterBase {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);

    private final CqlSession session;
    // Holds the row of a pending UPDATE_DELETE until the matching UPDATE_INSERT arrives;
    // at most one element at a time.
    private final List<SinkRow> updateRowCache = new ArrayList<>(1);
    private final PreparedStatement insertStmt;
    private final PreparedStatement updateStmt;
    private final PreparedStatement deleteStmt;
    // Column names that are NOT part of the primary key; Cassandra forbids SET on PK columns.
    private final List<String> nonKeyColumns;
    private final BatchStatementBuilder batchBuilder;
    private final CassandraConfig config;

    /**
     * Connects to the configured Cassandra cluster and prepares the insert/update/delete
     * statements for the target table.
     *
     * @throws IllegalArgumentException if the URL is not of the form {@code host:port}
     * @throws io.grpc.StatusRuntimeException if the connection cannot be established
     */
    public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
        super(tableSchema);
        // 1. The URL must be a bare `host:port` pair (no scheme).
        String url = config.getUrl();
        String[] hostPort = url.split(":");
        if (hostPort.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid cassandraURL: expected `host:port`, got " + url);
        }
        // 2. Establish the session.
        CqlSessionBuilder sessionBuilder =
                CqlSession.builder()
                        .addContactPoint(
                                new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                        .withKeyspace(config.getKeyspace())
                        .withLocalDatacenter(config.getDatacenter());
        if (config.getUsername() != null && config.getPassword() != null) {
            sessionBuilder =
                    sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
        }
        this.session = sessionBuilder.build();
        if (session.isClosed()) {
            throw Status.INVALID_ARGUMENT
                    .withDescription("Cannot connect to " + config.getUrl())
                    .asRuntimeException();
        }

        this.config = config;

        this.batchBuilder =
                BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace());

        // Cassandra does not allow SET on primary keys, so pre-compute the non-PK columns
        // once for the UPDATE statement and for binding update rows.
        nonKeyColumns =
                Arrays.stream(tableSchema.getColumnNames())
                        .filter(c -> !tableSchema.getPrimaryKeys().contains(c))
                        .collect(Collectors.toList());

        // Prepare statements once; they are re-bound per row.
        this.insertStmt = session.prepare(createInsertStatement(config.getTable(), tableSchema));
        this.updateStmt = session.prepare(createUpdateStatement(config.getTable(), tableSchema));
        this.deleteStmt = session.prepare(createDeleteStatement(config.getTable(), tableSchema));
    }

    @Override
    public void write(Iterator<SinkRow> rows) {
        // NOTE: Strings must be compared with equals(), not ==; == on a literal only
        // works by interning coincidence and fails for dynamically-built strings.
        if ("append-only".equals(this.config.getType())) {
            writeAppendOnly(rows);
        } else {
            writeUpsert(rows);
        }
    }

    /** Append-only mode: only INSERT ops are materialized; other ops are dropped. */
    private void writeAppendOnly(Iterator<SinkRow> rows) {
        while (rows.hasNext()) {
            SinkRow row = rows.next();
            Data.Op op = row.getOp();
            switch (op) {
                case INSERT:
                    batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
                    break;
                case UPDATE_DELETE:
                case UPDATE_INSERT:
                case DELETE:
                    // Intentionally ignored: force_append_only sinks discard retractions.
                    break;
                default:
                    throw Status.INTERNAL
                            .withDescription("Unknown operation: " + op)
                            .asRuntimeException();
            }
        }
    }

    /**
     * Upsert mode: INSERT/DELETE map directly; an UPDATE arrives as an UPDATE_DELETE
     * (old row) immediately followed by an UPDATE_INSERT (new row), which we pair up
     * via {@link #updateRowCache}.
     */
    private void writeUpsert(Iterator<SinkRow> rows) {
        while (rows.hasNext()) {
            SinkRow row = rows.next();
            Data.Op op = row.getOp();
            switch (op) {
                case INSERT:
                    batchBuilder.addStatement(bindInsertStatement(insertStmt, row));
                    break;
                case UPDATE_DELETE:
                    updateRowCache.clear();
                    updateRowCache.add(row);
                    break;
                case UPDATE_INSERT:
                    // Check emptiness first: remove(0) on an empty list throws
                    // IndexOutOfBoundsException, it never returns null.
                    if (updateRowCache.isEmpty()) {
                        throw Status.FAILED_PRECONDITION
                                .withDescription("UPDATE_INSERT without UPDATE_DELETE")
                                .asRuntimeException();
                    }
                    SinkRow old = updateRowCache.remove(0);
                    batchBuilder.addStatement(bindUpdateInsertStatement(updateStmt, old, row));
                    break;
                case DELETE:
                    batchBuilder.addStatement(bindDeleteStatement(deleteStmt, row));
                    break;
                default:
                    throw Status.INTERNAL
                            .withDescription("Unknown operation: " + op)
                            .asRuntimeException();
            }
        }
    }

    /** Flushes the accumulated batch to Cassandra and resets it for the next epoch. */
    @Override
    public void sync() {
        try {
            session.execute(batchBuilder.build());
            batchBuilder.clearStatements();
        } catch (Exception e) {
            throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
        }
    }

    @Override
    public void drop() {
        session.close();
    }

    /** Builds `INSERT INTO t (c1, ..., cn) VALUES (?, ..., ?)` over all columns. */
    private static String createInsertStatement(String tableName, TableSchema tableSchema) {
        String[] columnNames = tableSchema.getColumnNames();
        String columnNamesString = String.join(", ", columnNames);
        String placeholdersString =
                String.join(", ", Collections.nCopies(columnNames.length, "?"));
        return String.format(
                "INSERT INTO %s (%s) VALUES (%s)",
                tableName, columnNamesString, placeholdersString);
    }

    /** Builds `UPDATE t SET nonPk = ? ... WHERE pk = ? ...`; PK columns cannot be SET. */
    private String createUpdateStatement(String tableName, TableSchema tableSchema) {
        List<String> primaryKeys = tableSchema.getPrimaryKeys();
        String setClause =
                nonKeyColumns.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(", "));
        String whereClause =
                primaryKeys.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(" AND "));
        return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
    }

    /** Builds `DELETE FROM t WHERE pk = ? ...` keyed on the primary key columns. */
    private static String createDeleteStatement(String tableName, TableSchema tableSchema) {
        List<String> primaryKeys = tableSchema.getPrimaryKeys();
        String whereClause =
                primaryKeys.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(" AND "));
        return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
    }

    /** Binds every column of the row, in schema order, to the INSERT placeholders. */
    private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) {
        return stmt.bind(IntStream.range(0, row.size()).mapToObj(row::get).toArray());
    }

    /** Binds the primary-key values of the row to the DELETE placeholders. */
    private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) {
        return stmt.bind(
                getTableSchema().getPrimaryKeys().stream()
                        .map(key -> getTableSchema().getFromRow(key, row))
                        .toArray());
    }

    /**
     * Binds an UPDATE: SET values come from the new row ({@code insertRow}); WHERE values
     * come from the old row ({@code updateRow}), whose key identifies the record to update.
     */
    private BoundStatement bindUpdateInsertStatement(
            PreparedStatement stmt, SinkRow updateRow, SinkRow insertRow) {
        TableSchema schema = getTableSchema();
        int numKeys = schema.getPrimaryKeys().size();
        int numNonKeys = updateRow.size() - numKeys;
        Object[] values = new Object[numNonKeys + numKeys];

        // Bind the "SET" clause with the new (post-update) values.
        Iterator<String> nonKeyIter = nonKeyColumns.iterator();
        for (int i = 0; i < numNonKeys; i++) {
            values[i] = schema.getFromRow(nonKeyIter.next(), insertRow);
        }

        // Bind the "WHERE" clause with the old row's primary-key values.
        Iterator<String> keyIter = schema.getPrimaryKeys().iterator();
        for (int i = 0; i < numKeys; i++) {
            values[numNonKeys + i] = schema.getFromRow(keyIter.next(), updateRow);
        }
        return stmt.bind(values);
    }
}
b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -9,7 +9,6 @@ 4.0.0 - risingwave-sink-es-7 1.0-SNAPSHOT risingwave-sink-es-7 diff --git a/java/pom.xml b/java/pom.xml index 401db2aed123a..e7c790e37a79f 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -18,6 +18,7 @@ connector-node/risingwave-sink-iceberg connector-node/risingwave-sink-deltalake connector-node/risingwave-sink-es-7 + connector-node/risingwave-sink-cassandra connector-node/risingwave-sink-jdbc connector-node/risingwave-source-cdc connector-node/risingwave-connector-test @@ -45,6 +46,7 @@ 3.3.1 3.3.3 7.17.10 + 4.15.0 @@ -225,6 +227,11 @@ risingwave-sink-es-7 ${module.version} + + com.risingwave.java + risingwave-sink-cassandra + ${module.version} + com.risingwave.java risingwave-sink-jdbc diff --git a/risedev.yml b/risedev.yml index 6bcaef69be027..50e4dcefaa433 100644 --- a/risedev.yml +++ b/risedev.yml @@ -27,7 +27,7 @@ profile: # bucket: test-bucket # If you want to create CDC source table, uncomment the following line - # - use: connector-node + - use: connector-node # if you want to enable etcd backend, uncomment the following lines. 
# - use: etcd diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index dfffb2c87b0ce..633f34d09c9eb 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -48,7 +48,7 @@ use crate::sink::{ }; use crate::ConnectorParams; -pub const VALID_REMOTE_SINKS: [&str; 4] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7"]; +pub const VALID_REMOTE_SINKS: [&str; 5] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7","cassandra"]; pub fn is_valid_remote_sink(connector_type: &str) -> bool { VALID_REMOTE_SINKS.contains(&connector_type) From 6117f25657a1e30e2f34c8cae62886bdf354f9a9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 24 Aug 2023 17:31:41 +0800 Subject: [PATCH 2/8] support cassandra sink --- integration_tests/cassandra-sink/README.md | 35 +++++ .../cassandra-sql/cassandra-query.sql | 1 + .../cassandra-sql/create_cassandra_table.sql | 7 + .../cassandra-sql/run-sql-file.sh | 3 + .../cassandra-sink/create_mv.sql | 7 + .../cassandra-sink/create_sink.sql | 11 ++ .../cassandra-sink/create_source.sql | 21 +++ .../cassandra-sink/docker-compose.yml | 71 ++++++++++ .../risingwave/connector/CassandraConfig.java | 12 +- .../connector/CassandraFactory.java | 33 ++++- .../risingwave/connector/CassandraSink.java | 74 +++++++---- .../risingwave/connector/CassandraUtil.java | 121 ++++++++++++++++++ src/connector/src/sink/remote.rs | 8 +- 13 files changed, 369 insertions(+), 35 deletions(-) create mode 100644 integration_tests/cassandra-sink/README.md create mode 100644 integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql create mode 100644 integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql create mode 100644 integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh create mode 100644 integration_tests/cassandra-sink/create_mv.sql create mode 100644 integration_tests/cassandra-sink/create_sink.sql create mode 100644 
integration_tests/cassandra-sink/create_source.sql create mode 100644 integration_tests/cassandra-sink/docker-compose.yml create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md new file mode 100644 index 0000000000000..6a76914bcbf30 --- /dev/null +++ b/integration_tests/cassandra-sink/README.md @@ -0,0 +1,35 @@ +# Demo: Sinking to Cassandra + +In this demo, we want to showcase how RisingWave is able to sink data to Cassandra. + +1. Launch the cluster: + +```sh +docker compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a clichouse for sink. + + +2. Create the Cassandra table: + +```sh +docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh create_cassandra_table +``` + +3. Execute the SQL queries in sequence: + +- create_source.sql +- create_mv.sql +- create_sink.sql + +4. 
Execute a simple query: + +```sh +docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh cassandra_query + +``` + +```sql +select user_id, count(*) from default.demo_test group by user_id +``` diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql new file mode 100644 index 0000000000000..004d9fdd1c00e --- /dev/null +++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql @@ -0,0 +1 @@ +select user_id, count(*) from my_keyspace.demo_test group by user_id diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql new file mode 100644 index 0000000000000..058fd57c80a63 --- /dev/null +++ b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql @@ -0,0 +1,7 @@ +CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +use my_keyspace; +CREATE table demo_test( + user_id text primary key, + target_id text, + event_timestamp timestamp, +); \ No newline at end of file diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh new file mode 100644 index 0000000000000..1061ecd65bfa0 --- /dev/null +++ b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh @@ -0,0 +1,3 @@ +set -ex + +cqlsh < /var/lib/cassandra-sink/$1.sql \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_mv.sql b/integration_tests/cassandra-sink/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/cassandra-sink/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_sink.sql 
b/integration_tests/cassandra-sink/create_sink.sql new file mode 100644 index 0000000000000..8fbc7c7e63905 --- /dev/null +++ b/integration_tests/cassandra-sink/create_sink.sql @@ -0,0 +1,11 @@ +CREATE SINK bhv_iceberg_sink +FROM + bhv_mv WITH ( + connector = 'cassandra', + type = 'append-only', + force_append_only='true', + cassandra.url = 'http://127.0.0.1:9042', + cassandra.keyspace = 'mykeyspace', + cassandra.table = 'demo_test', + cassandra.datacenter = 'datacenter1', +) \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-sink/create_source.sql new file mode 100644 index 0000000000000..e704eb785ec06 --- /dev/null +++ b/integration_tests/cassandra-sink/create_source.sql @@ -0,0 +1,21 @@ +CREATE table user_behaviors ( + user_id VARCHAR, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.seq_id.kind = 'sequence', + fields.seq_id.start = '1', + fields.seq_id.end = '10000000', + fields.user_id.kind = 'random', + fields.user_id.min = '1', + fields.user_id.max = '10000000', + fields.user_name.kind = 'random', + fields.user_name.length = '10', + datagen.rows.per.second = '20000' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-sink/docker-compose.yml new file mode 100644 index 0000000000000..db9b761502c3e --- /dev/null +++ b/integration_tests/cassandra-sink/docker-compose.yml @@ -0,0 +1,71 @@ +--- +version: "3" +services: + services: + cassandra: + image: cassandra:4.0 + ports: + - 9042:9042 + volumes: + - ~/cassandra-sink:/var/lib/cassandra-sink + environment: + - CASSANDRA_CLUSTER_NAME=cloudinfra + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: 
../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + message_queue: + extends: + file: ../../docker/docker-compose.yml + service: message_queue + datagen: + build: ../datagen + depends_on: [message_queue] + command: + - /bin/sh + - -c + - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092 + restart: always + container_name: datagen +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java index 5b946d43a8015..a993a8988e1ee 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -35,18 +35,18 @@ public class CassandraConfig extends CommonSinkConfig { /** Required */ private String datacenter; - @JsonProperty(value = "username") + @JsonProperty(value = "cassandra.username") private String username; - @JsonProperty(value = "password") + @JsonProperty(value = "cassandra.password") private String password; @JsonCreator public CassandraConfig( - @JsonProperty(value = 
"url") String url, - @JsonProperty(value = "keyspace") String keyspace, - @JsonProperty(value = "table") String table, - @JsonProperty(value = "datacenter") String datacenter, + @JsonProperty(value = "cassandra.url") String url, + @JsonProperty(value = "cassandra.keyspace") String keyspace, + @JsonProperty(value = "cassandra.table") String table, + @JsonProperty(value = "cassandra.datacenter") String datacenter, @JsonProperty(value = "type") String type) { this.url = url; this.keyspace = keyspace; diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java index ca52c3e40cd59..467ff41a8941a 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java @@ -5,6 +5,7 @@ import com.datastax.oss.driver.api.core.cql.*; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.risingwave.connector.api.ColumnDesc; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkFactory; import com.risingwave.connector.api.sink.SinkWriter; @@ -12,6 +13,8 @@ import com.risingwave.proto.Catalog.SinkType; import io.grpc.Status; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +42,6 @@ public void validate( throw new IllegalArgumentException( "Invalid cassandraURL: expected `host:port`, got " + url); } - for (String s : hostPort) { - System.out.println(s); - } // 2. 
check connection CqlSessionBuilder sessionBuilder = CqlSession.builder() @@ -54,6 +54,19 @@ public void validate( sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword()); } CqlSession session = sessionBuilder.build(); + + String cql = + String.format( + "SELECT column_name , type FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name = '%s';", + config.getKeyspace(), config.getTable()); + + HashMap cassandraColumnDescMap = new HashMap<>(); + for (Row i : session.execute(cql)) { + cassandraColumnDescMap.put(i.getString(0), i.getString(1)); + } + List columnDescs = tableSchema.getColumnDescs(); + CassandraUtil.checkSchema(columnDescs, cassandraColumnDescMap); + if (session.isClosed()) { throw Status.INVALID_ARGUMENT .withDescription("Cannot connect to " + config.getUrl()) @@ -61,5 +74,19 @@ public void validate( } // 3. close client session.close(); + switch (sinkType) { + case UPSERT: + if (tableSchema.getPrimaryKeys().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("please define primary key for upsert cassandra sink") + .asRuntimeException(); + } + break; + case APPEND_ONLY: + case FORCE_APPEND_ONLY: + break; + default: + throw Status.INTERNAL.asRuntimeException(); + } } } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 2034a3a95d287..fb33debccc782 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -21,6 +21,7 @@ import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.connector.api.sink.SinkWriterBase; import com.risingwave.proto.Data; +import com.risingwave.proto.Data.DataType.TypeName; import io.grpc.Status; import 
java.net.InetSocketAddress; import java.util.*; @@ -33,13 +34,10 @@ public class CassandraSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); private final CqlSession session; private final List updateRowCache = new ArrayList<>(1); - private final PreparedStatement insertStmt; - private final PreparedStatement updateStmt; - private final PreparedStatement deleteStmt; + private final HashMap stmtMap; private final List nonKeyColumns; private final BatchStatementBuilder batchBuilder; private final CassandraConfig config; - private final List primaryKeyIndexes; public CassandraSink(TableSchema tableSchema, CassandraConfig config) { super(tableSchema); @@ -68,12 +66,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { } this.config = config; - - primaryKeyIndexes = new ArrayList(); - for (String primaryKey : tableSchema.getPrimaryKeys()) { - primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey)); - } - this.batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace()); @@ -84,19 +76,27 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { .filter(c -> !tableSchema.getPrimaryKeys().contains(c)) .collect(Collectors.toList()); + this.stmtMap = new HashMap<>(); // prepare statement for insert - this.insertStmt = session.prepare(createInsertStatement(config.getTable(), tableSchema)); + this.stmtMap.put( + "insert", session.prepare(createInsertStatement(config.getTable(), tableSchema))); + if (config.getType().equals("upsert")) { + // prepare statement for update-insert/update-delete + this.stmtMap.put( + "update", + session.prepare(createUpdateStatement(config.getTable(), tableSchema))); - // prepare statement for update-insert/update-delete - this.updateStmt = session.prepare(createUpdateStatement(config.getTable(), tableSchema)); - - // prepare the delete statement - this.deleteStmt = 
session.prepare(createDeleteStatement(config.getTable(), tableSchema)); + // prepare the delete statement + this.stmtMap.put( + "delete", + session.prepare(createDeleteStatement(config.getTable(), tableSchema))); + } } @Override public void write(Iterator rows) { - if (this.config.getType() == "append-only") { + System.out.println(this.config.getType()); + if (this.config.getType().equals("append-only")) { write_apend_only(rows); } else { write_upsert(rows); @@ -109,7 +109,7 @@ private void write_apend_only(Iterator rows) { Data.Op op = row.getOp(); switch (op) { case INSERT: - batchBuilder.addStatement(bindInsertStatement(insertStmt, row)); + batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row)); break; case UPDATE_DELETE: break; @@ -131,7 +131,7 @@ private void write_upsert(Iterator rows) { Data.Op op = row.getOp(); switch (op) { case INSERT: - batchBuilder.addStatement(bindInsertStatement(insertStmt, row)); + batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row)); break; case UPDATE_DELETE: updateRowCache.clear(); @@ -144,10 +144,11 @@ private void write_upsert(Iterator rows) { .withDescription("UPDATE_INSERT without UPDATE_DELETE") .asRuntimeException(); } - batchBuilder.addStatement(bindUpdateInsertStatement(updateStmt, old, row)); + batchBuilder.addStatement( + bindUpdateInsertStatement(this.stmtMap.get("update"), old, row)); break; case DELETE: - batchBuilder.addStatement(bindDeleteStatement(deleteStmt, row)); + batchBuilder.addStatement(bindDeleteStatement(this.stmtMap.get("delete"), row)); break; default: throw Status.INTERNAL @@ -204,13 +205,29 @@ private static String createDeleteStatement(String tableName, TableSchema tableS } private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) { - return stmt.bind(IntStream.range(0, row.size()).mapToObj(row::get).toArray()); + TableSchema schema = getTableSchema(); + return stmt.bind( + IntStream.range(0, row.size()) + .mapToObj( + (index) 
-> + CassandraUtil.convertRow( + row.get(index), + schema.getColumnDescs() + .get(index) + .getDataType() + .getTypeName())) + .toArray()); } private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) { + TableSchema schema = getTableSchema(); + Map columnDescs = schema.getColumnTypes(); return stmt.bind( getTableSchema().getPrimaryKeys().stream() - .map(key -> getTableSchema().getFromRow(key, row)) + .map( + key -> + CassandraUtil.convertRow( + schema.getFromRow(key, row), columnDescs.get(key))) .toArray()); } @@ -219,18 +236,25 @@ private BoundStatement bindUpdateInsertStatement( TableSchema schema = getTableSchema(); int numKeys = schema.getPrimaryKeys().size(); int numNonKeys = updateRow.size() - numKeys; + Map columnDescs = schema.getColumnTypes(); Object[] values = new Object[numNonKeys + numKeys]; // bind "SET" clause Iterator nonKeyIter = nonKeyColumns.iterator(); for (int i = 0; i < numNonKeys; i++) { - values[i] = schema.getFromRow(nonKeyIter.next(), insertRow); + String name = nonKeyIter.next(); + values[i] = + CassandraUtil.convertRow( + schema.getFromRow(name, insertRow), columnDescs.get(name)); } // bind "WHERE" clause Iterator keyIter = schema.getPrimaryKeys().iterator(); for (int i = 0; i < numKeys; i++) { - values[numNonKeys + i] = schema.getFromRow(keyIter.next(), updateRow); + String name = keyIter.next(); + values[numNonKeys + i] = + CassandraUtil.convertRow( + schema.getFromRow(name, updateRow), columnDescs.get(name)); } return stmt.bind(values); } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java new file mode 100644 index 0000000000000..65f47855cead9 --- /dev/null +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -0,0 +1,121 @@ +package com.risingwave.connector; + +import 
com.datastax.oss.driver.api.core.data.CqlDuration; +import com.risingwave.connector.api.ColumnDesc; +import com.risingwave.proto.Data.DataType; +import com.risingwave.proto.Data.DataType.TypeName; +import io.grpc.Status; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; + +public class CassandraUtil { + private static String getCorrespondingCassandraType(DataType dataType) { + switch (dataType.getTypeName()) { + case INT16: + return "smallint"; + case INT32: + return "int"; + case INT64: + return "bigint"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case BOOLEAN: + return "boolean"; + case VARCHAR: + return "text"; + case DECIMAL: + return "decimal"; + case TIMESTAMP: + return "timestamp"; + case TIMESTAMPTZ: + return "timestamp"; + case DATE: + return "date"; + case TIME: + return "time"; + case BYTEA: + return "blob"; + case LIST: + case STRUCT: + throw Status.UNIMPLEMENTED + .withDescription(String.format("not support %s now", dataType)) + .asRuntimeException(); + case INTERVAL: + return "duration"; + default: + throw Status.INVALID_ARGUMENT + .withDescription("unspecified type" + dataType) + .asRuntimeException(); + } + } + + public static void checkSchema( + List columnDescs, Map cassandraColumnDescMap) { + if (columnDescs.size() != cassandraColumnDescMap.size()) { + throw Status.FAILED_PRECONDITION + .withDescription("Don't match in the number of columns in the table") + .asRuntimeException(); + } + for (ColumnDesc columnDesc : columnDescs) { + if (!cassandraColumnDescMap.containsKey(columnDesc.getName())) { + throw Status.FAILED_PRECONDITION + .withDescription( + String.format( + "Don't match in the name, rw is %s cassandra can't find it", + columnDesc.getName())) + .asRuntimeException(); + } + if (!cassandraColumnDescMap + .get(columnDesc.getName()) + .equals(getCorrespondingCassandraType(columnDesc.getDataType()))) { + throw 
Status.FAILED_PRECONDITION + .withDescription( + String.format( + "Don't match in the type, name is %s, cassandra is %s, rw is %s", + columnDesc.getName(), + cassandraColumnDescMap.get(columnDesc.getName()), + getCorrespondingCassandraType(columnDesc.getDataType()))) + .asRuntimeException(); + } + } + } + + public static Object convertRow(Object value, TypeName typeName) { + switch (typeName) { + case INT16: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case VARCHAR: + case DECIMAL: + return value; + case TIMESTAMP: + case TIMESTAMPTZ: + return ((Timestamp) value).toInstant(); + case DATE: + return ((Date) value).toLocalDate(); + case TIME: + return ((Time) value).toLocalTime(); + case INTERVAL: + return CqlDuration.from((String) value); + case BYTEA: + return ByteBuffer.wrap((byte[]) value); + case STRUCT: + throw Status.UNIMPLEMENTED + .withDescription(String.format("not support %s now", typeName)) + .asRuntimeException(); + default: + throw Status.INVALID_ARGUMENT + .withDescription("unspecified type" + typeName) + .asRuntimeException(); + } + } +} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 633f34d09c9eb..e211f51bec42f 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -48,7 +48,13 @@ use crate::sink::{ }; use crate::ConnectorParams; -pub const VALID_REMOTE_SINKS: [&str; 5] = ["jdbc", "iceberg", "deltalake", "elasticsearch-7","cassandra"]; +pub const VALID_REMOTE_SINKS: [&str; 5] = [ + "jdbc", + "iceberg", + "deltalake", + "elasticsearch-7", + "cassandra", +]; pub fn is_valid_remote_sink(connector_type: &str) -> bool { VALID_REMOTE_SINKS.contains(&connector_type) From e329893e913d40c7529e5427a7fc78ad2aba5239 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 24 Aug 2023 18:39:41 +0800 Subject: [PATCH 3/8] add cassandra sink fmt fmt --- integration_tests/cassandra-sink/README.md | 6 +++--- .../cassandra-sink/cassandra-sql/run-sql-file.sh 
| 2 +- integration_tests/cassandra-sink/create_sink.sql | 2 +- integration_tests/cassandra-sink/docker-compose.yml | 7 +++++-- risedev.yml | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md index 6a76914bcbf30..d49ce9545fac9 100644 --- a/integration_tests/cassandra-sink/README.md +++ b/integration_tests/cassandra-sink/README.md @@ -8,13 +8,13 @@ In this demo, we want to showcase how RisingWave is able to sink data to Cassand docker compose up -d ``` -The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a clichouse for sink. +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink. 2. Create the Cassandra table: ```sh -docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh create_cassandra_table +docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table ``` 3. Execute the SQL queries in sequence: @@ -26,7 +26,7 @@ docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh creat 4. 
Execute a simple query: ```sh -docker compose exec Cassandra bash /var/lib/cassandra-sink/run-sql-file.sh cassandra_query +docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query ``` diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh index 1061ecd65bfa0..053f94398559d 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh +++ b/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh @@ -1,3 +1,3 @@ set -ex -cqlsh < /var/lib/cassandra-sink/$1.sql \ No newline at end of file +cqlsh < /opt/cassandra/cassandra-sql/$1.sql \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql index 8fbc7c7e63905..56fab31689efd 100644 --- a/integration_tests/cassandra-sink/create_sink.sql +++ b/integration_tests/cassandra-sink/create_sink.sql @@ -8,4 +8,4 @@ FROM cassandra.keyspace = 'mykeyspace', cassandra.table = 'demo_test', cassandra.datacenter = 'datacenter1', -) \ No newline at end of file +); \ No newline at end of file diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-sink/docker-compose.yml index db9b761502c3e..6c4bb61c013af 100644 --- a/integration_tests/cassandra-sink/docker-compose.yml +++ b/integration_tests/cassandra-sink/docker-compose.yml @@ -1,13 +1,12 @@ --- version: "3" services: - services: cassandra: image: cassandra:4.0 ports: - 9042:9042 volumes: - - ~/cassandra-sink:/var/lib/cassandra-sink + - ./cassandra-sql:/opt/cassandra/cassandra-sql environment: - CASSANDRA_CLUSTER_NAME=cloudinfra compactor-0: @@ -34,6 +33,10 @@ services: extends: file: ../../docker/docker-compose.yml service: meta-node-0 + connector-node: + extends: + file: ../../docker/docker-compose.yml + service: connector-node minio-0: extends: file: ../../docker/docker-compose.yml diff --git a/risedev.yml b/risedev.yml 
index 50e4dcefaa433..6bcaef69be027 100644 --- a/risedev.yml +++ b/risedev.yml @@ -27,7 +27,7 @@ profile: # bucket: test-bucket # If you want to create CDC source table, uncomment the following line - - use: connector-node + # - use: connector-node # if you want to enable etcd backend, uncomment the following lines. # - use: etcd From 1f6eaa52581a27db895aa6648fd4187e8bfe8d5c Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 24 Aug 2023 18:57:34 +0800 Subject: [PATCH 4/8] fmt fmt --- .../connector/CassandraFactory.java | 16 ++++++++++ .../risingwave/connector/CassandraSink.java | 32 ++++++++++--------- .../risingwave/connector/CassandraUtil.java | 16 ++++++++++ src/connector/src/sink/remote.rs | 9 ++++-- 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java index 467ff41a8941a..6c06c16c3bf4f 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.risingwave.connector; import com.datastax.oss.driver.api.core.CqlSession; diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index fb33debccc782..49141514d1a2c 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -1,16 +1,18 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package com.risingwave.connector; @@ -97,13 +99,13 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { public void write(Iterator rows) { System.out.println(this.config.getType()); if (this.config.getType().equals("append-only")) { - write_apend_only(rows); + write_append_only(rows); } else { write_upsert(rows); } } - private void write_apend_only(Iterator rows) { + private void write_append_only(Iterator rows) { while (rows.hasNext()) { SinkRow row = rows.next(); Data.Op op = row.getOp(); diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index 65f47855cead9..ead056bfdba22 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.risingwave.connector; import com.datastax.oss.driver.api.core.data.CqlDuration; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index cfea73b9f4f63..14537b229ee63 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -49,8 +49,13 @@ use crate::sink::{ }; use crate::ConnectorParams; -pub const VALID_REMOTE_SINKS: [&str; 5] = - ["jdbc", REMOTE_ICEBERG_SINK, "deltalake", "elasticsearch-7", "cassandra"]; +pub const VALID_REMOTE_SINKS: [&str; 5] = [ + "jdbc", + REMOTE_ICEBERG_SINK, + "deltalake", + "elasticsearch-7", + "cassandra", +]; pub fn is_valid_remote_sink(connector_type: &str) -> bool { VALID_REMOTE_SINKS.contains(&connector_type) From f88ead4e8344fe51cf0b64113123fe0eae038fdb Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 25 Aug 2023 15:14:00 +0800 Subject: [PATCH 5/8] fmt fmt fmt fmt fmt fmt fmt --- integration_tests/cassandra-sink/README.md | 2 +- .../{cassandra-query.sql => cassandra_query.sql} | 2 +- .../cassandra-sql/create_cassandra_table.sql | 2 +- integration_tests/cassandra-sink/create_sink.sql | 4 ++-- integration_tests/cassandra-sink/create_source.sql | 13 +++++-------- .../com/risingwave/connector/CassandraSink.java | 3 +-- .../com/risingwave/connector/CassandraUtil.java | 1 + 7 files changed, 12 insertions(+), 15 deletions(-) rename integration_tests/cassandra-sink/cassandra-sql/{cassandra-query.sql => cassandra_query.sql} (91%) diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md index d49ce9545fac9..ac957bc5f7dde 100644 --- a/integration_tests/cassandra-sink/README.md +++ b/integration_tests/cassandra-sink/README.md @@ -31,5 +31,5 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh ``` ```sql -select user_id, count(*) from default.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id ``` diff --git 
a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql similarity index 91% rename from integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql rename to integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql index 004d9fdd1c00e..a18cc472173ab 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql +++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql @@ -1 +1 @@ -select user_id, count(*) from my_keyspace.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id; diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql index 058fd57c80a63..d858e7c47ae69 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql +++ b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql @@ -1,7 +1,7 @@ CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; use my_keyspace; CREATE table demo_test( - user_id text primary key, + user_id int primary key, target_id text, event_timestamp timestamp, ); \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql index 56fab31689efd..0105775301205 100644 --- a/integration_tests/cassandra-sink/create_sink.sql +++ b/integration_tests/cassandra-sink/create_sink.sql @@ -1,10 +1,10 @@ -CREATE SINK bhv_iceberg_sink +CREATE SINK bhv_cassandra_sink FROM bhv_mv WITH ( connector = 'cassandra', type = 'append-only', force_append_only='true', - cassandra.url = 'http://127.0.0.1:9042', + cassandra.url = 'cassandra:9042', cassandra.keyspace = 'mykeyspace', cassandra.table = 'demo_test', cassandra.datacenter = 'datacenter1', diff --git a/integration_tests/cassandra-sink/create_source.sql 
b/integration_tests/cassandra-sink/create_source.sql index e704eb785ec06..acb48fecf4e54 100644 --- a/integration_tests/cassandra-sink/create_source.sql +++ b/integration_tests/cassandra-sink/create_source.sql @@ -1,5 +1,5 @@ CREATE table user_behaviors ( - user_id VARCHAR, + user_id int, target_id VARCHAR, target_type VARCHAR, event_timestamp TIMESTAMP, @@ -9,13 +9,10 @@ CREATE table user_behaviors ( PRIMARY KEY(user_id) ) WITH ( connector = 'datagen', - fields.seq_id.kind = 'sequence', - fields.seq_id.start = '1', - fields.seq_id.end = '10000000', - fields.user_id.kind = 'random', - fields.user_id.min = '1', - fields.user_id.max = '10000000', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', fields.user_name.kind = 'random', fields.user_name.length = '10', - datagen.rows.per.second = '20000' + datagen.rows.per.second = '500' ) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 49141514d1a2c..40e23d4537c2d 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -49,7 +49,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { throw new IllegalArgumentException( "Invalid cassandraURL: expected `host:port`, got " + url); } - // 2. 
check connection + // check connection CqlSessionBuilder sessionBuilder = CqlSession.builder() .addContactPoint( @@ -97,7 +97,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { @Override public void write(Iterator rows) { - System.out.println(this.config.getType()); if (this.config.getType().equals("append-only")) { write_append_only(rows); } else { diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index ead056bfdba22..f6f68012c25c6 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -124,6 +124,7 @@ public static Object convertRow(Object value, TypeName typeName) { return CqlDuration.from((String) value); case BYTEA: return ByteBuffer.wrap((byte[]) value); + case LIST: case STRUCT: throw Status.UNIMPLEMENTED .withDescription(String.format("not support %s now", typeName)) From b0d35722668eb13170508f7e9be91e0e16d18760 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 6 Sep 2023 15:14:23 +0800 Subject: [PATCH 6/8] support syclladb --- .../README.md | 15 +++- .../cassandra-sql/cassandra_query.sql | 0 .../cassandra-sql/create_cassandra_table.sql | 0 .../cassandra-sql/run-sql-file-syclladb.sh | 3 + .../cassandra-sql/run-sql-file.sh | 0 .../create_mv.sql | 0 .../create_sink.sql | 2 +- .../create_source.sql | 0 .../docker-compose-cassandra.yml} | 0 .../docker-compose-syclladb.yml | 74 +++++++++++++++++++ .../connector/CassandraFactory.java | 29 +++----- .../risingwave/connector/CassandraSink.java | 3 +- .../risingwave/connector/CassandraUtil.java | 72 ++++++++++++------ risedev.yml | 6 +- 14 files changed, 155 insertions(+), 49 deletions(-) rename integration_tests/{cassandra-sink => 
cassandra-and-syclladb-sink}/README.md (64%) rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/cassandra_query.sql (100%) rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/create_cassandra_table.sql (100%) create mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/cassandra-sql/run-sql-file.sh (100%) rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_mv.sql (100%) rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_sink.sql (86%) rename integration_tests/{cassandra-sink => cassandra-and-syclladb-sink}/create_source.sql (100%) rename integration_tests/{cassandra-sink/docker-compose.yml => cassandra-and-syclladb-sink/docker-compose-cassandra.yml} (100%) create mode 100644 integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md similarity index 64% rename from integration_tests/cassandra-sink/README.md rename to integration_tests/cassandra-and-syclladb-sink/README.md index ac957bc5f7dde..6c54752755822 100644 --- a/integration_tests/cassandra-sink/README.md +++ b/integration_tests/cassandra-and-syclladb-sink/README.md @@ -5,7 +5,11 @@ In this demo, we want to showcase how RisingWave is able to sink data to Cassand 1. Launch the cluster: ```sh -docker compose up -d +docker-compose -f docker-compose-cassandra.yml up -d +``` +If we use syclladb. +```sh +docker-compose -f docker-compose-syclladb.yml up -d ``` The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink. 
@@ -16,6 +20,10 @@ The cluster contains a RisingWave cluster and its necessary dependencies, a data ```sh docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table ``` +If we use syclladb. +```sh +docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh create_cassandra_table +``` 3. Execute the SQL queries in sequence: @@ -27,7 +35,10 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh ```sh docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query - +``` +If we use syclladb. +```sh +docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh cassandra_query ``` ```sql diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql similarity index 100% rename from integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql similarity index 100% rename from integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh new file mode 100644 index 0000000000000..4941c6adaf16b --- /dev/null +++ b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh @@ -0,0 +1,3 @@ +set -ex + +cqlsh < /opt/scylladb/scylladb-sql/$1.sql \ No newline at end of file diff --git a/integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh 
b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh similarity index 100% rename from integration_tests/cassandra-sink/cassandra-sql/run-sql-file.sh rename to integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh diff --git a/integration_tests/cassandra-sink/create_mv.sql b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql similarity index 100% rename from integration_tests/cassandra-sink/create_mv.sql rename to integration_tests/cassandra-and-syclladb-sink/create_mv.sql diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql similarity index 86% rename from integration_tests/cassandra-sink/create_sink.sql rename to integration_tests/cassandra-and-syclladb-sink/create_sink.sql index 0105775301205..a556b5bcde5f5 100644 --- a/integration_tests/cassandra-sink/create_sink.sql +++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql @@ -5,7 +5,7 @@ FROM type = 'append-only', force_append_only='true', cassandra.url = 'cassandra:9042', - cassandra.keyspace = 'mykeyspace', + cassandra.keyspace = 'my_keyspace', cassandra.table = 'demo_test', cassandra.datacenter = 'datacenter1', ); \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql similarity index 100% rename from integration_tests/cassandra-sink/create_source.sql rename to integration_tests/cassandra-and-syclladb-sink/create_source.sql diff --git a/integration_tests/cassandra-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml similarity index 100% rename from integration_tests/cassandra-sink/docker-compose.yml rename to integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml 
b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml new file mode 100644 index 0000000000000..a133f53d8c384 --- /dev/null +++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml @@ -0,0 +1,74 @@ +--- +version: "3" +services: + syclladb: + image: scylladb/scylla:5.1 + ports: + - 9042:9042 + volumes: + - ./scylladb-sql:/opt/scylladb/scylladb-sql + environment: + - CASSANDRA_CLUSTER_NAME=cloudinfra + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + connector-node: + extends: + file: ../../docker/docker-compose.yml + service: connector-node + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + message_queue: + extends: + file: ../../docker/docker-compose.yml + service: message_queue + datagen: + build: ../datagen + depends_on: [message_queue] + command: + - /bin/sh + - -c + - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092 + restart: always + container_name: datagen +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java 
b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java index 6c06c16c3bf4f..f9fb5bee020e2 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java @@ -19,9 +19,9 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.cql.*; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.risingwave.connector.api.ColumnDesc; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkFactory; import com.risingwave.connector.api.sink.SinkWriter; @@ -29,8 +29,6 @@ import com.risingwave.proto.Catalog.SinkType; import io.grpc.Status; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,17 +69,13 @@ public void validate( } CqlSession session = sessionBuilder.build(); - String cql = - String.format( - "SELECT column_name , type FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name = '%s';", - config.getKeyspace(), config.getTable()); - - HashMap cassandraColumnDescMap = new HashMap<>(); - for (Row i : session.execute(cql)) { - cassandraColumnDescMap.put(i.getString(0), i.getString(1)); - } - List columnDescs = tableSchema.getColumnDescs(); - CassandraUtil.checkSchema(columnDescs, cassandraColumnDescMap); + TableMetadata tableMetadata = + session.getMetadata() + .getKeyspace(config.getKeyspace()) + .get() + .getTable(config.getTable()) + .get(); + CassandraUtil.checkSchema(tableSchema.getColumnDescs(), tableMetadata.getColumns()); if (session.isClosed()) 
{ throw Status.INVALID_ARGUMENT @@ -92,11 +86,8 @@ public void validate( session.close(); switch (sinkType) { case UPSERT: - if (tableSchema.getPrimaryKeys().isEmpty()) { - throw Status.INVALID_ARGUMENT - .withDescription("please define primary key for upsert cassandra sink") - .asRuntimeException(); - } + CassandraUtil.checkPrimaryKey( + tableMetadata.getPrimaryKey(), tableSchema.getPrimaryKeys()); break; case APPEND_ONLY: case FORCE_APPEND_ONLY: diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 40e23d4537c2d..af422dd2f5bdb 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -68,8 +68,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { } this.config = config; - this.batchBuilder = - BatchStatement.builder(DefaultBatchType.LOGGED).setKeyspace(config.getKeyspace()); + this.batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED); // fetch non-pk columns for prepared statements nonKeyColumns = diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index f6f68012c25c6..fbe9c1c04a28b 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -16,7 +16,9 @@ package com.risingwave.connector; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.data.CqlDuration; +import 
com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.risingwave.connector.api.ColumnDesc; import com.risingwave.proto.Data.DataType; import com.risingwave.proto.Data.DataType.TypeName; @@ -27,43 +29,45 @@ import java.sql.Timestamp; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class CassandraUtil { - private static String getCorrespondingCassandraType(DataType dataType) { + private static int getCorrespondingCassandraType(DataType dataType) { switch (dataType.getTypeName()) { case INT16: - return "smallint"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.SMALLINT; case INT32: - return "int"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.INT; case INT64: - return "bigint"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BIGINT; case FLOAT: - return "float"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.FLOAT; case DOUBLE: - return "double"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DOUBLE; case BOOLEAN: - return "boolean"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BOOLEAN; case VARCHAR: - return "text"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.VARCHAR; case DECIMAL: - return "decimal"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DECIMAL; case TIMESTAMP: - return "timestamp"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP; case TIMESTAMPTZ: - return "timestamp"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP; case DATE: - return "date"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DATE; case TIME: - return "time"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIME; case BYTEA: - return "blob"; + return 
com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BLOB; case LIST: case STRUCT: throw Status.UNIMPLEMENTED .withDescription(String.format("not support %s now", dataType)) .asRuntimeException(); case INTERVAL: - return "duration"; + return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DURATION; default: throw Status.INVALID_ARGUMENT .withDescription("unspecified type" + dataType) @@ -72,14 +76,16 @@ private static String getCorrespondingCassandraType(DataType dataType) { } public static void checkSchema( - List columnDescs, Map cassandraColumnDescMap) { + List columnDescs, + Map cassandraColumnDescMap) { if (columnDescs.size() != cassandraColumnDescMap.size()) { throw Status.FAILED_PRECONDITION .withDescription("Don't match in the number of columns in the table") .asRuntimeException(); } for (ColumnDesc columnDesc : columnDescs) { - if (!cassandraColumnDescMap.containsKey(columnDesc.getName())) { + CqlIdentifier cql = CqlIdentifier.fromInternal(columnDesc.getName()); + if (!cassandraColumnDescMap.containsKey(cql)) { throw Status.FAILED_PRECONDITION .withDescription( String.format( @@ -87,16 +93,38 @@ public static void checkSchema( columnDesc.getName())) .asRuntimeException(); } - if (!cassandraColumnDescMap - .get(columnDesc.getName()) - .equals(getCorrespondingCassandraType(columnDesc.getDataType()))) { + if (cassandraColumnDescMap.get(cql).getType().getProtocolCode() + != getCorrespondingCassandraType(columnDesc.getDataType())) { throw Status.FAILED_PRECONDITION .withDescription( String.format( "Don't match in the type, name is %s, cassandra is %s, rw is %s", columnDesc.getName(), - cassandraColumnDescMap.get(columnDesc.getName()), - getCorrespondingCassandraType(columnDesc.getDataType()))) + cassandraColumnDescMap.get(cql), + columnDesc.getDataType().getTypeName())) + .asRuntimeException(); + } + } + } + + public static void checkPrimaryKey( + List cassandraColumnMetadatas, List columnMetadatas) { + if (cassandraColumnMetadatas.size() 
!= columnMetadatas.size()) { + throw Status.FAILED_PRECONDITION + .withDescription("Primary key len don't match") + .asRuntimeException(); + } + Set cassandraColumnsSet = + cassandraColumnMetadatas.stream() + .map((a) -> a.getName().toString()) + .collect(Collectors.toSet()); + for (String columnMetadata : columnMetadatas) { + if (!cassandraColumnsSet.contains(columnMetadata)) { + throw Status.FAILED_PRECONDITION + .withDescription( + String.format( + "Primary key don't match. RisingWave Primary key is %s, don't find it in cassandra", + columnMetadata)) .asRuntimeException(); } } diff --git a/risedev.yml b/risedev.yml index f17deec0f260e..35c6ae1043b91 100644 --- a/risedev.yml +++ b/risedev.yml @@ -20,14 +20,14 @@ profile: # config-path: src/config/example.toml steps: # If you want to use the local s3 storage, enable the following line - # - use: minio + - use: minio # If you want to use aws-s3, configure AK and SK in env var and enable the following lines: # - use: aws-s3 # bucket: test-bucket # If you want to create CDC source table, uncomment the following line - # - use: connector-node + - use: connector-node # if you want to enable etcd backend, uncomment the following lines. # - use: etcd @@ -43,7 +43,7 @@ profile: - use: frontend # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well. - # - use: compactor + - use: compactor # If you want to create source from Kafka, uncomment the following lines # Note that kafka depends on zookeeper, so zookeeper must be started beforehand. 
From 6134de41b3afc2c08be6f59217d51b6f7438b34a Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 6 Sep 2023 19:06:37 +0800 Subject: [PATCH 7/8] Make the demo more initiative and some fixes --- .../cassandra-and-syclladb-sink/README.md | 53 ++++++++----- .../cassandra-sql/cassandra_query.sql | 1 - .../cassandra-sql/create_cassandra_table.sql | 7 -- .../cassandra-sql/run-sql-file-syclladb.sh | 3 - .../cassandra-sql/run-sql-file.sh | 3 - .../create_sink.sql | 4 +- .../create_source.sql | 2 +- .../docker-compose-syclladb.yml | 74 ------------------- ...mpose-cassandra.yml => docker-compose.yml} | 14 +++- 9 files changed, 52 insertions(+), 109 deletions(-) delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh delete mode 100644 integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh delete mode 100644 integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml rename integration_tests/cassandra-and-syclladb-sink/{docker-compose-cassandra.yml => docker-compose.yml} (85%) diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md index 6c54752755822..071fc88d0df46 100644 --- a/integration_tests/cassandra-and-syclladb-sink/README.md +++ b/integration_tests/cassandra-and-syclladb-sink/README.md @@ -2,27 +2,45 @@ In this demo, we want to showcase how RisingWave is able to sink data to Cassandra. -1. Launch the cluster: +1. Set the compose profile accordingly: +Demo with Apache Cassandra: +``` +export COMPOSE_PROFILES=cassandra +``` -```sh -docker-compose -f docker-compose-cassandra.yml up -d +Demo with Scylladb ``` -If we use syclladb. +export COMPOSE_PROFILES=scylladb +``` + +2. 
Launch the cluster: + ```sh -docker-compose -f docker-compose-syclladb.yml up -d +docker-compose up -d ``` The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink. -2. Create the Cassandra table: +3. Create the Cassandra table via cqlsh: +Login to cqlsh ```sh -docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh create_cassandra_table +# cqlsh into cassandra +docker compose exec cassandra cqlsh +# cqlsh into scylladb +docker compose exec scylladb cqlsh ``` -If we use syclladb. -```sh -docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh create_cassandra_table + +Run the following queries to create keyspace and table. +```sql +CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +use demo; +CREATE table demo_bhv_table( + user_id int primary key, + target_id text, + event_timestamp timestamp, +); ``` 3. Execute the SQL queries in sequence: @@ -31,16 +49,17 @@ docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclla - create_mv.sql - create_sink.sql -4. Execute a simple query: +4. Execute a simple query to check the sink results via csqlsh: +Login to cqlsh ```sh -docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh cassandra_query -``` -If we use syclladb. 
-```sh -docker compose exec syclladb bash /opt/scylladb/scylladb-sql/run-sql-file-syclladb.sh cassandra_query +# cqlsh into cassandra +docker compose exec cassandra cqlsh +# cqlsh into scylladb +docker compose exec scylladb cqlsh ``` +Run the following query ```sql -select user_id, count(*) from my_keyspace.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id; ``` diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql deleted file mode 100644 index a18cc472173ab..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/cassandra_query.sql +++ /dev/null @@ -1 +0,0 @@ -select user_id, count(*) from my_keyspace.demo_test group by user_id; diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql deleted file mode 100644 index d858e7c47ae69..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/create_cassandra_table.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; -use my_keyspace; -CREATE table demo_test( - user_id int primary key, - target_id text, - event_timestamp timestamp, -); \ No newline at end of file diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh deleted file mode 100644 index 4941c6adaf16b..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file-syclladb.sh +++ /dev/null @@ -1,3 +0,0 @@ -set -ex - -cqlsh < /opt/scylladb/scylladb-sql/$1.sql \ No newline at end of file diff --git a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh 
b/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh deleted file mode 100644 index 053f94398559d..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/cassandra-sql/run-sql-file.sh +++ /dev/null @@ -1,3 +0,0 @@ -set -ex - -cqlsh < /opt/cassandra/cassandra-sql/$1.sql \ No newline at end of file diff --git a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql index a556b5bcde5f5..724e784694c2f 100644 --- a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql +++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql @@ -5,7 +5,7 @@ FROM type = 'append-only', force_append_only='true', cassandra.url = 'cassandra:9042', - cassandra.keyspace = 'my_keyspace', - cassandra.table = 'demo_test', + cassandra.keyspace = 'demo', + cassandra.table = 'demo_bhv_table', cassandra.datacenter = 'datacenter1', ); \ No newline at end of file diff --git a/integration_tests/cassandra-and-syclladb-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql index acb48fecf4e54..c28c10f3616da 100644 --- a/integration_tests/cassandra-and-syclladb-sink/create_source.sql +++ b/integration_tests/cassandra-and-syclladb-sink/create_source.sql @@ -14,5 +14,5 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', fields.user_name.kind = 'random', fields.user_name.length = '10', - datagen.rows.per.second = '500' + datagen.rows.per.second = '10' ) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml deleted file mode 100644 index a133f53d8c384..0000000000000 --- a/integration_tests/cassandra-and-syclladb-sink/docker-compose-syclladb.yml +++ /dev/null @@ -1,74 +0,0 @@ ---- -version: "3" -services: - syclladb: - image: scylladb/scylla:5.1 - ports: - - 9042:9042 - volumes: - - 
./scylladb-sql:/opt/scylladb/scylladb-sql - environment: - - CASSANDRA_CLUSTER_NAME=cloudinfra - compactor-0: - extends: - file: ../../docker/docker-compose.yml - service: compactor-0 - compute-node-0: - extends: - file: ../../docker/docker-compose.yml - service: compute-node-0 - etcd-0: - extends: - file: ../../docker/docker-compose.yml - service: etcd-0 - frontend-node-0: - extends: - file: ../../docker/docker-compose.yml - service: frontend-node-0 - grafana-0: - extends: - file: ../../docker/docker-compose.yml - service: grafana-0 - meta-node-0: - extends: - file: ../../docker/docker-compose.yml - service: meta-node-0 - connector-node: - extends: - file: ../../docker/docker-compose.yml - service: connector-node - minio-0: - extends: - file: ../../docker/docker-compose.yml - service: minio-0 - prometheus-0: - extends: - file: ../../docker/docker-compose.yml - service: prometheus-0 - message_queue: - extends: - file: ../../docker/docker-compose.yml - service: message_queue - datagen: - build: ../datagen - depends_on: [message_queue] - command: - - /bin/sh - - -c - - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092 - restart: always - container_name: datagen -volumes: - compute-node-0: - external: false - etcd-0: - external: false - grafana-0: - external: false - minio-0: - external: false - prometheus-0: - external: false - message_queue: - external: false -name: risingwave-compose diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml similarity index 85% rename from integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml rename to integration_tests/cassandra-and-syclladb-sink/docker-compose.yml index 6c4bb61c013af..07c2f9fe0228e 100644 --- a/integration_tests/cassandra-and-syclladb-sink/docker-compose-cassandra.yml +++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml @@ -6,9 +6,21 @@ services: ports: - 
9042:9042 volumes: - - ./cassandra-sql:/opt/cassandra/cassandra-sql + - ./cassandra-sql:/cassandra-sql environment: - CASSANDRA_CLUSTER_NAME=cloudinfra + profiles: + - cassandra + scylladb: + image: scylladb/scylla:5.1 + ports: + - 9042:9042 + volumes: + - ./cassandra-sql:/cassandra-sql + environment: + - CASSANDRA_CLUSTER_NAME=cloudinfra + profiles: + - scylladb compactor-0: extends: file: ../../docker/docker-compose.yml From 4db9182dd4cd10096269063a6a35d9762c791d51 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 6 Sep 2023 19:08:13 +0800 Subject: [PATCH 8/8] minor fix --- integration_tests/cassandra-and-syclladb-sink/README.md | 2 +- .../cassandra-and-syclladb-sink/docker-compose.yml | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md index 071fc88d0df46..b022c9ef09cf8 100644 --- a/integration_tests/cassandra-and-syclladb-sink/README.md +++ b/integration_tests/cassandra-and-syclladb-sink/README.md @@ -1,4 +1,4 @@ -# Demo: Sinking to Cassandra +# Demo: Sinking to Cassandra/Scylladb In this demo, we want to showcase how RisingWave is able to sink data to Cassandra. diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml index 07c2f9fe0228e..27b77f850f882 100644 --- a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml +++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml @@ -5,8 +5,6 @@ services: image: cassandra:4.0 ports: - 9042:9042 - volumes: - - ./cassandra-sql:/cassandra-sql environment: - CASSANDRA_CLUSTER_NAME=cloudinfra profiles: @@ -15,8 +13,6 @@ services: image: scylladb/scylla:5.1 ports: - 9042:9042 - volumes: - - ./cassandra-sql:/cassandra-sql environment: - CASSANDRA_CLUSTER_NAME=cloudinfra profiles: