Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(compaction): deprecate safe_epoch #18491

Merged
merged 18 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ message MetaSnapshotMetadata {
uint64 id = 1;
uint64 hummock_version_id = 2;
uint64 max_committed_epoch = 3;
uint64 safe_epoch = 4;
reserved 4;
reserved 'safe_epoch';
optional uint32 format_version = 5;
optional string remarks = 6;
optional string rw_version = 7;
Expand Down
20 changes: 10 additions & 10 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ message TableChangeLog {

message StateTableInfo {
uint64 committed_epoch = 1;
uint64 safe_epoch = 2;
reserved 2;
reserved 'safe_epoch';
uint64 compaction_group_id = 3;
}

message StateTableInfoDelta {
uint64 committed_epoch = 1;
uint64 safe_epoch = 2;
reserved 2;
reserved 'safe_epoch';
uint64 compaction_group_id = 3;
}

Expand All @@ -187,9 +189,8 @@ message HummockVersion {
// Levels of each compaction group
map<uint64, Levels> levels = 2;
uint64 max_committed_epoch = 3;
// Snapshots with epoch less than the safe epoch have been GCed.
// Reads against such an epoch will fail.
uint64 safe_epoch = 4;
reserved 4;
reserved 'safe_epoch';
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
Expand All @@ -204,9 +205,8 @@ message HummockVersionDelta {
// Levels of each compaction group
map<uint64, GroupDeltas> group_deltas = 3;
uint64 max_committed_epoch = 4;
// Snapshots with epoch less than the safe epoch have been GCed.
// Reads against such an epoch will fail.
uint64 safe_epoch = 5;
reserved 5;
reserved 'safe_epoch';
bool trivial_move = 6;
reserved 7;
reserved "gc_object_ids";
Expand Down Expand Up @@ -364,8 +364,8 @@ message CompactTask {
// In ideal case, the compaction will generate splits.len() tables which have key range
// corresponding to that in [splits], respectively
repeated KeyRange splits = 2;
// low watermark in 'ts-aware compaction'
uint64 watermark = 3;
reserved 3;
reserved 'watermark';
// compaction output, which will be added to [target_level] of LSM after compaction
repeated SstableInfo sorted_output_ssts = 4;
// task id assigned by hummock storage service
Expand Down
8 changes: 7 additions & 1 deletion src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ pub async fn print_version_delta_in_archive(
if match_delta(d, sst_id) {
if is_first {
is_first = false;
println!("delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}, safe_epoch {}", delta.id, delta.prev_id, delta.max_committed_epoch, delta.trivial_move, delta.safe_epoch);
println!(
"delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}",
delta.id,
delta.prev_id,
delta.max_committed_epoch,
delta.trivial_move
);
}
println!("compaction group id {cg_id}");
print_delta(d);
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
id: Set(vd.id.to_u64() as _),
prev_id: Set(vd.prev_id.to_u64() as _),
max_committed_epoch: Set(vd.visible_table_committed_epoch() as _),
safe_epoch: Set(vd.visible_table_safe_epoch() as _),
safe_epoch: Set(0 as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set((&vd.to_protobuf()).into()),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct RwHummockCompactTaskAssignment {
target_level: i32,
task_type: i32,
task_status: i32,
watermark: i64,
base_level: i32,
gc_delete_keys: bool,
target_file_size: i64,
Expand All @@ -55,7 +54,6 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockCompactTaskA
target_level: compact_task.target_level as _,
task_type: compact_task.task_type as _,
task_status: compact_task.task_status as _,
watermark: compact_task.watermark as _,
base_level: compact_task.base_level as _,
gc_delete_keys: compact_task.gc_delete_keys as _,
target_file_size: compact_task.target_file_size as _,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct RwHummockVersion {
#[primary_key]
version_id: i64,
max_committed_epoch: i64,
safe_epoch: i64,
compaction_group: JsonbVal,
}

Expand Down Expand Up @@ -103,7 +102,6 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.map(|cg| RwHummockVersion {
version_id: version.id.to_u64() as _,
max_committed_epoch: version.visible_table_committed_epoch() as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg.to_protobuf()).into(),
})
.collect()
Expand Down Expand Up @@ -207,7 +205,6 @@ async fn read_hummock_table_watermarks(
struct RwHummockSnapshot {
#[primary_key]
table_id: i32,
safe_epoch: i64,
committed_epoch: i64,
}

Expand All @@ -223,7 +220,6 @@ async fn read_hummock_snapshot_groups(
.map(|(table_id, info)| RwHummockSnapshot {
table_id: table_id.table_id as _,
committed_epoch: info.committed_epoch as _,
safe_epoch: info.safe_epoch as _,
})
.collect())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct RwHummockVersionDelta {
id: i64,
prev_id: i64,
max_committed_epoch: i64,
safe_epoch: i64,
trivial_move: bool,
group_deltas: JsonbVal,
}
Expand All @@ -42,7 +41,6 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
id: d.id.to_u64() as _,
prev_id: d.prev_id.to_u64() as _,
max_committed_epoch: d.visible_table_committed_epoch() as _,
safe_epoch: d.visible_table_safe_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d
.group_deltas
Expand Down
1 change: 0 additions & 1 deletion src/meta/model_v2/src/hummock_version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl From<Model> for PbHummockVersionDelta {
assert_eq!(value.id, ret.id as i64);
assert_eq!(value.prev_id, ret.prev_id as i64);
assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64);
assert_eq!(value.safe_epoch, ret.safe_epoch as i64);
assert_eq!(value.trivial_move, ret.trivial_move);
ret
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
safe_epoch_read_table_watermarks_impl, table_watermarks_by_table_ids_impl,
};
use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, TableWatermarks};
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::compact_task::TaskType;

Expand All @@ -43,15 +42,13 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {
developer_config,
table_watermarks,
member_table_ids,
state_table_info,
..
} = context;
let dynamic_level_core =
DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let mut picker = VnodeWatermarkCompactionPicker::new();
let table_watermarks =
safe_epoch_read_table_watermarks(table_watermarks, state_table_info, member_table_ids);
let table_watermarks = safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
let compaction_input = picker.pick_compaction(levels, level_handlers, &table_watermarks)?;
compaction_input.add_pending_task(task_id, level_handlers);
Some(create_compaction_task(
Expand All @@ -73,12 +70,10 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {

fn safe_epoch_read_table_watermarks(
table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
state_table_info: &HummockVersionStateTableInfo,
member_table_ids: &BTreeSet<TableId>,
) -> BTreeMap<TableId, ReadTableWatermark> {
safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl(
safe_epoch_read_table_watermarks_impl(&table_watermarks_by_table_ids_impl(
table_watermarks,
state_table_info,
&member_table_ids
.iter()
.map(TableId::table_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ impl HummockManager {
TableId::new(*table_id),
PbStateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: epoch,
compaction_group_id: *raw_group_id,
}
)
Expand Down Expand Up @@ -544,7 +543,6 @@ impl HummockManager {
table_id,
PbStateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: info.safe_epoch,
compaction_group_id: new_compaction_group_id,
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ impl HummockManager {
table_id,
PbStateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: info.safe_epoch,
compaction_group_id: target_compaction_group_id,
}
)
Expand Down
49 changes: 2 additions & 47 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::{Instant, SystemTime};
Expand Down Expand Up @@ -63,8 +62,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{
};
use risingwave_pb::hummock::{
compact_task, CompactTaskAssignment, CompactionConfig, PbCompactStatus,
PbCompactTaskAssignment, StateTableInfoDelta, SubscribeCompactionEventRequest, TableOption,
TableSchema,
PbCompactTaskAssignment, SubscribeCompactionEventRequest, TableOption, TableSchema,
};
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -189,38 +187,6 @@ impl<'a> HummockVersionTransaction<'a> {
));

group_deltas.push(group_delta);
let new_visible_table_safe_epoch = std::cmp::max(
version_delta.latest_version().visible_table_safe_epoch(),
compact_task.watermark,
);
version_delta.set_safe_epoch(new_visible_table_safe_epoch);
if version_delta.latest_version().visible_table_safe_epoch() < new_visible_table_safe_epoch
{
version_delta.with_latest_version(|version, version_delta| {
for (table_id, info) in version.state_table_info.info() {
let new_safe_epoch = min(new_visible_table_safe_epoch, info.committed_epoch);
if new_safe_epoch > info.safe_epoch {
if new_safe_epoch != version_delta.visible_table_safe_epoch() {
warn!(
new_safe_epoch,
committed_epoch = info.committed_epoch,
global_safe_epoch = new_visible_table_safe_epoch,
table_id = table_id.table_id,
"table has different safe epoch to global"
);
}
version_delta.state_table_info_delta.insert(
*table_id,
StateTableInfoDelta {
committed_epoch: info.committed_epoch,
safe_epoch: new_safe_epoch,
compaction_group_id: info.compaction_group_id,
},
);
}
}
});
}
version_delta.pre_apply();
}
}
Expand Down Expand Up @@ -667,16 +633,6 @@ impl HummockManager {
let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

let start_time = Instant::now();
let max_committed_epoch = versioning.current_version.visible_table_committed_epoch();
let watermark = self
.context_info
.read()
.await
.pinned_snapshots
.values()
.map(|v| v.minimal_pinned_snapshot)
.fold(max_committed_epoch, std::cmp::min);

let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

let mut compact_task_assignment =
Expand Down Expand Up @@ -787,7 +743,6 @@ impl HummockManager {
let mut compact_task = CompactTask {
input_ssts: compact_task.input.input_levels,
splits: vec![KeyRange::inf()],
watermark,
sorted_output_ssts: vec![],
task_id,
target_level: target_level_id,
Expand Down Expand Up @@ -869,7 +824,7 @@ impl HummockManager {
.await;
compact_task.table_watermarks = version
.latest_version()
.safe_epoch_table_watermarks(&compact_task.existing_table_ids);
.table_watermarks_by_table_ids(&compact_task.existing_table_ids);

if self.env.opts.enable_dropped_column_reclaim {
// TODO: get all table schemas for all tables in once call to avoid acquiring lock and await.
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion)
metrics
.version_size
.set(current_version.estimated_encode_len() as i64);
metrics
.safe_epoch
.set(current_version.visible_table_safe_epoch() as i64);
metrics
.current_version_id
.set(current_version.id.to_u64() as i64);
Expand Down Expand Up @@ -185,7 +182,6 @@ impl<'a> HummockVersionTransaction<'a> {
*table_id,
StateTableInfoDelta {
committed_epoch,
safe_epoch: committed_epoch,
compaction_group_id: *cg_id,
},
);
Expand All @@ -204,7 +200,6 @@ impl<'a> HummockVersionTransaction<'a> {
*table_id,
StateTableInfoDelta {
committed_epoch,
safe_epoch: info.safe_epoch,
compaction_group_id: info.compaction_group_id,
}
)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/model/ext/hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl Transactional<Transaction> for HummockVersionDelta {
id: Set(self.id.to_u64().try_into().unwrap()),
prev_id: Set(self.prev_id.to_u64().try_into().unwrap()),
max_committed_epoch: Set(self.visible_table_committed_epoch().try_into().unwrap()),
safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()),
safe_epoch: Set(0.into()),
trivial_move: Set(self.trivial_move),
full_version_delta: Set(FullVersionDelta::from(&self.into())),
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/integration_tests/run_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
tests=( \
"test_basic.sh" \
"test_pin_sst.sh" \
"test_query_backup.sh" \
#"test_query_backup.sh" \
"test_set_config.sh" \
zwang28 marked this conversation as resolved.
Show resolved Hide resolved
)
for t in "${tests[@]}"
Expand Down
Loading
Loading