feat(batch): support batch s3 parquet file executor (#17606)
chenzl25 authored Jul 9, 2024
1 parent 5cd5ccd commit 607a2af
Showing 8 changed files with 176 additions and 36 deletions.
34 changes: 1 addition & 33 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -171,7 +171,7 @@ deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd
itertools = "0.12.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
parquet = "50"
parquet = { version = "52", features = ["async"] }
thiserror-ext = "0.1.2"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
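The `async` feature presumably gates the `parquet::arrow::async_reader` types (`ParquetRecordBatchStreamBuilder`, `AsyncFileReader`) that the new executor relies on. A minimal standalone sketch against a local file, assuming that feature is enabled (the path is a placeholder):

```rust
use futures_util::stream::StreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

async fn read_parquet(path: &str) -> parquet::errors::Result<()> {
    // `ParquetRecordBatchStreamBuilder` is only available with the
    // parquet crate's `async` feature.
    let file = File::open(path).await.expect("open parquet file");
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```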
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -34,6 +34,7 @@ itertools = { workspace = true }
memcomparable = "0.2"
opendal = "0.47"
parking_lot = { workspace = true }
+parquet = { workspace = true }
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = "0.12"
8 changes: 8 additions & 0 deletions src/batch/src/error.rs
@@ -17,6 +17,7 @@
use std::sync::Arc;

pub use anyhow::anyhow;
+use parquet::errors::ParquetError;
use risingwave_common::array::ArrayError;
use risingwave_common::error::BoxedError;
use risingwave_common::util::value_encoding::error::ValueEncodingError;
@@ -119,6 +120,13 @@ pub enum BatchError {
        iceberg::Error,
    ),

+    #[error(transparent)]
+    Parquet(
+        #[from]
+        #[backtrace]
+        ParquetError,
+    ),
+
    // Make the ref-counted type to be a variant for easier code structuring.
    // TODO(error-handling): replace with `thiserror_ext::Arc`
    #[error(transparent)]
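With the new `#[from]` variant, a `ParquetError` now converts into `BatchError` automatically via `?`. A minimal sketch of the ergonomics this buys (the helper is hypothetical, and the `Result` alias over `BatchError` is assumed from the surrounding module):

```rust
use bytes::Bytes;
use parquet::file::footer::parse_metadata;

use crate::error::Result;

// Hypothetical helper: the `ParquetError` raised by `parse_metadata` is
// turned into `BatchError::Parquet` by `?` through the `#[from]` conversion.
fn footer_row_count(data: Bytes) -> Result<i64> {
    let metadata = parse_metadata(&data)?;
    Ok(metadata.file_metadata().num_rows())
}
```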
1 change: 1 addition & 0 deletions src/batch/src/executor/mod.rs
@@ -33,6 +33,7 @@ mod order_by;
mod project;
mod project_set;
mod row_seq_scan;
+mod s3_file_scan;
mod sort_agg;
mod sort_over_window;
mod source;
163 changes: 163 additions & 0 deletions src/batch/src/executor/s3_file_scan.rs
@@ -0,0 +1,163 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures_async_stream::try_stream;
use futures_util::future::BoxFuture;
use futures_util::stream::StreamExt;
use futures_util::TryFutureExt;
use hashbrown::HashMap;
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;

use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};

#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

/// S3 file scan executor. Currently only supports the Parquet file format.
pub struct S3FileScanExecutor {
    file_format: FileFormat,
    location: String,
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for S3FileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl S3FileScanExecutor {
    #![expect(dead_code)]
    pub fn new(
        file_format: FileFormat,
        location: String,
        s3_region: String,
        s3_access_key: String,
        s3_secret_key: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            location,
            s3_region,
            s3_access_key,
            s3_secret_key,
            batch_size,
            schema,
            identity,
        }
    }

    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);

        let mut props = HashMap::new();
        props.insert(S3_REGION, self.s3_region.clone());
        props.insert(S3_ACCESS_KEY_ID, self.s3_access_key.clone());
        props.insert(S3_SECRET_ACCESS_KEY, self.s3_secret_key.clone());

        let file_io_builder = FileIOBuilder::new("s3");
        let file_io = file_io_builder.with_props(props.into_iter()).build()?;
        let parquet_file = file_io.new_input(&self.location)?;

        let parquet_metadata = parquet_file.metadata().await?;
        let parquet_reader = parquet_file.reader().await?;
        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
            .await
            .map_err(|e| anyhow!(e))?;

        let arrow_schema = batch_stream_builder.schema();
        assert_eq!(arrow_schema.fields.len(), self.schema.fields.len());
        for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) {
            assert_eq!(*field.name, *arrow_field.name());
        }

        batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all());

        batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size);

        let record_batch_stream = batch_stream_builder.build().map_err(|e| anyhow!(e))?;

        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch = record_batch.map_err(BatchError::Parquet)?;
            let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
            debug_assert_eq!(chunk.data_types(), self.schema.data_types());
            yield chunk;
        }
    }
}

struct ArrowFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
    fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let file_size = self.meta.size;
            let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
            loader.load_page_index(false, false).await?;
            Ok(Arc::new(loader.finish()))
        })
    }
}
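The executor isn't wired into plan building in this commit (note the `#![expect(dead_code)]` on `new`), so here is a hand-rolled, hypothetical driver sketch, written as if it lived inside `src/batch/src/executor/`; the bucket, credentials, batch size, and schema fields are all placeholders:

```rust
use futures_util::stream::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;

use crate::error::Result;
use crate::executor::s3_file_scan::{FileFormat, S3FileScanExecutor};
use crate::executor::Executor;

async fn scan_s3_parquet() -> Result<()> {
    // Placeholder schema: field names must match the Parquet file's columns.
    let schema = Schema::new(vec![
        Field::with_name(DataType::Int64, "id"),
        Field::with_name(DataType::Varchar, "name"),
    ]);

    let executor = Box::new(S3FileScanExecutor::new(
        FileFormat::Parquet,
        "s3://example-bucket/data.parquet".to_string(), // placeholder location
        "us-east-1".to_string(),                        // placeholder region
        "<access-key>".to_string(),
        "<secret-key>".to_string(),
        1024, // batch size
        schema,
        "S3FileScanExecutor".to_string(),
    ));

    // `execute` yields `DataChunk`s decoded from Arrow record batches.
    let mut stream = executor.execute();
    while let Some(chunk) = stream.next().await {
        println!("read {} rows", chunk?.cardinality());
    }
    Ok(())
}
```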
1 change: 0 additions & 1 deletion src/connector/Cargo.toml
@@ -93,7 +93,6 @@ opendal = { version = "0.47", features = [
] }
openssl = "0.10"
parking_lot = { workspace = true }
-parquet = { workspace = true }
paste = "1"
pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
postgres-openssl = "0.5.0"
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -26,7 +26,7 @@ pub mod opendal_reader;
use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::file_common::CompressionFormat;
-use super::s3::S3PropertiesCommon;
+pub use super::s3::S3PropertiesCommon;
use super::OpendalFsSplit;
use crate::error::ConnectorResult;
use crate::source::{SourceProperties, UnknownFields};
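The `pub use` presumably re-exports the S3 connection properties so code outside this module can name them through the `opendal_source` path; a hypothetical downstream import:

```rust
// Hypothetical import path enabled by the re-export.
use risingwave_connector::source::filesystem::opendal_source::S3PropertiesCommon;
```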
