feat(sink): introduce file sink in PARQUET format (#17311) #18170

Merged
2 changes: 1 addition & 1 deletion ci/scripts/e2e-sink-test.sh
@@ -7,7 +7,6 @@ source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
p )
@@ -65,6 +64,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt'
sleep 1

echo "--- testing remote sinks"

# check sink destination postgres
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
sleep 1
1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
@@ -147,6 +147,7 @@ risedev ci-kill
export RISINGWAVE_CI=true

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
export RUST_MIN_STACK=4194304
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-kafka
./scripts/source/prepare_ci_kafka.sh
4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
@@ -528,9 +528,9 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source check on parquet file"
- label: "S3 source and sink on parquet file"
key: "s3-v2-source-check-parquet-file"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
fs_parquet_source_and_sink.py
@@ -95,12 +95,110 @@ def _assert_eq(field, got, expect):
_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('Test pass')
print('File source test pass!')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()

def do_sink(config, file_num, item_num_per_file, prefix):
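"""Sink the parquet source table back to S3 via the new file sink, then read the files back through an s3_v2 source table and verify the row count and sum(id)."""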
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return 's3_test_parquet'

# Create a file sink that writes the table to S3 in parquet format
cur.execute(f'''CREATE sink test_file_sink as select
id,
name,
sex,
mark,
test_int,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
s3.path = '',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

print('Sink into s3...')
# Create a table that reads the sink output back from S3 for verification
cur.execute(f'''CREATE TABLE test_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
test_int int,
test_real real,
test_double_precision double precision,
test_varchar varchar,
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz
) WITH (
connector = 's3_v2',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
) FORMAT PLAIN ENCODE PARQUET;''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_sink_table'
print(f'Execute reading sink files: {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('File sink test pass!')
cur.execute(f'drop sink test_file_sink')
cur.execute(f'drop table test_sink_table')
cur.close()
conn.close()



if __name__ == "__main__":
FILE_NUM = 10
@@ -134,4 +232,11 @@ def _assert_eq(field, got, expect):

# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx))
client.remove_object(config["S3_BUCKET"], _s3(idx))

do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx))

2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
@@ -549,7 +549,7 @@ async fn main() {
println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
tokio::spawn(async move {
dispatch_sink!(sink, sink, {
consume_log_stream(sink, mock_range_log_reader, sink_writer_param).boxed()
consume_log_stream(*sink, mock_range_log_reader, sink_writer_param).boxed()
})
.await
.unwrap();
11 changes: 4 additions & 7 deletions src/connector/src/sink/catalog/mod.rs
@@ -147,6 +147,7 @@ pub enum SinkEncode {
Protobuf,
Avro,
Template,
Parquet,
Text,
}

@@ -202,6 +203,7 @@ impl SinkFormatDesc {
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
SinkEncode::Parquet => E::Parquet,
SinkEncode::Text => E::Text,
};

@@ -250,13 +252,8 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified
| E::Native
| E::Csv
| E::Bytes
| E::None
| E::Text
| E::Parquet) => {
E::Parquet => SinkEncode::Parquet,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
102 changes: 102 additions & 0 deletions src/connector/src/sink/file_sink/fs.rs
@@ -0,0 +1,102 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use crate::sink::file_sink::opendal_sink::{FileSink, OpendalSinkBackend};
use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::source::UnknownFields;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct FsCommon {
/// The directory where the sink file is located.
#[serde(rename = "fs.path")]
pub path: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct FsConfig {
#[serde(flatten)]
pub common: FsCommon,

pub r#type: String, // accept "append-only"

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl UnknownFields for FsConfig {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

pub const FS_SINK: &str = "fs";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_fs_sink(config: FsConfig) -> Result<Operator> {
// Create fs builder.
let mut builder = Fs::default();
// Point the backend at the configured sink directory.
builder.root(&config.common.path);
let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(operator)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FsSink;

impl OpendalSinkBackend for FsSink {
type Properties = FsConfig;

const SINK_NAME: &'static str = FS_SINK;

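// Parse the WITH options into FsConfig via a serde_json round-trip and require an append-only or upsert sink type.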
fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<FsConfig>(serde_json::to_value(btree_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
"`{}` must be {}, or {}",
SINK_TYPE_OPTION,
SINK_TYPE_APPEND_ONLY,
SINK_TYPE_UPSERT
)));
}
Ok(config)
}

fn new_operator(properties: FsConfig) -> Result<Operator> {
FileSink::<FsSink>::new_fs_sink(properties)
}

fn get_path(properties: Self::Properties) -> String {
properties.common.path
}

fn get_engine_type() -> super::opendal_sink::EngineType {
super::opendal_sink::EngineType::Fs
}
}
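
For orientation, here is a minimal sketch of how this local-filesystem backend could be exercised from SQL, modeled on the s3 CREATE SINK in the test script above. The sink name local_parquet_sink, source table t, and path /tmp/parquet-sink are hypothetical; connector = 'fs', fs.path, and the append-only requirement follow from FsConfig and from_btreemap in this file, and ENCODE PARQUET is the encode this PR introduces:

CREATE SINK local_parquet_sink AS SELECT * FROM t
WITH (
    connector = 'fs',
    fs.path = '/tmp/parquet-sink',
    type = 'append-only',
    force_append_only = 'true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');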