refactor(mito): remove #[allow(dead_code)] (#2479)
evenyag authored Sep 25, 2023
1 parent c0f080d commit 7ecfaa2
Showing 10 changed files with 23 additions and 256 deletions.
4 changes: 0 additions & 4 deletions src/mito2/src/compaction/output.rs
@@ -29,10 +29,6 @@ pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// The left bound of time window.
pub time_window_bound: i64,
/// Time window size in seconds.
pub time_window_sec: i64,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}
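
With the two dead time-window fields gone, a `CompactionOutput` is built from just an output file id, a target level, and its input files. A minimal sketch of constructing one, using the crate-internal types shown in the diff above (`input_files` is a hypothetical local):

```rust
// Sketch only: CompactionOutput, FileId, and FileHandle are mito2 internals
// from the diff above; `input_files: Vec<FileHandle>` is a hypothetical local.
let output = CompactionOutput {
    output_file_id: FileId::random(), // id for the newly written SST
    output_level: 1,                  // mito2 has two levels and always compacts to l1
    inputs: input_files.clone(),      // SSTs merged into the output file
};
```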
16 changes: 0 additions & 16 deletions src/mito2/src/compaction/picker.rs
@@ -29,19 +29,3 @@ pub trait CompactionTask: Debug + Send + Sync + 'static {
pub trait Picker: Debug + Send + 'static {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
}

pub struct PickerContext {
compaction_time_window: Option<i64>,
}

impl PickerContext {
pub fn with(compaction_time_window: Option<i64>) -> Self {
Self {
compaction_time_window,
}
}

pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
}
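
After removing the unused `PickerContext`, implementing a picker only requires the `pick` method. A hedged sketch under that assumption (`NoopPicker` is a hypothetical example type, not part of the crate):

```rust
// Hypothetical picker that never schedules compaction; `Picker`,
// `CompactionRequest`, and `CompactionTask` are the crate items shown above.
#[derive(Debug)]
struct NoopPicker;

impl Picker for NoopPicker {
    fn pick(&self, _req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
        None // returning None means there is nothing to compact
    }
}
```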
183 changes: 5 additions & 178 deletions src/mito2/src/compaction/twcs.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
@@ -80,7 +80,6 @@ impl TwcsPicker {
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
active_window: Option<i64>,
window_size: i64,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
@@ -89,8 +88,6 @@
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
@@ -102,8 +99,6 @@
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
@@ -155,7 +150,7 @@ impl Picker for TwcsPicker {
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
// Assign files to windows
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&windows, active_window, time_window_size);
let outputs = self.build_output(&windows, active_window);

if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
@@ -407,54 +402,6 @@ pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>)
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}

/// Finds files that can be compacted in the given level.
/// Currently these are the files that are not already under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level.files().filter(|f| !f.compacting()).cloned().collect()
}

/// Calculates the aligned bucket bounds spanning the start and end timestamps.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);

// if the timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than letting it underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);

let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}

/// Calculates buckets for files. If a file does not contain a time range in its metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamp can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();

for file in files {
let (start, end) = file.time_range();
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
}
buckets
}

pub(crate) struct TimeBuckets([i64; 7]);

impl TimeBuckets {
@@ -603,20 +550,14 @@ mod tests {
let windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output =
TwcsPicker::new(4, 1, None).build_output(&windows, active_window, self.window_size);
let output = TwcsPicker::new(4, 1, None).build_output(&windows, active_window);

let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();

@@ -629,12 +570,7 @@
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
assert_eq!(expected, output);
@@ -644,9 +580,6 @@
struct ExpectedOutput {
input_files: Vec<usize>,
output_level: Level,
time_window_sec: i64,
time_window_bound: i64,
strict_window: bool,
}

#[test]
@@ -665,9 +598,6 @@
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
}],
}
.check();
@@ -688,16 +618,10 @@
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
output_level: 1,
time_window_sec: 3,
time_window_bound: 3,
strict_window: false,
},
],
}
@@ -742,102 +666,5 @@
);
}

fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);

let expected = expected
.iter()
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_id())
.collect();
assert_eq!(
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}

#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
// simple case: files with disjoint time ranges
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);

// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);

// file with a large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0)))
.map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS.get(0),
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]),
&expected,
);
}

#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}

#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);

for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}

fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0))
.collect()
}

// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}
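
The net effect on TWCS is that `build_output` no longer threads `window_size` through: callers pass only the window map and the active window. A sketch of the updated flow, mirroring the picker and test code above (the meaning of `TwcsPicker::new`'s arguments is an assumption based on the test usage):

```rust
// Assumes the picker internals from the diff above; `levels` is a slice of
// LevelMeta and `time_window_size` is the inferred window size in seconds.
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);

// TwcsPicker::new(4, 1, None) as used in the tests above; the argument
// semantics (file-count thresholds plus an optional window) are assumptions.
let outputs = TwcsPicker::new(4, 1, None).build_output(&windows, active_window);
```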
10 changes: 7 additions & 3 deletions src/mito2/src/flush.rs
@@ -22,6 +22,7 @@ use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use tokio::sync::mpsc;

use crate::access_layer::AccessLayerRef;
@@ -97,7 +98,7 @@ impl WriteBufferManagerImpl {
}

/// Returns memory usage of mutable memtables.
pub(crate) fn mutable_usage(&self) -> usize {
pub fn mutable_usage(&self) -> usize {
self.memory_active.load(Ordering::Relaxed)
}

@@ -162,6 +163,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}

/// Reason of a flush task.
#[derive(Debug, AsRefStr)]
pub enum FlushReason {
/// Other reasons.
Others,
@@ -302,8 +304,10 @@ impl RegionFlushTask {

let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
info!(
"Successfully flush memtables, region: {}, files: {:?}",
version.metadata.region_id, file_ids
"Successfully flush memtables, region: {}, reason: {}, files: {:?}",
version.metadata.region_id,
self.reason.as_ref(),
file_ids
);

Ok(file_metas)
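
The new `strum::AsRefStr` derive is what lets the flush log line above include `self.reason.as_ref()`: the derive yields the variant name as a `&'static str`. A minimal standalone sketch (variant names beyond `Others` are hypothetical here; the real enum has more variants):

```rust
use strum::AsRefStr; // requires strum with the "derive" feature in Cargo.toml

#[derive(Debug, AsRefStr)]
enum FlushReason {
    /// Other reasons.
    Others,
    /// Hypothetical variant for illustration only.
    Manual,
}

fn main() {
    // AsRefStr renders the variant name, suitable for log fields.
    assert_eq!(FlushReason::Others.as_ref(), "Others");
    assert_eq!(FlushReason::Manual.as_ref(), "Manual");
}
```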
9 changes: 1 addition & 8 deletions src/mito2/src/lib.rs
@@ -21,28 +21,21 @@
#[cfg(any(test, feature = "test"))]
pub mod test_util;

// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
mod access_layer;
#[allow(dead_code)]
mod compaction;
pub mod config;
pub mod engine;
pub mod error;
#[allow(dead_code)]
mod flush;
pub mod flush;
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
pub mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;
mod row_converter;
pub(crate) mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
mod worker;
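
`flush` flipping from `mod` to `pub mod` (together with `mutable_usage` going from `pub(crate)` to `pub`) means code outside the crate can now observe write-buffer usage. A hedged sketch; it assumes `WriteBufferManagerImpl` itself is exported, and only `mutable_usage` is confirmed public by the diff above:

```rust
// Assumes mito2 as a dependency and that the struct is re-exported from the
// now-public flush module.
use mito2::flush::WriteBufferManagerImpl;

fn report(manager: &WriteBufferManagerImpl) {
    // Memory currently held by mutable memtables, in bytes.
    println!("mutable usage: {} bytes", manager.mutable_usage());
}
```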
8 changes: 7 additions & 1 deletion src/mito2/src/memtable.rs
@@ -42,16 +42,22 @@ pub type MemtableId = u32;

#[derive(Debug, Default)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains.
time_range: Option<(Timestamp, Timestamp)>,
}

impl MemtableStats {
/// Returns the estimated bytes allocated by this memtable.
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes
}

/// Returns the time range of the memtable.
pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
self.time_range
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
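
Since `MemtableStats` derives `Default`, the new accessors can be exercised directly. A minimal sketch using only what the diff shows (the fields are private, so this would live inside the crate, e.g. in a unit test):

```rust
let stats = MemtableStats::default();
assert_eq!(stats.bytes_allocated(), 0); // no heap usage recorded yet
assert!(stats.time_range().is_none()); // no rows, so no time range
```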
(Diffs for the remaining 4 changed files did not load and are not shown.)
