From 5994337aac8e33050d652804115160be07f7b59b Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 12 Dec 2023 17:59:31 +0800 Subject: [PATCH 01/12] feat(storage): clear up dropped column --- proto/hummock.proto | 6 + .../column_aware_row_encoding.rs | 104 +++++++++++++++++- src/meta/src/hummock/manager/mod.rs | 12 +- src/meta/src/manager/catalog/mod.rs | 25 +++++ src/prost/build.rs | 1 + .../src/hummock/compactor/compaction_utils.rs | 10 +- .../src/hummock/compactor/compactor_runner.rs | 48 +++++++- .../compactor/fast_compactor_runner.rs | 1 + src/storage/src/hummock/compactor/iterator.rs | 10 ++ .../compactor/shared_buffer_compact.rs | 1 + .../src/hummock/iterator/concat_inner.rs | 7 ++ .../src/hummock/iterator/forward_merge.rs | 4 + .../src/hummock/iterator/merge_inner.rs | 4 + src/storage/src/hummock/iterator/mod.rs | 26 +++++ .../src/hummock/iterator/skip_watermark.rs | 4 + .../shared_buffer/shared_buffer_batch.rs | 4 + .../sstable/backward_sstable_iterator.rs | 4 + .../sstable/forward_sstable_iterator.rs | 4 + 18 files changed, 264 insertions(+), 11 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index df582cc491ae3..031794c4e38f7 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -278,6 +278,10 @@ message TableOption { uint32 retention_seconds = 1; } +message TableSchema { + repeated int32 column_ids = 1; +} + message CompactTask { enum TaskStatus { UNSPECIFIED = 0; @@ -345,6 +349,8 @@ message CompactTask { // The table watermark of any table id. In compaction we only use the table watermarks on safe epoch, // so we only need to include the table watermarks on safe epoch to reduce the size of metadata. map table_watermarks = 24; + // The table schemas that are at least as new as the one used to create `input_ssts`. + map table_schemas = 25; } message LevelHandler { diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 118aaf0bf2e45..9abd370959f75 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -19,7 +19,7 @@ //! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused //! until schema changes -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::sync::Arc; use bitflags::bitflags; @@ -105,6 +105,25 @@ impl RowEncoding { .expect("should encode at least one column"); self.set_offsets(&offset_usize, max_offset); } + + // TODO: avoid duplicated code + fn encode_slice<'a>(&mut self, datum_refs: impl Iterator>) { + debug_assert!( + self.buf.is_empty(), + "should not encode one RowEncoding object multiple times." + ); + let mut offset_usize = vec![]; + for datum in datum_refs { + offset_usize.push(self.buf.len()); + if let Some(v) = datum { + self.buf.put_slice(v); + } + } + let max_offset = *offset_usize + .last() + .expect("should encode at least one column"); + self.set_offsets(&offset_usize, max_offset); + } } /// Column-Aware `Serializer` holds schema related information, and shall be @@ -268,3 +287,86 @@ impl ValueRowDeserializer for ColumnAwareSerde { self.deserializer.deserialize(encoded_bytes) } } + +/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns. +/// If no column is dropped, returns None. +// TODO: avoid duplicated code +pub fn try_drop_invalid_columns( + mut encoded_bytes: &[u8], + valid_column_ids: &[i32], +) -> Option> { + let valid_column_ids: HashSet = valid_column_ids.iter().copied().collect(); + let datum_num = encoded_bytes.get_u32_le() as usize; + let mut is_column_dropped = false; + let mut encoded_bytes_copy = encoded_bytes; + for _ in 0..datum_num { + let this_id = encoded_bytes_copy.get_i32_le(); + if !valid_column_ids.contains(&this_id) { + is_column_dropped = true; + break; + } + } + if !is_column_dropped { + return None; + } + + // Slow path that drops columns. Should be rare. + let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); + let offset_bytes = match flag - Flag::EMPTY { + Flag::OFFSET8 => 1, + Flag::OFFSET16 => 2, + Flag::OFFSET32 => 4, + _ => panic!("invalid flag {}", flag.bits()), + }; + let offsets_start_idx = 4 * datum_num; + let data_start_idx = offsets_start_idx + datum_num * offset_bytes; + let offsets = &encoded_bytes[offsets_start_idx..data_start_idx]; + let data = &encoded_bytes[data_start_idx..]; + let mut datums: Vec> = Vec::with_capacity(valid_column_ids.len()); + let mut column_ids = Vec::with_capacity(valid_column_ids.len()); + for i in 0..datum_num { + let this_id = encoded_bytes.get_i32_le(); + if valid_column_ids.contains(&this_id) { + column_ids.push(this_id); + let this_offset_start_idx = i * offset_bytes; + let mut this_offset_slice = + &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)]; + let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice); + let data = if i + 1 < datum_num { + let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes) + ..(this_offset_start_idx + 2 * offset_bytes)]; + let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice); + if this_offset == next_offset { + None + } else { + let data_slice = &data[this_offset..next_offset]; + Some(data_slice) + } + } else if this_offset == data.len() { + None + } else { + let data_slice = &data[this_offset..]; + Some(data_slice) + }; + datums.push(data); + } + } + + let mut encoding = RowEncoding::new(); + encoding.encode_slice(datums.into_iter()); + let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4); + let datum_num = column_ids.len() as u32; + for id in column_ids { + encoded_column_ids.put_i32_le(id); + } + let mut row_bytes = Vec::with_capacity( + 5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */ + ); + row_bytes.put_u8(encoding.flag.bits()); + row_bytes.put_u32_le(datum_num); + row_bytes.extend(&encoded_column_ids); + row_bytes.extend(&encoding.offsets); + row_bytes.extend(&encoding.buf); + + Some(row_bytes) +} diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 06dcd63b69313..e4ad7f828e483 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -57,7 +57,8 @@ use risingwave_pb::hummock::{ version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion, HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats, - IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableWatermarks, + IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableSchema, + TableWatermarks, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -1128,11 +1129,18 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - while let Some(task) = self + while let Some(mut task) = self .get_compact_task_impl(compaction_group_id, selector) .await? { if let TaskStatus::Pending = task.task_status() { + task.table_schemas = self + .catalog_manager + .get_versioned_table_schemas(&task.existing_table_ids) + .await + .into_iter() + .map(|(table_id, column_ids)| (table_id, TableSchema { column_ids })) + .collect(); return Ok(Some(task)); } assert!( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 498325660cb04..9aea086308f01 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3210,6 +3210,31 @@ impl CatalogManager { .map(|table| table.id) .collect() } + + /// Returns column ids of `table_ids` that is versioned. + /// Being versioned implies using `ColumnAwareSerde`. + pub async fn get_versioned_table_schemas( + &self, + table_ids: &[TableId], + ) -> HashMap> { + let guard = self.core.lock().await; + table_ids + .iter() + .filter_map(|table_id| { + if let Some(t) = guard.database.tables.get(table_id) && t.version.is_some() { + let ret = ( + t.id, + t.columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id) + .collect_vec(), + ); + return Some(ret); + } + None + }) + .collect() + } } // User related methods diff --git a/src/prost/build.rs b/src/prost/build.rs index 5b8ddda59e098..7909b58ab4dc6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -133,6 +133,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") .type_attribute("hummock.InputLevel", "#[derive(Eq)]") + .type_attribute("hummock.TableSchema", "#[derive(Eq)]") .type_attribute("hummock.CompactTask", "#[derive(Eq)]") .type_attribute("hummock.TableWatermarks", "#[derive(Eq)]") .type_attribute("hummock.VnodeWatermark", "#[derive(Eq)]") diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 53df3fd9de482..8645a3b3a53ed 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -24,7 +24,9 @@ use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::TableStatsMap; use risingwave_hummock_sdk::{EpochWithGap, KeyComparator}; -use risingwave_pb::hummock::{compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo}; +use risingwave_pb::hummock::{ + compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo, TableSchema, +}; use tokio::time::Instant; pub use super::context::CompactorContext; @@ -122,6 +124,10 @@ pub struct TaskConfig { pub use_block_based_filter: bool, pub table_vnode_partition: BTreeMap, + /// `TableId` -> `TableSchema` + /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`. + /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped. + pub table_schemas: HashMap, } pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 47443a3a6fee9..d056c1b9f432a 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -20,13 +20,14 @@ use bytes::Bytes; use futures::{stream, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::util::epoch::is_max_epoch; +use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns; use risingwave_hummock_sdk::compact::{ compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task, }; use risingwave_hummock_sdk::key::{FullKey, PointRange}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch}; +use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; use tokio::sync::oneshot::Receiver; @@ -106,6 +107,11 @@ impl CompactorRunner { || task.target_level == task.base_level, use_block_based_filter, table_vnode_partition: task.table_vnode_partition.clone(), + table_schemas: task + .table_schemas + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect(), }, object_id_getter, ); @@ -706,6 +712,7 @@ where let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); let mut progress_key_num: u64 = 0; + let mut skip_schema_check: HashSet<(HummockSstableObjectId, u32)> = HashSet::default(); const PROGRESS_KEY_INTERVAL: u64 = 100; while iter.is_valid() { progress_key_num += 1; @@ -727,6 +734,7 @@ where let epoch = iter_key.epoch_with_gap.pure_epoch(); let value = iter.value(); + let value_meta = iter.value_meta(); if is_new_user_key { if !max_key.is_empty() && iter_key >= max_key { break; @@ -842,11 +850,39 @@ where is_new_user_key = false; } - // Don't allow two SSTs to share same user key - sst_builder - .add_full_key(iter_key, value, is_new_user_key) - .verbose_instrument_await("add_full_key") - .await?; + // May drop stale columns + let check_table_id = last_key.user_key.table_id.table_id; + let mut is_value_rewritten = false; + if let HummockValue::Put(v) = value + && let Some(object_id) = value_meta + && !skip_schema_check.contains(&(object_id, check_table_id)) + && let Some(schema) = task_config + .table_schemas + .get(&check_table_id) { + match try_drop_invalid_columns(v, &schema.column_ids) { + None => { + // Under the assumption that all values in the same (table, SSTable) group should share the same schema, + // if one value drops no columns during a compaction, no need to check other values in the same group. + skip_schema_check.insert((object_id, check_table_id)); + } + Some(new_value) => { + is_value_rewritten = true; + let new_put = HummockValue::put(new_value.as_slice()); + sst_builder + .add_full_key(iter_key, new_put, is_new_user_key) + .verbose_instrument_await("add_rewritten_full_key") + .await?; + } + } + } + + if !is_value_rewritten { + // Don't allow two SSTs to share same user key + sst_builder + .add_full_key(iter_key, value, is_new_user_key) + .verbose_instrument_await("add_full_key") + .await?; + } iter.next().verbose_instrument_await("iter_next").await?; } diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 3b9056dc1c17b..16d0660bba822 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -313,6 +313,7 @@ impl CompactorRunner { is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, + table_schemas: Default::default(), }; let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone()); diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 34b834625a477..f581ec4a2c129 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -488,6 +488,16 @@ impl HummockIterator for ConcatSstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats) } + + fn value_meta(&self) -> Option { + let object_id = self + .sstable_iter + .as_ref() + .expect("no table iter") + .sstable_info + .object_id; + Some(object_id) + } } #[cfg(test)] diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 3051d22edab02..d829de2d3441e 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -506,6 +506,7 @@ impl SharedBufferCompactRunner { is_target_l0_or_lbase: true, table_vnode_partition, use_block_based_filter, + table_schemas: Default::default(), }, object_id_getter, ); diff --git a/src/storage/src/hummock/iterator/concat_inner.rs b/src/storage/src/hummock/iterator/concat_inner.rs index 5402f5bfb987f..eb5f002583541 100644 --- a/src/storage/src/hummock/iterator/concat_inner.rs +++ b/src/storage/src/hummock/iterator/concat_inner.rs @@ -180,4 +180,11 @@ impl HummockIterator for ConcatIteratorInner { iter.collect_local_statistic(stats); } } + + fn value_meta(&self) -> Option { + self.sstable_iter + .as_ref() + .expect("no table iter") + .value_meta() + } } diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index 70b5c72f9db28..9a0e909f73654 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -335,6 +335,10 @@ mod test { } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> Option { + None + } } #[tokio::test] diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 9865287f0b588..b512bc90edda3 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -403,4 +403,8 @@ where fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { self.collect_local_statistic_impl(stats); } + + fn value_meta(&self) -> Option { + self.heap.peek().expect("no inner iter").iter.value_meta() + } } diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 488cc5d6c65ba..df5c8b606a6ac 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -125,6 +125,11 @@ pub trait HummockIterator: Send + Sync { /// take local statistic info from iterator to report metrics. fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic); + + /// Returns value meta. + /// + /// Currently the only value meta is `SSTable` object id of current KV, if any. + fn value_meta(&self) -> Option; } /// This is a placeholder trait used in `HummockIteratorUnion` @@ -160,6 +165,10 @@ impl HummockIterator for PhantomHummockIterator } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> Option { + unreachable!() + } } /// The `HummockIteratorUnion` acts like a wrapper over multiple types of `HummockIterator`, so that @@ -259,6 +268,15 @@ impl< Fourth(iter) => iter.collect_local_statistic(stats), } } + + fn value_meta(&self) -> Option { + match self { + First(iter) => iter.value_meta(), + Second(iter) => iter.value_meta(), + Third(iter) => iter.value_meta(), + Fourth(iter) => iter.value_meta(), + } + } } impl HummockIterator for Box { @@ -291,6 +309,10 @@ impl HummockIterator for Box { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { (*self).deref().collect_local_statistic(stats); } + + fn value_meta(&self) -> Option { + (*self).deref().value_meta() + } } pub enum RustIteratorOfBuilder<'a, B: RustIteratorBuilder> { @@ -439,6 +461,10 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> { } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> Option { + None + } } #[derive(PartialEq, Eq, Debug)] diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index 09644b2ab7475..1b23cfdabdae0 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -269,6 +269,10 @@ impl> HummockIterator for SkipWatermarkI fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { self.inner.collect_local_statistic(stats) } + + fn value_meta(&self) -> Option { + self.inner.value_meta() + } } #[cfg(test)] diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 0dc7d67bc6175..faec9dfe351df 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -828,6 +828,10 @@ impl HummockIterator for SharedBufferBatchIterator< } fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {} + + fn value_meta(&self) -> Option { + None + } } pub struct SharedBufferDeleteRangeIterator { diff --git a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs index cd2de8d234b94..0ac44544d74cd 100644 --- a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs @@ -149,6 +149,10 @@ impl HummockIterator for BackwardSstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats) } + + fn value_meta(&self) -> Option { + Some(self.sst.value().id) + } } impl SstableIteratorType for BackwardSstableIterator { diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index 53d0ecc770b13..803f6f867badc 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -287,6 +287,10 @@ impl HummockIterator for SstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats); } + + fn value_meta(&self) -> Option { + Some(self.sst.value().id) + } } impl SstableIteratorType for SstableIterator { From 69c21649510d112e543851fe69205954f4d771b8 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 13 Dec 2023 14:28:55 +0800 Subject: [PATCH 02/12] bugfix --- src/common/src/util/value_encoding/column_aware_row_encoding.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 9abd370959f75..d7d2e047a7922 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -296,6 +296,7 @@ pub fn try_drop_invalid_columns( valid_column_ids: &[i32], ) -> Option> { let valid_column_ids: HashSet = valid_column_ids.iter().copied().collect(); + let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); let datum_num = encoded_bytes.get_u32_le() as usize; let mut is_column_dropped = false; let mut encoded_bytes_copy = encoded_bytes; @@ -311,7 +312,6 @@ pub fn try_drop_invalid_columns( } // Slow path that drops columns. Should be rare. - let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); let offset_bytes = match flag - Flag::EMPTY { Flag::OFFSET8 => 1, Flag::OFFSET16 => 2, From cca49bc4a13418287246182f62322b7d02588f63 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 13 Dec 2023 15:02:15 +0800 Subject: [PATCH 03/12] fixup: add tests --- .../column_aware_row_encoding.rs | 8 +++- src/storage/src/row_serde/value_serde.rs | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index d7d2e047a7922..23c170b598042 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -106,7 +106,7 @@ impl RowEncoding { self.set_offsets(&offset_usize, max_offset); } - // TODO: avoid duplicated code + // TODO: Avoid duplicated code. `encode_slice` is the same as `encode` except it doesn't require column type. fn encode_slice<'a>(&mut self, datum_refs: impl Iterator>) { debug_assert!( self.buf.is_empty(), @@ -290,7 +290,7 @@ impl ValueRowDeserializer for ColumnAwareSerde { /// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns. /// If no column is dropped, returns None. -// TODO: avoid duplicated code +// TODO: Avoid duplicated code. The current code combines`Serializer` and `Deserializer` with unavailable parameter removed, e.g. `Deserializer::schema`. pub fn try_drop_invalid_columns( mut encoded_bytes: &[u8], valid_column_ids: &[i32], @@ -351,6 +351,10 @@ pub fn try_drop_invalid_columns( datums.push(data); } } + if column_ids.is_empty() { + // According to `RowEncoding::encode`, at least one column is required. + return None; + } let mut encoding = RowEncoding::new(); encoding.encode_slice(datums.into_iter()); diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index 9048b90c23a53..1f8c6b9a5dc81 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -146,6 +146,7 @@ mod tests { use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl::*; use risingwave_common::util::value_encoding::column_aware_row_encoding; + use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns; use super::*; @@ -269,4 +270,42 @@ mod tests { let decoded_row = serde.deserialize(&encoded_bytes); assert_eq!(decoded_row.unwrap(), data); } + + #[test] + fn test_drop_column() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + // no columns is dropped + assert!(try_drop_invalid_columns(&row_bytes, &[0, 1, 2]).is_none()); + + // column id 1 is dropped + let row_bytes_dropped = try_drop_invalid_columns(&row_bytes, &[0, 2]).unwrap(); + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(0), ColumnId::new(2)], + Arc::from(vec![DataType::Int16, DataType::Varchar].into_boxed_slice()), + std::iter::empty(), + ); + let decoded = deserializer.deserialize(&row_bytes_dropped[..]); + assert_eq!( + decoded.unwrap(), + vec![Some(Int16(5)), Some(Utf8("ABC".into()))] + ); + + // all columns are dropped + let row_bytes_dropped = try_drop_invalid_columns(&row_bytes, &[]).unwrap(); + let deserializer = column_aware_row_encoding::Deserializer::new( + &[], + Arc::from(vec![].into_boxed_slice()), + std::iter::empty(), + ); + let decoded = deserializer.deserialize(&row_bytes_dropped[..]); + assert_eq!(decoded.unwrap(), vec![]); + } } From 17762d341b457e1b697a6f904ea039aac047b24b Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 13 Dec 2023 20:04:37 +0800 Subject: [PATCH 04/12] add bench --- .../column_aware_row_encoding.rs | 3 +- src/storage/benches/bench_compactor.rs | 267 +++++++++++++++++- .../src/hummock/compactor/compaction_utils.rs | 1 + .../src/hummock/compactor/compactor_runner.rs | 19 +- .../compactor/fast_compactor_runner.rs | 1 + .../compactor/shared_buffer_compact.rs | 1 + src/storage/src/row_serde/value_serde.rs | 20 +- 7 files changed, 280 insertions(+), 32 deletions(-) diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 23c170b598042..760a84f9871a0 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -293,9 +293,8 @@ impl ValueRowDeserializer for ColumnAwareSerde { // TODO: Avoid duplicated code. The current code combines`Serializer` and `Deserializer` with unavailable parameter removed, e.g. `Deserializer::schema`. pub fn try_drop_invalid_columns( mut encoded_bytes: &[u8], - valid_column_ids: &[i32], + valid_column_ids: &HashSet, ) -> Option> { - let valid_column_ids: HashSet = valid_column_ids.iter().copied().collect(); let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); let datum_num = encoded_bytes.get_u32_le() as usize; let mut is_column_dropped = false; diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 78e9bede3f1a3..61215858ffe10 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -18,14 +18,18 @@ use std::sync::Arc; use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::hash::VirtualNode; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::DataType; +use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; +use risingwave_common::util::value_encoding::ValueRowSerializer; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::HummockEpoch; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl}; -use risingwave_pb::hummock::{compact_task, SstableInfo}; +use risingwave_pb::hummock::{compact_task, SstableInfo, TableSchema}; use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst; use risingwave_storage::hummock::compactor::{ ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress, @@ -71,9 +75,9 @@ pub fn default_writer_opts() -> SstableWriterOptions { } } -pub fn test_key_of(idx: usize, epoch: u64) -> FullKey> { +pub fn test_key_of(idx: usize, epoch: u64, table_id: TableId) -> FullKey> { FullKey::for_test( - TableId::default(), + table_id, [ VirtualNode::ZERO.to_be_bytes().as_slice(), format!("key_test_{:08}", idx * 2).as_bytes(), @@ -109,7 +113,7 @@ async fn build_table( let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt); let value = b"1234567890123456789"; - let mut full_key = test_key_of(0, epoch); + let mut full_key = test_key_of(0, epoch, TableId::new(0)); let table_key_len = full_key.user_key.table_key.len(); for i in range { let start = (i % 8) as usize; @@ -127,6 +131,63 @@ async fn build_table( sst } +async fn build_table_2( + sstable_store: SstableStoreRef, + sstable_object_id: u64, + range: Range, + epoch: u64, + table_id: u32, + column_num: usize, +) -> SstableInfo { + let opt = SstableBuilderOptions { + capacity: 32 * 1024 * 1024, + block_capacity: 16 * 1024, + restart_interval: 16, + bloom_false_positive: 0.001, + ..Default::default() + }; + let writer = sstable_store.create_sst_writer( + sstable_object_id, + SstableWriterOptions { + capacity_hint: None, + tracker: None, + policy: CachePolicy::Fill(CachePriority::High), + }, + ); + let mut builder = + SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt); + let mut full_key = test_key_of(0, epoch, TableId::new(table_id)); + let table_key_len = full_key.user_key.table_key.len(); + + let schema = vec![DataType::Int64; column_num]; + let column_ids = (0..column_num as i32).into_iter().map(ColumnId::new); + use risingwave_common::types::ScalarImpl; + let row = OwnedRow::new(vec![Some(ScalarImpl::Int64(5)); column_num]); + let table_columns: Vec<_> = column_ids + .clone() + .map(|id| ColumnDesc::unnamed(id, schema.get(id.get_id() as usize).unwrap().clone())) + .collect(); + use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; + let serializer = ColumnAwareSerde::new( + Arc::from_iter(column_ids.map(|id| id.get_id() as usize)), + table_columns.into(), + ); + let row_bytes = serializer.serialize(row); + + for i in range { + full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes()); + builder + .add_for_test(full_key.to_ref(), HummockValue::put(&row_bytes)) + .await + .unwrap(); + } + let output = builder.finish().await.unwrap(); + let handle = output.writer_output; + let sst = output.sst_info.sst_info; + handle.await.unwrap().unwrap(); + sst +} + async fn scan_all_table(info: &SstableInfo, sstable_store: SstableStoreRef) { let mut stats = StoreLocalStatistic::default(); let table = sstable_store.sstable(info, &mut stats).await.unwrap(); @@ -175,7 +236,11 @@ fn bench_table_scan(c: &mut Criterion) { }); } -async fn compact>(iter: I, sstable_store: SstableStoreRef) { +async fn compact>( + iter: I, + sstable_store: SstableStoreRef, + task_config: Option, +) { let opt = SstableBuilderOptions { capacity: 32 * 1024 * 1024, block_capacity: 64 * 1024, @@ -186,7 +251,7 @@ async fn compact>(iter: I, sstable_store let mut builder = CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt)); - let task_config = TaskConfig { + let task_config = task_config.unwrap_or_else(|| TaskConfig { key_range: KeyRange::inf(), cache_policy: CachePolicy::Disable, gc_delete_keys: false, @@ -195,7 +260,7 @@ async fn compact>(iter: I, sstable_store task_type: compact_task::TaskType::Dynamic, use_block_based_filter: true, ..Default::default() - }; + }); compact_and_build_sst( &mut builder, CompactionDeleteRangeIterator::new(ForwardMergeRangeIterator::new(HummockEpoch::MAX)), @@ -240,7 +305,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ConcatIterator::new(level2.clone(), sstable_store.clone(), read_options.clone()), ]; let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters); - async move { compact(iter, sstable_store1).await } + async move { compact(iter, sstable_store1, None).await } }); }); c.bench_function("bench_merge_iterator", |b| { @@ -265,15 +330,191 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ]; let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters); let sstable_store1 = sstable_store.clone(); - async move { compact(iter, sstable_store1).await } + async move { compact(iter, sstable_store1, None).await } }); }); } +fn bench_drop_column_compaction_impl(c: &mut Criterion, column_num: usize) { + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let sstable_store = mock_sstable_store(); + let test_key_size = 256 * 1024; + let info1 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 1, + 0..test_key_size, + 1, + 10, + column_num, + ) + .await + }); + let info2 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 2, + 0..test_key_size, + 1, + 11, + column_num, + ) + .await + }); + let level1 = vec![info1, info2]; + + let info1 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 3, + 0..test_key_size, + 2, + 10, + column_num, + ) + .await + }); + let info2 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 4, + 0..test_key_size, + 2, + 11, + column_num, + ) + .await + }); + let level2 = vec![info1, info2]; + + let task_config_no_schema = TaskConfig { + key_range: KeyRange::inf(), + cache_policy: CachePolicy::Disable, + gc_delete_keys: false, + watermark: 0, + stats_target_table_ids: None, + task_type: compact_task::TaskType::Dynamic, + use_block_based_filter: true, + table_schemas: vec![].into_iter().collect(), + ..Default::default() + }; + + let mut task_config_schema = task_config_no_schema.clone(); + task_config_schema.table_schemas.insert( + 10, + TableSchema { + column_ids: (0..column_num as i32).collect(), + }, + ); + task_config_schema.table_schemas.insert( + 11, + TableSchema { + column_ids: (0..column_num as i32).collect(), + }, + ); + + let mut task_config_schema_cause_drop = task_config_no_schema.clone(); + task_config_schema_cause_drop.table_schemas.insert( + 10, + TableSchema { + column_ids: (0..column_num as i32 / 2).collect(), + }, + ); + task_config_schema_cause_drop.table_schemas.insert( + 11, + TableSchema { + column_ids: (0..column_num as i32 / 2).collect(), + }, + ); + + let get_iter = || { + let sub_iters = vec![ + ConcatSstableIterator::new( + vec![10, 11], + level1.clone(), + KeyRange::inf(), + sstable_store.clone(), + Arc::new(TaskProgress::default()), + 0, + ), + ConcatSstableIterator::new( + vec![10, 11], + level2.clone(), + KeyRange::inf(), + sstable_store.clone(), + Arc::new(TaskProgress::default()), + 0, + ), + ]; + UnorderedMergeIteratorInner::for_compactor(sub_iters) + }; + + c.bench_function( + &format!("bench_drop_column_compaction_baseline_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_no_schema.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_without_drop_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_schema.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_without_drop_disable_optimization_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let mut task_config_clone = task_config_schema.clone(); + task_config_clone.disable_drop_column_optimization = true; + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_with_drop_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_schema_cause_drop.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); +} + +fn bench_drop_column_compaction_small(c: &mut Criterion) { + bench_drop_column_compaction_impl(c, 10); +} + +fn bench_drop_column_compaction_large(c: &mut Criterion) { + bench_drop_column_compaction_impl(c, 100); +} + criterion_group!( benches, - bench_table_build, - bench_table_scan, - bench_merge_iterator_compactor + // bench_table_build, + // bench_table_scan, + // bench_merge_iterator_compactor, + bench_drop_column_compaction_small, + bench_drop_column_compaction_large ); criterion_main!(benches); diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 8645a3b3a53ed..d1f7ec8284139 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -128,6 +128,7 @@ pub struct TaskConfig { /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`. /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped. pub table_schemas: HashMap, + pub disable_drop_column_optimization: bool, } pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index d056c1b9f432a..e6348a9e6043e 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -112,6 +112,7 @@ impl CompactorRunner { .iter() .map(|(k, v)| (*k, v.clone())) .collect(), + disable_drop_column_optimization: false, }, object_id_getter, ); @@ -713,6 +714,11 @@ where let mut compaction_statistics = CompactionStatistics::default(); let mut progress_key_num: u64 = 0; let mut skip_schema_check: HashSet<(HummockSstableObjectId, u32)> = HashSet::default(); + let schemas: HashMap> = task_config + .table_schemas + .iter() + .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect())) + .collect(); const PROGRESS_KEY_INTERVAL: u64 = 100; while iter.is_valid() { progress_key_num += 1; @@ -856,14 +862,15 @@ where if let HummockValue::Put(v) = value && let Some(object_id) = value_meta && !skip_schema_check.contains(&(object_id, check_table_id)) - && let Some(schema) = task_config - .table_schemas + && let Some(schema) = schemas .get(&check_table_id) { - match try_drop_invalid_columns(v, &schema.column_ids) { + match try_drop_invalid_columns(v, &schema) { None => { - // Under the assumption that all values in the same (table, SSTable) group should share the same schema, - // if one value drops no columns during a compaction, no need to check other values in the same group. - skip_schema_check.insert((object_id, check_table_id)); + if !task_config.disable_drop_column_optimization { + // Under the assumption that all values in the same (table, SSTable) group should share the same schema, + // if one value drops no columns during a compaction, no need to check other values in the same group. + skip_schema_check.insert((object_id, check_table_id)); + } } Some(new_value) => { is_value_rewritten = true; diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 16d0660bba822..8dfa1ac77b042 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -314,6 +314,7 @@ impl CompactorRunner { table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, table_schemas: Default::default(), + disable_drop_column_optimization: false, }; let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone()); diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index d829de2d3441e..11890dd145f65 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -507,6 +507,7 @@ impl SharedBufferCompactRunner { table_vnode_partition, use_block_based_filter, table_schemas: Default::default(), + disable_drop_column_optimization: false, }, object_id_getter, ); diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index 1f8c6b9a5dc81..be928e4d8df2f 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -142,6 +142,8 @@ impl ValueRowSerde for ColumnAwareSerde { #[cfg(test)] mod tests { + use std::collections::HashSet; + use risingwave_common::catalog::ColumnId; use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl::*; @@ -283,10 +285,13 @@ mod tests { let row_bytes = serializer.serialize(row1); // no columns is dropped - assert!(try_drop_invalid_columns(&row_bytes, &[0, 1, 2]).is_none()); + assert!( + try_drop_invalid_columns(&row_bytes, &[0, 1, 2, 3, 4].into_iter().collect()).is_none() + ); // column id 1 is dropped - let row_bytes_dropped = try_drop_invalid_columns(&row_bytes, &[0, 2]).unwrap(); + let row_bytes_dropped = + try_drop_invalid_columns(&row_bytes, &[0, 2].into_iter().collect()).unwrap(); let deserializer = column_aware_row_encoding::Deserializer::new( &[ColumnId::new(0), ColumnId::new(2)], Arc::from(vec![DataType::Int16, DataType::Varchar].into_boxed_slice()), @@ -298,14 +303,7 @@ mod tests { vec![Some(Int16(5)), Some(Utf8("ABC".into()))] ); - // all columns are dropped - let row_bytes_dropped = try_drop_invalid_columns(&row_bytes, &[]).unwrap(); - let deserializer = column_aware_row_encoding::Deserializer::new( - &[], - Arc::from(vec![].into_boxed_slice()), - std::iter::empty(), - ); - let decoded = deserializer.deserialize(&row_bytes_dropped[..]); - assert_eq!(decoded.unwrap(), vec![]); + // drop all columns is now allowed + assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none()); } } From df571a3ad0cdd473dd02092d1edab97b24f5dac3 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 14 Dec 2023 16:23:49 +0800 Subject: [PATCH 05/12] fixup: group by (object,block) --- .../src/hummock/compactor/compactor_runner.rs | 16 +++++++++------ src/storage/src/hummock/compactor/iterator.rs | 16 +++++++-------- .../src/hummock/iterator/concat_inner.rs | 6 ++++-- .../src/hummock/iterator/forward_merge.rs | 4 ++-- .../src/hummock/iterator/merge_inner.rs | 6 ++++-- src/storage/src/hummock/iterator/mod.rs | 20 +++++++++++-------- .../src/hummock/iterator/skip_watermark.rs | 4 ++-- .../shared_buffer/shared_buffer_batch.rs | 6 +++--- .../sstable/backward_sstable_iterator.rs | 9 ++++++--- .../sstable/forward_sstable_iterator.rs | 9 ++++++--- 10 files changed, 56 insertions(+), 40 deletions(-) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index e6348a9e6043e..22955f4e0342c 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -45,7 +45,7 @@ use crate::hummock::compactor::{ }; use crate::hummock::iterator::{ Forward, ForwardMergeRangeIterator, HummockIterator, SkipWatermarkIterator, - UnorderedMergeIteratorInner, + UnorderedMergeIteratorInner, ValueMeta, }; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::value::HummockValue; @@ -713,7 +713,7 @@ where let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); let mut progress_key_num: u64 = 0; - let mut skip_schema_check: HashSet<(HummockSstableObjectId, u32)> = HashSet::default(); + let mut skip_schema_check: HashSet<(HummockSstableObjectId, u64)> = HashSet::default(); let schemas: HashMap> = task_config .table_schemas .iter() @@ -740,7 +740,10 @@ where let epoch = iter_key.epoch_with_gap.pure_epoch(); let value = iter.value(); - let value_meta = iter.value_meta(); + let ValueMeta { + object_id, + block_id, + } = iter.value_meta(); if is_new_user_key { if !max_key.is_empty() && iter_key >= max_key { break; @@ -860,8 +863,9 @@ where let check_table_id = last_key.user_key.table_id.table_id; let mut is_value_rewritten = false; if let HummockValue::Put(v) = value - && let Some(object_id) = value_meta - && !skip_schema_check.contains(&(object_id, check_table_id)) + && let Some(object_id) = object_id + && let Some(block_id) = block_id + && !skip_schema_check.contains(&(object_id, block_id)) && let Some(schema) = schemas .get(&check_table_id) { match try_drop_invalid_columns(v, &schema) { @@ -869,7 +873,7 @@ where if !task_config.disable_drop_column_optimization { // Under the assumption that all values in the same (table, SSTable) group should share the same schema, // if one value drops no columns during a compaction, no need to check other values in the same group. - skip_schema_check.insert((object_id, check_table_id)); + skip_schema_check.insert((object_id, block_id)); } } Some(new_value) => { diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index f581ec4a2c129..d1b278dcbc31d 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -28,7 +28,7 @@ use risingwave_pb::hummock::SstableInfo; use crate::hummock::block_stream::BlockDataStream; use crate::hummock::compactor::task_progress::TaskProgress; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::value::HummockValue; use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult}; @@ -489,14 +489,12 @@ impl HummockIterator for ConcatSstableIterator { stats.add(&self.stats) } - fn value_meta(&self) -> Option { - let object_id = self - .sstable_iter - .as_ref() - .expect("no table iter") - .sstable_info - .object_id; - Some(object_id) + fn value_meta(&self) -> ValueMeta { + let iter = self.sstable_iter.as_ref().expect("no table iter"); + ValueMeta { + object_id: Some(iter.sstable_info.object_id), + block_id: Some(iter.seek_block_idx as u64 - 1), + } } } diff --git a/src/storage/src/hummock/iterator/concat_inner.rs b/src/storage/src/hummock/iterator/concat_inner.rs index eb5f002583541..c6f5df400b987 100644 --- a/src/storage/src/hummock/iterator/concat_inner.rs +++ b/src/storage/src/hummock/iterator/concat_inner.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use risingwave_hummock_sdk::key::FullKey; use risingwave_pb::hummock::SstableInfo; -use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; +use crate::hummock::iterator::{ + DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta, +}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::value::HummockValue; use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef}; @@ -181,7 +183,7 @@ impl HummockIterator for ConcatIteratorInner { } } - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { self.sstable_iter .as_ref() .expect("no table iter") diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index 9a0e909f73654..52550024a429b 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -30,7 +30,7 @@ mod test { }; use crate::hummock::iterator::{ Forward, HummockIterator, HummockIteratorUnion, OrderedMergeIteratorInner, - UnorderedMergeIteratorInner, + UnorderedMergeIteratorInner, ValueMeta, }; use crate::hummock::sstable::{ SstableIterator, SstableIteratorReadOptions, SstableIteratorType, @@ -336,7 +336,7 @@ mod test { fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { None } } diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index b512bc90edda3..a4aec51b34ef6 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -21,7 +21,9 @@ use bytes::Bytes; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::EpochWithGap; -use crate::hummock::iterator::{DirectionEnum, Forward, HummockIterator, HummockIteratorDirection}; +use crate::hummock::iterator::{ + DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta, +}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; @@ -404,7 +406,7 @@ where self.collect_local_statistic_impl(stats); } - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { self.heap.peek().expect("no inner iter").iter.value_meta() } } diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index df5c8b606a6ac..abc428beab618 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -54,6 +54,12 @@ pub use skip_watermark::*; use crate::monitor::StoreLocalStatistic; +#[derive(Default)] +pub struct ValueMeta { + pub object_id: Option, + pub block_id: Option, +} + /// `HummockIterator` defines the interface of all iterators, including `SstableIterator`, /// `MergeIterator`, `UserIterator` and `ConcatIterator`. /// @@ -127,9 +133,7 @@ pub trait HummockIterator: Send + Sync { fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic); /// Returns value meta. - /// - /// Currently the only value meta is `SSTable` object id of current KV, if any. - fn value_meta(&self) -> Option; + fn value_meta(&self) -> ValueMeta; } /// This is a placeholder trait used in `HummockIteratorUnion` @@ -166,7 +170,7 @@ impl HummockIterator for PhantomHummockIterator fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { unreachable!() } } @@ -269,7 +273,7 @@ impl< } } - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { match self { First(iter) => iter.value_meta(), Second(iter) => iter.value_meta(), @@ -310,7 +314,7 @@ impl HummockIterator for Box { (*self).deref().collect_local_statistic(stats); } - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { (*self).deref().value_meta() } } @@ -462,8 +466,8 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> { fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} - fn value_meta(&self) -> Option { - None + fn value_meta(&self) -> ValueMeta { + ValueMeta::default() } } diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index 1b23cfdabdae0..5ad669ddf7bdf 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -23,7 +23,7 @@ use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_pb::hummock::PbTableWatermarks; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; @@ -270,7 +270,7 @@ impl> HummockIterator for SkipWatermarkI self.inner.collect_local_statistic(stats) } - fn value_meta(&self) -> Option { + fn value_meta(&self) -> ValueMeta { self.inner.value_meta() } } diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index faec9dfe351df..7610a4fc025a7 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::{ Backward, DeleteRangeIterator, DirectionEnum, Forward, HummockIterator, - HummockIteratorDirection, + HummockIteratorDirection, ValueMeta, }; use crate::hummock::utils::{range_overlap, MemoryTracker}; use crate::hummock::value::HummockValue; @@ -829,8 +829,8 @@ impl HummockIterator for SharedBufferBatchIterator< fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {} - fn value_meta(&self) -> Option { - None + fn value_meta(&self) -> ValueMeta { + ValueMeta::default() } } diff --git a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs index 0ac44544d74cd..e92850ef7810a 100644 --- a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use risingwave_common::cache::CachePriority; use risingwave_hummock_sdk::key::FullKey; -use crate::hummock::iterator::{Backward, HummockIterator}; +use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::value::HummockValue; use crate::hummock::{ @@ -150,8 +150,11 @@ impl HummockIterator for BackwardSstableIterator { stats.add(&self.stats) } - fn value_meta(&self) -> Option { - Some(self.sst.value().id) + fn value_meta(&self) -> ValueMeta { + ValueMeta { + object_id: Some(self.sst.value().id), + block_id: Some(self.cur_idx as _), + } } } diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index 803f6f867badc..32fc0cd42a095 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -20,7 +20,7 @@ use risingwave_hummock_sdk::key::FullKey; use super::super::{HummockResult, HummockValue}; use crate::hummock::block_stream::BlockStream; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder}; use crate::monitor::StoreLocalStatistic; @@ -288,8 +288,11 @@ impl HummockIterator for SstableIterator { stats.add(&self.stats); } - fn value_meta(&self) -> Option { - Some(self.sst.value().id) + fn value_meta(&self) -> ValueMeta { + ValueMeta { + object_id: Some(self.sst.value().id), + block_id: Some(self.cur_idx as _), + } } } From 122f87b0ce4aed502a6d3fb48486cdf2d582ca45 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 14 Dec 2023 17:12:25 +0800 Subject: [PATCH 06/12] refactor --- .../src/hummock/compactor/compaction_utils.rs | 1 + .../src/hummock/compactor/compactor_runner.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index d1f7ec8284139..e3d0413fb442f 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -128,6 +128,7 @@ pub struct TaskConfig { /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`. /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped. pub table_schemas: HashMap, + /// `disable_drop_column_optimization` should only be set in benchmark. pub disable_drop_column_optimization: bool, } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 22955f4e0342c..7f5fed2265da9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -713,7 +713,8 @@ where let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); let mut progress_key_num: u64 = 0; - let mut skip_schema_check: HashSet<(HummockSstableObjectId, u64)> = HashSet::default(); + // object id -> block id. block is is updated in a monotonically non-decreasing manner. + let mut skip_schema_check: HashMap = HashMap::default(); let schemas: HashMap> = task_config .table_schemas .iter() @@ -865,15 +866,14 @@ where if let HummockValue::Put(v) = value && let Some(object_id) = object_id && let Some(block_id) = block_id - && !skip_schema_check.contains(&(object_id, block_id)) - && let Some(schema) = schemas - .get(&check_table_id) { + && skip_schema_check.get(&object_id).map(|prev_block_id|*prev_block_id != block_id).unwrap_or(true) + && let Some(schema) = schemas.get(&check_table_id) { match try_drop_invalid_columns(v, &schema) { None => { if !task_config.disable_drop_column_optimization { - // Under the assumption that all values in the same (table, SSTable) group should share the same schema, + // Under the assumption that all values in the same (object, block) group should share the same schema, // if one value drops no columns during a compaction, no need to check other values in the same group. - skip_schema_check.insert((object_id, block_id)); + skip_schema_check.insert(object_id, block_id); } } Some(new_value) => { From 25351b672637bc973c265a8bfacd5ff42d2d2a9b Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 14 Dec 2023 17:36:15 +0800 Subject: [PATCH 07/12] fix test --- src/storage/benches/bench_compactor.rs | 6 +++--- src/storage/src/hummock/iterator/forward_merge.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 61215858ffe10..76ed30a43119d 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -511,9 +511,9 @@ fn bench_drop_column_compaction_large(c: &mut Criterion) { criterion_group!( benches, - // bench_table_build, - // bench_table_scan, - // bench_merge_iterator_compactor, + bench_table_build, + bench_table_scan, + bench_merge_iterator_compactor, bench_drop_column_compaction_small, bench_drop_column_compaction_large ); diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index 52550024a429b..07f0517aef220 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -337,7 +337,7 @@ mod test { fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} fn value_meta(&self) -> ValueMeta { - None + ValueMeta::default() } } From a9242fbf3523d4c7cddf461681abfe162cc7456c Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 14 Dec 2023 18:07:29 +0800 Subject: [PATCH 08/12] fixup: format --- src/storage/benches/bench_compactor.rs | 2 +- src/storage/src/hummock/compactor/compactor_runner.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 76ed30a43119d..773d0bc532784 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -160,7 +160,7 @@ async fn build_table_2( let table_key_len = full_key.user_key.table_key.len(); let schema = vec![DataType::Int64; column_num]; - let column_ids = (0..column_num as i32).into_iter().map(ColumnId::new); + let column_ids = (0..column_num as i32).map(ColumnId::new); use risingwave_common::types::ScalarImpl; let row = OwnedRow::new(vec![Some(ScalarImpl::Int64(5)); column_num]); let table_columns: Vec<_> = column_ids diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 7f5fed2265da9..ed75c24138e15 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -868,7 +868,7 @@ where && let Some(block_id) = block_id && skip_schema_check.get(&object_id).map(|prev_block_id|*prev_block_id != block_id).unwrap_or(true) && let Some(schema) = schemas.get(&check_table_id) { - match try_drop_invalid_columns(v, &schema) { + match try_drop_invalid_columns(v, schema) { None => { if !task_config.disable_drop_column_optimization { // Under the assumption that all values in the same (object, block) group should share the same schema, From a23355f39bc1f381a8a15112a712c818070c5825 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 14 Dec 2023 18:20:53 +0800 Subject: [PATCH 09/12] add opt to disable dropped column reclaim --- src/common/src/config.rs | 8 ++++++++ src/config/example.toml | 1 + src/meta/node/src/lib.rs | 1 + src/meta/src/hummock/manager/mod.rs | 16 +++++++++------- src/meta/src/manager/env.rs | 2 ++ 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index d7c4c6f260971..36cc37f2509fa 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -305,6 +305,10 @@ pub struct MetaConfig { /// Keeps the latest N events per channel. #[serde(default = "default::meta::event_log_channel_max_size")] pub event_log_channel_max_size: u32, + + /// Whether compactor should rewrite row to remove dropped column. + #[serde(default = "default::meta::enable_dropped_column_reclaim")] + pub enable_dropped_column_reclaim: bool, } #[derive(Clone, Debug, Default)] @@ -1038,6 +1042,10 @@ pub mod default { pub fn event_log_channel_max_size() -> u32 { 10 } + + pub fn enable_dropped_column_reclaim() -> bool { + true + } } pub mod server { diff --git a/src/config/example.toml b/src/config/example.toml index eda01aa5f8905..90175eb419619 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -47,6 +47,7 @@ compaction_task_max_heartbeat_interval_secs = 60 hybird_partition_vnode_count = 4 event_log_enabled = true event_log_channel_max_size = 10 +enable_dropped_column_reclaim = true [meta.compaction_config] max_bytes_for_level_base = 536870912 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index a25168d29dc9c..1ba2e9285fca0 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -302,6 +302,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { event_log_enabled: config.meta.event_log_enabled, event_log_channel_max_size: config.meta.event_log_channel_max_size, advertise_addr: opts.advertise_addr, + enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim, }, config.system.into_init_system_params(), ) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index e4ad7f828e483..26485284fa4bb 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -1134,13 +1134,15 @@ impl HummockManager { .await? { if let TaskStatus::Pending = task.task_status() { - task.table_schemas = self - .catalog_manager - .get_versioned_table_schemas(&task.existing_table_ids) - .await - .into_iter() - .map(|(table_id, column_ids)| (table_id, TableSchema { column_ids })) - .collect(); + if self.env.opts.enable_dropped_column_reclaim { + task.table_schemas = self + .catalog_manager + .get_versioned_table_schemas(&task.existing_table_ids) + .await + .into_iter() + .map(|(table_id, column_ids)| (table_id, TableSchema { column_ids })) + .collect(); + } return Ok(Some(task)); } assert!( diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 31a9cad8f316e..c11818ce42d14 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -194,6 +194,7 @@ pub struct MetaOpts { pub event_log_enabled: bool, pub event_log_channel_max_size: u32, pub advertise_addr: String, + pub enable_dropped_column_reclaim: bool, } impl MetaOpts { @@ -241,6 +242,7 @@ impl MetaOpts { event_log_enabled: false, event_log_channel_max_size: 1, advertise_addr: "".to_string(), + enable_dropped_column_reclaim: false, } } } From b3389a26cbd1b9f766570b0514315f3f018e5204 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 8 Mar 2024 09:14:44 +0800 Subject: [PATCH 10/12] refactor --- src/storage/benches/bench_compactor.rs | 6 +++--- src/storage/src/hummock/compactor/compactor_runner.rs | 10 +++++++--- src/storage/src/hummock/compactor/iterator.rs | 3 +++ 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 473ed5db7a37b..cadbad5cad85b 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableId}; +use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; @@ -326,7 +326,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ConcatIterator::new(level2.clone(), sstable_store.clone(), read_options.clone()), ]; let iter = MergeIterator::for_compactor(sub_iters); - async move { compact(iter, sstable_store1).await } + async move { compact(iter, sstable_store1, None).await } }); }); c.bench_function("bench_merge_iterator", |b| { @@ -469,7 +469,7 @@ fn bench_drop_column_compaction_impl(c: &mut Criterion, column_num: usize) { 0, ), ]; - UnorderedMergeIteratorInner::for_compactor(sub_iters) + MergeIterator::for_compactor(sub_iters) }; c.bench_function( diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index c9b14c1c3baba..05145c4e431e4 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -767,6 +767,7 @@ where let mut last_table_stats = TableStats::default(); let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); + // object id -> block id. For an object id, block id is updated in a monotonically non-decreasing manner. let mut skip_schema_check: HashMap = HashMap::default(); let schemas: HashMap> = task_config .table_schemas @@ -907,10 +908,13 @@ where if let HummockValue::Put(v) = value && let Some(object_id) = object_id && let Some(block_id) = block_id - && skip_schema_check + && !skip_schema_check .get(&object_id) - .map(|prev_block_id| *prev_block_id != block_id) - .unwrap_or(true) + .map(|prev_block_id| { + assert!(*prev_block_id <= block_id); + *prev_block_id == block_id + }) + .unwrap_or(false) && let Some(schema) = schemas.get(&check_table_id) { match try_drop_invalid_columns(v, schema) { diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index f4f1ca3f5a143..13fafe720be4c 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -489,6 +489,9 @@ impl HummockIterator for ConcatSstableIterator { fn value_meta(&self) -> ValueMeta { let iter = self.sstable_iter.as_ref().expect("no table iter"); + // sstable_iter's seek_block_idx must have advanced at least one. + // See SstableStreamIterator::next_block. + assert!(iter.seek_block_idx >= 1); ValueMeta { object_id: Some(iter.sstable_info.object_id), block_id: Some(iter.seek_block_idx as u64 - 1), From 3eeaa2035bf42312a15117f30ba9a735c4074cf9 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 8 Mar 2024 09:46:29 +0800 Subject: [PATCH 11/12] minor --- src/storage/src/hummock/compactor/compactor_runner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 05145c4e431e4..8556eb7e827a2 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -767,7 +767,7 @@ where let mut last_table_stats = TableStats::default(); let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); - // object id -> block id. For an object id, block id is updated in a monotonically non-decreasing manner. + // object id -> block id. For an object id, block id is updated in a monotonically increasing manner. let mut skip_schema_check: HashMap = HashMap::default(); let schemas: HashMap> = task_config .table_schemas @@ -903,7 +903,7 @@ where } // May drop stale columns - let check_table_id = full_key_tracker.latest_full_key.user_key.table_id.table_id; + let check_table_id = iter_key.user_key.table_id.table_id; let mut is_value_rewritten = false; if let HummockValue::Put(v) = value && let Some(object_id) = object_id From 0d251ee082a4b8290680455de4b3e2c6f452c49a Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 8 Mar 2024 10:41:57 +0800 Subject: [PATCH 12/12] update table stats --- src/storage/src/hummock/compactor/compactor_runner.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 8556eb7e827a2..ad71cc7f916a9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -917,6 +917,7 @@ where .unwrap_or(false) && let Some(schema) = schemas.get(&check_table_id) { + let value_size = v.len(); match try_drop_invalid_columns(v, schema) { None => { if !task_config.disable_drop_column_optimization { @@ -932,6 +933,9 @@ where .add_full_key(iter_key, new_put, is_new_user_key) .verbose_instrument_await("add_rewritten_full_key") .await?; + let value_size_change = value_size as i64 - new_value.len() as i64; + assert!(value_size_change >= 0); + last_table_stats.total_value_size -= value_size_change; } } }