Skip to content

Commit

Permalink
Merge remote-tracking branch 'zhongzc/zhongzc/inverted-index-integration-applier-builder-part-1' into zhongzc/index-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongzc committed Dec 30, 2023
2 parents dc2b433 + 9c98034 commit 25bb4cc
Show file tree
Hide file tree
Showing 25 changed files with 1,273 additions and 119 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The maximum log size a kakfa batch producer could buffer.
/// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl IntersectionFstApplier {
Predicate::RegexMatch(regex) => {
let dfa = DFA::new(&regex.pattern);
let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?;

memory_usage += dfa.memory_usage();
dfas.push(dfa);
}
Expand Down Expand Up @@ -291,6 +292,7 @@ mod tests {
("^$", vec![]),
("1|a", vec![1, 2]),
("^123$|^abc$", vec![1, 2]),
("^123$|d", vec![1]),
];

for (pattern, expected) in cases {
Expand Down
28 changes: 14 additions & 14 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},

#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound { column: String, location: Location },

Expand All @@ -427,6 +434,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Failed to push index value"))]
PushIndexValue {
#[snafu(source)]
Expand Down Expand Up @@ -481,20 +495,6 @@ pub enum Error {
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
}

/// Result alias whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Scans a region according to the scan request.
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, logging};
use common_time::range::TimestampRange;
Expand All @@ -28,7 +30,7 @@ use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::applier::SstIndexApplierRef;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand Down Expand Up @@ -234,7 +236,7 @@ impl ScanRegion {
/// To use this fixed schema to apply to different versions of SSTs, we have to make sure:
/// 1. Type of column cannot be changed.
/// 2. Column cannot be renamed.
fn build_index_applier(&self) -> Option<SstIndexApplier> {
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
SstIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
Expand All @@ -244,6 +246,7 @@ impl ScanRegion {
.inspect_err(|e| logging::warn!("Failed to build index applier: {}", e))
.ok()
.flatten()
.map(Arc::new)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanParallism;
use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::applier::SstIndexApplierRef;

/// Scans a region and returns rows in a sorted sequence.
///
Expand All @@ -64,7 +64,7 @@ pub struct SeqScan {
/// Parallelism to scan data.
parallelism: ScanParallism,

index_appiler: Option<SstIndexApplier>,
index_appiler: Option<SstIndexApplierRef>,
}

impl SeqScan {
Expand Down Expand Up @@ -100,7 +100,7 @@ impl SeqScan {
}

#[must_use]
pub(crate) fn with_index_applier(mut self, index_applier: Option<SstIndexApplier>) -> Self {
pub(crate) fn with_index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_appiler = index_applier;
self
}
Expand Down
22 changes: 16 additions & 6 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};

Expand All @@ -36,25 +37,34 @@ use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;

#[derive(Clone)]
/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub struct SstIndexApplier {
/// The root directory of the region.
region_dir: String,

/// Object store responsible for accessing SST files.
store: InstrumentedStore,

index_applier: Arc<dyn IndexApplier>,
/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
index_applier: Box<dyn IndexApplier>,
}

/// Reference-counted ([`Arc`]) handle to an [`SstIndexApplier`].
pub type SstIndexApplierRef = Arc<SstIndexApplier>;

impl SstIndexApplier {
pub(crate) fn new(
/// Creates a new [`SstIndexApplier`].
pub fn new(
region_dir: String,
store: InstrumentedStore,
index_applier: Arc<dyn IndexApplier>,
store: ObjectStore,
index_applier: Box<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);

Self {
region_dir,
store,
store: InstrumentedStore::new(store),
index_applier,
}
}
Expand Down
Loading

0 comments on commit 25bb4cc

Please sign in to comment.