Skip to content

Commit

Permalink
feat: replace icelake Transform with iceberg (#18625)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Dec 16, 2024
1 parent 45fcf8f commit 594ef89
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 57 deletions.
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
"prometheus",
] }
# branch dev-rebase-main-20241030
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e28726443a57028f7c7df11d6d385470dc484d46" }
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" }
opendal = "0.49"
# used only by arrow-udf-flight
arrow-flight = "53"
Expand Down
250 changes: 250 additions & 0 deletions src/connector/src/connector_common/iceberg/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use opendal::services::Memory;
use opendal::Operator;

/// A mock catalog for iceberg used for plan test.
///
/// This is a stateless unit struct: all behavior comes from its trait and
/// inherent `impl` blocks in this file.
// NOTE(review): `Debug` is presumably derived to satisfy a bound on the
// `iceberg::Catalog` trait implemented below — confirm against the trait definition.
#[derive(Debug)]
pub struct MockCatalog;

impl MockCatalog {
Expand Down Expand Up @@ -233,3 +234,252 @@ impl Catalog for MockCatalog {
unimplemented!()
}
}

mod v2 {
use std::collections::HashMap;

use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::{
NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table as TableV2;
use iceberg::{
Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
};

use super::MockCatalog;

impl MockCatalog {
    /// Assembles an in-memory table called `name` inside the `mock_namespace`
    /// namespace, using the supplied schema and unbound partition spec.
    ///
    /// # Panics
    ///
    /// Panics if the memory-backed file IO, the table metadata, or the table
    /// itself fails to build.
    fn build_table_v2(
        name: &str,
        schema: Schema,
        partition_spec: UnboundPartitionSpec,
    ) -> TableV2 {
        let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();

        // NOTE(review): the `"ignore"` name and `"1"` location look like
        // placeholder values; the identifier set below carries the real table
        // name — confirm nothing reads them.
        let metadata = TableMetadataBuilder::from_table_creation(TableCreation {
            name: "ignore".to_owned(),
            location: Some("1".to_owned()),
            schema,
            partition_spec: Some(partition_spec),
            sort_order: None,
            properties: HashMap::new(),
        })
        .unwrap()
        .build()
        .unwrap();

        let ident = TableIdent::new(
            NamespaceIdent::new("mock_namespace".to_owned()),
            name.to_owned(),
        );

        TableV2::builder()
            .identifier(ident)
            .file_io(file_io)
            .metadata(metadata)
            .build()
            .unwrap()
    }

    /// Builds the mock table served under `Self::SPARSE_TABLE`.
    ///
    /// Columns `v1`..`v4` have the primitive types int, long, string and time.
    /// Partition fields `f1`..`f4` apply the identity, bucket(1), truncate(1)
    /// and void transforms to them, in that order.
    fn sparse_table_v2() -> TableV2 {
        // Local shorthands for the repetitive field / partition literals.
        let column = |id, name: &str, ty| NestedField::new(id, name, Type::Primitive(ty), true);
        let partition = |source_id, field_id, name: &str, transform| UnboundPartitionField {
            source_id,
            field_id: Some(field_id),
            name: name.to_owned(),
            transform,
        };

        let schema = Schema::builder()
            .with_fields(vec![
                column(1, "v1", PrimitiveType::Int).into(),
                column(2, "v2", PrimitiveType::Long).into(),
                column(3, "v3", PrimitiveType::String).into(),
                column(4, "v4", PrimitiveType::Time).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_fields(vec![
                partition(1, 5, "f1", Transform::Identity),
                partition(2, 6, "f2", Transform::Bucket(1)),
                partition(3, 7, "f3", Transform::Truncate(1)),
                partition(4, 8, "f4", Transform::Void),
            ])
            .unwrap()
            .build();

        Self::build_table_v2(Self::SPARSE_TABLE, schema, partition_spec)
    }

    /// Builds the mock table served under `Self::RANGE_TABLE`.
    ///
    /// Columns `v1`..`v4` have the primitive types date, timestamp, timestamptz
    /// and timestamptz. Partition fields `f1`..`f4` apply the year, month, day
    /// and hour transforms to them, in that order.
    fn range_table_v2() -> TableV2 {
        // Local shorthands for the repetitive field / partition literals.
        let column = |id, name: &str, ty| NestedField::new(id, name, Type::Primitive(ty), true);
        let partition = |source_id, field_id, name: &str, transform| UnboundPartitionField {
            source_id,
            field_id: Some(field_id),
            name: name.to_owned(),
            transform,
        };

        let schema = Schema::builder()
            .with_fields(vec![
                column(1, "v1", PrimitiveType::Date).into(),
                column(2, "v2", PrimitiveType::Timestamp).into(),
                column(3, "v3", PrimitiveType::Timestamptz).into(),
                column(4, "v4", PrimitiveType::Timestamptz).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_fields(vec![
                partition(1, 5, "f1", Transform::Year),
                partition(2, 6, "f2", Transform::Month),
                partition(3, 7, "f3", Transform::Day),
                partition(4, 8, "f4", Transform::Hour),
            ])
            .unwrap()
            .build();

        Self::build_table_v2(Self::RANGE_TABLE, schema, partition_spec)
    }
}

#[async_trait]
impl CatalogV2 for MockCatalog {
/// List namespaces from table.
async fn list_namespaces(
&self,
_parent: Option<&NamespaceIdent>,
) -> iceberg::Result<Vec<NamespaceIdent>> {
todo!()
}

/// Create a new namespace inside the catalog.
async fn create_namespace(
&self,
_namespace: &iceberg::NamespaceIdent,
_properties: HashMap<String, String>,
) -> iceberg::Result<iceberg::Namespace> {
todo!()
}

/// Get a namespace information from the catalog.
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
todo!()
}

/// Check if namespace exists in catalog.
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
todo!()
}

/// Drop a namespace from the catalog.
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
todo!()
}

/// List tables from namespace.
async fn list_tables(
&self,
_namespace: &NamespaceIdent,
) -> iceberg::Result<Vec<TableIdent>> {
todo!()
}

async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
) -> iceberg::Result<()> {
todo!()
}

/// Create a new table inside the namespace.
async fn create_table(
&self,
_namespace: &NamespaceIdent,
_creation: TableCreation,
) -> iceberg::Result<TableV2> {
todo!()
}

/// Load table from the catalog.
async fn load_table(&self, table: &TableIdent) -> iceberg::Result<TableV2> {
match table.name.as_ref() {
Self::SPARSE_TABLE => Ok(Self::sparse_table_v2()),
Self::RANGE_TABLE => Ok(Self::range_table_v2()),
_ => unimplemented!("table {} not found", table.name()),
}
}

/// Drop a table from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
}

/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
match table.name.as_ref() {
Self::SPARSE_TABLE => Ok(true),
Self::RANGE_TABLE => Ok(true),
_ => Ok(false),
}
}

/// Rename a table in the catalog.
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
todo!()
}

