Revert "feat(storage): seperate timeout between process and heartbeat (
Browse files Browse the repository at this point in the history
…#14366)"

This reverts commit c9e3e92.
fuyufjh committed Jan 29, 2024
1 parent 46ab019 commit c1f2cc1
Showing 13 changed files with 128 additions and 193 deletions.
21 changes: 6 additions & 15 deletions src/common/src/config.rs
@@ -294,15 +294,10 @@ pub struct MetaConfig {
/// split it to an single group.
pub min_table_split_write_throughput: u64,

// If the compaction task does not report heartbeat beyond the
// `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task
#[serde(default = "default::meta::compaction_task_max_heartbeat_interval_secs")]
pub compaction_task_max_heartbeat_interval_secs: u64,

// If the compaction task does not change in progress beyond the
// `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task
#[serde(default = "default::meta::compaction_task_max_progress_interval_secs")]
pub compaction_task_max_progress_interval_secs: u64,
pub compaction_task_max_heartbeat_interval_secs: u64,

#[serde(default)]
pub compaction_config: CompactionConfig,
@@ -1064,11 +1059,7 @@ pub mod default {
}

pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
30 // 30s
}

pub fn compaction_task_max_progress_interval_secs() -> u64 {
60 * 10 // 10min
60 // 1min
}

pub fn cut_table_size_limit() -> u64 {
@@ -1498,19 +1489,19 @@ pub mod default {

pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
8 * 60 * 1000
10 * 60 * 1000
}

pub fn object_store_streaming_upload_timeout_ms() -> u64 {
8 * 60 * 1000
10 * 60 * 1000
}

pub fn object_store_upload_timeout_ms() -> u64 {
8 * 60 * 1000
60 * 60 * 1000
}

pub fn object_store_read_timeout_ms() -> u64 {
8 * 60 * 1000
60 * 60 * 1000
}

pub mod s3 {
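For readers unfamiliar with the `#[serde(default = "...")]` pattern used in `config.rs` above, here is a minimal, hypothetical sketch (not the actual `MetaConfig` definition) of how the named default function fills in a field that is missing from the parsed TOML. It assumes the `serde` (with the `derive` feature) and `toml` crates:

```rust
use serde::Deserialize;

// Hypothetical, trimmed-down illustration of the pattern used in `config.rs`.
#[derive(Debug, Deserialize)]
struct DemoMetaConfig {
    // When the key is absent from the parsed TOML, serde calls this function
    // to supply the default value instead of failing deserialization.
    #[serde(default = "default_heartbeat_interval_secs")]
    compaction_task_max_heartbeat_interval_secs: u64,
}

fn default_heartbeat_interval_secs() -> u64 {
    60 // 1 min, matching the default restored by this revert
}

fn main() {
    // Field omitted on purpose: the default function fills it in.
    let cfg: DemoMetaConfig = toml::from_str("").unwrap();
    assert_eq!(cfg.compaction_task_max_heartbeat_interval_secs, 60);
    println!("{cfg:?}");
}
```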
11 changes: 5 additions & 6 deletions src/config/example.toml
@@ -43,8 +43,7 @@ do_not_config_object_storage_lifecycle = false
partition_vnode_count = 16
table_write_throughput_threshold = 16777216
min_table_split_write_throughput = 4194304
compaction_task_max_heartbeat_interval_secs = 30
compaction_task_max_progress_interval_secs = 600
compaction_task_max_heartbeat_interval_secs = 60
hybird_partition_vnode_count = 4
event_log_enabled = true
event_log_channel_max_size = 10
@@ -165,10 +164,10 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
object_store_read_timeout_ms = 3600000

[storage.object_store.s3]
object_store_keepalive_ms = 600000
23 changes: 0 additions & 23 deletions src/meta/node/src/lib.rs
@@ -248,28 +248,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
ui_path: opts.dashboard_ui_path,
};

const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
let compaction_task_max_progress_interval_secs = {
config
.storage
.object_store
.object_store_read_timeout_ms
.max(config.storage.object_store.object_store_upload_timeout_ms)
.max(
config
.storage
.object_store
.object_store_streaming_read_timeout_ms,
)
.max(
config
.storage
.object_store
.object_store_streaming_upload_timeout_ms,
)
.max(config.meta.compaction_task_max_progress_interval_secs)
} + MIN_TIMEOUT_INTERVAL_SEC;

let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
add_info,
backend,
@@ -332,7 +310,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
compaction_task_max_heartbeat_interval_secs: config
.meta
.compaction_task_max_heartbeat_interval_secs,
compaction_task_max_progress_interval_secs,
compaction_config: Some(config.meta.compaction_config),
cut_table_size_limit: config.meta.cut_table_size_limit,
hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count,
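The block removed from `src/meta/node/src/lib.rs` above derived the progress interval from the largest configured object-store timeout plus a safety margin. A rough, standalone sketch of that idea (simplified to plain function arguments rather than the real config struct, and normalized to seconds) might look like:

```rust
/// Sketch of the derivation removed by this revert: take the largest
/// object-store timeout (milliseconds in the real config, simplified to
/// seconds here) and pad it, so the meta side never cancels a task that is
/// merely waiting on a slow object-store call.
fn derive_progress_interval_secs(
    read_timeout_secs: u64,
    upload_timeout_secs: u64,
    streaming_read_timeout_secs: u64,
    streaming_upload_timeout_secs: u64,
    configured_progress_interval_secs: u64,
) -> u64 {
    const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
    read_timeout_secs
        .max(upload_timeout_secs)
        .max(streaming_read_timeout_secs)
        .max(streaming_upload_timeout_secs)
        .max(configured_progress_interval_secs)
        + MIN_TIMEOUT_INTERVAL_SEC
}

fn main() {
    // With the pre-revert defaults (8 min object-store timeouts, 10 min progress interval):
    let secs = derive_progress_interval_secs(480, 480, 480, 480, 600);
    assert_eq!(secs, 620);
}
```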
59 changes: 27 additions & 32 deletions src/meta/src/hummock/compactor_manager.rs
@@ -54,8 +54,6 @@ struct TaskHeartbeat {
num_pending_write_io: u64,
create_time: Instant,
expire_at: u64,

update_at: u64,
}

impl Compactor {
@@ -118,8 +116,7 @@ impl Compactor {
/// - 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
/// the final state.
pub struct CompactorManagerInner {
pub task_expired_seconds: u64,
pub heartbeat_expired_seconds: u64,
pub task_expiry_seconds: u64,
task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

/// The outer lock is a RwLock, so we should still be able to modify each compactor
@@ -142,8 +139,7 @@ impl CompactorManagerInner {
.collect(),
};
let mut manager = Self {
task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
task_expiry_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
};
@@ -157,8 +153,7 @@ impl CompactorManagerInner {
/// Only used for unit test.
pub fn for_test() -> Self {
Self {
task_expired_seconds: 1,
heartbeat_expired_seconds: 1,
task_expiry_seconds: 1,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
}
@@ -244,18 +239,19 @@ impl CompactorManagerInner {
ret
}

pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
let heartbeat_expired_ts: u64 = SystemTime::now()
pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
let interval = interval_sec.unwrap_or(0);
let now: u64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs()
- self.heartbeat_expired_seconds;
Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
- interval;
Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now)
}

fn get_heartbeat_expired_tasks_impl(
fn get_heartbeat_expired_tasks(
task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
heartbeat_expired_ts: u64,
now: u64,
) -> Vec<CompactTask> {
let mut cancellable_tasks = vec![];
const MAX_TASK_DURATION_SEC: u64 = 2700;
@@ -269,24 +265,22 @@ impl CompactorManagerInner {
num_progress_key,
num_pending_read_io,
num_pending_write_io,
update_at,
} in task_heartbeats.values()
{
if *update_at < heartbeat_expired_ts {
if *expire_at < now {
// task heartbeat expire
cancellable_tasks.push(task.clone());
}

let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
if task_duration_too_long {
let compact_task_statistics = statistics_compact_task(task);
tracing::info!(
"CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
"CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
pending_read_io_count {} pending_write_io_count {} target_level {} \
base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
task.compaction_group_id,
task.task_id,
create_time,
expire_at,
num_ssts_sealed,
num_ssts_uploaded,
num_progress_key,
@@ -318,8 +312,7 @@ impl CompactorManagerInner {
num_pending_read_io: 0,
num_pending_write_io: 0,
create_time: Instant::now(),
expire_at: now + self.task_expired_seconds,
update_at: now,
expire_at: now + self.task_expiry_seconds,
},
);
}
@@ -339,14 +332,12 @@ impl CompactorManagerInner {
let mut cancel_tasks = vec![];
for progress in progress_list {
if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
task_ref.update_at = now;

if task_ref.num_ssts_sealed < progress.num_ssts_sealed
|| task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
|| task_ref.num_progress_key < progress.num_progress_key
{
// Refresh the expired of the task as it is showing progress.
task_ref.expire_at = now + self.task_expired_seconds;
// Refresh the expiry of the task as it is showing progress.
task_ref.expire_at = now + self.task_expiry_seconds;
task_ref.num_ssts_sealed = progress.num_ssts_sealed;
task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
task_ref.num_progress_key = progress.num_progress_key;
@@ -439,8 +430,8 @@ impl CompactorManager {
.check_tasks_status(tasks, slow_task_duration)
}

pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
self.inner.read().get_heartbeat_expired_tasks()
pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
self.inner.read().get_expired_tasks(interval_sec)
}

pub fn initiate_task_heartbeat(&self, task: CompactTask) {
@@ -506,7 +497,7 @@ mod tests {
(env, context_id)
};

// Restart. Set task_expired_seconds to 0 only to speed up test.
// Restart. Set task_expiry_seconds to 0 only to speed up test.
let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
// Because task assignment exists.
// Because compactor gRPC is not established yet.
@@ -515,11 +506,15 @@

// Ensure task is expired.
tokio::time::sleep(Duration::from_secs(2)).await;
let expired = compactor_manager.get_heartbeat_expired_tasks();
let expired = compactor_manager.get_expired_tasks(None);
assert_eq!(expired.len(), 1);

// Mimic no-op compaction heartbeat
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
task_id: expired[0].task_id,
..Default::default()
}]);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);

// Mimic compaction heartbeat with invalid task id
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
Expand All @@ -529,7 +524,7 @@ mod tests {
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);

// Mimic effective compaction heartbeat
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
Expand All @@ -539,7 +534,7 @@ mod tests {
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 0);

// Test add
assert_eq!(compactor_manager.compactor_num(), 0);
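To make the restored expiry semantics in `compactor_manager.rs` concrete, here is a small, self-contained sketch (with hypothetical stand-in types, not the real `TaskHeartbeat`/`CompactTask`) of the pattern: the deadline is refreshed only when reported progress advances, and a periodic scan collects tasks whose deadline has passed:

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical stand-in for the real TaskHeartbeat entry.
struct Heartbeat {
    task_id: u64,
    num_progress_key: u64,
    expire_at: u64, // unix seconds
}

fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

/// Refresh the deadline only when the reported progress actually advanced.
fn update_heartbeat(hb: &mut Heartbeat, reported_progress_key: u64, task_expiry_secs: u64) {
    if hb.num_progress_key < reported_progress_key {
        hb.num_progress_key = reported_progress_key;
        hb.expire_at = now_secs() + task_expiry_secs;
    }
}

/// Collect task ids whose deadline has passed; the caller cancels them.
fn expired_task_ids(heartbeats: &HashMap<u64, Heartbeat>) -> Vec<u64> {
    let now = now_secs();
    heartbeats
        .values()
        .filter(|hb| hb.expire_at < now)
        .map(|hb| hb.task_id)
        .collect()
}

fn main() {
    let mut heartbeats = HashMap::new();
    heartbeats.insert(1u64, Heartbeat { task_id: 1, num_progress_key: 0, expire_at: now_secs() + 60 });
    // A no-op heartbeat (no new progress) leaves the deadline untouched.
    update_heartbeat(heartbeats.get_mut(&1).unwrap(), 0, 60);
    // A heartbeat that advances progress pushes the deadline forward.
    update_heartbeat(heartbeats.get_mut(&1).unwrap(), 100, 60);
    println!("expired now: {:?}", expired_task_ids(&heartbeats));
}
```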
18 changes: 14 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
@@ -2186,7 +2186,7 @@ impl HummockManager {
GroupSplit,
CheckDeadTask,
Report,
CompactionHeartBeatExpiredCheck,
CompactionHeartBeat,

DynamicCompactionTrigger,
SpaceReclaimCompactionTrigger,
@@ -2218,7 +2218,7 @@ impl HummockManager {
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
compaction_heartbeat_interval.reset();
let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
.map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
.map(|_| HummockTimerEvent::CompactionHeartBeat);

let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
hummock_manager.env.opts.periodic_compaction_interval_sec,
@@ -2397,17 +2397,27 @@ impl HummockManager {
}
}

HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
HummockTimerEvent::CompactionHeartBeat => {
let compactor_manager =
hummock_manager.compactor_manager.clone();

// TODO: add metrics to track expired tasks
const INTERVAL_SEC: u64 = 30;
// The cancel task has two paths
// 1. compactor heartbeat cancels the expired task based on task
// progress (meta + compactor)
// 2. meta periodically scans the task and performs a cancel on
// the meta side for tasks that are not updated by heartbeat
for task in compactor_manager.get_heartbeat_expired_tasks() {

// So the reason for setting Interval is to let compactor be
// responsible for canceling the corresponding task as much as
// possible by relaxing the conditions for detection on the meta
// side, and meta is just used as a last resort to clean up the
// tasks that compactor has expired.

for task in
compactor_manager.get_expired_tasks(Some(INTERVAL_SEC))
{
if let Err(e) = hummock_manager
.cancel_compact_task(
task.task_id,
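The timer wiring in `manager/mod.rs` above follows a common tokio pattern: an interval is wrapped in an `IntervalStream` and mapped to an enum event that the event loop matches on. A minimal sketch, assuming the `tokio` and `tokio-stream` crates (the enum name and the 30-second interval here are illustrative, not the full event set):

```rust
use std::time::Duration;
use tokio_stream::{wrappers::IntervalStream, StreamExt};

// Hypothetical, trimmed-down version of the timer-event wiring shown above.
#[derive(Debug)]
enum TimerEvent {
    CompactionHeartBeat,
}

#[tokio::main]
async fn main() {
    // Fire every 30 seconds (the INTERVAL_SEC backstop mentioned in the comment);
    // delayed ticks are spread out rather than bunched together.
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let mut events = IntervalStream::new(interval).map(|_| TimerEvent::CompactionHeartBeat);

    while let Some(event) = events.next().await {
        match event {
            TimerEvent::CompactionHeartBeat => {
                // Path 2 from the comment above: the meta side periodically scans
                // for tasks whose deadline has passed and cancels them, acting as
                // a backstop to compactor-side cancellation (path 1). In the real
                // code this is roughly `compactor_manager.get_expired_tasks(Some(INTERVAL_SEC))`.
                break; // stop after one tick so the sketch terminates
            }
        }
    }
}
```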
2 changes: 0 additions & 2 deletions src/meta/src/manager/env.rs
@@ -187,7 +187,6 @@ pub struct MetaOpts {
pub min_table_split_write_throughput: u64,

pub compaction_task_max_heartbeat_interval_secs: u64,
pub compaction_task_max_progress_interval_secs: u64,
pub compaction_config: Option<CompactionConfig>,

/// The size limit to split a state-table to independent sstable.
@@ -253,7 +252,6 @@ impl MetaOpts {
do_not_config_object_storage_lifecycle: true,
partition_vnode_count: 32,
compaction_task_max_heartbeat_interval_secs: 0,
compaction_task_max_progress_interval_secs: 1,
compaction_config: None,
cut_table_size_limit: 1024 * 1024 * 1024,
hybird_partition_vnode_count: 4,
1 change: 1 addition & 0 deletions src/storage/benches/bench_compactor.rs
@@ -204,6 +204,7 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store
Arc::new(CompactorMetrics::unused()),
iter,
DummyCompactionFilter,
None,
)
.await
.unwrap();
(Diffs for the remaining 6 changed files are not shown.)
