Commit bdde66c

refactor

chenzl25 committed Jul 9, 2024
1 parent 451f2f1 commit bdde66c
Showing 4 changed files with 10 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/s3_file_scan.rs
@@ -18,7 +18,7 @@ use futures_util::stream::StreamExt;
 use parquet::arrow::ProjectionMask;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::catalog::Schema;
-use risingwave_connector::source::iceberg::arrow_file_reader::create_parquet_stream_builder;
+use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;

 use crate::error::BatchError;
 use crate::executor::{DataChunk, Executor};
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -93,6 +93,7 @@ opendal = { version = "0.47", features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
+parquet = { workspace = true }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
4 changes: 2 additions & 2 deletions src/connector/src/source/iceberg/mod.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-pub mod arrow_file_reader;
+pub mod parquet_file_reader;

 use std::collections::HashMap;

 use anyhow::anyhow;
-pub use arrow_file_reader::*;
+pub use parquet_file_reader::*;
 use async_trait::async_trait;
 use futures::StreamExt;
 use iceberg::spec::{DataContentType, ManifestList};
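Because the module is glob re-exported, callers can reach the helper either through the full module path used in s3_file_scan.rs above or straight from the iceberg module root. A one-line illustration, assuming the re-export behaves as an ordinary pub use:

use risingwave_connector::source::iceberg::create_parquet_stream_builder; // resolves via `pub use parquet_file_reader::*;`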
12 changes: 6 additions & 6 deletions src/connector/src/source/iceberg/{arrow_file_reader.rs → parquet_file_reader.rs}
@@ -27,18 +27,18 @@ use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::metadata::ParquetMetaData;

-pub struct ArrowFileReader<R: FileRead> {
+pub struct ParquetFileReader<R: FileRead> {
     meta: FileMetadata,
     r: R,
 }

-impl<R: FileRead> ArrowFileReader<R> {
+impl<R: FileRead> ParquetFileReader<R> {
     pub fn new(meta: FileMetadata, r: R) -> Self {
         Self { meta, r }
     }
 }

-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
@@ -62,7 +62,7 @@ pub async fn create_parquet_stream_builder(
     s3_access_key: String,
     s3_secret_key: String,
     location: String,
-) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead>>, anyhow::Error> {
+) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
     let mut props = HashMap::new();
     props.insert(S3_REGION, s3_region.clone());
     props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
@@ -77,9 +77,9 @@ pub async fn create_parquet_stream_builder(

     let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
     let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
-    let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

-    ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
         .await
         .map_err(|e| anyhow!(e))
 }
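For context, here is a minimal sketch of how the renamed helper might be driven end to end. This is not code from the commit: the region, credentials, object location, and the surrounding scan_example function are all placeholders; only create_parquet_stream_builder and ParquetFileReader come from the diff above.

use futures::StreamExt;
use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;

// Hypothetical driver for the builder returned by this module.
async fn scan_example() -> Result<(), anyhow::Error> {
    // Build a ParquetRecordBatchStreamBuilder backed by the renamed
    // ParquetFileReader (all S3 settings below are placeholders).
    let builder = create_parquet_stream_builder(
        "us-east-1".to_owned(),                      // s3_region
        "my-access-key".to_owned(),                  // s3_access_key
        "my-secret-key".to_owned(),                  // s3_secret_key
        "s3://bucket/table/data.parquet".to_owned(), // location
    )
    .await?;

    // build() turns the builder into a stream of Arrow RecordBatches.
    let mut stream = builder.build()?;
    while let Some(batch) = stream.next().await {
        // Each item is a parquet::errors::Result<RecordBatch>.
        let batch = batch?;
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}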
