Skip to content

Commit

Permalink
fix(iceberg): fix path style access (#18259)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Aug 28, 2024
1 parent 7998622 commit 04b9cb4
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 30 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
timeout_in_minutes: 15
retry: *auto-retry

- label: "end-to-end iceberg sink v2 test"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down Expand Up @@ -33,7 +36,8 @@ CREATE SINK sink1 AS select * from mv1 WITH (
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand All @@ -50,7 +54,7 @@ CREATE SINK sink2 AS select * from mv1 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 5
commit_checkpoint_interval = 1
);

sleep 20s
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down Expand Up @@ -32,7 +35,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand All @@ -21,7 +24,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1'
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down Expand Up @@ -32,7 +35,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand All @@ -21,7 +24,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1'
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down Expand Up @@ -32,7 +35,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand All @@ -21,7 +24,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1'
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
6 changes: 5 additions & 1 deletion e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);

Expand Down Expand Up @@ -27,7 +30,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
catalog.name = 'demo',
catalog.type = 'storage',
database.name='demo_db',
table.name='e2e_demo_table'
table.name='e2e_demo_table',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
1 change: 1 addition & 0 deletions integration_tests/iceberg-sink2/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def init_risingwave_mv(docker):
sink_config = config['sink']
sink_param = ",\n".join([f"{k}='{v}'" for k, v in sink_config.items()])
sqls = [
"set sink_decouple = false",
"set streaming_parallelism = 4",
"""
CREATE SOURCE bid (
Expand Down
22 changes: 22 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ where
}
}

pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
deserializer: D,
) -> std::result::Result<Option<bool>, D::Error>
where
D: de::Deserializer<'de>,
{
let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
if let Some(s) = s {
let s = s.to_ascii_lowercase();
match s.as_str() {
"true" => Ok(Some(true)),
"false" => Ok(Some(false)),
_ => Err(de::Error::invalid_value(
de::Unexpected::Str(&s),
&"true or false",
)),
}
} else {
Ok(None)
}
}

pub(crate) fn deserialize_duration_from_string<'de, D>(
deserializer: D,
) -> Result<Duration, D::Error>
Expand Down
50 changes: 30 additions & 20 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ use crate::error::ConnectorResult;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkDecouple, SinkParam};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
use crate::{
deserialize_bool_from_string, deserialize_optional_bool_from_string,
deserialize_optional_string_seq_from_string,
};

/// This iceberg sink is WIP. When it is ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
Expand Down Expand Up @@ -129,9 +132,9 @@ pub struct IcebergConfig {
#[serde(
rename = "s3.path.style.access",
default,
deserialize_with = "deserialize_bool_from_string"
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub path_style_access: bool,
pub path_style_access: Option<bool>,

#[serde(
rename = "primary_key",
Expand Down Expand Up @@ -281,10 +284,12 @@ impl IcebergConfig {
"iceberg.table.io.secret_access_key".to_string(),
self.secret_key.clone().to_string(),
);
iceberg_configs.insert(
"iceberg.table.io.enable_virtual_host_style".to_string(),
(!self.path_style_access).to_string(),
);
if let Some(path_style_access) = self.path_style_access {
iceberg_configs.insert(
"iceberg.table.io.enable_virtual_host_style".to_string(),
(!path_style_access).to_string(),
);
}

let (bucket, root) = {
let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
Expand Down Expand Up @@ -424,10 +429,13 @@ impl IcebergConfig {
"s3.secret-access-key".to_string(),
self.secret_key.clone().to_string(),
);
java_catalog_configs.insert(
"s3.path-style-access".to_string(),
self.path_style_access.to_string(),
);

if let Some(path_style_access) = self.path_style_access {
java_catalog_configs.insert(
"s3.path-style-access".to_string(),
path_style_access.to_string(),
);
}
if matches!(self.catalog_type.as_deref(), Some("glue")) {
java_catalog_configs.insert(
"client.credentials-provider".to_string(),
Expand Down Expand Up @@ -756,18 +764,20 @@ impl Sink for IcebergSink {

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};
desc.properties
.get("commit_checkpoint_interval")
.map(|interval| {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
});

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
if let Some(commit_checkpoint_interval) = commit_checkpoint_interval
&& commit_checkpoint_interval > 1
{
return Err(SinkError::Config(anyhow!(
"config conflict: Iceberg config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
Expand Down Expand Up @@ -1377,7 +1387,7 @@ mod test {
endpoint: Some("http://127.0.0.1:9301".to_string()),
access_key: "hummockadmin".to_string(),
secret_key: "hummockadmin".to_string(),
path_style_access: true,
path_style_access: Some(true),
primary_key: Some(vec!["v1".to_string()]),
java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
.into_iter()
Expand Down

0 comments on commit 04b9cb4

Please sign in to comment.