feat(sink): add json encode for file sink (#18744)
wcy-fdu authored Oct 8, 2024
1 parent e2c7618 commit 8d6453d
Showing 4 changed files with 140 additions and 26 deletions.
6 changes: 3 additions & 3 deletions ci/workflows/main-cron.yml
@@ -508,9 +508,9 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3 source and sink on parquet file"
key: "s3-source-and-sink-parquet-encode"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py"
- label: "S3 sink on parquet and json file"
key: "s3-sink-parquet-and-json-encode"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
(second changed file — the Python S3 file-sink e2e test, presumably the fs_sink.py referenced by the CI step above)
@@ -116,7 +116,7 @@ def _table():
return 's3_test_parquet'

# Execute a SELECT statement
cur.execute(f'''CREATE sink test_file_sink as select
cur.execute(f'''CREATE sink test_file_sink_parquet as select
id,
name,
sex,
@@ -138,15 +138,15 @@ def _table():
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = '',
s3.path = 'test_parquet_sink/',
s3.file_type = 'parquet',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''')

print('Sink into s3...')
print('Sink into s3 in parquet encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_sink_table(
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
@@ -162,7 +162,7 @@ def _table():
test_timestamptz timestamptz,
) WITH (
connector = 's3',
match_pattern = '*.parquet',
match_pattern = 'test_parquet_sink/*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
@@ -173,14 +173,82 @@ def _table():
total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_sink_table')
cur.execute(f'select count(*) from test_parquet_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_parquet_sink_table'
print(f'Execute reading sink files: {stmt}')

# Execute a SELECT statement
cur.execute(f'''CREATE sink test_file_sink_json as select
id,
name,
sex,
mark,
test_int,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_json_sink/',
s3.file_type = 'json',
type = 'append-only',
force_append_only='true'
) FORMAT PLAIN ENCODE JSON(force_append_only='true');''')

print('Sink into s3 in json encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_json_sink_table(
id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
test_int int,
test_real real,
test_double_precision double precision,
test_varchar varchar,
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
) FORMAT PLAIN ENCODE JSON;''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from test_json_sink_table')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from test_sink_table'
stmt = f'select count(*), sum(id) from test_json_sink_table'
print(f'Execute reading sink files: {stmt}')
cur.execute(stmt)
result = cur.fetchone()
@@ -194,8 +262,10 @@ def _assert_eq(field, got, expect):
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('File sink test pass!')
cur.execute(f'drop sink test_file_sink')
cur.execute(f'drop table test_sink_table')
cur.execute(f'drop sink test_file_sink_parquet')
cur.execute(f'drop table test_parquet_sink_table')
cur.execute(f'drop sink test_file_sink_json')
cur.execute(f'drop table test_json_sink_table')
cur.close()
conn.close()

62 changes: 53 additions & 9 deletions src/connector/src/sink/file_sink/opendal_sink.rs
@@ -13,21 +13,28 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use bytes::BytesMut;
use opendal::{FuturesAsyncWriter, Operator, Writer, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde_json::Value;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter,
@@ -106,9 +113,12 @@ impl<S: OpendalSinkBackend> Sink for FileSink<S> {
For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
)));
}
if self.format_desc.encode != SinkEncode::Parquet {

if self.format_desc.encode != SinkEncode::Parquet
&& self.format_desc.encode != SinkEncode::Json
{
return Err(SinkError::Config(anyhow!(
"File sink only supports `PARQUET` encode at present."
"File sink only supports `PARQUET` and `JSON` encode at present."
)));
}

@@ -168,6 +178,7 @@ pub struct OpenDalSinkWriter {
epoch: Option<u64>,
executor_id: u64,
encode_type: SinkEncode,
row_encoder: JsonEncoder,
engine_type: EngineType,
}

@@ -179,10 +190,12 @@ pub struct OpenDalSinkWriter {
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
/// for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
/// as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, for writing files in encodes other than parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
FileWriter(Writer),
}

#[async_trait]
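
Not part of the commit — for readers unfamiliar with the second variant, a minimal, self-contained sketch of the plain write path that `FileWriterEnum::FileWriter` wraps, run against OpenDAL's in-memory backend so it needs no credentials. The object path and payload are made up; method names follow recent `opendal` releases and, beyond the `write`/`close` calls visible in the diff, should be treated as assumptions.

use bytes::Bytes;
use opendal::{services::Memory, Operator};

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // In-memory backend so the sketch runs without S3/GCS/Azblob access.
    let op = Operator::new(Memory::default())?.finish();

    // Open a streaming writer for one object, push bytes, then close it —
    // the same write/close pair the sink issues through the FileWriter arm.
    let mut writer = op.writer("test_json_sink/part-0.json").await?; // hypothetical path
    writer
        .write(Bytes::from_static(b"{\"id\":1,\"name\":\"alice\"}\n"))
        .await?;
    writer.close().await?;
    Ok(())
}
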
@@ -218,6 +231,9 @@ impl SinkWriter for OpenDalSinkWriter {
FileWriterEnum::ParquetFileWriter(w) => {
let _ = w.close().await?;
}
FileWriterEnum::FileWriter(mut w) => {
let _ = w.close().await?;
}
};
}

Expand All @@ -235,7 +251,16 @@ impl OpenDalSinkWriter {
encode_type: SinkEncode,
engine_type: EngineType,
) -> Result<Self> {
let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
let row_encoder = JsonEncoder::new(
rw_schema,
None,
crate::sink::encoder::DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::String,
JsonbHandlingMode::String,
);
Ok(Self {
schema: Arc::new(arrow_schema),
write_path: write_path.to_string(),
Expand All @@ -246,6 +271,7 @@ impl OpenDalSinkWriter {
executor_id,

encode_type,
row_encoder,
engine_type,
})
}
@@ -254,6 +280,7 @@ impl OpenDalSinkWriter {
// Todo: specify more file suffixes based on encode_type.
let suffix = match self.encode_type {
SinkEncode::Parquet => "parquet",
SinkEncode::Json => "json",
_ => unimplemented!(),
};

@@ -293,25 +320,42 @@ impl OpenDalSinkWriter {
)?,
));
}
_ => unimplemented!(),
_ => {
self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
}
}

Ok(())
}

async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let (data_chunk, _) = chunk.compact().into_parts();

match self
.sink_writer
.as_mut()
.ok_or_else(|| SinkError::File("Sink writer is not created.".to_string()))?
{
FileWriterEnum::ParquetFileWriter(w) => {
let batch =
IcebergArrowConvert.to_record_batch(self.schema.clone(), &data_chunk)?;
IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
w.write(&batch).await?;
}
FileWriterEnum::FileWriter(w) => {
let mut chunk_buf = BytesMut::new();

// write the json representations of the row(s) in current chunk to `chunk_buf`
for (op, row) in chunk.rows() {
assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
// to prevent temporary string allocation,
// so we directly write to `chunk_buf` implicitly via `write_fmt`.
writeln!(
chunk_buf,
"{}",
Value::Object(self.row_encoder.encode(row)?)
)
.unwrap(); // write to a `BytesMut` should never fail
}
w.write(chunk_buf.freeze()).await?;
}
}

Ok(())
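
Not part of the diff — a standalone sketch of the newline-delimited JSON ("JSON lines") layout that the new `FileWriter` branch of `append_only` above produces, using only `bytes` and `serde_json`. The row contents are made up for illustration; in the sink they come from `JsonEncoder::encode`.

use std::fmt::Write;

use bytes::BytesMut;
use serde_json::{json, Value};

fn main() {
    // Stand-ins for the encoded rows of one stream chunk.
    let rows: Vec<Value> = vec![
        json!({"id": 0, "name": "alice"}),
        json!({"id": 1, "name": "bob"}),
    ];

    let mut chunk_buf = BytesMut::new();
    for row in rows {
        // One JSON object per line; `BytesMut` implements `fmt::Write`,
        // so `writeln!` appends without an intermediate `String` allocation.
        writeln!(chunk_buf, "{}", row).unwrap();
    }

    // `freeze()` yields the immutable `Bytes` handed to the object-store writer.
    let bytes = chunk_buf.freeze();
    assert_eq!(
        std::str::from_utf8(&bytes).unwrap(),
        "{\"id\":0,\"name\":\"alice\"}\n{\"id\":1,\"name\":\"bob\"}\n"
    );
}
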
10 changes: 5 additions & 5 deletions src/frontend/src/handler/create_sink.rs
@@ -895,19 +895,19 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
Format::Debezium => vec![Encode::Json],
),
FileSink::<S3Sink>::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Parquet],
Format::Plain => vec![Encode::Parquet, Encode::Json],
),
FileSink::<GcsSink>::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Parquet],
Format::Plain => vec![Encode::Parquet, Encode::Json],
),
FileSink::<AzblobSink>::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Parquet],
Format::Plain => vec![Encode::Parquet, Encode::Json],
),
FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Parquet],
Format::Plain => vec![Encode::Parquet, Encode::Json],
),
FileSink::<FsSink>::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Parquet],
Format::Plain => vec![Encode::Parquet, Encode::Json],
),
KinesisSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json],
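
For context (not in the commit): `CONNECTORS_COMPATIBLE_FORMATS` is a plain map from format to permitted encodes, and the change above simply appends `Encode::Json` to each file sink's `Plain` entry. Below is a self-contained sketch of that lookup, with `Format`/`Encode` replaced by hypothetical local enums rather than the frontend's real types.

use std::collections::HashMap;

// Local stand-ins for the frontend's `Format`/`Encode` types.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum Format {
    Plain,
}

#[derive(Clone, Copy, PartialEq)]
enum Encode {
    Parquet,
    Json,
    Avro,
}

fn main() {
    // After this commit the file sinks allow both encodes under FORMAT PLAIN.
    let compatible: HashMap<Format, Vec<Encode>> =
        HashMap::from([(Format::Plain, vec![Encode::Parquet, Encode::Json])]);

    let accepts = |f: Format, e: Encode| {
        compatible.get(&f).map_or(false, |encodes| encodes.contains(&e))
    };

    assert!(accepts(Format::Plain, Encode::Json)); // newly accepted
    assert!(accepts(Format::Plain, Encode::Parquet)); // already accepted
    assert!(!accepts(Format::Plain, Encode::Avro)); // still rejected
}
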
