fix: some clippy warnings
v0y4g3r committed Oct 8, 2023
1 parent f718ff0 commit e19718e
Showing 4 changed files with 20 additions and 23 deletions.
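The commit message doesn't name the individual warnings, but the diffs below line up with a handful of common clippy lints. As a minimal, self-contained sketch of the main before/after patterns (illustrative function names, not the GreptimeDB code):

// clippy::ptr_arg: take `&[u8]` instead of `&Vec<u8>`; callers holding a
// `Vec<u8>` can still pass `&v` or `v.as_slice()`.
#[allow(clippy::ptr_arg)]
fn key_len_before(pk: &Vec<u8>) -> usize {
    pk.len()
}
fn key_len_after(pk: &[u8]) -> usize {
    pk.len()
}

// clippy::while_let_on_iterator: a `for` loop expresses the same iteration
// without the explicit `next()` call, and the binding no longer needs `mut`.
fn sum_before(values: Vec<i32>) -> i32 {
    let mut iter = values.into_iter();
    let mut total = 0;
    #[allow(clippy::while_let_on_iterator)]
    while let Some(v) = iter.next() {
        total += v;
    }
    total
}
fn sum_after(values: Vec<i32>) -> i32 {
    let mut total = 0;
    for v in values {
        total += v;
    }
    total
}

// clippy::needless_return: the trailing expression is already the return value.
#[allow(clippy::needless_return)]
fn tail_before() -> Option<i32> {
    return None;
}
fn tail_after() -> Option<i32> {
    None
}

fn main() {
    assert_eq!(key_len_before(&vec![1, 2]), key_len_after(&[1, 2]));
    assert_eq!(sum_before(vec![1, 2, 3]), sum_after(vec![1, 2, 3]));
    assert_eq!(tail_before(), tail_after());
}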
26 changes: 12 additions & 14 deletions src/mito2/src/memtable/time_series.rs
@@ -295,11 +295,8 @@ impl SeriesSet {
 
     /// Iterates all series in [SeriesSet].
     fn iter_series(&self, projection: HashSet<ColumnId>, predicate: Option<Predicate>) -> Iter {
-        // let predicate =
-        //     Predicate::try_new(filters.to_vec(), self.region_metadata.schema.clone()).unwrap();
-
         let (primary_key_builders, primary_key_schema) =
-            primary_key_builders(&self.region_metadata, 1); // TODO(hl): we prune one primary key per range.
+            primary_key_builders(&self.region_metadata, 1);
 
         Iter {
             metadata: self.region_metadata.clone(),
@@ -314,18 +311,19 @@ impl SeriesSet {
     }
 }
 
+/// Creates primary key array builders and arrow's schema for primary keys of given region schema.
 fn primary_key_builders(
     region_metadata: &RegionMetadataRef,
-    expected_len: usize,
+    num_pk_rows: usize,
 ) -> (Vec<Box<dyn MutableVector>>, arrow::datatypes::SchemaRef) {
     let (builders, fields): (_, Vec<_>) = region_metadata
         .primary_key_columns()
         .map(|pk| {
             (
                 pk.column_schema
                     .data_type
-                    .create_mutable_vector(expected_len),
-                datatypes::arrow::datatypes::Field::new(
+                    .create_mutable_vector(num_pk_rows),
+                arrow::datatypes::Field::new(
                     pk.column_schema.name.clone(),
                     pk.column_schema.data_type.as_arrow_type(),
                     pk.column_schema.is_nullable(),
@@ -352,19 +350,19 @@ impl Iterator for Iter {
 
     fn next(&mut self) -> Option<Self::Item> {
         let map = self.series.read().unwrap();
-        let mut range = match &self.last_key {
+        let range = match &self.last_key {
             None => map.range::<Vec<u8>, _>(..),
             Some(last_key) => {
                 map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
             }
         };
 
         // TODO(hl): maybe yield more than one time series to amortize range overhead.
-        while let Some((primary_key, series)) = range.next() {
+        for (primary_key, series) in range {
             if let Some(predicate) = &self.predicate {
                 if !prune_primary_key(
                     &self.codec,
-                    primary_key,
+                    primary_key.as_slice(),
                     &mut self.primary_key_builders,
                     self.pk_schema.clone(),
                     predicate,
@@ -379,18 +377,18 @@ impl Iterator for Iter {
                 values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)),
             );
         }
-        return None;
+        None
     }
 }
 
 fn prune_primary_key(
     codec: &Arc<McmpRowCodec>,
-    pk: &Vec<u8>,
+    pk: &[u8],
     builders: &mut Vec<Box<dyn MutableVector>>,
     pk_schema: arrow::datatypes::SchemaRef,
     predicate: &Predicate,
 ) -> bool {
-    let Ok(pk_record_batch) = pk_to_record_batch(&codec, pk, builders, pk_schema) else {
+    let Ok(pk_record_batch) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
         return true;
     };
@@ -404,7 +402,7 @@ fn prune_primary_key(
 
 fn pk_to_record_batch(
     codec: &Arc<McmpRowCodec>,
-    bytes: &Vec<u8>,
+    bytes: &[u8],
     builders: &mut Vec<Box<dyn MutableVector>>,
     pk_schema: arrow::datatypes::SchemaRef,
 ) -> error::Result<RecordBatch> {
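One subtle fix above: `pk_to_record_batch(&codec, ...)` became `pk_to_record_batch(codec, ...)`. Since `codec` is already a `&Arc<McmpRowCodec>`, the extra `&` produced a `&&Arc<_>` that the compiler silently auto-dereferences; clippy flags this as `needless_borrow`. A tiny sketch of the pattern with stand-in types (not the mito2 API):

struct Codec;

impl Codec {
    fn decode(&self, pk: &[u8]) -> usize {
        pk.len()
    }
}

fn decode_with(codec: &Codec, pk: &[u8]) -> usize {
    codec.decode(pk)
}

fn caller(codec: &Codec, pk: &[u8]) -> usize {
    // Before: `decode_with(&codec, pk)`, where `&codec` is a `&&Codec` that
    // rustc auto-dereferences; clippy::needless_borrow asks for plain `codec`.
    decode_with(codec, pk)
}

fn main() {
    assert_eq!(caller(&Codec, b"key"), 3);
}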
1 change: 0 additions & 1 deletion src/mito2/src/read/seq_scan.rs
@@ -135,7 +135,6 @@ impl SeqScan {
         // Scans all memtables and SSTs. Builds a merge reader to merge results.
         let mut builder = MergeReaderBuilder::new();
         for mem in &self.memtables {
-            // TODO(hl): pass filters once memtable supports filter pushdown.
             let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
             builder.push_batch_iter(iter);
         }
1 change: 0 additions & 1 deletion src/mito2/src/test_util/memtable_util.rs
@@ -17,7 +17,6 @@
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
-use common_query::logical_plan::Expr;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
15 changes: 8 additions & 7 deletions src/table/src/predicate.rs
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use common_query::logical_plan::{DfExpr, Expr};
-use common_telemetry::{error, warn};
+use common_telemetry::{debug, error, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -123,9 +123,6 @@ impl Predicate {
 
     /// Prunes primary keys
     pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
-        // we only expect one row in primary_key.
-        assert_eq!(1, primary_key.num_rows());
-
         for expr in &self.exprs {
             // evaluate every filter against primary key
             let Ok(eva) = expr.evaluate(primary_key) else {
@@ -142,15 +139,19 @@ impl Predicate {
                 }
                 // result was a column
                 ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
-                other => {
-                    unreachable!() // TODO(hl): avoid panic here.
+                _ => {
+                    unreachable!()
                 }
             };
+            debug!(
+                "Evaluate primary key {:?} against filter: {:?}, result: {:?}",
+                primary_key, expr, result
+            );
             if !result {
                 return Ok(false);
             }
         }
-        return Ok(true);
+        Ok(true)
     }
 
     /// Evaluates the predicate against the `stats`.
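For context on the match rewritten above: `prune_primary_key` evaluates each filter against the primary-key `RecordBatch` and turns the resulting `ColumnarValue` into a keep/skip decision; `unwrap_or(true)` keeps the series when the boolean is NULL, since an indeterminate filter can't prove it prunable. A self-contained sketch of that decision logic, using stand-in enums in place of datafusion's `ColumnarValue`/`ScalarValue`:

// Stand-ins for datafusion's types so the sketch compiles on its own.
enum ScalarValue {
    Boolean(Option<bool>),
}

enum ColumnarValue {
    Scalar(ScalarValue),
}

/// Mirrors the edited match arm: a NULL boolean can't prove the series is
/// prunable, so `unwrap_or(true)` keeps it.
fn keep_series(eva: ColumnarValue) -> bool {
    match eva {
        ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
    }
}

fn main() {
    assert!(keep_series(ColumnarValue::Scalar(ScalarValue::Boolean(None))));
    assert!(!keep_series(ColumnarValue::Scalar(ScalarValue::Boolean(
        Some(false)
    ))));
}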
