Skip to content

Commit

Permalink
feat: iter wip
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Feb 6, 2024
1 parent 92d90b8 commit c5333e3
Showing 1 changed file with 73 additions and 6 deletions.
79 changes: 73 additions & 6 deletions src/mito2/src/memtable/merge_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ use table::predicate::Predicate;
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::DataBuffer;
use crate::memtable::merge_tree::index::{IndexConfig, KeyIndex, KeyIndexRef, ShardReader};
use crate::memtable::merge_tree::index::{
IndexConfig, IndexReader, KeyIndex, KeyIndexRef, ShardReader,
};
use crate::memtable::merge_tree::mutable::WriteMetrics;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex};
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
Expand Down Expand Up @@ -156,8 +158,9 @@ impl MergeTree {

let iter = ShardIter {
metadata: self.metadata.clone(),
index_reader,
projection,
index_reader,
data_reader: DataReader {},
};

todo!()
Expand Down Expand Up @@ -268,29 +271,73 @@ struct TreeParts {

struct ShardIter {
metadata: RegionMetadataRef,
index_reader: Option<ShardReader>,
projection: HashSet<ColumnId>,
index_reader: Option<ShardReader>,
data_reader: DataReader,
}

impl Iterator for ShardIter {
type Item = Result<Batch>;

fn next(&mut self) -> Option<Self::Item> {
unimplemented!()
if !self.data_reader.is_valid() {
return None;
}

self.next_batch().transpose()
}
}

impl ShardIter {
/// Fetches next batch and advances the iter.
fn next_batch(&mut self) -> Result<Option<Batch>> {
let Some(index_reader) = &mut self.index_reader else {
// No primary key to read.
// Safety: `next()` ensures the data reader is valid.
let record_batch = self.data_reader.current_record_batch();
let batch =
Self::convert_record_batch(&self.metadata, &self.projection, &[], record_batch)?;
// Advances the data reader.
self.data_reader.next();
return Ok(Some(batch));
};

// Iterate the index reader until we see the same pk index of the data batch.
while index_reader.is_valid()
&& index_reader.current_pk_index() != self.data_reader.current_pk_index()
{
index_reader.next();
}
assert!(
index_reader.is_valid(),
"Data contains pk_index {} not in the index",
self.data_reader.current_pk_index()
);

let record_batch = self.data_reader.current_record_batch();
let batch = Self::convert_record_batch(
&self.metadata,
&self.projection,
index_reader.current_key(),
record_batch,
)?;
// Advances the data reader.
self.data_reader.next();
Ok(Some(batch))
}

/// Converts [RecordBatch] to [Batch].
fn convert_record_batch(
&self,
metadata: &RegionMetadataRef,
projection: &HashSet<ColumnId>,
primary_key: &[u8],
record_batch: &RecordBatch,
) -> Result<Batch> {
let mut builder = BatchBuilder::new(primary_key.to_vec())
.timestamps_array(Self::record_batch_timestamps(record_batch).clone())?
.sequences_array(Self::record_batch_sequences(record_batch).clone())?
.op_types_array(Self::record_batch_op_types(record_batch).clone())?;

// TODO(yingwen): fields.

unimplemented!()
Expand All @@ -308,3 +355,23 @@ impl ShardIter {
unimplemented!()
}
}

struct DataReader {}

impl DataReader {
fn is_valid(&self) -> bool {
unimplemented!()
}

fn current_pk_index(&self) -> PkIndex {
unimplemented!()
}

fn current_record_batch(&self) -> &RecordBatch {
unimplemented!()
}

fn next(&mut self) {
unimplemented!()
}
}

0 comments on commit c5333e3

Please sign in to comment.