diff --git a/Cargo.lock b/Cargo.lock index 09b7d4ccd9b9c..114d8404c56b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9907,7 +9907,6 @@ dependencies = [ "assert_matches", "async-recursion", "async-trait", - "bytes", "criterion", "either", "foyer", diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index e0c284a46fde7..019c33253466b 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -20,7 +20,6 @@ arrow-schema = { workspace = true } assert_matches = "1" async-recursion = "1" async-trait = "0.1" -bytes = { version = "1", features = ["serde"] } either = "1" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 1bc0c1e53c109..c46de5b3a2944 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -15,7 +15,6 @@ use std::ops::{Bound, Deref}; use std::sync::Arc; -use bytes::Bytes; use futures::prelude::stream::StreamExt; use futures_async_stream::try_stream; use futures_util::pin_mut; @@ -215,13 +214,8 @@ impl LogRowSeqScanExecutor { }, ), ) - .await?; - - pin_mut!(iter); - loop { - let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); - - let mut iter = iter.as_mut().map(|r| match r { + .await? + .map(|r| match r { Ok((op, value)) => { let (k, row) = value.into_owned_row_key(); // Todo! To avoid create a full row. @@ -230,10 +224,15 @@ impl LogRowSeqScanExecutor { .chain(vec![Some(ScalarImpl::Int16(op.to_i16()))]) .collect_vec(); let row = OwnedRow::new(full_row); - Ok(KeyedRow::::new(k, row)) + Ok(KeyedRow::<_>::new(k, row)) } Err(e) => Err(e), }); + + pin_mut!(iter); + loop { + let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); + let chunk = collect_data_chunk(&mut iter, &schema, Some(chunk_size)) .await .map_err(BatchError::from)?; diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs index c81cb6dd07967..93132ce65e51c 100644 --- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -27,7 +27,6 @@ use crate::optimizer::plan_node::ToLocalBatch; use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; use crate::scheduler::SchedulerResult; -/// `BatchLogSeqScan` implements [`super::LogicalLogScan`] to scan from a row-oriented table #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchLogSeqScan { pub base: PlanBase,