Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): fix sinking into Cassandra failing when a column name contains an uppercase letter #17493

Merged
merged 14 commits into from
Aug 20, 2024
13 changes: 13 additions & 0 deletions ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,16 @@ get_latest_kafka_download_url() {
local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
echo "$download_url"
}

# Print the newest Apache Cassandra release version (e.g. "4.1.5") to stdout.
#
# The version list is scraped from the directory index at
# https://downloads.apache.org/cassandra/, keeping only entries of the form
# MAJOR.MINOR.PATCH and picking the highest one by version sort.
#
# Outputs: the latest version on stdout; a diagnostic on stderr on failure.
# Returns: 0 on success, 1 if the index cannot be fetched or lists no version.
get_latest_cassandra_version() {
  # Declared separately from the assignment so that `local` does not mask the
  # exit status of the pipeline.
  local versions latest_version
  # -f makes curl fail on HTTP errors instead of printing the error page.
  if ! versions=$(curl -fsSL https://downloads.apache.org/cassandra/ \
    | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' \
    | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+") || [[ -z "$versions" ]]; then
    echo "failed to determine the latest Cassandra version" >&2
    return 1
  fi
  # Sort the version numbers and get the latest one
  latest_version=$(printf '%s\n' "$versions" | sort -V | tail -n1)
  echo "$latest_version"
}

# Print the download URL of the binary tarball for the newest Apache Cassandra
# release, as reported by get_latest_cassandra_version.
#
# Outputs: the URL on stdout; a diagnostic on stderr on failure.
# Returns: 0 on success, 1 if the latest version could not be determined.
get_latest_cassandra_download_url() {
  # Declared separately so a failing lookup is not masked by `local`.
  local latest_version
  latest_version=$(get_latest_cassandra_version) || return 1
  # Guard against an empty version, which would yield a malformed URL.
  if [[ -z "$latest_version" ]]; then
    echo "latest Cassandra version is empty; cannot build download URL" >&2
    return 1
  fi
  echo "https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz"
}
30 changes: 17 additions & 13 deletions ci/scripts/e2e-cassandra-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,21 @@ risedev ci-start ci-sink-test
# Wait for the Cassandra server to start
sleep 40

echo "--- create cassandra table"
curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
tar xfvz apache-cassandra-4.1.3-bin.tar.gz
echo "--- install cassandra"
wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
tar xfvz cassandra_latest.tar.gz
export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
# Remove the bundled packages and use the installed ones instead: Python 3.12 has removed asyncore, and installing libev support for the bundled Python driver failed.
rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip
rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip
rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip
rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip
apt-get install -y libev4 libev-dev
pip3 install --break-system-packages cassandra-driver

cd apache-cassandra-4.1.3/bin
export CQLSH_HOST=cassandra-server
export CQLSH_PORT=9042
./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

echo "--- testing sinks"
cd ../../
sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
sleep 1
cd apache-cassandra-4.1.3/bin
./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

if cat ./query_result.csv | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
Expand All @@ -68,6 +62,16 @@ else
exit 1
fi

# Validate the first row of ./query_result2.csv: all three fields must equal 1.
# The expected last field includes a trailing "\r" (presumably from the line
# ending of the exported CSV -- confirm).
#
# The verdict is taken in the END rule so that an empty file, for which the
# main rule never runs, fails the check instead of passing with awk's
# default exit status 0.
if awk -F "," '
  { row_ok = ($1 == 1 && $2 == 1 && $3 == "1\r"); exit }
  END { exit !row_ok }
' ./query_result2.csv; then
  echo "Cassandra sink check passed"
else
  echo "The output is not as expected."
  echo "output:"
  cat ./query_result2.csv
  exit 1
fi
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we could also move the validation logic into cassandra_sink.slt by using the `system` command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okk


echo "--- Kill cluster"
cd ../../
risedev ci-kill
37 changes: 18 additions & 19 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -881,25 +881,24 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

# FIXME(xxhZs): https://github.com/risingwavelabs/risingwave/issues/17855
# - label: "end-to-end cassandra sink test"
# key: "e2e-cassandra-sink-tests"
# command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
# || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
# depends_on:
# - "build"
# - "build-other"
# plugins:
# - docker-compose#v5.1.0:
# run: sink-test-env
# config: ci/docker-compose.yml
# mount-buildkite-agent: true
# - ./ci/plugins/upload-failure-logs
# timeout_in_minutes: 10
# retry: *auto-retry
- label: "end-to-end cassandra sink test"
key: "e2e-cassandra-sink-tests"
command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end clickhouse sink test"
key: "e2e-clickhouse-sink-tests"
Expand Down
39 changes: 35 additions & 4 deletions e2e_test/sink/cassandra_sink.slt
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);

statement ok
CREATE SINK s6
FROM
mv6 WITH (
t6 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,17 +23,42 @@ FROM
cassandra.datacenter = 'datacenter1',
);

statement ok
CREATE SINK s7
FROM
t7 WITH (
connector = 'cassandra',
type = 'append-only',
force_append_only='true',
cassandra.url = 'cassandra-server:9042',
cassandra.keyspace = 'demo',
cassandra.table = 'Test_uppercase',
cassandra.datacenter = 'datacenter1',
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);

statement ok
INSERT INTO t7 VALUES (1, 1, 1);

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP TABLE t6;

statement ok
DROP TABLE t6;
DROP SINK s7;

statement ok
DROP TABLE t7;

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"

system ok
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public CassandraConfig(
@JsonProperty(value = "type") String type) {
this.url = url;
this.keyspace = keyspace;
this.table = table;
this.table = CassandraUtil.convertCQLIdentifiers(table);
this.datacenter = datacenter;
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,15 @@ public void drop() {

private String createInsertStatement(String tableName, TableSchema tableSchema) {
String[] columnNames = tableSchema.getColumnNames();
String columnNamesString = String.join(", ", columnNames);
String columnNamesString =
Arrays.stream(columnNames)
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName))
.collect(Collectors.joining(", "));
String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
System.out.println(
String.format(
"INSERT INTO %s (%s) VALUES (%s)",
tableName, columnNamesString, placeholdersString));
return String.format(
"INSERT INTO %s (%s) VALUES (%s)",
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
tableName, columnNamesString, placeholdersString);
Expand All @@ -204,11 +211,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String setClause = // cassandra does not allow SET on primary keys
nonKeyColumns.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(", "));
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(" AND "));
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
}
Expand All @@ -217,7 +224,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
List<String> primaryKeys = tableSchema.getPrimaryKeys();
String whereClause =
primaryKeys.stream()
.map(columnName -> columnName + " = ?")
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
.collect(Collectors.joining(" AND "));
return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,8 @@ public static Object convertRow(Object value, TypeName typeName) {
.asRuntimeException();
}
}

/**
 * Converts a raw identifier (table or column name) into a quoted CQL identifier.
 *
 * <p>The identifier is wrapped in double quotes. Any double quote inside the
 * identifier is escaped by doubling it, so the result is always a single
 * well-formed quoted identifier.
 *
 * @param identifier the raw table or column name; must not be null
 * @return the identifier enclosed in double quotes with embedded quotes doubled
 */
public static String convertCQLIdentifiers(String identifier) {
    return "\"" + identifier.replace("\"", "\"\"") + "\"";
}
}
Loading