Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): batch get_compact_task/apply_compact_task in once transaction #15523

Merged
merged 15 commits into from
Apr 2, 2024
160 changes: 157 additions & 3 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use function_name::named;
use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::{
self, Event as RequestEvent, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Receiver as OneShotReceiver;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig};
use crate::hummock::manager::read_lock;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::manager::{init_selectors, read_lock};
use crate::hummock::HummockManager;

const MAX_SKIP_TIMES: usize = 8;
const MAX_REPORT_COUNT: usize = 16;

#[derive(Default)]
pub struct Compaction {
/// Compaction task that is already assigned to a compactor
Expand Down Expand Up @@ -102,4 +116,144 @@ impl HummockManager {
let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
ctx.score_levels
}

pub async fn handle_pull_task_event(
&self,
context_id: u32,
pull_task_count: usize,
compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
) {
assert_ne!(0, pull_task_count);
if let Some(compactor) = self.compactor_manager.get_compactor(context_id) {
let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await;
if !groups.is_empty() {
let selector: &mut Box<dyn CompactionSelector> =
compaction_selectors.get_mut(&task_type).unwrap();

let mut generated_task_count = 0;
let mut existed_groups = groups.clone();
let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
let mut failed_tasks = vec![];

while generated_task_count < pull_task_count && failed_tasks.is_empty() {
let compact_ret = self
.get_compact_tasks(
existed_groups.clone(),
pull_task_count - generated_task_count,
selector,
)
.await;

match compact_ret {
Ok((compact_tasks, unschedule_groups)) => {
if compact_tasks.is_empty() {
break;
}
generated_task_count += compact_tasks.len();
no_task_groups.extend(unschedule_groups);
for task in compact_tasks {
hzxa21 marked this conversation as resolved.
Show resolved Hide resolved
let task_id = task.task_id;
if let Err(e) =
compactor.send_event(ResponseEvent::CompactTask(task))
{
tracing::warn!(
error = %e.as_report(),
"Failed to send task {} to {}",
task_id,
compactor.context_id(),
);
failed_tasks.push(task_id);
}
}
if !failed_tasks.is_empty() {
self.compactor_manager.remove_compactor(context_id);
}
existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
}
Err(err) => {
tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
break;
}
};
}
for group in no_task_groups {
self.compaction_state.unschedule(group, task_type);
}
if let Err(err) = self
.cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
.await
{
tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
}
}

// ack to compactor
if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
tracing::warn!(
error = %e.as_report(),
"Failed to send ask to {}",
context_id,
);
self.compactor_manager.remove_compactor(context_id);
}
}
}

/// dedicated event runtime for CPU/IO bound event
pub async fn compact_task_dedicated_event_handler(
hummock_manager: Arc<HummockManager>,
mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
shutdown_rx_shared: Shared<OneShotReceiver<()>>,
) {
let mut compaction_selectors = init_selectors();

tokio::select! {
_ = shutdown_rx_shared => {}

_ = async {
while let Some((context_id, event)) = rx.recv().await {
let mut report_events = vec![];
let mut skip_times = 0;
match event {
RequestEvent::PullTask(PullTask { pull_task_count }) => {
hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
}

RequestEvent::ReportTask(task) => {
report_events.push(task);
}

_ => unreachable!(),
}
while let Ok((context_id, event)) = rx.try_recv() {
match event {
RequestEvent::PullTask(PullTask { pull_task_count }) => {
hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors).await;
if !report_events.is_empty() {
if skip_times > MAX_SKIP_TIMES {
break;
}
skip_times += 1;
}
}

RequestEvent::ReportTask(task) => {
report_events.push(task);
if report_events.len() >= MAX_REPORT_COUNT {
break;
}
}
_ => unreachable!(),
}
}
if !report_events.is_empty() {
if let Err(e) = hummock_manager.report_compact_tasks(report_events).await
{
tracing::error!(error = %e.as_report(), "report compact_tack fail")
}
}
}
} => {}
}
}
}
30 changes: 11 additions & 19 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange,
};
use thiserror_ext::AsReport;
use tokio::sync::{OnceCell, RwLock};
use tracing::warn;

use super::write_lock;
use crate::controller::SqlMetaStore;
Expand Down Expand Up @@ -476,7 +476,7 @@ impl HummockManager {
return Ok((parent_group_id, table_to_partition));
}
let table_ids = table_ids.iter().cloned().unique().collect_vec();
let mut compaction_guard = write_lock!(self, compaction).await;
let compaction_guard = write_lock!(self, compaction).await;
let mut versioning_guard = write_lock!(self, versioning).await;
let versioning = versioning_guard.deref_mut();
let current_version = &versioning.current_version;
Expand Down Expand Up @@ -682,27 +682,19 @@ impl HummockManager {
}
}
if need_cancel {
canceled_tasks.push(task.clone());
canceled_tasks.push(ReportTask {
task_id: task.task_id,
task_status: TaskStatus::ManualCanceled as i32,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
});
}
}
}

for task in canceled_tasks {
if !self
.report_compact_task_impl(
task.task_id,
None,
TaskStatus::ManualCanceled,
vec![],
&mut compaction_guard,
None,
)
.await
.unwrap_or(false)
{
warn!("failed to cancel task-{}", task.task_id);
}
}
drop(compaction_guard);
self.report_compact_tasks(canceled_tasks).await?;

// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ use itertools::Itertools;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::FullScanTask;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{
commit_multi_var, create_trx_wrapper, read_lock, write_lock, ResponseEvent,
};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock};
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction};
Expand Down
Loading
Loading