Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): separate timeout between process and heartbeat #14366

Merged
merged 36 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
635f777
feat(storage): separate timeout between process and heartbeat
Li0k Jan 4, 2024
974fa53
fix(dashboard): minor style tweaks (#14356)
BugenZhao Jan 4, 2024
191ba77
fix: ignore dependent table_fragment not exist (#14308)
fuyufjh Jan 4, 2024
1aff233
fix(stream/materialize): incorrect implementation in ignore-conflict …
TennyZhuang Jan 4, 2024
0217506
refactor(frontend): simplify bind_columns_from_source (#14335)
xxchan Jan 4, 2024
f3eafad
fix: pubsub auth (#14360)
tabVersion Jan 4, 2024
5bf6c46
feat(sink): support es sink struct and refactor es sink (#14231)
xxhZs Jan 4, 2024
63103d4
feat(catalog): add pg_get_viewdef (#14336)
yuhao-su Jan 4, 2024
e038eb1
chore(storage): update config
Li0k Jan 4, 2024
fcc827b
fix(storage):fix ut
Li0k Jan 4, 2024
8bc1996
chore(storage): rename
Li0k Jan 5, 2024
c0409ca
fix(stream/materialize): enforce update_cache when modify fixed_chang…
TennyZhuang Jan 4, 2024
be1c706
test: reorganize the sink to table e2e test (#14158)
st1page Jan 4, 2024
d44270e
test: fix flaky sink_into_table test (#14372)
xxchan Jan 5, 2024
abee759
test: bump sqllogictest & fix backwards-compat-test/e2e-tests (#14354)
xxchan Jan 5, 2024
7a784cc
fix: Do not recover background streaming jobs whose table fragments h…
yezizp2012 Jan 5, 2024
ca74b3c
chore(storage): ctl support cancel specific task (#14325)
Li0k Jan 5, 2024
1795e6c
feat(connector): support local fs source (#14312)
KeXiangWang Jan 5, 2024
9671c76
chore(storage): typo
Li0k Jan 10, 2024
58b3bb3
chore(storage): revert num io
Li0k Jan 10, 2024
1c4bd73
feat(storage): skip_watermark support task progress
Li0k Jan 10, 2024
eb7824f
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 10, 2024
51f8e6a
chore(storage): typo
Li0k Jan 10, 2024
c43cdd1
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 10, 2024
f8fde47
fix(storage): fix compile
Li0k Jan 10, 2024
c7ae224
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 10, 2024
99e6e7c
fix(storage): fix compile
Li0k Jan 10, 2024
a8277a5
fix(storage): fix compile
Li0k Jan 11, 2024
b274193
fix(storage): move task_progress from skip_iter to merge_iter
Li0k Jan 11, 2024
35d1bad
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 11, 2024
71c12c7
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 15, 2024
ad6f99a
refactor(storage): introduced MonitoredCompactorIterator
Li0k Jan 15, 2024
074d9b1
refactor(storage): introduced MonitoredCompactorIterator
Li0k Jan 15, 2024
4819ddd
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 15, 2024
7ac4049
chore(storage): typo
Li0k Jan 15, 2024
4eb64eb
fix(storage): address comments
Li0k Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,15 @@ pub struct MetaConfig {
/// split it into a single group.
pub min_table_split_write_throughput: u64,

// If the compaction task does not report heartbeat beyond the
// `compaction_task_max_heartbeat_interval_secs` interval, we will cancel the task
#[serde(default = "default::meta::compaction_task_max_heartbeat_interval_secs")]
pub compaction_task_max_heartbeat_interval_secs: u64,

// If the compaction task does not report progress beyond the
// `compaction_task_max_progress_interval_secs` interval, we will cancel the task
pub compaction_task_max_heartbeat_interval_secs: u64,
#[serde(default = "default::meta::compaction_task_max_progress_interval_secs")]
pub compaction_task_max_progress_interval_secs: u64,

#[serde(default)]
pub compaction_config: CompactionConfig,
Expand Down Expand Up @@ -1059,7 +1064,11 @@ pub mod default {
}

pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
60 // 1min
30 // 30s
}

pub fn compaction_task_max_progress_interval_secs() -> u64 {
60 * 10 // 10min
}

pub fn cut_table_size_limit() -> u64 {
Expand Down Expand Up @@ -1489,19 +1498,19 @@ pub mod default {

pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
10 * 60 * 1000
8 * 60 * 1000
}

pub fn object_store_streaming_upload_timeout_ms() -> u64 {
10 * 60 * 1000
8 * 60 * 1000
}

pub fn object_store_upload_timeout_ms() -> u64 {
60 * 60 * 1000
8 * 60 * 1000
}

pub fn object_store_read_timeout_ms() -> u64 {
60 * 60 * 1000
8 * 60 * 1000
}

pub mod s3 {
Expand Down
11 changes: 6 additions & 5 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ do_not_config_object_storage_lifecycle = false
partition_vnode_count = 16
table_write_throughput_threshold = 16777216
min_table_split_write_throughput = 4194304
compaction_task_max_heartbeat_interval_secs = 60
compaction_task_max_heartbeat_interval_secs = 30
compaction_task_max_progress_interval_secs = 600
hybird_partition_vnode_count = 4
event_log_enabled = true
event_log_channel_max_size = 10
Expand Down Expand Up @@ -164,10 +165,10 @@ recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
object_store_read_timeout_ms = 3600000
object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000

[storage.object_store.s3]
object_store_keepalive_ms = 600000
Expand Down
23 changes: 23 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,28 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
ui_path: opts.dashboard_ui_path,
};

const MIN_TIMEOUT_INTERVAL_SEC: u64 = 20;
let compaction_task_max_progress_interval_secs = {
config
.storage
.object_store
.object_store_read_timeout_ms
.max(config.storage.object_store.object_store_upload_timeout_ms)
.max(
config
.storage
.object_store
.object_store_streaming_read_timeout_ms,
)
.max(
config
.storage
.object_store
.object_store_streaming_upload_timeout_ms,
)
.max(config.meta.compaction_task_max_progress_interval_secs)
} + MIN_TIMEOUT_INTERVAL_SEC;

let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
add_info,
backend,
Expand Down Expand Up @@ -310,6 +332,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
compaction_task_max_heartbeat_interval_secs: config
.meta
.compaction_task_max_heartbeat_interval_secs,
compaction_task_max_progress_interval_secs,
compaction_config: Some(config.meta.compaction_config),
cut_table_size_limit: config.meta.cut_table_size_limit,
hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count,
Expand Down
59 changes: 32 additions & 27 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct TaskHeartbeat {
num_pending_write_io: u64,
create_time: Instant,
expire_at: u64,

update_at: u64,
}

impl Compactor {
Expand Down Expand Up @@ -116,7 +118,8 @@ impl Compactor {
/// - 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
/// the final state.
pub struct CompactorManagerInner {
pub task_expiry_seconds: u64,
pub task_expired_seconds: u64,
pub heartbeat_expired_seconds: u64,
task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

/// The outer lock is a RwLock, so we should still be able to modify each compactor
Expand All @@ -139,7 +142,8 @@ impl CompactorManagerInner {
.collect(),
};
let mut manager = Self {
task_expiry_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
};
Expand All @@ -153,7 +157,8 @@ impl CompactorManagerInner {
/// Only used for unit test.
pub fn for_test() -> Self {
Self {
task_expiry_seconds: 1,
task_expired_seconds: 1,
heartbeat_expired_seconds: 1,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
}
Expand Down Expand Up @@ -239,19 +244,18 @@ impl CompactorManagerInner {
ret
}

pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
let interval = interval_sec.unwrap_or(0);
let now: u64 = SystemTime::now()
pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
let heartbeat_expired_ts: u64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs()
- interval;
Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now)
- self.heartbeat_expired_seconds;
Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
}

fn get_heartbeat_expired_tasks(
fn get_heartbeat_expired_tasks_impl(
task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
now: u64,
heartbeat_expired_ts: u64,
) -> Vec<CompactTask> {
let mut cancellable_tasks = vec![];
const MAX_TASK_DURATION_SEC: u64 = 2700;
Expand All @@ -265,22 +269,24 @@ impl CompactorManagerInner {
num_progress_key,
num_pending_read_io,
num_pending_write_io,
update_at,
} in task_heartbeats.values()
{
if *expire_at < now {
// task heartbeat expire
if *update_at < heartbeat_expired_ts {
cancellable_tasks.push(task.clone());
}

let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
if task_duration_too_long {
let compact_task_statistics = statistics_compact_task(task);
tracing::info!(
"CompactionGroupId {} Task {} duration too long create_time {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
"CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
pending_read_io_count {} pending_write_io_count {} target_level {} \
base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
task.compaction_group_id,
task.task_id,
create_time,
expire_at,
num_ssts_sealed,
num_ssts_uploaded,
num_progress_key,
Expand Down Expand Up @@ -312,7 +318,8 @@ impl CompactorManagerInner {
num_pending_read_io: 0,
num_pending_write_io: 0,
create_time: Instant::now(),
expire_at: now + self.task_expiry_seconds,
expire_at: now + self.task_expired_seconds,
update_at: now,
},
);
}
Expand All @@ -332,12 +339,14 @@ impl CompactorManagerInner {
let mut cancel_tasks = vec![];
for progress in progress_list {
if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
task_ref.update_at = now;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original implementation, we updated "expire_at" to "now + 60 secs" and checked it against "now - 30 secs", meaning a task would only be canceled after roughly 90 seconds without progress.
But now, we will cancel a task if it has not received a heartbeat within "30s". Is that too short?


if task_ref.num_ssts_sealed < progress.num_ssts_sealed
|| task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
|| task_ref.num_progress_key < progress.num_progress_key
{
// Refresh the expiry of the task as it is showing progress.
task_ref.expire_at = now + self.task_expiry_seconds;
// Refresh the expiration of the task as it is showing progress.
task_ref.expire_at = now + self.task_expired_seconds;
task_ref.num_ssts_sealed = progress.num_ssts_sealed;
task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
task_ref.num_progress_key = progress.num_progress_key;
Expand Down Expand Up @@ -430,8 +439,8 @@ impl CompactorManager {
.check_tasks_status(tasks, slow_task_duration)
}

pub fn get_expired_tasks(&self, interval_sec: Option<u64>) -> Vec<CompactTask> {
self.inner.read().get_expired_tasks(interval_sec)
pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
self.inner.read().get_heartbeat_expired_tasks()
}

pub fn initiate_task_heartbeat(&self, task: CompactTask) {
Expand Down Expand Up @@ -497,7 +506,7 @@ mod tests {
(env, context_id)
};

// Restart. Set task_expiry_seconds to 0 only to speed up test.
// Restart. Set task_expired_seconds to 0 only to speed up test.
let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
// Because task assignment exists.
// Because compactor gRPC is not established yet.
Expand All @@ -506,15 +515,11 @@ mod tests {

// Ensure task is expired.
tokio::time::sleep(Duration::from_secs(2)).await;
let expired = compactor_manager.get_expired_tasks(None);
let expired = compactor_manager.get_heartbeat_expired_tasks();
assert_eq!(expired.len(), 1);

// Mimic no-op compaction heartbeat
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
task_id: expired[0].task_id,
..Default::default()
}]);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

// Mimic compaction heartbeat with invalid task id
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
Expand All @@ -524,7 +529,7 @@ mod tests {
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 1);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

// Mimic effective compaction heartbeat
compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
Expand All @@ -534,7 +539,7 @@ mod tests {
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_expired_tasks(None).len(), 0);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

// Test add
assert_eq!(compactor_manager.compactor_num(), 0);
Expand Down
18 changes: 4 additions & 14 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2189,7 +2189,7 @@ impl HummockManager {
GroupSplit,
CheckDeadTask,
Report,
CompactionHeartBeat,
CompactionHeartBeatExpiredCheck,

DynamicCompactionTrigger,
SpaceReclaimCompactionTrigger,
Expand Down Expand Up @@ -2221,7 +2221,7 @@ impl HummockManager {
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
compaction_heartbeat_interval.reset();
let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
.map(|_| HummockTimerEvent::CompactionHeartBeat);
.map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);

let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
hummock_manager.env.opts.periodic_compaction_interval_sec,
Expand Down Expand Up @@ -2400,27 +2400,17 @@ impl HummockManager {
}
}

HummockTimerEvent::CompactionHeartBeat => {
HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
let compactor_manager =
hummock_manager.compactor_manager.clone();

// TODO: add metrics to track expired tasks
const INTERVAL_SEC: u64 = 30;
// The cancel task has two paths
// 1. compactor heartbeat cancels the expired task based on task
// progress (meta + compactor)
// 2. meta periodically scans the task and performs a cancel on
// the meta side for tasks that are not updated by heartbeat

// So the reason for setting Interval is to let compactor be
// responsible for canceling the corresponding task as much as
// possible by relaxing the conditions for detection on the meta
// side, and meta is just used as a last resort to clean up the
// tasks that compactor has expired.

for task in
compactor_manager.get_expired_tasks(Some(INTERVAL_SEC))
{
for task in compactor_manager.get_heartbeat_expired_tasks() {
if let Err(e) = hummock_manager
.cancel_compact_task(
task.task_id,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ pub struct MetaOpts {
pub min_table_split_write_throughput: u64,

pub compaction_task_max_heartbeat_interval_secs: u64,
pub compaction_task_max_progress_interval_secs: u64,
pub compaction_config: Option<CompactionConfig>,

/// The size limit to split a state-table to independent sstable.
Expand Down Expand Up @@ -252,6 +253,7 @@ impl MetaOpts {
do_not_config_object_storage_lifecycle: true,
partition_vnode_count: 32,
compaction_task_max_heartbeat_interval_secs: 0,
compaction_task_max_progress_interval_secs: 1,
compaction_config: None,
cut_table_size_limit: 1024 * 1024 * 1024,
hybird_partition_vnode_count: 4,
Expand Down
1 change: 0 additions & 1 deletion src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store
Arc::new(CompactorMetrics::unused()),
iter,
DummyCompactionFilter,
None,
)
.await
.unwrap();
Expand Down
Loading
Loading