Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): clear up dropped column #13952

Merged
merged 13 commits into from
Mar 8, 2024
6 changes: 6 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ message TableOption {
uint32 retention_seconds = 1;
}

message TableSchema {
repeated int32 column_ids = 1;
}

message CompactTask {
enum TaskStatus {
UNSPECIFIED = 0;
Expand Down Expand Up @@ -345,6 +349,8 @@ message CompactTask {
// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
map<uint32, TableWatermarks> table_watermarks = 24;
// The table schemas that are at least as new as the one used to create `input_ssts`.
map<uint32, TableSchema> table_schemas = 25;
}

message LevelHandler {
Expand Down
108 changes: 107 additions & 1 deletion src/common/src/util/value_encoding/column_aware_row_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until schema changes

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;

use bitflags::bitflags;
Expand Down Expand Up @@ -105,6 +105,25 @@ impl RowEncoding {
.expect("should encode at least one column");
self.set_offsets(&offset_usize, max_offset);
}

// TODO: Avoid duplicated code. `encode_slice` is the same as `encode` except it doesn't require column type.
fn encode_slice<'a>(&mut self, datum_refs: impl Iterator<Item = Option<&'a [u8]>>) {
debug_assert!(
self.buf.is_empty(),
"should not encode one RowEncoding object multiple times."
);
let mut offset_usize = vec![];
for datum in datum_refs {
offset_usize.push(self.buf.len());
if let Some(v) = datum {
self.buf.put_slice(v);
}
}
let max_offset = *offset_usize
.last()
.expect("should encode at least one column");
self.set_offsets(&offset_usize, max_offset);
}
}

/// Column-Aware `Serializer` holds schema related information, and shall be
Expand Down Expand Up @@ -268,3 +287,90 @@ impl ValueRowDeserializer for ColumnAwareSerde {
self.deserializer.deserialize(encoded_bytes)
}
}

/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we don't do "real" deserialization here. We're only working with the serialized bytes and drop the unnecessary ones then rearrange them.

/// If no column is dropped, returns None.
// TODO: Avoid duplicated code. The current code combines`Serializer` and `Deserializer` with unavailable parameter removed, e.g. `Deserializer::schema`.
pub fn try_drop_invalid_columns(
mut encoded_bytes: &[u8],
valid_column_ids: &[i32],
) -> Option<Vec<u8>> {
let valid_column_ids: HashSet<i32> = valid_column_ids.iter().copied().collect();
let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag");
let datum_num = encoded_bytes.get_u32_le() as usize;
let mut is_column_dropped = false;
let mut encoded_bytes_copy = encoded_bytes;
for _ in 0..datum_num {
let this_id = encoded_bytes_copy.get_i32_le();
if !valid_column_ids.contains(&this_id) {
is_column_dropped = true;
break;
}
}
if !is_column_dropped {
return None;
}

// Slow path that drops columns. Should be rare.
let offset_bytes = match flag - Flag::EMPTY {
Flag::OFFSET8 => 1,
Flag::OFFSET16 => 2,
Flag::OFFSET32 => 4,
_ => panic!("invalid flag {}", flag.bits()),
};
let offsets_start_idx = 4 * datum_num;
let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
let offsets = &encoded_bytes[offsets_start_idx..data_start_idx];
let data = &encoded_bytes[data_start_idx..];
let mut datums: Vec<Option<&[u8]>> = Vec::with_capacity(valid_column_ids.len());
let mut column_ids = Vec::with_capacity(valid_column_ids.len());
for i in 0..datum_num {
let this_id = encoded_bytes.get_i32_le();
if valid_column_ids.contains(&this_id) {
column_ids.push(this_id);
let this_offset_start_idx = i * offset_bytes;
let mut this_offset_slice =
&offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)];
let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice);
let data = if i + 1 < datum_num {
let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes)
..(this_offset_start_idx + 2 * offset_bytes)];
let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice);
if this_offset == next_offset {
None
} else {
let data_slice = &data[this_offset..next_offset];
Some(data_slice)
}
} else if this_offset == data.len() {
None
} else {
let data_slice = &data[this_offset..];
Some(data_slice)
};
datums.push(data);
}
}
if column_ids.is_empty() {
// According to `RowEncoding::encode`, at least one column is required.
return None;
}

let mut encoding = RowEncoding::new();
encoding.encode_slice(datums.into_iter());
let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4);
let datum_num = column_ids.len() as u32;
for id in column_ids {
encoded_column_ids.put_i32_le(id);
}
let mut row_bytes = Vec::with_capacity(
5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), /* 5 comes from u8+u32 */
);
row_bytes.put_u8(encoding.flag.bits());
row_bytes.put_u32_le(datum_num);
row_bytes.extend(&encoded_column_ids);
row_bytes.extend(&encoding.offsets);
row_bytes.extend(&encoding.buf);

Some(row_bytes)
}
12 changes: 10 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use risingwave_pb::hummock::{
version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta,
HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion,
HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableWatermarks,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableSchema,
TableWatermarks,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -1128,11 +1129,18 @@ impl HummockManager {
anyhow::anyhow!("failpoint metastore error")
)));

