diff --git a/Cargo.lock b/Cargo.lock
index 5e8587d5edadd..56dac7591873b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -312,6 +312,12 @@ version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7b3d0060af21e8d11a926981cc00c6c1541aa91dd64b9f881985c3da1094425f"
 
+[[package]]
+name = "array-init"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc"
+
 [[package]]
 name = "array-util"
 version = "1.0.1"
@@ -879,6 +885,23 @@ dependencies = [
  "regex-syntax 0.8.2",
 ]
 
+[[package]]
+name = "arrow-string"
+version = "52.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597"
+dependencies = [
+ "arrow-array 52.0.0",
+ "arrow-buffer 52.0.0",
+ "arrow-data 52.0.0",
+ "arrow-schema 52.0.0",
+ "arrow-select 52.0.0",
+ "memchr",
+ "num",
+ "regex",
+ "regex-syntax 0.8.2",
+]
+
 [[package]]
 name = "arrow-udf-flight"
 version = "0.1.0"
@@ -2035,6 +2058,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bimap"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7"
+
 [[package]]
 name = "bincode"
 version = "1.3.3"
@@ -6526,6 +6555,72 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "iceberg"
+version = "0.2.0"
+source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=0c6e133e6f4655ff9ce4ad57b577dc7f692dd902#0c6e133e6f4655ff9ce4ad57b577dc7f692dd902"
+dependencies = [
+ "anyhow",
+ "apache-avro 0.17.0",
+ "array-init",
+ "arrow-arith 52.0.0",
+ "arrow-array 52.0.0",
+ "arrow-ord 52.0.0",
+ "arrow-schema 52.0.0",
+ "arrow-select 52.0.0",
+ "arrow-string 52.0.0",
+ "async-stream",
+ "async-trait",
+ "bimap",
+ "bitvec",
+ "bytes",
+ "chrono",
+ "derive_builder 0.20.0",
+ "either",
+ "fnv",
+ "futures",
+ "itertools 0.13.0",
+ "lazy_static",
+ "log",
+ "murmur3",
+ "once_cell",
+ "opendal 0.47.0",
+ "ordered-float 4.1.1",
+ "parquet 52.0.0",
+ "reqwest 0.12.4",
+ "rust_decimal",
+ "serde",
+ "serde_bytes",
+ "serde_derive",
+ "serde_json",
+ "serde_repr",
+ "serde_with 3.8.0",
+ "tokio",
+ "typed-builder 0.18.2",
+ "url",
+ "urlencoding",
+ "uuid",
+]
+
+[[package]]
+name = "iceberg-catalog-rest"
+version = "0.2.0"
+source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=0c6e133e6f4655ff9ce4ad57b577dc7f692dd902#0c6e133e6f4655ff9ce4ad57b577dc7f692dd902"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "iceberg",
+ "itertools 0.13.0",
+ "log",
+ "reqwest 0.12.4",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "typed-builder 0.18.2",
+ "urlencoding",
+ "uuid",
+]
+
 [[package]]
 name = "icelake"
 version = "0.0.10"
@@ -8758,6 +8853,38 @@ dependencies = [
  "zstd 0.13.0",
 ]
 
+[[package]]
+name = "parquet"
+version = "50.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750"
+dependencies = [
+ "ahash 0.8.11",
+ "arrow-array 50.0.0",
+ "arrow-buffer 50.0.0",
+ "arrow-cast 50.0.0",
+ "arrow-data 50.0.0",
+ "arrow-ipc 50.0.0",
+ "arrow-schema 50.0.0",
+ "arrow-select 50.0.0",
+ "base64 0.21.7",
+ "brotli 3.5.0",
+ "bytes",
+ "chrono",
+ "flate2",
+ "half 2.3.1",
+ "hashbrown 0.14.3",
+ "lz4_flex",
+ "num",
+ "num-bigint",
+ "paste",
+ "seq-macro",
+ "snap",
+ "thrift",
+ "twox-hash",
+ "zstd 0.13.0",
+]
+
 [[package]]
 name = "parquet"
 version = "52.0.0"
@@ -11012,6 +11139,8 @@ dependencies = [
 "google-cloud-googleapis",
 "google-cloud-pubsub",
 "http 0.2.9",
+ "iceberg",
+ "iceberg-catalog-rest",
 "icelake",
 "indexmap 2.2.6",
 "itertools 0.12.1",
@@ -11031,6 +11160,7 @@ dependencies = [
 "opendal 0.47.1",
 "openssl",
 "parking_lot 0.12.1",
+ "parquet 50.0.0",
 "paste",
 "pg_bigdecimal",
 "postgres-openssl",
@@ -11084,6 +11214,7 @@ dependencies = [
 "tracing",
 "tracing-subscriber",
 "tracing-test",
+ "typed-builder 0.18.2",
 "url",
 "urlencoding",
 "uuid",
diff --git a/Cargo.toml b/Cargo.toml
index 249a0f2258863..da795196dfbc9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -138,6 +138,10 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
 arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
 arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
 arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
+# TODO
+# Switch to the upstream version once apache/iceberg-rust#411 is merged.
+iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
+iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "0c6e133e6f4655ff9ce4ad57b577dc7f692dd902" }
 arrow-array = "50"
 arrow-arith = "50"
 arrow-cast = "50"
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 17a00eb81735f..3d8bf618eca58 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -64,6 +64,8 @@ google-cloud-gax = "0.17.0"
 google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
 google-cloud-pubsub = "0.25"
 http = "0.2"
+iceberg = { workspace = true }
+iceberg-catalog-rest = { workspace = true }
 icelake = { workspace = true }
 indexmap = { version = "2.2.6", features = ["serde"] }
 itertools = { workspace = true }
@@ -90,6 +92,7 @@ opendal = { version = "0.47", features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
+parquet = { workspace = true }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
@@ -160,6 +163,7 @@ tokio-stream = "0.1"
 tokio-util = { version = "0.7", features = ["codec", "io"] }
 tonic = { workspace = true }
 tracing = "0.1"
+typed-builder = "^0.18"
 url = "2"
 urlencoding = "2"
 uuid = { version = "1", features = ["v4", "fast-rng"] }
diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs
index c0ee90d5e984d..3a86062d18a07 100644
--- a/src/connector/src/error.rs
+++ b/src/connector/src/error.rs
@@ -57,6 +57,7 @@ def_anyhow_newtype! {
     async_nats::jetstream::context::CreateStreamError => "Nats error",
     async_nats::jetstream::stream::ConsumerError => "Nats error",
     icelake::Error => "Iceberg error",
+    iceberg::Error => "IcebergV2 error",
     redis::RedisError => "Redis error",
     arrow_schema::ArrowError => "Arrow error",
     arrow_schema_iceberg::ArrowError => "Arrow error",
diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs
index d88a63d398c65..6ef251878ff94 100644
--- a/src/connector/src/sink/iceberg/jni_catalog.rs
+++ b/src/connector/src/sink/iceberg/jni_catalog.rs
@@ -15,22 +15,40 @@
 //! This module provide jni catalog.
 
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use anyhow::Context;
 use async_trait::async_trait;
+use iceberg::io::FileIO;
+use iceberg::spec::TableMetadata;
+use iceberg::table::Table as TableV2;
+use iceberg::{
+    Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
+};
 use icelake::catalog::models::{CommitTableRequest, CommitTableResponse, LoadTableResult};
 use icelake::catalog::{
-    BaseCatalogConfig, Catalog, CatalogRef, IcebergTableIoArgs, OperatorCreator, UpdateTable,
+    BaseCatalogConfig, Catalog, IcebergTableIoArgs, OperatorCreator, UpdateTable,
 };
 use icelake::{ErrorKind, Table, TableIdentifier};
+use itertools::Itertools;
 use jni::objects::{GlobalRef, JObject};
 use jni::JavaVM;
 use risingwave_jni_core::call_method;
 use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
+use serde::Deserialize;
 
 use crate::error::ConnectorResult;
 
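+/// Mirror of the JSON document returned by the Java catalog wrapper's
+/// `loadTable` call, shaped like the Iceberg REST spec's `LoadTableResult`.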
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+struct LoadTableResponse {
+    pub metadata_location: Option<String>,
+    pub metadata: TableMetadata,
+    pub _config: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug)]
 pub struct JniCatalog {
     java_catalog: GlobalRef,
     jvm: &'static JavaVM,
@@ -138,13 +156,140 @@ impl Catalog for JniCatalog {
     }
 }
 
+#[async_trait]
+impl CatalogV2 for JniCatalog {
+    /// List namespaces inside the catalog.
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> iceberg::Result<Vec<NamespaceIdent>> {
+        todo!()
+    }
+
+    /// Create a new namespace inside the catalog.
+    async fn create_namespace(
+        &self,
+        _namespace: &iceberg::NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> iceberg::Result<Namespace> {
+        todo!()
+    }
+
+    /// Get namespace information from the catalog.
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
+        todo!()
+    }
+
+    /// Check if the namespace exists in the catalog.
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
+        todo!()
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// List tables from the namespace.
+    async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Create a new table inside the namespace.
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> iceberg::Result<TableV2> {
+        todo!()
+    }
+
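+    // For reference, a `loadTable` response looks roughly like this (values
+    // illustrative only):
+    //
+    //   {
+    //     "metadata-location": "s3://bucket/db/tbl/metadata/v3.metadata.json",
+    //     "metadata": { ...full table metadata... },
+    //     "config": { ... }
+    //   }
+    //
+    // `LoadTableResponse` above deserializes it with kebab-case field names.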
+    /// Load a table from the catalog.
+    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<TableV2> {
+        execute_with_jni_env(self.jvm, |env| {
+            let table_name_str = format!(
+                "{}.{}",
+                table.namespace().clone().inner().into_iter().join("."),
+                table.name()
+            );
+
+            let table_name_jstr = env.new_string(&table_name_str).unwrap();
+
+            let result_json =
+                call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)},
+                &table_name_jstr)
+                .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?;
+
+            let rust_json_str = jobj_to_str(env, result_json)?;
+
+            let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;
+
+            let metadata_location = resp.metadata_location.ok_or_else(|| {
+                icelake::Error::new(
+                    ErrorKind::IcebergFeatureUnsupported,
+                    "Loading an uncommitted table is not supported!",
+                )
+            })?;
+
+            tracing::info!("Table metadata location of {table_name_str} is {metadata_location}");
+
+            let table_metadata = resp.metadata;
+
+            let file_io = FileIO::from_path(&metadata_location)?
+                .with_props(self.config.table_io_configs.iter())
+                .build()?;
+
+            Ok(TableV2::builder()
+                .file_io(file_io)
+                .identifier(table.clone())
+                .metadata(table_metadata)
+                .build())
+        })
+        .map_err(|e| {
+            iceberg::Error::new(
+                iceberg::ErrorKind::Unexpected,
+                "Failed to load iceberg table.",
+            )
+            .with_source(e)
+        })
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Check if a table exists in the catalog.
+    async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result<bool> {
+        todo!()
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Update a table in the catalog.
+    async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<TableV2> {
+        todo!()
+    }
+}
+
 impl JniCatalog {
-    pub fn build(
+    fn build(
         base_config: BaseCatalogConfig,
         name: impl ToString,
         catalog_impl: impl ToString,
         java_catalog_props: HashMap<String, String>,
-    ) -> ConnectorResult<CatalogRef> {
+    ) -> ConnectorResult<Self> {
         let jvm = JVM.get_or_init()?;
 
         execute_with_jni_env(jvm, |env| {
@@ -175,12 +320,32 @@ impl JniCatalog {
 
             let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?;
 
-            Ok(Arc::new(Self {
+            Ok(Self {
                 java_catalog: jni_catalog,
                 jvm,
                 config: base_config,
-            }) as CatalogRef)
+            })
         })
         .map_err(Into::into)
     }
+
+    pub fn build_catalog(
+        base_config: BaseCatalogConfig,
+        name: impl ToString,
+        catalog_impl: impl ToString,
+        java_catalog_props: HashMap<String, String>,
+    ) -> ConnectorResult<Arc<dyn Catalog>> {
+        let catalog = Self::build(base_config, name, catalog_impl, java_catalog_props)?;
+        Ok(Arc::new(catalog) as Arc<dyn Catalog>)
+    }
+
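+    /// Like [`Self::build_catalog`], but returns the iceberg-rust (v2)
+    /// `Catalog` trait object so the same JNI-backed catalog can serve the
+    /// new sink path.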
catalog")) + })?) + .build(); + let catalog = iceberg_catalog_rest::RestCatalog::new(config).await?; + Ok(Arc::new(catalog)) + } + catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build_catalog_v2( + base_catalog_config, + self.catalog_name(), + catalog_impl, + java_catalog_props, + ) + } + _ => { + bail!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`", + self.catalog_type() + ) + } + } + } + + pub async fn load_table_v2(&self) -> ConnectorResult { + let catalog = self + .create_catalog_v2() + .await + .context("Unable to load iceberg catalog")?; + + let table_id = self + .full_table_name_v2() + .context("Unable to parse table name")?; + + catalog.load_table(&table_id).await.map_err(Into::into) + } +} + pub struct IcebergSink { config: IcebergConfig, param: SinkParam, diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs new file mode 100644 index 0000000000000..5fb2c5105fda5 --- /dev/null +++ b/src/connector/src/sink/iceberg/storage_catalog.rs @@ -0,0 +1,274 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provide storage catalog. + +use std::collections::HashMap; + +use async_trait::async_trait; +use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::TableMetadata; +use iceberg::table::Table; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; +use opendal::Operator; +use thiserror_ext::AsReport; +use tokio_stream::StreamExt; +use typed_builder::TypedBuilder; + +#[derive(Clone, Debug, TypedBuilder)] +pub struct StorageCatalogConfig { + warehouse: String, + access_key: String, + secret_key: String, + endpoint: Option, + region: Option, +} + +/// File system catalog. +#[derive(Debug)] +pub struct StorageCatalog { + warehouse: String, + file_io: FileIO, + config: StorageCatalogConfig, +} + +impl StorageCatalog { + pub fn new(config: StorageCatalogConfig) -> Result { + let mut file_io_builder = FileIO::from_path(&config.warehouse)? + .with_prop(S3_ACCESS_KEY_ID, &config.access_key) + .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key); + file_io_builder = if let Some(endpoint) = &config.endpoint { + file_io_builder.with_prop(S3_ENDPOINT, endpoint) + } else { + file_io_builder + }; + file_io_builder = if let Some(region) = &config.region { + file_io_builder.with_prop(S3_REGION, region) + } else { + file_io_builder + }; + + Ok(StorageCatalog { + warehouse: config.warehouse.clone(), + file_io: file_io_builder.build()?, + config, + }) + } + + /// Check if version hint file exist. 
+    pub async fn load_table_v2(&self) -> ConnectorResult<TableV2> {
+        let catalog = self
+            .create_catalog_v2()
+            .await
+            .context("Unable to load iceberg catalog")?;
+
+        let table_id = self
+            .full_table_name_v2()
+            .context("Unable to parse table name")?;
+
+        catalog.load_table(&table_id).await.map_err(Into::into)
+    }
+}
+
 pub struct IcebergSink {
     config: IcebergConfig,
     param: SinkParam,
diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs
new file mode 100644
index 0000000000000..5fb2c5105fda5
--- /dev/null
+++ b/src/connector/src/sink/iceberg/storage_catalog.rs
@@ -0,0 +1,274 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! This module provides the storage catalog.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::TableMetadata;
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
+    TableIdent,
+};
+use opendal::Operator;
+use thiserror_ext::AsReport;
+use tokio_stream::StreamExt;
+use typed_builder::TypedBuilder;
+
+#[derive(Clone, Debug, TypedBuilder)]
+pub struct StorageCatalogConfig {
+    warehouse: String,
+    access_key: String,
+    secret_key: String,
+    endpoint: Option<String>,
+    region: Option<String>,
+}
+
+/// File system catalog.
+#[derive(Debug)]
+pub struct StorageCatalog {
+    warehouse: String,
+    file_io: FileIO,
+    config: StorageCatalogConfig,
+}
+
+impl StorageCatalog {
+    pub fn new(config: StorageCatalogConfig) -> Result<Self> {
+        let mut file_io_builder = FileIO::from_path(&config.warehouse)?
+            .with_prop(S3_ACCESS_KEY_ID, &config.access_key)
+            .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
+        file_io_builder = if let Some(endpoint) = &config.endpoint {
+            file_io_builder.with_prop(S3_ENDPOINT, endpoint)
+        } else {
+            file_io_builder
+        };
+        file_io_builder = if let Some(region) = &config.region {
+            file_io_builder.with_prop(S3_REGION, region)
+        } else {
+            file_io_builder
+        };
+
+        Ok(StorageCatalog {
+            warehouse: config.warehouse.clone(),
+            file_io: file_io_builder.build()?,
+            config,
+        })
+    }
+
+    /// Check if the version hint file exists.
+    ///
+    /// `table_path`: relative path of the table dir under the warehouse root.
+    async fn is_version_hint_exist(&self, table_path: &str) -> Result<bool> {
+        self.file_io
+            .is_exist(format!("{table_path}/metadata/version-hint.text").as_str())
+            .await
+            .map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("failed to check version hint existence: {}", err.as_report()),
+                )
+            })
+    }
+
+    /// Read the version hint of the table.
+    ///
+    /// `table_path`: relative path of the table dir under the warehouse root.
+    async fn read_version_hint(&self, table_path: &str) -> Result<i32> {
+        let content = self
+            .file_io
+            .new_input(format!("{table_path}/metadata/version-hint.text").as_str())?
+            .read()
+            .await?;
+        let version_hint = String::from_utf8(content.to_vec()).map_err(|err| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Failed to convert version_hint from utf8 to string: {}",
+                    err.as_report()
+                ),
+            )
+        })?;
+
+        version_hint
+            .parse()
+            .map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed"))
+    }
+
+    /// List all paths of table metadata files.
+    ///
+    /// The returned paths are sorted by name.
+    ///
+    /// TODO: we can improve this by only fetching the latest metadata.
+    ///
+    /// `table_path`: relative path of the table dir under the warehouse root.
+    async fn list_table_metadata_paths(&self, table_path: &str) -> Result<Vec<String>> {
+        // create s3 operator
+        let mut builder = opendal::services::S3::default();
+        builder
+            .root(&self.warehouse)
+            .access_key_id(&self.config.access_key)
+            .secret_access_key(&self.config.secret_key);
+        if let Some(endpoint) = &self.config.endpoint {
+            builder.endpoint(endpoint);
+        }
+        if let Some(region) = &self.config.region {
+            builder.region(region);
+        }
+        let op: Operator = Operator::new(builder)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))?
+            .finish();
+
+        // list metadata files
+        let mut lister = op
+            .lister(format!("{table_path}/metadata/").as_str())
+            .await
+            .map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("list metadata failed: {}", err.as_report()),
+                )
+            })?;
+        let mut paths = vec![];
+        while let Some(entry) = lister.next().await {
+            let entry = entry.map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("list metadata entry failed: {}", err.as_report()),
+                )
+            })?;
+
+            // Only push into paths if the entry is a metadata file.
+            if entry.path().ends_with(".metadata.json") {
+                paths.push(entry.path().to_string());
+            }
+        }
+
+        // Make the returned paths sorted by name.
+        paths.sort();
+
+        Ok(paths)
+    }
+}
+
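+// Note: only `load_table` is implemented below; the storage catalog currently
+// serves as a read-only table loader, so the remaining catalog operations are
+// left as `todo!()`.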
+#[async_trait]
+impl Catalog for StorageCatalog {
+    /// List namespaces inside the catalog.
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> iceberg::Result<Vec<NamespaceIdent>> {
+        todo!()
+    }
+
+    /// Create a new namespace inside the catalog.
+    async fn create_namespace(
+        &self,
+        _namespace: &iceberg::NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> iceberg::Result<Namespace> {
+        todo!()
+    }
+
+    /// Get namespace information from the catalog.
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
+        todo!()
+    }
+
+    /// Check if the namespace exists in the catalog.
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
+        todo!()
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// List tables from the namespace.
+    async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Create a new table inside the namespace.
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> iceberg::Result<Table> {
+        todo!()
+    }
+
+    /// Load a table from the catalog.
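+    ///
+    /// Resolution order (paths illustrative): if `metadata/version-hint.text`
+    /// exists, read version `N` from it and load `metadata/vN.metadata.json`;
+    /// otherwise list `metadata/*.metadata.json` and take the last path in
+    /// name order.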
+    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
+        let table_path = {
+            let mut names = table.namespace.clone().inner();
+            names.push(table.name.to_string());
+            format!("{}/{}", self.warehouse, names.join("/"))
+        };
+        let path = if self.is_version_hint_exist(&table_path).await? {
+            let version_hint = self.read_version_hint(&table_path).await?;
+            format!("{table_path}/metadata/v{}.metadata.json", version_hint)
+        } else {
+            let files = self.list_table_metadata_paths(&table_path).await?;
+
+            files.into_iter().last().ok_or(Error::new(
+                ErrorKind::DataInvalid,
+                "no table metadata found",
+            ))?
+        };
+
+        let metadata_file = self.file_io.new_input(path)?;
+        let metadata_file_content = metadata_file.read().await?;
+        let table_metadata = serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
+
+        Ok(Table::builder()
+            .metadata(table_metadata)
+            .identifier(table.clone())
+            .file_io(self.file_io.clone())
+            // Only readonly tables are supported by the storage catalog for now.
+            .readonly(true)
+            .build())
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Check if a table exists in the catalog.
+    async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result<bool> {
+        todo!()
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
+        todo!()
+    }
+
+    /// Update a table in the catalog.
+    async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
+        todo!()
+    }
+}