Skip to content

Commit

Permalink
[Enhancement] Lake pk table comaction optimization (StarRocks#34261)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha authored Nov 10, 2023
1 parent 6a0dcc3 commit dd4b209
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 99 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ CONF_Int32(update_compaction_per_tablet_min_interval_seconds, "120"); // 2min
CONF_mInt64(max_update_compaction_num_singleton_deltas, "1000");
CONF_mInt64(update_compaction_size_threshold, "268435456");
CONF_mInt64(update_compaction_result_bytes, "1073741824");
// This config controls the io amp ratio of delvec files.
CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2");

CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min
CONF_Int32(manual_compaction_threads, "4");
Expand Down
93 changes: 60 additions & 33 deletions be/src/storage/lake/compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,38 @@ struct RowsetStat {

class RowsetCandidate {
public:
RowsetCandidate(RowsetMetadataPtr rp, const RowsetStat& rs) : rowset_meta_ptr(std::move(rp)), stat(rs) {}
double calc_del_bytes() const { return (double)stat.bytes * (double)stat.num_dels / (double)stat.num_rows; }
// The goal of lake primary table compaction:
// 1. clean up deleted bytes.
// 2. merge rowsets with bigger compaction score
// 3. merge small rowsets to bigger rowset.
// so we pick rowset to compact by this logic:
// First, pick out rowset with more deleted bytes.
// Second, pick out rowset with bigger compaction score
// Finally, pick out rowset with less bytes.
bool operator<(const RowsetCandidate& other) const {
if (calc_del_bytes() < other.calc_del_bytes()) {
return true;
} else if (calc_del_bytes() > other.calc_del_bytes()) {
return false;
} else if (rowset_compaction_score() < other.rowset_compaction_score()) {
return true;
} else if (rowset_compaction_score() > other.rowset_compaction_score()) {
return false;
} else {
// may happen when deleted rows is zero
return stat.bytes > other.stat.bytes;
RowsetCandidate(RowsetMetadataPtr rp, const RowsetStat& rs) : rowset_meta_ptr(std::move(rp)), stat(rs) {
calculate_score();
}
// The goal of lake primary table compaction is to reduce the overhead of reading data.
// So the first thing we need to do is quantify the overhead of reading the data.
// In object storage, we can use this to define overhead:
//
// OverHead (score) = IO count / Read bytes
//
// Same bytes, if we use more io to fetch it, that means more overhead.
// And in one rowset, the IO count is equal overlapped segment count plus their delvec files.
double io_count() const {
// rowset_meta_ptr->segments_size() could be zero here, so make sure this >= 1 using max.
double cnt = rowset_meta_ptr->overlapped() ? std::max(rowset_meta_ptr->segments_size(), 1) : 1;
if (stat.num_dels > 0) {
// if delvec file exist, that means we need to read segment files and delvec files both
// And update_compaction_delvec_file_io_ratio control the io amp ratio of delvec files, default is 2.
// Bigger update_compaction_delvec_file_io_amp_ratio means high priority about merge rowset with delvec files.
cnt *= config::update_compaction_delvec_file_io_amp_ratio;
}
return cnt;
}
double rowset_compaction_score() const {
return rowset_meta_ptr->overlapped() ? rowset_meta_ptr->segments_size() : 1;
double delete_bytes() const {
if (stat.num_rows == 0) return 0.0;
return (double)stat.bytes * ((double)stat.num_dels / (double)stat.num_rows);
}
double read_bytes() const { return (double)stat.bytes - delete_bytes() + 1; }
void calculate_score() { score = (io_count() * 1024 * 1024) / read_bytes(); }
bool operator<(const RowsetCandidate& other) const { return score < other.score; }
RowsetMetadataPtr rowset_meta_ptr;
RowsetStat stat;
double score;
};

class PrimaryCompactionPolicy : public CompactionPolicy {
Expand All @@ -125,20 +128,31 @@ class PrimaryCompactionPolicy : public CompactionPolicy {
~PrimaryCompactionPolicy() override = default;

StatusOr<std::vector<RowsetPtr>> pick_rowsets() override;
StatusOr<std::vector<RowsetPtr>> pick_rowsets(const std::shared_ptr<const TabletMetadataPB>& tablet_metadata);
StatusOr<std::vector<RowsetPtr>> pick_rowsets(const std::shared_ptr<const TabletMetadataPB>& tablet_metadata,
std::vector<bool>* has_dels);

private:
int64_t _get_data_size(const std::shared_ptr<const TabletMetadataPB>& tablet_metadata) {
int size = 0;
for (const auto& rowset : tablet_metadata->rowsets()) {
size += rowset.data_size();
}
return size;
}
};

StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets() {
return pick_rowsets(_tablet_metadata);
return pick_rowsets(_tablet_metadata, nullptr);
}

StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(
const std::shared_ptr<const TabletMetadataPB>& tablet_metadata) {
const std::shared_ptr<const TabletMetadataPB>& tablet_metadata, std::vector<bool>* has_dels) {
std::vector<RowsetPtr> input_rowsets;
UpdateManager* mgr = _tablet_mgr->update_mgr();
std::priority_queue<RowsetCandidate> rowset_queue;
const auto tablet_id = tablet_metadata->id();
const auto tablet_version = tablet_metadata->version();
const auto tablet_data_size = _get_data_size(tablet_metadata);
for (const auto& rowset_pb : tablet_metadata->rowsets()) {
RowsetStat stat;
stat.num_rows = rowset_pb.num_rows();
Expand All @@ -152,14 +166,19 @@ StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(
std::stringstream input_infos;
while (!rowset_queue.empty()) {
const auto& rowset_candidate = rowset_queue.top();
cur_compaction_result_bytes += rowset_candidate.stat.bytes;
if (input_rowsets.size() > 0 && cur_compaction_result_bytes > config::update_compaction_result_bytes * 2) {
cur_compaction_result_bytes += rowset_candidate.read_bytes();
if (input_rowsets.size() > 0 &&
cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes * 2, tablet_data_size / 2)) {
break;
}
input_rowsets.emplace_back(std::make_shared<Rowset>(tablet, std::move(rowset_candidate.rowset_meta_ptr)));
if (has_dels != nullptr) {
has_dels->push_back(rowset_candidate.delete_bytes() > 0);
}
input_infos << input_rowsets.back()->id() << "|";

if (cur_compaction_result_bytes > config::update_compaction_result_bytes ||
// Allow to merge half of this tablet
if (cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes, tablet_data_size / 2) ||
input_rowsets.size() >= config::max_update_compaction_num_singleton_deltas) {
break;
}
Expand All @@ -174,10 +193,18 @@ StatusOr<std::vector<RowsetPtr>> PrimaryCompactionPolicy::pick_rowsets(
StatusOr<uint32_t> primary_compaction_score_by_policy(const std::shared_ptr<const TabletMetadataPB>& metadata) {
auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager();
PrimaryCompactionPolicy policy(tablet_mgr, metadata);
ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets());
std::vector<bool> has_dels;
ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, &has_dels));
uint32_t segment_num_score = 0;
for (const auto& pick_rowset : pick_rowsets) {
segment_num_score += pick_rowset->is_overlapped() ? pick_rowset->num_segments() : 1;
for (int i = 0; i < pick_rowsets.size(); i++) {
const auto& pick_rowset = pick_rowsets[i];
const bool has_del = has_dels[i];
auto current_score = pick_rowset->is_overlapped() ? pick_rowset->num_segments() : 1;
if (has_del) {
// if delvec file exist, expand score by config.
current_score *= config::update_compaction_delvec_file_io_amp_ratio;
}
segment_num_score += current_score;
}
return segment_num_score;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,11 @@ Status DeltaWriterImpl::finish(DeltaWriter::FinishMode mode) {
}
}
}
RETURN_IF_ERROR(tablet.put_txn_log(txn_log));
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
// preload update state here to minimaze the cost when publishing.
tablet.update_mgr()->preload_update_state(*txn_log, &tablet);
}
RETURN_IF_ERROR(tablet.put_txn_log(std::move(txn_log)));
return Status::OK();
}

Expand Down
9 changes: 7 additions & 2 deletions be/src/storage/lake/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "storage/lake/tablet_reader.h"
#include "storage/lake/tablet_writer.h"
#include "storage/lake/txn_log.h"
#include "storage/lake/update_manager.h"
#include "storage/rowset/column_reader.h"
#include "storage/storage_engine.h"
#include "storage/tablet_reader_params.h"
Expand Down Expand Up @@ -104,8 +105,12 @@ Status HorizontalCompactionTask::execute(Progress* progress, CancelFunc cancel_f
op_compaction->mutable_output_rowset()->set_num_rows(writer->num_rows());
op_compaction->mutable_output_rowset()->set_data_size(writer->data_size());
op_compaction->mutable_output_rowset()->set_overlapped(false);
Status st = _tablet.put_txn_log(std::move(txn_log));
return st;
RETURN_IF_ERROR(_tablet.put_txn_log(txn_log));
if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
// preload primary key table's compaction state
_tablet.update_mgr()->preload_compaction_state(*txn_log, _tablet, tablet_schema);
}
return Status::OK();
}

StatusOr<int32_t> HorizontalCompactionTask::calculate_chunk_size() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/lake/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Rowset {
[[nodiscard]] Status load_segments(std::vector<SegmentPtr>* segments, bool fill_data_cache,
bool fill_metadata_cache);

int64_t tablet_id() const { return _tablet.id(); }

private:
Tablet _tablet;
RowsetMetadataPtr _rowset_metadata;
Expand Down
8 changes: 4 additions & 4 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
RETURN_IF_ERROR(apply_write_log(log.op_write(), log.txn_id()));
}
if (log.has_op_compaction()) {
RETURN_IF_ERROR(apply_compaction_log(log.op_compaction()));
RETURN_IF_ERROR(apply_compaction_log(log.op_compaction(), log.txn_id()));
}
if (log.has_op_schema_change()) {
RETURN_IF_ERROR(apply_schema_change_log(log.op_schema_change()));
Expand Down Expand Up @@ -118,7 +118,7 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
&_builder, _base_version);
}

Status apply_compaction_log(const TxnLogPB_OpCompaction& op_compaction) {
Status apply_compaction_log(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id) {
// get lock to avoid gc
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });
Expand All @@ -133,8 +133,8 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
DCHECK(!op_compaction.has_output_rowset() || op_compaction.output_rowset().num_rows() == 0);
return Status::OK();
}
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, *_metadata, _tablet, _index_entry,
&_builder, _base_version);
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, txn_id, *_metadata, _tablet,
_index_entry, &_builder, _base_version);
}

