Skip to content

Commit

Permalink
feat(storage): clear up dropped column
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Dec 12, 2023
1 parent a3c71aa commit 86095b3
Show file tree
Hide file tree
Showing 18 changed files with 264 additions and 11 deletions.
6 changes: 6 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ message TableOption {
uint32 retention_seconds = 1;
}

// Schema of a table, expressed as the ids of its columns.
message TableSchema {
// Ids of the columns that belong to the table.
repeated int32 column_ids = 1;
}

message CompactTask {
enum TaskStatus {
UNSPECIFIED = 0;
Expand Down Expand Up @@ -337,6 +341,8 @@ message CompactTask {
// Deprecated. use table_vnode_partition instead;
uint32 split_weight_by_vnode = 22 [deprecated = true];
map<uint32, uint32> table_vnode_partition = 23;
// The table schemas that are at least as new as the one used to create `input_ssts`.
map<uint32, TableSchema> table_schemas = 24;
}

message LevelHandler {
Expand Down
104 changes: 103 additions & 1 deletion src/common/src/util/value_encoding/column_aware_row_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until schema changes
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;

use bitflags::bitflags;
Expand Down Expand Up @@ -105,6 +105,25 @@ impl RowEncoding {
.expect("should encode at least one column");
self.set_offsets(&offset_usize, max_offset);
}

// TODO: avoid duplicated code
/// Encodes datums that are already serialized as raw bytes.
///
/// Each item of `datum_refs` is the byte slice of one datum, or `None` for a
/// datum that contributes no bytes. The start offset of every datum is recorded
/// before its bytes are appended to `self.buf`; the collected offsets and the
/// last start offset are then passed to `set_offsets`.
///
/// # Panics
/// Panics if `datum_refs` yields no item.
fn encode_slice<'a>(&mut self, datum_refs: impl Iterator<Item = Option<&'a [u8]>>) {
    debug_assert!(
        self.buf.is_empty(),
        "should not encode one RowEncoding object multiple times."
    );
    let buf = &mut self.buf;
    let start_offsets: Vec<usize> = datum_refs
        .map(|maybe_bytes| {
            // Record where this datum starts before writing it.
            let start = buf.len();
            if let Some(bytes) = maybe_bytes {
                buf.put_slice(bytes);
            }
            start
        })
        .collect();
    let max_offset = start_offsets
        .last()
        .copied()
        .expect("should encode at least one column");
    self.set_offsets(&start_offsets, max_offset);
}
}

/// Column-Aware `Serializer` holds schema related information, and shall be
Expand Down Expand Up @@ -268,3 +287,86 @@ impl ValueRowDeserializer for ColumnAwareSerde {
self.deserializer.deserialize(encoded_bytes)
}
}

