Skip to content

Commit

Permalink
refactor(sink): Use s3 sink to replace the original snowflake backend…
Browse files Browse the repository at this point in the history
… implementation (#18996)
  • Loading branch information
wcy-fdu authored Nov 7, 2024
1 parent 218e63e commit e6e1ca9
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 485 deletions.
14 changes: 6 additions & 8 deletions e2e_test/s3/file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def _table():
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_parquet_sink/',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')
Expand Down Expand Up @@ -185,6 +184,7 @@ def _table():
stmt = f'select count(*), sum(id) from test_parquet_sink_table'
print(f'Execute reading sink files: {stmt}')

print(f'Create snowflake s3 sink ')
# Execute a SELECT statement
cur.execute(f'''CREATE sink test_file_sink_json as select
id,
Expand All @@ -201,15 +201,14 @@ def _table():
test_timestamp,
test_timestamptz
from {_table()} WITH (
connector = 's3',
connector = 'snowflake',
match_pattern = '*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
snowflake.aws_region = 'custom',
snowflake.s3_bucket = 'hummock001',
snowflake.aws_access_key_id = 'hummockadmin',
snowflake.aws_secret_access_key = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_json_sink/',
s3.file_type = 'json',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE JSON(force_append_only='true');''')
Expand Down Expand Up @@ -297,7 +296,6 @@ def test_file_sink_batching():
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_file_sink_batching/',
s3.file_type = 'parquet',
type = 'append-only',
rollover_seconds = 5,
max_row_count = 5,
Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ WITH
s3.region_name = 'test',
s3.bucket_name = 'test',
s3.path = '',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');
Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/license.slt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ WITH
s3.region_name = 'us-east-1',
s3.bucket_name = 'test',
s3.path = '',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');
Expand Down
12 changes: 12 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ impl SinkFormatDesc {
secret_refs: self.secret_refs.clone(),
}
}

/// Builds the fallback [`SinkFormatDesc`] for the snowflake sink.
///
/// This exists for compatibility purposes and is meant for the snowflake sink only,
/// for the case where no format configuration was provided. The returned
/// description uses the append-only format with JSON encoding, default options and
/// secret refs, and no key encoding.
pub fn plain_json_for_snowflake_only() -> Self {
    Self {
        key_encode: None,
        secret_refs: Default::default(),
        options: Default::default(),
        encode: SinkEncode::Json,
        format: SinkFormat::AppendOnly,
    }
}
}

impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
Expand Down
46 changes: 25 additions & 21 deletions src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub enum EngineType {
Fs,
Azblob,
Webhdfs,
Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
Expand Down Expand Up @@ -169,16 +170,23 @@ impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
let op = S::new_operator(config.clone())?;
let batching_strategy = S::get_batching_strategy(config.clone());
let engine_type = S::get_engine_type();

let format_desc = match param.format_desc {
Some(desc) => desc,
None => {
if let EngineType::Snowflake = engine_type {
SinkFormatDesc::plain_json_for_snowflake_only()
} else {
return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
}
}
};
Ok(Self {
op,
path,
schema,
is_append_only: param.sink_type.is_append_only(),
batching_strategy,
format_desc: param
.format_desc
.ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
format_desc,
engine_type,
_marker: PhantomData,
})
Expand Down Expand Up @@ -390,28 +398,24 @@ impl OpenDalSinkWriter {
// 1. A subdirectory is defined based on `path_partition_prefix` (e.g., by day, hour, month, or none).
// 2. The file name includes the `executor_id` and the creation time in seconds since the UNIX epoch.
// If the engine type is `Fs`, the path is automatically handled, and the filename does not include a path prefix.
let object_name = match self.engine_type {
// For the local fs sink, the data will be automatically written to the defined path.
// Therefore, there is no need to specify the path in the file name.
EngineType::Fs => {
format!(
"{}{}_{}.{}",
self.path_partition_prefix(&create_time),
self.executor_id,
create_time.as_secs(),
suffix
)
}
_ => format!(
"{}/{}{}_{}.{}",
self.write_path,
// 3. For the Snowflake Sink, the `write_path` parameter can be empty.
// When the `write_path` is not specified, the data will be written to the root of the specified bucket.
let object_name = {
let base_path = match self.engine_type {
EngineType::Fs => "".to_string(),
EngineType::Snowflake if self.write_path.is_empty() => "".to_string(),
_ => format!("{}/", self.write_path),
};

format!(
"{}{}{}_{}.{}",
base_path,
self.path_partition_prefix(&create_time),
self.executor_id,
create_time.as_secs(),
suffix,
),
)
};

Ok(self
.operator
.writer_with(&object_name)
Expand Down
67 changes: 60 additions & 7 deletions src/connector/src/sink/file_sink/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@ use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SI
use crate::source::UnknownFields;
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
#[serde(rename = "s3.region_name")]
#[serde(rename = "s3.region_name", alias = "snowflake.aws_region")]
pub region_name: String,
#[serde(rename = "s3.bucket_name")]
#[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")]
pub bucket_name: String,
/// The directory where the sink file is located.
#[serde(rename = "s3.path")]
pub path: String,
#[serde(rename = "s3.credentials.access", default)]
#[serde(rename = "s3.path", alias = "snowflake.s3_path", default)]
pub path: Option<String>,
#[serde(
rename = "s3.credentials.access",
alias = "snowflake.aws_access_key_id",
default
)]
pub access: Option<String>,
#[serde(rename = "s3.credentials.secret", default)]
#[serde(
rename = "s3.credentials.secret",
alias = "snowflake.aws_secret_access_key",
default
)]
pub secret: Option<String>,
#[serde(rename = "s3.endpoint_url")]
pub endpoint_url: Option<String>,
Expand Down Expand Up @@ -136,7 +144,7 @@ impl OpendalSinkBackend for S3Sink {
}

