Skip to content

Commit

Permalink
feat(iceberg): support create table for jdbc catalog (#18364)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Sep 3, 2024
1 parent f05d549 commit 5d8b165
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Objects;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;

Expand Down Expand Up @@ -62,6 +64,38 @@ public String updateTable(String updateTableRequest) throws Exception {
return RESTObjectMapper.mapper().writer().writeValueAsString(resp);
}

/**
 * Create a table through this proxy.
 *
 * @param namespaceStr Single-level namespace name. {@code null} or an empty string selects the
 *     empty namespace.
 * @param createTableRequest Request serialized using json.
 * @return Response serialized using json.
 * @throws Exception if deserializing the request, creating the table, or serializing the
 *     response fails.
 */
public String createTable(String namespaceStr, String createTableRequest) throws Exception {
    // The Rust caller passes "" (never null) for an empty namespace. Namespace.of("") would
    // build a one-level namespace whose only level is the empty string, so map "" to the
    // empty namespace explicitly.
    Namespace namespace =
            (namespaceStr == null || namespaceStr.isEmpty())
                    ? Namespace.empty()
                    : Namespace.of(namespaceStr);
    CreateTableRequest req =
            RESTObjectMapper.mapper().readValue(createTableRequest, CreateTableRequest.class);
    LoadTableResponse resp = CatalogHandlers.createTable(catalog, namespace, req);
    return RESTObjectMapper.mapper().writer().writeValueAsString(resp);
}

/**
 * Reports whether the catalog contains the given table.
 *
 * @param tableIdentifier Table identifier string, parsed with {@code TableIdentifier.parse}.
 * @return {@code true} when the catalog reports the table as existing, {@code false} otherwise.
 */
public boolean tableExists(String tableIdentifier) {
    return catalog.tableExists(TableIdentifier.parse(tableIdentifier));
}

/**
* Create JniCatalogWrapper instance.
*
Expand Down
123 changes: 113 additions & 10 deletions src/connector/src/sink/iceberg/jni_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::table::Table as TableV2;
use iceberg::{
Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
Expand All @@ -34,9 +34,10 @@ use icelake::{ErrorKind, Table, TableIdentifier};
use itertools::Itertools;
use jni::objects::{GlobalRef, JObject};
use jni::JavaVM;
use risingwave_common::bail;
use risingwave_jni_core::call_method;
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM};
use serde::Deserialize;
use serde::{Deserialize, Serialize};

use crate::error::ConnectorResult;

Expand All @@ -48,6 +49,36 @@ struct LoadTableResponse {
pub _config: Option<HashMap<String, String>>,
}

/// JSON payload handed to the Java catalog wrapper's `createTable` method.
///
/// The struct is serialized with `serde_json` and deserialized on the Java side into
/// `org.apache.iceberg.rest.requests.CreateTableRequest`. Field names are emitted in
/// kebab-case, so `partition_spec` becomes `partition-spec` and `write_order` becomes
/// `write-order`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableRequest {
/// Name of the table to create (without the namespace).
pub name: String,
/// Explicit table location; serialized as JSON `null` when `None`.
pub location: Option<String>,
/// Schema of the table.
pub schema: Schema,
/// Partition spec of the table; `None` when no partition spec is given.
pub partition_spec: Option<UnboundPartitionSpec>,
/// Sort order of the table, taken from `TableCreation::sort_order`.
pub write_order: Option<SortOrder>,
/// Table properties as key/value pairs.
pub properties: HashMap<String, String>,
}

impl From<&TableCreation> for CreateTableRequest {
fn from(value: &TableCreation) -> Self {
Self {
name: value.name.clone(),
location: value.location.clone(),
schema: value.schema.clone(),
partition_spec: value.partition_spec.clone(),
write_order: value.sort_order.clone(),
properties: value.properties.clone(),
}
}
}

#[derive(Debug)]
pub struct JniCatalog {
java_catalog: GlobalRef,
Expand Down Expand Up @@ -206,10 +237,58 @@ impl CatalogV2 for JniCatalog {
/// Create a new table inside the namespace.
///
/// The creation request is serialized to JSON and forwarded to the Java catalog's
/// `createTable(String, String)` method. The JSON response is parsed into a
/// `LoadTableResponse`, from which the returned table is built.
///
/// Only empty or single-level namespaces are accepted. An empty namespace is passed
/// to Java as the empty string.
///
/// # Errors
///
/// Any failure is returned as `iceberg::ErrorKind::Unexpected` with the underlying
/// error attached as its source. This covers a multi-level namespace, JNI or
/// serialization errors, a failing Java call, and a response without a metadata
/// location.
async fn create_table(
    &self,
    namespace: &NamespaceIdent,
    creation: TableCreation,
) -> iceberg::Result<TableV2> {
    execute_with_jni_env(self.jvm, |env| {
        if namespace.len() > 1 {
            bail!("Namespace with more than one level is not supported!");
        }

        // Zero levels -> "", one level -> that level's name.
        let namespace_str = namespace.first().map_or("", String::as_str);

        // Propagate JNI string-allocation failures instead of panicking.
        let namespace_jstr = env
            .new_string(namespace_str)
            .context("Failed to create Java string for iceberg namespace")?;

        let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?;

        let creation_jstr = env
            .new_string(&creation_str)
            .context("Failed to create Java string for iceberg create table request")?;

        let result_json =
            call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)},
                &namespace_jstr, &creation_jstr)
            .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?;

        let rust_json_str = jobj_to_str(env, result_json)?;

        let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;

        // Without a metadata location there is nothing to derive the FileIO from.
        let metadata_location = resp.metadata_location.ok_or_else(|| {
            iceberg::Error::new(
                iceberg::ErrorKind::FeatureUnsupported,
                "Loading uncommitted table is not supported!",
            )
        })?;

        let table_metadata = resp.metadata;

        let file_io = FileIO::from_path(&metadata_location)?
            .with_props(self.config.table_io_configs.iter())
            .build()?;

        Ok(TableV2::builder()
            .file_io(file_io)
            .identifier(TableIdent::new(namespace.clone(), creation.name))
            .metadata(table_metadata)
            .build())
    })
    .map_err(|e| {
        iceberg::Error::new(
            iceberg::ErrorKind::Unexpected,
            "Failed to create iceberg table.",
        )
        .with_source(e)
    })
}

/// Load table from the catalog.
Expand All @@ -233,8 +312,8 @@ impl CatalogV2 for JniCatalog {
let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?;

let metadata_location = resp.metadata_location.ok_or_else(|| {
icelake::Error::new(
ErrorKind::IcebergFeatureUnsupported,
iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Loading uncommitted table is not supported!",
)
})?;
Expand Down Expand Up @@ -268,8 +347,32 @@ impl CatalogV2 for JniCatalog {
}

/// Check if a table exists in the catalog.
async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result<bool> {
todo!()
async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
execute_with_jni_env(self.jvm, |env| {
let table_name_str = format!(
"{}.{}",
table.namespace().clone().inner().into_iter().join("."),
table.name()
);

let table_name_jstr = env.new_string(&table_name_str).unwrap();

let exists =
call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)},
&table_name_jstr)
.with_context(|| {
format!("Failed to check iceberg table exists: {table_name_str}")
})?;

Ok(exists)
})
.map_err(|e| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
"Failed to load iceberg table.",
)
.with_source(e)
})
}

/// Rename a table in the catalog.
Expand Down Expand Up @@ -326,7 +429,7 @@ impl JniCatalog {
config: base_config,
})
})
.map_err(Into::into)
.map_err(Into::into)
}

pub fn build_catalog(
Expand Down

0 comments on commit 5d8b165

Please sign in to comment.