Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into baseline_resync
Browse files Browse the repository at this point in the history
  • Loading branch information
koujl committed Nov 29, 2024
2 parents dac10d8 + 160f8c1 commit 5070584
Show file tree
Hide file tree
Showing 23 changed files with 1,261 additions and 448 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.9"
version = "2.1.11"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
3 changes: 2 additions & 1 deletion src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ struct PGInfo {
pg_id_t id;
mutable MemberSet members;
peer_id_t replica_set_uuid;
u_int64_t size;
uint64_t size;
uint64_t chunk_size;

auto operator<=>(PGInfo const& rhs) const { return id <=> rhs.id; }
auto operator==(PGInfo const& rhs) const { return id == rhs.id; }
Expand Down
2 changes: 1 addition & 1 deletion src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace homeobject {

ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD,
PG_NOT_READY, CRC_MISMATCH);
PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT);

struct ShardInfo {
enum class State : uint8_t {
Expand Down
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,15 @@ target_link_libraries(homestore_test PUBLIC
)
add_test(NAME HomestoreTest COMMAND homestore_test -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)
set_property(TEST HomestoreTest PROPERTY RUN_SERIAL 1)

add_executable (homestore_test_dynamic)
target_sources(homestore_test_dynamic PRIVATE
$<TARGET_OBJECTS:homestore_tests_dynamic>
)
target_link_libraries(homestore_test_dynamic PUBLIC
homeobject_homestore
${COMMON_TEST_DEPS}
)

add_test(NAME HomestoreTestDynamic COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)

404 changes: 243 additions & 161 deletions src/lib/homestore_backend/heap_chunk_selector.cpp

Large diffs are not rendered by default.

110 changes: 74 additions & 36 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <homestore/vchunk.h>
#include <homestore/homestore_decl.hpp>
#include <homestore/blk.h>
#include <sisl/utility/enum.hpp>

#include <queue>
#include <vector>
Expand All @@ -15,6 +16,8 @@

namespace homeobject {

ENUM(ChunkState, uint8_t, AVAILABLE = 0, INUSE);

using csharedChunk = homestore::cshared< homestore::Chunk >;

class HeapChunkSelector : public homestore::ChunkSelector {
Expand All @@ -23,61 +26,95 @@ class HeapChunkSelector : public homestore::ChunkSelector {
~HeapChunkSelector() = default;

using VChunk = homestore::VChunk;
class VChunkComparator {
using chunk_num_t = homestore::chunk_num_t;

class ExtendedVChunk : public VChunk {
public:
bool operator()(VChunk& lhs, VChunk& rhs) { return lhs.available_blks() < rhs.available_blks(); }
ExtendedVChunk(csharedChunk const& chunk) :
VChunk(chunk), m_state(ChunkState::AVAILABLE), m_pg_id(), m_v_chunk_id() {}
~ExtendedVChunk() = default;
ChunkState m_state;
std::optional< pg_id_t > m_pg_id;
std::optional< chunk_num_t > m_v_chunk_id;
bool available() const { return m_state == ChunkState::AVAILABLE; }
};

class VChunkDefragComparator {
class ExtendedVChunkComparator {
public:
bool operator()(VChunk& lhs, VChunk& rhs) { return lhs.get_defrag_nblks() < rhs.get_defrag_nblks(); }
bool operator()(std::shared_ptr< ExtendedVChunk >& lhs, std::shared_ptr< ExtendedVChunk >& rhs) {
return lhs->available_blks() < rhs->available_blks();
}
};
using ExtendedVChunkHeap =
std::priority_queue< std::shared_ptr< ExtendedVChunk >, std::vector< std::shared_ptr< ExtendedVChunk > >,
ExtendedVChunkComparator >;

using VChunkHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkComparator >;
using VChunkDefragHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkDefragComparator >;
using chunk_num_t = homestore::chunk_num_t;

struct PerDevHeap {
struct ChunkHeap {
std::mutex mtx;
VChunkHeap m_heap;
ExtendedVChunkHeap m_heap;
std::atomic_size_t available_blk_count;
uint64_t m_total_blks{0}; // initlized during boot, and will not change during runtime;
uint32_t size() const { return m_heap.size(); }
};

struct PGChunkCollection {
std::mutex mtx;
std::vector< std::shared_ptr< ExtendedVChunk > > m_pg_chunks;
std::atomic_size_t available_num_chunks;
std::atomic_size_t available_blk_count;
uint64_t m_total_blks{0}; // initlized during boot, and will not change during runtime;
};

void add_chunk(csharedChunk&) override;

void foreach_chunks(std::function< void(csharedChunk&) >&& cb) override;

csharedChunk select_chunk([[maybe_unused]] homestore::blk_count_t nblks, const homestore::blk_alloc_hints& hints);

// this function will be used by GC flow or recovery flow to mark one specific chunk to be busy, caller should be
// responsible to use release_chunk() interface to release it when no longer to use the chunk anymore.
csharedChunk select_specific_chunk(const chunk_num_t);
// this function will be used by create shard or recovery flow to mark one specific chunk to be busy, caller should
// be responsible to use release_chunk() interface to release it when no longer to use the chunk anymore.
csharedChunk select_specific_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

// this function will be used by GC flow to select a chunk for GC
csharedChunk most_defrag_chunk();
// This function returns a chunk back to ChunkSelector.
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

// this function is used to return a chunk back to ChunkSelector when sealing a shard, and will only be used by
// Homeobject.
void release_chunk(const chunk_num_t);
/**
* select chunks for pg, chunks need to be in same pdev.
*
* @param pg_id The ID of the pg.
* @param pg_size The fix pg size.
* @return An optional uint32_t value representing num_chunk, or std::nullopt if no space left.
*/
std::optional< uint32_t > select_chunks_for_pg(pg_id_t pg_id, uint64_t pg_size);

// this should be called after ShardManager is initialized and get all the open shards
void build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks);
// this function is used for pg info superblk persist v_chunk_id <-> p_chunk_id
std::shared_ptr< const std::vector< chunk_num_t > > get_pg_chunks(pg_id_t pg_id) const;

/**
* Retrieves the block allocation hints for a given chunk.
* pop pg top chunk
*
* @param chunk_id The ID of the chunk.
* @return The block allocation hints for the specified chunk.
* @param pg_id The ID of the pg.
* @return An optional chunk_num_t value representing v_chunk_id, or std::nullopt if no space left.
*/
homestore::blk_alloc_hints chunk_to_hints(chunk_num_t chunk_id) const;
std::optional< chunk_num_t > get_most_available_blk_chunk(pg_id_t pg_id) const;

// this should be called on each pg meta blk found
bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids);

// this should be called after all pg meta blk recovered
void recover_per_dev_chunk_heap();

// this should be called after ShardManager is initialized and get all the open shards
bool recover_pg_chunks_states(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excluding_v_chunk_ids);

/**
* Returns the number of available blocks of the given device id.
*
* @param dev_id (optional) The device ID. if nullopt, it returns the maximum available blocks among all devices.
* @return The number of available blocks.
*/
uint64_t avail_blks(std::optional< uint32_t > dev_id) const;
uint64_t avail_blks(pg_id_t pg_id) const;

/**
* Returns the total number of blocks of the given device;
Expand All @@ -96,12 +133,12 @@ class HeapChunkSelector : public homestore::ChunkSelector {
uint32_t most_avail_num_chunks() const;

/**
* Returns the number of available chunks for a given device ID.
* Returns the number of available chunks for a given pg id.
*
* @param dev_id The device ID.
* @param pg_id The pg id.
* @return The number of available chunks.
*/
uint32_t avail_num_chunks(uint32_t dev_id) const;
uint32_t avail_num_chunks(pg_id_t pg_id) const;

/**
* @brief Returns the total number of chunks.
Expand All @@ -112,17 +149,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {
*/
uint32_t total_chunks() const;

private:
std::unordered_map< uint32_t, std::shared_ptr< PerDevHeap > > m_per_dev_heap;

// hold all the chunks , selected or not
std::unordered_map< chunk_num_t, csharedChunk > m_chunks;
uint32_t get_chunk_size() const;

private:
void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

VChunkDefragHeap m_defrag_heap;
std::mutex m_defrag_mtx;
private:
std::unordered_map< uint32_t, std::shared_ptr< ChunkHeap > > m_per_dev_heap;

std::unordered_map< pg_id_t, std::shared_ptr< PGChunkCollection > > m_per_pg_chunks;
// hold all the chunks , selected or not
std::unordered_map< chunk_num_t, homestore::cshared< ExtendedVChunk > > m_chunks;

void remove_chunk_from_defrag_heap(const chunk_num_t);
mutable std::shared_mutex m_chunk_selector_mtx;
};
} // namespace homeobject
21 changes: 17 additions & 4 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

if (!repl_dev->is_leader()) {
LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
}

// Create a put_blob request which allocates for header, key and blob_header, user_key. Data sgs are added later
auto req = put_blob_req_ctx::make(sizeof(BlobHeader) + blob.user_key.size());
req->header()->msg_type = ReplicationMessageType::PUT_BLOB_MSG;
Expand Down Expand Up @@ -349,16 +354,19 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto chunk_id = get_shard_chunk(msg_header->shard_id);
if (!chunk_id.has_value()) {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", *chunk_id);

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = *chunk_id;
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
return hints;
}

Expand All @@ -375,6 +383,11 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

if (!repl_dev->is_leader()) {
LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
}

// Create an unaligned header request unaligned
auto req = repl_result_ctx< BlobManager::Result< BlobInfo > >::make(0u /* header_extn */,
sizeof(blob_id_t) /* key_size */);
Expand Down
7 changes: 6 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void HSHomeObject::on_replica_restart() {
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
[this](bool success) { on_pg_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
Expand Down Expand Up @@ -323,4 +323,9 @@ sisl::io_blob_safe& HSHomeObject::get_pad_buf(uint32_t pad_len) {
return zpad_bufs_[idx];
}

bool HSHomeObject::pg_exists(pg_id_t pg_id) const {
std::shared_lock lock_guard(_pg_lock);
return _pg_map.contains(pg_id);
}

} // namespace homeobject
Loading

0 comments on commit 5070584

Please sign in to comment.