fn get_path(properties: Self::Properties) -> String {
properties.common.path
properties.common.path.unwrap_or_default()
}

fn get_engine_type() -> super::opendal_sink::EngineType {
Expand All @@ -151,3 +159,48 @@ impl OpendalSinkBackend for S3Sink {
}
}
}

/// Unit struct identifying the snowflake sink backend; it carries no data and is
/// used as the backend type parameter of the generic file sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnowflakeSink;

/// Name of the snowflake sink; exposed as `SINK_NAME` by the [`SnowflakeSink`] backend.
pub const SNOWFLAKE_SINK: &str = "snowflake";

/// Snowflake backend for the generic file sink. It reuses [`S3Config`] as its
/// property type and builds its operator through the S3 sink constructor.
impl OpendalSinkBackend for SnowflakeSink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    /// Deserializes the string key/value map into an [`S3Config`].
    ///
    /// # Errors
    ///
    /// Returns [`SinkError::Config`] when deserialization fails, or when the
    /// configured sink type is neither append-only nor upsert.
    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let raw_options = serde_json::to_value(btree_map).unwrap();
        let config: S3Config =
            serde_json::from_value(raw_options).map_err(|e| SinkError::Config(anyhow!(e)))?;

        let is_supported_type =
            config.r#type == SINK_TYPE_APPEND_ONLY || config.r#type == SINK_TYPE_UPSERT;
        if is_supported_type {
            return Ok(config);
        }

        Err(SinkError::Config(anyhow!(
            "`{}` must be {}, or {}",
            SINK_TYPE_OPTION,
            SINK_TYPE_APPEND_ONLY,
            SINK_TYPE_UPSERT
        )))
    }

    /// Creates the operator by delegating to the S3 sink constructor.
    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<Self>::new_s3_sink(properties)
    }

    /// Returns the configured path, or an empty string when none was given.
    fn get_path(properties: Self::Properties) -> String {
        let configured_path = properties.common.path;
        configured_path.unwrap_or_default()
    }

    /// Reports the snowflake engine type.
    fn get_engine_type() -> super::opendal_sink::EngineType {
        use super::opendal_sink::EngineType;

        EngineType::Snowflake
    }

    /// Copies the batching settings from the properties into a [`BatchingStrategy`].
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        let strategy = properties.batching_strategy;
        BatchingStrategy {
            max_row_count: strategy.max_row_count,
            rollover_seconds: strategy.rollover_seconds,
            path_partition_prefix: strategy.path_partition_prefix,
        }
    }
}
9 changes: 1 addition & 8 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub mod nats;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
Expand Down Expand Up @@ -128,7 +127,7 @@ macro_rules! for_all_sinks {
{ Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>},

{ Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
{ Snowflake, $crate::sink::snowflake::SnowflakeSink },
{ Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>},
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
Expand Down Expand Up @@ -837,12 +836,6 @@ pub enum SinkError {
Starrocks(String),
#[error("File error: {0}")]
File(String),
#[error("Snowflake error: {0}")]
Snowflake(
#[source]
#[backtrace]
anyhow::Error,
),
#[error("Pulsar error: {0}")]
Pulsar(
#[source]
Expand Down
Loading

0 comments on commit e6e1ca9

Please sign in to comment.