diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index fab4497433ddb..ce283064008dd 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -21,14 +21,13 @@ use std::sync::{Arc, LazyLock}; use await_tree::InstrumentAwait; use bytes::Bytes; use foyer::CacheContext; -use futures::future::{try_join, try_join_all}; +use futures::future::try_join; use futures::{stream, FutureExt, StreamExt, TryFutureExt}; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, EPOCH_LEN}; use risingwave_hummock_sdk::key_range::KeyRange; -use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, LocalSstableInfo}; +use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; use thiserror_ext::AsReport; use tracing::{error, warn}; @@ -59,41 +58,20 @@ pub async fn compact( context: CompactorContext, sstable_object_id_manager: SstableObjectIdManagerRef, payload: Vec, - compaction_group_index: Arc>, filter_key_extractor_manager: FilterKeyExtractorManager, ) -> HummockResult { - let mut grouped_payload: HashMap> = HashMap::new(); - for imm in &payload { - let compaction_group_id = match compaction_group_index.get(&imm.table_id) { - // compaction group id is used only as a hint for grouping different data. - // If the compaction group id is not found for the table id, we can assign a - // default compaction group id for the batch. - // - // On meta side, when we commit a new epoch, it is acceptable that the - // compaction group id provided from CN does not match the latest compaction - // group config. - None => StaticCompactionGroupId::StateDefault as CompactionGroupId, - Some(group_id) => *group_id, - }; - grouped_payload - .entry(compaction_group_id) - .or_default() - .push(imm.clone()); - } - - let mut new_value_futures = vec![]; - for (id, group_payload) in grouped_payload { - new_value_futures.push( - compact_shared_buffer::( - context.clone(), - sstable_object_id_manager.clone(), - filter_key_extractor_manager.clone(), - group_payload, - ) - .map_ok(move |results| results.into_iter()) - .instrument_await(format!("shared_buffer_compact_compaction_group {}", id)), - ); - } + let new_value_payload = payload.clone(); + let new_value_future = async { + compact_shared_buffer::( + context.clone(), + sstable_object_id_manager.clone(), + filter_key_extractor_manager.clone(), + new_value_payload, + ) + .map_ok(move |results| results.into_iter()) + .instrument_await("shared_buffer_compact_new_value") + .await + }; let old_value_payload = payload .into_iter() @@ -106,8 +84,8 @@ pub async fn compact( } else { compact_shared_buffer::( context.clone(), - sstable_object_id_manager, - filter_key_extractor_manager, + sstable_object_id_manager.clone(), + filter_key_extractor_manager.clone(), old_value_payload, ) .await @@ -115,10 +93,9 @@ pub async fn compact( }; // Note that the output is reordered compared with input `payload`. - let (grouped_new_value_ssts, old_value_ssts) = - try_join(try_join_all(new_value_futures), old_value_future).await?; + let (new_value_ssts, old_value_ssts) = try_join(new_value_future, old_value_future).await?; - let new_value_ssts = grouped_new_value_ssts.into_iter().flatten().collect_vec(); + let new_value_ssts = new_value_ssts.into_iter().collect_vec(); Ok(UploadTaskOutput { new_value_ssts, old_value_ssts, diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index ee87177923e9b..f2aa2ea7fd88d 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -228,7 +228,6 @@ async fn flush_imms( compactor_context, sstable_object_id_manager, payload, - task_info.compaction_group_index, filter_key_extractor_manager, ) .verbose_instrument_await("shared_buffer_compact") diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 88351c34b6210..4494049d93b0b 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -37,7 +37,7 @@ use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use task_manager::{TaskManager, UploadingTaskStatus}; use thiserror_ext::AsReport; use tokio::sync::oneshot; @@ -88,7 +88,6 @@ pub struct UploadTaskInfo { pub task_size: usize, pub epochs: Vec, pub imm_ids: HashMap>, - pub compaction_group_index: Arc>, } impl Display for UploadTaskInfo { @@ -249,7 +248,6 @@ impl UploadingTask { task_size, epochs, imm_ids, - compaction_group_index: context.pinned_version.compaction_group_index(), }; context .buffer_tracker diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 2a552ffbbf31a..5ef53edcd26ef 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::iter::empty; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -74,7 +74,6 @@ impl Drop for PinnedVersionGuard { #[derive(Clone)] pub struct PinnedVersion { version: Arc, - compaction_group_index: Arc>, guard: Arc, } @@ -84,11 +83,8 @@ impl PinnedVersion { pinned_version_manager_tx: UnboundedSender, ) -> Self { let version_id = version.id; - let compaction_group_index = version.state_table_info.build_table_compaction_group_id(); - PinnedVersion { version: Arc::new(version), - compaction_group_index: Arc::new(compaction_group_index), guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, @@ -96,10 +92,6 @@ impl PinnedVersion { } } - pub(crate) fn compaction_group_index(&self) -> Arc> { - self.compaction_group_index.clone() - } - pub fn new_pin_version(&self, version: HummockVersion) -> Self { assert!( version.id >= self.version.id, @@ -108,11 +100,9 @@ impl PinnedVersion { self.version.id ); let version_id = version.id; - let compaction_group_index = version.state_table_info.build_table_compaction_group_id(); PinnedVersion { version: Arc::new(version), - compaction_group_index: Arc::new(compaction_group_index), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(),