Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integration mito2 with inverted index #3029

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
94618bd
index integration
zhongzc Dec 27, 2023
5255c04
Merge remote-tracking branch 'origin/develop' into zhongzc/index-inte…
zhongzc Dec 27, 2023
a97fad3
index integration
zhongzc Dec 28, 2023
10336d8
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Dec 28, 2023
7420e13
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Dec 28, 2023
ff2de5b
feat(inverted_index.integration): Add applier builder to convert Expr…
zhongzc Dec 28, 2023
55fc1f9
chore: add docs
zhongzc Dec 28, 2023
09639a7
fix: typos
zhongzc Dec 28, 2023
19c5f38
fix: address comments
zhongzc Dec 29, 2023
a1792d1
Update src/mito2/src/sst/index/applier/builder.rs
zhongzc Dec 29, 2023
315c77b
add some metrics
zhongzc Dec 29, 2023
6f181c4
track io bytes
zhongzc Dec 29, 2023
c5b3d69
add comments
zhongzc Dec 29, 2023
56d1621
instrument object store to track its usage
zhongzc Dec 29, 2023
2f9319a
store is nice
zhongzc Dec 29, 2023
50d8a1b
remove some unwraps
zhongzc Dec 29, 2023
dc2b433
typos
zhongzc Dec 29, 2023
9c98034
fix: remove unwrap
zhongzc Dec 29, 2023
25bb4cc
Merge remote-tracking branch 'zhongzc/zhongzc/inverted-index-integrat…
zhongzc Dec 30, 2023
d5c38b3
fix: toml format
zhongzc Dec 30, 2023
983c09c
feat: add filter metrics
zhongzc Dec 30, 2023
3ed9b30
add abort
zhongzc Jan 2, 2024
1a963a4
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 3, 2024
6115d8f
chore: polish
zhongzc Jan 3, 2024
915f222
feat: add seek count
zhongzc Jan 3, 2024
f049ace
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 3, 2024
73eaaca
remove reduntant code
zhongzc Jan 3, 2024
ac2367c
feat: add inverted_index_available to file meta
zhongzc Jan 3, 2024
9cda68a
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 4, 2024
e38d6cc
fix test
zhongzc Jan 4, 2024
8e01dfd
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 5, 2024
34bc2e0
rename config
zhongzc Jan 5, 2024
0351d28
style
zhongzc Jan 5, 2024
f67f3b5
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 5, 2024
8633027
typos
zhongzc Jan 5, 2024
067fe64
Merge remote-tracking branch 'origin/main' into zhongzc/index-integra…
zhongzc Jan 10, 2024
384632f
tiny refine
zhongzc Jan 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;

/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
pub trait IndexApplier {
pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply<'a>(
Expand Down
24 changes: 18 additions & 6 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,19 @@ impl AccessLayer {

/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = location::sst_file_path(&self.region_dir, file_id);
let sst_path = location::sst_file_path(&self.region_dir, file_id);
self.object_store
.delete(&path)
.delete(&sst_path)
.await
.context(DeleteSstSnafu { file_id })
.context(DeleteSstSnafu { file_id })?;

let index_path = location::index_file_path(&self.region_dir, file_id);
self.object_store
.delete(&index_path)
.await
.context(OpenDalSnafu)
// ignore error if index file not found for compatibility
.or_else(|e| e.is_object_not_found().then_some(()).ok_or(e))
}

/// Returns a reader builder for specific `file`.
Expand All @@ -88,7 +96,6 @@ impl AccessLayer {
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
Expand All @@ -100,15 +107,20 @@ impl AccessLayer {
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: path,
region_dir: self.region_dir.clone(),
remote_store: self.object_store.clone(),
},
write_opts,
)
.await?
} else {
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
let mut writer = ParquetWriter::new(
self.region_dir.clone(),
request.file_id,
request.metadata,
self.object_store.clone(),
);
writer.write_all(request.source, write_opts).await?
};

Expand Down
11 changes: 7 additions & 4 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ impl WriteCache {
) -> Result<Option<SstInfo>> {
// TODO(yingwen): Write to the local store and then upload.
// Now we write to the remote and ignore local cache.
let mut writer =
ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
let mut writer = ParquetWriter::new(
request.region_dir,
request.file_id,
request.metadata,
request.remote_store,
);
writer.write_all(request.source, write_opts).await
}
}
Expand All @@ -91,8 +95,7 @@ pub struct SstUploadRequest {
pub metadata: RegionMetadataRef,
pub source: Source,
pub storage: Option<String>,
/// Path to upload the file.
pub upload_path: String,
pub region_dir: String,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}
8 changes: 4 additions & 4 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -53,9 +52,10 @@ pub struct CompactionRequest {
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
/// Buffering threshold while writing SST files.
pub(crate) sst_write_buffer_size: ReadableSize,

pub(crate) cache_manager: CacheManagerRef,

pub(crate) engine_config: Arc<MitoConfig>,
}

impl CompactionRequest {
Expand Down Expand Up @@ -337,8 +337,8 @@ impl CompactionStatus {
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
sst_write_buffer_size: engine_config.sst_write_buffer_size,
cache_manager,
engine_config,
};

if let Some(pending) = self.pending_compaction.take() {
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn new_file_handle(
),
level,
file_size: 0,
inverted_index_available: false,
},
file_purger,
)
Expand Down
25 changes: 18 additions & 7 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
Expand All @@ -31,7 +30,9 @@ use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error::{self, CompactRegionSnafu};
use crate::config::MitoConfig;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
Expand All @@ -41,7 +42,7 @@ use crate::request::{
};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::{InvertedIndexOptions, WriteOptions};
use crate::sst::version::LevelMeta;