while let Some(task) = self
while let Some(mut task) = self
.get_compact_task_impl(compaction_group_id, selector)
.await?
{
if let TaskStatus::Pending = task.task_status() {
task.table_schemas = self
.catalog_manager
.get_versioned_table_schemas(&task.existing_table_ids)
.await
.into_iter()
.map(|(table_id, column_ids)| (table_id, TableSchema { column_ids }))
.collect();
return Ok(Some(task));
}
assert!(
Expand Down
25 changes: 25 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,31 @@ impl CatalogManager {
.map(|table| table.id)
.collect()
}

/// Returns column ids of `table_ids` that is versioned.
/// Being versioned implies using `ColumnAwareSerde`.
pub async fn get_versioned_table_schemas(
&self,
table_ids: &[TableId],
) -> HashMap<TableId, Vec<i32>> {
let guard = self.core.lock().await;
table_ids
.iter()
.filter_map(|table_id| {
if let Some(t) = guard.database.tables.get(table_id) && t.version.is_some() {
let ret = (
t.id,
t.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().column_id)
.collect_vec(),
);
return Some(ret);
}
None
})
.collect()
}
}

// User related methods
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("hummock.LevelHandler", "#[derive(Eq)]")
.type_attribute("hummock.TableOption", "#[derive(Eq)]")
.type_attribute("hummock.InputLevel", "#[derive(Eq)]")
.type_attribute("hummock.TableSchema", "#[derive(Eq)]")
.type_attribute("hummock.CompactTask", "#[derive(Eq)]")
.type_attribute("hummock.TableWatermarks", "#[derive(Eq)]")
.type_attribute("hummock.VnodeWatermark", "#[derive(Eq)]")
Expand Down
10 changes: 8 additions & 2 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -24,7 +24,9 @@ use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator};
use risingwave_pb::hummock::{compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo};
use risingwave_pb::hummock::{
compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo, TableSchema,
};
use tokio::time::Instant;

pub use super::context::CompactorContext;
Expand Down Expand Up @@ -122,6 +124,10 @@ pub struct TaskConfig {
pub use_block_based_filter: bool,

pub table_vnode_partition: BTreeMap<u32, u32>,
/// `TableId` -> `TableSchema`
/// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`.
/// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped.
pub table_schemas: HashMap<u32, TableSchema>,
}

pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
Expand Down
48 changes: 42 additions & 6 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use bytes::Bytes;
use futures::{stream, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::key::{FullKey, PointRange};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType};
use tokio::sync::oneshot::Receiver;
Expand Down Expand Up @@ -106,6 +107,11 @@ impl CompactorRunner {
|| task.target_level == task.base_level,
use_block_based_filter,
table_vnode_partition: task.table_vnode_partition.clone(),
table_schemas: task
.table_schemas
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect(),
},
object_id_getter,
);
Expand Down Expand Up @@ -706,6 +712,7 @@ where
let mut last_table_id = None;
let mut compaction_statistics = CompactionStatistics::default();
let mut progress_key_num: u64 = 0;
let mut skip_schema_check: HashSet<(HummockSstableObjectId, u32)> = HashSet::default();
const PROGRESS_KEY_INTERVAL: u64 = 100;
while iter.is_valid() {
progress_key_num += 1;
Expand All @@ -727,6 +734,7 @@ where

let epoch = iter_key.epoch_with_gap.pure_epoch();
let value = iter.value();
let value_meta = iter.value_meta();
if is_new_user_key {
if !max_key.is_empty() && iter_key >= max_key {
break;
Expand Down Expand Up @@ -842,11 +850,39 @@ where
is_new_user_key = false;
}

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(iter_key, value, is_new_user_key)
.verbose_instrument_await("add_full_key")
.await?;
// May drop stale columns
let check_table_id = last_key.user_key.table_id.table_id;
let mut is_value_rewritten = false;
if let HummockValue::Put(v) = value
&& let Some(object_id) = value_meta
&& !skip_schema_check.contains(&(object_id, check_table_id))
&& let Some(schema) = task_config
.table_schemas
.get(&check_table_id) {
match try_drop_invalid_columns(v, &schema.column_ids) {
None => {
// Under the assumption that all values in the same (table, SSTable) group should share the same schema,
// if one value drops no columns during a compaction, no need to check other values in the same group.
skip_schema_check.insert((object_id, check_table_id));
}
Some(new_value) => {
is_value_rewritten = true;
let new_put = HummockValue::put(new_value.as_slice());
sst_builder
.add_full_key(iter_key, new_put, is_new_user_key)
.verbose_instrument_await("add_rewritten_full_key")
.await?;
}
}
}

if !is_value_rewritten {
// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(iter_key, value, is_new_user_key)
.verbose_instrument_await("add_full_key")
.await?;
}

iter.next().verbose_instrument_await("iter_next").await?;
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl CompactorRunner {
is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level,
table_vnode_partition: task.table_vnode_partition.clone(),
use_block_based_filter: true,
table_schemas: Default::default(),
};
let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone());

Expand Down
10 changes: 10 additions & 0 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,16 @@ impl HummockIterator for ConcatSstableIterator {
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats)
}

fn value_meta(&self) -> Option<u64> {
let object_id = self
.sstable_iter
.as_ref()
.expect("no table iter")
.sstable_info
.object_id;
Some(object_id)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ impl SharedBufferCompactRunner {
is_target_l0_or_lbase: true,
table_vnode_partition,
use_block_based_filter,
table_schemas: Default::default(),
},
object_id_getter,
);
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/hummock/iterator/concat_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,11 @@ impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> {
iter.collect_local_statistic(stats);
}
}

fn value_meta(&self) -> Option<u64> {
self.sstable_iter
.as_ref()
.expect("no table iter")
.value_meta()
}
}
4 changes: 4 additions & 0 deletions src/storage/src/hummock/iterator/forward_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ mod test {
}

fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}

fn value_meta(&self) -> Option<u64> {
None
}
}

#[tokio::test]
Expand Down
Loading
Loading