From 7ebe0a2a5e7cd6e5b18de39bd165e94f3956190e Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Thu, 22 Aug 2024 10:11:02 +0800 Subject: [PATCH] feat(sink): introduce file sink in PARQUET format (#17311) Co-authored-by: William Wen --- ci/scripts/e2e-sink-test.sh | 2 +- ci/scripts/e2e-source-test.sh | 1 + ci/workflows/main-cron.yml | 4 +- ...ource.py => fs_parquet_source_and_sink.py} | 111 +++++- src/bench/sink_bench/main.rs | 2 +- src/connector/src/sink/catalog/mod.rs | 11 +- src/connector/src/sink/file_sink/fs.rs | 102 ++++++ src/connector/src/sink/file_sink/gcs.rs | 119 +++++++ src/connector/src/sink/file_sink/mod.rs | 18 + .../src/sink/file_sink/opendal_sink.rs | 331 ++++++++++++++++++ src/connector/src/sink/file_sink/s3.rs | 142 ++++++++ src/connector/src/sink/formatter/mod.rs | 2 + src/connector/src/sink/mod.rs | 33 +- src/connector/with_options_sink.yaml | 60 ++++ src/frontend/src/handler/create_sink.rs | 16 +- .../src/optimizer/plan_node/stream_sink.rs | 1 + src/sqlparser/src/ast/statement.rs | 2 +- src/stream/src/executor/sink.rs | 10 +- src/stream/src/from_proto/sink.rs | 1 + 19 files changed, 943 insertions(+), 25 deletions(-) rename e2e_test/s3/{fs_parquet_source.py => fs_parquet_source_and_sink.py} (56%) create mode 100644 src/connector/src/sink/file_sink/fs.rs create mode 100644 src/connector/src/sink/file_sink/gcs.rs create mode 100644 src/connector/src/sink/file_sink/mod.rs create mode 100644 src/connector/src/sink/file_sink/opendal_sink.rs create mode 100644 src/connector/src/sink/file_sink/s3.rs diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index eaee563c7a992..8b4f5aec671f6 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -7,7 +7,6 @@ source ci/scripts/common.sh # prepare environment export CONNECTOR_LIBS_PATH="./connector-node/libs" - while getopts 'p:' opt; do case ${opt} in p ) @@ -65,6 +64,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table/*.slt' sleep 1 echo "--- testing remote sinks" + # check sink destination postgres sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt' sleep 1 diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 5cdd4b9e39995..56a06ac756931 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -147,6 +147,7 @@ risedev ci-kill export RISINGWAVE_CI=true echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" +export RUST_MIN_STACK=4194304 RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-kafka ./scripts/source/prepare_ci_kafka.sh diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 53151a900ef1a..1c2083e1a0d4d 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -528,9 +528,9 @@ steps: timeout_in_minutes: 25 retry: *auto-retry - - label: "S3 source check on parquet file" + - label: "S3 source and sink on parquet file" key: "s3-v2-source-check-parquet-file" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py" + command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" diff --git a/e2e_test/s3/fs_parquet_source.py b/e2e_test/s3/fs_parquet_source_and_sink.py similarity index 56% rename from e2e_test/s3/fs_parquet_source.py rename to 
e2e_test/s3/fs_parquet_source_and_sink.py index 9422e1403e125..9e4db6e284d78 100644 --- a/e2e_test/s3/fs_parquet_source.py +++ b/e2e_test/s3/fs_parquet_source_and_sink.py @@ -95,12 +95,110 @@ def _assert_eq(field, got, expect): _assert_eq('count(*)', result[0], total_rows) _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - print('Test pass') + print('File source test pass!') - cur.execute(f'drop table {_table()}') cur.close() conn.close() +def do_sink(config, file_num, item_num_per_file, prefix): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return 's3_test_parquet' + + # Execute a SELECT statement + cur.execute(f'''CREATE sink test_file_sink as select + id, + name, + sex, + mark, + test_int, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp, + test_timestamptz + from {_table()} WITH ( + connector = 's3', + match_pattern = '*.parquet', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + s3.path = '', + s3.file_type = 'parquet', + type = 'append-only', + force_append_only='true' + ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') + + print('Sink into s3...') + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE test_sink_table( + id bigint primary key, + name TEXT, + sex bigint, + mark bigint, + test_int int, + test_real real, + test_double_precision double precision, + test_varchar varchar, + test_bytea bytea, + test_date date, + test_time time, + test_timestamp timestamp, + test_timestamptz timestamptz, + ) WITH ( + connector = 's3_v2', + match_pattern = '*.parquet', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) FORMAT PLAIN ENCODE PARQUET;''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from test_sink_table') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") + sleep(10) + + stmt = f'select count(*), sum(id) from test_sink_table' + print(f'Execute reading sink files: {stmt}') + cur.execute(stmt) + result = cur.fetchone() + + print('Got:', result) + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
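+    # Both the row count and the sum of ids read back through the s3_v2 source must match
+    # the data originally generated for the file source test.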
+ + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + + print('File sink test pass!') + cur.execute(f'drop sink test_file_sink') + cur.execute(f'drop table test_sink_table') + cur.close() + conn.close() + + if __name__ == "__main__": FILE_NUM = 10 @@ -134,4 +232,11 @@ def _assert_eq(field, got, expect): # clean up s3 files for idx, _ in enumerate(data): - client.remove_object(config["S3_BUCKET"], _s3(idx)) \ No newline at end of file + client.remove_object(config["S3_BUCKET"], _s3(idx)) + + do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) + + # clean up s3 files + for idx, _ in enumerate(data): + client.remove_object(config["S3_BUCKET"], _s3(idx)) + diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 0aa70ad00c364..05eabdb96e913 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -549,7 +549,7 @@ async fn main() { println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME); tokio::spawn(async move { dispatch_sink!(sink, sink, { - consume_log_stream(sink, mock_range_log_reader, sink_writer_param).boxed() + consume_log_stream(*sink, mock_range_log_reader, sink_writer_param).boxed() }) .await .unwrap(); diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 7a9dc5d564ca7..23f34eab97417 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -147,6 +147,7 @@ pub enum SinkEncode { Protobuf, Avro, Template, + Parquet, Text, } @@ -202,6 +203,7 @@ impl SinkFormatDesc { SinkEncode::Protobuf => E::Protobuf, SinkEncode::Avro => E::Avro, SinkEncode::Template => E::Template, + SinkEncode::Parquet => E::Parquet, SinkEncode::Text => E::Text, }; @@ -250,13 +252,8 @@ impl TryFrom for SinkFormatDesc { E::Protobuf => SinkEncode::Protobuf, E::Template => SinkEncode::Template, E::Avro => SinkEncode::Avro, - e @ (E::Unspecified - | E::Native - | E::Csv - | E::Bytes - | E::None - | E::Text - | E::Parquet) => { + E::Parquet => SinkEncode::Parquet, + e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => { return Err(SinkError::Config(anyhow!( "sink encode unsupported: {}", e.as_str_name() diff --git a/src/connector/src/sink/file_sink/fs.rs b/src/connector/src/sink/file_sink/fs.rs new file mode 100644 index 0000000000000..581f66ec7a793 --- /dev/null +++ b/src/connector/src/sink/file_sink/fs.rs @@ -0,0 +1,102 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
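+//! Local file system file sink, built on OpenDAL's `Fs` service.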
+use std::collections::{BTreeMap, HashMap}; + +use anyhow::anyhow; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Fs; +use opendal::Operator; +use serde::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use crate::sink::file_sink::opendal_sink::{FileSink, OpendalSinkBackend}; +use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::source::UnknownFields; + +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct FsCommon { + /// The directory where the sink file is located. + #[serde(rename = "fs.path")] + pub path: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct FsConfig { + #[serde(flatten)] + pub common: FsCommon, + + pub r#type: String, // accept "append-only" + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl UnknownFields for FsConfig { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +pub const FS_SINK: &str = "fs"; + +impl FileSink { + pub fn new_fs_sink(config: FsConfig) -> Result { + // Create fs builder. + let mut builder = Fs::default(); + // Create fs backend builder. + builder.root(&config.common.path); + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FsSink; + +impl OpendalSinkBackend for FsSink { + type Properties = FsConfig; + + const SINK_NAME: &'static str = FS_SINK; + + fn from_btreemap(btree_map: BTreeMap) -> Result { + let config = serde_json::from_value::(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: FsConfig) -> Result { + FileSink::::new_fs_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::Fs + } +} diff --git a/src/connector/src/sink/file_sink/gcs.rs b/src/connector/src/sink/file_sink/gcs.rs new file mode 100644 index 0000000000000..c38669909c732 --- /dev/null +++ b/src/connector/src/sink/file_sink/gcs.rs @@ -0,0 +1,119 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
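+//! Google Cloud Storage (GCS) file sink, built on OpenDAL's `Gcs` service.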
+use std::collections::{BTreeMap, HashMap}; + +use anyhow::anyhow; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Gcs; +use opendal::Operator; +use serde::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::opendal_sink::FileSink; +use crate::sink::file_sink::opendal_sink::OpendalSinkBackend; +use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::source::UnknownFields; + +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct GcsCommon { + #[serde(rename = "gcs.bucket_name")] + pub bucket_name: String, + + /// The base64 encoded credential key. If not set, ADC will be used. + #[serde(rename = "gcs.credential")] + pub credential: String, + + /// If credential/ADC is not set. The service account can be used to provide the credential info. + #[serde(rename = "gcs.service_account", default)] + pub service_account: String, + + /// The directory where the sink file is located + #[serde(rename = "gcs.path")] + pub path: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct GcsConfig { + #[serde(flatten)] + pub common: GcsCommon, + + pub r#type: String, // accept "append-only" + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl UnknownFields for GcsConfig { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +pub const GCS_SINK: &str = "gcs"; + +impl FileSink { + pub fn new_gcs_sink(config: GcsConfig) -> Result { + // Create gcs builder. + let mut builder = Gcs::default(); + + builder.bucket(&config.common.bucket_name); + + builder.credential(&config.common.credential); + + builder.service_account(&config.common.service_account); + + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GcsSink; + +impl OpendalSinkBackend for GcsSink { + type Properties = GcsConfig; + + const SINK_NAME: &'static str = GCS_SINK; + + fn from_btreemap(btree_map: BTreeMap) -> Result { + let config = serde_json::from_value::(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: GcsConfig) -> Result { + FileSink::::new_gcs_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::Gcs + } +} diff --git a/src/connector/src/sink/file_sink/mod.rs b/src/connector/src/sink/file_sink/mod.rs new file mode 100644 index 0000000000000..39e0f0208f884 --- /dev/null +++ b/src/connector/src/sink/file_sink/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod fs; +pub mod gcs; +pub mod opendal_sink; +pub mod s3; diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs new file mode 100644 index 0000000000000..f157d0c861d17 --- /dev/null +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -0,0 +1,331 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; +use std::marker::PhantomData; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow_schema_iceberg::SchemaRef; +use async_trait::async_trait; +use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter}; +use parquet::arrow::AsyncArrowWriter; +use parquet::file::properties::WriterProperties; +use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::Schema; +use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; + +use crate::sink::catalog::SinkEncode; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::{ + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter, +}; +use crate::source::TryFromBTreeMap; +use crate::with_options::WithOptions; + +/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation. +/// +/// # Type Parameters +/// +/// - S: The type parameter S represents the concrete implementation of the `OpendalSinkBackend` trait used by this file sink. +#[derive(Debug, Clone)] +pub struct FileSink { + pub(crate) op: Operator, + /// The path to the file where the sink writes data. + pub(crate) path: String, + /// The schema describing the structure of the data being written to the file sink. + pub(crate) schema: Schema, + pub(crate) is_append_only: bool, + + /// The description of the sink's format. + pub(crate) format_desc: SinkFormatDesc, + pub(crate) engine_type: EngineType, + pub(crate) _marker: PhantomData, +} + +/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends +/// implemented through `OpenDAL`(``). +/// +/// # Type Parameters +/// +/// - Properties: Represents the necessary parameters for establishing a backend. +/// +/// # Constants +/// +/// - `SINK_NAME`: A static string representing the name of the sink. +/// +/// # Functions +/// +/// - `from_btreemap`: Automatically parse the required parameters from the input create sink statement. +/// - `new_operator`: Creates a new operator using the provided backend properties. +/// - `get_path`: Returns the path of the sink file specified by the user's create sink statement. 
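+/// - `get_engine_type`: Returns the `EngineType` of the backend (S3, GCS or local file system).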
+pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq { + type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions; + const SINK_NAME: &'static str; + + fn from_btreemap(btree_map: BTreeMap) -> Result; + fn new_operator(properties: Self::Properties) -> Result; + fn get_path(properties: Self::Properties) -> String; + fn get_engine_type() -> EngineType; +} + +#[derive(Clone, Debug)] +pub enum EngineType { + Gcs, + S3, + Fs, +} + +impl Sink for FileSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + const SINK_NAME: &'static str = S::SINK_NAME; + + async fn validate(&self) -> Result<()> { + if !self.is_append_only { + return Err(SinkError::Config(anyhow!( + "File sink only supports append-only mode at present. \ + Please change the query to append-only, and specify it \ + explicitly after the `FORMAT ... ENCODE ...` statement. \ + For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`" + ))); + } + if self.format_desc.encode != SinkEncode::Parquet { + return Err(SinkError::Config(anyhow!( + "File sink only supports `PARQUET` encode at present." + ))); + } + Ok(()) + } + + async fn new_log_sinker( + &self, + writer_param: crate::sink::SinkWriterParam, + ) -> Result { + Ok(OpenDalSinkWriter::new( + self.op.clone(), + &self.path, + self.schema.clone(), + self.is_append_only, + writer_param.executor_id, + self.format_desc.encode.clone(), + self.engine_type.clone(), + )? + .into_log_sinker(writer_param.sink_metrics)) + } +} + +impl TryFrom for FileSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = S::from_btreemap(param.properties)?; + let path = S::get_path(config.clone()).to_string(); + let op = S::new_operator(config.clone())?; + let engine_type = S::get_engine_type(); + Ok(Self { + op, + path, + schema, + is_append_only: param.sink_type.is_append_only(), + format_desc: param + .format_desc + .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?, + engine_type, + _marker: PhantomData, + }) + } +} + +pub struct OpenDalSinkWriter { + schema: SchemaRef, + operator: Operator, + sink_writer: Option, + is_append_only: bool, + write_path: String, + epoch: Option, + executor_id: u64, + encode_type: SinkEncode, + engine_type: EngineType, +} + +/// The `FileWriterEnum` enum represents different types of file writers used for various sink +/// implementations. +/// +/// # Variants +/// +/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter` +/// for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send` +/// as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer. +/// +/// The choice of writer used during the actual writing process depends on the encode type of the sink. +enum FileWriterEnum { + ParquetFileWriter(AsyncArrowWriter>), +} + +#[async_trait] +impl SinkWriter for OpenDalSinkWriter { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + // Note: epoch is used to name the output files. + // Todo: after enabling sink decouple, use the new naming convention. 
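+        // A writer is opened lazily for the current epoch and closed at every checkpoint
+        // barrier, so each checkpoint that received data produces one file named
+        // `{epoch}_{executor_id}.parquet` per sink executor.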
+ let epoch = self.epoch.ok_or_else(|| { + SinkError::File("epoch has not been initialize, call `begin_epoch`".to_string()) + })?; + if self.sink_writer.is_none() { + self.create_sink_writer(epoch).await?; + } + if self.is_append_only { + self.append_only(chunk).await + } else { + // currently file sink only supports append only mode. + unimplemented!() + } + } + + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + self.epoch = Some(epoch); + Ok(()) + } + + /// For the file sink, currently, the sink decoupling feature is not enabled. + /// When a checkpoint arrives, the force commit is performed to write the data to the file. + /// In the future if flush and checkpoint is decoupled, we should enable sink decouple accordingly. + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint && let Some(sink_writer) = self.sink_writer.take() { + match sink_writer { + FileWriterEnum::ParquetFileWriter(w) => { + let _ = w.close().await?; + } + }; + } + + Ok(()) + } +} + +impl OpenDalSinkWriter { + pub fn new( + operator: Operator, + write_path: &str, + rw_schema: Schema, + is_append_only: bool, + executor_id: u64, + encode_type: SinkEncode, + engine_type: EngineType, + ) -> Result { + let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?; + Ok(Self { + schema: Arc::new(arrow_schema), + write_path: write_path.to_string(), + operator, + sink_writer: None, + is_append_only, + epoch: None, + executor_id, + + encode_type, + engine_type, + }) + } + + async fn create_object_writer(&mut self, epoch: u64) -> Result { + // Todo: specify more file suffixes based on encode_type. + let suffix = match self.encode_type { + SinkEncode::Parquet => "parquet", + _ => unimplemented!(), + }; + + // Note: sink decoupling is not currently supported, which means that output files will not be batched across checkpoints. + // The current implementation writes files every time a checkpoint arrives, so the naming convention is `epoch + executor_id + .suffix`. + let object_name = match self.engine_type { + // For the local fs sink, the data will be automatically written to the defined path. + // Therefore, there is no need to specify the path in the file name. + EngineType::Fs => { + format!("{}_{}.{}", epoch, self.executor_id, suffix,) + } + _ => format!( + "{}/{}_{}.{}", + self.write_path, epoch, self.executor_id, suffix, + ), + }; + + Ok(self + .operator + .writer_with(&object_name) + .concurrent(8) + .await?) + } + + async fn create_sink_writer(&mut self, epoch: u64) -> Result<()> { + let object_writer = self.create_object_writer(epoch).await?; + match self.encode_type { + SinkEncode::Parquet => { + let props = WriterProperties::builder(); + let parquet_writer: tokio_util::compat::Compat = + object_writer.into_futures_async_write().compat_write(); + self.sink_writer = Some(FileWriterEnum::ParquetFileWriter( + AsyncArrowWriter::try_new( + parquet_writer, + self.schema.clone(), + Some(props.build()), + )?, + )); + } + _ => unimplemented!(), + } + + Ok(()) + } + + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + let (data_chunk, _) = chunk.compact().into_parts(); + + match self + .sink_writer + .as_mut() + .ok_or_else(|| SinkError::File("Sink writer is not created.".to_string()))? 
+ { + FileWriterEnum::ParquetFileWriter(w) => { + let batch = + IcebergArrowConvert.to_record_batch(self.schema.clone(), &data_chunk)?; + w.write(&batch).await?; + } + } + + Ok(()) + } +} + +fn convert_rw_schema_to_arrow_schema( + rw_schema: risingwave_common::catalog::Schema, +) -> anyhow::Result { + let mut schema_fields = HashMap::new(); + rw_schema.fields.iter().for_each(|field| { + let res = schema_fields.insert(&field.name, &field.data_type); + // This assert is to make sure there is no duplicate field name in the schema. + assert!(res.is_none()) + }); + let mut arrow_fields = vec![]; + for rw_field in &rw_schema.fields { + let arrow_field = IcebergArrowConvert + .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?; + + arrow_fields.push(arrow_field); + } + + Ok(arrow_schema_iceberg::Schema::new(arrow_fields)) +} diff --git a/src/connector/src/sink/file_sink/s3.rs b/src/connector/src/sink/file_sink/s3.rs new file mode 100644 index 0000000000000..417094600e61d --- /dev/null +++ b/src/connector/src/sink/file_sink/s3.rs @@ -0,0 +1,142 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::collections::{BTreeMap, HashMap}; + +use anyhow::anyhow; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; +use opendal::Operator; +use serde::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::opendal_sink::FileSink; +use crate::sink::file_sink::opendal_sink::OpendalSinkBackend; +use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::source::UnknownFields; +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct S3Common { + #[serde(rename = "s3.region_name")] + pub region_name: String, + #[serde(rename = "s3.bucket_name")] + pub bucket_name: String, + /// The directory where the sink file is located. + #[serde(rename = "s3.path")] + pub path: String, + #[serde(rename = "s3.credentials.access", default)] + pub access: Option, + #[serde(rename = "s3.credentials.secret", default)] + pub secret: Option, + #[serde(rename = "s3.endpoint_url")] + pub endpoint_url: Option, + #[serde(rename = "s3.assume_role", default)] + pub assume_role: Option, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct S3Config { + #[serde(flatten)] + pub common: S3Common, + + pub r#type: String, // accept "append-only" + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +pub const S3_SINK: &str = "s3"; + +impl FileSink { + pub fn new_s3_sink(config: S3Config) -> Result { + // Create s3 builder. 
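+        // Bucket, region and the optional endpoint, credentials and assume-role ARN all come
+        // from the `s3.*` options; `disable_config_load` below stops OpenDAL from reading AWS
+        // configuration from environment variables or shared config files.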
+ let mut builder = S3::default(); + builder.bucket(&config.common.bucket_name); + builder.region(&config.common.region_name); + + if let Some(endpoint_url) = config.common.endpoint_url { + builder.endpoint(&endpoint_url); + } + + if let Some(access) = config.common.access { + builder.access_key_id(&access); + } else { + tracing::error!( + "access key id of aws s3 is not set, bucket {}", + config.common.bucket_name + ); + } + + if let Some(secret) = config.common.secret { + builder.secret_access_key(&secret); + } else { + tracing::error!( + "secret access key of aws s3 is not set, bucket {}", + config.common.bucket_name + ); + } + + if let Some(assume_role) = config.common.assume_role { + builder.role_arn(&assume_role); + } + builder.disable_config_load(); + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + Ok(operator) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct S3Sink; + +impl UnknownFields for S3Config { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl OpendalSinkBackend for S3Sink { + type Properties = S3Config; + + const SINK_NAME: &'static str = S3_SINK; + + fn from_btreemap(btree_map: BTreeMap) -> Result { + let config = serde_json::from_value::(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: S3Config) -> Result { + FileSink::::new_s3_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::S3 + } +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 6da8a1e0d2008..9ac7d2114e458 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -375,6 +375,8 @@ impl SinkFormatterImpl { | (F::Upsert, E::Protobuf, _) | (F::Debezium, E::Json, Some(_)) | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _) + | (_, E::Parquet, _) + | (_, _, Some(E::Parquet)) | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode => { return Err(SinkError::Config(anyhow!( diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3391520ed0c23..0ee077bbdccb8 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
- pub mod big_query; pub mod boxed; pub mod catalog; @@ -24,6 +23,7 @@ pub mod doris_starrocks_connector; pub mod dynamodb; pub mod elasticsearch; pub mod encoder; +pub mod file_sink; pub mod formatter; pub mod google_pubsub; pub mod iceberg; @@ -53,6 +53,8 @@ use ::deltalake::DeltaTableError; use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; +use opendal::Error as OpendalError; +use risingwave_common::array::ArrayError; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::metrics::{ @@ -73,9 +75,9 @@ use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEn use crate::error::ConnectorError; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; +use crate::sink::file_sink::fs::FsSink; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::SinkWriter; - const BOUNDED_CHANNEL_SIZE: usize = 16; #[macro_export] macro_rules! for_all_sinks { @@ -99,6 +101,9 @@ macro_rules! for_all_sinks { { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, + { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>}, + { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> }, + { Fs, $crate::sink::file_sink::opendal_sink::FileSink }, { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, @@ -480,14 +485,14 @@ macro_rules! def_sink_impl { #[derive(Debug)] pub enum SinkImpl { $( - $variant_name($sink_type), + $variant_name(Box<$sink_type>), )* } $( impl From<$sink_type> for SinkImpl { fn from(sink: $sink_type) -> SinkImpl { - SinkImpl::$variant_name(sink) + SinkImpl::$variant_name(Box::new(sink)) } } )* @@ -572,6 +577,8 @@ pub enum SinkError { ), #[error("Starrocks error: {0}")] Starrocks(String), + #[error("File error: {0}")] + File(String), #[error("Snowflake error: {0}")] Snowflake( #[source] @@ -634,6 +641,24 @@ impl From for SinkError { } } +impl From for SinkError { + fn from(error: OpendalError) -> Self { + SinkError::File(error.to_report_string()) + } +} + +impl From for SinkError { + fn from(error: parquet::errors::ParquetError) -> Self { + SinkError::File(error.to_report_string()) + } +} + +impl From for SinkError { + fn from(error: ArrayError) -> Self { + SinkError::File(error.to_report_string()) + } +} + impl From for SinkError { fn from(value: RpcError) -> Self { SinkError::Remote(anyhow!(value)) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 48cc3ce81ec17..c6c36917e9c21 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -209,6 +209,36 @@ DynamoDbConfig: required: false alias: - profile +FsConfig: + fields: + - name: fs.path + field_type: String + comments: The directory where the sink file is located. + required: true + - name: r#type + field_type: String + required: true +GcsConfig: + fields: + - name: gcs.bucket_name + field_type: String + required: true + - name: gcs.credential + field_type: String + comments: The base64 encoded credential key. If not set, ADC will be used. + required: true + - name: gcs.service_account + field_type: String + comments: If credential/ADC is not set. 
The service account can be used to provide the credential info. + required: false + default: Default::default + - name: gcs.path + field_type: String + comments: The directory where the sink file is located + required: true + - name: r#type + field_type: String + required: true GooglePubSubConfig: fields: - name: pubsub.project_id @@ -819,6 +849,36 @@ RedisConfig: - name: redis.url field_type: String required: true +S3Config: + fields: + - name: s3.region_name + field_type: String + required: true + - name: s3.bucket_name + field_type: String + required: true + - name: s3.path + field_type: String + comments: The directory where the sink file is located. + required: true + - name: s3.credentials.access + field_type: String + required: false + default: Default::default + - name: s3.credentials.secret + field_type: String + required: false + default: Default::default + - name: s3.endpoint_url + field_type: String + required: false + - name: s3.assume_role + field_type: String + required: false + default: Default::default + - name: r#type + field_type: String + required: true SnowflakeConfig: fields: - name: snowflake.s3_bucket diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 834f92906efa2..84d98f43e2a14 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -830,7 +830,8 @@ fn bind_sink_format_desc(session: &SessionImpl, value: ConnectorSchema) -> Resul E::Protobuf => SinkEncode::Protobuf, E::Avro => SinkEncode::Avro, E::Template => SinkEncode::Template, - e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text | E::Parquet) => { + E::Parquet => SinkEncode::Parquet, + e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()); } }; @@ -868,6 +869,10 @@ fn bind_sink_format_desc(session: &SessionImpl, value: ConnectorSchema) -> Resul static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { + use risingwave_connector::sink::file_sink::fs::FsSink; + use risingwave_connector::sink::file_sink::gcs::GcsSink; + use risingwave_connector::sink::file_sink::opendal_sink::FileSink; + use risingwave_connector::sink::file_sink::s3::S3Sink; use risingwave_connector::sink::google_pubsub::GooglePubSubSink; use risingwave_connector::sink::kafka::KafkaSink; use risingwave_connector::sink::kinesis::KinesisSink; @@ -885,6 +890,15 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json, Encode::Avro, Encode::Protobuf], Format::Debezium => vec![Encode::Json], ), + FileSink::::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet], + ), + FileSink::::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet], + ), + FileSink::::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet], + ), KinesisSink::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Json], Format::Upsert => vec![Encode::Json], diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index dcdd15b067a0e..2717c454e6435 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -26,6 +26,7 @@ use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::file_sink::fs::FsSink; use 
risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::trivial::TABLE_SINK; use risingwave_connector::sink::{ diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 732badfdd0a52..db314fc8d6f6e 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -208,9 +208,9 @@ impl Encode { "PROTOBUF" => Encode::Protobuf, "JSON" => Encode::Json, "TEMPLATE" => Encode::Template, + "PARQUET" => Encode::Parquet, "NATIVE" => Encode::Native, "NONE" => Encode::None, - "PARQUET" => Encode::Parquet, _ => parser_err!( "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode" ), diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index ef8b57781cd95..23162241ef298 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -227,9 +227,9 @@ impl SinkExecutor { actor_id, ); - dispatch_sink!(self.sink, sink, { + let consume_log_stream_future = dispatch_sink!(self.sink, sink, { let consume_log_stream = Self::execute_consume_log( - sink, + *sink, log_reader, self.input_columns, self.sink_param, @@ -239,9 +239,9 @@ impl SinkExecutor { .instrument_await(format!("consume_log (sink_id {sink_id})")) .map_ok(|never| match never {}); // unify return type to `Message` - // TODO: may try to remove the boxed - select(consume_log_stream.into_stream(), write_log_stream).boxed() - }) + consume_log_stream.boxed() + }); + select(consume_log_stream_future.into_stream(), write_log_stream) }) .into_stream() .flatten() diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index cff2009af7830..c53c123a48cb5 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -20,6 +20,7 @@ use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::file_sink::fs::FsSink; use risingwave_connector::sink::{ SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, };
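
For quick reference, a minimal usage sketch of the new file sink, in the style of the `do_sink` helper in `fs_parquet_source_and_sink.py` above but targeting the local `fs` backend: the sink name and `fs.path` below are illustrative, and it assumes the `s3_test_parquet` table created by that e2e test already exists.

import psycopg2

conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev")
conn.autocommit = True
cur = conn.cursor()

# Sink the table into local parquet files; with sink decoupling disabled, each
# checkpoint writes one `{epoch}_{executor_id}.parquet` file under `fs.path`.
cur.execute('''CREATE SINK local_parquet_sink AS SELECT * FROM s3_test_parquet WITH (
    connector = 'fs',
    fs.path = '/tmp/parquet_sink',
    type = 'append-only',
    force_append_only = 'true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

cur.close()
conn.close()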