
fix(compactor): Compactor potential oom risk of builder (#16802)
Li0k authored Jun 13, 2024
1 parent 13cdd95 commit 32a1129
Showing 13 changed files with 202 additions and 123 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
@@ -340,6 +340,7 @@ message CompactTask {
JOIN_HANDLE_FAILED = 11;
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
HEARTBEAT_PROGRESS_CANCELED = 14;
}
// SSTs to be compacted, which will be removed from LSM after compaction
repeated InputLevel input_ssts = 1;
55 changes: 47 additions & 8 deletions src/common/src/config.rs
@@ -763,6 +763,14 @@ pub struct StorageConfig {
#[serde(default = "default::storage::mem_table_spill_threshold")]
pub mem_table_spill_threshold: usize,

/// The maximum number of `SSTables` that the builder uploads concurrently.
#[serde(default = "default::storage::compactor_concurrent_uploading_sst_count")]
pub compactor_concurrent_uploading_sst_count: Option<usize>,

/// Object storage configuration
/// 1. General configuration
/// 2. Backend-specific configuration
/// 3. Retry and timeout configuration
#[serde(default)]
pub object_store: ObjectStoreConfig,
}
@@ -1024,9 +1032,13 @@ pub struct ObjectStoreConfig {
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

/// Retry and timeout configuration
/// Describes a retry strategy driven by exponential back-off.
/// Exposes the timeout and retry count of each object store interface. The total timeout of each interface is therefore determined by its timeout/retry configuration together with the exponential back-off policy.
#[serde(default)]
pub retry: ObjectStoreRetryConfig,

/// Configuration specific to the S3 backend
#[serde(default)]
pub s3: S3ObjectStoreConfig,
}
@@ -1080,66 +1092,89 @@ pub struct S3ObjectStoreDeveloperConfig {

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreRetryConfig {
// A retry strategy driven by exponential back-off.
// The retry strategy is used for all object store operations.
/// The base back-off duration of the retry strategy, in milliseconds.
#[serde(default = "default::object_store_config::object_store_req_backoff_interval_ms")]
pub req_backoff_interval_ms: u64,

/// The max delay interval for the retry strategy. No retry delay will be longer than this `Duration`.
#[serde(default = "default::object_store_config::object_store_req_backoff_max_delay_ms")]
pub req_backoff_max_delay_ms: u64,

/// A multiplicative factor that will be applied to the exponential back-off retry delay.
#[serde(default = "default::object_store_config::object_store_req_backoff_factor")]
pub req_backoff_factor: u64,

// upload
/// Maximum timeout for `upload` operation
#[serde(default = "default::object_store_config::object_store_upload_attempt_timeout_ms")]
pub upload_attempt_timeout_ms: u64,

/// Total counts of `upload` operation retries
#[serde(default = "default::object_store_config::object_store_upload_retry_attempts")]
pub upload_retry_attempts: usize,

// streaming_upload_init + streaming_upload
/// Maximum timeout for `streaming_upload_init` and `streaming_upload`
#[serde(
default = "default::object_store_config::object_store_streaming_upload_attempt_timeout_ms"
)]
pub streaming_upload_attempt_timeout_ms: u64,

/// Total counts of `streaming_upload` operation retries
#[serde(
default = "default::object_store_config::object_store_streaming_upload_retry_attempts"
)]
pub streaming_upload_retry_attempts: usize,

// read
/// Maximum timeout for `read` operation
#[serde(default = "default::object_store_config::object_store_read_attempt_timeout_ms")]
pub read_attempt_timeout_ms: u64,

/// Total counts of `read` operation retries
#[serde(default = "default::object_store_config::object_store_read_retry_attempts")]
pub read_retry_attempts: usize,

// streaming_read_init + streaming_read
/// Maximum timeout for `streaming_read_init` and `streaming_read` operation
#[serde(
default = "default::object_store_config::object_store_streaming_read_attempt_timeout_ms"
)]
pub streaming_read_attempt_timeout_ms: u64,

/// Total counts of `streaming_read` operation retries
#[serde(default = "default::object_store_config::object_store_streaming_read_retry_attempts")]
pub streaming_read_retry_attempts: usize,

// metadata
/// Maximum timeout for `metadata` operation
#[serde(default = "default::object_store_config::object_store_metadata_attempt_timeout_ms")]
pub metadata_attempt_timeout_ms: u64,

/// Total counts of `metadata` operation retries
#[serde(default = "default::object_store_config::object_store_metadata_retry_attempts")]
pub metadata_retry_attempts: usize,

// delete
/// Maximum timeout for `delete` operation
#[serde(default = "default::object_store_config::object_store_delete_attempt_timeout_ms")]
pub delete_attempt_timeout_ms: u64,

/// Total counts of `delete` operation retries
#[serde(default = "default::object_store_config::object_store_delete_retry_attempts")]
pub delete_retry_attempts: usize,

// delete_objects
/// Maximum timeout for `delete_objects` operation
#[serde(
default = "default::object_store_config::object_store_delete_objects_attempt_timeout_ms"
)]
pub delete_objects_attempt_timeout_ms: u64,

/// Total counts of `delete_objects` operation retries
#[serde(default = "default::object_store_config::object_store_delete_objects_retry_attempts")]
pub delete_objects_retry_attempts: usize,

// list
/// Maximum timeout for `list` operation
#[serde(default = "default::object_store_config::object_store_list_attempt_timeout_ms")]
pub list_attempt_timeout_ms: u64,

/// Total counts of `list` operation retries
#[serde(default = "default::object_store_config::object_store_list_retry_attempts")]
pub list_retry_attempts: usize,
}
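The retry fields above combine into a single exponential back-off schedule shared by all object store operations: each retry waits longer than the last, up to `req_backoff_max_delay_ms`, until the per-operation retry count is exhausted. Below is a minimal illustrative sketch of one common formulation of such a schedule (base interval multiplied by the factor per attempt, capped at the max delay); it is not the crate's actual retry implementation, and the values in `main` are hypothetical rather than the configured defaults.

use std::time::Duration;

/// Illustrative only: one common way to derive the delay before the n-th retry
/// (0-based) from a base interval, a multiplicative factor, and a cap.
fn backoff_delay(attempt: u32, interval_ms: u64, factor: u64, max_delay_ms: u64) -> Duration {
    let delay_ms = interval_ms
        .saturating_mul(factor.saturating_pow(attempt))
        .min(max_delay_ms);
    Duration::from_millis(delay_ms)
}

fn main() {
    // Hypothetical values: 1s base interval, factor 2, 10s cap.
    for attempt in 0..5 {
        println!("retry {attempt}: wait {:?}", backoff_delay(attempt, 1000, 2, 10_000));
    }
    // Prints 1s, 2s, 4s, 8s, 10s (capped).
}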
@@ -1509,6 +1544,10 @@ pub mod default {
pub fn max_prefetch_block_number() -> usize {
16
}

pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
None
}
}

pub mod streaming {
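The OOM risk named in the commit title comes from the builder holding many finished `SSTables` in memory while they upload; the new `compactor_concurrent_uploading_sst_count` option bounds that upload concurrency. Below is a minimal sketch of the idea using a `tokio::sync::Semaphore`; `SstData` and `upload_sst` are hypothetical stand-ins, `None` is assumed to mean no explicit limit, and the real builder code in the compactor crate differs in detail.

use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Hypothetical stand-ins for the real SST payload and upload routine.
struct SstData(Vec<u8>);
async fn upload_sst(sst: SstData) {
    let _ = sst.0.len();
}

// Upload SSTs with at most `limit` uploads in flight; `None` falls back to an
// effectively unbounded number of permits.
async fn upload_all(ssts: Vec<SstData>, limit: Option<usize>) {
    let semaphore = Arc::new(Semaphore::new(limit.unwrap_or(Semaphore::MAX_PERMITS)));
    let mut uploads = JoinSet::new();
    for sst in ssts {
        // Acquire a permit before spawning, so at most `limit` uploads run at once.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        uploads.spawn(async move {
            upload_sst(sst).await;
            drop(permit);
        });
    }
    while uploads.join_next().await.is_some() {}
}

#[tokio::main]
async fn main() {
    let ssts = (0..8).map(|i| SstData(vec![0u8; i * 1024])).collect();
    upload_all(ssts, Some(2)).await;
}

In the real compactor the builder produces `SSTables` incrementally, so waiting for an upload permit presumably also back-pressures SST creation, which is what keeps memory bounded.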
3 changes: 2 additions & 1 deletion src/config/docs.md
@@ -105,6 +105,7 @@ This page is automatically generated by `./risedev generate-example-config`
| cache_refill | | |
| check_compaction_result | | false |
| compact_iter_recreate_timeout_ms | | 600000 |
| compactor_concurrent_uploading_sst_count | The maximum number of `SSTables` that the builder uploads concurrently. | |
| compactor_fast_max_compact_delete_ratio | | 40 |
| compactor_fast_max_compact_task_size | | 2147483648 |
| compactor_iter_max_io_retry_times | | 8 |
@@ -128,7 +129,7 @@ This page is automatically generated by `./risedev generate-example-config`
| meta_file_cache | | |
| min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
| min_sstable_size_mb | | 32 |
| object_store | | |
| object_store | Object storage configuration 1. General configuration 2. Backend-specific configuration 3. Retry and timeout configuration | |
| prefetch_buffer_capacity_mb | max memory usage for large query | |
| share_buffer_compaction_worker_threads_number | Worker threads number of dedicated tokio runtime for share buffer compaction. 0 means use tokio's default value (number of CPU core). | 4 |
| share_buffer_upload_concurrency | Number of tasks shared buffer can upload in parallel. | 8 |
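The empty default column for `compactor_concurrent_uploading_sst_count` reflects that the option is `None` unless set explicitly, which presumably preserves the previous uploading behavior. Below is a minimal sketch of that `Option`-plus-`serde`-default pattern; the struct is a stand-in rather than the real `StorageConfig`, and it assumes the `serde` (with derive) and `toml` crates.

use serde::Deserialize;

// Stand-in for the real StorageConfig; only the new field is shown.
#[derive(Debug, Deserialize)]
struct StorageConfigSketch {
    // Missing from the TOML -> None -> no explicit upload limit configured.
    #[serde(default)]
    compactor_concurrent_uploading_sst_count: Option<usize>,
}

fn main() {
    let with_limit: StorageConfigSketch =
        toml::from_str("compactor_concurrent_uploading_sst_count = 16").unwrap();
    let without_limit: StorageConfigSketch = toml::from_str("").unwrap();
    assert_eq!(with_limit.compactor_concurrent_uploading_sst_count, Some(16));
    assert_eq!(without_limit.compactor_concurrent_uploading_sst_count, None);
}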
7 changes: 7 additions & 0 deletions src/meta/src/hummock/compactor_manager.rs
@@ -101,6 +101,13 @@ impl Compactor {
Ok(())
}

pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
for task_id in task_ids {
self.cancel_task(*task_id)?;
}
Ok(())
}

pub fn context_id(&self) -> HummockContextId {
self.context_id
}
61 changes: 29 additions & 32 deletions src/meta/src/hummock/manager/compaction.rs
@@ -105,6 +105,7 @@ static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
TaskStatus::InvalidGroupCanceled,
TaskStatus::NoAvailMemoryResourceCanceled,
TaskStatus::NoAvailCpuResourceCanceled,
TaskStatus::HeartbeatProgressCanceled,
]
.into_iter()
.collect()
@@ -464,40 +465,36 @@ impl HummockManager {
progress,
}) => {
let compactor_manager = hummock_manager.compactor_manager.clone();
let cancel_tasks = compactor_manager.update_task_heartbeats(&progress);
if let Some(compactor) = compactor_manager.get_compactor(context_id) {
// TODO: task cancellation can be batched
for task in cancel_tasks {
tracing::info!(
"Task with group_id {} task_id {} with context_id {} has expired due to lack of visible progress",
task.compaction_group_id,
task.task_id,
context_id,
);
let cancel_tasks = compactor_manager.update_task_heartbeats(&progress).into_iter().map(|task|task.task_id).collect::<Vec<_>>();
if !cancel_tasks.is_empty() {
tracing::info!(
"Tasks cancel with task_ids {:?} with context_id {} has expired due to lack of visible progress",
cancel_tasks,
context_id,
);

if let Err(e) =
hummock_manager
.cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled)
.await
{
tracing::error!(
task_id = task.task_id,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status."
);
}

// Forcefully cancel the task so that it terminates
// early on the compactor
// node.
let _ = compactor.cancel_task(task.task_id);
tracing::info!(
"CancelTask operation for task_id {} has been sent to node with context_id {}",
context_id,
task.task_id
if let Err(e) = hummock_manager
.cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
.await
{
tracing::error!(
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status."
);
}
}

if let Some(compactor) = compactor_manager.get_compactor(context_id) {
// Forcefully cancel the task so that it terminates
// early on the compactor
// node.
let _ = compactor.cancel_tasks(&cancel_tasks);
tracing::info!(
"CancelTask operation for task_id {:?} has been sent to node with context_id {}",
cancel_tasks,
context_id
);
} else {
// Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
// Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
@@ -1004,7 +1001,7 @@ impl HummockManager {
Ok(ret[0])
}

async fn cancel_compact_tasks(
pub async fn cancel_compact_tasks(
&self,
tasks: Vec<u64>,
task_status: TaskStatus,
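With the new variant, both heartbeat-related statuses are members of `CANCEL_STATUS_SET`: this commit reports `HeartbeatProgressCanceled` from the compactor event-stream path above, while the periodic scan in `timer_task.rs` (next file) still reports `HeartbeatCanceled`. Below is a small sketch of checking a status against such a set; the enum and the set are simplified stand-ins for the protobuf-generated type and the real static.

use std::collections::HashSet;
use std::sync::LazyLock;

// Simplified stand-in for the protobuf-generated TaskStatus.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum TaskStatus {
    HeartbeatCanceled,
    HeartbeatProgressCanceled,
    Success,
}

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::HeartbeatCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

fn main() {
    assert!(CANCEL_STATUS_SET.contains(&TaskStatus::HeartbeatProgressCanceled));
    assert!(!CANCEL_STATUS_SET.contains(&TaskStatus::Success));
}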
17 changes: 13 additions & 4 deletions src/meta/src/hummock/manager/timer_task.rs
@@ -257,16 +257,25 @@ impl HummockManager {
// progress (meta + compactor)
// 2. meta periodically scans the task and performs a cancel on
// the meta side for tasks that are not updated by heartbeat
for task in compactor_manager.get_heartbeat_expired_tasks() {
let expired_tasks: Vec<u64> = compactor_manager
.get_heartbeat_expired_tasks()
.into_iter()
.map(|task| task.task_id)
.collect();
if !expired_tasks.is_empty() {
tracing::info!(
expired_tasks = ?expired_tasks,
"Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
);
if let Err(e) = hummock_manager
.cancel_compact_task(
task.task_id,
.cancel_compact_tasks(
expired_tasks.clone(),
TaskStatus::HeartbeatCanceled,
)
.await
{
tracing::error!(
task_id = task.task_id,
expired_tasks = ?expired_tasks,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status",
47 changes: 29 additions & 18 deletions src/storage/benches/bench_multi_builder.rs
@@ -21,8 +21,6 @@ use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};
use foyer::HybridCacheBuilder;
use futures::future::try_join_all;
use itertools::Itertools;
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
@@ -118,17 +116,14 @@ async fn build_tables<F: SstableWriterFactory>(
.await
.unwrap();
}
let split_table_outputs = builder.finish().await.unwrap();
let ssts = split_table_outputs
.iter()
.map(|handle| handle.sst_info.sst_info.clone())
.collect_vec();
let join_handles = split_table_outputs

builder
.finish()
.await
.unwrap()
.into_iter()
.map(|o| o.upload_join_handle)
.collect_vec();
try_join_all(join_handles).await.unwrap();
ssts
.map(|info| info.sst_info)
.collect()
}

async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<SstableStore> {
@@ -160,23 +155,38 @@ async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<SstableStore> {

fn bench_builder(
c: &mut Criterion,
bucket: &str,
capacity_mb: usize,
enable_streaming_upload: bool,
local: bool,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let metrics = Arc::new(ObjectStoreMetrics::unused());

let default_config = Arc::new(ObjectStoreConfig::default());

let object_store = runtime.block_on(async {
S3ObjectStore::new_with_config(bucket.to_string(), metrics.clone(), default_config.clone())
if local {
S3ObjectStore::new_minio_engine(
"minio://hummockadmin:[email protected]:9301/hummock001",
metrics.clone(),
default_config.clone(),
)
.await
.monitored(metrics, default_config)
} else {
S3ObjectStore::new_with_config(
env::var("S3_BUCKET").unwrap(),
metrics.clone(),
default_config.clone(),
)
.await
.monitored(metrics, default_config)
}
});

let object_store = Arc::new(ObjectStoreImpl::S3(object_store));

let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });
@@ -216,10 +226,11 @@ fn bench_builder(
// SST size: 4, 32, 64, 128, 256MiB
fn bench_multi_builder(c: &mut Criterion) {
let sst_capacities = vec![4, 32, 64, 128, 256];
let bucket = env::var("S3_BUCKET").unwrap();
let is_local_test = env::var("LOCAL_TEST").is_ok();

for capacity in sst_capacities {
bench_builder(c, &bucket, capacity, false);
bench_builder(c, &bucket, capacity, true);
bench_builder(c, capacity, false, is_local_test);
bench_builder(c, capacity, true, is_local_test);
}
}

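With this change the benchmark no longer requires an S3 bucket unconditionally: judging from the diff, setting `LOCAL_TEST` points it at a local MinIO endpoint, and only otherwise must `S3_BUCKET` be set. Below is a small illustrative sketch of that selection logic, not the benchmark harness itself.

use std::env;

// Illustrative: mirror how bench_multi_builder chooses its object store backend.
fn main() {
    if env::var("LOCAL_TEST").is_ok() {
        // The benchmark builds a MinIO-backed S3ObjectStore in this branch.
        println!("LOCAL_TEST set: using the local MinIO endpoint");
    } else {
        let bucket = env::var("S3_BUCKET").expect("S3_BUCKET must be set when LOCAL_TEST is not");
        println!("using S3 bucket {bucket}");
    }
}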