Skip to content

Commit

Permalink
support list for storage_catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jun 18, 2024
1 parent a24df58 commit 71974f0
Showing 1 changed file with 70 additions and 9 deletions.
79 changes: 70 additions & 9 deletions src/connector/src/sink/iceberg/storage_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
use opendal::Operator;
use thiserror_ext::AsReport;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;

#[derive(Debug, TypedBuilder)]
#[derive(Clone, Debug, TypedBuilder)]
pub struct StorageCatalogConfig {
warehouse: String,
access_key: String,
Expand All @@ -41,27 +43,29 @@ pub struct StorageCatalogConfig {
/// Catalog backed directly by object storage rooted at `warehouse`.
pub struct StorageCatalog {
/// Warehouse location. It is the path `file_io` is built from and the root of the
/// S3 operator created when listing table metadata files.
warehouse: String,
/// File IO handle built in `new` from the warehouse path and the S3 properties of the config.
file_io: FileIO,
/// Configuration this catalog was created with. Its credentials, endpoint and region are
/// read again when building the S3 operator used to list table metadata files.
config: StorageCatalogConfig,
}

impl StorageCatalog {
pub fn new(config: StorageCatalogConfig) -> Result<Self> {
let mut file_io_builder = FileIO::from_path(&config.warehouse)?
.with_prop(S3_ACCESS_KEY_ID, config.access_key)
.with_prop(S3_SECRET_ACCESS_KEY, config.secret_key);
file_io_builder = if let Some(endpoint) = config.endpoint {
.with_prop(S3_ACCESS_KEY_ID, &config.access_key)
.with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
file_io_builder = if let Some(endpoint) = &config.endpoint {
file_io_builder.with_prop(S3_ENDPOINT, endpoint)
} else {
file_io_builder
};
file_io_builder = if let Some(region) = config.region {
file_io_builder = if let Some(region) = &config.region {
file_io_builder.with_prop(S3_REGION, region)
} else {
file_io_builder
};

Ok(StorageCatalog {
warehouse: config.warehouse,
warehouse: config.warehouse.clone(),
file_io: file_io_builder.build()?,
config,
})
}

Expand Down Expand Up @@ -103,6 +107,61 @@ impl StorageCatalog {
.parse()
.map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed"))
}

/// List the paths of all table metadata files under `{table_path}/metadata/`.
///
/// Only entries whose path ends with `.metadata.json` are returned.
///
/// The returned paths are sorted by the version number at the start of the file name
/// (an optional `v` prefix followed by digits), with ties and unversioned names ordered
/// by name. This keeps `v10.metadata.json` after `v9.metadata.json`, which a plain
/// lexicographic sort would not, so the last element is the most recent metadata file.
///
/// TODO: we can improve this by only fetching the latest metadata.
///
/// `table_path`: relative path of table dir under warehouse root.
///
/// # Errors
///
/// Returns an `ErrorKind::Unexpected` error if the S3 operator cannot be built, the
/// listing cannot be started, or reading a listing entry fails.
async fn list_table_metadata_paths(&self, table_path: &str) -> Result<Vec<String>> {
    /// Parse the leading version number of a metadata file name, e.g. `12` for
    /// `a/b/metadata/v12.metadata.json` or `3` for `00003-<uuid>.metadata.json`.
    /// Returns `None` when the file name does not start with a number.
    fn metadata_version(path: &str) -> Option<u64> {
        let file_name = path.rsplit('/').next()?;
        let stem = file_name.strip_suffix(".metadata.json")?;
        let stem = stem.strip_prefix('v').unwrap_or(stem);
        // Only ASCII digits are counted, so the slice below ends on a char boundary.
        let digits_len = stem.bytes().take_while(u8::is_ascii_digit).count();
        stem[..digits_len].parse().ok()
    }

    // create s3 operator
    // NOTE(review): no bucket is configured on the builder here; presumably it has to be
    // derived from the warehouse location — confirm against the opendal S3 service docs.
    let mut builder = opendal::services::S3::default();
    builder
        .root(&self.warehouse)
        .access_key_id(&self.config.access_key)
        .secret_access_key(&self.config.secret_key);
    if let Some(endpoint) = &self.config.endpoint {
        builder.endpoint(endpoint);
    }
    if let Some(region) = &self.config.region {
        builder.region(region);
    }
    let op: Operator = Operator::new(builder)
        .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))?
        .finish();

    // list metadata files
    let mut lister = op
        .lister(format!("{table_path}/metadata/").as_str())
        .await
        .map_err(|err| {
            // Report the full error chain, like the operator construction above.
            Error::new(
                ErrorKind::Unexpected,
                format!("list metadata failed: {}", err.to_report_string()),
            )
        })?;
    let mut paths = vec![];
    while let Some(entry) = lister.next().await {
        let entry = entry.map_err(|err| {
            Error::new(
                ErrorKind::Unexpected,
                format!("list metadata entry failed: {}", err.to_report_string()),
            )
        })?;

        // Only push into paths if the entry is a metadata file.
        if entry.path().ends_with(".metadata.json") {
            paths.push(entry.path().to_string());
        }
    }

    // Order by numeric version first so that the newest metadata file comes last,
    // falling back to the name for ties and for names without a version number.
    paths.sort_by(|a, b| {
        metadata_version(a)
            .cmp(&metadata_version(b))
            .then_with(|| a.cmp(b))
    });

    Ok(paths)
}
}

#[async_trait]
Expand Down Expand Up @@ -172,10 +231,12 @@ impl Catalog for StorageCatalog {
let version_hint = self.read_version_hint(&table_path).await?;
format!("{table_path}/metadata/v{}.metadata.json", version_hint)
} else {
return Err(Error::new(
let files = self.list_table_metadata_paths(&table_path).await?;

files.into_iter().last().ok_or(Error::new(
ErrorKind::DataInvalid,
"no table version hint found",
));
"no table metadata found",
))?
};

let metadata_file = self.file_io.new_input(path)?;
Expand Down

0 comments on commit 71974f0

Please sign in to comment.