feat(sink): introduce file sink in PARQUET format (#17311)
Co-authored-by: William Wen <[email protected]>
2 people authored and wcy-fdu committed Aug 22, 2024
1 parent c0cf385 commit f2701fb
Showing 19 changed files with 943 additions and 25 deletions.
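The user-facing surface added here is a file sink with Parquet encode. A minimal usage sketch, assuming an S3 bucket and the option names exercised by the test changes below (the sink name, source table, region, bucket, and credential values are placeholders, not part of this commit):

CREATE SINK my_parquet_sink AS SELECT * FROM my_table
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = '<access-key>',
    s3.credentials.secret = '<secret-key>',
    s3.path = '',
    s3.file_type = 'parquet',
    type = 'append-only',
    force_append_only = 'true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');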
2 changes: 1 addition & 1 deletion ci/scripts/e2e-sink-test.sh
@@ -7,7 +7,6 @@ source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
    case ${opt} in
        p )
@@ -65,6 +64,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt'
sleep 1

echo "--- testing remote sinks"

# check sink destination postgres
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
sleep 1
1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
@@ -147,6 +147,7 @@ risedev ci-kill
export RISINGWAVE_CI=true

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
export RUST_MIN_STACK=4194304
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-kafka
./scripts/source/prepare_ci_kafka.sh
4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
@@ -528,9 +528,9 @@ steps:
    timeout_in_minutes: 25
    retry: *auto-retry

  - label: "S3 source check on parquet file"
  - label: "S3 source and sink on parquet file"
    key: "s3-v2-source-check-parquet-file"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
    if: |
      !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
      || build.pull_request.labels includes "ci/run-s3-source-tests"
fs_parquet_source_and_sink.py
@@ -95,12 +95,110 @@ def _assert_eq(field, got, expect):
    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

    print('Test pass')
    print('File source test pass!')

    cur.execute(f'drop table {_table()}')
    cur.close()
    conn.close()

def do_sink(config, file_num, item_num_per_file, prefix):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    def _table():
        return 's3_test_parquet'

    # Create a Parquet file sink over the source table
    cur.execute(f'''CREATE SINK test_file_sink AS SELECT
        id,
        name,
        sex,
        mark,
        test_int,
        test_real,
        test_double_precision,
        test_varchar,
        test_bytea,
        test_date,
        test_time,
        test_timestamp,
        test_timestamptz
        FROM {_table()} WITH (
        connector = 's3',
        match_pattern = '*.parquet',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
        s3.path = '',
        s3.file_type = 'parquet',
        type = 'append-only',
        force_append_only='true'
    ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

    print('Sink into s3...')
    # Create a table that reads the sink's Parquet output back from S3
    cur.execute(f'''CREATE TABLE test_sink_table(
        id bigint primary key,
        name TEXT,
        sex bigint,
        mark bigint,
        test_int int,
        test_real real,
        test_double_precision double precision,
        test_varchar varchar,
        test_bytea bytea,
        test_date date,
        test_time time,
        test_timestamp timestamp,
        test_timestamptz timestamptz
    ) WITH (
        connector = 's3_v2',
        match_pattern = '*.parquet',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) FORMAT PLAIN ENCODE PARQUET;''')

    total_rows = file_num * item_num_per_file
    MAX_RETRIES = 40
    for retry_no in range(MAX_RETRIES):
        cur.execute('select count(*) from test_sink_table')
        result = cur.fetchone()
        if result[0] == total_rows:
            break
        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
        sleep(10)

    stmt = 'select count(*), sum(id) from test_sink_table'
    print(f'Execute reading sink files: {stmt}')
    cur.execute(stmt)
    result = cur.fetchone()

    print('Got:', result)

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

    print('File sink test pass!')
    cur.execute('drop sink test_file_sink')
    cur.execute('drop table test_sink_table')
    cur.close()
    conn.close()



if __name__ == "__main__":
    FILE_NUM = 10
@@ -134,4 +232,11 @@ def _assert_eq(field, got, expect):

    # clean up s3 files
    for idx, _ in enumerate(data):
        client.remove_object(config["S3_BUCKET"], _s3(idx))

    do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

    # clean up s3 files
    for idx, _ in enumerate(data):
        client.remove_object(config["S3_BUCKET"], _s3(idx))

2 changes: 1 addition & 1 deletion src/bench/sink_bench/main.rs
@@ -549,7 +549,7 @@ async fn main() {
    println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
    tokio::spawn(async move {
        dispatch_sink!(sink, sink, {
            consume_log_stream(sink, mock_range_log_reader, sink_writer_param).boxed()
            consume_log_stream(*sink, mock_range_log_reader, sink_writer_param).boxed()
        })
        .await
        .unwrap();
11 changes: 4 additions & 7 deletions src/connector/src/sink/catalog/mod.rs
@@ -147,6 +147,7 @@ pub enum SinkEncode {
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
}

@@ -202,6 +203,7 @@ impl SinkFormatDesc {
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
        };

@@ -250,13 +252,8 @@ impl TryFrom&lt;PbSinkFormatDesc&gt; for SinkFormatDesc {
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            e @ (E::Unspecified
            | E::Native
            | E::Csv
            | E::Bytes
            | E::None
            | E::Text
            | E::Parquet) => {
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
102 changes: 102 additions & 0 deletions src/connector/src/sink/file_sink/fs.rs
@@ -0,0 +1,102 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use crate::sink::file_sink::opendal_sink::{FileSink, OpendalSinkBackend};
use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::source::UnknownFields;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct FsCommon {
    /// The directory where the sink file is located.
    #[serde(rename = "fs.path")]
    pub path: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct FsConfig {
    #[serde(flatten)]
    pub common: FsCommon,

    pub r#type: String, // accept "append-only"

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl UnknownFields for FsConfig {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

pub const FS_SINK: &str = "fs";

impl<S: OpendalSinkBackend> FileSink<S> {
    pub fn new_fs_sink(config: FsConfig) -> Result<Operator> {
        // Create the fs backend builder.
        let mut builder = Fs::default();
        // Root the backend at the configured sink directory.
        builder.root(&config.common.path);
        let operator: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();
        Ok(operator)
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FsSink;

impl OpendalSinkBackend for FsSink {
    type Properties = FsConfig;

    const SINK_NAME: &'static str = FS_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<FsConfig>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: FsConfig) -> Result<Operator> {
        FileSink::<FsSink>::new_fs_sink(properties)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::Fs
    }
}
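FsSink registers the same sink machinery under the connector name "fs", with only a root directory to configure. A sketch of the equivalent SQL for the local-filesystem backend, assuming the same PLAIN/PARQUET format path as the S3 variant above (sink, table, and path names are placeholders):

CREATE SINK local_parquet_sink AS SELECT * FROM my_table
WITH (
    connector = 'fs',
    fs.path = '/tmp/parquet-out',
    type = 'append-only',
    force_append_only = 'true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');

Note that from_btreemap accepts only append-only or upsert for `type`, so anything else fails validation before an operator is built.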
