diff --git a/src/connector/src/sink/iceberg/mock_catalog.rs b/src/connector/src/sink/iceberg/mock_catalog.rs
index 9b9f98bedbedc..f9d60965b1d30 100644
--- a/src/connector/src/sink/iceberg/mock_catalog.rs
+++ b/src/connector/src/sink/iceberg/mock_catalog.rs
@@ -25,15 +25,12 @@ use opendal::Operator;
 /// A mock catalog for iceberg used for plan test.
 pub struct MockCatalog;
 
-#[async_trait]
-impl Catalog for MockCatalog {
-    fn name(&self) -> &str {
-        "mock"
-    }
+impl MockCatalog {
+    const RANGE_TABLE: &'static str = "range_table";
+    const SPARSE_TABLE: &'static str = "sparse_table";
 
-    async fn load_table(self: Arc<Self>, table_name: &TableIdentifier) -> icelake::Result<Table> {
-        // A mock table for test
-        let table = Table::builder_from_catalog(
+    fn sparse_table(self: &Arc<Self>) -> Table {
+        Table::builder_from_catalog(
             {
                 let mut builder = Memory::default();
                 builder.root("/tmp");
@@ -60,13 +57,19 @@ impl Catalog for MockCatalog {
                     Field::required(
                         2,
                         "v2",
-                        icelake::types::Any::Primitive(icelake::types::Primitive::Timestamp),
+                        icelake::types::Any::Primitive(icelake::types::Primitive::Long),
                     )
                     .into(),
                     Field::required(
                         3,
                         "v3",
-                        icelake::types::Any::Primitive(icelake::types::Primitive::Timestampz),
+                        icelake::types::Any::Primitive(icelake::types::Primitive::String),
+                    )
+                    .into(),
+                    Field::required(
+                        4,
+                        "v4",
+                        icelake::types::Any::Primitive(icelake::types::Primitive::Time),
                     )
                     .into(),
                 ]),
@@ -77,24 +80,98 @@ impl Catalog for MockCatalog {
                     fields: vec![
                         PartitionField {
                             source_column_id: 1,
-                            partition_field_id: 4,
+                            partition_field_id: 5,
                             transform: icelake::types::Transform::Identity,
                             name: "f1".to_string(),
                         },
                         PartitionField {
-                            source_column_id: 1,
-                            partition_field_id: 5,
+                            source_column_id: 2,
+                            partition_field_id: 6,
                             transform: icelake::types::Transform::Bucket(1),
                             name: "f2".to_string(),
                         },
                         PartitionField {
-                            source_column_id: 1,
-                            partition_field_id: 6,
+                            source_column_id: 3,
+                            partition_field_id: 7,
                             transform: icelake::types::Transform::Truncate(1),
                             name: "f3".to_string(),
                         },
                         PartitionField {
-                            source_column_id: 2,
+                            source_column_id: 4,
+                            partition_field_id: 8,
+                            transform: icelake::types::Transform::Void,
+                            name: "f4".to_string(),
+                        },
+                    ],
+                }],
+                default_spec_id: 1,
+                last_partition_id: 1,
+                properties: None,
+                current_snapshot_id: None,
+                snapshots: None,
+                snapshot_log: None,
+                metadata_log: None,
+                sort_orders: vec![],
+                default_sort_order_id: 0,
+                refs: HashMap::new(),
+            },
+            TableIdentifier::new(vec![Self::SPARSE_TABLE]).unwrap(),
+        )
+        .build()
+        .unwrap()
+    }
+
+    fn range_table(self: &Arc<Self>) -> Table {
+        Table::builder_from_catalog(
+            {
+                let mut builder = Memory::default();
+                builder.root("/tmp");
+                Operator::new(builder).unwrap().finish()
+            },
+            self.clone(),
+            TableMetadata {
+                format_version: icelake::types::TableFormatVersion::V2,
+                table_uuid: "1".to_string(),
+                location: "1".to_string(),
+                last_sequence_number: 1,
+                last_updated_ms: 1,
+                last_column_id: 1,
+                schemas: vec![Schema::new(
+                    1,
+                    None,
+                    Struct::new(vec![
+                        Field::required(
+                            1,
+                            "v1",
+                            icelake::types::Any::Primitive(icelake::types::Primitive::Date),
+                        )
+                        .into(),
+                        Field::required(
+                            2,
+                            "v2",
+                            icelake::types::Any::Primitive(icelake::types::Primitive::Timestamp),
+                        )
+                        .into(),
+                        Field::required(
+                            3,
+                            "v3",
+                            icelake::types::Any::Primitive(icelake::types::Primitive::Timestampz),
+                        )
+                        .into(),
+                        Field::required(
+                            4,
+                            "v4",
+                            icelake::types::Any::Primitive(icelake::types::Primitive::Timestampz),
+                        )
+                        .into(),
+                    ]),
+                )],
+                current_schema_id: 1,
+                partition_specs: vec![icelake::types::PartitionSpec {
+                    spec_id: 1,
+                    fields: vec![
+                        PartitionField {
+                            source_column_id: 1,
                             partition_field_id: 7,
                             transform: icelake::types::Transform::Year,
                             name: "f4".to_string(),
@@ -112,23 +189,11 @@ impl Catalog for MockCatalog {
                             name: "f6".to_string(),
                         },
                         PartitionField {
-                            source_column_id: 3,
+                            source_column_id: 4,
                             partition_field_id: 10,
                             transform: icelake::types::Transform::Hour,
                             name: "f7".to_string(),
                         },
-                        PartitionField {
-                            source_column_id: 1,
-                            partition_field_id: 11,
-                            transform: icelake::types::Transform::Void,
-                            name: "f8".to_string(),
-                        },
-                        PartitionField {
-                            source_column_id: 2,
-                            partition_field_id: 12,
-                            transform: icelake::types::Transform::Void,
-                            name: "f9".to_string(),
-                        },
                     ],
                 }],
                 default_spec_id: 1,
@@ -142,11 +207,28 @@ impl Catalog for MockCatalog {
                 default_sort_order_id: 0,
                 refs: HashMap::new(),
             },
-            table_name.clone(),
+            TableIdentifier::new(vec![Self::RANGE_TABLE]).unwrap(),
         )
         .build()
-        .unwrap();
-        Ok(table)
+        .unwrap()
+    }
+}
+
+#[async_trait]
+impl Catalog for MockCatalog {
+    fn name(&self) -> &str {
+        "mock"
+    }
+
+    // The mock catalog loads a mock table according to `table_name`. There are two kinds of tables for testing:
+    // 1. the sparse partition table
+    // 2. the range partition table
+    async fn load_table(self: Arc<Self>, table_name: &TableIdentifier) -> icelake::Result<Table> {
+        match table_name.name.as_ref() {
+            Self::SPARSE_TABLE => Ok(self.sparse_table()),
+            Self::RANGE_TABLE => Ok(self.range_table()),
+            _ => unimplemented!("table {} not found", table_name),
+        }
     }
 
     async fn update_table(self: Arc<Self>, _update_table: &UpdateTable) -> icelake::Result<Table> {
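Note on the mock_catalog.rs change above: the single hard-coded mock table is replaced by two named tables plus name-based dispatch in `load_table`. For reference, a minimal self-contained sketch of that dispatch pattern follows; `Table` and the error type here are stand-ins, not icelake's (the real `load_table` is async, takes `self: Arc<Self>`, and returns `icelake::Result<Table>`):

```rust
// Stand-in table type; the real code builds an icelake `Table` from
// in-memory metadata and an opendal Memory operator.
#[derive(Debug)]
struct Table {
    name: &'static str,
}

struct MockCatalog;

impl MockCatalog {
    const RANGE_TABLE: &'static str = "range_table";
    const SPARSE_TABLE: &'static str = "sparse_table";

    fn sparse_table(&self) -> Table {
        Table { name: Self::SPARSE_TABLE }
    }

    fn range_table(&self) -> Table {
        Table { name: Self::RANGE_TABLE }
    }

    // Dispatch on the requested name, mirroring the `match` in the diff;
    // unknown names fail instead of silently returning a default table.
    fn load_table(&self, table_name: &str) -> Result<Table, String> {
        match table_name {
            Self::SPARSE_TABLE => Ok(self.sparse_table()),
            Self::RANGE_TABLE => Ok(self.range_table()),
            other => Err(format!("table {} not found", other)),
        }
    }
}

fn main() {
    let catalog = MockCatalog;
    assert_eq!(catalog.load_table("sparse_table").unwrap().name, "sparse_table");
    assert!(catalog.load_table("demo_table").is_err());
}
```

Using associated consts for the table names keeps the `match` arms and the `TableIdentifier`s built in `sparse_table`/`range_table` from drifting apart.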
diff --git a/src/frontend/planner_test/tests/testdata/input/sink.yaml b/src/frontend/planner_test/tests/testdata/input/sink.yaml
index 0c483430690f2..f9241cdc7c9a9 100644
--- a/src/frontend/planner_test/tests/testdata/input/sink.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/sink.yaml
@@ -81,16 +81,16 @@
     explain create sink sk1 from t2 emit on window close with (connector='blackhole');
   expected_outputs:
   - explain_output
-- id: create_mock_iceberg_sink_append_only_with_partition
+- id: create_mock_iceberg_sink_append_only_with_sparse_partition
   sql: |
-    create table t1 (v1 int, v2 timestamp, v3 timestamp with time zone);
-    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3 from t1 WITH (
+    create table t1 (v1 int, v2 bigint, v3 varchar, v4 time);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
       connector = 'iceberg',
       type = 'append-only',
       force_append_only = 'true',
       catalog.type = 'mock',
       database.name = 'demo_db',
-      table.name = 'demo_table',
+      table.name = 'sparse_table',
       warehouse.path = 's3://icebergdata/demo',
       s3.endpoint = 'http://127.0.0.1:9301',
       s3.region = 'us-east-1',
@@ -99,15 +99,51 @@
     );
   expected_outputs:
   - explain_output
-- id: create_mock_iceberg_sink_upsert_with_partition
+- id: create_mock_iceberg_sink_append_only_with_range_partition
   sql: |
-    create table t1 (v1 int primary key, v2 timestamp, v3 timestamp with time zone);
-    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3 from t1 WITH (
+    create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
+      connector = 'iceberg',
+      type = 'append-only',
+      force_append_only = 'true',
+      catalog.type = 'mock',
+      database.name = 'demo_db',
+      table.name = 'range_table',
+      warehouse.path = 's3://icebergdata/demo',
+      s3.endpoint = 'http://127.0.0.1:9301',
+      s3.region = 'us-east-1',
+      s3.access.key = 'hummockadmin',
+      s3.secret.key = 'hummockadmin'
+    );
+  expected_outputs:
+  - explain_output
+- id: create_mock_iceberg_sink_upsert_with_sparse_partition
+  sql: |
+    create table t1 (v1 int, v2 bigint, v3 varchar, v4 time);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
+      connector = 'iceberg',
+      type = 'upsert',
+      catalog.type = 'mock',
+      database.name = 'demo_db',
+      table.name = 'sparse_table',
+      warehouse.path = 's3://icebergdata/demo',
+      s3.endpoint = 'http://127.0.0.1:9301',
+      s3.region = 'us-east-1',
+      s3.access.key = 'hummockadmin',
+      s3.secret.key = 'hummockadmin',
+      primary_key = 'v1'
+    );
+  expected_outputs:
+  - explain_output
+- id: create_mock_iceberg_sink_upsert_with_range_partition
+  sql: |
+    create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
       connector = 'iceberg',
       type = 'upsert',
       catalog.type = 'mock',
       database.name = 'demo_db',
-      table.name = 'demo_table',
+      table.name = 'range_table',
       warehouse.path = 's3://icebergdata/demo',
       s3.endpoint = 'http://127.0.0.1:9301',
       s3.region = 'us-east-1',
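The four new planner test cases above pair the mock `sparse_table` (int/bigint/varchar/time columns under identity/bucket[1]/truncate[1]/void fields) and `range_table` (date/timestamp columns under year/month/day/hour fields) with append-only and upsert sinks. As background for the `IcebergTransform('truncate[1]':Varchar, ...)` calls in the expected plans below, here is a rough sketch of the `truncate[W]` transform semantics from the Iceberg spec; it is illustrative only, not the connector's `IcebergTransform` implementation:

```rust
// Iceberg-spec truncate for integers: floor v to a multiple of the width W.
// The double-modulo keeps the result correct for negative values as well.
fn truncate_int(v: i64, w: i64) -> i64 {
    v - (((v % w) + w) % w)
}

// Iceberg-spec truncate for strings: keep at most `w` characters (not bytes).
fn truncate_str(v: &str, w: usize) -> String {
    v.chars().take(w).collect()
}

fn main() {
    assert_eq!(truncate_int(17, 10), 10);
    assert_eq!(truncate_int(-3, 10), -10); // floors toward negative infinity
    assert_eq!(truncate_str("iceberg", 1), "i"); // truncate[1] on a varchar
}
```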
diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml
index 3379c6c5dc02e..8f759b107243c 100644
--- a/src/frontend/planner_test/tests/testdata/output/sink.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml
@@ -172,16 +172,16 @@
     StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
     └─StreamEowcSort { sort_column: t2.b }
       └─StreamTableScan { table: t2, columns: [a, b, _row_id] }
-- id: create_mock_iceberg_sink_append_only_with_partition
+- id: create_mock_iceberg_sink_append_only_with_sparse_partition
   sql: |
-    create table t1 (v1 int, v2 timestamp, v3 timestamp with time zone);
-    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3 from t1 WITH (
+    create table t1 (v1 int, v2 bigint, v3 varchar, v4 time);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
      connector = 'iceberg',
      type = 'append-only',
      force_append_only = 'true',
      catalog.type = 'mock',
      database.name = 'demo_db',
-     table.name = 'demo_table',
+     table.name = 'sparse_table',
      warehouse.path = 's3://icebergdata/demo',
      s3.endpoint = 'http://127.0.0.1:9301',
      s3.region = 'us-east-1',
@@ -189,19 +189,38 @@
      s3.secret.key = 'hummockadmin'
    );
  explain_output: |
-    StreamSink { type: append-only, columns: [v1, v2, v3, t1._row_id(hidden)] }
+    StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] }
     └─StreamExchange { dist: HashShard($expr1) }
-      └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v1), IcebergTransform('truncate[1]':Varchar, t1.v1), IcebergTransform('year':Varchar, t1.v2), IcebergTransform('month':Varchar, t1.v2), IcebergTransform('day':Varchar, t1.v3), IcebergTransform('hour':Varchar, t1.v3), null:Int32, null:Int32) as $expr1] }
-        └─StreamTableScan { table: t1, columns: [v1, v2, v3, _row_id] }
-- id: create_mock_iceberg_sink_upsert_with_partition
+      └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
+        └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+- id: create_mock_iceberg_sink_append_only_with_range_partition
   sql: |
-    create table t1 (v1 int primary key, v2 timestamp, v3 timestamp with time zone);
-    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3 from t1 WITH (
+    create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
+     connector = 'iceberg',
+     type = 'append-only',
+     force_append_only = 'true',
+     catalog.type = 'mock',
+     database.name = 'demo_db',
+     table.name = 'range_table',
+     warehouse.path = 's3://icebergdata/demo',
+     s3.endpoint = 'http://127.0.0.1:9301',
+     s3.region = 'us-east-1',
+     s3.access.key = 'hummockadmin',
+     s3.secret.key = 'hummockadmin'
+   );
+  explain_output: |
+    StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] }
+    └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+- id: create_mock_iceberg_sink_upsert_with_sparse_partition
+  sql: |
+    create table t1 (v1 int, v2 bigint, v3 varchar, v4 time);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
      connector = 'iceberg',
      type = 'upsert',
      catalog.type = 'mock',
      database.name = 'demo_db',
-     table.name = 'demo_table',
+     table.name = 'sparse_table',
      warehouse.path = 's3://icebergdata/demo',
      s3.endpoint = 'http://127.0.0.1:9301',
      s3.region = 'us-east-1',
@@ -210,7 +229,26 @@
      primary_key = 'v1'
    );
  explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3], pk: [t1.v1] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
     └─StreamExchange { dist: HashShard($expr1) }
-      └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v1), IcebergTransform('truncate[1]':Varchar, t1.v1), IcebergTransform('year':Varchar, t1.v2), IcebergTransform('month':Varchar, t1.v2), IcebergTransform('day':Varchar, t1.v3), IcebergTransform('hour':Varchar, t1.v3), null:Int32, null:Int32) as $expr1] }
-        └─StreamTableScan { table: t1, columns: [v1, v2, v3] }
+      └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
+        └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+- id: create_mock_iceberg_sink_upsert_with_range_partition
+  sql: |
+    create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
+    explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH (
+     connector = 'iceberg',
+     type = 'upsert',
+     catalog.type = 'mock',
+     database.name = 'demo_db',
+     table.name = 'range_table',
+     warehouse.path = 's3://icebergdata/demo',
+     s3.endpoint = 'http://127.0.0.1:9301',
+     s3.region = 'us-east-1',
+     s3.access.key = 'hummockadmin',
+     s3.secret.key = 'hummockadmin',
+     primary_key = 'v1'
+   );
+  explain_output: |
+    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
+    └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index c036b8f09aa83..2ccd32a4c0065 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -336,8 +336,6 @@ async fn get_partition_compute_info_for_iceberg(
     // Only compute the partition and shuffle by them for the sparse partition.
     let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform {
         // Sparse partition
-        // # TODO
-        // `Identity`` transform may need to depends on the input type.
         icelake::types::Transform::Identity
         | icelake::types::Transform::Truncate(_)
         | icelake::types::Transform::Bucket(_) => true,
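Finally, the create_sink.rs hunk drops the stale TODO from the transform classification that decides whether the planner computes partition values and shuffles by them. Below is a self-contained sketch of that classification using a stand-in `Transform` enum that mirrors icelake's; treating year/month/day/hour and void as non-sparse is an inference from the plans above, where the range table gets no `StreamExchange`:

```rust
// Stand-in for icelake::types::Transform; the real code matches on that enum.
enum Transform {
    Identity,
    Bucket(i32),
    Truncate(i32),
    Year,
    Month,
    Day,
    Hour,
    Void,
}

// Mirrors `has_sparse_partition` in the hunk above: only identity/truncate/bucket
// fields trigger the partition-compute-and-shuffle path. The `false` arms are
// assumed from the mock tables' expected plans, not shown in the hunk itself.
fn has_sparse_partition(fields: &[Transform]) -> bool {
    fields.iter().any(|t| match t {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year | Transform::Month | Transform::Day | Transform::Hour => false,
        Transform::Void => false,
    })
}

fn main() {
    // The mock sparse_table spec: identity, bucket[1], truncate[1], void -> shuffle.
    let sparse = [
        Transform::Identity,
        Transform::Bucket(1),
        Transform::Truncate(1),
        Transform::Void,
    ];
    // The mock range_table spec: year, month, day, hour -> no shuffle.
    let range = [Transform::Year, Transform::Month, Transform::Day, Transform::Hour];
    assert!(has_sparse_partition(&sparse));
    assert!(!has_sparse_partition(&range));
}
```

Range transforms keep time-ordered input roughly clustered already, so shuffling by the computed partition value only pays off for the sparse transforms; this matches the two expected plans, where only the sparse-table sinks gain a `StreamExchange { dist: HashShard($expr1) }`.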