From 0595aa1d901b2f99811ba1e7e40161b395ddd6a4 Mon Sep 17 00:00:00 2001 From: ka-weihe Date: Mon, 19 Aug 2024 18:44:08 +0200 Subject: [PATCH 1/4] feat(connector): add support for path-style access in Iceberg sink connector (#17747) Co-authored-by: ka-weihe Co-authored-by: lmatz --- src/connector/src/sink/iceberg/mod.rs | 22 +++++++++++++++++++++- src/connector/with_options_sink.yaml | 4 ++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 540fea13b6c03..2274b5805d81d 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -123,6 +123,13 @@ pub struct IcebergConfig { #[serde(rename = "s3.secret.key")] pub secret_key: String, + #[serde( + rename = "s3.path.style.access", + default, + deserialize_with = "deserialize_bool_from_string" + )] + pub path_style_access: bool, + #[serde( rename = "primary_key", default, @@ -270,6 +277,10 @@ impl IcebergConfig { "iceberg.table.io.secret_access_key".to_string(), self.secret_key.clone().to_string(), ); + iceberg_configs.insert( + "iceberg.table.io.enable_virtual_host_style".to_string(), + (!self.path_style_access).to_string(), + ); let (bucket, root) = { let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; @@ -409,7 +420,10 @@ impl IcebergConfig { "s3.secret-access-key".to_string(), self.secret_key.clone().to_string(), ); - + java_catalog_configs.insert( + "s3.path-style-access".to_string(), + self.path_style_access.to_string(), + ); if matches!(self.catalog_type.as_deref(), Some("glue")) { java_catalog_configs.insert( "client.credentials-provider".to_string(), @@ -1286,6 +1300,7 @@ mod test { ("s3.endpoint", "http://127.0.0.1:9301"), ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), + ("s3.path.style.access", "true"), ("s3.region", "us-east-1"), ("catalog.type", "jdbc"), ("catalog.name", "demo"), @@ -1315,6 +1330,7 @@ mod test { endpoint: Some("http://127.0.0.1:9301".to_string()), access_key: "hummockadmin".to_string(), secret_key: "hummockadmin".to_string(), + path_style_access: true, primary_key: Some(vec!["v1".to_string()]), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] .into_iter() @@ -1350,6 +1366,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "storage"), ("warehouse.path", "s3://icebergdata/demo"), @@ -1374,6 +1391,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "rest"), ("catalog.uri", "http://192.168.167.4:8181"), @@ -1399,6 +1417,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "jdbc"), ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"), @@ -1426,6 +1445,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "hive"), ("catalog.uri", "thrift://localhost:9083"), diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 653acaadaaaf1..36fa2559ab1af 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -272,6 +272,10 @@ IcebergConfig: - name: s3.secret.key field_type: String required: true + - name: s3.path.style.access + field_type: bool + required: false + default: Default::default - name: primary_key field_type: Vec required: false From 747245f12815715f6df6a3b97eafc1267578292d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 20 Aug 2024 12:30:32 +0800 Subject: [PATCH 2/4] fix: udf error ui (#18118) Signed-off-by: Richard Chien --- e2e_test/error_ui/simple/main.slt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index f09e47302f3c6..e6907f25fff8b 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -27,11 +27,10 @@ db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): 1: failed to check UDF signature 2: failed to send requests to UDF service - 3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} } + 3: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} } 4: transport error - 5: error trying to connect - 6: tcp connect error - 7: deadline has elapsed + 5: connection error + 6: connection reset statement error From e383ad60cb3a07c0008b53921816271fd64979a1 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:29:29 +0800 Subject: [PATCH 3/4] fix(connector): file source do not panic when credential is wrong (#17935) --- .../filesystem/opendal_source/opendal_enumerator.rs | 13 +++++++++++-- src/stream/src/executor/source/list_executor.rs | 4 ++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 864d1de56c7be..cffeb5dfe5f65 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; +use anyhow::anyhow; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; @@ -51,14 +52,22 @@ impl SplitEnumerator for OpendalEnumerator { async fn list_splits(&mut self) -> ConnectorResult>> { let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); + let prefix = self.prefix.as_deref().unwrap_or("/"); - Ok(vec![empty_split]) + match self.op.list(prefix).await { + Ok(_) => return Ok(vec![empty_split]), + Err(e) => { + return Err(anyhow!(e) + .context("fail to create source, please check your config.") + .into()) + } + } } } impl OpendalEnumerator { pub async fn list(&self) -> ConnectorResult { - let prefix = self.prefix.as_deref().unwrap_or(""); + let prefix = self.prefix.as_deref().unwrap_or("/"); let object_lister = self .op diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 25b32c0a0e4b8..c11ba773648ba 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -99,6 +99,10 @@ impl FsListExecutor { .collect::>(); let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect(); + if res.is_empty() { + tracing::warn!("No items were listed from source."); + return Ok(StreamChunk::default()); + } Ok(StreamChunk::from_rows( &res, &[DataType::Varchar, DataType::Timestamptz, DataType::Int64], From aba3232cae52426211c261d43b456e778185a793 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:49:42 +0800 Subject: [PATCH 4/4] fix(sink): fix sink in to Cassandra failed when using column name containing upper case letter (#17493) --- ci/scripts/common.sh | 13 +++++ ci/scripts/e2e-cassandra-sink-test.sh | 30 +++-------- ci/workflows/main-cron.yml | 37 +++++++------- e2e_test/sink/cassandra_sink.slt | 50 +++++++++++++++++-- .../risingwave/connector/CassandraConfig.java | 2 +- .../risingwave/connector/CassandraSink.java | 15 ++++-- .../risingwave/connector/CassandraUtil.java | 4 ++ 7 files changed, 100 insertions(+), 51 deletions(-) diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh index 176c10b4ebc4f..3d47e1ae90768 100755 --- a/ci/scripts/common.sh +++ b/ci/scripts/common.sh @@ -115,3 +115,16 @@ get_latest_kafka_download_url() { local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz" echo "$download_url" } + +get_latest_cassandra_version() { + local versions=$(curl -s https://downloads.apache.org/cassandra/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+") + # Sort the version numbers and get the latest one + local latest_version=$(echo "$versions" | sort -V | tail -n1) + echo "$latest_version" +} + +get_latest_cassandra_download_url() { + local latest_version=$(get_latest_cassandra_version) + local download_url="https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz" + echo "$download_url" +} diff --git a/ci/scripts/e2e-cassandra-sink-test.sh b/ci/scripts/e2e-cassandra-sink-test.sh index 41ff0e97190d0..0e1c9a98d49e8 100755 --- a/ci/scripts/e2e-cassandra-sink-test.sh +++ b/ci/scripts/e2e-cassandra-sink-test.sh @@ -36,37 +36,21 @@ risedev ci-start ci-sink-test # Wait cassandra server to start sleep 40 -echo "--- create cassandra table" -curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz -tar xfvz apache-cassandra-4.1.3-bin.tar.gz +echo "--- install cassandra" +wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz +tar xfvz cassandra_latest.tar.gz +export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version) +export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}" # remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver. -rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip -rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip +rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip +rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip apt-get install -y libev4 libev-dev pip3 install --break-system-packages cassandra-driver - -cd apache-cassandra-4.1.3/bin export CQLSH_HOST=cassandra-server export CQLSH_PORT=9042 -./cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo; -CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);" echo "--- testing sinks" -cd ../../ sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt' -sleep 1 -cd apache-cassandra-4.1.3/bin -./cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';" - -if cat ./query_result.csv | awk -F "," '{ - exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then - echo "Cassandra sink check passed" -else - echo "The output is not as expected." - echo "output:" - cat ./query_result.csv - exit 1 -fi echo "--- Kill cluster" cd ../../ diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index ac5b27eee773d..18e54d7b70faf 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -901,25 +901,24 @@ steps: timeout_in_minutes: 10 retry: *auto-retry - # FIXME(xxhZs): https://github.com/risingwavelabs/risingwave/issues/17855 - # - label: "end-to-end cassandra sink test" - # key: "e2e-cassandra-sink-tests" - # command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests" - # || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/ - # depends_on: - # - "build" - # - "build-other" - # plugins: - # - docker-compose#v5.1.0: - # run: sink-test-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 10 - # retry: *auto-retry + - label: "end-to-end cassandra sink test" + key: "e2e-cassandra-sink-tests" + command: "ci/scripts/e2e-cassandra-sink-test.sh -p ci-release" + if: | + !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-cassandra-sink-tests" + || build.env("CI_STEPS") =~ /(^|,)e2e-cassandra-sink-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v5.1.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry - label: "end-to-end clickhouse sink test" key: "e2e-clickhouse-sink-tests" diff --git a/e2e_test/sink/cassandra_sink.slt b/e2e_test/sink/cassandra_sink.slt index 7091e8da70783..fe5ca331b591a 100644 --- a/e2e_test/sink/cassandra_sink.slt +++ b/e2e_test/sink/cassandra_sink.slt @@ -1,13 +1,19 @@ +system ok +${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);" + +system ok +${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);" + statement ok CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean); statement ok -CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; +CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int); statement ok CREATE SINK s6 FROM - mv6 WITH ( + t6 WITH ( connector = 'cassandra', type = 'append-only', force_append_only='true', @@ -17,9 +23,25 @@ FROM cassandra.datacenter = 'datacenter1', ); +statement ok +CREATE SINK s7 +FROM + t7 WITH ( + connector = 'cassandra', + type = 'append-only', + force_append_only='true', + cassandra.url = 'cassandra-server:9042', + cassandra.keyspace = 'demo', + cassandra.table = 'Test_uppercase', + cassandra.datacenter = 'datacenter1', +); + statement ok INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false); +statement ok +INSERT INTO t7 VALUES (1, 1, 1); + statement ok FLUSH; @@ -27,7 +49,27 @@ statement ok DROP SINK s6; statement ok -DROP MATERIALIZED VIEW mv6; +DROP TABLE t6; + +statement ok +DROP SINK s7; statement ok -DROP TABLE t6; \ No newline at end of file +DROP TABLE t7; + +system ok +${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';" + +system ok +${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';" + +system ok +cat ./query_result.csv +---- +1,1,1,1.1,1.2,test,2013-01-01,2013-01-01 01:01:01.000+0000,False + + +system ok +cat ./query_result2.csv +---- +1,1,1 \ No newline at end of file diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java index bfc40111818a4..afc29e2f5f43d 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java @@ -57,7 +57,7 @@ public CassandraConfig( @JsonProperty(value = "type") String type) { this.url = url; this.keyspace = keyspace; - this.table = table; + this.table = CassandraUtil.convertCQLIdentifiers(table); this.datacenter = datacenter; this.type = type; } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java index 2f8a035911f24..e277505400b55 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java @@ -193,8 +193,15 @@ public void drop() { private String createInsertStatement(String tableName, TableSchema tableSchema) { String[] columnNames = tableSchema.getColumnNames(); - String columnNamesString = String.join(", ", columnNames); + String columnNamesString = + Arrays.stream(columnNames) + .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName)) + .collect(Collectors.joining(", ")); String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?")); + System.out.println( + String.format( + "INSERT INTO %s (%s) VALUES (%s)", + tableName, columnNamesString, placeholdersString)); return String.format( "INSERT INTO %s (%s) VALUES (%s)", tableName, columnNamesString, placeholdersString); @@ -204,11 +211,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema) List primaryKeys = tableSchema.getPrimaryKeys(); String setClause = // cassandra does not allow SET on primary keys nonKeyColumns.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?") .collect(Collectors.joining(", ")); String whereClause = primaryKeys.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?") .collect(Collectors.joining(" AND ")); return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause); } @@ -217,7 +224,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS List primaryKeys = tableSchema.getPrimaryKeys(); String whereClause = primaryKeys.stream() - .map(columnName -> columnName + " = ?") + .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?") .collect(Collectors.joining(" AND ")); return String.format("DELETE FROM %s WHERE %s", tableName, whereClause); } diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index 463bdb9d3f113..37fdd0ccdd4d2 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -167,4 +167,8 @@ public static Object convertRow(Object value, TypeName typeName) { .asRuntimeException(); } } + + public static String convertCQLIdentifiers(String identifier) { + return "\"" + identifier + "\""; + } }