Skip to content

Commit

Permalink
feat(storage): remove shared buffer compaction grouped payload (#18277)
Browse files — browse the repository at this point in the history
  • Loading branch information
Li0k authored Aug 28, 2024
1 parent 9f52981 commit ee77691
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 56 deletions.
59 changes: 18 additions & 41 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use std::sync::{Arc, LazyLock};
use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::CacheContext;
use futures::future::{try_join, try_join_all};
use futures::future::try_join;
use futures::{stream, FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, EPOCH_LEN};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, LocalSstableInfo};
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
use thiserror_ext::AsReport;
use tracing::{error, warn};
Expand Down Expand Up @@ -59,41 +58,20 @@ pub async fn compact(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: Vec<ImmutableMemtable>,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> HummockResult<UploadTaskOutput> {
let mut grouped_payload: HashMap<CompactionGroupId, Vec<ImmutableMemtable>> = HashMap::new();
for imm in &payload {
let compaction_group_id = match compaction_group_index.get(&imm.table_id) {
// compaction group id is used only as a hint for grouping different data.
// If the compaction group id is not found for the table id, we can assign a
// default compaction group id for the batch.
//
// On meta side, when we commit a new epoch, it is acceptable that the
// compaction group id provided from CN does not match the latest compaction
// group config.
None => StaticCompactionGroupId::StateDefault as CompactionGroupId,
Some(group_id) => *group_id,
};
grouped_payload
.entry(compaction_group_id)
.or_default()
.push(imm.clone());
}

let mut new_value_futures = vec![];
for (id, group_payload) in grouped_payload {
new_value_futures.push(
compact_shared_buffer::<true>(
context.clone(),
sstable_object_id_manager.clone(),
filter_key_extractor_manager.clone(),
group_payload,
)
.map_ok(move |results| results.into_iter())
.instrument_await(format!("shared_buffer_compact_compaction_group {}", id)),
);
}
let new_value_payload = payload.clone();
let new_value_future = async {
compact_shared_buffer::<true>(
context.clone(),
sstable_object_id_manager.clone(),
filter_key_extractor_manager.clone(),
new_value_payload,
)
.map_ok(move |results| results.into_iter())
.instrument_await("shared_buffer_compact_new_value")
.await
};

let old_value_payload = payload
.into_iter()
Expand All @@ -106,19 +84,18 @@ pub async fn compact(
} else {
compact_shared_buffer::<false>(
context.clone(),
sstable_object_id_manager,
filter_key_extractor_manager,
sstable_object_id_manager.clone(),
filter_key_extractor_manager.clone(),
old_value_payload,
)
.await
}
};

// Note that the output is reordered compared with input `payload`.
let (grouped_new_value_ssts, old_value_ssts) =
try_join(try_join_all(new_value_futures), old_value_future).await?;
let (new_value_ssts, old_value_ssts) = try_join(new_value_future, old_value_future).await?;

let new_value_ssts = grouped_new_value_ssts.into_iter().flatten().collect_vec();
let new_value_ssts = new_value_ssts.into_iter().collect_vec();
Ok(UploadTaskOutput {
new_value_ssts,
old_value_ssts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ async fn flush_imms(
compactor_context,
sstable_object_id_manager,
payload,
task_info.compaction_group_index,
filter_key_extractor_manager,
)
.verbose_instrument_await("shared_buffer_compact")
Expand Down
4 changes: 1 addition & 3 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_common::must_match;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo};
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use task_manager::{TaskManager, UploadingTaskStatus};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -88,7 +88,6 @@ pub struct UploadTaskInfo {
pub task_size: usize,
pub epochs: Vec<HummockEpoch>,
pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
pub compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
}

impl Display for UploadTaskInfo {
Expand Down Expand Up @@ -249,7 +248,6 @@ impl UploadingTask {
task_size,
epochs,
imm_ids,
compaction_group_index: context.pinned_version.compaction_group_index(),
};
context
.buffer_tracker
Expand Down
12 changes: 1 addition & 11 deletions src/storage/src/hummock/local_version/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::iter::empty;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,7 +74,6 @@ impl Drop for PinnedVersionGuard {
#[derive(Clone)]
pub struct PinnedVersion {
version: Arc<HummockVersion>,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
guard: Arc<PinnedVersionGuard>,
}

Expand All @@ -84,22 +83,15 @@ impl PinnedVersion {
pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
) -> Self {
let version_id = version.id;
let compaction_group_index = version.state_table_info.build_table_compaction_group_id();

PinnedVersion {
version: Arc::new(version),
compaction_group_index: Arc::new(compaction_group_index),
guard: Arc::new(PinnedVersionGuard::new(
version_id,
pinned_version_manager_tx,
)),
}
}

pub(crate) fn compaction_group_index(&self) -> Arc<HashMap<TableId, CompactionGroupId>> {
self.compaction_group_index.clone()
}

pub fn new_pin_version(&self, version: HummockVersion) -> Self {
assert!(
version.id >= self.version.id,
Expand All @@ -108,11 +100,9 @@ impl PinnedVersion {
self.version.id
);
let version_id = version.id;
let compaction_group_index = version.state_table_info.build_table_compaction_group_id();

PinnedVersion {
version: Arc::new(version),
compaction_group_index: Arc::new(compaction_group_index),
guard: Arc::new(PinnedVersionGuard::new(
version_id,
self.guard.pinned_version_manager_tx.clone(),
Expand Down

0 comments on commit ee77691

Please sign in to comment.