chore: revert opendal 0.46
WenyXu committed Jun 3, 2024
1 parent 4e5dd1e commit 7e89345
Showing 31 changed files with 440 additions and 628 deletions.
234 changes: 74 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
"stream",
38 changes: 14 additions & 24 deletions src/common/datasource/src/compression.rs
@@ -92,44 +92,34 @@ impl CompressionType {
macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! {
-use bytes::{Buf, BufMut, BytesMut};
-
impl CompressionType {
-pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
+pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
-let mut buffer = Vec::with_capacity(content.remaining());
+let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
-encoder.write_all_buf(&mut content).await?;
+encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
-CompressionType::Uncompressed => {
-let mut bs = BytesMut::with_capacity(content.remaining());
-bs.put(content);
-Ok(bs.to_vec())
-},
+CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}

-pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
+pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
-let mut buffer = Vec::with_capacity(content.remaining() * 2);
+let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
-encoder.write_all_buf(&mut content).await?;
+encoder.write_all(content.as_ref()).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
-CompressionType::Uncompressed => {
-let mut bs = BytesMut::with_capacity(content.remaining());
-bs.put(content);
-Ok(bs.to_vec())
-},
+CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
}
}

@@ -161,27 +151,27 @@ macro_rules! impl_compression_type {
$(
#[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::$enum_item
-.encode(string)
+.encode(&string)
.await
.unwrap();
let decompress = CompressionType::$enum_item
-.decode(compress.as_slice())
+.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
})*

#[tokio::test]
async fn test_uncompression() {
let string = "foo_bar".as_bytes();
let string = "foo_bar".as_bytes().to_vec();
let compress = CompressionType::Uncompressed
-.encode(string)
+.encode(&string)
.await
.unwrap();
let decompress = CompressionType::Uncompressed
-.decode(compress.as_slice())
+.decode(&compress)
.await
.unwrap();
assert_eq!(decompress, string);
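
After the revert, `encode` and `decode` borrow any `impl AsRef<[u8]>` instead of consuming a `bytes::Buf`, so callers keep ownership of their buffers. A minimal caller-side sketch; the `common_datasource` crate path and the `Gzip` variant are assumptions not shown in this hunk, while the method signatures come from the diff above:

```rust
// Sketch only: crate path and the Gzip variant are assumed for illustration.
use common_datasource::compression::CompressionType;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let payload = b"foo_bar".to_vec();

    // Both methods only borrow the bytes, mirroring the reverted signatures.
    let compressed = CompressionType::Gzip.encode(&payload).await?;
    let restored = CompressionType::Gzip.decode(&compressed).await?;
    assert_eq!(restored, payload);
    Ok(())
}
```
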
5 changes: 1 addition & 4 deletions src/common/datasource/src/file_format.rs
@@ -36,7 +36,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
-use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -147,8 +146,7 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
-.map_err(|e| DataFusionError::External(Box::new(e)))?
-.into_bytes_stream(..);
+.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut upstream = compression_type.convert_stream(reader).fuse();

@@ -205,7 +203,6 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
-.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});

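
The dropped `tokio_util::compat` import and the removed `.compat_write()` call existed only to bridge opendal 0.46's futures-io writer into tokio's I/O traits; after the revert the writer returned by `writer_with` is used with tokio traits directly. A self-contained sketch of that bridging technique, using `futures::io::Cursor` as a stand-in for the opendal 0.46 adapter types (an assumption for illustration only):

```rust
// Demonstrates the futures-io -> tokio bridge that the deleted code relied on.
use futures::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // A futures::io::AsyncRead wrapped so tokio's AsyncReadExt methods apply,
    // as the removed `.compat()` calls did for opendal 0.46 readers.
    let mut reader = Cursor::new(b"hello".to_vec()).compat();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;

    // The same bridge in the write direction, as `.compat_write()` did.
    let mut writer = Cursor::new(Vec::new()).compat_write();
    writer.write_all(&buf).await?;
    writer.shutdown().await?;
    Ok(())
}
```
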
9 changes: 1 addition & 8 deletions src/common/datasource/src/file_format/csv.rs
@@ -29,7 +29,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
-use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -165,16 +164,10 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
-let meta = store
-.stat(path)
-.await
-.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
-.context(error::ReadObjectSnafu { path })?
-.into_futures_async_read(0..meta.content_length())
-.compat();
+.context(error::ReadObjectSnafu { path })?;

let decoded = self.compression_type.convert_async_read(reader);

9 changes: 1 addition & 8 deletions src/common/datasource/src/file_format/json.rs
@@ -31,7 +31,6 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
-use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
@@ -83,16 +82,10 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
-let meta = store
-.stat(path)
-.await
-.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
-.context(error::ReadObjectSnafu { path })?
-.into_futures_async_read(0..meta.content_length())
-.compat();
+.context(error::ReadObjectSnafu { path })?;

let decoded = self.compression_type.convert_async_read(reader);

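
The CSV and JSON formats revert identically: `infer_schema` no longer needs the `stat` call or the `into_futures_async_read`/`compat` adapters, because the reader returned by the object store is handed straight to `convert_async_read`. A caller-side sketch; the `common_datasource` crate path is an assumption, while the `infer_schema` signature and `JsonFormat`'s `Default` impl are visible in the hunk above:

```rust
// Sketch: crate paths are assumed from this repository's layout.
use common_datasource::file_format::{json::JsonFormat, FileFormat};
use object_store::ObjectStore;

async fn print_json_schema(store: &ObjectStore, path: &str) {
    let format = JsonFormat::default();
    let schema = format.infer_schema(store, path).await.expect("infer schema");
    println!("{schema:?}");
}
```
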
71 changes: 14 additions & 57 deletions src/common/datasource/src/file_format/orc.rs
@@ -16,83 +16,48 @@ use std::sync::Arc;

use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
-use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
-use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
+use tokio::io::{AsyncRead, AsyncSeek};

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

-#[derive(Clone)]
-pub struct ReaderAdapter {
-reader: object_store::Reader,
-len: u64,
-}
-
-impl ReaderAdapter {
-pub fn new(reader: object_store::Reader, len: u64) -> Self {
-Self { reader, len }
-}
-}
-
-impl AsyncChunkReader for ReaderAdapter {
-fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
-async move { Ok(self.len) }.boxed()
-}
-
-fn get_bytes(
-&mut self,
-offset_from_start: u64,
-length: u64,
-) -> BoxFuture<'_, std::io::Result<Bytes>> {
-async move {
-let bytes = self
-.reader
-.read(offset_from_start..offset_from_start + length)
-.await?;
-Ok(bytes.to_bytes())
-}
-.boxed()
-}
-}

-pub async fn new_orc_stream_reader(
-reader: ReaderAdapter,
-) -> Result<ArrowStreamReader<ReaderAdapter>> {
+pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
+reader: R,
+) -> Result<ArrowStreamReader<R>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}

-pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
+pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
+reader: R,
+) -> Result<Schema> {
let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone())
}

#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
-let meta = store
-.stat(path)
-.await
-.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
-let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
+
+let schema = infer_orc_schema(reader).await?;

Ok(schema)
}
}
@@ -132,23 +97,15 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
-let path = meta.location().to_string();
-
-let meta = object_store
-.stat(&path)
+let reader = object_store
+.reader(meta.location().to_string().as_str())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

-let reader = object_store
-.reader(&path)
+let stream_reader = new_orc_stream_reader(reader)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

-let stream_reader =
-new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
-.await
-.map_err(|e| DataFusionError::External(Box::new(e)))?;
-
let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);

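
Dropping `ReaderAdapter` works because the reverted functions are generic over any tokio `AsyncRead + AsyncSeek + Unpin + Send + 'static` reader, which the object store's reader satisfies again after the revert; so does, for example, a `tokio::fs::File`. A small sketch; the `common_datasource` crate path is an assumption, while `infer_orc_schema`'s signature comes from the diff above:

```rust
// Sketch: crate path assumed; any tokio AsyncRead + AsyncSeek reader works.
use common_datasource::file_format::orc::infer_orc_schema;
use tokio::fs::File;

async fn print_orc_schema(path: &str) {
    // tokio::fs::File satisfies AsyncRead + AsyncSeek + Unpin + Send + 'static.
    let file = File::open(path).await.expect("open ORC file");
    let schema = infer_orc_schema(file).await.expect("infer ORC schema");
    println!("{schema:?}");
}
```
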
29 changes: 7 additions & 22 deletions src/common/datasource/src/file_format/parquet.rs
@@ -29,11 +29,10 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
-use object_store::{FuturesAsyncReader, ObjectStore};
+use object_store::{ObjectStore, Reader, Writer};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
-use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
@@ -46,16 +45,10 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
-let meta = store
-.stat(path)
-.await
-.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
-.context(error::ReadObjectSnafu { path })?
-.into_futures_async_read(0..meta.content_length())
-.compat();
+.context(error::ReadObjectSnafu { path })?;

let metadata = reader
.get_metadata()
@@ -105,7 +98,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {

pub struct LazyParquetFileReader {
object_store: ObjectStore,
-reader: Option<Compat<FuturesAsyncReader>>,
+reader: Option<Reader>,
path: String,
}

@@ -121,13 +114,7 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
-let meta = self.object_store.stat(&self.path).await?;
-let reader = self
-.object_store
-.reader(&self.path)
-.await?
-.into_futures_async_read(0..meta.content_length())
-.compat();
+let reader = self.object_store.reader(&self.path).await?;
self.reader = Some(reader);
}

@@ -180,25 +167,23 @@ pub struct BufferedWriter {
}

type InnerBufferedWriter = LazyBufferedWriter<
-Compat<object_store::FuturesAsyncWriter>,
+object_store::Writer,
ArrowWriter<SharedBuffer>,
-impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
+impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
>;

impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
-) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
-{
+) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer_with(&path)
.concurrent(concurrency)
.await
-.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}
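
The type changes here (plain `Reader`/`Writer` in place of `Compat<FuturesAsyncReader>`/`Compat<FuturesAsyncWriter>`) rely on the opendal 0.45-era reader and writer implementing tokio's `AsyncRead`/`AsyncWrite` directly, which is what lets the compat wrappers and the extra `stat` round-trip disappear. A rough standalone sketch of that style of usage against an in-memory operator; it assumes the 0.45-era trait impls implied by this revert and is not checked against opendal's documentation:

```rust
// Rough sketch assuming the opendal 0.45-era API implied by this revert:
// Reader/Writer usable directly with tokio's AsyncRead/AsyncWrite traits.
use opendal::{services::Memory, Operator};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let op = Operator::new(Memory::default())?.finish();

    // Write through the writer with tokio's AsyncWriteExt, no compat adapter.
    let mut writer = op.writer("demo.bin").await?;
    writer.write_all(b"hello").await.expect("write");
    writer.shutdown().await.expect("flush and close");

    // Read back through the reader with tokio's AsyncReadExt, no stat() needed.
    let mut reader = op.reader("demo.bin").await?;
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await.expect("read");
    assert_eq!(buf, b"hello");
    Ok(())
}
```
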
4 changes: 2 additions & 2 deletions src/common/datasource/src/test_util.rs
@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
-assert_eq_lines(written.to_vec(), origin.to_vec());
+assert_eq_lines(written, origin);
}

pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
-assert_eq_lines(written.to_vec(), origin.to_vec());
+assert_eq_lines(written, origin);
}

// Ignore the CRLF difference across operating systems.