diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md index d49ce9545fac..ac957bc5f7dd 100644 --- a/integration_tests/cassandra-sink/README.md +++ b/integration_tests/cassandra-sink/README.md @@ -31,5 +31,5 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh ``` ```sql -select user_id, count(*) from default.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id ``` diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql similarity index 91% rename from integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql rename to integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql index 004d9fdd1c00..a18cc472173a 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql +++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql @@ -1 +1 @@ -select user_id, count(*) from my_keyspace.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id; diff --git a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql index 058fd57c80a6..d858e7c47ae6 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql +++ b/integration_tests/cassandra-sink/cassandra-sql/create_cassandra_table.sql @@ -1,7 +1,7 @@ CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; use my_keyspace; CREATE table demo_test( - user_id text primary key, + user_id int primary key, target_id text, event_timestamp timestamp, ); \ No newline at end of file diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql index 56fab31689ef..010577530120 100644 --- a/integration_tests/cassandra-sink/create_sink.sql +++ b/integration_tests/cassandra-sink/create_sink.sql @@ -1,10 +1,10 @@ -CREATE SINK bhv_iceberg_sink +CREATE SINK bhv_cassandra_sink FROM bhv_mv WITH ( connector = 'cassandra', type = 'append-only', force_append_only='true', - cassandra.url = 'http://127.0.0.1:9042', + cassandra.url = 'cassandra:9042', cassandra.keyspace = 'mykeyspace', cassandra.table = 'demo_test', cassandra.datacenter = 'datacenter1', diff --git a/integration_tests/cassandra-sink/create_source.sql b/integration_tests/cassandra-sink/create_source.sql index e704eb785ec0..acb48fecf4e5 100644 --- a/integration_tests/cassandra-sink/create_source.sql +++ b/integration_tests/cassandra-sink/create_source.sql @@ -1,5 +1,5 @@ CREATE table user_behaviors ( - user_id VARCHAR, + user_id int, target_id VARCHAR, target_type VARCHAR, event_timestamp TIMESTAMP, @@ -9,13 +9,10 @@ CREATE table user_behaviors ( PRIMARY KEY(user_id) ) WITH ( connector = 'datagen', - fields.seq_id.kind = 'sequence', - fields.seq_id.start = '1', - fields.seq_id.end = '10000000', - fields.user_id.kind = 'random', - fields.user_id.min = '1', - fields.user_id.max = '10000000', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', fields.user_name.kind = 'random', fields.user_name.length = '10', - datagen.rows.per.second = '20000' + datagen.rows.per.second = '500' ) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 49141514d1a2..40e23d4537c2 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -49,7 +49,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { throw new IllegalArgumentException( "Invalid cassandraURL: expected `host:port`, got " + url); } - // 2. check connection + // check connection CqlSessionBuilder sessionBuilder = CqlSession.builder() .addContactPoint( @@ -97,7 +97,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { @Override public void write(Iterator rows) { - System.out.println(this.config.getType()); if (this.config.getType().equals("append-only")) { write_append_only(rows); } else { diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index ead056bfdba2..f6f68012c25c 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -124,6 +124,7 @@ public static Object convertRow(Object value, TypeName typeName) { return CqlDuration.from((String) value); case BYTEA: return ByteBuffer.wrap((byte[]) value); + case LIST: case STRUCT: throw Status.UNIMPLEMENTED .withDescription(String.format("not support %s now", typeName))