/// Update a table to the catalog.
async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<TableV2> {
todo!()
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ mod v2 {
java_catalog_props,
)
}
"mock" => Ok(Arc::new(mock_catalog::MockCatalog {})),
_ => {
bail!(
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`",
Expand Down
8 changes: 8 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
use icelake::catalog::CatalogRef;
use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter};
Expand Down Expand Up @@ -185,6 +186,13 @@ impl IcebergConfig {
.map_err(Into::into)
}

/// Loads the table through the shared `common` config, passing along the
/// Java catalog properties, and converts any failure into this module's
/// error type via `Into`.
pub async fn load_table_v2(&self) -> Result<TableV2> {
    let loaded = self.common.load_table_v2(&self.java_catalog_props).await;
    loaded.map_err(Into::into)
}

/// Returns the table identifier computed by the shared `common` config,
/// converting any failure into this module's error type via `Into`.
pub fn full_table_name_v2(&self) -> Result<TableIdent> {
    let ident = self.common.full_table_name_v2();
    ident.map_err(Into::into)
}
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ futures-util = "0.3"
ginepro = "0.8"
hex = "0.4"
hmac = "0.12"
icelake = { workspace = true }
iceberg = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
linkme = { workspace = true }
Expand Down
Loading

0 comments on commit 594ef89

Please sign in to comment.