
feat(sink): introduce file sink in PARQUET format #17311

Merged · 86 commits · Aug 22, 2024
Changes from 1 commit
d8349c9
save work
wcy-fdu Mar 11, 2024
dea340a
save work, add gcs
wcy-fdu Mar 12, 2024
22fe512
implement sink writer
wcy-fdu Mar 12, 2024
0b47117
make clippy happy
wcy-fdu Mar 12, 2024
ad6f3a3
save work, add parquet writer
wcy-fdu Mar 13, 2024
08d05f0
minor
wcy-fdu Mar 13, 2024
f4618c1
add parquet writer, todo: add e2e test and comments
wcy-fdu Mar 14, 2024
31a8052
minor
wcy-fdu Mar 14, 2024
d1b61a9
fix typo
wcy-fdu Mar 14, 2024
f216379
add fs sink for test
wcy-fdu Mar 18, 2024
9b593c7
save work
wcy-fdu Jun 17, 2024
69d5052
save work
wcy-fdu Jun 17, 2024
0a72bdb
save work
wcy-fdu Jun 18, 2024
b200a3c
introduce file sink with parquet type
wcy-fdu Jun 18, 2024
7e18fe1
refactor
wcy-fdu Jun 19, 2024
da6a4dd
add fs sink for test
wcy-fdu Jun 19, 2024
ac785be
add comments
wcy-fdu Jun 20, 2024
a3c6449
minor for parquet change
wcy-fdu Jun 20, 2024
ac20951
todo: upgrade to opendal 0.47
wcy-fdu Jun 24, 2024
f555951
remove json encode, minor refactor
wcy-fdu Jul 9, 2024
b8b7479
resolve conflict
wcy-fdu Jul 9, 2024
4fccf52
fmt and clippy
wcy-fdu Jul 9, 2024
aeda674
minor
wcy-fdu Jul 10, 2024
e905920
minor
wcy-fdu Jul 11, 2024
ebbb6b0
rebase main
wcy-fdu Jul 11, 2024
53155ce
rebase main
wcy-fdu Jul 25, 2024
1011412
add ci test
wcy-fdu Jul 26, 2024
c39a1ab
refactor, code move
wcy-fdu Jul 26, 2024
f86aed6
minor
wcy-fdu Jul 26, 2024
3042681
format python file
wcy-fdu Jul 29, 2024
0a061be
rebase main and resolve conflict
wcy-fdu Jul 29, 2024
3d47705
minor
wcy-fdu Jul 29, 2024
4d73654
fix duplicate table name in ci
wcy-fdu Jul 29, 2024
3f502dd
resolve comments
wcy-fdu Aug 2, 2024
2599256
fix local fs sink
wcy-fdu Aug 2, 2024
6f27352
remove java file sink
wcy-fdu Aug 2, 2024
530d8b5
bring back java file sink
wcy-fdu Aug 2, 2024
24b118c
fix ut
wcy-fdu Aug 2, 2024
a05dc03
minor
wcy-fdu Aug 6, 2024
6551523
update ut
wcy-fdu Aug 7, 2024
f79b71e
Merge branch 'main' into wcy/s3_sink
wcy-fdu Aug 7, 2024
b900776
make clippy happy
wcy-fdu Aug 7, 2024
663e568
try fix ci
wcy-fdu Aug 7, 2024
53aca7a
try fix ci
wcy-fdu Aug 7, 2024
b463b34
try fix ci
wcy-fdu Aug 9, 2024
60b387d
empty commit for retry
wcy-fdu Aug 12, 2024
8e87d08
rebase main and retry
wcy-fdu Aug 12, 2024
8f747a2
remove rust_min_stack in e2e sink test
wcy-fdu Aug 12, 2024
b5dd302
update connector-node java to 17 and may fix ci
wcy-fdu Aug 12, 2024
1891f3c
another try
wcy-fdu Aug 12, 2024
1b223ff
do not change pom.xml
wcy-fdu Aug 12, 2024
e3855b7
remove export RUST_MIN_STACK=4194304
wcy-fdu Aug 13, 2024
b168771
update RUST_MIN_STACK
wcy-fdu Aug 13, 2024
7024a8a
try increase JVM_HEAP_SIZE
wcy-fdu Aug 13, 2024
7dc0696
try increase thread stack
wcy-fdu Aug 13, 2024
358c8c4
revert JVM_HEAP_SIZE config
wcy-fdu Aug 14, 2024
c2f9d4b
Merge branch 'main' into wcy/s3_sink
wcy-fdu Aug 14, 2024
4bd3a6d
set JVM_HEAP_SIZE = 8G*0.07
wcy-fdu Aug 14, 2024
d0f87a7
Merge branch 'wcy/s3_sink' of https://github.com/risingwavelabs/risin…
wcy-fdu Aug 14, 2024
c1aae25
revert set JVM_HEAP_SIZE = 8G*0.07, remove file sink in macro
wcy-fdu Aug 14, 2024
0a4a6f1
clippy happier
wcy-fdu Aug 14, 2024
e8b1486
clippy happier
wcy-fdu Aug 14, 2024
76d9b65
revert remove file sink in macro, and set JVM_HEAP_SIZE = 5G
wcy-fdu Aug 14, 2024
171bd2c
revert remove file sink in macro, and set JVM_HEAP_SIZE = 5G
wcy-fdu Aug 14, 2024
a21b30e
only keep local fs sink
wcy-fdu Aug 14, 2024
a037b25
keep local fs and s3
wcy-fdu Aug 14, 2024
89f289e
keep gcs sink
wcy-fdu Aug 14, 2024
4510c48
keep s3 sink
wcy-fdu Aug 14, 2024
d32bd0b
keep s3, gcs sink
wcy-fdu Aug 15, 2024
1779470
random commit
wcy-fdu Aug 15, 2024
a5fa643
add clone for sync Properties
wcy-fdu Aug 15, 2024
a9214bf
add clone for sync Properties
wcy-fdu Aug 15, 2024
49ed345
enhance sink Properties
wcy-fdu Aug 15, 2024
3d41044
minor
wcy-fdu Aug 15, 2024
fb9f13e
give another try
wcy-fdu Aug 15, 2024
67466d4
make clippy happy
wcy-fdu Aug 15, 2024
46031f2
fix cargo.toml
wcy-fdu Aug 15, 2024
ac9f869
use box for operator
wcy-fdu Aug 20, 2024
854c9ba
use box in dispatch_sink
wcy-fdu Aug 20, 2024
4edff17
try Box new operator
wcy-fdu Aug 21, 2024
a6b9151
try Box get_path
wcy-fdu Aug 21, 2024
223f7da
try again
wcy-fdu Aug 21, 2024
64100c0
boxed future
wenym1 Aug 21, 2024
774aa46
merge main
wcy-fdu Aug 21, 2024
fdd1472
Merge branch 'wcy/s3_sink' of https://github.com/risingwavelabs/risin…
wcy-fdu Aug 21, 2024
807c869
revert box in FileSink struct
wcy-fdu Aug 21, 2024
introduce file sink with parquet type
wcy-fdu committed Jun 18, 2024
commit b200a3c910f285574c87551fa4eef69156cab05f
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -147,6 +147,7 @@ enum EncodeType {
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
ENCODE_TYPE_NONE = 8;
ENCODE_TYPE_PARQUET = 10;
}

enum RowFormatType {
3 changes: 3 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
@@ -134,6 +134,7 @@ pub enum SinkEncode {
Protobuf,
Avro,
Template,
Parquet,
}

impl SinkFormatDesc {
@@ -180,6 +181,7 @@ impl SinkFormatDesc {
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
SinkEncode::Parquet => E::Parquet,
};
let options = self
.options
@@ -222,6 +224,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
E::Parquet => SinkEncode::Parquet,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None) => {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {}",
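The catalog change threads the new `Parquet` variant through both directions of the proto mapping: enum-to-proto when serializing, and a fallible proto-to-enum conversion that rejects unsupported encodes. A minimal sketch of that round-trip pattern, using simplified stand-in enums rather than the actual RisingWave types:

```rust
// Illustrative stand-ins for the proto encode type and SinkEncode;
// only the two-way mapping pattern mirrors the diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PbEncode {
    Json,
    Avro,
    Parquet,
    None,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum SinkEncode {
    Json,
    Avro,
    Parquet,
}

// Serializing direction: every SinkEncode has a proto counterpart.
fn to_proto(e: SinkEncode) -> PbEncode {
    match e {
        SinkEncode::Json => PbEncode::Json,
        SinkEncode::Avro => PbEncode::Avro,
        SinkEncode::Parquet => PbEncode::Parquet,
    }
}

// Deserializing direction: proto values without a SinkEncode
// counterpart become a config error, as in the TryFrom impl.
fn from_proto(e: PbEncode) -> Result<SinkEncode, String> {
    match e {
        PbEncode::Json => Ok(SinkEncode::Json),
        PbEncode::Avro => Ok(SinkEncode::Avro),
        PbEncode::Parquet => Ok(SinkEncode::Parquet),
        other => Err(format!("sink encode unsupported: {:?}", other)),
    }
}
```

Keeping the rejected variants spelled out in the real `TryFrom` (the `e @ (E::Unspecified | ...)` arm) means adding a proto variant without handling it is a compile error rather than a silent fallthrough.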
2 changes: 2 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
@@ -22,11 +22,13 @@ use crate::sink::Result;

mod avro;
mod json;
// mod parquet;
mod proto;
pub mod template;

pub use avro::{AvroEncoder, AvroHeader};
pub use json::JsonEncoder;
// pub use parquet::ParquetEncoder;
pub use proto::ProtoEncoder;

/// Encode a row of a relation into
5 changes: 4 additions & 1 deletion src/connector/src/sink/formatter/mod.rs
@@ -109,6 +109,9 @@ impl SinkFormatterImpl {
});

match format_desc.encode {
SinkEncode::Parquet => {
unreachable!()
}
SinkEncode::Json => {
let val_encoder = JsonEncoder::new(
schema,
@@ -256,7 +259,7 @@ impl SinkFormatterImpl {
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertAvro(formatter))
}
SinkEncode::Protobuf => err_unsupported(),
SinkEncode::Protobuf | SinkEncode::Parquet => err_unsupported(),
}
}
}
39 changes: 14 additions & 25 deletions src/connector/src/sink/opendal_sink/fs.rs
@@ -25,8 +25,8 @@ use with_options::WithOptions;
use crate::sink::opendal_sink::OpenDalSinkWriter;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

#[derive(Deserialize, Debug, Clone, WithOptions)]
@@ -66,24 +66,9 @@ impl FsConfig {
pub struct FsSink {
pub config: FsConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
}

impl FsSink {
pub fn new(
config: FsConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
Ok(Self {
config,
schema,
pk_indices,
is_append_only,
})
}
pk_indices: Vec<usize>,
format_desc: SinkFormatDesc,
}

impl FsSink {
@@ -117,14 +102,15 @@ impl Sink for FsSink {
) -> Result<Self::LogSinker> {
let op = Self::new_fs_sink(self.config.clone())?;
let path = self.config.common.path.as_ref();
println!("path = {}", path); // leftover debug print

Ok(OpenDalSinkWriter::new(
op,
path,
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
writer_param.connector_params.sink_payload_format,
writer_param.executor_id,
self.format_desc.encode.clone(),
)?
.into_log_sinker(writer_param.sink_metrics))
}
@@ -136,11 +122,14 @@ impl TryFrom<SinkParam> for FsSink {
fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = FsConfig::from_hashmap(param.properties)?;
FsSink::new(
Ok(Self {
config,
schema,
param.downstream_pk,
param.sink_type.is_append_only(),
)
is_append_only: param.sink_type.is_append_only(),
pk_indices: param.downstream_pk,
format_desc: param
.format_desc
.ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
})
}
}
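Both file sinks now require an explicit `FORMAT ... ENCODE ...` clause: the `TryFrom<SinkParam>` impls pull `format_desc` out of an `Option` and surface a config error when it is absent. A minimal sketch of that `ok_or_else` pattern, with simplified hypothetical types in place of `SinkParam`/`SinkFormatDesc`:

```rust
// Illustrative stand-ins; only the Option-to-error conversion
// mirrors the diff's TryFrom<SinkParam> implementations.
#[derive(Debug, Clone, PartialEq)]
struct FormatDesc {
    encode: String,
}

struct Param {
    format_desc: Option<FormatDesc>,
}

#[derive(Debug, PartialEq)]
enum SinkError {
    Config(String),
}

// Require the format descriptor, turning a missing clause into a
// descriptive configuration error instead of a later panic.
fn build(param: Param) -> Result<FormatDesc, SinkError> {
    param
        .format_desc
        .ok_or_else(|| SinkError::Config("missing FORMAT ... ENCODE ...".into()))
}
```

Failing at sink creation keeps the error close to the user's `CREATE SINK` statement, where the missing clause can actually be fixed.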
36 changes: 12 additions & 24 deletions src/connector/src/sink/opendal_sink/gcs.rs
@@ -25,8 +25,8 @@ use with_options::WithOptions;
use crate::sink::opendal_sink::OpenDalSinkWriter;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

const GCS_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
@@ -83,22 +83,7 @@ pub struct GcsSink {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
}

impl GcsSink {
pub fn new(
config: GcsConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
Ok(Self {
config,
schema,
pk_indices,
is_append_only,
})
}
format_desc: SinkFormatDesc,
}

impl GcsSink {
@@ -146,14 +131,14 @@ impl Sink for GcsSink {
) -> Result<Self::LogSinker> {
let op = Self::new_gcs_sink(self.config.clone())?;
let path = self.config.common.path.as_ref();

Ok(OpenDalSinkWriter::new(
op,
path,
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
writer_param.connector_params.sink_payload_format,
writer_param.executor_id,
self.format_desc.encode.clone(),
)?
.into_log_sinker(writer_param.sink_metrics))
}
@@ -165,11 +150,14 @@ impl TryFrom<SinkParam> for GcsSink {
fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = GcsConfig::from_hashmap(param.properties)?;
GcsSink::new(
Ok(Self {
config,
schema,
param.downstream_pk,
param.sink_type.is_append_only(),
)
is_append_only: param.sink_type.is_append_only(),
pk_indices: param.downstream_pk,
format_desc: param
.format_desc
.ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
})
}
}
113 changes: 74 additions & 39 deletions src/connector/src/sink/opendal_sink/mod.rs
@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod fs;
pub mod gcs;
pub mod s3;
pub mod fs;
use std::collections::HashMap;
use std::sync::Arc;
use crate::SinkPayloadFormat;

use anyhow::anyhow;
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef};
use async_trait::async_trait;
@@ -29,22 +29,27 @@ use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;

use crate::sink::catalog::SinkEncode;
use crate::sink::{Result, SinkError, SinkWriter};

const SINK_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

pub struct OpenDalSinkWriter {
schema: SchemaRef,
operator: Operator,
sink_writer: Option<AsyncArrowWriter<OpendalWriter>>,
sink_writer: Option<FileWriterEnum>,
pk_indices: Vec<usize>,
is_append_only: bool,
write_path: String,
epoch: Option<u64>,
sink_payload_format: SinkPayloadFormat,
executor_id: u64,
encode_type: SinkEncode,
}


enum FileWriterEnum {
ParquetWriter(AsyncArrowWriter<OpendalWriter>),
FileWriter(OpendalWriter),
}

#[async_trait]
impl SinkWriter for OpenDalSinkWriter {
@@ -72,12 +77,13 @@ impl SinkWriter for OpenDalSinkWriter {
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
if is_checkpoint && let Some(sink_writer) =self
.sink_writer
.take() {


sink_writer.close().await?;
if is_checkpoint && let Some(sink_writer) = self.sink_writer.take() {
match sink_writer {
FileWriterEnum::ParquetWriter(mut w) => {
let _ = w.close().await?;
}
FileWriterEnum::FileWriter(mut w) => w.close().await?,
};
}

Ok(())
@@ -95,7 +101,8 @@ impl OpenDalSinkWriter {
rw_schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
sink_payload_format: SinkPayloadFormat,
executor_id: u64,
encode_type: SinkEncode,
) -> Result<Self> {
let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
Ok(Self {
@@ -105,37 +112,61 @@ impl OpenDalSinkWriter {
operator,
sink_writer: None,
is_append_only,
sink_payload_format,
epoch: None,
executor_id,
encode_type,
})
}

async fn create_sink_writer(&mut self, epoch: u64) -> Result<()> {
println!("epoch here = {:?}", epoch.to_string());
let object_name = format!("{}/epoch_{}.parquet", self.write_path, epoch.to_string());
let object_store_writer = self
async fn create_object_writer(&mut self, epoch: u64) -> Result<OpendalWriter> {
let suffix = match self.encode_type {
SinkEncode::Json => "json",
SinkEncode::Parquet => "parquet",
_ => unimplemented!(),
};
let object_name = format!(
"{}/epoch_{}_executor_{}.{}",
self.write_path,
epoch.to_string(),
self.executor_id.to_string(),
suffix.to_string(),
);
Ok(self
.operator
.writer_with(&object_name)
.concurrent(8)
.buffer(SINK_WRITE_BUFFER_SIZE)
.await?;
let parquet_config = ParquetWriterConfig::default();
let mut props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
.set_compression(parquet_config.compression)
.set_max_row_group_size(parquet_config.max_row_group_size)
.set_write_batch_size(parquet_config.write_batch_size)
.set_data_page_size_limit(parquet_config.data_page_size);
if let Some(created_by) = parquet_config.created_by.as_ref() {
props = props.set_created_by(created_by.to_string());
.await?)
}

async fn create_sink_writer(&mut self, epoch: u64) -> Result<()> {
let object_writer = self.create_object_writer(epoch).await?;
match self.encode_type {
SinkEncode::Parquet => {
let parquet_config = ParquetWriterConfig::default();
let mut props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
.set_compression(parquet_config.compression)
.set_max_row_group_size(parquet_config.max_row_group_size)
.set_write_batch_size(parquet_config.write_batch_size)
.set_data_page_size_limit(parquet_config.data_page_size);
if let Some(created_by) = parquet_config.created_by.as_ref() {
props = props.set_created_by(created_by.to_string());
}
self.sink_writer = Some(FileWriterEnum::ParquetWriter(AsyncArrowWriter::try_new(
object_writer,
self.schema.clone(),
SINK_WRITE_BUFFER_SIZE,
Some(props.build()),
)?));
}
SinkEncode::Json => {
self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
}
_ => unimplemented!(),
}
self.sink_writer = Some(AsyncArrowWriter::try_new(
object_store_writer,
self.schema.clone(),
SINK_WRITE_BUFFER_SIZE,
Some(props.build()),
)?);

Ok(())
}

@@ -144,14 +175,18 @@ impl OpenDalSinkWriter {
let filters =
chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
chunk.set_visibility(filters);
println!("schema is {:?}", self.schema.clone());
let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;

self.sink_writer
match self
.sink_writer
.as_mut()
.ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))?
.write(&batch)
.await?;
{
FileWriterEnum::ParquetWriter(w) => {
let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;
w.write(&batch).await?;
}
FileWriterEnum::FileWriter(_w) => unimplemented!(),
}

Ok(())
}
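The rewritten writer derives one object per epoch and executor, choosing the file suffix from the encode type, and then dispatches writes through `FileWriterEnum`. The naming scheme from `create_object_writer` can be sketched in plain Rust (the format string mirrors the diff; the enum is a simplified stand-in):

```rust
// Mirrors the object-name scheme in create_object_writer: one file
// per (epoch, executor) pair, suffixed by the encode type.
enum SinkEncode {
    Json,
    Parquet,
}

fn object_name(write_path: &str, epoch: u64, executor_id: u64, encode: &SinkEncode) -> String {
    let suffix = match encode {
        SinkEncode::Json => "json",
        SinkEncode::Parquet => "parquet",
    };
    format!("{}/epoch_{}_executor_{}.{}", write_path, epoch, executor_id, suffix)
}
```

Including the executor id in the name keeps parallel sink executors from clobbering each other's files within the same epoch, and closing the writer on checkpoint barriers (as `barrier` does) makes each finished file correspond to a consistent checkpoint.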