From c4b2024607d4d0e2d6bc6798a1c5d3bfe704d4b8 Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Mon, 8 Jul 2024 20:25:07 +0800
Subject: [PATCH] resolve conflict

---
 Cargo.lock                                 |  36 +----
 src/connector/Cargo.toml                   |   2 +-
 src/connector/src/parser/parquet_parser.rs |  43 +++---
 .../opendal_source/opendal_reader.rs       | 129 ++++++++++++++----
 4 files changed, 127 insertions(+), 83 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 16e835d078f17..ab516c676af7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8914,40 +8914,6 @@ dependencies = [
  "zstd 0.13.0",
 ]
 
-[[package]]
-name = "parquet"
-version = "50.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750"
-dependencies = [
- "ahash 0.8.11",
- "arrow-array 50.0.0",
- "arrow-buffer 50.0.0",
- "arrow-cast 50.0.0",
- "arrow-data 50.0.0",
- "arrow-ipc 50.0.0",
- "arrow-schema 50.0.0",
- "arrow-select 50.0.0",
- "base64 0.21.7",
- "brotli 3.5.0",
- "bytes",
- "chrono",
- "flate2",
- "futures",
- "half 2.3.1",
- "hashbrown 0.14.3",
- "lz4_flex",
- "num",
- "num-bigint",
- "paste",
- "seq-macro",
- "snap",
- "thrift",
- "tokio",
- "twox-hash",
- "zstd 0.13.0",
-]
-
 [[package]]
 name = "parquet"
 version = "52.0.0"
@@ -11205,7 +11171,7 @@ dependencies = [
  "opendal",
  "openssl",
  "parking_lot 0.12.1",
- "parquet 50.0.0",
+ "parquet 52.0.0",
  "paste",
  "pg_bigdecimal",
  "postgres-openssl",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 9ae4ee7f13022..006b75c300144 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -93,7 +93,7 @@ opendal = { version = "0.47", features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
-parquet = { workspace = true, features = ["async"] }
+parquet = { version = "52", features = ["async"] }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs
index d68162c9f1cd3..de488bad409ef 100644
--- a/src/connector/src/parser/parquet_parser.rs
+++ b/src/connector/src/parser/parquet_parser.rs
@@ -13,39 +13,33 @@
 // limitations under the License.
 
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
+use arrow_array_iceberg::RecordBatch;
 use futures_async_stream::try_stream;
-use opendal::{FuturesAsyncReader, Operator};
+use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
 use risingwave_common::types::{Datum, ScalarImpl};
 
 use crate::parser::ConnectorResult;
-use crate::source::{SourceColumnDesc, SourceContextRef};
+use crate::source::filesystem::opendal_source::opendal_reader::ParquetFileReader;
+use crate::source::SourceColumnDesc;
 
 /// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
 /// into a `streamChunk`.
 #[derive(Debug)]
 pub struct ParquetParser {
     rw_columns: Vec<SourceColumnDesc>,
-    source_ctx: SourceContextRef,
 }
 
 impl ParquetParser {
-    pub fn new(
-        rw_columns: Vec<SourceColumnDesc>,
-        source_ctx: SourceContextRef,
-    ) -> ConnectorResult<Self> {
-        Ok(Self {
-            rw_columns,
-            source_ctx,
-        })
+    pub fn new(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
+        Ok(Self { rw_columns })
     }
 
     #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
     pub async fn into_stream(
         self,
         record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
-            FuturesAsyncReader,
+            ParquetFileReader,
         >,
         file_name: String,
     ) {
@@ -97,19 +91,23 @@ fn convert_record_batch_to_stream_chunk(
             crate::source::SourceColumnType::Normal => {
                 match source_column.is_hidden_addition_col {
                     false => {
-                        if let Some(parquet_column) =
-                            record_batch.column_by_name(&source_column.name)
-                        {
-                            let converted_arrow_data_type =
-                                arrow_schema::DataType::try_from(&source_column.data_type)?;
+                        let rw_data_type = source_column.data_type;
+                        let rw_column_name = source_column.name;
+                        if let Some(parquet_column) = record_batch.column_by_name(&rw_column_name) {
+                            let arrow_field = IcebergArrowConvert
+                                .to_arrow_field(&rw_column_name, &rw_data_type)?;
+                            let converted_arrow_data_type: &arrow_schema_iceberg::DataType =
+                                arrow_field.data_type();
 
-                            if &converted_arrow_data_type == parquet_column.data_type() {
-                                let column = Arc::new(parquet_column.try_into()?);
+                            if converted_arrow_data_type == parquet_column.data_type() {
+                                let array_impl = IcebergArrowConvert
+                                    .array_from_arrow_array(&arrow_field, parquet_column)?;
+                                let column = Arc::new(array_impl);
                                 chunk_columns.push(column);
                             } else {
                                 // data type mismatch, this column is set to null.
                                 let mut array_builder =
-                                    ArrayBuilderImpl::with_type(size, source_column.data_type);
+                                    ArrayBuilderImpl::with_type(size, rw_data_type);
 
                                 array_builder.append_n_null(record_batch.num_rows());
                                 let res = array_builder.finish();
@@ -118,8 +116,7 @@ fn convert_record_batch_to_stream_chunk(
                             }
                         } else {
                             // For columns defined in the source schema but not present in the Parquet file, null values are filled in.
-                            let mut array_builder =
-                                ArrayBuilderImpl::with_type(size, source_column.data_type);
+                            let mut array_builder = ArrayBuilderImpl::with_type(size, rw_data_type);
 
                             array_builder.append_n_null(record_batch.num_rows());
                             let res = array_builder.finish();
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 0be687084a756..98c44aabb1926 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -13,14 +13,22 @@
 // limitations under the License.
 
 use std::future::IntoFuture;
+use std::ops::Range;
 use std::pin::Pin;
+use std::sync::Arc;
 
 use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 use futures::TryStreamExt;
 use futures_async_stream::try_stream;
-use opendal::Reader;
+use opendal::Operator;
+use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::errors::ParquetError;
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::FOOTER_SIZE;
 use risingwave_common::array::StreamChunk;
 use tokio::io::{AsyncRead, BufReader};
 use tokio_util::io::{ReaderStream, StreamReader};
@@ -33,8 +41,8 @@ use crate::source::filesystem::file_common::CompressionFormat;
 use crate::source::filesystem::nd_streaming::need_nd_streaming;
 use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta,
-    SplitMetaData, SplitReader,
+    BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
+    SplitReader,
 };
 
 const STREAM_READER_CAPACITY: usize = 4096;
@@ -45,7 +53,6 @@ pub struct OpendalReader<Src: OpendalSource> {
     splits: Vec<OpendalFsSplit<Src>>,
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
-    columns: Option<Vec<Column>>,
 }
 
 #[async_trait]
 impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
@@ -57,7 +64,7 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
         splits: Vec<OpendalFsSplit<Src>>,
         parser_config: ParserConfig,
         source_ctx: SourceContextRef,
-        columns: Option<Vec<Column>>,
+        _columns: Option<Vec<Column>>,
     ) -> ConnectorResult<Self> {
         let connector = Src::new_enumerator(properties)?;
         let opendal_reader = OpendalReader {
@@ -65,7 +72,6 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
             splits,
             parser_config,
             source_ctx,
-            columns,
         };
         Ok(opendal_reader)
     }
@@ -78,26 +84,19 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
 impl<Src: OpendalSource> OpendalReader<Src> {
     #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
     async fn into_stream_inner(self) {
-        let actor_id = self.source_ctx.actor_id.to_string();
-        let fragment_id = self.source_ctx.fragment_id.to_string();
-        let source_id = self.source_ctx.source_id.to_string();
-        let source_name = self.source_ctx.source_name.to_string();
 
         for split in self.splits {
             let source_ctx = self.source_ctx.clone();
-            let split_id = split.id();
 
-            let file_reader = self
-                .connector
-                .op
-                .reader_with(&split.name.clone())
-                .await?
-                .into_futures_async_read(split.offset as u64..)
-                .await?;
+            let object_name = split.name.clone();
 
             let msg_stream;
 
             if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
                 // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk.
+                let file_reader = ParquetFileReader {
+                    op: self.connector.op.clone(),
+                    path: split.name.clone(),
+                };
                 let record_batch_stream = ParquetRecordBatchStreamBuilder::new(file_reader)
                     .await?
@@ -105,14 +104,14 @@ impl<Src: OpendalSource> OpendalReader<Src> {
                     .build()?;
 
                 let parquet_parser =
-                    ParquetParser::new(self.parser_config.common.rw_columns.clone(), source_ctx)?;
-                msg_stream = parquet_parser.into_stream(record_batch_stream, split.name.clone());
+                    ParquetParser::new(self.parser_config.common.rw_columns.clone())?;
+                msg_stream = parquet_parser.into_stream(record_batch_stream, object_name);
             } else {
                 let data_stream = Self::stream_read_object(
-                    file_reader,
+                    self.connector.op.clone(),
                     split,
                     self.source_ctx.clone(),
-                    self.connector.compression_format,
+                    self.connector.compression_format.clone(),
                 );
 
                 let parser =
@@ -134,7 +133,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {
 
     #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
     pub async fn stream_read_object(
-        file_reader: Reader,
+        op: Operator,
        split: OpendalFsSplit<Src>,
         source_ctx: SourceContextRef,
         compression_format: CompressionFormat,
@@ -146,8 +145,14 @@ impl<Src: OpendalSource> OpendalReader<Src> {
         let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
         let split_id = split.id();
         let object_name = split.name.clone();
+
+        let reader = op
+            .read_with(&object_name)
+            .range(split.offset as u64..)
+            .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future` here.
+            .await?;
         let stream_reader = StreamReader::new(
-            file_reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
+            reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
         );
 
         let buf_reader: Pin<Box<dyn AsyncRead + Send>> = match compression_format {
@@ -212,3 +217,79 @@ impl<Src: OpendalSource> OpendalReader<Src> {
         }
     }
 }
+
+/// `ParquetFileReader` implements `AsyncFileReader` to read Parquet files through an OpenDAL `Operator`.
+pub struct ParquetFileReader {
+    op: Operator,
+    path: String,
+}
+
+impl AsyncFileReader for ParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        Box::pin(async move {
+            self.op
+                .read_with(&self.path)
+                .range(range.start as u64..range.end as u64)
+                .await
+                .map(|data| data.to_bytes())
+                .map_err(|e| ParquetError::General(format!("{}", e)))
+        })
+    }
+
+    /// Get the metadata of the parquet file.
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async {
+            let file_size = self
+                .op
+                .stat(&self.path)
+                .await
+                .map_err(|e| ParquetError::General(format!("{}", e)))?
+                .content_length();
+
+            if file_size < (FOOTER_SIZE as u64) {
+                return Err(ParquetError::General(
+                    "Invalid Parquet file. Size is smaller than footer".to_string(),
+                ));
+            }
+
+            let mut footer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
+            {
+                let footer_buffer = self
+                    .op
+                    .read_with(&self.path)
+                    .range((file_size - (FOOTER_SIZE as u64))..file_size)
+                    .await
+                    .map_err(|e| ParquetError::General(format!("{}", e)))?
+                    .to_bytes();
+
+                assert_eq!(footer_buffer.len(), FOOTER_SIZE);
+                footer.copy_from_slice(&footer_buffer);
+            }
+
+            let metadata_len = decode_footer(&footer)?;
+            let footer_metadata_len = FOOTER_SIZE + metadata_len;
+
+            if footer_metadata_len > file_size as usize {
+                return Err(ParquetError::General(format!(
+                    "Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
+                    metadata_len,
+                    FOOTER_SIZE,
+                    file_size
+                )));
+            }
+
+            let start = file_size - footer_metadata_len as u64;
+            let metadata_bytes = self
+                .op
+                .read_with(&self.path)
+                .range(start..(start + metadata_len as u64))
+                .await
+                .map_err(|e| ParquetError::General(format!("{}", e)))?
+                .to_bytes();
+            Ok(Arc::new(decode_metadata(&metadata_bytes)?))
+        })
+    }
+}
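
Note (not part of the patch): a minimal sketch of how the new `ParquetFileReader` is driven. The parquet crate's `ParquetRecordBatchStreamBuilder` first calls `get_metadata` (which stats the object and decodes the footer), then issues ranged `get_bytes` reads only for the row groups it scans. The sketch assumes it lives in the same module as `ParquetFileReader` so its private fields are visible; the `read_all` helper, the batch size, and the way `op` is obtained are illustrative assumptions, not connector code.

use futures::TryStreamExt;
use opendal::Operator;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Stream every record batch of one Parquet object reachable through `op` at `path`.
async fn read_all(op: Operator, path: &str) -> parquet::errors::Result<()> {
    // Footer metadata and row-group bytes are fetched lazily through
    // `AsyncFileReader::get_metadata` / `get_bytes` on `ParquetFileReader`.
    let reader = ParquetFileReader {
        op: op.clone(),
        path: path.to_string(),
    };
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(1024)
        .build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("read {} rows from {}", batch.num_rows(), path);
    }
    Ok(())
}

In the connector this is essentially what `into_stream_inner` does for Parquet splits, except that the resulting `ParquetRecordBatchStream` is handed to `ParquetParser::into_stream` instead of being consumed directly.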