From 71974f051431765eb80e7b076832e2a49713bc99 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 18 Jun 2024 19:13:36 +0800 Subject: [PATCH] support list for storage_catalog --- .../src/sink/iceberg/storage_catalog.rs | 79 ++++++++++++++++--- 1 file changed, 70 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs index d440da1d04615..4da8cfbdbec5a 100644 --- a/src/connector/src/sink/iceberg/storage_catalog.rs +++ b/src/connector/src/sink/iceberg/storage_catalog.rs @@ -24,10 +24,12 @@ use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use opendal::Operator; use thiserror_ext::AsReport; +use tokio_stream::StreamExt; use typed_builder::TypedBuilder; -#[derive(Debug, TypedBuilder)] +#[derive(Clone, Debug, TypedBuilder)] pub struct StorageCatalogConfig { warehouse: String, access_key: String, @@ -41,27 +43,29 @@ pub struct StorageCatalogConfig { pub struct StorageCatalog { warehouse: String, file_io: FileIO, + config: StorageCatalogConfig, } impl StorageCatalog { pub fn new(config: StorageCatalogConfig) -> Result { let mut file_io_builder = FileIO::from_path(&config.warehouse)? - .with_prop(S3_ACCESS_KEY_ID, config.access_key) - .with_prop(S3_SECRET_ACCESS_KEY, config.secret_key); - file_io_builder = if let Some(endpoint) = config.endpoint { + .with_prop(S3_ACCESS_KEY_ID, &config.access_key) + .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key); + file_io_builder = if let Some(endpoint) = &config.endpoint { file_io_builder.with_prop(S3_ENDPOINT, endpoint) } else { file_io_builder }; - file_io_builder = if let Some(region) = config.region { + file_io_builder = if let Some(region) = &config.region { file_io_builder.with_prop(S3_REGION, region) } else { file_io_builder }; Ok(StorageCatalog { - warehouse: config.warehouse, + warehouse: config.warehouse.clone(), file_io: file_io_builder.build()?, + config, }) } @@ -103,6 +107,61 @@ impl StorageCatalog { .parse() .map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed")) } + + /// List all paths of table metadata files. + /// + /// The returned paths are sorted by name. + /// + /// TODO: we can improve this by only fetch the latest metadata. + /// + /// `table_path`: relative path of table dir under warehouse root. + async fn list_table_metadata_paths(&self, table_path: &str) -> Result> { + // create s3 operator + let mut builder = opendal::services::S3::default(); + builder + .root(&self.warehouse) + .access_key_id(&self.config.access_key) + .secret_access_key(&self.config.secret_key); + if let Some(endpoint) = &self.config.endpoint { + builder.endpoint(endpoint); + } + if let Some(region) = &self.config.region { + builder.region(region); + } + let op: Operator = Operator::new(builder) + .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))? + .finish(); + + // list metadata files + let mut lister = op + .lister(format!("{table_path}/metadata/").as_str()) + .await + .map_err(|err| { + Error::new( + ErrorKind::Unexpected, + format!("list metadata failed: {}", err), + ) + })?; + let mut paths = vec![]; + while let Some(entry) = lister.next().await { + let entry = entry.map_err(|err| { + Error::new( + ErrorKind::Unexpected, + format!("list metadata entry failed: {}", err), + ) + })?; + + // Only push into paths if the entry is a metadata file. + if entry.path().ends_with(".metadata.json") { + paths.push(entry.path().to_string()); + } + } + + // Make the returned paths sorted by name. + paths.sort(); + + Ok(paths) + } } #[async_trait] @@ -172,10 +231,12 @@ impl Catalog for StorageCatalog { let version_hint = self.read_version_hint(&table_path).await?; format!("{table_path}/metadata/v{}.metadata.json", version_hint) } else { - return Err(Error::new( + let files = self.list_table_metadata_paths(&table_path).await?; + + files.into_iter().last().ok_or(Error::new( ErrorKind::DataInvalid, - "no table version hint found", - )); + "no table metadata found", + ))? }; let metadata_file = self.file_io.new_input(path)?;