Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mito): remove #[allow(dead_code)] #2479

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/mito2/src/compaction/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// The left bound of time window.
pub time_window_bound: i64,
/// Time window size in seconds.
pub time_window_sec: i64,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}
Expand Down
16 changes: 0 additions & 16 deletions src/mito2/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,3 @@ pub trait CompactionTask: Debug + Send + Sync + 'static {
pub trait Picker: Debug + Send + 'static {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
}

pub struct PickerContext {
compaction_time_window: Option<i64>,
}

impl PickerContext {
pub fn with(compaction_time_window: Option<i64>) -> Self {
Self {
compaction_time_window,
}
}

pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
}
183 changes: 5 additions & 178 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -80,7 +80,6 @@ impl TwcsPicker {
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
active_window: Option<i64>,
window_size: i64,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
Expand All @@ -89,8 +88,6 @@ impl TwcsPicker {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
Expand All @@ -102,8 +99,6 @@ impl TwcsPicker {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
Expand Down Expand Up @@ -155,7 +150,7 @@ impl Picker for TwcsPicker {
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
// Assign files to windows
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&windows, active_window, time_window_size);
let outputs = self.build_output(&windows, active_window);

if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
Expand Down Expand Up @@ -407,54 +402,6 @@ pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>)
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}

/// Finds files that can be compacted in given level.
/// Currently they're files that is not currently under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level.files().filter(|f| !f.compacting()).cloned().collect()
}

/// Calculates timestamp span between start and end timestamp.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);

// if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);

let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}

/// Calculates buckets for files. If file does not contain a time range in metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamp can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();

for file in files {
let (start, end) = file.time_range();
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
}
buckets
}

pub(crate) struct TimeBuckets([i64; 7]);

impl TimeBuckets {
Expand Down Expand Up @@ -603,20 +550,14 @@ mod tests {
let windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output =
TwcsPicker::new(4, 1, None).build_output(&windows, active_window, self.window_size);
let output = TwcsPicker::new(4, 1, None).build_output(&windows, active_window);

let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();

Expand All @@ -629,12 +570,7 @@ mod tests {
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
assert_eq!(expected, output);
Expand All @@ -644,9 +580,6 @@ mod tests {
struct ExpectedOutput {
input_files: Vec<usize>,
output_level: Level,
time_window_sec: i64,
time_window_bound: i64,
strict_window: bool,
}

#[test]
Expand All @@ -665,9 +598,6 @@ mod tests {
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
}],
}
.check();
Expand All @@ -688,16 +618,10 @@ mod tests {
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
output_level: 1,
time_window_sec: 3,
time_window_bound: 3,
strict_window: false,
},
],
}
Expand Down Expand Up @@ -742,102 +666,5 @@ mod tests {
);
}

fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);

let expected = expected
.iter()
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_id())
.collect();
assert_eq!(
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}

#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
// simple case, files with disjoint
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);

// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);

// file with an large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0)))
.map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS.get(0),
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]),
&expected,
);
}

#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}

#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);

for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}

fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0))
.collect()
}

// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}
10 changes: 7 additions & 3 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use tokio::sync::mpsc;

use crate::access_layer::AccessLayerRef;
Expand Down Expand Up @@ -97,7 +98,7 @@ impl WriteBufferManagerImpl {
}

/// Returns memory usage of mutable memtables.
pub(crate) fn mutable_usage(&self) -> usize {
pub fn mutable_usage(&self) -> usize {
self.memory_active.load(Ordering::Relaxed)
}

Expand Down Expand Up @@ -162,6 +163,7 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}

/// Reason of a flush task.
#[derive(Debug, AsRefStr)]
pub enum FlushReason {
/// Other reasons.
Others,
Expand Down Expand Up @@ -302,8 +304,10 @@ impl RegionFlushTask {

let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
info!(
"Successfully flush memtables, region: {}, files: {:?}",
version.metadata.region_id, file_ids
"Successfully flush memtables, region: {}, reason: {}, files: {:?}",
version.metadata.region_id,
self.reason.as_ref(),
file_ids
);

Ok(file_metas)
Expand Down
9 changes: 1 addition & 8 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,21 @@
#[cfg(any(test, feature = "test"))]
pub mod test_util;

// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
mod access_layer;
#[allow(dead_code)]
mod compaction;
pub mod config;
pub mod engine;
pub mod error;
#[allow(dead_code)]
mod flush;
pub mod flush;
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
pub mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;
mod row_converter;
pub(crate) mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
mod worker;
Expand Down
8 changes: 7 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,22 @@ pub type MemtableId = u32;

#[derive(Debug, Default)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains.
time_range: Option<(Timestamp, Timestamp)>,
}

impl MemtableStats {
/// Returns the estimated bytes allocated by this memtable.
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes
}

/// Returns the time range of the memtable.
pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
self.time_range
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
Expand Down
Loading