Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(batch): add iceberg source integration test #15491

Merged
merged 10 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions integration_tests/iceberg-sink2/docker/storage/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,15 @@ catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1

[source]
connector = iceberg
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1
32 changes: 32 additions & 0 deletions integration_tests/iceberg-sink2/python/main.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to move this to another dir such as iceberg-source

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.

Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ def init_risingwave_mv(docker):
"""
]

if 'source' in config:
source_config = config['source']
source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])
sqls.append(
f"""
CREATE SOURCE iceberg_source
WITH (
{source_param}
);
"""
)

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
Expand All @@ -127,6 +139,24 @@ def check_spark_table(docker):
result = spark.sql(sql).collect()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"

def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
"select count(*) from iceberg_source"
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
# execute sql and collect result
cursor.execute(sql)
result = cursor.fetchall()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"


def run_case(case):
with DockerCompose(case) as docker:
Expand All @@ -135,6 +165,8 @@ def run_case(case):
print("Let risingwave to run")
time.sleep(5)
check_spark_table(docker)
if case == "storage":
check_risingwave_iceberg_source(docker)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::executor::{DataChunk, Executor};
/// async fn test_iceberg_scan() {
/// let iceberg_scan_executor = IcebergScanExecutor::new(
/// IcebergConfig {
/// database_name: "demo_db".into(),
/// database_name: Some("demo_db".into()),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
Expand Down
17 changes: 11 additions & 6 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub const ICEBERG_CONNECTOR: &str = "iceberg";
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct IcebergProperties {
#[serde(rename = "catalog.type")]
pub catalog_type: String,
pub catalog_type: Option<String>,
#[serde(rename = "s3.region")]
pub region_name: String,
pub region: Option<String>,
#[serde(rename = "s3.endpoint", default)]
pub endpoint: String,
#[serde(rename = "s3.access.key", default)]
Expand All @@ -46,8 +46,12 @@ pub struct IcebergProperties {
pub s3_secret: String,
#[serde(rename = "warehouse.path")]
pub warehouse_path: String,
// Catalog name, can be omitted for storage catalog, but
// must be set for other catalogs.
#[serde(rename = "catalog.name")]
pub catalog_name: Option<String>,
#[serde(rename = "database.name")]
pub database_name: String,
pub database_name: Option<String>,
#[serde(rename = "table.name")]
pub table_name: String,

Expand All @@ -58,14 +62,15 @@ pub struct IcebergProperties {
impl IcebergProperties {
pub fn to_iceberg_config(&self) -> IcebergConfig {
IcebergConfig {
database_name: Some(self.database_name.clone()),
catalog_name: self.catalog_name.clone(),
database_name: self.database_name.clone(),
table_name: self.table_name.clone(),
catalog_type: Some(self.catalog_type.clone()),
catalog_type: self.catalog_type.clone(),
path: self.warehouse_path.clone(),
endpoint: Some(self.endpoint.clone()),
access_key: self.s3_access.clone(),
secret_key: self.s3_secret.clone(),
region: Some(self.region_name.clone()),
region: self.region.clone(),
..Default::default()
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ IcebergProperties:
fields:
- name: catalog.type
field_type: String
required: true
required: false
- name: s3.region
field_type: String
required: true
required: false
- name: s3.endpoint
field_type: String
required: false
Expand All @@ -56,9 +56,12 @@ IcebergProperties:
- name: warehouse.path
field_type: String
required: true
- name: catalog.name
field_type: String
required: false
- name: database.name
field_type: String
required: true
required: false
- name: table.name
field_type: String
required: true
Expand Down
Loading