Skip to content

Commit

Permalink
feat(iceberg): support iceberg engine table (in local env) (#19577)
Browse files Browse the repository at this point in the history
Co-authored-by: Li0k <[email protected]>
  • Loading branch information
chenzl25 and Li0k authored Dec 5, 2024
1 parent 45d2664 commit 59fa5f8
Show file tree
Hide file tree
Showing 50 changed files with 1,273 additions and 133 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ services:
volumes:
- ..:/risingwave

iceberg-engine-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
depends_on:
- db
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
# NOTE(kwannoel): This is used in order to permit
Expand Down
48 changes: 48 additions & 0 deletions ci/scripts/e2e-iceberg-engine-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

PGPASSWORD=postgres psql -h db -p 5432 -U postgres -c "DROP DATABASE IF EXISTS metadata;" -c "CREATE DATABASE metadata;"

echo "--- starting risingwave cluster"
mkdir -p .risingwave/log
risedev ci-start ci-iceberg-engine
sleep 1


echo "--- testing iceberg engine"
sqllogictest -p 4566 -d dev './e2e_test/iceberg/test_case/iceberg_engine.slt'
sleep 1


echo "--- Kill cluster"
risedev ci-kill
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,21 @@ steps:
timeout_in_minutes: 15
retry: *auto-retry

- label: "end-to-end iceberg engine test"
if: build.pull_request.labels includes "ci/run-e2e-iceberg-engine-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-iceberg-engine-tests?(,|$$)/
command: "ci/scripts/e2e-iceberg-engine-test.sh -p ci-dev"
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: iceberg-engine-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry

- label: "end-to-end pulsar sink test"
if: build.pull_request.labels includes "ci/run-e2e-pulsar-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-pulsar-sink-tests?(,|$$)/
command: "ci/scripts/e2e-pulsar-sink-test.sh -p ci-dev"
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
statement ok
set sink_decouple = false;

statement ok
create table t(id int primary key, xxname varchar) engine = iceberg;

statement ok
insert into t values(1, 'xxx');

statement ok
FLUSH;

sleep 5s

query ??
select * from t;
----
1 xxx

statement ok
DROP TABLE t;
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4203,7 +4203,7 @@ def section_iceberg_metrics(outer_panels):
"read @ {{table_name}}",
),
panels.target(
f"sum({metric('nimtable_read_bytes')})",
f"sum({metric('iceberg_read_bytes')})",
"total read",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ public boolean tableExists(String tableIdentifier) {
return catalog.tableExists(id);
}

/**
* Drop a table from the catalog.
*
* @param tableIdentifier The identifier of the table to drop.
* @return true if the table was dropped, false otherwise.
*/
public boolean dropTable(String tableIdentifier) {
TableIdentifier id = TableIdentifier.parse(tableIdentifier);
return catalog.dropTable(id);
}

/**
* Create JniCatalogWrapper instance.
*
Expand Down
9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ message Table {
int32 next_column_id = 2;
}

enum Engine {
ENGINE_UNSPECIFIED = 0;
HUMMOCK = 1;
ICEBERG = 2;
}

uint32 id = 1;
uint32 schema_id = 2;
uint32 database_id = 3;
Expand Down Expand Up @@ -473,6 +479,9 @@ message Table {
// This field stores the job ID for internal tables.
optional uint32 job_id = 42;

// Table Engine, currently only support hummock and iceberg
optional Engine engine = 43;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
17 changes: 17 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,23 @@ profile:
- use: compute-node
- use: frontend

ci-iceberg-engine:
steps:
- use: minio
- use: postgres
port: 5432
address: db
database: metadata
user: postgres
password: postgres
user-managed: true
application: metastore
- use: meta-node
meta-backend: postgres
- use: compute-node
- use: frontend
- use: compactor

meta-1cn-1fe-sqlite-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/array/arrow/arrow_iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ impl ToArrow for IcebergCreateTableArrowConvert {
arrow_field
}

fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
let data_type = arrow_schema::DataType::Utf8;

let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
self.add_field_id(&mut arrow_field);
arrow_field
}

/// Convert RisingWave data type to Arrow data type.
///
/// This function returns a `Field` instead of `DataType` because some may be converted to
Expand Down
31 changes: 31 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::stream::BoxStream;
pub use internal_table::*;
use parse_display::Display;
pub use physical_table::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{
CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
StreamJobStatus as PbStreamJobStatus,
Expand Down Expand Up @@ -550,6 +551,36 @@ impl ConflictBehavior {
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Engine {
#[default]
Hummock,
Iceberg,
}

impl Engine {
pub fn from_protobuf(engine: &PbEngine) -> Self {
match engine {
PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
PbEngine::Iceberg => Engine::Iceberg,
}
}

pub fn to_protobuf(self) -> PbEngine {
match self {
Engine::Hummock => PbEngine::Hummock,
Engine::Iceberg => PbEngine::Iceberg,
}
}

pub fn debug_to_string(self) -> String {
match self {
Engine::Hummock => "Hummock".to_string(),
Engine::Iceberg => "Iceberg".to_string(),
}
}
}

#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
#[default]
Expand Down
27 changes: 24 additions & 3 deletions src/connector/src/connector_common/iceberg/jni_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl CatalogV2 for JniCatalog {
.map_err(|e| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
"Failed to crete iceberg table.",
"Failed to create iceberg table.",
)
.with_source(e)
})?
Expand Down Expand Up @@ -342,8 +342,29 @@ impl CatalogV2 for JniCatalog {
}

/// Drop a table from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
execute_with_jni_env(self.jvm, |env| {
let table_name_str = format!(
"{}.{}",
table.namespace().clone().inner().into_iter().join("."),
table.name()
);

let table_name_jstr = env.new_string(&table_name_str).unwrap();

call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
&table_name_jstr)
.with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;

Ok(())
})
.map_err(|e| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
"Failed to load iceberg table.",
)
.with_source(e)
})
}

/// Check if a table exists in the catalog.
Expand Down
Loading

0 comments on commit 59fa5f8

Please sign in to comment.