Skip to content

Commit

Permalink
feat(storage): batch get_compact_task/apply_compact_task in once tran…
Browse files Browse the repository at this point in the history
…saction (#15523)

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored and Li0k committed Apr 19, 2024
1 parent 5b6bef8 commit bc57c1f
Show file tree
Hide file tree
Showing 10 changed files with 833 additions and 629 deletions.
160 changes: 157 additions & 3 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use function_name::named;
use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::{
self, Event as RequestEvent, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Receiver as OneShotReceiver;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
use crate::hummock::manager::read_lock;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::manager::{init_selectors, read_lock};
use crate::hummock::HummockManager;

const MAX_SKIP_TIMES: usize = 8;
const MAX_REPORT_COUNT: usize = 16;

#[derive(Default)]
pub struct Compaction {
/// Compaction task that is already assigned to a compactor
Expand Down Expand Up @@ -102,4 +116,144 @@ impl HummockManager {
let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
ctx.score_levels
}

pub async fn handle_pull_task_event(
&self,
context_id: u32,
pull_task_count: usize,
compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
) {
assert_ne!(0, pull_task_count);
if let Some(compactor) = self.compactor_manager.get_compactor(context_id) {
let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await;
if !groups.is_empty() {
let selector: &mut Box<dyn CompactionSelector> =
compaction_selectors.get_mut(&task_type).unwrap();

let mut generated_task_count = 0;
let mut existed_groups = groups.clone();
let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
let mut failed_tasks = vec![];

while generated_task_count < pull_task_count && failed_tasks.is_empty() {
let compact_ret = self
.get_compact_tasks(
existed_groups.clone(),
pull_task_count - generated_task_count,
selector,
)
.await;

match compact_ret {
Ok((compact_tasks, unschedule_groups)) => {
if compact_tasks.is_empty() {
break;
}
generated_task_count += compact_tasks.len();
no_task_groups.extend(unschedule_groups);
for task in compact_tasks {
let task_id = task.task_id;
if let Err(e) =
compactor.send_event(ResponseEvent::CompactTask(task))
{
tracing::warn!(
error = %e.as_report(),
"Failed to send task {} to {}",
task_id,
compactor.context_id(),
);
failed_tasks.push(task_id);
}
}
if !failed_tasks.is_empty() {
self.compactor_manager.remove_compactor(context_id);
}
existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
}
Err(err) => {
tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
break;
}
};
}
for group in no_task_groups {
self.compaction_state.unschedule(group, task_type);
}
if let Err(err) = self
.cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
.await
{
tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
}
}

// ack to compactor
if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
tracing::warn!(
error = %e.as_report(),
"Failed to send ask to {}",
context_id,
);
self.compactor_manager.remove_compactor(context_id);
}
}
}

/// dedicated event runtime for CPU/IO bound event
pub async fn compact_task_dedicated_event_handler(
hummock_manager: Arc<HummockManager>,
mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
shutdown_rx_shared: Shared<OneShotReceiver<()>>,
) {
let mut compaction_selectors = init_selectors();

tokio::select! {
_ = shutdown_rx_shared => {}

_ = async {
while let Some((context_id, event)) = rx.recv().await {
let mut report_events = vec![];
let mut skip_times = 0;
match event {
RequestEvent::PullTask(PullTask { pull_task_count }) => {
hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
}

RequestEvent::ReportTask(task) => {
report_events.push(task);
}

_ => unreachable!(),
}
while let Ok((context_id, event)) = rx.try_recv() {
match event {
RequestEvent::PullTask(PullTask { pull_task_count }) => {
hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
if !report_events.is_empty() {
if skip_times > MAX_SKIP_TIMES {
break;
}
skip_times += 1;
}
}

RequestEvent::ReportTask(task) => {
report_events.push(task);
if report_events.len() >= MAX_REPORT_COUNT {
break;
}
}
_ => unreachable!(),
}
}
if !report_events.is_empty() {
if let Err(e) = hummock_manager.report_compact_tasks(report_events).await
{
tracing::error!(error = %e.as_report(), "report compact_tack fail")
}
}
}
} => {}
}
}
}
30 changes: 11 additions & 19 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange,
};
use thiserror_ext::AsReport;
use tokio::sync::{OnceCell, RwLock};
use tracing::warn;

use super::write_lock;
use crate::controller::SqlMetaStore;
Expand Down Expand Up @@ -490,7 +490,7 @@ impl HummockManager {
return Ok((parent_group_id, table_to_partition));
}
let table_ids = table_ids.iter().cloned().unique().collect_vec();
let mut compaction_guard = write_lock!(self, compaction).await;
let compaction_guard = write_lock!(self, compaction).await;
let mut versioning_guard = write_lock!(self, versioning).await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
Expand Down Expand Up @@ -696,27 +696,19 @@ impl HummockManager {
}
}
if need_cancel {
canceled_tasks.push(task.clone());
canceled_tasks.push(ReportTask {
task_id: task.task_id,
task_status: TaskStatus::ManualCanceled as i32,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
});
}
}
}

for task in canceled_tasks {
if !self
.report_compact_task_impl(
task.task_id,
None,
TaskStatus::ManualCanceled,
vec![],
&mut compaction_guard,
None,
)
.await
.unwrap_or(false)
{
warn!("failed to cancel task-{}", task.task_id);
}
}
drop(compaction_guard);
self.report_compact_tasks(canceled_tasks).await?;

// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ use itertools::Itertools;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::FullScanTask;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{
commit_multi_var, create_trx_wrapper, read_lock, write_lock, ResponseEvent,
};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock};
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
Expand Down
Loading

0 comments on commit bc57c1f

Please sign in to comment.