From 67f97bf11f337bae9f6559388a53ea925a2136d1 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Sun, 3 Sep 2023 13:24:29 +0800 Subject: [PATCH] feat: remove memtable request (#2307) * refactor: remove scan request from memtable API * docs: Update comment --------- Co-authored-by: Yingwen --- src/mito2/src/flush.rs | 4 ++-- src/mito2/src/memtable.rs | 11 +++++++---- src/mito2/src/memtable/time_series.rs | 19 +++++++------------ src/mito2/src/read/seq_scan.rs | 2 +- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 349628a20a50..7b9f4abaa218 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::{error, info}; use snafu::ResultExt; -use store_api::storage::{RegionId, ScanRequest}; +use store_api::storage::RegionId; use tokio::sync::{mpsc, oneshot}; use crate::access_layer::AccessLayerRef; @@ -190,7 +190,7 @@ impl RegionFlushTask { } let file_id = FileId::random(); - let iter = mem.iter(ScanRequest::default()); + let iter = mem.iter(None, &[]); let source = Source::Iter(iter); let mut writer = self .access_layer diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 8756829a9f6c..ab5b2054d6f3 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -23,10 +23,11 @@ use std::fmt; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; +use common_query::logical_plan::Expr; use common_time::Timestamp; use metrics::{decrement_gauge, increment_gauge}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ScanRequest; +use store_api::storage::ColumnId; use crate::error::Result; use crate::flush::WriteBufferManagerRef; @@ -63,8 +64,10 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Write key values into the memtable. fn write(&self, kvs: &KeyValues) -> Result<()>; - /// Scans the memtable for `req`. - fn iter(&self, req: ScanRequest) -> BoxedBatchIterator; + /// Scans the memtable. + /// `projection` selects columns to read, `None` means reading all columns. + /// `filters` are the predicates to be pushed down to memtable. + fn iter(&self, projection: Option<&[ColumnId]>, filters: &[Expr]) -> BoxedBatchIterator; /// Returns true if the memtable is empty. fn is_empty(&self) -> bool; @@ -110,7 +113,7 @@ impl Memtable for EmptyMemtable { Ok(()) } - fn iter(&self, _req: ScanRequest) -> BoxedBatchIterator { + fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { Box::new(std::iter::empty()) } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b20ffb74fafe..842d89627dd8 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use api::v1::OpType; +use common_query::logical_plan::Expr; use datatypes::arrow; use datatypes::arrow::array::ArrayRef; use datatypes::data_type::DataType; @@ -29,7 +30,7 @@ use datatypes::vectors::{ }; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, ScanRequest}; +use store_api::storage::ColumnId; use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::memtable::{ @@ -178,12 +179,9 @@ impl Memtable for TimeSeriesMemtable { Ok(()) } - fn iter(&self, req: ScanRequest) -> BoxedBatchIterator { - let projection = if let Some(projection) = &req.projection { - projection - .iter() - .map(|idx| self.region_metadata.column_metadatas[*idx].column_id) - .collect() + fn iter(&self, projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { + let projection = if let Some(projection) = projection { + projection.iter().copied().collect() } else { self.region_metadata .field_columns() @@ -883,7 +881,7 @@ mod tests { .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) .collect::>(); - let iter = memtable.iter(ScanRequest::default()); + let iter = memtable.iter(None, &[]); let read = iter .flat_map(|batch| { batch @@ -909,10 +907,7 @@ mod tests { let memtable = TimeSeriesMemtable::new(schema, 42); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(ScanRequest { - projection: Some(vec![3]), // k0, k1, ts, v0, v1, only take v0 - ..Default::default() - }); + let iter = memtable.iter(Some(&[3]), &[]); let mut v0_all = vec![]; diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 67168a63b082..0b7d9e23ed93 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -107,7 +107,7 @@ impl SeqScan { // Scans all memtables and SSTs. Builds a merge reader to merge results. let mut builder = MergeReaderBuilder::new(); for mem in &self.memtables { - let iter = mem.iter(self.request.clone()); + let iter = mem.iter(Some(self.mapper.column_ids()), &self.request.filters); builder.push_batch_iter(iter); } for file in &self.files {