diff --git a/Cargo.lock b/Cargo.lock index cbde18e16a278..05b53b996dff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11110,6 +11110,7 @@ dependencies = [ "arrow-schema 52.0.0", "arrow-select 50.0.0", "assert_matches", + "async-compression", "async-nats", "async-trait", "auto_enums", diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py index a687c9be19c9d..760b8d07a09a5 100644 --- a/e2e_test/s3/fs_source_v2.py +++ b/e2e_test/s3/fs_source_v2.py @@ -29,6 +29,7 @@ def format_json(data): for file in data ] + def format_csv(data, with_header): csv_files = [] diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index b00c8013c70e4..86491ae464a3f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -23,6 +23,7 @@ arrow-schema = { workspace = true } arrow-schema-iceberg = { workspace = true } arrow-select = { workspace = true } assert_matches = "1" +async-compression = { version = "0.4.5", features = ["gzip", "tokio"] } async-nats = "0.35" async-trait = "0.1" auto_enums = { workspace = true } diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index b2c5d5ccdb4da..e012fc9ce1e1b 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -18,6 +18,7 @@ use std::marker::PhantomData; use aws_sdk_s3::types::Object; use risingwave_common::types::{JsonbVal, Timestamptz}; use serde::{Deserialize, Serialize}; +use strum::Display; use super::opendal_source::OpendalSource; use crate::error::ConnectorResult; @@ -141,3 +142,12 @@ pub struct FsPageItem { } pub type FsPage = Vec; + +#[derive(Debug, Default, Clone, PartialEq, Display, Deserialize)] +pub enum CompressionFormat { + #[default] + None, + + #[serde(rename = "gzip", alias = "gz")] + Gzip, +} diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 01594af4e4bad..768f19fc36722 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -58,11 +58,15 @@ impl OpendalEnumerator { } else { (None, None) }; + + let compression_format = gcs_properties.compression_format; + Ok(Self { op, prefix, matcher, marker: PhantomData, + compression_format, }) } } diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index a9689a921d7f0..26a311f26eb8d 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -25,6 +25,7 @@ pub mod opendal_reader; use self::opendal_enumerator::OpendalEnumerator; use self::opendal_reader::OpendalReader; +use super::file_common::CompressionFormat; use super::s3::S3PropertiesCommon; use super::OpendalFsSplit; use crate::error::ConnectorResult; @@ -53,6 +54,9 @@ pub struct GcsProperties { #[serde(flatten)] pub unknown_fields: HashMap, + + #[serde(rename = "compression_format", default = "Default::default")] + pub compression_format: CompressionFormat, } impl UnknownFields for GcsProperties { @@ -147,6 +151,8 @@ pub struct PosixFsProperties { #[serde(flatten)] pub unknown_fields: HashMap, + #[serde(rename = "compression_format", default = "Default::default")] + pub compression_format: CompressionFormat, } impl UnknownFields for PosixFsProperties { diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index f65eacd9a9696..864d1de56c7be 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -23,6 +23,7 @@ use risingwave_common::types::Timestamptz; use super::OpendalSource; use crate::error::ConnectorResult; +use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; @@ -33,6 +34,7 @@ pub struct OpendalEnumerator { pub(crate) prefix: Option, pub(crate) matcher: Option, pub(crate) marker: PhantomData, + pub(crate) compression_format: CompressionFormat, } #[async_trait] diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 8d1085a094a12..a37a4999d90e5 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -13,19 +13,22 @@ // limitations under the License. use std::future::IntoFuture; +use std::pin::Pin; +use async_compression::tokio::bufread::GzipDecoder; use async_trait::async_trait; use futures::TryStreamExt; use futures_async_stream::try_stream; use opendal::Operator; use risingwave_common::array::StreamChunk; -use tokio::io::BufReader; +use tokio::io::{AsyncRead, BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::error::ConnectorResult; use crate::parser::ParserConfig; +use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; use crate::source::{ @@ -34,6 +37,7 @@ use crate::source::{ }; const STREAM_READER_CAPACITY: usize = 4096; + #[derive(Debug, Clone)] pub struct OpendalReader { connector: OpendalEnumerator, @@ -72,8 +76,13 @@ impl OpendalReader { #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)] async fn into_stream_inner(self) { for split in self.splits { - let data_stream = - Self::stream_read_object(self.connector.op.clone(), split, self.source_ctx.clone()); + let data_stream = Self::stream_read_object( + self.connector.op.clone(), + split, + self.source_ctx.clone(), + self.connector.compression_format.clone(), + ); + let data_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { nd_streaming::split_stream(data_stream) } else { @@ -98,6 +107,7 @@ impl OpendalReader { op: Operator, split: OpendalFsSplit, source_ctx: SourceContextRef, + compression_format: CompressionFormat, ) { let actor_id = source_ctx.actor_id.to_string(); let fragment_id = source_ctx.fragment_id.to_string(); @@ -117,7 +127,23 @@ impl OpendalReader { let stream_reader = StreamReader::new( reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), ); - let buf_reader = BufReader::new(stream_reader); + + let buf_reader: Pin> = match compression_format { + CompressionFormat::Gzip => { + let gzip_decoder = GzipDecoder::new(stream_reader); + Box::pin(BufReader::new(gzip_decoder)) as Pin> + } + CompressionFormat::None => { + // todo: support automatic decompression of more compression types. + if object_name.ends_with(".gz") || object_name.ends_with(".gzip") { + let gzip_decoder = GzipDecoder::new(stream_reader); + Box::pin(BufReader::new(gzip_decoder)) as Pin> + } else { + Box::pin(BufReader::new(stream_reader)) as Pin> + } + } + }; + let stream = ReaderStream::with_capacity(buf_reader, STREAM_READER_CAPACITY); let mut offset: usize = split.offset; diff --git a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs index 3a4fb7fcfeaa7..a7a984da663c3 100644 --- a/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/posix_fs_source.rs @@ -49,11 +49,13 @@ impl OpendalEnumerator { } else { (None, None) }; + let compression_format = posix_fs_properties.compression_format; Ok(Self { op, prefix, matcher, marker: PhantomData, + compression_format, }) } } diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index fd41c44e1f7d6..2eb6b7a292506 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -71,7 +71,7 @@ impl OpendalEnumerator { } else { (None, None) }; - + let compression_format = s3_properties.compression_format; let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) @@ -82,6 +82,7 @@ impl OpendalEnumerator { prefix, matcher, marker: PhantomData, + compression_format, }) } } diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index 7491cac0df7c6..7a3e749cdc756 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -126,6 +126,7 @@ mod tests { } use super::*; + use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::s3::S3PropertiesCommon; use crate::source::SourceEnumeratorContext; #[tokio::test] @@ -138,6 +139,7 @@ mod tests { access: None, secret: None, endpoint_url: None, + compression_format: CompressionFormat::None, }; let mut enumerator = S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into()) diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index 175c897d46c83..4e4816683872f 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. pub mod enumerator; - use std::collections::HashMap; pub use enumerator::S3SplitEnumerator; + +use crate::source::filesystem::file_common::CompressionFormat; mod source; use serde::Deserialize; pub use source::S3FileReader; @@ -41,6 +42,8 @@ pub struct S3PropertiesCommon { pub secret: Option, #[serde(rename = "s3.endpoint_url")] pub endpoint_url: Option, + #[serde(rename = "compression_format", default = "Default::default")] + pub compression_format: CompressionFormat, } #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 0340e584d0b8f..9f94dedbfacb5 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -250,6 +250,7 @@ mod tests { CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties, SpecificParserConfig, }; + use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::s3::S3PropertiesCommon; use crate::source::filesystem::S3SplitEnumerator; use crate::source::{ @@ -266,6 +267,7 @@ mod tests { access: None, secret: None, endpoint_url: None, + compression_format: CompressionFormat::None, } .into(); let mut enumerator = diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 3207a7bbbde2f..154586d770522 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -62,6 +62,7 @@ impl WithOptions for f64 {} impl WithOptions for std::time::Duration {} impl WithOptions for crate::connector_common::mqtt_common::QualityOfService {} impl WithOptions for crate::sink::kafka::CompressionCodec {} +impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {} impl WithOptions for nexmark::config::RateShape {} impl WithOptions for nexmark::event::EventType {} diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 93751bf103959..8d808f526bf88 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -33,6 +33,10 @@ GcsProperties: field_type: String required: false default: Default::default + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default IcebergProperties: fields: - name: catalog.type @@ -616,6 +620,10 @@ OpendalS3Properties: - name: s3.endpoint_url field_type: String required: false + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default - name: s3.assume_role field_type: String comments: The following are only supported by `s3_v2` (opendal) source. @@ -632,6 +640,10 @@ PosixFsProperties: comments: The regex pattern to match files under root directory. required: false default: Default::default + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default PubsubProperties: fields: - name: pubsub.subscription @@ -769,6 +781,10 @@ S3Properties: - name: s3.endpoint_url field_type: String required: false + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default TestSourceProperties: fields: - name: properties