From 8d6453d3c2d84d4ca569d6847a99b983ea51b9d7 Mon Sep 17 00:00:00 2001
From: congyi wang <58715567+wcy-fdu@users.noreply.github.com>
Date: Tue, 8 Oct 2024 16:09:45 +0800
Subject: [PATCH] feat(sink): add json encode for file sink (#18744)

---
 ci/workflows/main-cron.yml                    |  6 +-
 ..._parquet_source_and_sink.py => fs_sink.py} | 88 +++++++++++++++++--
 .../src/sink/file_sink/opendal_sink.rs        | 62 +++++++++++--
 src/frontend/src/handler/create_sink.rs       | 10 +--
 4 files changed, 140 insertions(+), 26 deletions(-)
 rename e2e_test/s3/{fs_parquet_source_and_sink.py => fs_sink.py} (71%)

diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 73fd3f3303758..140b43ed5ca21 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -508,9 +508,9 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry
 
-  - label: "S3 source and sink on parquet file"
-    key: "s3-source-and-sink-parquet-encode"
-    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
+  - label: "S3 sink on parquet and json file"
+    key: "s3-sink-parquet-and-json-encode"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py"
     if: |
       !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
       || build.pull_request.labels includes "ci/run-s3-source-tests"
diff --git a/e2e_test/s3/fs_parquet_source_and_sink.py b/e2e_test/s3/fs_sink.py
similarity index 71%
rename from e2e_test/s3/fs_parquet_source_and_sink.py
rename to e2e_test/s3/fs_sink.py
index 1bf7de6ad6ec1..344b1b807d7e4 100644
--- a/e2e_test/s3/fs_parquet_source_and_sink.py
+++ b/e2e_test/s3/fs_sink.py
@@ -116,7 +116,7 @@ def _table():
         return 's3_test_parquet'
 
     # Execute a SELECT statement
-    cur.execute(f'''CREATE sink test_file_sink as select
+    cur.execute(f'''CREATE sink test_file_sink_parquet as select
         id,
         name,
         sex,
@@ -138,15 +138,15 @@ def _table():
         s3.credentials.access = 'hummockadmin',
         s3.credentials.secret = 'hummockadmin',
         s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
-        s3.path = '',
+        s3.path = 'test_parquet_sink/',
         s3.file_type = 'parquet',
         type = 'append-only',
         force_append_only='true'
     ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')
 
-    print('Sink into s3...')
+    print('Sink into s3 in parquet encode...')
     # Execute a SELECT statement
-    cur.execute(f'''CREATE TABLE test_sink_table(
+    cur.execute(f'''CREATE TABLE test_parquet_sink_table(
         id bigint primary key,
         name TEXT,
         sex bigint,
@@ -162,7 +162,7 @@ def _table():
         test_timestamptz timestamptz,
     ) WITH (
         connector = 's3',
-        match_pattern = '*.parquet',
+        match_pattern = 'test_parquet_sink/*.parquet',
         s3.region_name = 'custom',
         s3.bucket_name = 'hummock001',
         s3.credentials.access = 'hummockadmin',
@@ -173,14 +173,82 @@ def _table():
     total_rows = file_num * item_num_per_file
     MAX_RETRIES = 40
     for retry_no in range(MAX_RETRIES):
-        cur.execute(f'select count(*) from test_sink_table')
+        cur.execute(f'select count(*) from test_parquet_sink_table')
+        result = cur.fetchone()
+        if result[0] == total_rows:
+            break
+        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
+        sleep(10)
+
+    stmt = f'select count(*), sum(id) from test_parquet_sink_table'
+    print(f'Execute reading sink files: {stmt}')
+
+    # Execute a SELECT statement
+    cur.execute(f'''CREATE sink test_file_sink_json as select
+        id,
+        name,
+        sex,
+        mark,
+        test_int,
+        test_real,
+        test_double_precision,
+        test_varchar,
+        test_bytea,
+        test_date,
+        test_time,
+        test_timestamp,
+        test_timestamptz
+        from {_table()} WITH (
+        connector = 's3',
+        match_pattern = '*.parquet',
+        s3.region_name = 'custom',
+        s3.bucket_name = 'hummock001',
+        s3.credentials.access = 'hummockadmin',
+        s3.credentials.secret = 'hummockadmin',
+        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
+        s3.path = 'test_json_sink/',
+        s3.file_type = 'json',
+        type = 'append-only',
+        force_append_only='true'
+    ) FORMAT PLAIN ENCODE JSON(force_append_only='true');''')
+
+    print('Sink into s3 in json encode...')
+    # Execute a SELECT statement
+    cur.execute(f'''CREATE TABLE test_json_sink_table(
+        id bigint primary key,
+        name TEXT,
+        sex bigint,
+        mark bigint,
+        test_int int,
+        test_real real,
+        test_double_precision double precision,
+        test_varchar varchar,
+        test_bytea bytea,
+        test_date date,
+        test_time time,
+        test_timestamp timestamp,
+        test_timestamptz timestamptz,
+    ) WITH (
+        connector = 's3',
+        match_pattern = 'test_json_sink/*.json',
+        s3.region_name = 'custom',
+        s3.bucket_name = 'hummock001',
+        s3.credentials.access = 'hummockadmin',
+        s3.credentials.secret = 'hummockadmin',
+        s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
+    ) FORMAT PLAIN ENCODE JSON;''')
+
+    total_rows = file_num * item_num_per_file
+    MAX_RETRIES = 40
+    for retry_no in range(MAX_RETRIES):
+        cur.execute(f'select count(*) from test_json_sink_table')
         result = cur.fetchone()
         if result[0] == total_rows:
             break
         print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
         sleep(10)
 
-    stmt = f'select count(*), sum(id) from test_sink_table'
+    stmt = f'select count(*), sum(id) from test_json_sink_table'
     print(f'Execute reading sink files: {stmt}')
     cur.execute(stmt)
     result = cur.fetchone()
@@ -194,8 +262,10 @@ def _assert_eq(field, got, expect):
     _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
 
     print('File sink test pass!')
-    cur.execute(f'drop sink test_file_sink')
-    cur.execute(f'drop table test_sink_table')
+    cur.execute(f'drop sink test_file_sink_parquet')
+    cur.execute(f'drop table test_parquet_sink_table')
+    cur.execute(f'drop sink test_file_sink_json')
+    cur.execute(f'drop table test_json_sink_table')
     cur.close()
     conn.close()
 
diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs
index 7843cdfe0f2c1..1f061de6c398e 100644
--- a/src/connector/src/sink/file_sink/opendal_sink.rs
+++ b/src/connector/src/sink/file_sink/opendal_sink.rs
@@ -13,21 +13,28 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
+use std::fmt::Write;
 use std::marker::PhantomData;
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
-use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
+use bytes::BytesMut;
+use opendal::{FuturesAsyncWriter, Operator, Writer, Writer as OpendalWriter};
 use parquet::arrow::AsyncArrowWriter;
 use parquet::file::properties::WriterProperties;
 use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
 use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::array::StreamChunk;
+use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::catalog::Schema;
+use serde_json::Value;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
 use crate::sink::catalog::SinkEncode;
+use crate::sink::encoder::{
+    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
+    TimestamptzHandlingMode,
+};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter,
@@ -106,9 +113,12 @@ impl<S: OpendalSinkBackend> Sink for FileSink<S> {
                 For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
             )));
         }
-        if self.format_desc.encode != SinkEncode::Parquet {
+
+        if self.format_desc.encode != SinkEncode::Parquet
+            && self.format_desc.encode != SinkEncode::Json
+        {
             return Err(SinkError::Config(anyhow!(
-                "File sink only supports `PARQUET` encode at present."
+                "File sink only supports `PARQUET` and `JSON` encode at present."
             )));
         }
 
@@ -168,6 +178,7 @@ pub struct OpenDalSinkWriter {
     epoch: Option<u64>,
     executor_id: u64,
     encode_type: SinkEncode,
+    row_encoder: JsonEncoder,
     engine_type: EngineType,
 }
 
@@ -179,10 +190,12 @@ pub struct OpenDalSinkWriter {
 /// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter`
 ///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
 ///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
+/// - `FileWriter`: Represents a basic `OpenDAL` writer, used for writing files in encodings other than Parquet.
 ///
 /// The choice of writer used during the actual writing process depends on the encode type of the sink.
 enum FileWriterEnum {
     ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
+    FileWriter(Writer),
 }
 
 #[async_trait]
@@ -218,6 +231,9 @@ impl SinkWriter for OpenDalSinkWriter {
             FileWriterEnum::ParquetFileWriter(w) => {
                 let _ = w.close().await?;
             }
+            FileWriterEnum::FileWriter(mut w) => {
+                let _ = w.close().await?;
+            }
         };
     }
 
@@ -235,7 +251,16 @@ impl OpenDalSinkWriter {
         encode_type: SinkEncode,
         engine_type: EngineType,
     ) -> Result<Self> {
-        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
+        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
+        let row_encoder = JsonEncoder::new(
+            rw_schema,
+            None,
+            crate::sink::encoder::DateHandlingMode::String,
+            TimestampHandlingMode::String,
+            TimestamptzHandlingMode::UtcString,
+            TimeHandlingMode::String,
+            JsonbHandlingMode::String,
+        );
         Ok(Self {
             schema: Arc::new(arrow_schema),
             write_path: write_path.to_string(),
@@ -246,6 +271,7 @@ impl OpenDalSinkWriter {
             executor_id,
 
             encode_type,
+            row_encoder,
             engine_type,
         })
     }
@@ -254,6 +280,7 @@ impl OpenDalSinkWriter {
         // Todo: specify more file suffixes based on encode_type.
         let suffix = match self.encode_type {
             SinkEncode::Parquet => "parquet",
+            SinkEncode::Json => "json",
             _ => unimplemented!(),
         };
 
@@ -293,15 +320,15 @@ impl OpenDalSinkWriter {
                     )?,
                 ));
             }
-            _ => unimplemented!(),
+            _ => {
+                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
+            }
         }
 
         Ok(())
     }
 
     async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
-        let (data_chunk, _) = chunk.compact().into_parts();
-
         match self
             .sink_writer
             .as_mut()
@@ -309,9 +336,26 @@ impl OpenDalSinkWriter {
         {
             FileWriterEnum::ParquetFileWriter(w) => {
                 let batch =
-                    IcebergArrowConvert.to_record_batch(self.schema.clone(), &data_chunk)?;
+                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                 w.write(&batch).await?;
             }
+            FileWriterEnum::FileWriter(w) => {
+                let mut chunk_buf = BytesMut::new();
+
+                // Write the JSON representation of each row in the current chunk to `chunk_buf`.
+                for (op, row) in chunk.rows() {
+                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
+                    // To avoid allocating a temporary string per row, format the JSON value
+                    // directly into `chunk_buf` via `write_fmt` (invoked by `writeln!`).
+                    writeln!(
+                        chunk_buf,
+                        "{}",
+                        Value::Object(self.row_encoder.encode(row)?)
+                    )
+                    .unwrap(); // write to a `BytesMut` should never fail
+                }
+                w.write(chunk_buf.freeze()).await?;
+            }
         }
 
         Ok(())
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 1bbdfccdd4034..fb35c5efc2e99 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -895,19 +895,19 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
             Format::Plain => vec![Encode::Json],
         ),
         FileSink::<S3Sink>::SINK_NAME => hashmap!(
-            Format::Plain => vec![Encode::Parquet],
+            Format::Plain => vec![Encode::Parquet, Encode::Json],
         ),
         FileSink::<GcsSink>::SINK_NAME => hashmap!(
-            Format::Plain => vec![Encode::Parquet],
+            Format::Plain => vec![Encode::Parquet, Encode::Json],
        ),
         FileSink::<AzblobSink>::SINK_NAME => hashmap!(
-            Format::Plain => vec![Encode::Parquet],
+            Format::Plain => vec![Encode::Parquet, Encode::Json],
         ),
         FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
-            Format::Plain => vec![Encode::Parquet],
+            Format::Plain => vec![Encode::Parquet, Encode::Json],
         ),
         FileSink::<FsSink>::SINK_NAME => hashmap!(
-            Format::Plain => vec![Encode::Parquet],
+            Format::Plain => vec![Encode::Parquet, Encode::Json],
         ),
         KinesisSink::SINK_NAME => hashmap!(
             Format::Plain => vec![Encode::Json],
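
The core of the JSON path added in the connector change is small: each row of an append-only chunk is encoded to a serde_json::Value and appended to an in-memory BytesMut as one object per line (JSON Lines), and the frozen buffer is handed to the OpenDAL object writer as the file payload. Below is a minimal, self-contained sketch of that buffering pattern, assuming only the `bytes` and `serde_json` crates; `Row` and `encode_row` are illustrative stand-ins, not RisingWave APIs.

// Minimal sketch of the JSON-lines buffering used by the `FileWriter` arm above.
// Assumes only the `bytes` and `serde_json` crates; `Row` and `encode_row` are
// illustrative stand-ins rather than RisingWave APIs.
use std::fmt::Write;

use bytes::BytesMut;
use serde_json::{json, Value};

struct Row {
    id: i64,
    name: String,
}

// Stand-in for `JsonEncoder::encode`: turn one row into a JSON object.
fn encode_row(row: &Row) -> Value {
    json!({ "id": row.id, "name": row.name.as_str() })
}

// Append every row of a chunk to `buf` as one JSON object per line (JSON Lines).
fn append_chunk_as_json_lines(rows: &[Row], buf: &mut BytesMut) {
    for row in rows {
        // `writeln!` formats straight into the `BytesMut` through `fmt::Write`,
        // so no intermediate `String` is allocated per row; writing to a growable
        // in-memory buffer cannot fail, hence the `unwrap`.
        writeln!(buf, "{}", encode_row(row)).unwrap();
    }
}

fn main() {
    let rows = vec![
        Row { id: 1, name: "alice".into() },
        Row { id: 2, name: "bob".into() },
    ];
    let mut buf = BytesMut::new();
    append_chunk_as_json_lines(&rows, &mut buf);
    // In the sink, the equivalent of `buf.freeze()` is handed to the OpenDAL writer.
    print!("{}", String::from_utf8_lossy(&buf));
}

Formatting through `fmt::Write` is the reason the sink imports `std::fmt::Write` and writes into `BytesMut` rather than building a `String` per row; one buffer per chunk keeps allocations proportional to chunk size instead of row count.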