Skip to content

Commit

Permalink
feat: remove memtable request (GreptimeTeam#2307)
Browse files Browse the repository at this point in the history
* refactor: remove scan request from memtable API

* docs: Update comment

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
2 people authored and paomian committed Oct 19, 2023
1 parent 0358c6d commit 67f97bf
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::RegionId;
use tokio::sync::{mpsc, oneshot};

use crate::access_layer::AccessLayerRef;
Expand Down Expand Up @@ -190,7 +190,7 @@ impl RegionFlushTask {
}

let file_id = FileId::random();
let iter = mem.iter(ScanRequest::default());
let iter = mem.iter(None, &[]);
let source = Source::Iter(iter);
let mut writer = self
.access_layer
Expand Down
11 changes: 7 additions & 4 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

use common_query::logical_plan::Expr;
use common_time::Timestamp;
use metrics::{decrement_gauge, increment_gauge};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ScanRequest;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
Expand Down Expand Up @@ -63,8 +64,10 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Write key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;

/// Scans the memtable for `req`.
fn iter(&self, req: ScanRequest) -> BoxedBatchIterator;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
fn iter(&self, projection: Option<&[ColumnId]>, filters: &[Expr]) -> BoxedBatchIterator;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down Expand Up @@ -110,7 +113,7 @@ impl Memtable for EmptyMemtable {
Ok(())
}

fn iter(&self, _req: ScanRequest) -> BoxedBatchIterator {
fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
Box::new(std::iter::empty())
}

Expand Down
19 changes: 7 additions & 12 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::OpType;
use common_query::logical_plan::Expr;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::DataType;
Expand All @@ -29,7 +30,7 @@ use datatypes::vectors::{
};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ScanRequest};
use store_api::storage::ColumnId;

use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::{
Expand Down Expand Up @@ -178,12 +179,9 @@ impl Memtable for TimeSeriesMemtable {
Ok(())
}

fn iter(&self, req: ScanRequest) -> BoxedBatchIterator {
let projection = if let Some(projection) = &req.projection {
projection
.iter()
.map(|idx| self.region_metadata.column_metadatas[*idx].column_id)
.collect()
fn iter(&self, projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
self.region_metadata
.field_columns()
Expand Down Expand Up @@ -883,7 +881,7 @@ mod tests {
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();

let iter = memtable.iter(ScanRequest::default());
let iter = memtable.iter(None, &[]);
let read = iter
.flat_map(|batch| {
batch
Expand All @@ -909,10 +907,7 @@ mod tests {
let memtable = TimeSeriesMemtable::new(schema, 42);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(ScanRequest {
projection: Some(vec![3]), // k0, k1, ts, v0, v1, only take v0
..Default::default()
});
let iter = memtable.iter(Some(&[3]), &[]);

let mut v0_all = vec![];

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl SeqScan {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut builder = MergeReaderBuilder::new();
for mem in &self.memtables {
let iter = mem.iter(self.request.clone());
let iter = mem.iter(Some(self.mapper.column_ids()), &self.request.filters);
builder.push_batch_iter(iter);
}
for file in &self.files {
Expand Down

0 comments on commit 67f97bf

Please sign in to comment.