diff --git a/proto/hummock.proto b/proto/hummock.proto index d40b31c52f60c..50509160e2efd 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -288,6 +288,10 @@ message TableOption { optional uint32 retention_seconds = 2; } +message TableSchema { + repeated int32 column_ids = 1; +} + message CompactTask { enum TaskStatus { UNSPECIFIED = 0; @@ -356,6 +360,8 @@ message CompactTask { // The table watermark of any table id. In compaction we only use the table watermarks on safe epoch, // so we only need to include the table watermarks on safe epoch to reduce the size of metadata. map table_watermarks = 24; + // The table schemas that are at least as new as the one used to create `input_ssts`. + map table_schemas = 25; } message LevelHandler { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 00d36ced99ff8..f2e00232a38c9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -345,6 +345,9 @@ pub struct MetaConfig { #[serde(default, with = "meta_prefix")] #[config_doc(omitted)] pub developer: MetaDeveloperConfig, + /// Whether compactor should rewrite row to remove dropped column. + #[serde(default = "default::meta::enable_dropped_column_reclaim")] + pub enable_dropped_column_reclaim: bool, } #[derive(Clone, Debug, Default)] @@ -1145,6 +1148,9 @@ pub mod default { pub fn parallelism_control_trigger_first_delay_sec() -> u64 { 30 } + pub fn enable_dropped_column_reclaim() -> bool { + false + } } pub mod server { diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 2f3c426a484c3..28fcb65c4d8db 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -19,7 +19,7 @@ //! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused //! until schema changes -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::sync::Arc; use bitflags::bitflags; @@ -105,6 +105,25 @@ impl RowEncoding { .expect("should encode at least one column"); self.set_offsets(&offset_usize, max_offset); } + + // TODO: Avoid duplicated code. `encode_slice` is the same as `encode` except it doesn't require column type. + fn encode_slice<'a>(&mut self, datum_refs: impl Iterator>) { + debug_assert!( + self.buf.is_empty(), + "should not encode one RowEncoding object multiple times." + ); + let mut offset_usize = vec![]; + for datum in datum_refs { + offset_usize.push(self.buf.len()); + if let Some(v) = datum { + self.buf.put_slice(v); + } + } + let max_offset = *offset_usize + .last() + .expect("should encode at least one column"); + self.set_offsets(&offset_usize, max_offset); + } } /// Column-Aware `Serializer` holds schema related information, and shall be @@ -268,3 +287,89 @@ impl ValueRowDeserializer for ColumnAwareSerde { self.deserializer.deserialize(encoded_bytes) } } + +/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns. +/// If no column is dropped, returns None. +// TODO: Avoid duplicated code. The current code combines`Serializer` and `Deserializer` with unavailable parameter removed, e.g. `Deserializer::schema`. +pub fn try_drop_invalid_columns( + mut encoded_bytes: &[u8], + valid_column_ids: &HashSet, +) -> Option> { + let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag"); + let datum_num = encoded_bytes.get_u32_le() as usize; + let mut is_column_dropped = false; + let mut encoded_bytes_copy = encoded_bytes; + for _ in 0..datum_num { + let this_id = encoded_bytes_copy.get_i32_le(); + if !valid_column_ids.contains(&this_id) { + is_column_dropped = true; + break; + } + } + if !is_column_dropped { + return None; + } + + // Slow path that drops columns. Should be rare. + let offset_bytes = match flag - Flag::EMPTY { + Flag::OFFSET8 => 1, + Flag::OFFSET16 => 2, + Flag::OFFSET32 => 4, + _ => panic!("invalid flag {}", flag.bits()), + }; + let offsets_start_idx = 4 * datum_num; + let data_start_idx = offsets_start_idx + datum_num * offset_bytes; + let offsets = &encoded_bytes[offsets_start_idx..data_start_idx]; + let data = &encoded_bytes[data_start_idx..]; + let mut datums: Vec> = Vec::with_capacity(valid_column_ids.len()); + let mut column_ids = Vec::with_capacity(valid_column_ids.len()); + for i in 0..datum_num { + let this_id = encoded_bytes.get_i32_le(); + if valid_column_ids.contains(&this_id) { + column_ids.push(this_id); + let this_offset_start_idx = i * offset_bytes; + let mut this_offset_slice = + &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)]; + let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice); + let data = if i + 1 < datum_num { + let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes) + ..(this_offset_start_idx + 2 * offset_bytes)]; + let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice); + if this_offset == next_offset { + None + } else { + let data_slice = &data[this_offset..next_offset]; + Some(data_slice) + } + } else if this_offset == data.len() { + None + } else { + let data_slice = &data[this_offset..]; + Some(data_slice) + }; + datums.push(data); + } + } + if column_ids.is_empty() { + // According to `RowEncoding::encode`, at least one column is required. + return None; + } + + let mut encoding = RowEncoding::new(); + encoding.encode_slice(datums.into_iter()); + let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4); + let datum_num = column_ids.len() as u32; + for id in column_ids { + encoded_column_ids.put_i32_le(id); + } + let mut row_bytes = Vec::with_capacity( + 5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */ + ); + row_bytes.put_u8(encoding.flag.bits()); + row_bytes.put_u32_le(datum_num); + row_bytes.extend(&encoded_column_ids); + row_bytes.extend(&encoding.offsets); + row_bytes.extend(&encoding.buf); + + Some(row_bytes) +} diff --git a/src/config/docs.md b/src/config/docs.md index cf28f0ca9e2c6..93cfa668b0d5f 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -28,6 +28,7 @@ This page is automatically generated by `./risedev generate-example-config` | do_not_config_object_storage_lifecycle | Whether config object storage bucket lifecycle to purge stale data. | false | | enable_committed_sst_sanity_check | Enable sanity check when SSTs are committed. | false | | enable_compaction_deterministic | Whether to enable deterministic compaction scheduling, which will disable all auto scheduling of compaction tasks. Should only be used in e2e tests. | false | +| enable_dropped_column_reclaim | Whether compactor should rewrite row to remove dropped column. | false | | enable_hummock_data_archive | If enabled, SSTable object file and version delta will be retained. SSTable object file need to be deleted via full GC. version delta need to be manually deleted. | false | | event_log_channel_max_size | Keeps the latest N events per channel. | 10 | | event_log_enabled | | true | diff --git a/src/config/example.toml b/src/config/example.toml index e322a180f978e..c115b779e6fb7 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -51,6 +51,7 @@ compaction_task_max_progress_interval_secs = 600 hybird_partition_vnode_count = 4 event_log_enabled = true event_log_channel_max_size = 10 +enable_dropped_column_reclaim = false [meta.compaction_config] max_bytes_for_level_base = 536870912 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 112b584587a18..45a5797f40ec5 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -360,6 +360,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .meta .developer .enable_check_task_level_overlap, + enable_dropped_column_reclaim: config.meta.enable_dropped_column_reclaim, }, config.system.into_init_system_params(), ) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index b33365868e681..7676cbe04bda3 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -62,7 +62,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{ use risingwave_pb::hummock::{ CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, IntraLevelDelta, - PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, + PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableSchema, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use rw_futures_util::{pending_on_none, select_all}; @@ -1228,11 +1228,26 @@ impl HummockManager { anyhow::anyhow!("failpoint metastore error") ))); - while let Some(task) = self + while let Some(mut task) = self .get_compact_task_impl(compaction_group_id, selector) .await? { if let TaskStatus::Pending = task.task_status() { + if self.env.opts.enable_dropped_column_reclaim { + task.table_schemas = match self.metadata_manager() { + MetadataManager::V1(mgr) => mgr + .catalog_manager + .get_versioned_table_schemas(&task.existing_table_ids) + .await + .into_iter() + .map(|(table_id, column_ids)| (table_id, TableSchema { column_ids })) + .collect(), + MetadataManager::V2(_) => { + // TODO #13952: support V2 + BTreeMap::default() + } + } + } return Ok(Some(task)); } assert!( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 281ae32133f40..b5df9d223daf9 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3692,6 +3692,33 @@ impl CatalogManager { .collect() } + /// Returns column ids of `table_ids` that is versioned. + /// Being versioned implies using `ColumnAwareSerde`. + pub async fn get_versioned_table_schemas( + &self, + table_ids: &[TableId], + ) -> HashMap> { + let guard = self.core.lock().await; + table_ids + .iter() + .filter_map(|table_id| { + if let Some(t) = guard.database.tables.get(table_id) + && t.version.is_some() + { + let ret = ( + t.id, + t.columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id) + .collect_vec(), + ); + return Some(ret); + } + None + }) + .collect() + } + // TODO: replace *_count with SQL #[cfg_attr(coverage, coverage(off))] pub async fn source_count(&self) -> usize { diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index ea26fea83ebcc..bcfc0bc0d3e31 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -206,7 +206,6 @@ pub struct MetaOpts { pub event_log_enabled: bool, pub event_log_channel_max_size: u32, pub advertise_addr: String, - /// The number of traces to be cached in-memory by the tracing collector /// embedded in the meta node. pub cached_traces_num: u32, @@ -219,6 +218,7 @@ pub struct MetaOpts { /// l0 multi level picker whether to check the overlap accuracy between sub levels pub enable_check_task_level_overlap: bool, + pub enable_dropped_column_reclaim: bool, } impl MetaOpts { @@ -274,6 +274,7 @@ impl MetaOpts { cached_traces_memory_limit_bytes: usize::MAX, enable_trivial_move: true, enable_check_task_level_overlap: true, + enable_dropped_column_reclaim: false, } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index cd65b20d6f192..e031e5cfb01ae 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -157,6 +157,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") .type_attribute("hummock.InputLevel", "#[derive(Eq)]") + .type_attribute("hummock.TableSchema", "#[derive(Eq)]") .type_attribute("hummock.CompactTask", "#[derive(Eq)]") .type_attribute("hummock.TableWatermarks", "#[derive(Eq)]") .type_attribute("hummock.VnodeWatermark", "#[derive(Eq)]") diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 94ea463eb5af3..cadbad5cad85b 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -18,15 +18,19 @@ use std::sync::Arc; use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::DataType; +use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; +use risingwave_common::util::value_encoding::ValueRowSerializer; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::HummockEpoch; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl}; -use risingwave_pb::hummock::{compact_task, SstableInfo}; +use risingwave_pb::hummock::{compact_task, SstableInfo, TableSchema}; use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst; use risingwave_storage::hummock::compactor::{ ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress, @@ -78,9 +82,9 @@ pub fn default_writer_opts() -> SstableWriterOptions { } } -pub fn test_key_of(idx: usize, epoch: u64) -> FullKey> { +pub fn test_key_of(idx: usize, epoch: u64, table_id: TableId) -> FullKey> { FullKey::for_test( - TableId::default(), + table_id, [ VirtualNode::ZERO.to_be_bytes().as_slice(), format!("key_test_{:08}", idx * 2).as_bytes(), @@ -117,7 +121,7 @@ async fn build_table( let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt); let value = b"1234567890123456789"; - let mut full_key = test_key_of(0, epoch); + let mut full_key = test_key_of(0, epoch, TableId::new(0)); let table_key_len = full_key.user_key.table_key.len(); for i in range { let start = (i % 8) as usize; @@ -135,6 +139,63 @@ async fn build_table( sst } +async fn build_table_2( + sstable_store: SstableStoreRef, + sstable_object_id: u64, + range: Range, + epoch: u64, + table_id: u32, + column_num: usize, +) -> SstableInfo { + let opt = SstableBuilderOptions { + capacity: 32 * 1024 * 1024, + block_capacity: 16 * 1024, + restart_interval: 16, + bloom_false_positive: 0.001, + ..Default::default() + }; + let writer = sstable_store.create_sst_writer( + sstable_object_id, + SstableWriterOptions { + capacity_hint: None, + tracker: None, + policy: CachePolicy::Fill(CachePriority::High), + }, + ); + let mut builder = + SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt); + let mut full_key = test_key_of(0, epoch, TableId::new(table_id)); + let table_key_len = full_key.user_key.table_key.len(); + + let schema = vec![DataType::Int64; column_num]; + let column_ids = (0..column_num as i32).map(ColumnId::new); + use risingwave_common::types::ScalarImpl; + let row = OwnedRow::new(vec![Some(ScalarImpl::Int64(5)); column_num]); + let table_columns: Vec<_> = column_ids + .clone() + .map(|id| ColumnDesc::unnamed(id, schema.get(id.get_id() as usize).unwrap().clone())) + .collect(); + use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; + let serializer = ColumnAwareSerde::new( + Arc::from_iter(column_ids.map(|id| id.get_id() as usize)), + table_columns.into(), + ); + let row_bytes = serializer.serialize(row); + + for i in range { + full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes()); + builder + .add_for_test(full_key.to_ref(), HummockValue::put(&row_bytes)) + .await + .unwrap(); + } + let output = builder.finish().await.unwrap(); + let handle = output.writer_output; + let sst = output.sst_info.sst_info; + handle.await.unwrap().unwrap(); + sst +} + async fn scan_all_table(info: &SstableInfo, sstable_store: SstableStoreRef) { let mut stats = StoreLocalStatistic::default(); let table = sstable_store.sstable(info, &mut stats).await.unwrap(); @@ -183,7 +244,11 @@ fn bench_table_scan(c: &mut Criterion) { }); } -async fn compact>(iter: I, sstable_store: SstableStoreRef) { +async fn compact>( + iter: I, + sstable_store: SstableStoreRef, + task_config: Option, +) { let opt = SstableBuilderOptions { capacity: 32 * 1024 * 1024, block_capacity: 64 * 1024, @@ -194,7 +259,7 @@ async fn compact>(iter: I, sstable_store let mut builder = CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt)); - let task_config = TaskConfig { + let task_config = task_config.unwrap_or_else(|| TaskConfig { key_range: KeyRange::inf(), cache_policy: CachePolicy::Disable, gc_delete_keys: false, @@ -203,7 +268,7 @@ async fn compact>(iter: I, sstable_store task_type: compact_task::TaskType::Dynamic, use_block_based_filter: true, ..Default::default() - }; + }); compact_and_build_sst( &mut builder, CompactionDeleteRangeIterator::new(ForwardMergeRangeIterator::new(HummockEpoch::MAX)), @@ -261,7 +326,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ConcatIterator::new(level2.clone(), sstable_store.clone(), read_options.clone()), ]; let iter = MergeIterator::for_compactor(sub_iters); - async move { compact(iter, sstable_store1).await } + async move { compact(iter, sstable_store1, None).await } }); }); c.bench_function("bench_merge_iterator", |b| { @@ -286,15 +351,191 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ]; let iter = MergeIterator::for_compactor(sub_iters); let sstable_store1 = sstable_store.clone(); - async move { compact(iter, sstable_store1).await } + async move { compact(iter, sstable_store1, None).await } }); }); } +fn bench_drop_column_compaction_impl(c: &mut Criterion, column_num: usize) { + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let sstable_store = mock_sstable_store(); + let test_key_size = 256 * 1024; + let info1 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 1, + 0..test_key_size, + 1, + 10, + column_num, + ) + .await + }); + let info2 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 2, + 0..test_key_size, + 1, + 11, + column_num, + ) + .await + }); + let level1 = vec![info1, info2]; + + let info1 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 3, + 0..test_key_size, + 2, + 10, + column_num, + ) + .await + }); + let info2 = runtime.block_on(async { + build_table_2( + sstable_store.clone(), + 4, + 0..test_key_size, + 2, + 11, + column_num, + ) + .await + }); + let level2 = vec![info1, info2]; + + let task_config_no_schema = TaskConfig { + key_range: KeyRange::inf(), + cache_policy: CachePolicy::Disable, + gc_delete_keys: false, + watermark: 0, + stats_target_table_ids: None, + task_type: compact_task::TaskType::Dynamic, + use_block_based_filter: true, + table_schemas: vec![].into_iter().collect(), + ..Default::default() + }; + + let mut task_config_schema = task_config_no_schema.clone(); + task_config_schema.table_schemas.insert( + 10, + TableSchema { + column_ids: (0..column_num as i32).collect(), + }, + ); + task_config_schema.table_schemas.insert( + 11, + TableSchema { + column_ids: (0..column_num as i32).collect(), + }, + ); + + let mut task_config_schema_cause_drop = task_config_no_schema.clone(); + task_config_schema_cause_drop.table_schemas.insert( + 10, + TableSchema { + column_ids: (0..column_num as i32 / 2).collect(), + }, + ); + task_config_schema_cause_drop.table_schemas.insert( + 11, + TableSchema { + column_ids: (0..column_num as i32 / 2).collect(), + }, + ); + + let get_iter = || { + let sub_iters = vec![ + ConcatSstableIterator::new( + vec![10, 11], + level1.clone(), + KeyRange::inf(), + sstable_store.clone(), + Arc::new(TaskProgress::default()), + 0, + ), + ConcatSstableIterator::new( + vec![10, 11], + level2.clone(), + KeyRange::inf(), + sstable_store.clone(), + Arc::new(TaskProgress::default()), + 0, + ), + ]; + MergeIterator::for_compactor(sub_iters) + }; + + c.bench_function( + &format!("bench_drop_column_compaction_baseline_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_no_schema.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_without_drop_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_schema.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_without_drop_disable_optimization_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let mut task_config_clone = task_config_schema.clone(); + task_config_clone.disable_drop_column_optimization = true; + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); + + c.bench_function( + &format!("bench_drop_column_compaction_with_drop_c{column_num}"), + |b| { + b.to_async(&runtime).iter(|| { + let iter = get_iter(); + let sstable_store1 = sstable_store.clone(); + let task_config_clone = task_config_schema_cause_drop.clone(); + async move { compact(iter, sstable_store1, Some(task_config_clone)).await } + }); + }, + ); +} + +fn bench_drop_column_compaction_small(c: &mut Criterion) { + bench_drop_column_compaction_impl(c, 10); +} + +fn bench_drop_column_compaction_large(c: &mut Criterion) { + bench_drop_column_compaction_impl(c, 100); +} + criterion_group!( benches, bench_table_build, bench_table_scan, - bench_merge_iterator_compactor + bench_merge_iterator_compactor, + bench_drop_column_compaction_small, + bench_drop_column_compaction_large ); criterion_main!(benches); diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 758a6d1253451..60574992961e7 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::marker::PhantomData; use std::ops::Bound; use std::sync::atomic::{AtomicU64, Ordering}; @@ -27,7 +27,7 @@ use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::TableStatsMap; use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator}; use risingwave_pb::hummock::{ - compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, + compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema, }; use tokio::time::Instant; @@ -131,6 +131,12 @@ pub struct TaskConfig { pub use_block_based_filter: bool, pub table_vnode_partition: BTreeMap, + /// `TableId` -> `TableSchema` + /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`. + /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped. + pub table_schemas: HashMap, + /// `disable_drop_column_optimization` should only be set in benchmark. + pub disable_drop_column_optimization: bool, } pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 441480d66a6b7..ad71cc7f916a9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -21,13 +21,14 @@ use bytes::Bytes; use futures::{stream, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::util::epoch::is_max_epoch; +use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns; use risingwave_hummock_sdk::compact::{ compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task, }; use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, PointRange}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch}; +use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; use thiserror_ext::AsReport; @@ -47,6 +48,7 @@ use crate::hummock::compactor::{ }; use crate::hummock::iterator::{ Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator, + ValueMeta, }; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::utils::MemoryTracker; @@ -109,6 +111,12 @@ impl CompactorRunner { || task.target_level == task.base_level, use_block_based_filter, table_vnode_partition: task.table_vnode_partition.clone(), + table_schemas: task + .table_schemas + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect(), + disable_drop_column_optimization: false, }, object_id_getter, ); @@ -759,6 +767,13 @@ where let mut last_table_stats = TableStats::default(); let mut last_table_id = None; let mut compaction_statistics = CompactionStatistics::default(); + // object id -> block id. For an object id, block id is updated in a monotonically increasing manner. + let mut skip_schema_check: HashMap = HashMap::default(); + let schemas: HashMap> = task_config + .table_schemas + .iter() + .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect())) + .collect(); while iter.is_valid() { let mut iter_key = iter.key(); compaction_statistics.iter_total_key_counts += 1; @@ -769,6 +784,10 @@ where // CRITICAL WARN: Because of memtable spill, there may be several versions of the same user-key share the same `pure_epoch`. Do not change this code unless necessary. let epoch = iter_key.epoch_with_gap.pure_epoch(); let value = iter.value(); + let ValueMeta { + object_id, + block_id, + } = iter.value_meta(); if is_new_user_key { if !max_key.is_empty() && iter_key >= max_key { break; @@ -883,11 +902,51 @@ where is_new_user_key = false; } - // Don't allow two SSTs to share same user key - sst_builder - .add_full_key(iter_key, value, is_new_user_key) - .verbose_instrument_await("add_full_key") - .await?; + // May drop stale columns + let check_table_id = iter_key.user_key.table_id.table_id; + let mut is_value_rewritten = false; + if let HummockValue::Put(v) = value + && let Some(object_id) = object_id + && let Some(block_id) = block_id + && !skip_schema_check + .get(&object_id) + .map(|prev_block_id| { + assert!(*prev_block_id <= block_id); + *prev_block_id == block_id + }) + .unwrap_or(false) + && let Some(schema) = schemas.get(&check_table_id) + { + let value_size = v.len(); + match try_drop_invalid_columns(v, schema) { + None => { + if !task_config.disable_drop_column_optimization { + // Under the assumption that all values in the same (object, block) group should share the same schema, + // if one value drops no columns during a compaction, no need to check other values in the same group. + skip_schema_check.insert(object_id, block_id); + } + } + Some(new_value) => { + is_value_rewritten = true; + let new_put = HummockValue::put(new_value.as_slice()); + sst_builder + .add_full_key(iter_key, new_put, is_new_user_key) + .verbose_instrument_await("add_rewritten_full_key") + .await?; + let value_size_change = value_size as i64 - new_value.len() as i64; + assert!(value_size_change >= 0); + last_table_stats.total_value_size -= value_size_change; + } + } + } + + if !is_value_rewritten { + // Don't allow two SSTs to share same user key + sst_builder + .add_full_key(iter_key, value, is_new_user_key) + .verbose_instrument_await("add_full_key") + .await?; + } iter.next().verbose_instrument_await("iter_next").await?; } diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 92b5e8ee3b835..7a30a6fd2ef65 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -296,6 +296,8 @@ impl CompactorRunner { is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, table_vnode_partition: task.table_vnode_partition.clone(), use_block_based_filter: true, + table_schemas: Default::default(), + disable_drop_column_optimization: false, }; let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone()); diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 9d0a1047dcafe..13fafe720be4c 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -28,7 +28,7 @@ use risingwave_pb::hummock::SstableInfo; use crate::hummock::block_stream::BlockDataStream; use crate::hummock::compactor::task_progress::TaskProgress; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::value::HummockValue; use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult}; @@ -486,6 +486,17 @@ impl HummockIterator for ConcatSstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats) } + + fn value_meta(&self) -> ValueMeta { + let iter = self.sstable_iter.as_ref().expect("no table iter"); + // sstable_iter's seek_block_idx must have advanced at least one. + // See SstableStreamIterator::next_block. + assert!(iter.seek_block_idx >= 1); + ValueMeta { + object_id: Some(iter.sstable_info.object_id), + block_id: Some(iter.seek_block_idx as u64 - 1), + } + } } pub struct MonitoredCompactorIterator { @@ -547,6 +558,10 @@ impl> HummockIterator for MonitoredCompa fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { self.inner.collect_local_statistic(stats) } + + fn value_meta(&self) -> ValueMeta { + self.inner.value_meta() + } } #[cfg(test)] diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 3e7e58938caf9..3426f6880deff 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -505,6 +505,8 @@ impl SharedBufferCompactRunner { is_target_l0_or_lbase: true, table_vnode_partition, use_block_based_filter, + table_schemas: Default::default(), + disable_drop_column_optimization: false, }, object_id_getter, ); diff --git a/src/storage/src/hummock/iterator/concat_inner.rs b/src/storage/src/hummock/iterator/concat_inner.rs index d624068f3694e..357d12e32036e 100644 --- a/src/storage/src/hummock/iterator/concat_inner.rs +++ b/src/storage/src/hummock/iterator/concat_inner.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use risingwave_hummock_sdk::key::FullKey; use risingwave_pb::hummock::SstableInfo; -use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; +use crate::hummock::iterator::{ + DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta, +}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::value::HummockValue; use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef}; @@ -180,4 +182,11 @@ impl HummockIterator for ConcatIteratorInner { iter.collect_local_statistic(stats); } } + + fn value_meta(&self) -> ValueMeta { + self.sstable_iter + .as_ref() + .expect("no table iter") + .value_meta() + } } diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index 8219a7eb52823..6e360d7f18c9c 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -28,7 +28,7 @@ mod test { gen_merge_iterator_interleave_test_sstable_iters, iterator_test_key_of, iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT, }; - use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator}; + use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, ValueMeta}; use crate::hummock::sstable::{ SstableIterator, SstableIteratorReadOptions, SstableIteratorType, }; @@ -196,6 +196,10 @@ mod test { } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> ValueMeta { + ValueMeta::default() + } } #[tokio::test] diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 00c789a386bd7..54221ee8b70a9 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -20,7 +20,9 @@ use futures::FutureExt; use risingwave_hummock_sdk::key::FullKey; use super::Forward; -use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; +use crate::hummock::iterator::{ + DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta, +}; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatchIterator, SharedBufferVersionedEntryRef, }; @@ -291,4 +293,8 @@ where fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { self.collect_local_statistic_impl(stats); } + + fn value_meta(&self) -> ValueMeta { + self.heap.peek().expect("no inner iter").iter.value_meta() + } } diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 7392709710996..8688693037218 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -54,6 +54,12 @@ pub use skip_watermark::*; use crate::monitor::StoreLocalStatistic; +#[derive(Default)] +pub struct ValueMeta { + pub object_id: Option, + pub block_id: Option, +} + /// `HummockIterator` defines the interface of all iterators, including `SstableIterator`, /// `MergeIterator`, `UserIterator` and `ConcatIterator`. /// @@ -125,6 +131,9 @@ pub trait HummockIterator: Send + Sync { /// take local statistic info from iterator to report metrics. fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic); + + /// Returns value meta. + fn value_meta(&self) -> ValueMeta; } /// This is a placeholder trait used in `HummockIteratorUnion` @@ -160,6 +169,10 @@ impl HummockIterator for PhantomHummockIterator } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> ValueMeta { + unreachable!() + } } /// The `HummockIteratorUnion` acts like a wrapper over multiple types of `HummockIterator`, so that @@ -259,6 +272,15 @@ impl< Fourth(iter) => iter.collect_local_statistic(stats), } } + + fn value_meta(&self) -> ValueMeta { + match self { + First(iter) => iter.value_meta(), + Second(iter) => iter.value_meta(), + Third(iter) => iter.value_meta(), + Fourth(iter) => iter.value_meta(), + } + } } impl HummockIterator for Box { @@ -291,6 +313,10 @@ impl HummockIterator for Box { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { (*self).deref().collect_local_statistic(stats); } + + fn value_meta(&self) -> ValueMeta { + (*self).deref().value_meta() + } } pub enum RustIteratorOfBuilder<'a, B: RustIteratorBuilder> { @@ -439,6 +465,10 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> { } fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {} + + fn value_meta(&self) -> ValueMeta { + ValueMeta::default() + } } #[derive(PartialEq, Eq, Debug)] diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index ac0b24dbfa133..8ddb95b1b8791 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -23,7 +23,7 @@ use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_pb::hummock::PbTableWatermarks; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; @@ -115,6 +115,10 @@ impl> HummockIterator for SkipWatermarkI fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { self.inner.collect_local_statistic(stats) } + + fn value_meta(&self) -> ValueMeta { + self.inner.value_meta() + } } pub struct SkipWatermarkState { watermarks: BTreeMap, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index a797937321c01..14b95814abb14 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -32,7 +32,7 @@ use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::{ Backward, DeleteRangeIterator, DirectionEnum, Forward, HummockIterator, - HummockIteratorDirection, + HummockIteratorDirection, ValueMeta, }; use crate::hummock::utils::{range_overlap, MemoryTracker}; use crate::hummock::value::HummockValue; @@ -656,6 +656,10 @@ impl HummockIterator for SharedBufferBatchIterator< } fn collect_local_statistic(&self, _stats: &mut crate::monitor::StoreLocalStatistic) {} + + fn value_meta(&self) -> ValueMeta { + ValueMeta::default() + } } pub struct SharedBufferDeleteRangeIterator { diff --git a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs index 35445b1138bff..96fe33c955477 100644 --- a/src/storage/src/hummock/sstable/backward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/backward_sstable_iterator.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use risingwave_common::cache::CachePriority; use risingwave_hummock_sdk::key::FullKey; -use crate::hummock::iterator::{Backward, HummockIterator}; +use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::value::HummockValue; use crate::hummock::{ @@ -148,6 +148,13 @@ impl HummockIterator for BackwardSstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats) } + + fn value_meta(&self) -> ValueMeta { + ValueMeta { + object_id: Some(self.sst.id), + block_id: Some(self.cur_idx as _), + } + } } impl SstableIteratorType for BackwardSstableIterator { diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index fbb3c8785dc5e..66fe6d15f68f2 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -22,7 +22,7 @@ use thiserror_ext::AsReport; use super::super::{HummockResult, HummockValue}; use crate::hummock::block_stream::BlockStream; -use crate::hummock::iterator::{Forward, HummockIterator}; +use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder}; use crate::monitor::StoreLocalStatistic; @@ -292,6 +292,13 @@ impl HummockIterator for SstableIterator { fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { stats.add(&self.stats); } + + fn value_meta(&self) -> ValueMeta { + ValueMeta { + object_id: Some(self.sst.id), + block_id: Some(self.cur_idx as _), + } + } } impl SstableIteratorType for SstableIterator { diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs index 7bd760fa340ce..bde7091597605 100644 --- a/src/storage/src/row_serde/value_serde.rs +++ b/src/storage/src/row_serde/value_serde.rs @@ -142,10 +142,13 @@ impl ValueRowSerde for ColumnAwareSerde { #[cfg(test)] mod tests { + use std::collections::HashSet; + use risingwave_common::catalog::ColumnId; use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl::*; use risingwave_common::util::value_encoding::column_aware_row_encoding; + use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns; use super::*; @@ -269,4 +272,38 @@ mod tests { let decoded_row = serde.deserialize(&encoded_bytes); assert_eq!(decoded_row.unwrap(), data); } + + #[test] + fn test_drop_column() { + let column_ids = vec![ColumnId::new(0), ColumnId::new(1), ColumnId::new(2)]; + let row1 = OwnedRow::new(vec![ + Some(Int16(5)), + Some(Utf8("abc".into())), + Some(Utf8("ABC".into())), + ]); + let serializer = column_aware_row_encoding::Serializer::new(&column_ids); + let row_bytes = serializer.serialize(row1); + + // no columns is dropped + assert!( + try_drop_invalid_columns(&row_bytes, &[0, 1, 2, 3, 4].into_iter().collect()).is_none() + ); + + // column id 1 is dropped + let row_bytes_dropped = + try_drop_invalid_columns(&row_bytes, &[0, 2].into_iter().collect()).unwrap(); + let deserializer = column_aware_row_encoding::Deserializer::new( + &[ColumnId::new(0), ColumnId::new(2)], + Arc::from(vec![DataType::Int16, DataType::Varchar].into_boxed_slice()), + std::iter::empty(), + ); + let decoded = deserializer.deserialize(&row_bytes_dropped[..]); + assert_eq!( + decoded.unwrap(), + vec![Some(Int16(5)), Some(Utf8("ABC".into()))] + ); + + // drop all columns is now allowed + assert!(try_drop_invalid_columns(&row_bytes, &HashSet::new()).is_none()); + } }