diff --git a/src/mito2/src/compaction/output.rs b/src/mito2/src/compaction/output.rs
index 96d95874b758..bedbcb741729 100644
--- a/src/mito2/src/compaction/output.rs
+++ b/src/mito2/src/compaction/output.rs
@@ -29,10 +29,6 @@ pub(crate) struct CompactionOutput {
     pub output_file_id: FileId,
     /// Compaction output file level.
     pub output_level: Level,
-    /// The left bound of time window.
-    pub time_window_bound: i64,
-    /// Time window size in seconds.
-    pub time_window_sec: i64,
     /// Compaction input files.
     pub inputs: Vec<FileHandle>,
 }
diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs
index 1e986be0a673..d97229e6ac21 100644
--- a/src/mito2/src/compaction/picker.rs
+++ b/src/mito2/src/compaction/picker.rs
@@ -29,19 +29,3 @@ pub trait CompactionTask: Debug + Send + Sync + 'static {
 pub trait Picker: Debug + Send + 'static {
     fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
 }
-
-pub struct PickerContext {
-    compaction_time_window: Option<i64>,
-}
-
-impl PickerContext {
-    pub fn with(compaction_time_window: Option<i64>) -> Self {
-        Self {
-            compaction_time_window,
-        }
-    }
-
-    pub fn compaction_time_window(&self) -> Option<i64> {
-        self.compaction_time_window
-    }
-}
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index f840d6aeb2f9..19b9085eba67 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 use std::time::Duration;
@@ -80,7 +80,6 @@ impl TwcsPicker {
         &self,
         time_windows: &BTreeMap<i64, Vec<FileHandle>>,
         active_window: Option<i64>,
-        window_size: i64,
     ) -> Vec<CompactionOutput> {
         let mut output = vec![];
         for (window, files) in time_windows {
@@ -89,8 +88,6 @@ impl TwcsPicker {
                 output.push(CompactionOutput {
                     output_file_id: FileId::random(),
                     output_level: 1, // we only have two levels and always compact to l1
-                    time_window_bound: *window,
-                    time_window_sec: window_size,
                     inputs: files.clone(),
                 });
             } else {
@@ -102,8 +99,6 @@ impl TwcsPicker {
                 output.push(CompactionOutput {
                     output_file_id: FileId::random(),
                     output_level: 1,
-                    time_window_bound: *window,
-                    time_window_sec: window_size,
                     inputs: files.clone(),
                 });
             } else {
@@ -155,7 +150,7 @@ impl Picker for TwcsPicker {
         let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
         // Assign files to windows
         let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
-        let outputs = self.build_output(&windows, active_window, time_window_size);
+        let outputs = self.build_output(&windows, active_window);
 
         if outputs.is_empty() && expired_ssts.is_empty() {
             // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
@@ -407,54 +402,6 @@ pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>)
         .unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
 }
 
-/// Finds files that can be compacted in given level.
-/// Currently they're files that is not currently under compaction.
-#[inline]
-fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
-    level.files().filter(|f| !f.compacting()).cloned().collect()
-}
-
-/// Calculates timestamp span between start and end timestamp.
-fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
-    assert!(start_sec <= end_sec);
-
-    // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
-    // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
-    let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
-    let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
-
-    let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
-    while start_aligned < end_aligned {
-        res.push(start_aligned);
-        start_aligned += bucket_sec;
-    }
-    res.push(end_aligned);
-    res
-}
-
-/// Calculates buckets for files. If file does not contain a time range in metadata, it will be
-/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
-/// so that all files without timestamp can be compacted together.
-fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
-    let mut buckets = HashMap::new();
-
-    for file in files {
-        let (start, end) = file.time_range();
-        let bounds = file_time_bucket_span(
-            start.convert_to(TimeUnit::Second).unwrap().value(),
-            end.convert_to(TimeUnit::Second).unwrap().value(),
-            bucket_sec,
-        );
-        for bound in bounds {
-            buckets
-                .entry(bound)
-                .or_insert_with(Vec::new)
-                .push(file.clone());
-        }
-    }
-    buckets
-}
-
 pub(crate) struct TimeBuckets([i64; 7]);
 
 impl TimeBuckets {
@@ -603,20 +550,14 @@ mod tests {
             let windows = assign_to_windows(self.input_files.iter(), self.window_size);
             let active_window =
                 find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
-            let output =
-                TwcsPicker::new(4, 1, None).build_output(&windows, active_window, self.window_size);
+            let output = TwcsPicker::new(4, 1, None).build_output(&windows, active_window);
 
             let output = output
                 .iter()
                 .map(|o| {
                     let input_file_ids = o.inputs.iter().map(|f| f.file_id()).collect::<Vec<_>>();
-                    (
-                        input_file_ids,
-                        o.output_level,
-                        o.time_window_sec,
-                        o.time_window_bound,
-                    )
+                    (input_file_ids, o.output_level)
                 })
                 .collect::<Vec<_>>();
@@ -629,12 +570,7 @@ mod tests {
                     .iter()
                     .map(|idx| self.input_files[*idx].file_id())
                     .collect::<Vec<_>>();
-                    (
-                        input_file_ids,
-                        o.output_level,
-                        o.time_window_sec,
-                        o.time_window_bound,
-                    )
+                    (input_file_ids, o.output_level)
                 })
                 .collect::<Vec<_>>();
             assert_eq!(expected, output);
@@ -644,9 +580,6 @@ mod tests {
     struct ExpectedOutput {
         input_files: Vec<usize>,
         output_level: Level,
-        time_window_sec: i64,
-        time_window_bound: i64,
-        strict_window: bool,
     }
 
     #[test]
@@ -665,9 +598,6 @@ mod tests {
             expected_outputs: vec![ExpectedOutput {
                 input_files: vec![0, 1],
                 output_level: 1,
-                time_window_sec: 3,
-                time_window_bound: 0,
-                strict_window: false,
             }],
         }
         .check();
@@ -688,16 +618,10 @@ mod tests {
                 ExpectedOutput {
                     input_files: vec![0, 1],
                     output_level: 1,
-                    time_window_sec: 3,
-                    time_window_bound: 0,
-                    strict_window: false,
                 },
                 ExpectedOutput {
                     input_files: vec![2, 3, 4],
                     output_level: 1,
-                    time_window_sec: 3,
-                    time_window_bound: 3,
-                    strict_window: false,
                 },
             ],
         }
         .check();
@@ -742,102 +666,5 @@ mod tests {
         );
     }
 
-    fn check_bucket_calculation(
-        bucket_sec: i64,
-        files: Vec<FileHandle>,
-        expected: &[(i64, &[FileId])],
-    ) {
-        let res = calculate_time_buckets(bucket_sec, &files);
-
-        let expected = expected
-            .iter()
-            .map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<Vec<_>>()))
-            .collect::<HashMap<_, _>>();
-
-        for (bucket, file_ids) in expected {
-            let actual = res
-                .get(&bucket)
-                .unwrap()
-                .iter()
-                .map(|f| f.file_id())
-                .collect::<Vec<_>>();
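The removed `file_time_bucket_span` tests above hinge on one detail that also matters when reading `assign_to_windows`: window bounds are floored, so negative timestamps align to the window on their left. A minimal sketch of that alignment arithmetic, where `align_to_window` is a hypothetical stand-in for the real `align_by_bucket` (which additionally guards against `i64` overflow near `i64::MIN`):

```rust
/// Floors `ts_sec` to the left bound of its time window.
/// Hypothetical stand-in for `align_by_bucket`; assumes `window_sec > 0`
/// and ignores the i64 overflow handling the real code needs.
fn align_to_window(ts_sec: i64, window_sec: i64) -> i64 {
    ts_sec.div_euclid(window_sec) * window_sec
}

fn main() {
    // Matches the expectations in the removed tests:
    // [1, 9] falls entirely in the window starting at 0 ...
    assert_eq!(0, align_to_window(1, 10));
    assert_eq!(0, align_to_window(9, 10));
    // ... while 10 starts the next window,
    assert_eq!(10, align_to_window(10, 10));
    // and negative timestamps floor toward the window on their left.
    assert_eq!(-10, align_to_window(-1, 10));
}
```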
-            assert_eq!(
-                file_ids, actual,
-                "bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
-            );
-        }
-    }
-
-    #[test]
-    fn test_calculate_time_buckets() {
-        let file_id_a = FileId::random();
-        let file_id_b = FileId::random();
-        // simple case, files with disjoint
-        check_bucket_calculation(
-            10,
-            new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
-            &[(0, &[file_id_a]), (10, &[file_id_b])],
-        );
-
-        // files across buckets
-        check_bucket_calculation(
-            10,
-            new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
-            &[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
-        );
-        check_bucket_calculation(
-            10,
-            new_file_handles(&[(file_id_a, 0, 10000)]),
-            &[(0, &[file_id_a]), (10, &[file_id_a])],
-        );
-
-        // file with an large time range
-        let file_id_array = &[file_id_a];
-        let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0)))
-            .map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _))
-            .collect::<Vec<_>>();
-        check_bucket_calculation(
-            TIME_BUCKETS.get(0),
-            new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]),
-            &expected,
-        );
-    }
-
-    #[test]
-    fn test_time_bucket_span() {
-        assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
-        assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
-        assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
-        assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
-    }
-
-    #[test]
-    fn test_time_bucket_span_large() {
-        assert_eq!(
-            vec![
-                (i64::MAX - 10).align_by_bucket(10).unwrap(),
-                i64::MAX.align_by_bucket(10).unwrap(),
-            ],
-            file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
-        );
-
-        for bucket in 1..100 {
-            assert_eq!(
-                vec![
-                    i64::MIN,
-                    (i64::MIN + bucket).align_by_bucket(bucket).unwrap()
-                ],
-                file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
-            );
-        }
-    }
-
-    fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
-        input
-            .iter()
-            .map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0))
-            .collect()
-    }
-
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 73c7d54b9a78..ff7ee5051e31 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -22,6 +22,7 @@ use common_query::Output;
 use common_telemetry::{error, info};
 use snafu::ResultExt;
 use store_api::storage::RegionId;
+use strum::AsRefStr;
 use tokio::sync::mpsc;
 
 use crate::access_layer::AccessLayerRef;
@@ -97,7 +98,7 @@ impl WriteBufferManagerImpl {
     }
 
     /// Returns memory usage of mutable memtables.
-    pub(crate) fn mutable_usage(&self) -> usize {
+    pub fn mutable_usage(&self) -> usize {
         self.memory_active.load(Ordering::Relaxed)
     }
 
@@ -162,6 +163,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
 }
 
 /// Reason of a flush task.
+#[derive(Debug, AsRefStr)]
 pub enum FlushReason {
     /// Other reasons.
     Others,
@@ -302,8 +304,10 @@ impl RegionFlushTask {
         let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
         info!(
-            "Successfully flush memtables, region: {}, files: {:?}",
-            version.metadata.region_id, file_ids
+            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}",
+            version.metadata.region_id,
+            self.reason.as_ref(),
+            file_ids
         );
 
         Ok(file_metas)
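For context on the `strum::AsRefStr` derive added above: it generates an `impl AsRef<str>` whose `as_ref()` returns the variant name as a `&'static str`, which is what lets the flush log embed `self.reason.as_ref()` without a manual `Display` impl. A standalone sketch of the pattern (the variant list here is illustrative, not the full `FlushReason`; assumes `strum` with its `derive` feature enabled):

```rust
use strum::AsRefStr; // Cargo.toml: strum = { version = "0.25", features = ["derive"] }

/// Illustrative enum mirroring the pattern, not the full FlushReason.
#[derive(Debug, AsRefStr)]
enum FlushReason {
    /// Other reasons.
    Others,
    /// Triggered manually (hypothetical variant for illustration).
    Manual,
}

fn main() {
    // as_ref() yields the variant name, so it drops straight into a log line.
    assert_eq!("Manual", FlushReason::Manual.as_ref());
    println!("flush finished, reason: {}", FlushReason::Others.as_ref());
}
```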
 mod access_layer;
-#[allow(dead_code)]
 mod compaction;
 pub mod config;
 pub mod engine;
 pub mod error;
-#[allow(dead_code)]
-mod flush;
+pub mod flush;
 pub mod manifest;
-#[allow(dead_code)]
 pub mod memtable;
 mod metrics;
-#[allow(dead_code)]
 pub mod read;
 pub mod region;
 mod region_write_ctx;
-#[allow(dead_code)]
 pub mod request;
 mod row_converter;
 pub(crate) mod schedule;
-#[allow(dead_code)]
 pub mod sst;
 pub mod wal;
 mod worker;
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index d9c2f2e40b0c..bb82fb60d85d 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -42,16 +42,22 @@ pub type MemtableId = u32;
 
 #[derive(Debug, Default)]
 pub struct MemtableStats {
-    /// The estimated bytes allocated by this memtable from heap.
+    /// The estimated bytes allocated by this memtable from heap.
     estimated_bytes: usize,
     /// The time range that this memtable contains.
     time_range: Option<(Timestamp, Timestamp)>,
 }
 
 impl MemtableStats {
+    /// Returns the estimated bytes allocated by this memtable.
     pub fn bytes_allocated(&self) -> usize {
         self.estimated_bytes
     }
+
+    /// Returns the time range of the memtable.
+    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
+        self.time_range
+    }
 }
 
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 1a742b6b7e7c..b5ab52447234 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -293,11 +293,6 @@ impl SeriesSet {
             last_key: None,
         }
     }
-
-    /// Returns if series set is empty.
-    fn is_empty(&self) -> bool {
-        self.series.read().unwrap().is_empty()
-    }
 }
 
 struct Iter {
@@ -329,37 +324,6 @@ impl Iterator for Iter {
     }
 }
 
-/// Bucket holds a set of [Series] which alleviate lock contention between series.
-struct Bucket {
-    region_metadata: RegionMetadataRef,
-    series: RwLock<Vec<Arc<RwLock<Series>>>>,
-}
-
-impl Bucket {
-    fn new(region_metadata: RegionMetadataRef) -> Self {
-        Self {
-            region_metadata,
-            series: Default::default(),
-        }
-    }
-
-    /// Returns the series at given index.
-    /// Returns None if series not found.
-    #[inline]
-    fn get_series(&self, idx: usize) -> Option<Arc<RwLock<Series>>> {
-        self.series.read().unwrap().get(idx).cloned()
-    }
-
-    /// Adds series to bucket and returns the index inside the bucket.
-    #[inline]
-    fn add_series(&self, s: Arc<RwLock<Series>>) -> usize {
-        let mut series = self.series.write().unwrap();
-        let idx = series.len();
-        series.push(s);
-        idx
-    }
-}
-
 /// A `Series` holds a list of field values of some given primary key.
 struct Series {
     active: ValueBuilder,
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 33cc121a5a3e..f34d8baafe13 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -125,7 +125,7 @@ impl WriteRequest {
     }
 
     /// Gets column index by name.
-    pub(crate) fn column_index_by_name(&self, name: &str) -> Option<usize> {
+    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
         self.name_to_index.get(name).copied()
     }
 
@@ -413,11 +413,6 @@ impl OptionOutputTx {
         }
     }
 
-    /// Takes the sender.
-    pub(crate) fn take(&mut self) -> OptionOutputTx {
-        OptionOutputTx(self.0.take())
-    }
-
     /// Takes the inner sender.
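The `file_handle` to `_file_handle` rename below is the usual Rust idiom for a field owned purely for its lifetime side effect: nothing reads it, but dropping it early would let the file purger delete the SST while the stream is still reading it. The leading underscore silences the unused-field lint while keeping the RAII guarantee. A self-contained sketch of the pattern with hypothetical types (not the mito2 API):

```rust
/// Hypothetical handle whose Drop releases the file for purging.
struct FileGuard(String);

impl Drop for FileGuard {
    fn drop(&mut self) {
        println!("file {} may now be purged", self.0);
    }
}

/// The reader never reads `_guard`, but owning it pins the file's
/// lifetime to the reader's; the underscore documents exactly that.
struct Reader {
    _guard: FileGuard,
}

fn main() {
    let reader = Reader {
        _guard: FileGuard("part-0001.parquet".to_string()),
    };
    // ... stream record batches here; the file cannot be purged yet ...
    drop(reader); // only now does the guard run its Drop
}
```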
     pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
         self.0.take()
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 85b1b47ae097..6280f83eef10 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -103,7 +103,7 @@ impl ParquetReaderBuilder {
         Ok(ParquetReader {
             file_path,
-            file_handle: self.file_handle,
+            _file_handle: self.file_handle,
             stream,
             read_format,
             batches: Vec::new(),
@@ -208,7 +208,7 @@ pub struct ParquetReader {
     /// SST file to read.
     ///
     /// Holds the file handle to avoid the file purger purging it.
-    file_handle: FileHandle,
+    _file_handle: FileHandle,
     /// Inner parquet record batch stream.
     stream: BoxedRecordBatchStream,
     /// Helper to read record batches.
diff --git a/src/mito2/src/sst/stream_writer.rs b/src/mito2/src/sst/stream_writer.rs
index 005b533443b6..28d044298ca3 100644
--- a/src/mito2/src/sst/stream_writer.rs
+++ b/src/mito2/src/sst/stream_writer.rs
@@ -32,7 +32,6 @@ use crate::error::WriteParquetSnafu;
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: SchemaRef,
 }
 
 type InnerBufferedWriter = LazyBufferedWriter<
@@ -79,7 +78,6 @@ impl BufferedWriter {
                 })
             }),
         ),
-            arrow_schema,
         })
     }