From db803486a71fa6ca7976984c6441294dc5051398 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 22 Feb 2024 16:38:24 +0800 Subject: [PATCH] Make linters happy --- .licenserc.yaml | 1 + integration_tests/iceberg-sink2/README.md | 2 +- .../catalog/JniCatalogWrapperTest.java | 43 +++- src/connector/src/sink/iceberg/mod.rs | 214 +++++++++--------- 4 files changed, 143 insertions(+), 117 deletions(-) diff --git a/.licenserc.yaml b/.licenserc.yaml index 137943a1f6dd0..d0f6a10a7fec5 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -17,6 +17,7 @@ header: - "**/*.d.ts" - "src/sqlparser/**/*.rs" - "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java" + - "java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/*.java" - "src/meta/model_v2/migration/**/*.rs" - "lints/ui/**" diff --git a/integration_tests/iceberg-sink2/README.md b/integration_tests/iceberg-sink2/README.md index a280ff5b87c59..496111fca71fe 100644 --- a/integration_tests/iceberg-sink2/README.md +++ b/integration_tests/iceberg-sink2/README.md @@ -8,7 +8,7 @@ poetry update poetry run python main.py ``` -# How to override risingwave iamge version: +# How to override risingwave image version: ```bash export RW_IMAGE= diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java index 08fa0fdb85e80..500c222211b48 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java @@ -1,7 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package com.risingwave.connector.catalog; +import org.junit.Ignore; import org.junit.Test; +@Ignore public class JniCatalogWrapperTest { @Test public void testJdbc() throws Exception { @@ -10,17 +31,17 @@ public void testJdbc() throws Exception { JniCatalogWrapper.create( "demo", "org.apache.iceberg.jdbc.JdbcCatalog", - new String[] { - "uri", "jdbc:postgresql://172.17.0.3:5432/iceberg", - "jdbc.user", "admin", - "jdbc.password", "123456", - "warehouse", "s3://icebergdata/demo", - "io-impl", "org.apache.iceberg.aws.s3.S3FileIO", - "s3.endpoint", "http://172.17.0.2:9301", - "s3.region", "us-east-1", - "s3.path-style-access", "true", - "s3.access-key-id", "hummockadmin", - "s3.secret-access-key", "hummockadmin", + new String[]{ + "uri", "jdbc:postgresql://172.17.0.3:5432/iceberg", + "jdbc.user", "admin", + "jdbc.password", "123456", + "warehouse", "s3://icebergdata/demo", + "io-impl", "org.apache.iceberg.aws.s3.S3FileIO", + "s3.endpoint", "http://172.17.0.2:9301", + "s3.region", "us-east-1", + "s3.path-style-access", "true", + "s3.access-key-id", "hummockadmin", + "s3.secret-access-key", "hummockadmin", }); System.out.println(catalog.loadTable("s1.t1")); diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 6730320549ea9..9b3d86d840811 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -310,7 +310,7 @@ impl IcebergConfig { self.secret_key.clone().to_string(), ); - let (bucket, root) = { + let (bucket, _) = { let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; let bucket = url .host_str() @@ -1084,108 +1084,112 @@ mod test { ); } - // async fn test_create_catalog(configs: HashMap) { - // let iceberg_config = IcebergConfig::from_hashmap(configs).unwrap(); - // - // let table = iceberg_config.load_table().await.unwrap(); - // - // println!("{:?}", table.table_name()); - // } - // - // #[tokio::test] - // async fn test_storage_catalog() { - // let values = [ - // ("connector", "iceberg"), - // ("type", "append-only"), - // ("force_append_only", "true"), - // ("s3.endpoint", "http://127.0.0.1:9301"), - // ("s3.access.key", "hummockadmin"), - // ("s3.secret.key", "hummockadmin"), - // ("s3.region", "us-east-1"), - // ("catalog.name", "demo"), - // ("catalog.type", "storage"), - // ("warehouse.path", "s3://icebergdata/demo"), - // ("database.name", "s1"), - // ("table.name", "t1"), - // ] - // .into_iter() - // .map(|(k, v)| (k.to_string(), v.to_string())) - // .collect(); - // - // test_create_catalog(values).await; - // } - // - // #[tokio::test] - // async fn test_rest_catalog() { - // let values = [ - // ("connector", "iceberg"), - // ("type", "append-only"), - // ("force_append_only", "true"), - // ("s3.endpoint", "http://127.0.0.1:9301"), - // ("s3.access.key", "hummockadmin"), - // ("s3.secret.key", "hummockadmin"), - // ("s3.region", "us-east-1"), - // ("catalog.name", "demo"), - // ("catalog.type", "rest"), - // ("catalog.uri", "http://192.168.167.4:8181"), - // ("warehouse.path", "s3://icebergdata/demo"), - // ("database.name", "s1"), - // ("table.name", "t1"), - // ] - // .into_iter() - // .map(|(k, v)| (k.to_string(), v.to_string())) - // .collect(); - // - // test_create_catalog(values).await; - // } - // - // #[tokio::test] - // async fn test_jdbc_catalog() { - // let values = [ - // ("connector", "iceberg"), - // ("type", "append-only"), - // ("force_append_only", "true"), - // ("s3.endpoint", "http://127.0.0.1:9301"), - // ("s3.access.key", "hummockadmin"), - // ("s3.secret.key", "hummockadmin"), - // ("s3.region", "us-east-1"), - // ("catalog.name", "demo"), - // ("catalog.type", "jdbc"), - // ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"), - // ("catalog.jdbc.user", "admin"), - // ("catalog.jdbc.password", "123456"), - // ("warehouse.path", "s3://icebergdata/demo"), - // ("database.name", "s1"), - // ("table.name", "t1"), - // ] - // .into_iter() - // .map(|(k, v)| (k.to_string(), v.to_string())) - // .collect(); - // - // test_create_catalog(values).await; - // } - // - // #[tokio::test] - // async fn test_hive_catalog() { - // let values = [ - // ("connector", "iceberg"), - // ("type", "append-only"), - // ("force_append_only", "true"), - // ("s3.endpoint", "http://127.0.0.1:9301"), - // ("s3.access.key", "hummockadmin"), - // ("s3.secret.key", "hummockadmin"), - // ("s3.region", "us-east-1"), - // ("catalog.name", "demo"), - // ("catalog.type", "hive"), - // ("catalog.uri", "thrift://localhost:9083"), - // ("warehouse.path", "s3://icebergdata/demo"), - // ("database.name", "s1"), - // ("table.name", "t1"), - // ] - // .into_iter() - // .map(|(k, v)| (k.to_string(), v.to_string())) - // .collect(); - // - // test_create_catalog(values).await; - // } + async fn test_create_catalog(configs: HashMap) { + let iceberg_config = IcebergConfig::from_hashmap(configs).unwrap(); + + let table = iceberg_config.load_table().await.unwrap(); + + println!("{:?}", table.table_name()); + } + + #[tokio::test] + #[ignore] + async fn test_storage_catalog() { + let values = [ + ("connector", "iceberg"), + ("type", "append-only"), + ("force_append_only", "true"), + ("s3.endpoint", "http://127.0.0.1:9301"), + ("s3.access.key", "hummockadmin"), + ("s3.secret.key", "hummockadmin"), + ("s3.region", "us-east-1"), + ("catalog.name", "demo"), + ("catalog.type", "storage"), + ("warehouse.path", "s3://icebergdata/demo"), + ("database.name", "s1"), + ("table.name", "t1"), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + test_create_catalog(values).await; + } + + #[tokio::test] + #[ignore] + async fn test_rest_catalog() { + let values = [ + ("connector", "iceberg"), + ("type", "append-only"), + ("force_append_only", "true"), + ("s3.endpoint", "http://127.0.0.1:9301"), + ("s3.access.key", "hummockadmin"), + ("s3.secret.key", "hummockadmin"), + ("s3.region", "us-east-1"), + ("catalog.name", "demo"), + ("catalog.type", "rest"), + ("catalog.uri", "http://192.168.167.4:8181"), + ("warehouse.path", "s3://icebergdata/demo"), + ("database.name", "s1"), + ("table.name", "t1"), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + test_create_catalog(values).await; + } + + #[tokio::test] + #[ignore] + async fn test_jdbc_catalog() { + let values = [ + ("connector", "iceberg"), + ("type", "append-only"), + ("force_append_only", "true"), + ("s3.endpoint", "http://127.0.0.1:9301"), + ("s3.access.key", "hummockadmin"), + ("s3.secret.key", "hummockadmin"), + ("s3.region", "us-east-1"), + ("catalog.name", "demo"), + ("catalog.type", "jdbc"), + ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"), + ("catalog.jdbc.user", "admin"), + ("catalog.jdbc.password", "123456"), + ("warehouse.path", "s3://icebergdata/demo"), + ("database.name", "s1"), + ("table.name", "t1"), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + test_create_catalog(values).await; + } + + #[tokio::test] + #[ignore] + async fn test_hive_catalog() { + let values = [ + ("connector", "iceberg"), + ("type", "append-only"), + ("force_append_only", "true"), + ("s3.endpoint", "http://127.0.0.1:9301"), + ("s3.access.key", "hummockadmin"), + ("s3.secret.key", "hummockadmin"), + ("s3.region", "us-east-1"), + ("catalog.name", "demo"), + ("catalog.type", "hive"), + ("catalog.uri", "thrift://localhost:9083"), + ("warehouse.path", "s3://icebergdata/demo"), + ("database.name", "s1"), + ("table.name", "t1"), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + test_create_catalog(values).await; + } }