feat(mito): merge reader for mito2 #2210

Merged on Aug 24, 2023 (25 commits)
Changes from 19 commits
Commits
9784754
feat: Implement slice and first/last timestamp for Batch
evenyag Aug 16, 2023
719a98f
feat(mito): implements sort/concat for Batch
evenyag Aug 18, 2023
1d834f2
chore: fix typo
evenyag Aug 18, 2023
129bf2d
chore: remove comments
evenyag Aug 18, 2023
0bd62ca
feat: sort and dedup
evenyag Aug 18, 2023
b96d7d9
test: test batch operations
evenyag Aug 19, 2023
056b0a9
chore: cast enum to test op type
evenyag Aug 19, 2023
6a13346
test: test filter related api
evenyag Aug 19, 2023
fe8dd31
sytle: fix clippy
evenyag Aug 19, 2023
d722511
feat: implement Node and CompareFirst
evenyag Aug 17, 2023
0ff7a5c
feat: merge reader wip
evenyag Aug 17, 2023
b493cd5
feat: merge wip
evenyag Aug 18, 2023
e69ec14
feat: use batch's operation to sort and dedup
evenyag Aug 18, 2023
96ce279
feat: implement BatchReader for MergeReader
evenyag Aug 18, 2023
eb690b0
feat: simplify codes
evenyag Aug 19, 2023
08d57a2
test: test merge reader
evenyag Aug 21, 2023
80a0621
refactor: use test util to create batch
evenyag Aug 21, 2023
2da5cb2
refactor: remove unused imports
evenyag Aug 21, 2023
534a628
feat: Merge branch 'develop' into feat/mito2-merge
evenyag Aug 22, 2023
0b39809
feat: update comment
evenyag Aug 23, 2023
a398923
chore: remove metadata() from Source
evenyag Aug 23, 2023
17845be
chroe: update comment
evenyag Aug 23, 2023
633d53e
feat: Merge branch 'develop' into feat/mito2-merge
evenyag Aug 23, 2023
4394be6
feat: source supports batch iterator
evenyag Aug 23, 2023
7f60281
chore: update comment
evenyag Aug 24, 2023
48 changes: 13 additions & 35 deletions src/mito2/src/read.rs
@@ -14,6 +14,8 @@

 //! Common structs and utilities for reading data.

+pub mod merge;
+
 use std::sync::Arc;

 use api::v1::OpType;
@@ -109,7 +111,7 @@ impl Batch {
         self.num_rows() == 0
     }

-    /// Returns the first timestamp in the batch.
+    /// Returns the first timestamp in the batch or `None` if the batch is empty.
     pub fn first_timestamp(&self) -> Option<Timestamp> {
         if self.timestamps.is_empty() {
             return None;
@@ -118,7 +120,7 @@ impl Batch {
         Some(self.get_timestamp(0))
     }

-    /// Returns the last timestamp in the batch.
+    /// Returns the last timestamp in the batch or `None` if the batch is empty.
     pub fn last_timestamp(&self) -> Option<Timestamp> {
         if self.timestamps.is_empty() {
             return None;
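The updated doc comments spell out that both accessors return `None` for an empty batch, so callers have to handle that case. A minimal sketch of the contract follows, using a hypothetical `TsBatch` that holds only a sorted `Vec<i64>` of millisecond timestamps rather than the PR's Arrow-backed `Batch`. The `overlaps` helper is also an assumption made for illustration and is not part of this PR.

```rust
/// Simplified stand-in for the PR's `Batch` (hypothetical): it keeps only a
/// sorted timestamp column so the sketch stays self-contained.
#[derive(Debug)]
pub struct TsBatch {
    timestamps: Vec<i64>,
}

impl TsBatch {
    pub fn new(timestamps: Vec<i64>) -> Self {
        Self { timestamps }
    }

    /// Returns the first timestamp or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<i64> {
        self.timestamps.first().copied()
    }

    /// Returns the last timestamp or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<i64> {
        self.timestamps.last().copied()
    }
}

/// Hypothetical caller: reports whether the time ranges of two batches
/// overlap. An empty batch has no range, so it never overlaps anything.
pub fn overlaps(a: &TsBatch, b: &TsBatch) -> bool {
    match (
        a.first_timestamp(),
        a.last_timestamp(),
        b.first_timestamp(),
        b.last_timestamp(),
    ) {
        (Some(a0), Some(a1), Some(b0), Some(b1)) => a0 <= b1 && b0 <= a1,
        _ => false,
    }
}
```

Returning `Option` instead of panicking lets a caller such as a merge step skip empty batches without a separate `is_empty()` check.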
@@ -554,12 +556,17 @@ pub struct SourceStats {
 /// Async [Batch] reader and iterator wrapper.
 ///
 /// This is the data source for SST writers or internal readers.
-pub enum Source {}
+pub enum Source {
+    /// Source from a [BoxedBatchReader].
+    Reader(BoxedBatchReader),
+}

 impl Source {
     /// Returns next [Batch] from this data source.
     pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
-        unimplemented!()
+        match self {
+            Source::Reader(reader) => reader.next_batch().await,
+        }
     }

     /// Returns the metadata of the source region.
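The hunk above turns `Source` from an empty enum into a wrapper that forwards `next_batch` to the boxed reader it holds. The same dispatch pattern can be sketched without an async runtime. Everything below is a synchronous simplification with hypothetical names (`SyncBatchReader`, `VecReader`, `SketchSource`, `drain`), and `Batch` is reduced to a `Vec<i64>`. None of these names are the PR's actual types.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the PR's `Batch`.
pub type Batch = Vec<i64>;

/// Synchronous simplification of a batch reader trait (assumption: the real
/// trait in the PR is async and returns the crate's own `Result`).
pub trait SyncBatchReader {
    fn next_batch(&mut self) -> Result<Option<Batch>, String>;
}

/// Hypothetical in-memory reader that yields pre-built batches in order.
pub struct VecReader {
    batches: VecDeque<Batch>,
}

impl VecReader {
    pub fn new(batches: Vec<Batch>) -> Self {
        Self { batches: batches.into() }
    }
}

impl SyncBatchReader for VecReader {
    fn next_batch(&mut self) -> Result<Option<Batch>, String> {
        Ok(self.batches.pop_front())
    }
}

/// Enum wrapper mirroring the shape of `Source`: one variant per kind of
/// underlying data source, with `next_batch` dispatching via `match`.
pub enum SketchSource {
    Reader(Box<dyn SyncBatchReader>),
}

impl SketchSource {
    pub fn next_batch(&mut self) -> Result<Option<Batch>, String> {
        match self {
            SketchSource::Reader(reader) => reader.next_batch(),
        }
    }
}

/// Pulls batches until the source reports `None`, i.e. it is exhausted.
pub fn drain(source: &mut SketchSource) -> Result<Vec<Batch>, String> {
    let mut out = Vec::new();
    while let Some(batch) = source.next_batch()? {
        out.push(batch);
    }
    Ok(out)
}
```

With an enum, adding another kind of source later only needs a new variant and a new `match` arm, and consumers that call `next_batch` stay unchanged.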
@@ -603,46 +610,17 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {

 #[cfg(test)]
 mod tests {
-    use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
-
     use super::*;
     use crate::error::Error;
-
-    fn new_batch_builder(
-        timestamps: &[i64],
-        sequences: &[u64],
-        op_types: &[OpType],
-        field: &[u64],
-    ) -> BatchBuilder {
-        let mut builder = BatchBuilder::new(b"test".to_vec());
-        builder
-            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
-                timestamps.iter().copied(),
-            )))
-            .unwrap()
-            .sequences_array(Arc::new(UInt64Array::from_iter_values(
-                sequences.iter().copied(),
-            )))
-            .unwrap()
-            .op_types_array(Arc::new(UInt8Array::from_iter_values(
-                op_types.iter().map(|v| *v as u8),
-            )))
-            .unwrap()
-            .push_field_array(
-                1,
-                Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
-            )
-            .unwrap();
-        builder
-    }
+    use crate::test_util::new_batch_builder;

     fn new_batch(
         timestamps: &[i64],
         sequences: &[u64],
         op_types: &[OpType],
         field: &[u64],
     ) -> Batch {
-        new_batch_builder(timestamps, sequences, op_types, field)
+        new_batch_builder(b"test", timestamps, sequences, op_types, field)
             .build()
             .unwrap()
     }