From 38785f7e81239d641e43976be1a58f1feec92529 Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Wed, 24 Jan 2024 23:26:05 +0800
Subject: [PATCH] refactor: Remove unused iceberg java (#14779)

---
 .../sink/iceberg/IcebergSinkFactoryTest.java  | 152 -----
 .../sink/iceberg/IcebergSinkLocalTest.java    | 217 ---------
 .../iceberg/IcebergSinkPartitionTest.java     | 233 ----------
 .../sink/iceberg/SinkRowMapTest.java          | 209 ---------
 .../iceberg/UpsertIcebergSinkLocalTest.java   | 224 ----------
 .../UpsertIcebergSinkPartitionTest.java       | 237 -----------
 .../AppendOnlyIcebergSinkWriter.java          | 146 ----------
 .../risingwave/connector/IcebergMetadata.java |  31 ---
 .../connector/IcebergSinkConfig.java          |  69 -----
 .../connector/IcebergSinkCoordinator.java     |  82 ------
 .../connector/IcebergSinkFactory.java         | 187 -------------
 .../risingwave/connector/IcebergSinkUtil.java | 112 --------
 .../connector/IcebergSinkWriterBase.java      |  81 ------
 .../com/risingwave/connector/SinkRowMap.java  |  86 ------
 .../com/risingwave/connector/SinkRowOp.java   |  67 -----
 .../connector/UpsertIcebergSinkWriter.java    | 248 ------------------
 16 files changed, 2381 deletions(-)
 delete mode 100755 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java
 delete mode 100644 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java
 delete mode 100644 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java
 delete mode 100644 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/SinkRowMapTest.java
 delete mode 100644 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java
 delete mode 100644 java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergMetadata.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkCoordinator.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkUtil.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkWriterBase.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java
 delete mode 100644 java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java
b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java deleted file mode 100755 index 8294dfaa46f0d..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.risingwave.connector.sink.iceberg; - -import static org.junit.Assert.*; - -import com.google.common.collect.Lists; -import com.risingwave.connector.AppendOnlyIcebergSinkWriter; -import com.risingwave.connector.IcebergSinkFactory; -import com.risingwave.connector.TestUtils; -import com.risingwave.connector.api.TableSchema; -import com.risingwave.proto.Catalog.SinkType; -import com.risingwave.proto.Data; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.types.Types; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class IcebergSinkFactoryTest { - static String warehousePath = "file:///tmp/rw-sinknode/iceberg-sink/warehouse"; - static String databaseName = "demo_db"; - static String tableName = "demo_table"; - static String sinkMode = "append-only"; - static Schema icebergTableSchema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get())); - - private void createMockTable() throws IOException { - if (!Paths.get(warehousePath).toFile().isDirectory()) { - Files.createDirectories(Paths.get(warehousePath)); - } - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - try { - catalog.dropTable(tableIdent); - } catch (Exception e) { - // Ignored. 
- } - PartitionSpec spec = PartitionSpec.unpartitioned(); - catalog.createTable(tableIdent, icebergTableSchema, spec, Map.of("format-version", "2")); - catalog.close(); - } - - @Test - public void testCreate() throws IOException { - createMockTable(); - IcebergSinkFactory sinkFactory = new IcebergSinkFactory(); - AppendOnlyIcebergSinkWriter sink = - (AppendOnlyIcebergSinkWriter) - sinkFactory.createWriter( - TestUtils.getMockTableSchema(), - Map.of( - "type", - sinkMode, - "warehouse.path", - warehousePath, - "database.name", - databaseName, - "table.name", - tableName)); - try { - assertTrue( - sink.getHadoopCatalog() - .tableExists(TableIdentifier.of(databaseName, tableName))); - assertEquals( - sink.getIcebergTable().location(), - warehousePath + "/" + databaseName + "/" + tableName); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test(expected = RuntimeException.class) - public void testValidateSchemaName() throws IOException { - createMockTable(); - IcebergSinkFactory sinkFactory = new IcebergSinkFactory(); - Map tableProperties = - Map.of( - "type", - sinkMode, - "warehouse.path", - warehousePath, - "database.name", - databaseName, - "table.name", - tableName); - TableSchema diffTypeTableSchema = - new TableSchema( - Lists.newArrayList("id", "names"), - Lists.newArrayList( - Data.DataType.newBuilder() - .setTypeName(Data.DataType.TypeName.INT32) - .build(), - Data.DataType.newBuilder() - .setTypeName(Data.DataType.TypeName.VARCHAR) - .build()), - Lists.newArrayList("id")); - sinkFactory.validate(diffTypeTableSchema, tableProperties, SinkType.SINK_TYPE_APPEND_ONLY); - } - - @Test(expected = RuntimeException.class) - public void testValidateSchemaType() throws IOException { - createMockTable(); - IcebergSinkFactory sinkFactory = new IcebergSinkFactory(); - Map tableProperties = - Map.of( - "type", - sinkMode, - "warehouse.path", - warehousePath, - "database.name", - databaseName, - "table.name", - tableName); - TableSchema diffTypeTableSchema = - new TableSchema( - Lists.newArrayList("id", "name"), - Lists.newArrayList( - Data.DataType.newBuilder() - .setTypeName(Data.DataType.TypeName.INT32) - .build(), - Data.DataType.newBuilder() - .setTypeName(Data.DataType.TypeName.INT32) - .build()), - Lists.newArrayList("id")); - sinkFactory.validate(diffTypeTableSchema, tableProperties, SinkType.SINK_TYPE_APPEND_ONLY); - } -} diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java deleted file mode 100644 index 07dc39a0d19c3..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.risingwave.connector.sink.iceberg; - -import static com.risingwave.proto.Data.*; -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Sets; -import com.risingwave.connector.AppendOnlyIcebergSinkWriter; -import com.risingwave.connector.IcebergSinkCoordinator; -import com.risingwave.connector.TestUtils; -import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.ConnectorServiceProto; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.IcebergGenerics; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class IcebergSinkLocalTest { - static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse"; - static String databaseName = "demo_db"; - static String tableName = "demo_table"; - static Schema icebergTableSchema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get())); - - private void createMockTable() throws IOException { - if (!Paths.get(warehousePath).toFile().isDirectory()) { - Files.createDirectories(Paths.get(warehousePath)); - } - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - try { - catalog.dropTable(tableIdent); - } catch (Exception e) { - // Ignored. 
- } - PartitionSpec spec = PartitionSpec.unpartitioned(); - catalog.createTable(tableIdent, icebergTableSchema, spec, Map.of("format-version", "2")); - catalog.close(); - } - - private void validateTableWithIceberg(Set expected) { - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - Table icebergTable = catalog.loadTable(tableIdent); - CloseableIterable iter = IcebergGenerics.read(icebergTable).build(); - Set actual = Sets.newHashSet(iter); - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - private void validateTableWithSpark(Set expected) { - SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog"); - sparkConf.set("spark.sql.catalog.demo.type", "hadoop"); - sparkConf.set("spark.sql.catalog.demo.warehouse", warehousePath); - sparkConf.set("spark.sql.catalog.defaultCatalog", "demo"); - SparkSession spark = SparkSession.builder().master("local").config(sparkConf).getOrCreate(); - List rows = - spark.read() - .format("iceberg") - .load(String.format("demo.%s.%s", databaseName, tableName)) - .collectAsList(); - spark.close(); - Set actual = new HashSet<>(); - for (Row row : rows) { - int id = row.getInt(0); - String name = row.getString(1); - Record record = GenericRecord.create(icebergTableSchema); - record.setField("id", id); - record.setField("name", name); - actual.add(record); - } - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - @Test - public void testSync() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - hadoopCatalog.loadTable(tableIdentifier), - FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 1, "Alice"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 2, "Bob"))); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - expected.add(record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testWrite() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = 
TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - icebergTable, - FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.INSERT, 1, "Alice"), - new ArraySinkRow(Op.INSERT, 2, "Bob"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - Set expected = Sets.newHashSet(record1, record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testDrop() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - icebergTable, - FileFormat.PARQUET); - - sink.drop(); - - assertTrue(sink.isClosed()); - assertTrue(Files.exists(Paths.get(warehousePath))); - } -} diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java deleted file mode 100644 index 701c0a71f4116..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.risingwave.connector.sink.iceberg; - -import static com.risingwave.proto.Data.*; -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.risingwave.connector.AppendOnlyIcebergSinkWriter; -import com.risingwave.connector.IcebergSinkCoordinator; -import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.ConnectorServiceProto; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.IcebergGenerics; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class IcebergSinkPartitionTest { - static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse"; - static String databaseName = "demo_db"; - static String tableName = "demo_table_partitioned"; - static Schema icebergTableSchema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get()), - Types.NestedField.required(3, "part", Types.StringType.get())); - static TableSchema tableSchema = - new TableSchema( - Lists.newArrayList("id", "name", "part"), - Lists.newArrayList( - DataType.newBuilder().setTypeName(DataType.TypeName.INT32).build(), - DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build(), - DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build()), - Lists.newArrayList("id")); - - private void createMockTable() throws IOException { - if (!Paths.get(warehousePath).toFile().isDirectory()) { - Files.createDirectories(Paths.get(warehousePath)); - } - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - try { - catalog.dropTable(tableIdent); - } catch (Exception e) { - // Ignored. 
- } - PartitionSpec spec = PartitionSpec.builderFor(icebergTableSchema).identity("part").build(); - catalog.createTable(tableIdent, icebergTableSchema, spec, Map.of("format-version", "2")); - catalog.close(); - } - - private void validateTableWithIceberg(Set expected) { - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - Table icebergTable = catalog.loadTable(tableIdent); - CloseableIterable iter = IcebergGenerics.read(icebergTable).build(); - Set actual = Sets.newHashSet(iter); - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - private void validateTableWithSpark(Set expected) { - SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog"); - sparkConf.set("spark.sql.catalog.demo.type", "hadoop"); - sparkConf.set("spark.sql.catalog.demo.warehouse", warehousePath); - sparkConf.set("spark.sql.catalog.defaultCatalog", "demo"); - SparkSession spark = SparkSession.builder().master("local").config(sparkConf).getOrCreate(); - List rows = - spark.read() - .format("iceberg") - .load(String.format("demo.%s.%s", databaseName, tableName)) - .collectAsList(); - spark.close(); - Set actual = new HashSet<>(); - for (Row row : rows) { - int id = row.getInt(0); - String name = row.getString(1); - String part = row.getString(2); - Record record = GenericRecord.create(icebergTableSchema); - record.setField("id", id); - record.setField("name", name); - record.setField("part", part); - actual.add(record); - } - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - @Test - public void testSync() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - tableSchema, hadoopCatalog, icebergTable, FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - record1.setField("part", "aaa"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"))); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - record2.setField("part", "bbb"); - expected.add(record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testWrite() throws IOException { - createMockTable(); - Configuration hadoopConf = new 
Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - tableSchema, hadoopCatalog, icebergTable, FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"), - new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - record1.setField("part", "aaa"); - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - record2.setField("part", "bbb"); - Set expected = Sets.newHashSet(record1, record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testDrop() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - AppendOnlyIcebergSinkWriter sink = - new AppendOnlyIcebergSinkWriter( - tableSchema, - hadoopCatalog, - hadoopCatalog.loadTable(tableIdentifier), - FileFormat.PARQUET); - - sink.drop(); - - assertTrue(sink.isClosed()); - assertTrue(Files.exists(Paths.get(warehousePath))); - } -} diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/SinkRowMapTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/SinkRowMapTest.java deleted file mode 100644 index 60caee0ee4c35..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/SinkRowMapTest.java +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.risingwave.connector.sink.iceberg; - -import static org.junit.Assert.assertEquals; - -import com.risingwave.connector.SinkRowMap; -import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.connector.api.sink.SinkRow; -import com.risingwave.proto.Data; -import java.util.ArrayList; -import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.types.Types; -import org.junit.Assert; -import org.junit.Test; - -public class SinkRowMapTest { - @Test - public void testInsert() { - SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - - sinkRowMap.insert(key, r); - assertEquals(1, sinkRowMap.getMap().size()); - assertEquals(null, sinkRowMap.getMap().get(key).getDelete()); - assertEquals(r, sinkRowMap.getMap().get(key).getInsert()); - } - - @Test - public void testInsertAfterDelete() { - SinkRowMap sinkRowMap = new SinkRowMap(); - Schema schema = - new Schema( - Types.NestedField.optional(0, "id", Types.IntegerType.get()), - Types.NestedField.optional(1, "name", Types.StringType.get())); - - SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); - List> key1 = new ArrayList<>(); - key1.add((Comparable) row1.get(0)); - Record r1 = GenericRecord.create(schema); - r1.set(0, row1.get(0)); - r1.set(1, row1.get(1)); - SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob"); - List> key2 = new ArrayList<>(); - key2.add((Comparable) row2.get(0)); - Record r2 = GenericRecord.create(schema); - r2.set(0, row2.get(0)); - r2.set(1, row2.get(1)); - - sinkRowMap.delete(key1, r1); - sinkRowMap.insert(key1, r2); - assertEquals(1, sinkRowMap.getMap().size()); - assertEquals(r1, sinkRowMap.getMap().get(key1).getDelete()); - assertEquals(r2, sinkRowMap.getMap().get(key1).getInsert()); - } - - @Test - public void testInsertAfterInsert() { - SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - - sinkRowMap.insert(key, r); - boolean exceptionThrown = false; - try { - sinkRowMap.insert(key, r); - } catch (RuntimeException e) { - exceptionThrown = true; - Assert.assertTrue( - e.getMessage() - .toLowerCase() - .contains("try to insert a duplicated primary key")); - } - if (!exceptionThrown) { - Assert.fail("Expected exception not thrown: `try to insert a duplicated primary key`"); - } - } - - @Test - public void testDelete() { - SinkRowMap sinkRowMap = new SinkRowMap(); - - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - - sinkRowMap.delete(key, r); - assertEquals(1, sinkRowMap.getMap().size()); - assertEquals(null, sinkRowMap.getMap().get(key).getInsert()); - assertEquals(r, sinkRowMap.getMap().get(key).getDelete()); - } - - @Test - public void testDeleteAfterDelete() { 
- SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - - sinkRowMap.delete(key, r); - boolean exceptionThrown = false; - try { - sinkRowMap.delete(key, r); - } catch (RuntimeException e) { - exceptionThrown = true; - Assert.assertTrue( - e.getMessage().toLowerCase().contains("try to double delete a primary key")); - } - if (!exceptionThrown) { - Assert.fail("Expected exception not thrown: `try to double delete a primary key`"); - } - } - - @Test - public void testDeleteAfterInsert() { - SinkRowMap sinkRowMap = new SinkRowMap(); - - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - - sinkRowMap.insert(key, r); - sinkRowMap.delete(key, r); - assertEquals(0, sinkRowMap.getMap().size()); - } - - @Test - public void testDeleteAfterUpdate() { - SinkRowMap sinkRowMap = new SinkRowMap(); - - Schema schema = - new Schema( - Types.NestedField.optional(0, "id", Types.IntegerType.get()), - Types.NestedField.optional(1, "name", Types.StringType.get())); - - SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); - List> key1 = new ArrayList<>(); - key1.add((Comparable) row1.get(0)); - Record r1 = GenericRecord.create(schema); - r1.set(0, row1.get(0)); - r1.set(1, row1.get(1)); - - SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare"); - List> key2 = new ArrayList<>(); - key2.add((Comparable) row2.get(0)); - Record r2 = GenericRecord.create(schema); - r2.set(0, row2.get(0)); - r2.set(1, row2.get(1)); - - sinkRowMap.delete(key1, r1); - sinkRowMap.insert(key2, r2); - sinkRowMap.delete(key2, r2); - assertEquals(1, sinkRowMap.getMap().size()); - assertEquals(null, sinkRowMap.getMap().get(key1).getInsert()); - assertEquals(r1, sinkRowMap.getMap().get(key1).getDelete()); - } - - @Test - public void testClear() { - SinkRowMap sinkRowMap = new SinkRowMap(); - - SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); - List> key = new ArrayList<>(); - key.add((Comparable) row.get(0)); - Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); - Record r = GenericRecord.create(schema); - r.set(0, row.get(0)); - sinkRowMap.insert(key, r); - - sinkRowMap.clear(); - assertEquals(0, sinkRowMap.getMap().size()); - } -} diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java deleted file mode 100644 index f619235cab6ce..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.risingwave.connector; - -import static com.risingwave.proto.Data.*; -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Sets; -import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.ConnectorServiceProto; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.IcebergGenerics; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class UpsertIcebergSinkLocalTest { - static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse"; - static String databaseName = "demo_db"; - static String tableName = "demo_table"; - static Schema icebergTableSchema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get())); - - private void createMockTable() throws IOException { - if (!Paths.get(warehousePath).toFile().isDirectory()) { - Files.createDirectories(Paths.get(warehousePath)); - } - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - try { - catalog.dropTable(tableIdent); - } catch (Exception e) { - // Ignored. 
- } - PartitionSpec spec = PartitionSpec.unpartitioned(); - catalog.createTable(tableIdent, icebergTableSchema, spec, Map.of("format-version", "2")); - catalog.close(); - } - - private void validateTableWithIceberg(Set expected) { - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - Table icebergTable = catalog.loadTable(tableIdent); - CloseableIterable iter = IcebergGenerics.read(icebergTable).build(); - Set actual = Sets.newHashSet(iter); - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - private void validateTableWithSpark(Set expected) { - SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog"); - sparkConf.set("spark.sql.catalog.demo.type", "hadoop"); - sparkConf.set("spark.sql.catalog.demo.warehouse", warehousePath); - sparkConf.set("spark.sql.catalog.defaultCatalog", "demo"); - SparkSession spark = SparkSession.builder().master("local").config(sparkConf).getOrCreate(); - List rows = - spark.read() - .format("iceberg") - .load(String.format("demo.%s.%s", databaseName, tableName)) - .collectAsList(); - spark.close(); - Set actual = new HashSet<>(); - for (Row row : rows) { - int id = row.getInt(0); - String name = row.getString(1); - Record record = GenericRecord.create(icebergTableSchema); - record.setField("id", id); - record.setField("name", name); - actual.add(record); - } - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - @Test - public void testSync() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - icebergTable, - FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 1, "Alice"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write(Arrays.asList(new ArraySinkRow(Op.INSERT, 2, "Bob"))); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - expected.add(record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testWrite() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - 
Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - icebergTable, - FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.INSERT, 1, "Alice"), - new ArraySinkRow(Op.INSERT, 2, "Bob"), - new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice"), - new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare"), - new ArraySinkRow(Op.DELETE, 2, "Bob"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Clare"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.UPDATE_DELETE, 1, "Clare"), - new ArraySinkRow(Op.UPDATE_INSERT, 1, "Alice"), - new ArraySinkRow(Op.DELETE, 1, "Alice"))); - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - validateTableWithIceberg(Sets.newHashSet()); - validateTableWithSpark(Sets.newHashSet()); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testDrop() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - TestUtils.getMockTableSchema(), - hadoopCatalog, - hadoopCatalog.loadTable(tableIdentifier), - FileFormat.PARQUET); - - sink.drop(); - - assertTrue(sink.isClosed()); - assertTrue(Files.exists(Paths.get(warehousePath))); - } -} diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java deleted file mode 100644 index ce2e5549c0a8c..0000000000000 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.risingwave.connector.sink.iceberg; - -import static com.risingwave.proto.Data.*; -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.risingwave.connector.IcebergSinkCoordinator; -import com.risingwave.connector.UpsertIcebergSinkWriter; -import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.ConnectorServiceProto; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.IcebergGenerics; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class UpsertIcebergSinkPartitionTest { - static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse"; - static String databaseName = "demo_db"; - static String tableName = "demo_table_partitioned"; - static Schema icebergTableSchema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get()), - Types.NestedField.required(3, "part", Types.StringType.get())); - - static TableSchema tableSchema = - new TableSchema( - Lists.newArrayList("id", "name", "part"), - Lists.newArrayList( - DataType.newBuilder().setTypeName(DataType.TypeName.INT32).build(), - DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build(), - DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build()), - Lists.newArrayList("id")); - - private void createMockTable() throws IOException { - if (!Paths.get(warehousePath).toFile().isDirectory()) { - Files.createDirectories(Paths.get(warehousePath)); - } - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - try { - catalog.dropTable(tableIdent); - } catch (Exception e) { - // Ignored. 
- } - PartitionSpec spec = PartitionSpec.builderFor(icebergTableSchema).identity("part").build(); - catalog.createTable(tableIdent, icebergTableSchema, spec, Map.of("format-version", "2")); - catalog.close(); - } - - private void validateTableWithIceberg(Set expected) { - HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath); - TableIdentifier tableIdent = TableIdentifier.of(databaseName, tableName); - Table icebergTable = catalog.loadTable(tableIdent); - CloseableIterable iter = IcebergGenerics.read(icebergTable).build(); - Set actual = Sets.newHashSet(iter); - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - private void validateTableWithSpark(Set expected) { - SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog"); - sparkConf.set("spark.sql.catalog.demo.type", "hadoop"); - sparkConf.set("spark.sql.catalog.demo.warehouse", warehousePath); - sparkConf.set("spark.sql.catalog.defaultCatalog", "demo"); - SparkSession spark = SparkSession.builder().master("local").config(sparkConf).getOrCreate(); - List rows = - spark.read() - .format("iceberg") - .load(String.format("demo.%s.%s", databaseName, tableName)) - .collectAsList(); - spark.close(); - Set actual = new HashSet<>(); - for (Row row : rows) { - int id = row.getInt(0); - String name = row.getString(1); - String part = row.getString(2); - Record record = GenericRecord.create(icebergTableSchema); - record.setField("id", id); - record.setField("name", name); - record.setField("part", part); - actual.add(record); - } - assertEquals(expected.size(), actual.size()); - assertEquals(expected, actual); - } - - @Test - public void testSync() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - tableSchema, hadoopCatalog, icebergTable, FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write(Collections.singletonList(new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Alice"); - record1.setField("part", "aaa"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write(Collections.singletonList((new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb")))); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - Record record2 = GenericRecord.create(icebergTableSchema); - record2.setField("id", 2); - record2.setField("name", "Bob"); - record2.setField("part", "bbb"); - expected.add(record2); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testWrite() throws IOException { - createMockTable(); - Configuration 
hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); - IcebergSinkCoordinator coordinator = new IcebergSinkCoordinator(icebergTable); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - tableSchema, hadoopCatalog, icebergTable, FileFormat.PARQUET); - - try { - sink.beginEpoch(233); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"), - new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"), - new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice", "aaa"), - new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare", "ccc"), - new ArraySinkRow(Op.DELETE, 2, "Bob", "bbb"))); - ConnectorServiceProto.SinkMetadata metadata = sink.barrier(true).get(); - coordinator.commit(233, Collections.singletonList(metadata)); - - Record record1 = GenericRecord.create(icebergTableSchema); - record1.setField("id", 1); - record1.setField("name", "Clare"); - record1.setField("part", "ccc"); - Set expected = Sets.newHashSet(record1); - validateTableWithIceberg(expected); - validateTableWithSpark(expected); - - sink.beginEpoch(234); - sink.write( - Arrays.asList( - new ArraySinkRow(Op.UPDATE_DELETE, 1, "Clare", "ccc"), - new ArraySinkRow(Op.UPDATE_INSERT, 1, "Alice", "aaa"), - new ArraySinkRow(Op.DELETE, 1, "Alice", "aaa"))); - metadata = sink.barrier(true).get(); - coordinator.commit(234, Collections.singletonList(metadata)); - - validateTableWithIceberg(Sets.newHashSet()); - validateTableWithSpark(Sets.newHashSet()); - } catch (Exception e) { - fail("Exception: " + e); - } finally { - sink.drop(); - } - } - - @Test - public void testDrop() throws IOException { - createMockTable(); - Configuration hadoopConf = new Configuration(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - UpsertIcebergSinkWriter sink = - new UpsertIcebergSinkWriter( - tableSchema, - hadoopCatalog, - hadoopCatalog.loadTable(tableIdentifier), - FileFormat.PARQUET); - - sink.drop(); - - assertTrue(sink.isClosed()); - assertTrue(Files.exists(Paths.get(warehousePath))); - } -} diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java deleted file mode 100644 index c7c454c812530..0000000000000 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.risingwave.connector; - -import static io.grpc.Status.INTERNAL; -import static io.grpc.Status.UNIMPLEMENTED; - -import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.SinkRow; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.iceberg.*; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; - -public class AppendOnlyIcebergSinkWriter extends IcebergSinkWriterBase { - private Map> dataWriterMap = new HashMap<>(); - private boolean closed = false; - - public AppendOnlyIcebergSinkWriter( - TableSchema tableSchema, - HadoopCatalog hadoopCatalog, - Table icebergTable, - FileFormat fileFormat) { - super( - tableSchema, - icebergTable, - hadoopCatalog, - icebergTable.schema().select(Arrays.asList(tableSchema.getColumnNames())), - fileFormat); - } - - @Override - public boolean write(Iterable rows) { - for (SinkRow row : rows) { - switch (row.getOp()) { - case INSERT: - Record record = GenericRecord.create(rowSchema); - if (row.size() != tableSchema.getColumnNames().length) { - throw INTERNAL.withDescription("row values do not match table schema") - .asRuntimeException(); - } - for (int i = 0; i < rowSchema.columns().size(); i++) { - record.set(i, row.get(i)); - } - PartitionKey partitionKey = - new PartitionKey(icebergTable.spec(), icebergTable.schema()); - partitionKey.partition(record); - DataWriter dataWriter; - if (dataWriterMap.containsKey(partitionKey)) { - dataWriter = dataWriterMap.get(partitionKey); - } else { - try { - String filename = fileFormat.addExtension(UUID.randomUUID().toString()); - OutputFile outputFile = - icebergTable - .io() - .newOutputFile( - icebergTable.location() - + "/data/" - + icebergTable - .spec() - .partitionToPath(partitionKey) - + "/" - + filename); - dataWriter = - Parquet.writeData(outputFile) - .schema(rowSchema) - .withSpec(icebergTable.spec()) - .withPartition(partitionKey) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .build(); - } catch (Exception e) { - throw INTERNAL.withDescription("failed to create dataWriter") - .asRuntimeException(); - } - dataWriterMap.put(partitionKey, dataWriter); - } - dataWriter.write(record); - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); - } - } - return false; - } - - @Override - protected IcebergMetadata collectSinkMetadata() { - try { - List dataFileList = new ArrayList<>(); - for (DataWriter dataWriter : dataWriterMap.values()) { - dataWriter.close(); - DataFile dataFile = dataWriter.toDataFile(); - dataFileList.add(dataFile); - } - dataWriterMap.clear(); - return new IcebergMetadata(dataFileList.toArray(new DataFile[0]), new DeleteFile[0]); - } catch (Exception e) { - throw INTERNAL.withDescription(String.format("failed to collect metadata: %s", e)) - .withCause(e) - .asRuntimeException(); - } - } - - @Override - public void drop() { - try { - for (DataWriter dataWriter : dataWriterMap.values()) { - dataWriter.close(); - } - hadoopCatalog.close(); - closed = true; - } catch (Exception e) { - throw INTERNAL.withCause(e).asRuntimeException(); - } - } - - public boolean isClosed() { 
- return closed; - } -} diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergMetadata.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergMetadata.java deleted file mode 100644 index c2568a8eaf9c1..0000000000000 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergMetadata.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.risingwave.connector; - -import java.io.Serializable; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; - -public class IcebergMetadata implements Serializable { - final DataFile[] dataFiles; - final DeleteFile[] deleteFiles; - - public IcebergMetadata(DataFile[] dataFiles, DeleteFile[] deleteFiles) { - this.dataFiles = dataFiles; - this.deleteFiles = deleteFiles; - } -} diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java deleted file mode 100644 index fd89fc0163497..0000000000000 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package com.risingwave.connector;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.risingwave.connector.common.S3Config;
-
-public class IcebergSinkConfig extends S3Config {
- private String sinkType;
-
- private String warehousePath;
-
- private String databaseName;
-
- private String tableName;
-
- @JsonProperty(value = "force_append_only")
- private Boolean forceAppendOnly;
-
- @JsonProperty(value = "primary_key")
- private String primaryKey;
-
- @JsonCreator
- public IcebergSinkConfig(
- @JsonProperty(value = "type") String sinkType,
- @JsonProperty(value = "warehouse.path") String warehousePath,
- @JsonProperty(value = "database.name") String databaseName,
- @JsonProperty(value = "table.name") String tableName) {
- this.sinkType = sinkType;
- this.warehousePath = warehousePath;
- this.databaseName = databaseName;
- this.tableName = tableName;
- }
-
- public String getSinkType() {
- return sinkType;
- }
-
- public String getWarehousePath() {
- return warehousePath;
- }
-
- public void setWarehousePath(String warehousePath) {
- this.warehousePath = warehousePath;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getTableName() {
- return tableName;
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkCoordinator.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkCoordinator.java
deleted file mode 100644
index f4ff45cb5bfb5..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkCoordinator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2024 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.risingwave.connector;
-
-import com.risingwave.connector.api.sink.SinkCoordinator;
-import com.risingwave.java.utils.ObjectSerde;
-import com.risingwave.proto.ConnectorServiceProto;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.RowDelta;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
-
-public class IcebergSinkCoordinator implements SinkCoordinator {
-
- private final Table icebergTable;
-
- public IcebergSinkCoordinator(Table icebergTable) {
- this.icebergTable = icebergTable;
- }
-
- @Override
- public void commit(long epoch, List<ConnectorServiceProto.SinkMetadata> metadataList) {
- List<DataFile> dataFileList = new ArrayList<>(metadataList.size());
- List<DeleteFile> deleteFileList = new ArrayList<>(metadataList.size());
- for (ConnectorServiceProto.SinkMetadata metadata : metadataList) {
- IcebergMetadata icebergMetadata =
- (IcebergMetadata)
- ObjectSerde.deserializeObject(
- metadata.getSerialized().getMetadata().toByteArray());
- dataFileList.addAll(
- Arrays.stream(icebergMetadata.dataFiles).collect(Collectors.toList()));
- deleteFileList.addAll(
- Arrays.stream(icebergMetadata.deleteFiles).collect(Collectors.toList()));
- }
- boolean nonEmpty = false;
- Transaction txn = icebergTable.newTransaction();
- if (!deleteFileList.isEmpty()) {
- RowDelta rowDelta = txn.newRowDelta();
- for (DeleteFile deleteFile : deleteFileList) {
- rowDelta.addDeletes(deleteFile);
- }
- rowDelta.commit();
- nonEmpty = true;
- }
-
- if (!dataFileList.isEmpty()) {
- AppendFiles append = txn.newAppend();
- for (DataFile dataFile : dataFileList) {
- append.appendFile(dataFile);
- }
- append.commit();
- nonEmpty = true;
- }
-
- if (nonEmpty) {
- txn.commitTransaction();
- }
- }
-
- @Override
- public void drop() {}
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
deleted file mode 100644
index fd8317e94e039..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
+++ /dev/null
@@ -1,187 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector;
-
-import static io.grpc.Status.UNIMPLEMENTED;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.risingwave.connector.api.TableSchema;
-import com.risingwave.connector.api.sink.SinkCoordinator;
-import com.risingwave.connector.api.sink.SinkFactory;
-import com.risingwave.connector.api.sink.SinkWriter;
-import com.risingwave.connector.common.S3Utils;
-import com.risingwave.java.utils.UrlParser;
-import com.risingwave.proto.Catalog.SinkType;
-import io.grpc.Status;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IcebergSinkFactory implements SinkFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);
-
- public static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
-
- // hadoop catalog config
-
- private static final String confIoImpl = "io-impl";
- private static final String s3FileIOImpl = "org.apache.iceberg.aws.s3.S3FileIO";
-
- @Override
- public SinkCoordinator createCoordinator(
- TableSchema tableSchema, Map<String, String> tableProperties) {
- ObjectMapper mapper = new ObjectMapper();
- IcebergSinkConfig config = mapper.convertValue(tableProperties, IcebergSinkConfig.class);
- String warehousePath = getWarehousePath(config);
- config.setWarehousePath(warehousePath);
-
- String scheme = UrlParser.parseLocationScheme(warehousePath);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(config.getDatabaseName(), config.getTableName());
- Configuration hadoopConf = createHadoopConf(scheme, config);
-
- try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) {
- Table icebergTable = hadoopCatalog.loadTable(tableIdentifier);
- return new IcebergSinkCoordinator(icebergTable);
- } catch (Exception e) {
- throw Status.FAILED_PRECONDITION
- .withDescription(
- String.format("failed to load iceberg table: %s", e.getMessage()))
- .withCause(e)
- .asRuntimeException();
- }
- }
-
- @Override
- public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
- ObjectMapper mapper = new ObjectMapper();
- IcebergSinkConfig config = mapper.convertValue(tableProperties, IcebergSinkConfig.class);
- String warehousePath = getWarehousePath(config);
- config.setWarehousePath(warehousePath);
-
- String scheme = UrlParser.parseLocationScheme(warehousePath);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(config.getDatabaseName(), config.getTableName());
- Configuration hadoopConf = createHadoopConf(scheme, config);
- SinkWriter sink = null;
-
- try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) {
- Table icebergTable = hadoopCatalog.loadTable(tableIdentifier);
- String sinkType = config.getSinkType();
- if (sinkType.equals("append-only")) {
- sink =
- new AppendOnlyIcebergSinkWriter(
- tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
- } else if (sinkType.equals("upsert")) {
- sink =
- new UpsertIcebergSinkWriter(
- tableSchema, hadoopCatalog,
- icebergTable, FILE_FORMAT);
- }
- } catch (Exception e) {
- throw Status.FAILED_PRECONDITION
- .withDescription(
- String.format("failed to load iceberg table: %s", e.getMessage()))
- .withCause(e)
- .asRuntimeException();
- }
-
- if (sink == null) {
- throw UNIMPLEMENTED
- .withDescription("unsupported mode: " + config.getSinkType())
- .asRuntimeException();
- }
- return sink;
- }
-
- @Override
- public void validate(
- TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
- IcebergSinkConfig config = mapper.convertValue(tableProperties, IcebergSinkConfig.class);
-
- String warehousePath = getWarehousePath(config);
- String scheme = UrlParser.parseLocationScheme(warehousePath);
- TableIdentifier tableIdentifier =
- TableIdentifier.of(config.getDatabaseName(), config.getTableName());
- Configuration hadoopConf = createHadoopConf(scheme, config);
-
- try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) {
- Table icebergTable = hadoopCatalog.loadTable(tableIdentifier);
- IcebergSinkUtil.checkSchema(tableSchema, icebergTable.schema());
- } catch (Exception e) {
- throw Status.INTERNAL
- .withDescription(
- String.format("failed to load iceberg table: %s", e.getMessage()))
- .withCause(e)
- .asRuntimeException();
- }
-
- if (!config.getSinkType().equals("append-only") && !config.getSinkType().equals("upsert")) {
- throw UNIMPLEMENTED
- .withDescription("unsupported mode: " + config.getSinkType())
- .asRuntimeException();
- }
-
- switch (sinkType) {
- case SINK_TYPE_UPSERT:
- // For upsert iceberg sink, the user must specify its primary key explicitly.
- if (tableSchema.getPrimaryKeys().isEmpty()) {
- throw Status.INVALID_ARGUMENT
- .withDescription("please define primary key for upsert iceberg sink")
- .asRuntimeException();
- }
- break;
- case SINK_TYPE_APPEND_ONLY:
- case SINK_TYPE_FORCE_APPEND_ONLY:
- break;
- default:
- throw Status.INTERNAL.asRuntimeException();
- }
- }
-
- private static String getWarehousePath(IcebergSinkConfig config) {
- String warehousePath = config.getWarehousePath();
- // unify s3 and s3a
- if (warehousePath.startsWith("s3://")) {
- return warehousePath.replace("s3://", "s3a://");
- }
- return warehousePath;
- }
-
- private Configuration createHadoopConf(String scheme, IcebergSinkConfig config) {
- switch (scheme) {
- case "file":
- return new Configuration();
- case "s3a":
- Configuration hadoopConf = S3Utils.getHadoopConf(config);
- hadoopConf.set(confIoImpl, s3FileIOImpl);
- return hadoopConf;
- default:
- throw UNIMPLEMENTED
- .withDescription(
- String.format("scheme %s not supported for warehouse path", scheme))
- .asRuntimeException();
- }
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkUtil.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkUtil.java
deleted file mode 100644
index 392ea0f4a39f3..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkUtil.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2024 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.risingwave.connector;
-
-import static io.grpc.Status.INVALID_ARGUMENT;
-import static io.grpc.Status.UNIMPLEMENTED;
-
-import com.risingwave.connector.api.TableSchema;
-import com.risingwave.proto.Data;
-import io.grpc.Status;
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-
-public class IcebergSinkUtil {
- private static Type convertType(Data.DataType.TypeName typeName) {
- switch (typeName) {
- case INT16:
- case INT32:
- return Types.IntegerType.get();
- case INT64:
- return Types.LongType.get();
- case FLOAT:
- return Types.FloatType.get();
- case DOUBLE:
- return Types.DoubleType.get();
- case BOOLEAN:
- return Types.BooleanType.get();
- case VARCHAR:
- return Types.StringType.get();
- case DECIMAL:
- return Types.DecimalType.of(10, 0);
- case TIMESTAMP:
- return Types.TimestampType.withoutZone();
- case TIMESTAMPTZ:
- return Types.TimestampType.withZone();
- case DATE:
- return Types.DateType.get();
- case TIME:
- return Types.TimeType.get();
- case STRUCT:
- case LIST:
- throw UNIMPLEMENTED
- .withDescription(String.format("not support %s now", typeName))
- .asRuntimeException();
- case INTERVAL:
- throw INVALID_ARGUMENT
- .withDescription(String.format("Illegal type %s in Iceberg", typeName))
- .asRuntimeException();
- default:
- throw INVALID_ARGUMENT
- .withDescription("unspecified type" + typeName)
- .asRuntimeException();
- }
- }
-
- public static void checkSchema(TableSchema tableSchema, Schema icebergSchema) {
- if (icebergSchema == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription("Schema of iceberg table is null")
- .asRuntimeException();
- }
- Map<String, Data.DataType.TypeName> tableColumnTypes = tableSchema.getColumnTypes();
- List<Types.NestedField> icebergNestedFields = icebergSchema.columns();
- // Check that all columns in tableSchema exist in the iceberg table and that existing column
- // types match.
- for (Map.Entry<String, Data.DataType.TypeName> tableColumnEntry :
- tableColumnTypes.entrySet()) {
- Types.NestedField field = icebergSchema.findField(tableColumnEntry.getKey());
- if (field == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription(
- String.format(
- "The name of table schema does not match. Column name: %s.",
- tableColumnEntry.getKey()))
- .asRuntimeException();
- }
- if (!convertType(tableColumnEntry.getValue()).equals(field.type())) {
- throw Status.FAILED_PRECONDITION
- .withDescription(
- String.format(
- "The type of table schema does not match. Column name: %s.",
- tableColumnEntry.getKey()))
- .asRuntimeException();
- }
- }
- // Check that all required columns in the iceberg table exist in tableSchema.
- for (Types.NestedField field : icebergNestedFields) {
- if (tableColumnTypes.get(field.name()) == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription(String.format("missing a required field %s", field.name()))
- .asRuntimeException();
- }
- }
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkWriterBase.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkWriterBase.java
deleted file mode 100644
index e42030c217bfc..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkWriterBase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2024 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.risingwave.connector;
-
-import com.google.protobuf.ByteString;
-import com.risingwave.connector.api.TableSchema;
-import com.risingwave.connector.api.sink.SinkWriter;
-import com.risingwave.java.utils.ObjectSerde;
-import com.risingwave.proto.ConnectorServiceProto;
-import java.util.Optional;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-
-public abstract class IcebergSinkWriterBase implements SinkWriter {
- protected final TableSchema tableSchema;
- protected final HadoopCatalog hadoopCatalog;
- protected final Table icebergTable;
- protected final Schema rowSchema;
- protected final FileFormat fileFormat;
-
- public IcebergSinkWriterBase(
- TableSchema tableSchema,
- Table icebergTable,
- HadoopCatalog hadoopCatalog,
- Schema rowSchema,
- FileFormat fileFormat) {
- this.tableSchema = tableSchema;
- this.hadoopCatalog = hadoopCatalog;
- this.rowSchema = rowSchema;
- this.icebergTable = icebergTable;
- this.fileFormat = fileFormat;
- }
-
- @Override
- public void beginEpoch(long epoch) {}
-
- protected abstract IcebergMetadata collectSinkMetadata();
-
- @Override
- public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint) {
- if (isCheckpoint) {
- IcebergMetadata metadata = collectSinkMetadata();
- return Optional.of(
- ConnectorServiceProto.SinkMetadata.newBuilder()
- .setSerialized(
- ConnectorServiceProto.SinkMetadata.SerializedMetadata
- .newBuilder()
- .setMetadata(
- ByteString.copyFrom(
- ObjectSerde.serializeObject(metadata)))
- .build())
- .build());
- } else {
- return Optional.empty();
- }
- }
-
- public HadoopCatalog getHadoopCatalog() {
- return this.hadoopCatalog;
- }
-
- public Table getIcebergTable() {
- return this.icebergTable;
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java
deleted file mode 100644
index aba4d96f495f8..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector;
-
-import com.risingwave.connector.api.PkComparator;
-import com.risingwave.connector.api.sink.SinkRow;
-import io.grpc.Status;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.iceberg.data.Record;
-
-public class SinkRowMap {
- TreeMap<List<Comparable<Object>>, SinkRowOp> map = new TreeMap<>(new PkComparator());
-
- public void clear() {
- map.clear();
- }
-
- public void insert(List<Comparable<Object>> key, Record row) {
- if (!map.containsKey(key)) {
- map.put(key, SinkRowOp.insertOp(row));
- } else {
- SinkRowOp sinkRowOp = map.get(key);
- if (sinkRowOp.isDelete()) {
- map.put(key, SinkRowOp.updateOp(sinkRowOp.getDelete(), row));
- } else {
- throw Status.FAILED_PRECONDITION
- .withDescription("try to insert a duplicated primary key")
- .asRuntimeException();
- }
- }
- }
-
- public void delete(List<Comparable<Object>> key, Record row) {
- if (!map.containsKey(key)) {
- map.put(key, SinkRowOp.deleteOp(row));
- } else {
- SinkRowOp sinkRowOp = map.get(key);
- Record insert = sinkRowOp.getInsert();
- if (insert == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription("try to double delete a primary key")
- .asRuntimeException();
- }
- // TODO: may enable it again
- // assertRowValuesEqual(insert, row);
- Record delete = sinkRowOp.getDelete();
- if (delete != null) {
- map.put(key, SinkRowOp.deleteOp(delete));
- } else {
- map.remove(key);
- }
- }
- }
-
- public Map<List<Comparable<Object>>, SinkRowOp> getMap() {
- return map;
- }
-
- private void assertRowValuesEqual(SinkRow insert, SinkRow delete) {
- for (int i = 0; i < delete.size(); i++) {
- if (!insert.get(i).equals(delete.get(i))) {
- throw Status.FAILED_PRECONDITION
- .withDescription(
- "delete row value "
- + delete.get(i)
- + " does not match inserted value "
- + insert.get(i))
- .asRuntimeException();
- }
- }
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java
deleted file mode 100644
index 2addfe01a486b..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector;
-
-import io.grpc.Status;
-import org.apache.iceberg.data.Record;
-
-public class SinkRowOp {
- private final Record delete;
- private final Record insert;
-
- public static SinkRowOp insertOp(Record row) {
- if (row == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription("row op must not be null to initialize insertOp")
- .asRuntimeException();
- }
- return new SinkRowOp(null, row);
- }
-
- public static SinkRowOp deleteOp(Record row) {
- if (row == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription("row op must not be null to initialize deleteOp")
- .asRuntimeException();
- }
- return new SinkRowOp(row, null);
- }
-
- public static SinkRowOp updateOp(Record delete, Record insert) {
- if (delete == null || insert == null) {
- throw Status.FAILED_PRECONDITION
- .withDescription("row ops must not be null initialize updateOp")
- .asRuntimeException();
- }
- return new SinkRowOp(delete, insert);
- }
-
- private SinkRowOp(Record delete, Record insert) {
- this.delete = delete;
- this.insert = insert;
- }
-
- public boolean isDelete() {
- return insert == null && delete != null;
- }
-
- public Record getDelete() {
- return delete;
- }
-
- public Record getInsert() {
- return insert;
- }
-}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java
deleted file mode 100644
index aae9bdf46c4c2..0000000000000
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector;
-
-import static io.grpc.Status.INTERNAL;
-import static io.grpc.Status.UNIMPLEMENTED;
-
-import com.risingwave.connector.api.TableSchema;
-import com.risingwave.connector.api.sink.SinkRow;
-import io.grpc.Status;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.Types;
-
-public class UpsertIcebergSinkWriter extends IcebergSinkWriterBase {
- private final Schema deleteRowSchema;
- private final List<Integer> pkIndices;
- private boolean closed = false;
- private boolean updateBufferExists = false;
- private Map<PartitionKey, SinkRowMap> sinkRowMapByPartition = new HashMap<>();
-
- public UpsertIcebergSinkWriter(
- TableSchema tableSchema,
- HadoopCatalog hadoopCatalog,
- Table icebergTable,
- FileFormat fileFormat) {
- super(
- tableSchema,
- icebergTable,
- hadoopCatalog,
- icebergTable.schema().select(Arrays.asList(tableSchema.getColumnNames())),
- fileFormat);
- this.deleteRowSchema = icebergTable.schema().select(tableSchema.getPrimaryKeys());
- this.pkIndices =
- tableSchema.getPrimaryKeys().stream()
- .map(columnName -> tableSchema.getColumnIndex(columnName))
- .collect(Collectors.toList());
- }
-
- private static Record newRecord(Schema schema, SinkRow row) {
- Record record = GenericRecord.create(schema);
- for (int i = 0; i < schema.columns().size(); i++) {
- record.set(i, row.get(i));
- }
- return record;
- }
-
- private EqualityDeleteWriter<Record> newEqualityDeleteWriter(PartitionKey partitionKey) {
- try {
- String filename = fileFormat.addExtension(UUID.randomUUID().toString());
- OutputFile outputFile =
- icebergTable
- .io()
- .newOutputFile(
- icebergTable.location()
- + "/data/"
- + icebergTable.spec().partitionToPath(partitionKey)
- + "/"
- + filename);
- return Parquet.writeDeletes(outputFile)
- .forTable(icebergTable)
- .rowSchema(deleteRowSchema)
- .withSpec(icebergTable.spec())
- .withPartition(partitionKey)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .overwrite()
- .equalityFieldIds(
- deleteRowSchema.columns().stream()
- .mapToInt(Types.NestedField::fieldId)
- .toArray())
- .buildEqualityWriter();
- } catch (Exception e) {
- throw INTERNAL.withDescription("failed to create outputFile and equalityDeleteWriter")
- .asRuntimeException();
- }
- }
-
- private DataWriter<Record> newDataWriter(PartitionKey partitionKey) {
- try {
- String filename = fileFormat.addExtension(UUID.randomUUID().toString());
- OutputFile outputFile =
- icebergTable
- .io()
- .newOutputFile(
- icebergTable.location()
- + "/data/"
- + icebergTable.spec().partitionToPath(partitionKey)
- + "/"
- + filename);
- return Parquet.writeData(outputFile)
- .schema(rowSchema)
- .withSpec(icebergTable.spec())
- .withPartition(partitionKey)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .overwrite()
- .build();
- } catch (Exception e) {
- throw INTERNAL.withDescription("failed to create outputFile and dataWriter")
- .asRuntimeException();
- }
- }
-
- private List<Comparable<Object>> getKeyFromRow(SinkRow row) {
- return this.pkIndices.stream()
- .map(idx -> (Comparable<Object>) row.get(idx))
- .collect(Collectors.toList());
- }
-
- @Override
- public boolean write(Iterable<SinkRow> rows) {
- for (SinkRow row : rows) {
- if (row.size() != tableSchema.getColumnNames().length) {
- throw Status.FAILED_PRECONDITION
- .withDescription("row values do not match table schema")
- .asRuntimeException();
- }
- Record record = newRecord(rowSchema, row);
- PartitionKey partitionKey =
- new PartitionKey(icebergTable.spec(), icebergTable.schema());
- partitionKey.partition(record);
- SinkRowMap sinkRowMap;
- if (sinkRowMapByPartition.containsKey(partitionKey)) {
- sinkRowMap = sinkRowMapByPartition.get(partitionKey);
- } else {
- sinkRowMap = new SinkRowMap();
- sinkRowMapByPartition.put(partitionKey, sinkRowMap);
- }
- switch (row.getOp()) {
- case INSERT:
- sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
- break;
- case DELETE:
- sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
- break;
- case UPDATE_DELETE:
- if (updateBufferExists) {
- throw Status.FAILED_PRECONDITION
- .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
- .asRuntimeException();
- }
- sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
- updateBufferExists = true;
- break;
- case UPDATE_INSERT:
- if (!updateBufferExists) {
- throw Status.FAILED_PRECONDITION
- .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
- .asRuntimeException();
- }
- sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
- updateBufferExists = false;
- break;
- default:
- throw UNIMPLEMENTED
- .withDescription("unsupported operation: " + row.getOp())
- .asRuntimeException();
- }
- }
- return false;
- }
-
- @Override
- protected IcebergMetadata collectSinkMetadata() {
- List<DataFile> dataFileList = new ArrayList<>();
- List<DeleteFile> deleteFileList = new ArrayList<>();
- for (Map.Entry<PartitionKey, SinkRowMap> entry : sinkRowMapByPartition.entrySet()) {
- EqualityDeleteWriter<Record> equalityDeleteWriter =
- newEqualityDeleteWriter(entry.getKey());
- DataWriter<Record> dataWriter = newDataWriter(entry.getKey());
- for (SinkRowOp sinkRowOp : entry.getValue().map.values()) {
- Record insert = sinkRowOp.getInsert();
- Record delete = sinkRowOp.getDelete();
- if (insert != null) {
- dataWriter.write(insert);
- }
- if (delete != null) {
- equalityDeleteWriter.write(delete);
- }
- }
- try {
- equalityDeleteWriter.close();
- dataWriter.close();
- } catch (IOException e) {
- throw INTERNAL.withDescription(
- "failed to close dataWriter and equalityDeleteWriter")
- .asRuntimeException();
- }
-
- if (equalityDeleteWriter.length() > 0) {
- DeleteFile eqDeletes = equalityDeleteWriter.toDeleteFile();
- deleteFileList.add(eqDeletes);
- }
- if (dataWriter.length() > 0) {
- DataFile dataFile = dataWriter.toDataFile();
- dataFileList.add(dataFile);
- }
- }
- sinkRowMapByPartition.clear();
- return new IcebergMetadata(
- dataFileList.toArray(new DataFile[0]), deleteFileList.toArray(new DeleteFile[0]));
- }
-
- @Override
- public void drop() {
- try {
- hadoopCatalog.close();
- closed = true;
- } catch (Exception e) {
- throw INTERNAL.withCause(e).asRuntimeException();
- }
- }
-
- public boolean isClosed() {
- return closed;
- }
-}