From 7d940cfa669db5df3450a4109f270745cc35e7cc Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Thu, 7 Sep 2023 21:22:06 +0800
Subject: [PATCH] feat(sink): Support cassandra sink (#11878)

Co-authored-by: Patrick Huang
---
 .../cassandra-and-syclladb-sink/README.md     |  65 +++++
 .../cassandra-and-syclladb-sink/create_mv.sql |   7 +
 .../create_sink.sql                           |  11 +
 .../create_source.sql                         |  18 ++
 .../docker-compose.yml                        |  82 ++++++
 java/connector-node/assembly/assembly.xml     |   1 +
 java/connector-node/assembly/pom.xml          |   4 +
 .../risingwave-connector-service/pom.xml      |   5 +
 .../com/risingwave/connector/SinkUtils.java   |   2 +
 .../risingwave-connector-test/pom.xml         |   5 +
 .../risingwave-sink-cassandra/pom.xml         |  60 ++++
 .../risingwave/connector/CassandraConfig.java |  95 +++++++
 .../connector/CassandraFactory.java           |  99 +++++++
 .../risingwave/connector/CassandraSink.java   | 261 ++++++++++++++++++
 .../risingwave/connector/CassandraUtil.java   | 166 +++++++++++
 .../risingwave-sink-es-7/pom.xml              |   1 -
 java/pom.xml                                  |   7 +
 risedev.yml                                   |   6 +-
 src/connector/src/sink/remote.rs              |   9 +-
 19 files changed, 898 insertions(+), 6 deletions(-)
 create mode 100644 integration_tests/cassandra-and-syclladb-sink/README.md
 create mode 100644 integration_tests/cassandra-and-syclladb-sink/create_mv.sql
 create mode 100644 integration_tests/cassandra-and-syclladb-sink/create_sink.sql
 create mode 100644 integration_tests/cassandra-and-syclladb-sink/create_source.sql
 create mode 100644 integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
 create mode 100644 java/connector-node/risingwave-sink-cassandra/pom.xml
 create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
 create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
 create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
 create mode 100644 java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java

diff --git a/integration_tests/cassandra-and-syclladb-sink/README.md b/integration_tests/cassandra-and-syclladb-sink/README.md
new file mode 100644
index 0000000000000..b022c9ef09cf8
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/README.md
@@ -0,0 +1,65 @@
# Demo: Sinking to Cassandra/ScyllaDB

In this demo, we want to showcase how RisingWave is able to sink data to Cassandra and ScyllaDB.

1. Set the compose profile accordingly:

Demo with Apache Cassandra:
```
export COMPOSE_PROFILES=cassandra
```

Demo with ScyllaDB:
```
export COMPOSE_PROFILES=scylladb
```

2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, and a Cassandra (or ScyllaDB) instance used as the sink.

3. Create the Cassandra table via cqlsh:

Log in to cqlsh:
```sh
# cqlsh into cassandra
docker compose exec cassandra cqlsh
# cqlsh into scylladb
docker compose exec scylladb cqlsh
```

Run the following queries to create the keyspace and table:
```sql
CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use demo;
CREATE table demo_bhv_table(
    user_id int primary key,
    target_id text,
    event_timestamp timestamp
);
```

4. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_sink.sql

5. Execute a simple query to check the sink results via cqlsh:

Log in to cqlsh:
```sh
# cqlsh into cassandra
docker compose exec cassandra cqlsh
# cqlsh into scylladb
docker compose exec scylladb cqlsh
```

Run the following query:
```sql
select user_id, count(*) from demo.demo_bhv_table group by user_id;
```
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_mv.sql b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
    user_id,
    target_id,
    event_timestamp
FROM
    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_sink.sql b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
new file mode 100644
index 0000000000000..724e784694c2f
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_sink.sql
@@ -0,0 +1,11 @@
CREATE SINK bhv_cassandra_sink
FROM
    bhv_mv WITH (
    connector = 'cassandra',
    type = 'append-only',
    force_append_only='true',
    cassandra.url = 'cassandra:9042',
    cassandra.keyspace = 'demo',
    cassandra.table = 'demo_bhv_table',
    cassandra.datacenter = 'datacenter1',
);
\ No newline at end of file
diff --git a/integration_tests/cassandra-and-syclladb-sink/create_source.sql b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
new file mode 100644
index 0000000000000..c28c10f3616da
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/create_source.sql
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
    user_id int,
    target_id VARCHAR,
    target_type VARCHAR,
    event_timestamp TIMESTAMP,
    behavior_type VARCHAR,
    parent_target_type VARCHAR,
    parent_target_id VARCHAR,
    PRIMARY KEY(user_id)
) WITH (
    connector = 'datagen',
    fields.user_id.kind = 'sequence',
    fields.user_id.start = '1',
    fields.user_id.end = '1000',
    fields.user_name.kind = 'random',
    fields.user_name.length = '10',
    datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
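For reference, the `create_sink.sql` above uses the append-only path. The Java writer added later in this patch also implements an upsert path (prepared UPDATE/DELETE statements plus a primary-key check against the Cassandra table), so an upsert variant of the demo sink is possible. The following is only an illustrative sketch, not part of this patch; the `primary_key` option and the exact upsert requirements should be checked against the RisingWave documentation for your version:

```sql
CREATE SINK bhv_cassandra_upsert_sink
FROM
    bhv_mv WITH (
    connector = 'cassandra',
    type = 'upsert',
    primary_key = 'user_id',
    cassandra.url = 'cassandra:9042',
    cassandra.keyspace = 'demo',
    cassandra.table = 'demo_bhv_table',
    cassandra.datacenter = 'datacenter1'
);
```

With `type = 'upsert'`, the connector validates that the RisingWave primary key columns match the Cassandra table's primary key before the sink is created.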
diff --git a/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
new file mode 100644
index 0000000000000..27b77f850f882
--- /dev/null
+++ b/integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
@@ -0,0 +1,82 @@
---
version: "3"
services:
  cassandra:
    image: cassandra:4.0
    ports:
      - 9042:9042
    environment:
      - CASSANDRA_CLUSTER_NAME=cloudinfra
    profiles:
      - cassandra
  scylladb:
    image: scylladb/scylla:5.1
    ports:
      - 9042:9042
    environment:
      - CASSANDRA_CLUSTER_NAME=cloudinfra
    profiles:
      - scylladb
  compactor-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compactor-0
  compute-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: compute-node-0
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  frontend-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: frontend-node-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  meta-node-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: meta-node-0
  connector-node:
    extends:
      file: ../../docker/docker-compose.yml
      service: connector-node
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
  message_queue:
    extends:
      file: ../../docker/docker-compose.yml
      service: message_queue
  datagen:
    build: ../datagen
    depends_on: [message_queue]
    command:
      - /bin/sh
      - -c
      - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
    restart: always
    container_name: datagen
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
diff --git a/java/connector-node/assembly/assembly.xml b/java/connector-node/assembly/assembly.xml
index 327c7628dbb59..751ba2f410e0b 100644
--- a/java/connector-node/assembly/assembly.xml
+++ b/java/connector-node/assembly/assembly.xml
@@ -40,6 +40,7 @@
                 <include>*:risingwave-sink-es-7</include>
+                <include>*:risingwave-sink-cassandra</include>
                 <include>*:risingwave-sink-jdbc</include>
                 <include>*:risingwave-sink-iceberg</include>
                 <include>*:risingwave-sink-deltalake</include>
diff --git a/java/connector-node/assembly/pom.xml b/java/connector-node/assembly/pom.xml
index 5c518601bc8e7..6812bac5b63e6 100644
--- a/java/connector-node/assembly/pom.xml
+++ b/java/connector-node/assembly/pom.xml
@@ -33,6 +33,10 @@
             <groupId>com.risingwave.java</groupId>
             <artifactId>risingwave-sink-es-7</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.risingwave.java</groupId>
+            <artifactId>risingwave-sink-cassandra</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.risingwave.java</groupId>
             <artifactId>risingwave-sink-jdbc</artifactId>
diff --git a/java/connector-node/risingwave-connector-service/pom.xml b/java/connector-node/risingwave-connector-service/pom.xml
index fccd84d24b9af..4e2dbe1d6ec96 100644
--- a/java/connector-node/risingwave-connector-service/pom.xml
+++ b/java/connector-node/risingwave-connector-service/pom.xml
@@ -99,5 +99,10 @@
             <artifactId>risingwave-sink-es-7</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.risingwave.java</groupId>
+            <artifactId>risingwave-sink-cassandra</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index 6c0f779e287aa..ab3ac84346fa6 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -43,6 +43,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
                 return new DeltaLakeSinkFactory();
             case "elasticsearch-7":
                 return new EsSink7Factory();
+            case "cassandra":
+                return new CassandraFactory();
             default:
                 throw UNIMPLEMENTED
                         .withDescription("unknown sink type: " + sinkName)
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index d630eb183c856..7f8f6f1bc49cc 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -174,5 +174,10 @@
             <artifactId>risingwave-sink-es-7</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.risingwave.java</groupId>
+            <artifactId>risingwave-sink-cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/java/connector-node/risingwave-sink-cassandra/pom.xml b/java/connector-node/risingwave-sink-cassandra/pom.xml
new file mode 100644
index 0000000000000..e51faa9691cf9
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/pom.xml
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>java-parent</artifactId>
        <groupId>com.risingwave.java</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>risingwave-sink-cassandra</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>risingwave-sink-cassandra</name>

    <dependencies>
        <dependency>
            <groupId>com.risingwave.java</groupId>
            <artifactId>proto</artifactId>
        </dependency>
        <dependency>
            <groupId>com.risingwave.java</groupId>
            <artifactId>connector-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!-- Cassandra java driver -->
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>${datastax.version}</version>
        </dependency>
    </dependencies>

</project>
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
new file mode 100644
index 0000000000000..a993a8988e1ee
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -0,0 +1,95 @@
/*
 * Copyright 2023 RisingWave Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.risingwave.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class CassandraConfig extends CommonSinkConfig {
    /** Required */
    private String type;

    /** Required */
    private String url;

    /** Required */
    private String keyspace;

    /** Required */
    private String table;

    /** Required */
    private String datacenter;

    @JsonProperty(value = "cassandra.username")
    private String username;

    @JsonProperty(value = "cassandra.password")
    private String password;

    @JsonCreator
    public CassandraConfig(
            @JsonProperty(value = "cassandra.url") String url,
            @JsonProperty(value = "cassandra.keyspace") String keyspace,
            @JsonProperty(value = "cassandra.table") String table,
            @JsonProperty(value = "cassandra.datacenter") String datacenter,
            @JsonProperty(value = "type") String type) {
        this.url = url;
        this.keyspace = keyspace;
        this.table = table;
        this.datacenter = datacenter;
        this.type = type;
    }

    public String getType() {
        return type;
    }

    public String getUrl() {
        return url;
    }

    public String getKeyspace() {
        return keyspace;
    }

    public String getTable() {
        return table;
    }

    public String getDatacenter() {
        return datacenter;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public CassandraConfig withUsername(String username) {
        this.username = username;
        return this;
    }

    public CassandraConfig withPassword(String password) {
        this.password = password;
        return this;
    }
}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
new file mode 100644
index 0000000000000..f9fb5bee020e2
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraFactory.java
@@ -0,0 +1,99 @@
/*
 * Copyright 2023 RisingWave Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.risingwave.connector;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraFactory implements SinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraFactory.class);

    public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
        ObjectMapper mapper = new ObjectMapper();
        CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);
        return new SinkWriterV1.Adapter(new CassandraSink(tableSchema, config));
    }

    @Override
    public void validate(
            TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
        CassandraConfig config = mapper.convertValue(tableProperties, CassandraConfig.class);

        // 1. check url
        String url = config.getUrl();
        String[] hostPort = url.split(":");
        if (hostPort.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid cassandraURL: expected `host:port`, got " + url);
        }
        // 2. check connection
        CqlSessionBuilder sessionBuilder =
                CqlSession.builder()
                        .addContactPoint(
                                new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                        .withKeyspace(config.getKeyspace())
                        .withLocalDatacenter(config.getDatacenter());
        if (config.getUsername() != null && config.getPassword() != null) {
            sessionBuilder =
                    sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
        }
        CqlSession session = sessionBuilder.build();

        TableMetadata tableMetadata =
                session.getMetadata()
                        .getKeyspace(config.getKeyspace())
                        .get()
                        .getTable(config.getTable())
                        .get();
        CassandraUtil.checkSchema(tableSchema.getColumnDescs(), tableMetadata.getColumns());

        if (session.isClosed()) {
            throw Status.INVALID_ARGUMENT
                    .withDescription("Cannot connect to " + config.getUrl())
                    .asRuntimeException();
        }
        // 3. close client
        session.close();
        switch (sinkType) {
            case UPSERT:
                CassandraUtil.checkPrimaryKey(
                        tableMetadata.getPrimaryKey(), tableSchema.getPrimaryKeys());
                break;
            case APPEND_ONLY:
            case FORCE_APPEND_ONLY:
                break;
            default:
                throw Status.INTERNAL.asRuntimeException();
        }
    }
}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
new file mode 100644
index 0000000000000..af422dd2f5bdb
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -0,0 +1,261 @@
/*
 * Copyright 2023 RisingWave Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.risingwave.connector;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.*;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraSink extends SinkWriterBase {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
    private final CqlSession session;
    private final List<SinkRow> updateRowCache = new ArrayList<>(1);
    private final HashMap<String, PreparedStatement> stmtMap;
    private final List<String> nonKeyColumns;
    private final BatchStatementBuilder batchBuilder;
    private final CassandraConfig config;

    public CassandraSink(TableSchema tableSchema, CassandraConfig config) {
        super(tableSchema);
        String url = config.getUrl();
        String[] hostPort = url.split(":");
        if (hostPort.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid cassandraURL: expected `host:port`, got " + url);
        }
        // check connection
        CqlSessionBuilder sessionBuilder =
                CqlSession.builder()
                        .addContactPoint(
                                new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])))
                        .withKeyspace(config.getKeyspace())
                        .withLocalDatacenter(config.getDatacenter());
        if (config.getUsername() != null && config.getPassword() != null) {
            sessionBuilder =
                    sessionBuilder.withAuthCredentials(config.getUsername(), config.getPassword());
        }
        this.session = sessionBuilder.build();
        if (session.isClosed()) {
            throw Status.INVALID_ARGUMENT
                    .withDescription("Cannot connect to " + config.getUrl())
                    .asRuntimeException();
        }

        this.config = config;
        this.batchBuilder = BatchStatement.builder(DefaultBatchType.LOGGED);

        // fetch non-pk columns for prepared statements
        nonKeyColumns =
                Arrays.stream(tableSchema.getColumnNames())
                        // cassandra does not allow SET on primary keys
                        .filter(c -> !tableSchema.getPrimaryKeys().contains(c))
                        .collect(Collectors.toList());

        this.stmtMap = new HashMap<>();
        // prepare statement for insert
        this.stmtMap.put(
                "insert", session.prepare(createInsertStatement(config.getTable(), tableSchema)));
        if (config.getType().equals("upsert")) {
            // prepare statement for update-insert/update-delete
            this.stmtMap.put(
                    "update",
                    session.prepare(createUpdateStatement(config.getTable(), tableSchema)));

            // prepare the delete statement
            this.stmtMap.put(
                    "delete",
                    session.prepare(createDeleteStatement(config.getTable(), tableSchema)));
        }
    }

    @Override
    public void write(Iterator<SinkRow> rows) {
        if (this.config.getType().equals("append-only")) {
            write_append_only(rows);
        } else {
            write_upsert(rows);
        }
    }

    private void write_append_only(Iterator<SinkRow> rows) {
        while (rows.hasNext()) {
            SinkRow row = rows.next();
            Data.Op op = row.getOp();
            switch (op) {
                case INSERT:
                    batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
                    break;
                case UPDATE_DELETE:
                    break;
                case UPDATE_INSERT:
                    break;
                case DELETE:
                    break;
                default:
                    throw Status.INTERNAL
                            .withDescription("Unknown operation: " + op)
                            .asRuntimeException();
            }
        }
    }

    private void write_upsert(Iterator<SinkRow> rows) {
        while (rows.hasNext()) {
            SinkRow row = rows.next();
            Data.Op op = row.getOp();
            switch (op) {
                case INSERT:
                    batchBuilder.addStatement(bindInsertStatement(this.stmtMap.get("insert"), row));
                    break;
                case UPDATE_DELETE:
                    updateRowCache.clear();
                    updateRowCache.add(row);
                    break;
                case UPDATE_INSERT:
                    SinkRow old = updateRowCache.remove(0);
                    if (old == null) {
                        throw Status.FAILED_PRECONDITION
                                .withDescription("UPDATE_INSERT without UPDATE_DELETE")
                                .asRuntimeException();
                    }
                    batchBuilder.addStatement(
                            bindUpdateInsertStatement(this.stmtMap.get("update"), old, row));
                    break;
                case DELETE:
                    batchBuilder.addStatement(bindDeleteStatement(this.stmtMap.get("delete"), row));
                    break;
                default:
                    throw Status.INTERNAL
                            .withDescription("Unknown operation: " + op)
                            .asRuntimeException();
            }
        }
    }

    @Override
    public void sync() {
        try {
            session.execute(batchBuilder.build());
            batchBuilder.clearStatements();
        } catch (Exception e) {
            throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
        }
    }

    @Override
    public void drop() {
        session.close();
    }

    private String createInsertStatement(String tableName, TableSchema tableSchema) {
        String[] columnNames = tableSchema.getColumnNames();
        String columnNamesString = String.join(", ", columnNames);
        String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
        return String.format(
                "INSERT INTO %s (%s) VALUES (%s)",
                tableName, columnNamesString, placeholdersString);
    }

    private String createUpdateStatement(String tableName, TableSchema tableSchema) {
        List<String> primaryKeys = tableSchema.getPrimaryKeys();
        String setClause = // cassandra does not allow SET on primary keys
                nonKeyColumns.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(", "));
        String whereClause =
                primaryKeys.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(" AND "));
        return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
    }

    private static String createDeleteStatement(String tableName, TableSchema tableSchema) {
        List<String> primaryKeys = tableSchema.getPrimaryKeys();
        String whereClause =
                primaryKeys.stream()
                        .map(columnName -> columnName + " = ?")
                        .collect(Collectors.joining(" AND "));
        return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
    }

    private BoundStatement bindInsertStatement(PreparedStatement stmt, SinkRow row) {
        TableSchema schema = getTableSchema();
        return stmt.bind(
                IntStream.range(0, row.size())
                        .mapToObj(
                                (index) ->
                                        CassandraUtil.convertRow(
                                                row.get(index),
                                                schema.getColumnDescs()
                                                        .get(index)
                                                        .getDataType()
                                                        .getTypeName()))
                        .toArray());
    }

    private BoundStatement bindDeleteStatement(PreparedStatement stmt, SinkRow row) {
        TableSchema schema = getTableSchema();
        Map<String, TypeName> columnDescs = schema.getColumnTypes();
        return stmt.bind(
                getTableSchema().getPrimaryKeys().stream()
                        .map(
                                key ->
                                        CassandraUtil.convertRow(
                                                schema.getFromRow(key, row), columnDescs.get(key)))
                        .toArray());
    }

    private BoundStatement bindUpdateInsertStatement(
            PreparedStatement stmt, SinkRow updateRow, SinkRow insertRow) {
        TableSchema schema = getTableSchema();
        int numKeys = schema.getPrimaryKeys().size();
        int numNonKeys = updateRow.size() - numKeys;
        Map<String, TypeName> columnDescs = schema.getColumnTypes();
        Object[] values = new Object[numNonKeys + numKeys];

        // bind "SET" clause
        Iterator<String> nonKeyIter = nonKeyColumns.iterator();
        for (int i = 0; i < numNonKeys; i++) {
            String name = nonKeyIter.next();
            values[i] =
                    CassandraUtil.convertRow(
                            schema.getFromRow(name, insertRow), columnDescs.get(name));
        }

        // bind "WHERE" clause
        Iterator<String> keyIter = schema.getPrimaryKeys().iterator();
        for (int i = 0; i < numKeys; i++) {
            String name = keyIter.next();
            values[numNonKeys + i] =
                    CassandraUtil.convertRow(
                            schema.getFromRow(name, updateRow), columnDescs.get(name));
        }
        return stmt.bind(values);
    }
}
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
new file mode 100644
index 0000000000000..fbe9c1c04a28b
--- /dev/null
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -0,0 +1,166 @@
/*
 * Copyright 2023 RisingWave Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.risingwave.connector;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.risingwave.connector.api.ColumnDesc;
import com.risingwave.proto.Data.DataType;
import com.risingwave.proto.Data.DataType.TypeName;
import io.grpc.Status;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CassandraUtil {
    private static int getCorrespondingCassandraType(DataType dataType) {
        switch (dataType.getTypeName()) {
            case INT16:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.SMALLINT;
            case INT32:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.INT;
            case INT64:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BIGINT;
            case FLOAT:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.FLOAT;
            case DOUBLE:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DOUBLE;
            case BOOLEAN:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BOOLEAN;
            case VARCHAR:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.VARCHAR;
            case DECIMAL:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DECIMAL;
            case TIMESTAMP:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
            case TIMESTAMPTZ:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIMESTAMP;
            case DATE:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DATE;
            case TIME:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.TIME;
            case BYTEA:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.BLOB;
            case LIST:
            case STRUCT:
                throw Status.UNIMPLEMENTED
                        .withDescription(String.format("Type %s is not supported yet", dataType))
                        .asRuntimeException();
            case INTERVAL:
                return com.datastax.oss.protocol.internal.ProtocolConstants.DataType.DURATION;
            default:
                throw Status.INVALID_ARGUMENT
                        .withDescription("Unspecified type: " + dataType)
                        .asRuntimeException();
        }
    }

    public static void checkSchema(
            List<ColumnDesc> columnDescs,
            Map<CqlIdentifier, ColumnMetadata> cassandraColumnDescMap) {
        if (columnDescs.size() != cassandraColumnDescMap.size()) {
            throw Status.FAILED_PRECONDITION
                    .withDescription("Column count of the sink schema does not match the Cassandra table")
                    .asRuntimeException();
        }
        for (ColumnDesc columnDesc : columnDescs) {
            CqlIdentifier cql = CqlIdentifier.fromInternal(columnDesc.getName());
            if (!cassandraColumnDescMap.containsKey(cql)) {
                throw Status.FAILED_PRECONDITION
                        .withDescription(
                                String.format(
                                        "Column name mismatch: RisingWave column %s is not found in the Cassandra table",
                                        columnDesc.getName()))
                        .asRuntimeException();
            }
            if (cassandraColumnDescMap.get(cql).getType().getProtocolCode()
                    != getCorrespondingCassandraType(columnDesc.getDataType())) {
                throw Status.FAILED_PRECONDITION
                        .withDescription(
                                String.format(
                                        "Column type mismatch for %s: Cassandra type is %s, RisingWave type is %s",
                                        columnDesc.getName(),
                                        cassandraColumnDescMap.get(cql),
                                        columnDesc.getDataType().getTypeName()))
                        .asRuntimeException();
            }
        }
    }

    public static void checkPrimaryKey(
            List<ColumnMetadata> cassandraColumnMetadatas, List<String> columnMetadatas) {
        if (cassandraColumnMetadatas.size() != columnMetadatas.size()) {
            throw Status.FAILED_PRECONDITION
                    .withDescription("Primary key lengths do not match")
                    .asRuntimeException();
        }
        Set<String> cassandraColumnsSet =
                cassandraColumnMetadatas.stream()
                        .map((a) -> a.getName().toString())
                        .collect(Collectors.toSet());
        for (String columnMetadata : columnMetadatas) {
            if (!cassandraColumnsSet.contains(columnMetadata)) {
                throw Status.FAILED_PRECONDITION
                        .withDescription(
                                String.format(
                                        "Primary key mismatch: RisingWave primary key column %s is not part of the Cassandra primary key",
                                        columnMetadata))
                        .asRuntimeException();
            }
        }
    }

    public static Object convertRow(Object value, TypeName typeName) {
        switch (typeName) {
            case INT16:
            case INT32:
            case INT64:
            case FLOAT:
            case DOUBLE:
            case BOOLEAN:
            case VARCHAR:
            case DECIMAL:
                return value;
            case TIMESTAMP:
            case TIMESTAMPTZ:
                return ((Timestamp) value).toInstant();
            case DATE:
                return ((Date) value).toLocalDate();
            case TIME:
                return ((Time) value).toLocalTime();
            case INTERVAL:
                return CqlDuration.from((String) value);
            case BYTEA:
                return ByteBuffer.wrap((byte[]) value);
            case LIST:
            case STRUCT:
                throw Status.UNIMPLEMENTED
                        .withDescription(String.format("Type %s is not supported yet", typeName))
                        .asRuntimeException();
            default:
                throw Status.INVALID_ARGUMENT
                        .withDescription("Unspecified type: " + typeName)
                        .asRuntimeException();
        }
    }
}
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index dfeef31e83b90..04694ed98bb04 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -9,7 +9,6 @@
     <modelVersion>4.0.0</modelVersion>
-
     <artifactId>risingwave-sink-es-7</artifactId>
     <version>1.0-SNAPSHOT</version>
     <name>risingwave-sink-es-7</name>
diff --git a/java/pom.xml b/java/pom.xml
index 401db2aed123a..e7c790e37a79f 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -18,6 +18,7 @@
         <module>connector-node/risingwave-sink-iceberg</module>
         <module>connector-node/risingwave-sink-deltalake</module>
         <module>connector-node/risingwave-sink-es-7</module>
+        <module>connector-node/risingwave-sink-cassandra</module>
         <module>connector-node/risingwave-sink-jdbc</module>
         <module>connector-node/risingwave-source-cdc</module>
         <module>connector-node/risingwave-connector-test</module>
@@ -45,6 +46,7 @@
         3.3.1
         3.3.3
         7.17.10
+        <datastax.version>4.15.0</datastax.version>
@@ -225,6 +227,11 @@
                 <artifactId>risingwave-sink-es-7</artifactId>
                 <version>${module.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.risingwave.java</groupId>
+                <artifactId>risingwave-sink-cassandra</artifactId>
+                <version>${module.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.risingwave.java</groupId>
                 <artifactId>risingwave-sink-jdbc</artifactId>
diff --git a/risedev.yml b/risedev.yml
index 0ad428794e37c..000971004bf51 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -20,14 +20,14 @@ profile:
     # config-path: src/config/example.toml
     steps:
       # If you want to use the local s3 storage, enable the following line
-      # - use: minio
+      - use: minio

       # If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
       # - use: aws-s3
       #   bucket: test-bucket

       # If you want to create CDC source table, uncomment the following line
-      # - use: connector-node
+      - use: connector-node

       # if you want to enable etcd backend, uncomment the following lines.
       # - use: etcd
@@ -43,7 +43,7 @@ profile:
       - use: frontend

       # If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
-      # - use: compactor
+      - use: compactor

       # If you want to create source from Kafka, uncomment the following lines
       # Note that kafka depends on zookeeper, so zookeeper must be started beforehand.
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 1d62577a88ad8..403ee4c7b73e5 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -48,8 +48,13 @@ use crate::sink::{
 };
 use crate::ConnectorParams;

-pub const VALID_REMOTE_SINKS: [&str; 4] =
-    ["jdbc", REMOTE_ICEBERG_SINK, "deltalake", "elasticsearch-7"];
+pub const VALID_REMOTE_SINKS: [&str; 5] = [
+    "jdbc",
+    REMOTE_ICEBERG_SINK,
+    "deltalake",
+    "elasticsearch-7",
+    "cassandra",
+];

 pub fn is_valid_remote_sink(connector_type: &str) -> bool {
     VALID_REMOTE_SINKS.contains(&connector_type)