add iceberg source integration test
chenzl25 committed Mar 6, 2024
1 parent 086749f commit 3590a64
Showing 4 changed files with 55 additions and 7 deletions.
11 changes: 11 additions & 0 deletions integration_tests/iceberg-sink2/docker/storage/config.ini
@@ -16,4 +16,15 @@ catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1

[source]
connector = iceberg
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1
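
For reference, the Python harness in main.py below expands this [source] section into roughly the following statement (the key/value pairs are passed through verbatim; whitespace here is adjusted for readability):

    CREATE SOURCE iceberg_source
    WITH (
        connector = 'iceberg',
        s3.endpoint = 'http://minio-0:9301',
        s3.access.key = 'hummockadmin',
        s3.secret.key = 'hummockadmin',
        s3.region = 'ap-southeast-1',
        catalog.type = 'storage',
        warehouse.path = 's3://icebergdata/demo',
        database.name = 's1',
        table.name = 't1'
    );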
32 changes: 32 additions & 0 deletions integration_tests/iceberg-sink2/python/main.py
@@ -103,6 +103,18 @@ def init_risingwave_mv(docker):
"""
]

if 'source' in config:
source_config = config['source']
source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])
sqls.append(
f"""
CREATE SOURCE iceberg_source
WITH (
{source_param}
);
"""
)

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
@@ -127,6 +139,24 @@ def check_spark_table(docker):
result = spark.sql(sql).collect()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"

def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
"select count(*) from iceberg_source"
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
# execute sql and collect result
cursor.execute(sql)
result = cursor.fetchall()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"


def run_case(case):
with DockerCompose(case) as docker:
@@ -135,6 +165,8 @@ def run_case(case):
print("Let risingwave to run")
time.sleep(5)
check_spark_table(docker)
if case == "storage":
check_risingwave_iceberg_source(docker)


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion src/batch/src/executor/iceberg_scan.rs
@@ -41,7 +41,7 @@ use crate::executor::{DataChunk, Executor};
/// async fn test_iceberg_scan() {
/// let iceberg_scan_executor = IcebergScanExecutor::new(
/// IcebergConfig {
- /// database_name: "demo_db".into(),
+ /// database_name: Some("demo_db".into()),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
17 changes: 11 additions & 6 deletions src/connector/src/source/iceberg/mod.rs
@@ -35,9 +35,9 @@ pub const ICEBERG_CONNECTOR: &str = "iceberg";
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct IcebergProperties {
#[serde(rename = "catalog.type")]
- pub catalog_type: String,
+ pub catalog_type: Option<String>,
#[serde(rename = "s3.region")]
- pub region_name: String,
+ pub region: Option<String>,
#[serde(rename = "s3.endpoint", default)]
pub endpoint: String,
#[serde(rename = "s3.access.key", default)]
@@ -46,8 +46,12 @@ pub struct IcebergProperties {
pub s3_secret: String,
#[serde(rename = "warehouse.path")]
pub warehouse_path: String,
+ // Catalog name, can be omitted for storage catalog, but
+ // must be set for other catalogs.
+ #[serde(rename = "catalog.name")]
+ pub catalog_name: Option<String>,
#[serde(rename = "database.name")]
- pub database_name: String,
+ pub database_name: Option<String>,
#[serde(rename = "table.name")]
pub table_name: String,

@@ -58,14 +62,15 @@ impl IcebergProperties {
impl IcebergProperties {
pub fn to_iceberg_config(&self) -> IcebergConfig {
IcebergConfig {
- database_name: Some(self.database_name.clone()),
+ catalog_name: self.catalog_name.clone(),
+ database_name: self.database_name.clone(),
table_name: self.table_name.clone(),
- catalog_type: Some(self.catalog_type.clone()),
+ catalog_type: self.catalog_type.clone(),
path: self.warehouse_path.clone(),
endpoint: Some(self.endpoint.clone()),
access_key: self.s3_access.clone(),
secret_key: self.s3_secret.clone(),
- region: Some(self.region_name.clone()),
+ region: self.region.clone(),
..Default::default()
}
}
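
As the new comment on catalog.name notes, the storage-catalog source used in this test can omit the catalog name entirely, while a source on any other catalog type must spell it out in its WITH clause. A sketch only, since the concrete catalog type and whatever extra connection options it requires are outside this diff:

    CREATE SOURCE iceberg_source_named_catalog
    WITH (
        connector = 'iceberg',
        catalog.type = '<any non-storage catalog type>',
        catalog.name = 'demo',   -- required because catalog.type is not 'storage'
        warehouse.path = 's3://icebergdata/demo',
        database.name = 's1',
        table.name = 't1'
        -- plus the S3/catalog connection options the chosen catalog type needs
    );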