const MAX_PARALLEL_COMPACTION: usize = 8;
Expand Down Expand Up @@ -128,8 +129,8 @@ impl Picker for TwcsPicker {
waiters,
file_purger,
start_time,
sst_write_buffer_size,
cache_manager,
engine_config,
} = req;

let region_metadata = current_version.metadata.clone();
Expand Down Expand Up @@ -177,14 +178,14 @@ impl Picker for TwcsPicker {
sst_layer: access_layer,
outputs,
expired_ssts,
sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
request_sender,
waiters,
file_purger,
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
engine_config,
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -238,7 +239,6 @@ pub(crate) struct TwcsCompactionTask {
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option<i64>,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
Expand All @@ -250,6 +250,7 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
pub engine_config: Arc<MitoConfig>,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -299,8 +300,17 @@ impl TwcsCompactionTask {
output.output_file_id
);

let index_config = &self.engine_config.inverted_index;
let inverted_index =
(!index_config.disable_creation_on_compact).then(|| InvertedIndexOptions {
creation_memory_usage_threshold: index_config
.creation_memory_usage_threshold
.map(|size| size.as_bytes() as _),
});

let write_opts = WriteOptions {
write_buffer_size: self.sst_write_buffer_size,
write_buffer_size: self.engine_config.sst_write_buffer_size,
inverted_index,
..Default::default()
};
let metadata = self.metadata.clone();
Expand Down Expand Up @@ -330,6 +340,7 @@ impl TwcsCompactionTask {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
inverted_index_available: sst_info.inverted_index_available,
});
Ok(file_meta_opt)
});
Expand Down
20 changes: 20 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct MitoConfig {
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,

pub inverted_index: InvertedIndexConfig,
}

impl Default for MitoConfig {
Expand All @@ -113,6 +115,7 @@ impl Default for MitoConfig {
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
}
}
}
Expand Down Expand Up @@ -188,3 +191,20 @@ fn divide_num_cpus(divisor: usize) -> usize {

(cores + divisor - 1) / divisor
}

/// Configuration for creating inverted indexes on SST files in the mito engine.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct InvertedIndexConfig {
    /// If `true`, skip building inverted indexes when flushing memtables to SSTs.
    pub disable_creation_on_flush: bool,
    /// If `true`, skip building inverted indexes when compacting SSTs.
    pub disable_creation_on_compact: bool,
    /// Memory-usage threshold for index creation; `None` means no limit.
    // NOTE(review): presumably a soft cap passed down to the index creator
    // (see the `creation_memory_usage_threshold.map(|size| ...)` call sites
    // in the flush/compaction hunks of this change) — confirm exact semantics
    // against `InvertedIndexOptions`.
    pub creation_memory_usage_threshold: Option<ReadableSize>,
}

impl Default for InvertedIndexConfig {
    /// By default, inverted-index creation is enabled on both flush and
    /// compaction, with creation memory usage capped at 128 MiB.
    fn default() -> Self {
        Self {
            disable_creation_on_flush: false,
            disable_creation_on_compact: false,
            creation_memory_usage_threshold: Some(ReadableSize::mb(128)),
        }
    }
}
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,5 +553,5 @@ async fn test_region_usage() {
assert_eq!(region_stat.sst_usage, 2742);

// region total usage
assert_eq!(region_stat.disk_usage(), 3748);
assert_eq!(region_stat.disk_usage(), 3780);
}
12 changes: 11 additions & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::request::{
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::{InvertedIndexOptions, WriteOptions};
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
Expand Down Expand Up @@ -294,8 +294,17 @@ impl RegionFlushTask {
.with_label_values(&["flush_memtables"])
.start_timer();

let inverted_index_config = &self.engine_config.inverted_index;
let inverted_index_options =
(!inverted_index_config.disable_creation_on_flush).then(|| InvertedIndexOptions {
creation_memory_usage_threshold: inverted_index_config
.creation_memory_usage_threshold
.map(|size| size.as_bytes() as _),
});

let mut write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
inverted_index: inverted_index_options,
..Default::default()
};
if let Some(row_group_size) = self.row_group_size {
Expand Down Expand Up @@ -339,6 +348,7 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
inverted_index_available: sst_info.inverted_index_available,
};
file_metas.push(file_meta);
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ async fn checkpoint_with_different_compression_types() {
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
inverted_index_available: false,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ lazy_static! {
/// Counter of filtered rows during merge.
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
/// Counter of row groups read.
pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.

// Cache related metrics.
Expand Down
26 changes: 25 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

//! Scans a region according to the scan request.

use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
Expand All @@ -27,6 +29,8 @@ use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand Down Expand Up @@ -194,6 +198,7 @@ impl ScanRegion {
total_ssts
);

let index_applier = self.build_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Expand All @@ -204,6 +209,7 @@ impl ScanRegion {
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_index_applier(index_applier)
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager)
Expand All @@ -224,6 +230,24 @@ impl ScanRegion {
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
.build()
}

/// Use the latest schema to build the index applier.
///
/// To use this fixed schema to apply to different versions of SSTs, we have to make sure:
/// 1. Type of column cannot be changed.
/// 2. Column cannot be renamed.
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
SstIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
self.version.metadata.as_ref(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))
.ok()
.flatten()
.map(Arc::new)
}
}

/// Config for parallel scan.
Expand Down
Loading
Loading