Skip to content

Commit

Permalink
feat(sink): Support cassandra sink (#11878)
Browse files Browse the repository at this point in the history
Co-authored-by: Patrick Huang <[email protected]>
  • Loading branch information
xxhZs and hzxa21 authored Sep 7, 2023
1 parent cc3faf6 commit 7d940cf
Show file tree
Hide file tree
Showing 19 changed files with 898 additions and 6 deletions.
65 changes: 65 additions & 0 deletions integration_tests/cassandra-and-syclladb-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Demo: Sinking to Cassandra/Scylladb

In this demo, we want to showcase how RisingWave is able to sink data to Cassandra.

1. Set the compose profile accordingly:
Demo with Apache Cassandra:
```
export COMPOSE_PROFILES=cassandra
```

Demo with Scylladb
```
export COMPOSE_PROFILES=scylladb
```

2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Cassandra for sink.


3. Create the Cassandra table via cqlsh:

Login to cqlsh
```sh
# cqlsh into cassandra
docker compose exec cassandra cqlsh
# cqlsh into scylladb
docker compose exec scylladb cqlsh
```

Run the following queries to create keyspace and table.
```sql
CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use demo;
CREATE table demo_bhv_table(
user_id int primary key,
target_id text,
event_timestamp timestamp,
);
```

3. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_sink.sql

4. Execute a simple query to check the sink results via csqlsh:

Login to cqlsh
```sh
# cqlsh into cassandra
docker compose exec cassandra cqlsh
# cqlsh into scylladb
docker compose exec scylladb cqlsh
```

Run the following query
```sql
select user_id, count(*) from my_keyspace.demo_test group by user_id;
```
7 changes: 7 additions & 0 deletions integration_tests/cassandra-and-syclladb-sink/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
11 changes: 11 additions & 0 deletions integration_tests/cassandra-and-syclladb-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE SINK bhv_cassandra_sink
FROM
bhv_mv WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'demo_bhv_table',
cassandra.datacenter = 'datacenter1',
);
18 changes: 18 additions & 0 deletions integration_tests/cassandra-and-syclladb-sink/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
82 changes: 82 additions & 0 deletions integration_tests/cassandra-and-syclladb-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
version: "3"
services:
cassandra:
image: cassandra:4.0
ports:
- 9042:9042
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles:
- cassandra
scylladb:
image: scylladb/scylla:5.1
ports:
- 9042:9042
environment:
- CASSANDRA_CLUSTER_NAME=cloudinfra
profiles:
- scylladb
compactor-0:
extends:
file: ../../docker/docker-compose.yml
service: compactor-0
compute-node-0:
extends:
file: ../../docker/docker-compose.yml
service: compute-node-0
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
frontend-node-0:
extends:
file: ../../docker/docker-compose.yml
service: frontend-node-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
meta-node-0:
extends:
file: ../../docker/docker-compose.yml
service: meta-node-0
connector-node:
extends:
file: ../../docker/docker-compose.yml
service: connector-node
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
message_queue:
extends:
file: ../../docker/docker-compose.yml
service: message_queue
datagen:
build: ../datagen
depends_on: [message_queue]
command:
- /bin/sh
- -c
- /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
restart: always
container_name: datagen
volumes:
compute-node-0:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
1 change: 1 addition & 0 deletions java/connector-node/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

<!-- Sink connectors -->
<include>*:risingwave-sink-es-7</include>
<include>*:risingwave-sink-cassandra</include>
<include>*:risingwave-sink-jdbc</include>
<include>*:risingwave-sink-iceberg</include>
<include>*:risingwave-sink-deltalake</include>
Expand Down
4 changes: 4 additions & 0 deletions java/connector-node/assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-es-7</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-cassandra</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-jdbc</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,10 @@
<artifactId>risingwave-sink-es-7</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-cassandra</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new DeltaLakeSinkFactory();
case "elasticsearch-7":
return new EsSink7Factory();
case "cassandra":
return new CassandraFactory();
default:
throw UNIMPLEMENTED
.withDescription("unknown sink type: " + sinkName)
Expand Down
5 changes: 5 additions & 0 deletions java/connector-node/risingwave-connector-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,10 @@
<artifactId>risingwave-sink-es-7</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-cassandra</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
60 changes: 60 additions & 0 deletions java/connector-node/risingwave-sink-cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>java-parent</artifactId>
<groupId>com.risingwave.java</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>risingwave-sink-cassandra</artifactId>
<version>1.0-SNAPSHOT</version>
<name>risingwave-sink-cassandra</name>

<dependencies>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>proto</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>connector-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<!-- cassandra drivers -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${datastax.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class CassandraConfig extends CommonSinkConfig {
/** Required */
private String type;
/** Required */
private String url;

/** Required */
private String keyspace;

/** Required */
private String table;

/** Required */
private String datacenter;

@JsonProperty(value = "cassandra.username")
private String username;

@JsonProperty(value = "cassandra.password")
private String password;

@JsonCreator
public CassandraConfig(
@JsonProperty(value = "cassandra.url") String url,
@JsonProperty(value = "cassandra.keyspace") String keyspace,
@JsonProperty(value = "cassandra.table") String table,
@JsonProperty(value = "cassandra.datacenter") String datacenter,
@JsonProperty(value = "type") String type) {
this.url = url;
this.keyspace = keyspace;
this.table = table;
this.datacenter = datacenter;
this.type = type;
}

public String getType() {
return type;
}

public String getUrl() {
return url;
}

public String getKeyspace() {
return keyspace;
}

public String getTable() {
return table;
}

public String getDatacenter() {
return datacenter;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public CassandraConfig withUsername(String username) {
this.username = username;
return this;
}

public CassandraConfig withPassword(String password) {
this.password = password;
return this;
}
}
Loading

0 comments on commit 7d940cf

Please sign in to comment.