
Commit

resolve conflict
wcy-fdu committed Jul 8, 2024
1 parent d71ee2b commit c4b2024
Showing 4 changed files with 127 additions and 83 deletions.
36 changes: 1 addition & 35 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -93,7 +93,7 @@ opendal = { version = "0.47", features = [
] }
openssl = "0.10"
parking_lot = { workspace = true }
parquet = { workspace = true, features = ["async"] }
parquet = { version = "52", features = ["async"] }
paste = "1"
pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
postgres-openssl = "0.5.0"
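The `async` feature on the `parquet` crate gates `parquet::arrow::async_reader`, which provides the `AsyncFileReader` trait and `ParquetRecordBatchStreamBuilder` used in the reader changes below. As a minimal, hypothetical sketch (not part of this commit; the `count_rows` helper and the batch size of 1024 are invented for illustration), a record-batch stream built through that API can be drained with `TryStreamExt::try_next`:

use futures::TryStreamExt;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Illustrative only: count the rows of a Parquet file through any `AsyncFileReader`
// implementation (for example, the `ParquetFileReader` added in this commit).
async fn count_rows<R>(reader: R) -> parquet::errors::Result<usize>
where
    R: AsyncFileReader + Unpin + Send + 'static,
{
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(1024)
        .build()?;

    let mut rows = 0;
    while let Some(batch) = stream.try_next().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}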
43 changes: 20 additions & 23 deletions src/connector/src/parser/parquet_parser.rs
@@ -13,39 +13,33 @@
// limitations under the License.
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_array_iceberg::RecordBatch;
use futures_async_stream::try_stream;
use opendal::{FuturesAsyncReader, Operator};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::types::{Datum, ScalarImpl};

use crate::parser::ConnectorResult;
use crate::source::{SourceColumnDesc, SourceContextRef};
use crate::source::filesystem::opendal_source::opendal_reader::ParquetFileReader;
use crate::source::SourceColumnDesc;

/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into a `StreamChunk`.
#[derive(Debug)]
pub struct ParquetParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
}

impl ParquetParser {
pub fn new(
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
Ok(Self {
rw_columns,
source_ctx,
})
pub fn new(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
Ok(Self { rw_columns })
}

#[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
pub async fn into_stream(
self,
record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
FuturesAsyncReader,
ParquetFileReader,
>,
file_name: String,
) {
@@ -97,19 +91,23 @@ fn convert_record_batch_to_stream_chunk(
crate::source::SourceColumnType::Normal => {
match source_column.is_hidden_addition_col {
false => {
if let Some(parquet_column) =
record_batch.column_by_name(&source_column.name)
{
let converted_arrow_data_type =
arrow_schema::DataType::try_from(&source_column.data_type)?;
let rw_data_type = source_column.data_type;
let rw_column_name = source_column.name;
if let Some(parquet_column) = record_batch.column_by_name(&rw_column_name) {
let arrow_field = IcebergArrowConvert
.to_arrow_field(&rw_column_name, &rw_data_type)?;
let converted_arrow_data_type: &arrow_schema_iceberg::DataType =
arrow_field.data_type();

if &converted_arrow_data_type == parquet_column.data_type() {
let column = Arc::new(parquet_column.try_into()?);
if converted_arrow_data_type == parquet_column.data_type() {
let array_impl = IcebergArrowConvert
.array_from_arrow_array(&arrow_field, parquet_column)?;
let column = Arc::new(array_impl);
chunk_columns.push(column);
} else {
// Data type mismatch; this column is filled with nulls.
let mut array_builder =
ArrayBuilderImpl::with_type(size, source_column.data_type);
ArrayBuilderImpl::with_type(size, rw_data_type);

array_builder.append_n_null(record_batch.num_rows());
let res = array_builder.finish();
@@ -118,8 +116,7 @@
}
} else {
// For columns defined in the source schema but not present in the Parquet file, null values are filled in.
let mut array_builder =
ArrayBuilderImpl::with_type(size, source_column.data_type);
let mut array_builder = ArrayBuilderImpl::with_type(size, rw_data_type);

array_builder.append_n_null(record_batch.num_rows());
let res = array_builder.finish();
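To make the per-column rule in `convert_record_batch_to_stream_chunk` concrete, here is a minimal sketch. It is not the committed code: the helper name `column_or_null` is invented, the `ArrayRef` alias from `risingwave_common::array` and the `ConnectorResult` return type are assumed to be available as in the surrounding parser code, and the conversion calls are the ones visible in the diff above. The rule: take the Parquet column when the Arrow type derived from the RisingWave type matches, otherwise produce an all-null column of the chunk's row count.

use std::sync::Arc;

use arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef};
use risingwave_common::types::DataType;

use crate::parser::ConnectorResult;

// Hypothetical helper mirroring the per-column logic above.
fn column_or_null(
    record_batch: &RecordBatch,
    rw_column_name: &str,
    rw_data_type: &DataType,
    size: usize,
) -> ConnectorResult<ArrayRef> {
    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name) {
        let arrow_field = IcebergArrowConvert.to_arrow_field(rw_column_name, rw_data_type)?;
        if arrow_field.data_type() == parquet_column.data_type() {
            // Types line up: convert the Arrow array directly into a RisingWave array.
            let array_impl =
                IcebergArrowConvert.array_from_arrow_array(&arrow_field, parquet_column)?;
            return Ok(Arc::new(array_impl));
        }
    }
    // Column missing from the file, or data type mismatch: fill with nulls.
    let mut array_builder = ArrayBuilderImpl::with_type(size, rw_data_type.clone());
    array_builder.append_n_null(record_batch.num_rows());
    Ok(Arc::new(array_builder.finish()))
}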
129 changes: 105 additions & 24 deletions src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -13,14 +13,22 @@
// limitations under the License.

use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use opendal::Reader;
use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::ParquetError;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::FOOTER_SIZE;
use risingwave_common::array::StreamChunk;
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};
@@ -33,8 +41,8 @@ use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta,
SplitMetaData, SplitReader,
BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
SplitReader,
};

const STREAM_READER_CAPACITY: usize = 4096;
@@ -45,7 +53,6 @@ pub struct OpendalReader<Src: OpendalSource> {
splits: Vec<OpendalFsSplit<Src>>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
}
#[async_trait]
impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
@@ -57,15 +64,14 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
splits: Vec<OpendalFsSplit<Src>>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
_columns: Option<Vec<Column>>,
) -> ConnectorResult<Self> {
let connector = Src::new_enumerator(properties)?;
let opendal_reader = OpendalReader {
connector,
splits,
parser_config,
source_ctx,
columns,
};
Ok(opendal_reader)
}
@@ -78,41 +84,34 @@ impl<Src: OpendalSource> OpendalReader<Src> {
impl<Src: OpendalSource> OpendalReader<Src> {
#[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_stream_inner(self) {
let actor_id = self.source_ctx.actor_id.to_string();
let fragment_id = self.source_ctx.fragment_id.to_string();
let source_id = self.source_ctx.source_id.to_string();
let source_name = self.source_ctx.source_name.to_string();

for split in self.splits {
let source_ctx = self.source_ctx.clone();
let split_id = split.id();
let file_reader = self
.connector
.op
.reader_with(&split.name.clone())
.await?
.into_futures_async_read(split.offset as u64..).await?;

let object_name = split.name.clone();

let msg_stream;

if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
// If the format is "parquet", use `ParquetParser` to convert each `record_batch` into a stream chunk.
let file_reader = ParquetFileReader {
op: self.connector.op.clone(),
path: split.name.clone(),
};

let record_batch_stream = ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size)
.build()?;

let parquet_parser =
ParquetParser::new(self.parser_config.common.rw_columns.clone(), source_ctx)?;
msg_stream = parquet_parser.into_stream(record_batch_stream, split.name.clone());
ParquetParser::new(self.parser_config.common.rw_columns.clone())?;
msg_stream = parquet_parser.into_stream(record_batch_stream, object_name);
} else {
let data_stream = Self::stream_read_object(
file_reader,
self.connector.op.clone(),
split,
self.source_ctx.clone(),
self.connector.compression_format,
self.connector.compression_format.clone(),
);

let parser =
@@ -134,7 +133,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {

#[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
pub async fn stream_read_object(
file_reader: Reader,
op: Operator,
split: OpendalFsSplit<Src>,
source_ctx: SourceContextRef,
compression_format: CompressionFormat,
@@ -146,8 +145,14 @@ impl<Src: OpendalSource> OpendalReader<Src> {
let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
let split_id = split.id();
let object_name = split.name.clone();

let reader = op
.read_with(&object_name)
.range(split.offset as u64..)
.into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future`.
.await?;
let stream_reader = StreamReader::new(
file_reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);

let buf_reader: Pin<Box<dyn AsyncRead + Send>> = match compression_format {
@@ -212,3 +217,79 @@ impl<Src: OpendalSource> OpendalReader<Src> {
}
}
}

/// `ParquetFileReader` implements `AsyncFileReader` and reads Parquet files through an OpenDAL `Operator`.
pub struct ParquetFileReader {
op: Operator,
path: String,
}

impl AsyncFileReader for ParquetFileReader {
fn get_bytes(
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, parquet::errors::Result<bytes::Bytes>> {
Box::pin(async move {
self.op
.read_with(&self.path)
.range(range.start as u64..range.end as u64)
.await
.map(|data| data.to_bytes())
.map_err(|e| ParquetError::General(format!("{}", e)))
})
}

/// Get the metadata of the parquet file.
fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async {
let file_size = self
.op
.stat(&self.path)
.await
.map_err(|e| ParquetError::General(format!("{}", e)))?
.content_length();

if file_size < (FOOTER_SIZE as u64) {
return Err(ParquetError::General(
"Invalid Parquet file. Size is smaller than footer".to_string(),
));
}

let mut footer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
{
let footer_buffer = self
.op
.read_with(&self.path)
.range((file_size - (FOOTER_SIZE as u64))..file_size)
.await
.map_err(|e| ParquetError::General(format!("{}", e)))?
.to_bytes();

assert_eq!(footer_buffer.len(), FOOTER_SIZE);
footer.copy_from_slice(&footer_buffer);
}

let metadata_len = decode_footer(&footer)?;
let footer_metadata_len = FOOTER_SIZE + metadata_len;

if footer_metadata_len > file_size as usize {
return Err(ParquetError::General(format!(
"Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
metadata_len,
FOOTER_SIZE,
file_size
)));
}

let start = file_size - footer_metadata_len as u64;
let metadata_bytes = self
.op
.read_with(&self.path)
.range(start..(start + metadata_len as u64))
.await
.map_err(|e| ParquetError::General(format!("{}", e)))?
.to_bytes();
Ok(Arc::new(decode_metadata(&metadata_bytes)?))
})
}
}
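For a quick check of the footer arithmetic in `get_metadata`: the last `FOOTER_SIZE` (8) bytes of a Parquet file are a 4-byte little-endian metadata length followed by the 4-byte `PAR1` magic, and the Thrift-encoded metadata block sits immediately before that footer. A small hypothetical helper (not part of the commit) that computes the same range:

use std::ops::Range;

// Hypothetical: compute the byte range of the metadata block from the file size
// and the metadata length decoded out of the 8-byte footer, as `get_metadata` does.
fn metadata_byte_range(file_size: u64, metadata_len: u64) -> Range<u64> {
    const FOOTER_SIZE: u64 = 8; // same value as parquet::file::FOOTER_SIZE
    let start = file_size - FOOTER_SIZE - metadata_len;
    start..(start + metadata_len)
}

// Example: a 1_000_000-byte file whose footer reports metadata_len = 4_096 has its
// metadata at bytes 995_896..999_992 and its footer at bytes 999_992..1_000_000.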
