From dd4b2092696d890a4c856704bcd4176207e038be Mon Sep 17 00:00:00 2001 From: Yixin Luo <18810541851@163.com> Date: Thu, 9 Nov 2023 23:53:48 -0800 Subject: [PATCH] [Enhancement] Lake pk table comaction optimization (#34261) Signed-off-by: luohaha <18810541851@163.com> --- be/src/common/config.h | 2 + be/src/storage/lake/compaction_policy.cpp | 93 ++++++++++++------- be/src/storage/lake/delta_writer.cpp | 2 +- .../lake/horizontal_compaction_task.cpp | 9 +- be/src/storage/lake/rowset.h | 2 + be/src/storage/lake/txn_log_applier.cpp | 8 +- .../storage/lake/update_compaction_state.cpp | 36 +++---- be/src/storage/lake/update_compaction_state.h | 16 +++- be/src/storage/lake/update_manager.cpp | 79 +++++++++++++--- be/src/storage/lake/update_manager.h | 13 ++- .../storage/lake/vertical_compaction_task.cpp | 7 +- .../lake/primary_key_compaction_task_test.cpp | 43 ++++++--- .../storage/lake/primary_key_publish_test.cpp | 16 ++-- be/test/storage/lake/test_util.h | 2 +- 14 files changed, 229 insertions(+), 99 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 44b5ef33c0c64..ccd81ab841dfa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -294,6 +294,8 @@ CONF_Int32(update_compaction_per_tablet_min_interval_seconds, "120"); // 2min CONF_mInt64(max_update_compaction_num_singleton_deltas, "1000"); CONF_mInt64(update_compaction_size_threshold, "268435456"); CONF_mInt64(update_compaction_result_bytes, "1073741824"); +// This config controls the io amp ratio of delvec files. 
+CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2"); CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min CONF_Int32(manual_compaction_threads, "4"); diff --git a/be/src/storage/lake/compaction_policy.cpp b/be/src/storage/lake/compaction_policy.cpp index 0aecce64617b1..504b81dc315d1 100644 --- a/be/src/storage/lake/compaction_policy.cpp +++ b/be/src/storage/lake/compaction_policy.cpp @@ -86,35 +86,38 @@ struct RowsetStat { class RowsetCandidate { public: - RowsetCandidate(RowsetMetadataPtr rp, const RowsetStat& rs) : rowset_meta_ptr(std::move(rp)), stat(rs) {} - double calc_del_bytes() const { return (double)stat.bytes * (double)stat.num_dels / (double)stat.num_rows; } - // The goal of lake primary table compaction: - // 1. clean up deleted bytes. - // 2. merge rowsets with bigger compaction score - // 3. merge small rowsets to bigger rowset. - // so we pick rowset to compact by this logic: - // First, pick out rowset with more deleted bytes. - // Second, pick out rowset with bigger compaction score - // Finally, pick out rowset with less bytes. - bool operator<(const RowsetCandidate& other) const { - if (calc_del_bytes() < other.calc_del_bytes()) { - return true; - } else if (calc_del_bytes() > other.calc_del_bytes()) { - return false; - } else if (rowset_compaction_score() < other.rowset_compaction_score()) { - return true; - } else if (rowset_compaction_score() > other.rowset_compaction_score()) { - return false; - } else { - // may happen when deleted rows is zero - return stat.bytes > other.stat.bytes; + RowsetCandidate(RowsetMetadataPtr rp, const RowsetStat& rs) : rowset_meta_ptr(std::move(rp)), stat(rs) { + calculate_score(); + } + // The goal of lake primary table compaction is to reduce the overhead of reading data. + // So the first thing we need to do is quantify the overhead of reading the data. 
+ // In object storage, we can use this to define overhead: + // + // OverHead (score) = IO count / Read bytes + // + // Same bytes, if we use more io to fetch it, that means more overhead. + // And in one rowset, the IO count is equal overlapped segment count plus their delvec files. + double io_count() const { + // rowset_meta_ptr->segments_size() could be zero here, so make sure this >= 1 using max. + double cnt = rowset_meta_ptr->overlapped() ? std::max(rowset_meta_ptr->segments_size(), 1) : 1; + if (stat.num_dels > 0) { + // If a delvec file exists, we need to read both the segment files and the delvec files. + // update_compaction_delvec_file_io_amp_ratio controls the io amp ratio of delvec files, default is 2. + // A bigger update_compaction_delvec_file_io_amp_ratio gives higher priority to merging rowsets with delvec files. + cnt *= config::update_compaction_delvec_file_io_amp_ratio; } + return cnt; } - double rowset_compaction_score() const { - return rowset_meta_ptr->overlapped() ? rowset_meta_ptr->segments_size() : 1; + double delete_bytes() const { + if (stat.num_rows == 0) return 0.0; + return (double)stat.bytes * ((double)stat.num_dels / (double)stat.num_rows); } + double read_bytes() const { return (double)stat.bytes - delete_bytes() + 1; } + void calculate_score() { score = (io_count() * 1024 * 1024) / read_bytes(); } + bool operator<(const RowsetCandidate& other) const { return score < other.score; } RowsetMetadataPtr rowset_meta_ptr; RowsetStat stat; + double score; }; class PrimaryCompactionPolicy : public CompactionPolicy { @@ -125,20 +128,31 @@ class PrimaryCompactionPolicy : public CompactionPolicy { ~PrimaryCompactionPolicy() override = default; StatusOr> pick_rowsets() override; - StatusOr> pick_rowsets(const std::shared_ptr& tablet_metadata); + StatusOr> pick_rowsets(const std::shared_ptr& tablet_metadata, + std::vector* has_dels); + +private: + int64_t _get_data_size(const std::shared_ptr& tablet_metadata) { + int size = 0; + for (const auto& 
rowset : tablet_metadata->rowsets()) { + size += rowset.data_size(); + } + return size; + } }; StatusOr> PrimaryCompactionPolicy::pick_rowsets() { - return pick_rowsets(_tablet_metadata); + return pick_rowsets(_tablet_metadata, nullptr); } StatusOr> PrimaryCompactionPolicy::pick_rowsets( - const std::shared_ptr& tablet_metadata) { + const std::shared_ptr& tablet_metadata, std::vector* has_dels) { std::vector input_rowsets; UpdateManager* mgr = _tablet_mgr->update_mgr(); std::priority_queue rowset_queue; const auto tablet_id = tablet_metadata->id(); const auto tablet_version = tablet_metadata->version(); + const auto tablet_data_size = _get_data_size(tablet_metadata); for (const auto& rowset_pb : tablet_metadata->rowsets()) { RowsetStat stat; stat.num_rows = rowset_pb.num_rows(); @@ -152,14 +166,19 @@ StatusOr> PrimaryCompactionPolicy::pick_rowsets( std::stringstream input_infos; while (!rowset_queue.empty()) { const auto& rowset_candidate = rowset_queue.top(); - cur_compaction_result_bytes += rowset_candidate.stat.bytes; - if (input_rowsets.size() > 0 && cur_compaction_result_bytes > config::update_compaction_result_bytes * 2) { + cur_compaction_result_bytes += rowset_candidate.read_bytes(); + if (input_rowsets.size() > 0 && + cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes * 2, tablet_data_size / 2)) { break; } input_rowsets.emplace_back(std::make_shared(tablet, std::move(rowset_candidate.rowset_meta_ptr))); + if (has_dels != nullptr) { + has_dels->push_back(rowset_candidate.delete_bytes() > 0); + } input_infos << input_rowsets.back()->id() << "|"; - if (cur_compaction_result_bytes > config::update_compaction_result_bytes || + // Allow to merge half of this tablet + if (cur_compaction_result_bytes > std::max(config::update_compaction_result_bytes, tablet_data_size / 2) || input_rowsets.size() >= config::max_update_compaction_num_singleton_deltas) { break; } @@ -174,10 +193,18 @@ StatusOr> PrimaryCompactionPolicy::pick_rowsets( 
StatusOr primary_compaction_score_by_policy(const std::shared_ptr& metadata) { auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager(); PrimaryCompactionPolicy policy(tablet_mgr, metadata); - ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets()); + std::vector has_dels; + ASSIGN_OR_RETURN(auto pick_rowsets, policy.pick_rowsets(metadata, &has_dels)); uint32_t segment_num_score = 0; - for (const auto& pick_rowset : pick_rowsets) { - segment_num_score += pick_rowset->is_overlapped() ? pick_rowset->num_segments() : 1; + for (int i = 0; i < pick_rowsets.size(); i++) { + const auto& pick_rowset = pick_rowsets[i]; + const bool has_del = has_dels[i]; + auto current_score = pick_rowset->is_overlapped() ? pick_rowset->num_segments() : 1; + if (has_del) { + // if delvec file exist, expand score by config. + current_score *= config::update_compaction_delvec_file_io_amp_ratio; + } + segment_num_score += current_score; } return segment_num_score; } diff --git a/be/src/storage/lake/delta_writer.cpp b/be/src/storage/lake/delta_writer.cpp index 1ec2e6073deb2..58b6dffe32558 100644 --- a/be/src/storage/lake/delta_writer.cpp +++ b/be/src/storage/lake/delta_writer.cpp @@ -484,11 +484,11 @@ Status DeltaWriterImpl::finish(DeltaWriter::FinishMode mode) { } } } + RETURN_IF_ERROR(tablet.put_txn_log(txn_log)); if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { // preload update state here to minimaze the cost when publishing. 
tablet.update_mgr()->preload_update_state(*txn_log, &tablet); } - RETURN_IF_ERROR(tablet.put_txn_log(std::move(txn_log))); return Status::OK(); } diff --git a/be/src/storage/lake/horizontal_compaction_task.cpp b/be/src/storage/lake/horizontal_compaction_task.cpp index 289033f4f41b7..c13937cdafa88 100644 --- a/be/src/storage/lake/horizontal_compaction_task.cpp +++ b/be/src/storage/lake/horizontal_compaction_task.cpp @@ -21,6 +21,7 @@ #include "storage/lake/tablet_reader.h" #include "storage/lake/tablet_writer.h" #include "storage/lake/txn_log.h" +#include "storage/lake/update_manager.h" #include "storage/rowset/column_reader.h" #include "storage/storage_engine.h" #include "storage/tablet_reader_params.h" @@ -104,8 +105,12 @@ Status HorizontalCompactionTask::execute(Progress* progress, CancelFunc cancel_f op_compaction->mutable_output_rowset()->set_num_rows(writer->num_rows()); op_compaction->mutable_output_rowset()->set_data_size(writer->data_size()); op_compaction->mutable_output_rowset()->set_overlapped(false); - Status st = _tablet.put_txn_log(std::move(txn_log)); - return st; + RETURN_IF_ERROR(_tablet.put_txn_log(txn_log)); + if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { + // preload primary key table's compaction state + _tablet.update_mgr()->preload_compaction_state(*txn_log, _tablet, tablet_schema); + } + return Status::OK(); } StatusOr HorizontalCompactionTask::calculate_chunk_size() { diff --git a/be/src/storage/lake/rowset.h b/be/src/storage/lake/rowset.h index 1203e4af4234e..87da6184afb4e 100644 --- a/be/src/storage/lake/rowset.h +++ b/be/src/storage/lake/rowset.h @@ -79,6 +79,8 @@ class Rowset { [[nodiscard]] Status load_segments(std::vector* segments, bool fill_data_cache, bool fill_metadata_cache); + int64_t tablet_id() const { return _tablet.id(); } + private: Tablet _tablet; RowsetMetadataPtr _rowset_metadata; diff --git a/be/src/storage/lake/txn_log_applier.cpp b/be/src/storage/lake/txn_log_applier.cpp index 
80300d9337a68..5924542b41b6f 100644 --- a/be/src/storage/lake/txn_log_applier.cpp +++ b/be/src/storage/lake/txn_log_applier.cpp @@ -77,7 +77,7 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier { RETURN_IF_ERROR(apply_write_log(log.op_write(), log.txn_id())); } if (log.has_op_compaction()) { - RETURN_IF_ERROR(apply_compaction_log(log.op_compaction())); + RETURN_IF_ERROR(apply_compaction_log(log.op_compaction(), log.txn_id())); } if (log.has_op_schema_change()) { RETURN_IF_ERROR(apply_schema_change_log(log.op_schema_change())); @@ -118,7 +118,7 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier { &_builder, _base_version); } - Status apply_compaction_log(const TxnLogPB_OpCompaction& op_compaction) { + Status apply_compaction_log(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id) { // get lock to avoid gc _tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id()); DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); }); @@ -133,8 +133,8 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier { DCHECK(!op_compaction.has_output_rowset() || op_compaction.output_rowset().num_rows() == 0); return Status::OK(); } - return _tablet.update_mgr()->publish_primary_compaction(op_compaction, *_metadata, _tablet, _index_entry, - &_builder, _base_version); + return _tablet.update_mgr()->publish_primary_compaction(op_compaction, txn_id, *_metadata, _tablet, + _index_entry, &_builder, _base_version); } Status apply_schema_change_log(const TxnLogPB_OpSchemaChange& op_schema_change) { diff --git a/be/src/storage/lake/update_compaction_state.cpp b/be/src/storage/lake/update_compaction_state.cpp index f2b71175086bd..0bfd0dcbca829 100644 --- a/be/src/storage/lake/update_compaction_state.cpp +++ b/be/src/storage/lake/update_compaction_state.cpp @@ -25,19 +25,22 @@ namespace starrocks::lake { -CompactionState::CompactionState(Rowset* rowset, UpdateManager* update_manager) { - if (rowset->num_segments() > 0) { - 
pk_cols.resize(rowset->num_segments()); - } - _update_manager = update_manager; -} - CompactionState::~CompactionState() { - _update_manager->compaction_state_mem_tracker()->release(_memory_usage); + if (_update_manager != nullptr) { + _update_manager->compaction_state_mem_tracker()->release(_memory_usage); + } } -Status CompactionState::load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id) { +Status CompactionState::load_segments(Rowset* rowset, UpdateManager* update_manager, + const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id) { TRACE_COUNTER_SCOPE_LATENCY_US("load_segments_latency_us"); + if (pk_cols.empty() && rowset->num_segments() > 0) { + pk_cols.resize(rowset->num_segments()); + } else { + DCHECK(pk_cols.size() == rowset->num_segments()); + } + _update_manager = update_manager; + _tablet_id = rowset->tablet_id(); if (segment_id >= pk_cols.size() && pk_cols.size() != 0) { std::string msg = strings::Substitute("Error segment id: $0 vs $1", segment_id, pk_cols.size()); LOG(WARNING) << msg; @@ -63,19 +66,16 @@ Status CompactionState::_load_segments(Rowset* rowset, const TabletSchemaCSPtr& CHECK(PrimaryKeyEncoder::create_column(pkey_schema, &pk_column, true).ok()); OlapReaderStatistics stats; - auto res = rowset->get_each_segment_iterator(pkey_schema, &stats); - if (!res.ok()) { - return res.status(); + if (_segment_iters.empty()) { + ASSIGN_OR_RETURN(_segment_iters, rowset->get_each_segment_iterator(pkey_schema, &stats)); } - - auto& itrs = res.value(); - CHECK_EQ(itrs.size(), rowset->num_segments()); + CHECK_EQ(_segment_iters.size(), rowset->num_segments()); // only hold pkey, so can use larger chunk size auto chunk_shared_ptr = ChunkHelper::new_chunk(pkey_schema, config::vector_chunk_size); auto chunk = chunk_shared_ptr.get(); - auto itr = itrs[segment_id].get(); + auto itr = _segment_iters[segment_id].get(); if (itr == nullptr) { return Status::OK(); } @@ -110,4 +110,8 @@ void 
CompactionState::release_segments(uint32_t segment_id) { pk_cols[segment_id]->reset_column(); } +std::string CompactionState::to_string() const { + return strings::Substitute("CompactionState tablet:$0", _tablet_id); +} + } // namespace starrocks::lake diff --git a/be/src/storage/lake/update_compaction_state.h b/be/src/storage/lake/update_compaction_state.h index 19974fb7b745d..6adf348f2e3bc 100644 --- a/be/src/storage/lake/update_compaction_state.h +++ b/be/src/storage/lake/update_compaction_state.h @@ -30,26 +30,36 @@ class UpdateManager; class CompactionState { public: - CompactionState(Rowset* rowset, UpdateManager* update_manager); + CompactionState() {} ~CompactionState(); CompactionState(const CompactionState&) = delete; CompactionState& operator=(const CompactionState&) = delete; - Status load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id); + Status load_segments(Rowset* rowset, UpdateManager* update_manager, const TabletSchemaCSPtr& tablet_schema, + uint32_t segment_id); void release_segments(uint32_t segment_id); std::size_t memory_usage() const { return _memory_usage; } + std::string to_string() const; + std::vector pk_cols; private: Status _load_segments(Rowset* rowset, const TabletSchemaCSPtr& tablet_schema, uint32_t segment_id); - UpdateManager* _update_manager; + UpdateManager* _update_manager = nullptr; size_t _memory_usage = 0; + std::vector _segment_iters; + int64_t _tablet_id = 0; }; +inline std::ostream& operator<<(std::ostream& os, const CompactionState& o) { + os << o.to_string(); + return os; +} + } // namespace lake } // namespace starrocks diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp index 05210e179e6de..0f0185852b604 100644 --- a/be/src/storage/lake/update_manager.cpp +++ b/be/src/storage/lake/update_manager.cpp @@ -34,6 +34,7 @@ namespace starrocks::lake { UpdateManager::UpdateManager(LocationProvider* location_provider, MemTracker* mem_tracker) : 
_index_cache(std::numeric_limits::max()), _update_state_cache(std::numeric_limits::max()), + _compaction_cache(std::numeric_limits::max()), _location_provider(location_provider), _pk_index_shards(config::pk_index_map_shard_size) { _update_mem_tracker = mem_tracker; @@ -48,6 +49,10 @@ UpdateManager::UpdateManager(LocationProvider* location_provider, MemTracker* me _index_cache.set_capacity(byte_limits * update_mem_percent); } +inline std::string cache_key(uint32_t tablet_id, int64_t txn_id) { + return strings::Substitute("$0_$1", tablet_id, txn_id); +} + Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) { return _update_mgr->get_del_vec(tsid, version, _pk_builder, pdelvec); } @@ -114,7 +119,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ // 1. load rowset update data to cache, get upsert and delete list const uint32_t rowset_id = metadata.next_rowset_id(); auto tablet_schema = std::make_shared(metadata.schema()); - auto state_entry = _update_state_cache.get_or_create(strings::Substitute("$0_$1", tablet->id(), txn_id)); + auto state_entry = _update_state_cache.get_or_create(cache_key(tablet->id(), txn_id)); state_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms()); // only use state entry once, remove it when publish finish or fail DeferOp remove_state_entry([&] { _update_state_cache.remove(state_entry); }); @@ -134,6 +139,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ // handle merge condition, skip update row which's merge condition column value is smaller than current row int32_t condition_column = _get_condition_column(op_write, *tablet_schema); // 3. update primary index, and generate delete info. 
+ TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us"); for (uint32_t i = 0; i < upserts.size(); i++) { if (upserts[i] != nullptr) { if (condition_column < 0) { @@ -498,7 +504,7 @@ size_t UpdateManager::get_rowset_num_deletes(int64_t tablet_id, int64_t version, return num_dels; } -Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, +Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id, const TabletMetadata& metadata, Tablet tablet, IndexEntry* index_entry, MetaFileBuilder* builder, int64_t base_version) { auto& index = index_entry->value(); @@ -506,7 +512,11 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op std::shared_ptr tablet_schema = std::make_shared(metadata.schema()); RowsetPtr output_rowset = std::make_shared(tablet, std::make_shared(op_compaction.output_rowset())); - auto compaction_state = std::make_unique(output_rowset.get(), this); + auto compaction_entry = _compaction_cache.get_or_create(cache_key(tablet.id(), txn_id)); + compaction_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms()); + // only use state entry once, remove it when publish finish or fail + DeferOp remove_state_entry([&] { _compaction_cache.remove(compaction_entry); }); + auto& compaction_state = compaction_entry->value(); size_t total_deletes = 0; size_t total_rows = 0; vector> delvecs; @@ -521,15 +531,19 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op uint32_t max_src_rssid = max_rowset_id + input_rowset->segments_size() - 1; // 2. update primary index, and generate delete info. 
- for (size_t i = 0; i < compaction_state->pk_cols.size(); i++) { - RETURN_IF_ERROR(compaction_state->load_segments(output_rowset.get(), tablet_schema, i)); - TRACE_COUNTER_INCREMENT("state_bytes", compaction_state->memory_usage()); - auto& pk_col = compaction_state->pk_cols[i]; + TRACE_COUNTER_INCREMENT("output_rowsets_size", compaction_state.pk_cols.size()); + for (size_t i = 0; i < compaction_state.pk_cols.size(); i++) { + RETURN_IF_ERROR(compaction_state.load_segments(output_rowset.get(), this, tablet_schema, i)); + TRACE_COUNTER_INCREMENT("state_bytes", compaction_state.memory_usage()); + auto& pk_col = compaction_state.pk_cols[i]; total_rows += pk_col->size(); uint32_t rssid = rowset_id + i; tmp_deletes.clear(); // replace will not grow hashtable, so don't need to check memory limit - RETURN_IF_ERROR(index.try_replace(rssid, 0, *pk_col, max_src_rssid, &tmp_deletes)); + { + TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us"); + RETURN_IF_ERROR(index.try_replace(rssid, 0, *pk_col, max_src_rssid, &tmp_deletes)); + } DelVectorPtr dv = std::make_shared(); if (tmp_deletes.empty()) { dv->init(metadata.version(), nullptr, 0); @@ -538,7 +552,7 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op total_deletes += tmp_deletes.size(); } delvecs.emplace_back(rssid, dv); - compaction_state->release_segments(i); + compaction_state.release_segments(i); } // 3. 
update TabletMeta and write to meta file @@ -646,18 +660,29 @@ bool UpdateManager::TEST_check_primary_index_cache_ref(uint32_t tablet_id, uint3 return true; } -bool UpdateManager::TEST_check_update_state_cache_noexist(uint32_t tablet_id, int64_t txn_id) { - auto state_entry = _update_state_cache.get(strings::Substitute("$0_$1", tablet_id, txn_id)); +bool UpdateManager::TEST_check_update_state_cache_absent(uint32_t tablet_id, int64_t txn_id) { + auto state_entry = _update_state_cache.get(cache_key(tablet_id, txn_id)); if (state_entry == nullptr) { return true; } else { + _update_state_cache.release(state_entry); + return false; + } +} + +bool UpdateManager::TEST_check_compaction_cache_absent(uint32_t tablet_id, int64_t txn_id) { + auto compaction_entry = _compaction_cache.get(cache_key(tablet_id, txn_id)); + if (compaction_entry == nullptr) { + return true; + } else { + _compaction_cache.release(compaction_entry); return false; } } void UpdateManager::preload_update_state(const TxnLog& txnlog, Tablet* tablet) { // use tabletid-txnid as update state cache's key, so it can retry safe. - auto state_entry = _update_state_cache.get_or_create(strings::Substitute("$0_$1", tablet->id(), txnlog.txn_id())); + auto state_entry = _update_state_cache.get_or_create(cache_key(tablet->id(), txnlog.txn_id())); state_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms()); auto& state = state_entry->value(); _update_state_cache.update_object_size(state_entry, state.memory_usage()); @@ -684,4 +709,34 @@ void UpdateManager::preload_update_state(const TxnLog& txnlog, Tablet* tablet) { } } +void UpdateManager::preload_compaction_state(const TxnLog& txnlog, const Tablet& tablet, + const TabletSchemaCSPtr& tablet_schema) { + // no need to preload if output rowset is empty. 
+ const int segments_size = txnlog.op_compaction().output_rowset().segments_size(); + if (segments_size <= 0) return; + RowsetPtr output_rowset = + std::make_shared(tablet, std::make_shared(txnlog.op_compaction().output_rowset())); + // use tabletid-txnid as compaction state cache's key, so it can retry safe. + auto compaction_entry = _compaction_cache.get_or_create(cache_key(tablet.id(), txnlog.txn_id())); + compaction_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms()); + auto& compaction_state = compaction_entry->value(); + // preload compaction state segment by segment; stop on a load error or once the memory limit is exceeded, to avoid too much memory cost + auto st = Status::OK(); + for (int i = 0; i < segments_size; i++) { + st = compaction_state.load_segments(output_rowset.get(), this, tablet_schema, i); + if (!st.ok() || _compaction_state_mem_tracker->any_limit_exceeded()) { + break; + } + } + if (!st.ok()) { + _compaction_cache.remove(compaction_entry); + LOG(ERROR) << strings::Substitute("lake primary table preload_compaction_state id:$0 error:$1", tablet.id(), + st.to_string()); + // not return error even it fail, because we can load compaction state in publish again. 
+ } else { + // just release it, will use it again in publish + _compaction_cache.release(compaction_entry); + } +} + } // namespace starrocks::lake diff --git a/be/src/storage/lake/update_manager.h b/be/src/storage/lake/update_manager.h index 8731bf84ca9d9..dd64bce97b89e 100644 --- a/be/src/storage/lake/update_manager.h +++ b/be/src/storage/lake/update_manager.h @@ -22,6 +22,7 @@ #include "storage/lake/rowset_update_state.h" #include "storage/lake/tablet_metadata.h" #include "storage/lake/types_fwd.h" +#include "storage/lake/update_compaction_state.h" #include "util/dynamic_cache.h" #include "util/mem_info.h" #include "util/parse_util.h" @@ -89,9 +90,9 @@ class UpdateManager { // get del nums from rowset, for compaction policy size_t get_rowset_num_deletes(int64_t tablet_id, int64_t version, const RowsetMetadataPB& rowset_meta); - Status publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, const TabletMetadata& metadata, - Tablet tablet, IndexEntry* index_entry, MetaFileBuilder* builder, - int64_t base_version); + Status publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, int64_t txn_id, + const TabletMetadata& metadata, Tablet tablet, IndexEntry* index_entry, + MetaFileBuilder* builder, int64_t base_version); // remove primary index entry from cache, called when publish version error happens. 
// Because update primary index isn't idempotent, so if primary index update success, but @@ -110,11 +111,13 @@ class UpdateManager { void evict_cache(int64_t memory_urgent_level, int64_t memory_high_level); void preload_update_state(const TxnLog& op_write, Tablet* tablet); + void preload_compaction_state(const TxnLog& txnlog, const Tablet& tablet, const TabletSchemaCSPtr& tablet_schema); // check if pk index's cache ref == ref_cnt bool TEST_check_primary_index_cache_ref(uint32_t tablet_id, uint32_t ref_cnt); - bool TEST_check_update_state_cache_noexist(uint32_t tablet_id, int64_t txn_id); + bool TEST_check_update_state_cache_absent(uint32_t tablet_id, int64_t txn_id); + bool TEST_check_compaction_cache_absent(uint32_t tablet_id, int64_t txn_id); Status update_primary_index_memory_limit(int32_t update_memory_limit_percent) { int64_t byte_limits = ParseUtil::parse_mem_spec(config::mem_limit, MemInfo::physical_mem()); @@ -179,6 +182,8 @@ class UpdateManager { // rowset cache DynamicCache _update_state_cache; + // compaction cache + DynamicCache _compaction_cache; std::atomic _last_clear_expired_cache_millis = 0; LocationProvider* _location_provider = nullptr; TabletManager* _tablet_mgr = nullptr; diff --git a/be/src/storage/lake/vertical_compaction_task.cpp b/be/src/storage/lake/vertical_compaction_task.cpp index 8ce755f18089e..dd706ed0867e6 100644 --- a/be/src/storage/lake/vertical_compaction_task.cpp +++ b/be/src/storage/lake/vertical_compaction_task.cpp @@ -22,6 +22,7 @@ #include "storage/lake/tablet_reader.h" #include "storage/lake/tablet_writer.h" #include "storage/lake/txn_log.h" +#include "storage/lake/update_manager.h" #include "storage/row_source_mask.h" #include "storage/rowset/column_reader.h" #include "storage/storage_engine.h" @@ -97,7 +98,11 @@ Status VerticalCompactionTask::execute(Progress* progress, CancelFunc cancel_fun op_compaction->mutable_output_rowset()->set_num_rows(writer->num_rows()); 
op_compaction->mutable_output_rowset()->set_data_size(writer->data_size()); op_compaction->mutable_output_rowset()->set_overlapped(false); - RETURN_IF_ERROR(_tablet.put_txn_log(std::move(txn_log))); + RETURN_IF_ERROR(_tablet.put_txn_log(txn_log)); + if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) { + // preload primary key table's compaction state + _tablet.update_mgr()->preload_compaction_state(*txn_log, _tablet, _tablet_schema); + } return Status::OK(); } diff --git a/be/test/storage/lake/primary_key_compaction_task_test.cpp b/be/test/storage/lake/primary_key_compaction_task_test.cpp index 67dab2d2c9a72..1fb20f4cb1f4d 100644 --- a/be/test/storage/lake/primary_key_compaction_task_test.cpp +++ b/be/test/storage/lake/primary_key_compaction_task_test.cpp @@ -248,7 +248,9 @@ TEST_P(LakePrimaryKeyCompactionTest, test1) { CompactionTask::Progress progress; ASSERT_OK(task->execute(&progress, CompactionTask::kNoCancelFn)); EXPECT_EQ(100, progress.value()); + EXPECT_FALSE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); version++; ASSERT_EQ(kChunkSize, read(version)); if (GetParam().enable_persistent_index) { @@ -303,7 +305,9 @@ TEST_P(LakePrimaryKeyCompactionTest, test2) { CompactionTask::Progress progress; ASSERT_OK(task->execute(&progress, CompactionTask::kNoCancelFn)); EXPECT_EQ(100, progress.value()); + EXPECT_FALSE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); ASSERT_OK(publish_single_version(_tablet_metadata->id(), version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); version++; ASSERT_EQ(kChunkSize * 3, read(version)); if (GetParam().enable_persistent_index) { @@ -367,7 +371,9 @@ TEST_P(LakePrimaryKeyCompactionTest, test3) { CompactionTask::Progress progress; ASSERT_OK(task->execute(&progress, 
CompactionTask::kNoCancelFn)); EXPECT_EQ(100, progress.value()); + EXPECT_FALSE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); ASSERT_OK(publish_single_version(_tablet_metadata->id(), version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); version++; if (GetParam().enable_persistent_index) { check_local_persistent_index_meta(tablet_id, version); @@ -487,7 +493,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy2) { ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); EXPECT_EQ(4, input_rowsets.size()); - // check the rowset order, pick rowset#1 first, because it have deleted rows. + // check the rowset order, pick rowset#1 first, because it is empty. // Next order is rowset#4#2#3, by their byte size. EXPECT_EQ(input_rowsets[0]->id(), 1); EXPECT_EQ(input_rowsets[1]->id(), 4); @@ -563,16 +569,16 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy3) { ASSIGN_OR_ABORT(auto input_rowsets, compaction_policy->pick_rowsets()); EXPECT_EQ(4, input_rowsets.size()); EXPECT_EQ(1, input_rowsets[0]->num_segments()); - EXPECT_EQ(3, input_rowsets[1]->num_segments()); + EXPECT_EQ(1, input_rowsets[1]->num_segments()); EXPECT_EQ(2, input_rowsets[2]->num_segments()); - EXPECT_EQ(1, input_rowsets[3]->num_segments()); + EXPECT_EQ(3, input_rowsets[3]->num_segments()); - // check the rowset order, pick rowset#1 first, because it have deleted rows. - // Next order is rowset#4#2#3, by their segment count. + // check the rowset order, pick rowset#1 first, because it is empty. + // Next order is rowset#7#2#4, by their segment count. 
EXPECT_EQ(input_rowsets[0]->id(), 1); - EXPECT_EQ(input_rowsets[1]->id(), 4); + EXPECT_EQ(input_rowsets[1]->id(), 7); EXPECT_EQ(input_rowsets[2]->id(), 2); - EXPECT_EQ(input_rowsets[3]->id(), 7); + EXPECT_EQ(input_rowsets[3]->id(), 4); } TEST_P(LakePrimaryKeyCompactionTest, test_compaction_score_by_policy) { @@ -670,22 +676,28 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) { check_task(task); CompactionTask::Progress progress; ASSERT_OK(task->execute(&progress, CompactionTask::kNoCancelFn)); + EXPECT_FALSE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); EXPECT_EQ(100, progress.value()); // check compaction state ASSIGN_OR_ABORT(auto txn_log, tablet.get_txn_log(txn_id)); RowsetPtr output_rowset = std::make_shared( tablet, std::make_shared(txn_log->op_compaction().output_rowset())); - auto compaction_state = std::make_unique(output_rowset.get(), _update_mgr.get()); - for (size_t i = 0; i < compaction_state->pk_cols.size(); i++) { - ASSERT_OK(compaction_state->load_segments(output_rowset.get(), _tablet_schema, i)); - auto& pk_col = compaction_state->pk_cols[i]; - EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), pk_col->memory_usage()); - compaction_state->release_segments(i); - EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), 0); + { + auto compaction_state = std::make_unique(); + EXPECT_TRUE(_update_mgr->compaction_state_mem_tracker() != nullptr); + auto prev = _update_mgr->compaction_state_mem_tracker()->consumption(); + for (size_t i = 0; i < output_rowset->num_segments(); i++) { + ASSERT_OK(compaction_state->load_segments(output_rowset.get(), _update_mgr.get(), _tablet_schema, i)); + auto& pk_col = compaction_state->pk_cols[i]; + EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), pk_col->memory_usage() + prev); + compaction_state->release_segments(i); + EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), prev); + } } // publish version 
ASSERT_OK(publish_single_version(_tablet_metadata->id(), version + 1, txn_id).status()); version++; + EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id)); ASSERT_EQ(kChunkSize * 3, read(version)); ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); @@ -699,6 +711,9 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) { for (int i = 0; i < key_list.size() - 1; i++) { EXPECT_TRUE(key_list[i] < key_list[i + 1]); } + + // Make sure all memory consumed by compaction has been returned. + EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), 0); } INSTANTIATE_TEST_SUITE_P(LakePrimaryKeyCompactionTest, LakePrimaryKeyCompactionTest, diff --git a/be/test/storage/lake/primary_key_publish_test.cpp b/be/test/storage/lake/primary_key_publish_test.cpp index fabc19dca66a1..137e832ecac23 100644 --- a/be/test/storage/lake/primary_key_publish_test.cpp +++ b/be/test/storage/lake/primary_key_publish_test.cpp @@ -200,7 +200,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_read_success) { writer->close(); ASSERT_OK(publish_single_version(_tablet_metadata->id(), 2, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(_tablet_metadata->id(), txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(_tablet_metadata->id(), txn_id)); // read at version 2 ASSIGN_OR_ABORT(auto reader, tablet.new_reader(2, *_schema)); @@ -238,7 +238,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_multitime_check_result) { delta_writer->close(); // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); @@ -278,7 +278,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_fail_retry) { delta_writer->close(); 
// Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } // write failed @@ -330,7 +330,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_fail_retry) { delta_writer->close(); // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } ASSERT_EQ(kChunkSize * 5, read_rows(tablet_id, version)); @@ -362,7 +362,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_publish_multi_times) { delta_writer->close(); // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; txns.push_back(txn_id); } @@ -465,7 +465,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_resolve_conflict) { // publish in order for (int64_t txn_id : txn_ids) { ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } // check result @@ -509,7 +509,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_read_success_multiple_tablet) { w->close(); // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); } version++; } @@ -546,7 +546,7 @@ TEST_P(LakePrimaryKeyPublishTest, 
test_write_largedata) { delta_writer->close(); // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_noexist(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } ASSERT_EQ(kChunkSize * N, read_rows(tablet_id, version)); diff --git a/be/test/storage/lake/test_util.h b/be/test/storage/lake/test_util.h index faefe7f53213e..d263dffb26d79 100644 --- a/be/test/storage/lake/test_util.h +++ b/be/test/storage/lake/test_util.h @@ -53,7 +53,7 @@ class TestBase : public ::testing::Test { _parent_tracker(std::make_unique(-1)), _mem_tracker(std::make_unique(-1, "", _parent_tracker.get())), _lp(std::make_unique(_test_dir)), - _update_mgr(std::make_unique(_lp.get())), + _update_mgr(std::make_unique(_lp.get(), _mem_tracker.get())), _tablet_mgr(std::make_unique(_lp.get(), _update_mgr.get(), cache_limit)) {} void remove_test_dir_or_die() { ASSERT_OK(fs::remove_all(_test_dir)); }