feat(sink): introduce file sink in PARQUET format #17311

Merged 86 commits on Aug 22, 2024
Changes from 39 commits

Commits
d8349c9
save work
wcy-fdu Mar 11, 2024
dea340a
save work, add gcs
wcy-fdu Mar 12, 2024
22fe512
implement sink writer
wcy-fdu Mar 12, 2024
0b47117
make clippy happy
wcy-fdu Mar 12, 2024
ad6f3a3
save work, add parquet writer
wcy-fdu Mar 13, 2024
08d05f0
minor
wcy-fdu Mar 13, 2024
f4618c1
add parquet writer, todo: add e2e test and comments
wcy-fdu Mar 14, 2024
31a8052
minor
wcy-fdu Mar 14, 2024
d1b61a9
fix typo
wcy-fdu Mar 14, 2024
f216379
add fs sink for test
wcy-fdu Mar 18, 2024
9b593c7
save work
wcy-fdu Jun 17, 2024
69d5052
save work
wcy-fdu Jun 17, 2024
0a72bdb
save work
wcy-fdu Jun 18, 2024
b200a3c
introduce file sink with parquet type
wcy-fdu Jun 18, 2024
7e18fe1
refactor
wcy-fdu Jun 19, 2024
da6a4dd
add fs sink for test
wcy-fdu Jun 19, 2024
ac785be
add comments
wcy-fdu Jun 20, 2024
a3c6449
minor for parquet change
wcy-fdu Jun 20, 2024
ac20951
todo: upgrade to opendal 0.47
wcy-fdu Jun 24, 2024
f555951
remove json encode, minor refactor
wcy-fdu Jul 9, 2024
b8b7479
resolve conflict
wcy-fdu Jul 9, 2024
4fccf52
fmt and clippy
wcy-fdu Jul 9, 2024
aeda674
minor
wcy-fdu Jul 10, 2024
e905920
minor
wcy-fdu Jul 11, 2024
ebbb6b0
rebase main
wcy-fdu Jul 11, 2024
53155ce
rebase main
wcy-fdu Jul 25, 2024
1011412
add ci test
wcy-fdu Jul 26, 2024
c39a1ab
refactor, code move
wcy-fdu Jul 26, 2024
f86aed6
mionor
wcy-fdu Jul 26, 2024
3042681
format python file
wcy-fdu Jul 29, 2024
0a061be
rebase main and resolve conflict
wcy-fdu Jul 29, 2024
3d47705
minor
wcy-fdu Jul 29, 2024
4d73654
fix duplicate table name in ci
wcy-fdu Jul 29, 2024
3f502dd
resolve comments
wcy-fdu Aug 2, 2024
2599256
fix local fs sink
wcy-fdu Aug 2, 2024
6f27352
remove java file sink
wcy-fdu Aug 2, 2024
530d8b5
bring back java file sink
wcy-fdu Aug 2, 2024
24b118c
fix ut
wcy-fdu Aug 2, 2024
a05dc03
minor
wcy-fdu Aug 6, 2024
6551523
update ut
wcy-fdu Aug 7, 2024
f79b71e
Merge branch 'main' into wcy/s3_sink
wcy-fdu Aug 7, 2024
b900776
make clippy happy
wcy-fdu Aug 7, 2024
663e568
try fix ci
wcy-fdu Aug 7, 2024
53aca7a
try fix ci
wcy-fdu Aug 7, 2024
b463b34
try fix ci
wcy-fdu Aug 9, 2024
60b387d
empty commit for retry
wcy-fdu Aug 12, 2024
8e87d08
rebase main and retry
wcy-fdu Aug 12, 2024
8f747a2
remove rust_min_stack in e2e sink test
wcy-fdu Aug 12, 2024
b5dd302
update connector-node java to 17 and may fix ci
wcy-fdu Aug 12, 2024
1891f3c
another try
wcy-fdu Aug 12, 2024
1b223ff
do not change pom.xml
wcy-fdu Aug 12, 2024
e3855b7
remove export RUST_MIN_STACK=4194304
wcy-fdu Aug 13, 2024
b168771
update RUST_MIN_STACK
wcy-fdu Aug 13, 2024
7024a8a
try increase JVM_HEAP_SIZE
wcy-fdu Aug 13, 2024
7dc0696
try increase thread stack
wcy-fdu Aug 13, 2024
358c8c4
revert JVM_HEAP_SIZE config
wcy-fdu Aug 14, 2024
c2f9d4b
Merge branch 'main' into wcy/s3_sink
wcy-fdu Aug 14, 2024
4bd3a6d
set JVM_HEAP_SIZE = 8G*0.07
wcy-fdu Aug 14, 2024
d0f87a7
Merge branch 'wcy/s3_sink' of https://github.com/risingwavelabs/risin…
wcy-fdu Aug 14, 2024
c1aae25
revert set JVM_HEAP_SIZE = 8G*0.07, remove file sink in macro
wcy-fdu Aug 14, 2024
0a4a6f1
clippy happier
wcy-fdu Aug 14, 2024
e8b1486
clippy happier
wcy-fdu Aug 14, 2024
76d9b65
revert remove file sink in macro, and set JVM_HEAP_SIZE = 5G
wcy-fdu Aug 14, 2024
171bd2c
revert remove file sink in macro, and set JVM_HEAP_SIZE = 5G
wcy-fdu Aug 14, 2024
a21b30e
only keep local fs sink
wcy-fdu Aug 14, 2024
a037b25
keep local fs and s3
wcy-fdu Aug 14, 2024
89f289e
keep gcs sink
wcy-fdu Aug 14, 2024
4510c48
keep s3 sink
wcy-fdu Aug 14, 2024
d32bd0b
keep s3, gcs sink
wcy-fdu Aug 15, 2024
1779470
random commit
wcy-fdu Aug 15, 2024
a5fa643
add clone for sync Properties
wcy-fdu Aug 15, 2024
a9214bf
add clone for sync Properties
wcy-fdu Aug 15, 2024
49ed345
enhacne sink Properties
wcy-fdu Aug 15, 2024
3d41044
minor
wcy-fdu Aug 15, 2024
fb9f13e
give another try
wcy-fdu Aug 15, 2024
67466d4
make clippy happy
wcy-fdu Aug 15, 2024
46031f2
fix cargo.toml
wcy-fdu Aug 15, 2024
ac9f869
use box for operator
wcy-fdu Aug 20, 2024
854c9ba
use box in dispatch_sink
wcy-fdu Aug 20, 2024
4edff17
try Box new operator
wcy-fdu Aug 21, 2024
a6b9151
try Box get_path
wcy-fdu Aug 21, 2024
223f7da
try again
wcy-fdu Aug 21, 2024
64100c0
boxed future
wenym1 Aug 21, 2024
774aa46
merge main
wcy-fdu Aug 21, 2024
fdd1472
Merge branch 'wcy/s3_sink' of https://github.com/risingwavelabs/risin…
wcy-fdu Aug 21, 2024
807c869
revert box in FileSink struct
wcy-fdu Aug 21, 2024
4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
@@ -508,9 +508,9 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source check on parquet file"
- label: "S3_v2 source and sink on parquet file"
key: "s3-v2-source-check-parquet-file"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
fs_parquet_source_and_sink.py
@@ -95,12 +95,110 @@ def _assert_eq(field, got, expect):
_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('Test pass')
print('File source test pass!')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()

def do_sink(config, file_num, item_num_per_file, prefix):
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return 's3_test_parquet'

# Execute a SELECT statement
cur.execute(f'''CREATE sink test_file_sink as select
id,
name,
sex,
mark,
test_int,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
s3.path = '',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

print('Sink into s3...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
test_int int,
test_real real,
test_double_precision double precision,
test_varchar varchar,
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz
) WITH (
connector = 's3_v2',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
) FORMAT PLAIN ENCODE PARQUET;''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_sink_table'
print(f'Execute reading sink files: {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('File sink test pass!')
cur.execute(f'drop sink test_file_sink')
cur.execute(f'drop table test_sink_table')
cur.close()
conn.close()



if __name__ == "__main__":
FILE_NUM = 10
@@ -134,4 +232,11 @@ def _assert_eq(field, got, expect):

# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx))
client.remove_object(config["S3_BUCKET"], _s3(idx))

do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx))

2 changes: 1 addition & 1 deletion risedev.yml
@@ -1620,4 +1620,4 @@ template:

# If `user-managed` is true, user is responsible for starting the service
# to serve at the above address and port in any way they see fit.
user-managed: false
user-managed: false
11 changes: 4 additions & 7 deletions src/connector/src/sink/catalog/mod.rs
@@ -147,6 +147,7 @@ pub enum SinkEncode {
Protobuf,
Avro,
Template,
Parquet,
Text,
}

@@ -202,6 +203,7 @@ impl SinkFormatDesc {
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
SinkEncode::Parquet => E::Parquet,
SinkEncode::Text => E::Text,
};

@@ -250,13 +252,8 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified
| E::Native
| E::Csv
| E::Bytes
| E::None
| E::Text
| E::Parquet) => {
E::Parquet => SinkEncode::Parquet,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
e.as_str_name()
92 changes: 92 additions & 0 deletions src/connector/src/sink/file_sink/fs.rs
@@ -0,0 +1,92 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Fs;
use opendal::Operator;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use crate::sink::file_sink::opendal_sink::{FileSink, OpendalSinkBackend};
use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct FsCommon {
/// The directory where the sink file is located.
#[serde(rename = "fs.path")]
pub path: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct FsConfig {
#[serde(flatten)]
pub common: FsCommon,

pub r#type: String, // accept "append-only"
}

pub const FS_SINK: &str = "fs";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_fs_sink(config: FsConfig) -> Result<Operator> {
// Create fs builder.
let mut builder = Fs::default();
// Use the configured sink directory as the backend root.
builder.root(&config.common.path);
let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(operator)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FsSink;

impl OpendalSinkBackend for FsSink {
type Properties = FsConfig;

const SINK_NAME: &'static str = FS_SINK;

fn from_btreemap(hash_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<FsConfig>(serde_json::to_value(hash_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
"`{}` must be {}, or {}",
SINK_TYPE_OPTION,
SINK_TYPE_APPEND_ONLY,
SINK_TYPE_UPSERT
)));
}
Ok(config)
}

fn new_operator(properties: FsConfig) -> Result<Operator> {
FileSink::<FsSink>::new_fs_sink(properties)
}

fn get_path(properties: &Self::Properties) -> String {
(*properties.common.path).to_string()
}

fn get_engine_type() -> super::opendal_sink::EngineType {
super::opendal_sink::EngineType::Fs
}
}
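
Note on how the new OpendalSinkBackend trait is meant to be driven: the sink's WITH options arrive as a string map, from_btreemap validates the type option and deserializes it into the backend config, and new_operator turns that config into an OpenDAL Operator. A minimal sketch follows; the helper name and the /tmp path are hypothetical and only illustrate the trait flow added in this PR.

use std::collections::BTreeMap;

use opendal::Operator;

use crate::sink::file_sink::fs::FsSink;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::Result;

// Hypothetical helper (not part of this PR): exercises the backend trait end to end.
fn build_fs_operator() -> Result<Operator> {
    let mut props = BTreeMap::new();
    // `fs.path` is the directory the sink writes Parquet files into.
    props.insert("fs.path".to_string(), "/tmp/rw_file_sink".to_string());
    props.insert("type".to_string(), "append-only".to_string());

    // Rejects anything other than append-only/upsert and deserializes into FsConfig.
    let config = FsSink::from_btreemap(props)?;
    // Wraps the OpenDAL Fs service with logging and retry layers.
    FsSink::new_operator(config)
}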
109 changes: 109 additions & 0 deletions src/connector/src/sink/file_sink/gcs.rs
@@ -0,0 +1,109 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::Operator;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::opendal_sink::FileSink;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct GcsCommon {
#[serde(rename = "gcs.bucket_name")]
pub bucket_name: String,

/// The base64 encoded credential key. If not set, ADC will be used.
#[serde(rename = "gcs.credential")]
pub credential: String,

/// If credential/ADC is not set. The service account can be used to provide the credential info.
#[serde(rename = "gcs.service_account", default)]
pub service_account: String,

/// The directory where the sink file is located
#[serde(rename = "gcs.path")]
pub path: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GcsConfig {
#[serde(flatten)]
pub common: GcsCommon,

pub r#type: String, // accept "append-only"
}

pub const GCS_SINK: &str = "gcs";

impl<S: OpendalSinkBackend> FileSink<S> {
pub fn new_gcs_sink(config: GcsConfig) -> Result<Operator> {
// Create gcs builder.
let mut builder = Gcs::default();

builder.bucket(&config.common.bucket_name);

builder.credential(&config.common.credential);

builder.service_account(&config.common.service_account);

let operator: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
Ok(operator)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GcsSink;

impl OpendalSinkBackend for GcsSink {
type Properties = GcsConfig;

const SINK_NAME: &'static str = GCS_SINK;

fn from_btreemap(hash_map: BTreeMap<String, String>) -> Result<Self::Properties> {
let config = serde_json::from_value::<GcsConfig>(serde_json::to_value(hash_map).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
"`{}` must be {}, or {}",
SINK_TYPE_OPTION,
SINK_TYPE_APPEND_ONLY,
SINK_TYPE_UPSERT
)));
}
Ok(config)
}

fn new_operator(properties: GcsConfig) -> Result<Operator> {
FileSink::<GcsSink>::new_gcs_sink(properties)
}

fn get_path(properties: &Self::Properties) -> String {
(*properties.common.path).to_string()
}

fn get_engine_type() -> super::opendal_sink::EngineType {
super::opendal_sink::EngineType::Gcs
}
}
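
The GCS backend follows the same flow; only the property keys differ. A hypothetical property map for GcsSink::from_btreemap might look like the sketch below (all values are placeholders, not taken from this PR).

use std::collections::BTreeMap;

// Placeholder values for illustration only; `gcs.credential` is the
// base64-encoded key described in the GcsCommon doc comments above.
fn gcs_sink_props() -> BTreeMap<String, String> {
    BTreeMap::from([
        ("gcs.bucket_name".to_string(), "my-bucket".to_string()),
        ("gcs.credential".to_string(), "<base64-key>".to_string()),
        ("gcs.path".to_string(), "sink/".to_string()),
        ("type".to_string(), "append-only".to_string()),
    ])
}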
18 changes: 18 additions & 0 deletions src/connector/src/sink/file_sink/mod.rs
@@ -0,0 +1,18 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod fs;
pub mod gcs;
pub mod opendal_sink;
pub mod s3;