From 486caa93ab60414158b3bd63639d7e8e48119f55 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 25 Aug 2023 15:14:00 +0800 Subject: [PATCH] fmt fmt fmt fmt fmt fmt --- integration_tests/cassandra-sink/README.md | 2 +- .../{cassandra-query.sql => cassandra_query.sql} | 2 +- integration_tests/cassandra-sink/create_sink.sql | 2 +- integration_tests/clickhouse-sink/create_source.sql | 9 +++------ .../java/com/risingwave/connector/CassandraSink.java | 3 +-- .../java/com/risingwave/connector/CassandraUtil.java | 1 + 6 files changed, 8 insertions(+), 11 deletions(-) rename integration_tests/cassandra-sink/cassandra-sql/{cassandra-query.sql => cassandra_query.sql} (91%) diff --git a/integration_tests/cassandra-sink/README.md b/integration_tests/cassandra-sink/README.md index d49ce9545fac9..ac957bc5f7dde 100644 --- a/integration_tests/cassandra-sink/README.md +++ b/integration_tests/cassandra-sink/README.md @@ -31,5 +31,5 @@ docker compose exec cassandra bash /opt/cassandra/cassandra-sql/run-sql-file.sh ``` ```sql -select user_id, count(*) from default.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id ``` diff --git a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql similarity index 91% rename from integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql rename to integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql index 004d9fdd1c00e..a18cc472173ab 100644 --- a/integration_tests/cassandra-sink/cassandra-sql/cassandra-query.sql +++ b/integration_tests/cassandra-sink/cassandra-sql/cassandra_query.sql @@ -1 +1 @@ -select user_id, count(*) from my_keyspace.demo_test group by user_id +select user_id, count(*) from my_keyspace.demo_test group by user_id; diff --git a/integration_tests/cassandra-sink/create_sink.sql b/integration_tests/cassandra-sink/create_sink.sql index 56fab31689efd..7fb151ba1e98b 100644 --- a/integration_tests/cassandra-sink/create_sink.sql +++ b/integration_tests/cassandra-sink/create_sink.sql @@ -1,4 +1,4 @@ -CREATE SINK bhv_iceberg_sink +CREATE SINK bhv_cassandra_sink FROM bhv_mv WITH ( connector = 'cassandra', diff --git a/integration_tests/clickhouse-sink/create_source.sql b/integration_tests/clickhouse-sink/create_source.sql index e704eb785ec06..a8e9a0f9e882e 100644 --- a/integration_tests/clickhouse-sink/create_source.sql +++ b/integration_tests/clickhouse-sink/create_source.sql @@ -9,12 +9,9 @@ CREATE table user_behaviors ( PRIMARY KEY(user_id) ) WITH ( connector = 'datagen', - fields.seq_id.kind = 'sequence', - fields.seq_id.start = '1', - fields.seq_id.end = '10000000', - fields.user_id.kind = 'random', - fields.user_id.min = '1', - fields.user_id.max = '10000000', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', fields.user_name.kind = 'random', fields.user_name.length = '10', datagen.rows.per.second = '20000' diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 49141514d1a2c..40e23d4537c2d 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -49,7 +49,7 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { throw new IllegalArgumentException( "Invalid cassandraURL: expected `host:port`, got " + url); } - // 2. check connection + // check connection CqlSessionBuilder sessionBuilder = CqlSession.builder() .addContactPoint( @@ -97,7 +97,6 @@ public CassandraSink(TableSchema tableSchema, CassandraConfig config) { @Override public void write(Iterator rows) { - System.out.println(this.config.getType()); if (this.config.getType().equals("append-only")) { write_append_only(rows); } else { diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index ead056bfdba22..f6f68012c25c6 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -124,6 +124,7 @@ public static Object convertRow(Object value, TypeName typeName) { return CqlDuration.from((String) value); case BYTEA: return ByteBuffer.wrap((byte[]) value); + case LIST: case STRUCT: throw Status.UNIMPLEMENTED .withDescription(String.format("not support %s now", typeName))