Skip to content

Commit

Permalink
fix(sink): fix sink in to Cassandra failed when using column name con…
Browse files Browse the repository at this point in the history
…taining upper case letter (#17493)
  • Loading branch information
xxhZs authored Aug 20, 2024
1 parent e383ad6 commit aba3232
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 51 deletions.
13 changes: 13 additions & 0 deletions ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,16 @@ get_latest_kafka_download_url() {
local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
echo "$download_url"
}

get_latest_cassandra_version() {
local versions=$(curl -s https://downloads.apache.org/cassandra/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+")
# Sort the version numbers and get the latest one
local latest_version=$(echo "$versions" | sort -V | tail -n1)
echo "$latest_version"
}

get_latest_cassandra_download_url() {
local latest_version=$(get_latest_cassandra_version)
local download_url="https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz"
echo "$download_url"
}
30 changes: 7 additions & 23 deletions ci/scripts/e2e-cassandra-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,37 +36,21 @@ risedev ci-start ci-sink-test
# Wait cassandra server to start
sleep 40

echo "--- create cassandra table"
curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
tar xfvz apache-cassandra-4.1.3-bin.tar.gz
echo "--- install cassandra"
wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
tar xfvz cassandra_latest.tar.gz
export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
# remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver.
rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip
rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip
rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip
rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip
apt-get install -y libev4 libev-dev
pip3 install --break-system-packages cassandra-driver

cd apache-cassandra-4.1.3/bin
export CQLSH_HOST=cassandra-server
export CQLSH_PORT=9042
./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

echo "--- testing sinks"
cd ../../
sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
sleep 1
cd apache-cassandra-4.1.3/bin
./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

if cat ./query_result.csv | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
echo "Cassandra sink check passed"
else
echo "The output is not as expected."
echo "output:"
cat ./query_result.csv
exit 1
fi

echo "--- Kill cluster"
cd ../../
Expand Down
37 changes: 18 additions & 19 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -901,25 +901,24 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

# FIXME(xxhZs): https://github.com/risingwavelabs/risingwave/issues/17855
# - label: "end-to-end cassandra sink test"
# key: "e2e-cassandra-sink-tests"
# command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
# || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
# depends_on:
# - "build"
# - "build-other"
# plugins:
# - docker-compose#v5.1.0:
# run: sink-test-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 10
# retry: *auto-retry
- label: "end-to-end cassandra sink test"
key: "e2e-cassandra-sink-tests"
command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end clickhouse sink test"
key: "e2e-clickhouse-sink-tests"
Expand Down
50 changes: 46 additions & 4 deletions e2e_test/sink/cassandra_sink.slt
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);

statement ok
CREATE SINK s6
FROM
mv6 WITH (
t6 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,17 +23,53 @@ FROM
cassandra.datacenter = 'datacenter1',
);

statement ok
CREATE SINK s7
FROM
t7 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra-server:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'Test_uppercase',
cassandra.datacenter = 'datacenter1',
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);

statement ok
INSERT INTO t7 VALUES (1, 1, 1);

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP TABLE t6;

statement ok
DROP SINK s7;

statement ok
DROP TABLE t6;
DROP TABLE t7;

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

system ok
cat ./query_result.csv
----
1,1,1,1.1,1.2,test,2013-01-01,2013-01-01 01:01:01.000+0000,False


system ok
cat ./query_result2.csv
----
1,1,1
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public CassandraConfig(
@JsonProperty(value = "type") String type) {
this.url = url;
this.keyspace = keyspace;
this.table = table;
this.table = CassandraUtil.convertCQLIdentifiers(table);
this.datacenter = datacenter;
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,15 @@ public void drop() {

private String createInsertStatement(String tableName, TableSchema tableSchema) {
String[] columnNames = tableSchema.getColumnNames();
String columnNamesString = String.join(", ", columnNames);
String columnNamesString =
Arrays.stream(columnNames)
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName))
.collect(Collectors.joining(", "));
String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
System.out.println(
String.format(
"INSERT INTO %s (%s) VALUES (%s)",
tableName, columnNamesString, placeholdersString));
return String.format(
"INSERT INTO %s (%s) VALUES (%s)",
tableName, columnNamesString, placeholdersString);
Expand All @@ -204,11 +211,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String setClause = // cassandra does not allow SET on primary keys
nonKeyColumns.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(", "));
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(" AND "));
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
}
Expand All @@ -217,7 +224,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(" AND "));
return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,8 @@ public static Object convertRow(Object value, TypeName typeName) {
.asRuntimeException();
}
}

public static String convertCQLIdentifiers(String identifier) {
return "\"" + identifier + "\"";
}
}

0 comments on commit aba3232

Please sign in to comment.