Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): fix sink in to Cassandra failed when using column name containing upper case letter #17493

Merged
merged 14 commits into from
Aug 20, 2024
13 changes: 13 additions & 0 deletions ci/scripts/e2e-cassandra-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ export CQLSH_PORT=9042
./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

./cqlsh --request-timeout=20 -e "use demo;CREATE table test_uppercase(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"

echo "--- testing sinks"
cd ../../
sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
sleep 1
cd apache-cassandra-4.1.3/bin
./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
./cqlsh --request-timeout=20 -e "COPY demo.test_uppercase TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

if cat ./query_result.csv | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
Expand All @@ -68,6 +71,16 @@ else
exit 1
fi

if cat ./query_result2.csv | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == "1\r"); }'; then
echo "Cassandra sink check passed"
else
echo "The output is not as expected."
echo "output:"
cat ./query_result2.csv
exit 1
fi
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can also put the validation logic in cassandra_sink.slt using system command

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okk


echo "--- Kill cluster"
cd ../../
risedev ci-kill
27 changes: 23 additions & 4 deletions e2e_test/sink/cassandra_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);

statement ok
CREATE SINK s6
FROM
mv6 WITH (
t6 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,17 +17,36 @@ FROM
cassandra.datacenter = 'datacenter1',
);

statement ok
CREATE SINK s7
FROM
t7 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra-server:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'test_uppercase',
cassandra.datacenter = 'datacenter1',
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);

statement ok
INSERT INTO t7 VALUES (1, 1, 1);

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP TABLE t6;

statement ok
DROP SINK s7;

statement ok
DROP TABLE t6;
DROP TABLE t7;
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ public void drop() {

private String createInsertStatement(String tableName, TableSchema tableSchema) {
String[] columnNames = tableSchema.getColumnNames();
String columnNamesString = String.join(", ", columnNames);
String columnNamesString =
Arrays.stream(columnNames)
.map(columnName -> "\"" + columnName + "\"")
.collect(Collectors.joining(", "));
String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
return String.format(
"INSERT INTO %s (%s) VALUES (%s)",
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -204,11 +207,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String setClause = // cassandra does not allow SET on primary keys
nonKeyColumns.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> "\"" + columnName + "\" = ?")
.collect(Collectors.joining(", "));
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> "\"" + columnName + "\" = ?")
.collect(Collectors.joining(" AND "));
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
}
Expand All @@ -217,7 +220,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> "\"" + columnName + "\" = ?")
.collect(Collectors.joining(" AND "));
return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
}
Expand Down
Loading