/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
/// If no column is dropped, returns None.
///
/// The row layout read here is the same one this function writes back:
/// `flag (u8) | datum_num (u32 LE) | column ids (i32 LE each) | offsets | data`.
///
/// # Panics
/// Panics if the flag byte is not a valid `Flag`, if `encoded_bytes` is shorter
/// than its header implies, or if every column is dropped (re-encoding requires
/// at least one column).
// TODO: avoid duplicated code
pub fn try_drop_invalid_columns(
    mut encoded_bytes: &[u8],
    valid_column_ids: &[i32],
) -> Option<Vec<u8>> {
    let valid_column_ids: HashSet<i32> = valid_column_ids.iter().copied().collect();
    // The flag byte comes first, then the datum count. Reading them in the
    // opposite order would misparse every row, because the count would be
    // decoded from bytes that include the flag.
    let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag");
    let datum_num = encoded_bytes.get_u32_le() as usize;

    // Fast path: scan the column ids on a copy of the cursor so that
    // `encoded_bytes` still points at the first column id afterwards.
    let mut column_id_cursor = encoded_bytes;
    let is_column_dropped =
        (0..datum_num).any(|_| !valid_column_ids.contains(&column_id_cursor.get_i32_le()));
    if !is_column_dropped {
        return None;
    }

    // Slow path that drops columns. Should be rare.
    let offset_bytes = match flag - Flag::EMPTY {
        Flag::OFFSET8 => 1,
        Flag::OFFSET16 => 2,
        Flag::OFFSET32 => 4,
        _ => panic!("invalid flag {}", flag.bits()),
    };
    // Indices below are relative to `encoded_bytes`, which currently starts at
    // the column-id section.
    let offsets_start_idx = 4 * datum_num;
    let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
    let offsets = &encoded_bytes[offsets_start_idx..data_start_idx];
    let data = &encoded_bytes[data_start_idx..];
    let mut datums: Vec<Option<&[u8]>> = Vec::with_capacity(valid_column_ids.len());
    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
    for i in 0..datum_num {
        let this_id = encoded_bytes.get_i32_le();
        if valid_column_ids.contains(&this_id) {
            column_ids.push(this_id);
            let this_offset_start_idx = i * offset_bytes;
            let mut this_offset_slice =
                &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)];
            let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice);
            // A datum spans from its own offset to the next datum's offset, or
            // to the end of the data section for the last datum. An empty span
            // is kept as `None`.
            let data = if i + 1 < datum_num {
                let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes)
                    ..(this_offset_start_idx + 2 * offset_bytes)];
                let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice);
                if this_offset == next_offset {
                    None
                } else {
                    let data_slice = &data[this_offset..next_offset];
                    Some(data_slice)
                }
            } else if this_offset == data.len() {
                None
            } else {
                let data_slice = &data[this_offset..];
                Some(data_slice)
            };
            datums.push(data);
        }
    }

    // Re-encode the surviving datums and rebuild the row with the same layout.
    let mut encoding = RowEncoding::new();
    encoding.encode_slice(datums.into_iter());
    let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4);
    let datum_num = column_ids.len() as u32;
    for id in column_ids {
        encoded_column_ids.put_i32_le(id);
    }
    let mut row_bytes = Vec::with_capacity(
        5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */
    );
    row_bytes.put_u8(encoding.flag.bits());
    row_bytes.put_u32_le(datum_num);
    row_bytes.extend(&encoded_column_ids);
    row_bytes.extend(&encoding.offsets);
    row_bytes.extend(&encoding.buf);

    Some(row_bytes)
}
12 changes: 10 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use risingwave_pb::hummock::{
version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta,
HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion,
HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableWatermarks,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableSchema,
TableWatermarks,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -1126,11 +1127,18 @@ impl HummockManager {
anyhow::anyhow!("failpoint metastore error")
)));

while let Some(task) = self
while let Some(mut task) = self
.get_compact_task_impl(compaction_group_id, selector)
.await?
{
if let TaskStatus::Pending = task.task_status() {
task.table_schemas = self
.catalog_manager
.get_versioned_table_schemas(&task.existing_table_ids)
.await
.into_iter()
.map(|(table_id, column_ids)| (table_id, TableSchema { column_ids }))
.collect();
return Ok(Some(task));
}
assert!(
Expand Down
25 changes: 25 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,31 @@ impl CatalogManager {
.map(|table| table.id)
.collect()
}

/// Returns column ids of `table_ids` that is versioned.
/// Being versioned implies using `ColumnAwareSerde`.
pub async fn get_versioned_table_schemas(
&self,
table_ids: &[TableId],
) -> HashMap<TableId, Vec<i32>> {
let guard = self.core.lock().await;
table_ids
.iter()
.filter_map(|table_id| {
if let Some(t) = guard.database.tables.get(table_id) && !t.version.is_none() {
let ret = (
t.id,
t.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id)
.collect_vec(),
);
return Some(ret);
}
None
})
.collect()
}
}

// User related methods
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("hummock.LevelHandler", "#[derive(Eq)]")
.type_attribute("hummock.TableOption", "#[derive(Eq)]")
.type_attribute("hummock.InputLevel", "#[derive(Eq)]")
.type_attribute("hummock.TableSchema", "#[derive(Eq)]")
.type_attribute("hummock.CompactTask", "#[derive(Eq)]")
// ===================
.out_dir(out_dir.as_path())
Expand Down
10 changes: 8 additions & 2 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -24,7 +24,9 @@ use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator};
use risingwave_pb::hummock::{compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo};
use risingwave_pb::hummock::{
compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo, TableSchema,
};
use tokio::time::Instant;

pub use super::context::CompactorContext;
Expand Down Expand Up @@ -122,6 +124,10 @@ pub struct TaskConfig {
pub use_block_based_filter: bool,

pub table_vnode_partition: BTreeMap<u32, u32>,
/// `TableId` -> `TableSchema`
/// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`.
/// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped.
pub table_schemas: HashMap<u32, TableSchema>,
}

pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
Expand Down
48 changes: 42 additions & 6 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use bytes::Bytes;
use futures::{stream, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::key::{FullKey, PointRange};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType};
use tokio::sync::oneshot::Receiver;
Expand Down Expand Up @@ -105,6 +106,11 @@ impl CompactorRunner {
|| task.target_level == task.base_level,
use_block_based_filter,
table_vnode_partition: task.table_vnode_partition.clone(),
table_schemas: task
.table_schemas
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect(),
},
object_id_getter,
);
Expand Down Expand Up @@ -699,6 +705,7 @@ where
let mut last_table_id = None;
let mut compaction_statistics = CompactionStatistics::default();
let mut progress_key_num: u64 = 0;
let mut skip_schema_check: HashSet<(HummockSstableObjectId, u32)> = HashSet::default();
const PROGRESS_KEY_INTERVAL: u64 = 100;
while iter.is_valid() {
progress_key_num += 1;
Expand All @@ -720,6 +727,7 @@ where

let epoch = iter_key.epoch_with_gap.pure_epoch();
let value = iter.value();
let value_meta = iter.value_meta();
if is_new_user_key {
if !max_key.is_empty() && iter_key >= max_key {
break;
Expand Down Expand Up @@ -835,11 +843,39 @@ where
is_new_user_key = false;
}

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(iter_key, value, is_new_user_key)
.verbose_instrument_await("add_full_key")
.await?;
// May drop stale columns
let check_table_id = last_key.user_key.table_id.table_id;
let mut is_value_rewritten = false;
if let HummockValue::Put(v) = value
&& let Some(object_id) = value_meta
&& !skip_schema_check.contains(&(object_id, check_table_id))
&& let Some(schema) = task_config
.table_schemas
.get(&check_table_id) {
match try_drop_invalid_columns(v, &schema.column_ids) {
None => {
// Under the assumption that all values in the same (table, SSTable) group should share the same schema,
// if one value drops no columns during a compaction, no need to check other values in the same group.
skip_schema_check.insert((object_id, check_table_id));
}
Some(new_value) => {
is_value_rewritten = true;
let new_put = HummockValue::put(new_value.as_slice());
sst_builder
.add_full_key(iter_key, new_put, is_new_user_key)
.verbose_instrument_await("add_rewritten_full_key")
.await?;
}
}
}

if !is_value_rewritten {
// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(iter_key, value, is_new_user_key)
.verbose_instrument_await("add_full_key")
.await?;
}

iter.next().verbose_instrument_await("iter_next").await?;
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl CompactorRunner {
is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level,
table_vnode_partition: task.table_vnode_partition.clone(),
use_block_based_filter: true,
table_schemas: Default::default(),
};
let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

Expand Down
10 changes: 10 additions & 0 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,16 @@ impl HummockIterator for ConcatSstableIterator {
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats)
}

/// Returns the object id of the SSTable behind the current inner iterator.
///
/// # Panics
/// Panics with "no table iter" when there is no current SSTable iterator.
fn value_meta(&self) -> Option<u64> {
    let current = self.sstable_iter.as_ref().expect("no table iter");
    Some(current.sstable_info.object_id)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ impl SharedBufferCompactRunner {
is_target_l0_or_lbase: true,
table_vnode_partition,
use_block_based_filter,
table_schemas: Default::default(),
},
object_id_getter,
);
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/hummock/iterator/concat_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,11 @@ impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> {
iter.collect_local_statistic(stats);
}
}

/// Forwards to the `value_meta` of the current inner SSTable iterator.
///
/// # Panics
/// Panics with "no table iter" when there is no current SSTable iterator.
fn value_meta(&self) -> Option<u64> {
    let current = self.sstable_iter.as_ref().expect("no table iter");
    current.value_meta()
}
}
4 changes: 4 additions & 0 deletions src/storage/src/hummock/iterator/forward_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ mod test {
}

fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}

// This test iterator carries no value metadata, so it always reports `None`.
fn value_meta(&self) -> Option<u64> {
None
}
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/iterator/merge_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,4 +403,8 @@ where
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
self.collect_local_statistic_impl(stats);
}

/// Returns the value metadata of the iterator at the top of the merge heap.
///
/// # Panics
/// Panics with "no inner iter" when the heap is empty.
fn value_meta(&self) -> Option<u64> {
    let top = self.heap.peek().expect("no inner iter");
    top.iter.value_meta()
}
}
Loading

0 comments on commit 86095b3

Please sign in to comment.