Status apply_schema_change_log(const TxnLogPB_OpSchemaChange& op_schema_change) {
Expand Down
36 changes: 20 additions & 16 deletions be/src/storage/lake/update_compaction_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@

namespace starrocks::lake {

CompactionState::CompactionState(Rowset* rowset, UpdateManager* update_manager) {
if (rowset->num_segments() > 0) {
pk_cols.resize(rowset->num_segments());
}
_update_manager = update_manager;
}

CompactionState::~CompactionState() {
_update_manager->compaction_state_mem_tracker()->release(_memory_usage);
if (_update_manager != nullptr) {
_update_manager->compaction_state_mem_tracker()->release(_memory_usage);
}
}

Status CompactionState::load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id) {
Status CompactionState::load_segments(Rowset* rowset, UpdateManager* update_manager,
const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id) {
TRACE_COUNTER_SCOPE_LATENCY_US("load_segments_latency_us");
if (pk_cols.empty() && rowset->num_segments() > 0) {
pk_cols.resize(rowset->num_segments());
} else {
DCHECK(pk_cols.size() == rowset->num_segments());
}
_update_manager = update_manager;
_tablet_id = rowset->tablet_id();
if (segment_id >= pk_cols.size() && pk_cols.size() != 0) {
std::string msg = strings::Substitute("Error segment id: $0 vs $1", segment_id, pk_cols.size());
LOG(WARNING) << msg;
Expand All @@ -63,19 +66,16 @@ Status CompactionState::_load_segments(Rowset* rowset, const TabletSchemaCSPtr&
CHECK(PrimaryKeyEncoder::create_column(pkey_schema, &pk_column, true).ok());

OlapReaderStatistics stats;
auto res = rowset->get_each_segment_iterator(pkey_schema, &stats);
if (!res.ok()) {
return res.status();
if (_segment_iters.empty()) {
ASSIGN_OR_RETURN(_segment_iters, rowset->get_each_segment_iterator(pkey_schema, &stats));
}

auto& itrs = res.value();
CHECK_EQ(itrs.size(), rowset->num_segments());
CHECK_EQ(_segment_iters.size(), rowset->num_segments());

// only hold pkey, so can use larger chunk size
auto chunk_shared_ptr = ChunkHelper::new_chunk(pkey_schema, config::vector_chunk_size);
auto chunk = chunk_shared_ptr.get();

auto itr = itrs[segment_id].get();
auto itr = _segment_iters[segment_id].get();
if (itr == nullptr) {
return Status::OK();
}
Expand Down Expand Up @@ -110,4 +110,8 @@ void CompactionState::release_segments(uint32_t segment_id) {
pk_cols[segment_id]->reset_column();
}

std::string CompactionState::to_string() const {
return strings::Substitute("CompactionState tablet:$0", _tablet_id);
}

} // namespace starrocks::lake
16 changes: 13 additions & 3 deletions be/src/storage/lake/update_compaction_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,36 @@ class UpdateManager;

class CompactionState {
public:
CompactionState(Rowset* rowset, UpdateManager* update_manager);
CompactionState() {}
~CompactionState();

CompactionState(const CompactionState&) = delete;
CompactionState& operator=(const CompactionState&) = delete;

Status load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id);
Status load_segments(Rowset* rowset, UpdateManager* update_manager, const TabletSchemaCSPtr& tablet_schema,
uint32_t segment_id);
void release_segments(uint32_t segment_id);

std::size_t memory_usage() const { return _memory_usage; }

std::string to_string() const;

std::vector<ColumnUniquePtr> pk_cols;

private:
Status _load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id);

UpdateManager* _update_manager;
UpdateManager* _update_manager = nullptr;
size_t _memory_usage = 0;
std::vector<ChunkIteratorPtr> _segment_iters;
int64_t _tablet_id = 0;
};

inline std::ostream& operator<<(std::ostream& os, const CompactionState& o) {
os << o.to_string();
return os;
}

} // namespace lake

} // namespace starrocks
Loading

0 comments on commit dd4b209

Please sign in to comment.