From b107604b4fdfcb2d602833845b7ed86a5d3abf52 Mon Sep 17 00:00:00 2001 From: bxq2011hust Date: Wed, 24 Jul 2024 17:03:02 +0800 Subject: [PATCH] snapshot support state and block seperate --- .../workflows/workflow-self-hosted-arm.yml | 1 - ...rkflow-self-hosted-centos-static-build.yml | 1 - .../bcos-framework/storage/Common.h | 5 + bcos-ledger/src/libledger/Ledger.cpp | 1 - bcos-tool/bcos-tool/NodeConfig.cpp | 7 + libinitializer/CMakeLists.txt | 9 +- libinitializer/Initializer.cpp | 528 +++++++++++++----- libinitializer/Initializer.h | 11 +- tools/archive-tool/ArchiveService.h | 2 +- tools/archive-tool/archiveTool.cpp | 4 +- 10 files changed, 419 insertions(+), 150 deletions(-) diff --git a/.github/workflows/workflow-self-hosted-arm.yml b/.github/workflows/workflow-self-hosted-arm.yml index e90c2467e3..0e2b99392c 100644 --- a/.github/workflows/workflow-self-hosted-arm.yml +++ b/.github/workflows/workflow-self-hosted-arm.yml @@ -58,7 +58,6 @@ jobs: - name: Remove cache if correspond dir change run: ./tools/.ci/clear_build_cache.sh - - name: update vcpkg run: | cd vcpkg && git fetch origin master diff --git a/.github/workflows/workflow-self-hosted-centos-static-build.yml b/.github/workflows/workflow-self-hosted-centos-static-build.yml index 65008aa03b..ef2e4e9a0e 100644 --- a/.github/workflows/workflow-self-hosted-centos-static-build.yml +++ b/.github/workflows/workflow-self-hosted-centos-static-build.yml @@ -43,7 +43,6 @@ jobs: - name: Remove cache if correspond dir change run: ./tools/.ci/clear_build_cache.sh - - name: update vcpkg run: | cd ${{ env.VCPKG_ROOT }} && git fetch --all && git checkout master && git pull diff --git a/bcos-framework/bcos-framework/storage/Common.h b/bcos-framework/bcos-framework/storage/Common.h index d0989186b6..c312a4f1f9 100644 --- a/bcos-framework/bcos-framework/storage/Common.h +++ b/bcos-framework/bcos-framework/storage/Common.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -35,6 +36,10 @@ namespace bcos::storage { + +const std::string ROCKSDB = "rocksDB"; +const std::string TiKV = "TiKV"; + enum StorageError { UnknownError = -60000, diff --git a/bcos-ledger/src/libledger/Ledger.cpp b/bcos-ledger/src/libledger/Ledger.cpp index 37505d8783..ca19e4422c 100644 --- a/bcos-ledger/src/libledger/Ledger.cpp +++ b/bcos-ledger/src/libledger/Ledger.cpp @@ -1359,7 +1359,6 @@ void Ledger::asyncGetBlockTransactionHashes(bcos::protocol::BlockNumber blockNum void Ledger::asyncBatchGetTransactions(std::shared_ptr> hashes, std::function&&)> callback) { - // FIXME: adapt the getBlockStorage() std::vector hashesView; hashesView.reserve(hashes->size()); for (auto& hash : *hashes) diff --git a/bcos-tool/bcos-tool/NodeConfig.cpp b/bcos-tool/bcos-tool/NodeConfig.cpp index 2b107614be..6c73f29aa9 100644 --- a/bcos-tool/bcos-tool/NodeConfig.cpp +++ b/bcos-tool/bcos-tool/NodeConfig.cpp @@ -715,6 +715,13 @@ void NodeConfig::loadStorageConfig(boost::property_tree::ptree const& _pt) m_pdKeyPath = _pt.get("storage.pd_ssl_key_path", ""); m_enableArchive = _pt.get("storage.enable_archive", false); m_enableSeparateBlockAndState = _pt.get("storage.enable_separate_block_state", false); + if (boost::iequals(m_storageType, bcos::storage::TiKV)) + { + m_enableSeparateBlockAndState = false; + NodeConfig_LOG(INFO) << LOG_DESC("Only rocksDB support separate block and state") + << LOG_KV("separateBlockAndState", m_enableSeparateBlockAndState) + << LOG_KV("storageType", m_storageType); + } m_stateDBPath = m_storagePath; m_stateDBPath = m_storagePath + "/state"; m_blockDBPath = m_storagePath + "/block"; diff --git a/libinitializer/CMakeLists.txt b/libinitializer/CMakeLists.txt index dd11a95fac..40ba4ca4d4 100644 --- a/libinitializer/CMakeLists.txt +++ b/libinitializer/CMakeLists.txt @@ -19,9 +19,16 @@ endif() add_library(${TXPOOL_INIT_LIB} TxPoolInitializer.cpp) target_link_libraries(${TXPOOL_INIT_LIB} PUBLIC ${PROTOCOL_INIT_LIB} ${TOOL_TARGET} ${TXPOOL_TARGET}) +include(FetchContent) +FetchContent_Declare( + tomlplusplus + GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git + GIT_TAG v3.4.0 +) +FetchContent_MakeAvailable(tomlplusplus) add_library(${INIT_LIB} Initializer.cpp LightNodeInitializer.h BaselineSchedulerInitializer.cpp) -list(APPEND INIT_LIB_DEPENDS ${PROTOCOL_INIT_LIB} ${FRONTSERVICE_INIT_LIB} ${TXPOOL_INIT_LIB} ${SCHEDULER_TARGET} ${STORAGE_TARGET} ${EXECUTOR_TARGET} ${RPC_TARGET} transaction-scheduler transaction-executor bcos-boostssl) +list(APPEND INIT_LIB_DEPENDS ${PROTOCOL_INIT_LIB} ${FRONTSERVICE_INIT_LIB} ${TXPOOL_INIT_LIB} ${SCHEDULER_TARGET} ${STORAGE_TARGET} ${EXECUTOR_TARGET} ${RPC_TARGET} transaction-scheduler transaction-executor bcos-boostssl tomlplusplus::tomlplusplus) if(WITH_TIKV) list(APPEND INIT_LIB_DEPENDS ${LEADER_ELECTION_TARGET}) endif() diff --git a/libinitializer/Initializer.cpp b/libinitializer/Initializer.cpp index 17fffec1c6..0d8d554914 100644 --- a/libinitializer/Initializer.cpp +++ b/libinitializer/Initializer.cpp @@ -35,6 +35,7 @@ #include "bcos-scheduler/src/TarsExecutorManager.h" #include "bcos-storage/RocksDBStorage.h" #include "bcos-task/Wait.h" +#include "bcos-utilities/Error.h" #include "fisco-bcos-tars-service/Common/TarsUtils.h" #include "libinitializer/BaselineSchedulerInitializer.h" #include "libinitializer/ProPBFTInitializer.h" @@ -61,10 +62,13 @@ #include #include #include +#include #include #include #include +#include #include +#include #include using namespace bcos; @@ -685,7 +689,7 @@ void Initializer::prune() } } -rocksdb::DB* createReadOnlyRocksDB(const std::string& path) +std::unique_ptr createReadOnlyRocksDB(const std::string& path) { rocksdb::Options options; options.create_if_missing = false; @@ -696,7 +700,7 @@ rocksdb::DB* createReadOnlyRocksDB(const std::string& path) std::cout << "open read only rocksDB failed: " << status.ToString() << std::endl; return nullptr; } - return db; + return std::unique_ptr(db); } fs::path getSstFileName(const std::string& path, size_t index) @@ -708,156 +712,285 @@ fs::path getSstFileName(const std::string& path, size_t index) return {ss.str()}; } -bcos::Error::Ptr Initializer::generateSnapshot( - const std::string& snapshotPath, bool withTxAndReceipts) +bcos::Error::Ptr checkOrCreateDir(const fs::path& dir) { - if (!boost::iequals(m_nodeConfig->storageType(), "RocksDB")) - { // TODO: support TiKV - std::cerr << "only support RocksDB storage" << std::endl; - return BCOS_ERROR_PTR(-1, "only support RocksDB storage"); - } - auto rockDBPath = m_nodeConfig->storagePath(); - return generateSnapshotFromRocksDB(rockDBPath, snapshotPath, withTxAndReceipts); -} - -bcos::Error::Ptr Initializer::generateSnapshotFromRocksDB(const std::string& rockDBPath, - const std::string& snapshotPath, bool withTxAndReceipts, size_t snapshotFileSize) -{ // FIXME: if enableSeparateBlockAndState, generate state and block into different directories - using namespace rocksdb; - - if (!fs::exists(rockDBPath)) - { - std::cerr << "rocksDB path " << rockDBPath << " does not exist" << std::endl; - return BCOS_ERROR_PTR(-1, rockDBPath + " does not exist"); - } - - DB* db = createReadOnlyRocksDB(rockDBPath); - if (db == nullptr) - { - std::cerr << "open readonly rocksDB failed" << std::endl; - return BCOS_ERROR_PTR(-1, "open readonly rocksDB failed"); - } - fs::path sstPath = snapshotPath + "/snapshot"; - if (!fs::exists(sstPath)) + if (!fs::exists(dir)) { // create directory - if (!fs::create_directories(sstPath)) + if (!fs::create_directories(dir)) { - std::cerr << "failed to create directory " << sstPath << std::endl; - return BCOS_ERROR_PTR(-1, "failed to create directory " + sstPath.string()); + std::cerr << "failed to create directory " << dir << std::endl; + return BCOS_ERROR_PTR(-1, "failed to create directory " + dir.string()); } } else { - if (!fs::is_directory(sstPath)) + if (!fs::is_directory(dir)) { - std::cerr << sstPath << " exists but not a directory" << std::endl; - return BCOS_ERROR_PTR(-1, sstPath.string() + " exists but is not a directory"); + std::cerr << dir << " exists but not a directory" << std::endl; + return BCOS_ERROR_PTR(-1, dir.string() + " exists but is not a directory"); } - if (!fs::is_empty(sstPath)) + if (!fs::is_empty(dir)) { // check sstPath is empty - std::cerr << "Path of sst file is not empty" << std::endl; - return BCOS_ERROR_PTR(-1, sstPath.string() + " is not empty"); + std::cerr << dir << " is not empty, please specific an empty directory" << std::endl; + return BCOS_ERROR_PTR(-1, dir.string() + " is not empty"); } } - rocksdb::Options options; - options.compression = rocksdb::kZSTD; - auto sstFileWriter = rocksdb::SstFileWriter(rocksdb::EnvOptions(), options); - size_t sstIndex = 0; - auto sstFileName = getSstFileName(sstPath.string(), sstIndex); - rocksdb::Status status = sstFileWriter.Open(sstFileName.string()); - if (!status.ok()) + return nullptr; +} + +bcos::Error::Ptr Initializer::generateSnapshot( + const std::string& snapshotPath, bool withTxAndReceipts) +{ + if (!boost::iequals(m_nodeConfig->storageType(), "RocksDB")) + { // TODO: support TiKV + std::cerr << "only support RocksDB storage" << std::endl; + return BCOS_ERROR_PTR(-1, "only support RocksDB storage"); + } + auto stateDBPath = m_nodeConfig->storagePath(); + auto blockDBPath = m_nodeConfig->blockDBPath(); + auto snapshotRoot = snapshotPath + "/snapshot"; + fs::path stateSstPath = snapshotRoot + "/state"; + fs::path blockSstPath = snapshotRoot + "/block"; + auto err = checkOrCreateDir(stateSstPath); + if (err) { - std::cerr << "open file " << sstFileName << " failed, reason: " << status.ToString() - << std::endl; - return BCOS_ERROR_PTR( - -1, "open file " + sstFileName.string() + " failed , reason: " + status.ToString()); + return err; } - ReadOptions readOptions; - readOptions.snapshot = db->GetSnapshot(); - std::unique_ptr it(db->NewIterator(readOptions)); - std::vector sstFiles; - std::vector sstFileList; + err = checkOrCreateDir(blockSstPath); + if (err) + { + return err; + } + // write info to meta, meta file is toml format + std::ofstream metaFile(snapshotRoot + "/meta"); + if (!metaFile.is_open()) + { + std::cerr << "Failed to open meta file" << std::endl; + return BCOS_ERROR_PTR(-1, "Failed to open meta file"); + } + metaFile << "snapshot.withTxAndReceipts = " << withTxAndReceipts << std::endl; + metaFile << "snapshot.separatedBlockAndState = " << m_nodeConfig->enableSeparateBlockAndState() + << std::endl; + auto blockLimit = (protocol::BlockNumber)m_nodeConfig->blockLimit(); bcos::protocol::BlockNumber currentBlockNumber = getCurrentBlockNumber(); + metaFile << "snapshot.blockNumber = " << currentBlockNumber << std::endl; std::cout << "current block number: " << currentBlockNumber << std::endl; auto nonceStartNumber = currentBlockNumber > blockLimit ? currentBlockNumber - blockLimit : 0; auto validNonceStartKey = bcos::storage::toDBKey( bcos::ledger::SYS_BLOCK_NUMBER_2_NONCES, std::to_string(blockLimit + 1)); auto validNonceEndKey = bcos::storage::toDBKey( bcos::ledger::SYS_BLOCK_NUMBER_2_NONCES, std::to_string(currentBlockNumber)); - for (it->SeekToFirst(); it->Valid(); it->Next()) + const size_t MAX_SST_FILE_BYTE = 256 * 1024 * 1024; + auto separatedBlockAndState = m_nodeConfig->enableSeparateBlockAndState(); + rocksdb::Options options; + options.compression = rocksdb::kZSTD; + auto stateSstFileWriter = rocksdb::SstFileWriter(rocksdb::EnvOptions(), options); + auto blockSstFileWriter = rocksdb::SstFileWriter(rocksdb::EnvOptions(), options); + size_t blockSstIndex = 0; + auto blockSstFileName = getSstFileName(blockSstPath.string(), blockSstIndex); + auto status = blockSstFileWriter.Open(blockSstFileName.string()); + if (!status.ok()) { - if (it->key().starts_with("s_block_number_2_nonces")) - { - if (it->key().compare(validNonceStartKey) < 0) - { - continue; - } - } - if (!withTxAndReceipts && - (it->key().starts_with("s_hash_2_receipt") || it->key().starts_with("s_hash_2_tx"))) - { - continue; - } - Status s = sstFileWriter.Put(it->key(), it->value()); - if (!s.ok()) - { - std::cerr << "Error while adding Key: " << it->key().ToString() - << ", Error: " << s.ToString() << std::endl; - return BCOS_ERROR_PTR( - -1, "Error while adding Key: " + it->key().ToString() + ", Error: " + s.ToString()); - } - const size_t MAX_SST_FILE_BYTE = snapshotFileSize * 1024 * 1024; + std::cerr << "open file " << blockSstFileName << " failed, reason: " << status.ToString() + << std::endl; + return BCOS_ERROR_PTR(-1, + "open file " + blockSstFileName.string() + " failed , reason: " + status.ToString()); + } + size_t stateSstIndex = 0; + std::vector sstFiles; + auto stateSstFileName = getSstFileName(stateSstPath.string(), stateSstIndex); + status = stateSstFileWriter.Open(stateSstFileName.string()); + if (!status.ok()) + { + std::cerr << "open file " << stateSstFileName << " failed, reason: " << status.ToString() + << std::endl; + return BCOS_ERROR_PTR(-1, + "open file " + stateSstFileName.string() + " failed , reason: " + status.ToString()); + } + + auto checkSstFileWriter = [&sstFiles](const fs::path& sstPath, + rocksdb::SstFileWriter& sstFileWriter, fs::path& sstFileName, + size_t& sstIndex) -> Error::Ptr { if (sstFileWriter.FileSize() >= MAX_SST_FILE_BYTE) { sstFiles.emplace_back(); - std::cout << sstFiles.size() << " Finishing file " << sstFileName << std::endl; - s = sstFileWriter.Finish(&sstFiles.back()); - if (!s.ok()) + std::cout << sstFileName << " Finished. count: " << sstFiles.size() << std::endl; + auto status = sstFileWriter.Finish(&sstFiles.back()); + if (!status.ok()) { - std::cout << "Error while finishing file " << sstFileName - << ", Error: " << s.ToString() << std::endl; - return BCOS_ERROR_PTR(-1, "Error while finishing file " + sstFileName.string() + - ", Error: " + s.ToString()); + std::cout << "Error while finish file " << sstFileName + << ", Error: " << status.ToString() << std::endl; + return BCOS_ERROR_PTR(-1, "Error while finish file " + sstFileName.string() + + ", Error: " + status.ToString()); } - sstFileList.emplace_back(sstFiles.back().file_path); + // sstFileList.emplace_back(sstFiles.back().file_path); ++sstIndex; sstFileName = getSstFileName(sstPath.string(), sstIndex); - s = sstFileWriter.Open(sstFileName.string()); - if (!s.ok()) + status = sstFileWriter.Open(sstFileName.string()); + if (!status.ok()) { std::cout << "Error while opening file " << sstFileName - << ", Error: " << s.ToString() << std::endl; + << ", Error: " << status.ToString() << std::endl; return BCOS_ERROR_PTR(-1, "Error while opening file " + sstFileName.string() + - ", Error: " + s.ToString()); + ", Error: " + status.ToString()); + } + } + return nullptr; + }; + auto checkAndFinishSStFileWriter = [&](rocksdb::SstFileWriter& sstFileWriter, + const fs::path& sstFileName, + size_t& sstIndex) -> Error::Ptr { + if (sstFileWriter.FileSize() > 0) + { + sstFiles.emplace_back(); + std::cout << sstFileName << " Finished. " << sstFiles.size() << std::endl; + auto status = sstFileWriter.Finish(&sstFiles.back()); + if (!status.ok()) + { + std::cout << "Error while finish file " << sstFileName + << ", Error: " << status.ToString() << std::endl; + return BCOS_ERROR_PTR(-1, "Error while finish file " + sstFileName.string() + + ", Error: " + status.ToString()); + } + // sstFileList.emplace_back(sstFiles.back().file_path); + } + else + { + --sstIndex; + } + return nullptr; + }; + + { // export state to sst + auto error = traverseRocksDB(stateDBPath, + [&](const rocksdb::Slice& key, const rocksdb::Slice& value) -> bcos::Error::Ptr { + // if return true, skip the key + if (key.starts_with("s_block_number_2_nonces")) + { + if (key.compare(validNonceStartKey) < 0) + { // useless nonce + return nullptr; + } + } + if (!separatedBlockAndState && + (key.starts_with("s_hash_2_receipt") || key.starts_with("s_hash_2_tx"))) + { // only separatedBlockAndState = false, stateDB has tx and receipt + if (!withTxAndReceipts) + { // if not withTxAndReceipts, skip tx and receipt in state + return nullptr; + } + // store tx and receipt in different sst file + rocksdb::Status status = blockSstFileWriter.Put(key, value); + if (!status.ok()) + { + std::cerr << "Error while adding Key: " << key.ToString() + << ", Error: " << status.ToString() << std::endl; + return BCOS_ERROR_PTR(-1, "Error while adding Key: " + key.ToString() + + ", Error: " + status.ToString()); + } + return checkSstFileWriter( + blockSstPath, blockSstFileWriter, blockSstFileName, blockSstIndex); + } + rocksdb::Status status = stateSstFileWriter.Put(key, value); + if (!status.ok()) + { + std::cerr << "Error while adding Key: " << key.ToString() + << ", Error: " << status.ToString() << std::endl; + return BCOS_ERROR_PTR(-1, "Error while adding Key: " + key.ToString() + + ", Error: " + status.ToString()); + } + return checkSstFileWriter( + stateSstPath, stateSstFileWriter, stateSstFileName, stateSstIndex); + }); + if (error) + { + metaFile.close(); + return error; + } + error = checkAndFinishSStFileWriter(stateSstFileWriter, stateSstFileName, stateSstIndex); + if (error) + { + metaFile.close(); + return error; + } + if (withTxAndReceipts && !separatedBlockAndState) + { + error = + checkAndFinishSStFileWriter(blockSstFileWriter, blockSstFileName, blockSstIndex); + if (error) + { + metaFile.close(); + return error; } } + // write index to meta file + metaFile << "snapshot.stateSstCount = " << stateSstIndex << std::endl; } - if (sstFileWriter.FileSize() > 0) + if (withTxAndReceipts && separatedBlockAndState) { - sstFiles.emplace_back(); - std::cout << sstFiles.size() << " Finishing file " << sstFileName << std::endl; - auto status = sstFileWriter.Finish(&sstFiles.back()); - if (!status.ok()) + // open blockDB + auto error = traverseRocksDB(blockDBPath, + [&](const rocksdb::Slice& key, const rocksdb::Slice& value) -> bcos::Error::Ptr { + if (key.starts_with("s_hash_2_receipt") || key.starts_with("s_hash_2_tx")) + { + rocksdb::Status status = blockSstFileWriter.Put(key, value); + if (!status.ok()) + { + std::cerr << "Error while adding Key: " << key.ToString() + << ", Error: " << status.ToString() << std::endl; + return BCOS_ERROR_PTR(-1, "Error while adding Key: " + key.ToString() + + ", Error: " + status.ToString()); + } + + return checkSstFileWriter( + blockSstPath, blockSstFileWriter, blockSstFileName, blockSstIndex); + } + return nullptr; + }); + error = checkAndFinishSStFileWriter(blockSstFileWriter, blockSstFileName, blockSstIndex); + if (error) { - std::cout << "Error while finishing file " << sstFileName - << ", Error: " << status.ToString() << std::endl; - return BCOS_ERROR_PTR(-1, "Error while finishing file " + sstFileName.string() + - ", Error: " + status.ToString()); + metaFile.close(); + return error; } - sstFileList.emplace_back(sstFiles.back().file_path); } - // write max index to meta - std::ofstream metaFile(sstPath.string() + "/meta"); - if (!metaFile.is_open()) + metaFile << "snapshot.blockSstCount = " << blockSstIndex << std::endl; + metaFile.close(); + std::cout << "generate snapshot success, the snapshot is in " << snapshotRoot << std::endl; + return nullptr; +} + +bcos::Error::Ptr bcos::initializer::traverseRocksDB(const std::string& rockDBPath, + const std::function& + processor) +{ + using namespace rocksdb; + if (!fs::exists(rockDBPath)) { - std::cerr << "Failed to open meta file" << std::endl; - return BCOS_ERROR_PTR(-1, "Failed to open meta file"); + std::cerr << "rocksDB path " << rockDBPath << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, rockDBPath + " does not exist"); + } + auto db = createReadOnlyRocksDB(rockDBPath); + if (db == nullptr) + { + std::cerr << "open readonly rocksDB failed" << std::endl; + return BCOS_ERROR_PTR(-1, "open readonly rocksDB failed"); + } + std::cout << "Traverse RocksDB: " << rockDBPath << std::endl; + ReadOptions readOptions; + readOptions.snapshot = db->GetSnapshot(); + std::unique_ptr it(db->NewIterator(readOptions)); + for (it->SeekToFirst(); it->Valid(); it->Next()) + { + auto err = processor(it->key(), it->value()); + if (err) + { + db->ReleaseSnapshot(readOptions.snapshot); + return err; + } } - metaFile << sstIndex; - metaFile.close(); db->ReleaseSnapshot(readOptions.snapshot); - delete db; return nullptr; } @@ -871,55 +1004,31 @@ bcos::Error::Ptr Initializer::importSnapshot(const std::string& snapshotPath) return importSnapshotToRocksDB(snapshotPath, m_nodeConfig->storagePath()); } -bcos::Error::Ptr Initializer::importSnapshotToRocksDB( - const std::string& snapshotPath, const std::string& rockDBPath) -{ // FIXME: if enableSeparateBlockAndState, import state and block into different db - // check snapshot file and meta file - fs::path sstPath = snapshotPath; - if (!fs::exists(sstPath)) - { - std::cerr << "snapshot path " << sstPath << " does not exist" << std::endl; - return BCOS_ERROR_PTR(-1, sstPath.string() + " does not exist"); - } - // read meta file - std::ifstream metaFile(sstPath.string() + "/meta"); - if (!metaFile.is_open()) - { - std::cerr << "Failed to open meta file" << std::endl; - return BCOS_ERROR_PTR(-1, "Failed to open meta file"); - } - size_t sstIndex = 0; - metaFile >> sstIndex; - metaFile.close(); +bcos::Error::Ptr ingestIntoRocksDB(rocksdb::DB& rocksDB, const std::vector& sstFiles) +{ rocksdb::Options options; options.compression = rocksdb::kZSTD; auto sstFileReader = rocksdb::SstFileReader(options); - std::vector sstFiles; - - for (size_t i = 0; i <= sstIndex; ++i) - { - auto sstFileName = getSstFileName(sstPath.string(), i); - auto status = sstFileReader.Open(sstFileName.string()); + for (const auto& sstFileName : sstFiles) + { // check sst file + auto status = sstFileReader.Open(sstFileName); if (!status.ok()) { std::cerr << "open file " << sstFileName << " failed, reason: " << status.ToString() << std::endl; return BCOS_ERROR_PTR( - -1, "open file " + sstFileName.string() + " failed , reason: " + status.ToString()); + -1, "open file " + sstFileName + " failed , reason: " + status.ToString()); } status = sstFileReader.VerifyChecksum(); if (!status.ok()) { std::cerr << "verify file " << sstFileName << " failed, reason: " << status.ToString() << std::endl; - return BCOS_ERROR_PTR(-1, - "verify file " + sstFileName.string() + " failed , reason: " + status.ToString()); + return BCOS_ERROR_PTR( + -1, "verify file " + sstFileName + " failed , reason: " + status.ToString()); } - sstFiles.emplace_back(sstFileName.string()); } std::cout << "check sst files success, ingest sst files" << std::endl; - auto storage = std::dynamic_pointer_cast(m_storage); - auto& rocksDB = storage->rocksDB(); rocksdb::IngestExternalFileOptions info; info.move_files = true; @@ -930,7 +1039,142 @@ bcos::Error::Ptr Initializer::importSnapshotToRocksDB( std::cerr << "Error while adding file, " << status.ToString() << std::endl; return BCOS_ERROR_PTR(-1, "Error while adding file, " + status.ToString()); } + return nullptr; +} + +bcos::Error::Ptr Initializer::importSnapshotToRocksDB( + const std::string& snapshotPath, const std::string& rockDBPath) +{ + // check snapshot file and meta file + fs::path sstPath = snapshotPath + "/state"; + if (!fs::exists(sstPath)) + { + std::cerr << "snapshot path " << sstPath << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, sstPath.string() + " does not exist"); + } + // read meta file + fs::path metaFilePath = snapshotPath + "/meta"; + if (!fs::exists(metaFilePath)) + { + std::cerr << "meta file " << metaFilePath << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, metaFilePath.string() + " does not exist"); + } + auto tomlTable = toml::parse_file(metaFilePath.string()); + std::cout << tomlTable << std::endl; + auto snapshotWithTxAndReceipts = tomlTable["snapshot"]["withTxAndReceipts"].value(); + auto snapshotBlockNumber = tomlTable["snapshot"]["blockNumber"].value(); + auto stateSstCount = tomlTable["snapshot"]["stateSstCount"].value(); + auto blockSstCount = tomlTable["snapshot"]["blockSstCount"].value(); + + if (snapshotBlockNumber.has_value()) + { + std::cout << "The block number of snapshot: " << snapshotBlockNumber.value() << std::endl; + } + + // import state + size_t sstIndex = 0; + if (stateSstCount.has_value()) + { + sstIndex = stateSstCount.value(); + } + else + { + std::cerr << "stateSstCount is not set" << std::endl; + return BCOS_ERROR_PTR(-1, "stateSstCount is not set"); + } + std::vector sstFiles; + for (size_t i = 0; i <= sstIndex; ++i) + { + auto sstFileName = getSstFileName(sstPath.string(), i); + if (!fs::exists(sstFileName)) + { + std::cerr << "sst file " << sstFileName << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, sstFileName.string() + " does not exist"); + } + sstFiles.emplace_back(sstFileName.string()); + } + + auto stateStorage = std::dynamic_pointer_cast(m_storage); + if (stateStorage) + { + auto& rocksDB = stateStorage->rocksDB(); + ingestIntoRocksDB(rocksDB, sstFiles); + } + else + { // TODO: support import into TiKV + std::cerr << "storage is not RocksDBStorage" << std::endl; + return BCOS_ERROR_PTR(-1, "storage is not RocksDBStorage"); + } + auto currentBlockNumber = getCurrentBlockNumber(); + std::cout << "The block number of this node: " << currentBlockNumber << std::endl; + // import tx and receipt + if (snapshotWithTxAndReceipts.has_value() && snapshotWithTxAndReceipts.value()) + { // snapshot has tx and receipt + if (blockSstCount.has_value()) + { + sstIndex = blockSstCount.value(); + } + else + { + std::cerr << "blockSstCount is not set" << std::endl; + return BCOS_ERROR_PTR(-1, "blockSstCount is not set"); + } + fs::path blockSstPath = snapshotPath + "/block"; + if (!fs::exists(blockSstPath)) + { + std::cerr << "snapshot path " << blockSstPath << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, blockSstPath.string() + " does not exist"); + } + std::vector blockSstFiles; + for (size_t i = 0; i <= sstIndex; ++i) + { + auto sstFileName = getSstFileName(blockSstPath.string(), i); + if (!fs::exists(sstFileName)) + { + std::cerr << "sst file " << sstFileName << " does not exist" << std::endl; + return BCOS_ERROR_PTR(-1, sstFileName.string() + " does not exist"); + } + blockSstFiles.emplace_back(sstFileName.string()); + } + if (m_nodeConfig->enableSeparateBlockAndState()) + { + auto blockStorage = std::dynamic_pointer_cast(m_blockStorage); + if (blockStorage) + { + auto& rocksDB = blockStorage->rocksDB(); + ingestIntoRocksDB(rocksDB, blockSstFiles); + } + else + { + std::cerr << "blockStorage is not RocksDBStorage" << std::endl; + return BCOS_ERROR_PTR(-1, "blockStorage is not RocksDBStorage"); + } + } + else + { // import block into state db + auto& rocksDB = stateStorage->rocksDB(); + ingestIntoRocksDB(rocksDB, blockSstFiles); + } + return nullptr; + } - std::cout << "current block number: " << getCurrentBlockNumber() << std::endl; + { // snapshot without tx and receipt + storage::Entry archivedNumber; + // the archived number is the first block has full tx and receipt + archivedNumber.importFields({std::to_string(currentBlockNumber + 1)}); + std::promise setPromise; + m_storage->asyncSetRow(ledger::SYS_CURRENT_STATE, ledger::SYS_KEY_ARCHIVED_NUMBER, + archivedNumber, [&](Error::UniquePtr err) { setPromise.set_value(std::move(err)); }); + auto setError = setPromise.get_future().get(); + if (setError) + { + std::cerr << "set archived number failed: " << setError->errorMessage() << std::endl; + return BCOS_ERROR_PTR(-1, "set archived number failed: " + setError->errorMessage()); + } + std::cout << "The snapshot doesn't contain transactions and receipts, if you want the new " + "node to sync historic blocks, please set storage.sync_archived_blocks = true " + "in the configuration file." + << std::endl; + } return nullptr; } diff --git a/libinitializer/Initializer.h b/libinitializer/Initializer.h index 0f74a9bd43..e57b70752e 100644 --- a/libinitializer/Initializer.h +++ b/libinitializer/Initializer.h @@ -33,6 +33,11 @@ #include "LightNodeInitializer.h" #endif +namespace rocksdb +{ +class Slice; +} + namespace bcos { namespace gateway @@ -86,8 +91,6 @@ class Initializer void initSysContract(); bcos::storage::TransactionalStorageInterface::Ptr storage() { return m_storage; } bcos::Error::Ptr generateSnapshot(const std::string& snapshotPath, bool withTxAndReceipts); - bcos::Error::Ptr generateSnapshotFromRocksDB(const std::string& rockDBPath, - const std::string& snapshotPath, bool withTxAndReceipts, size_t snapshotFileSize = 256); bcos::Error::Ptr importSnapshot(const std::string& snapshotPath); bcos::Error::Ptr importSnapshotToRocksDB( const std::string& snapshotPath, const std::string& rockDBPath); @@ -119,5 +122,9 @@ class Initializer protocol::BlockNumber getCurrentBlockNumber(); }; + +bcos::Error::Ptr traverseRocksDB(const std::string& rockDBPath, + const std::function& + processor); } // namespace initializer } // namespace bcos diff --git a/tools/archive-tool/ArchiveService.h b/tools/archive-tool/ArchiveService.h index 8c8826197b..c1dea13fca 100644 --- a/tools/archive-tool/ArchiveService.h +++ b/tools/archive-tool/ArchiveService.h @@ -211,7 +211,7 @@ class ArchiveService : public std::enable_shared_from_this } Error::Ptr deleteArchivedData(int64_t startBlock, int64_t endBlock) - { + { // delete blocks in [startBlock, endBlock) auto blockFlag = bcos::ledger::HEADER; for (int64_t blockNumber = startBlock; blockNumber < endBlock; blockNumber++) { diff --git a/tools/archive-tool/archiveTool.cpp b/tools/archive-tool/archiveTool.cpp index 01a353881f..3c54274d64 100644 --- a/tools/archive-tool/archiveTool.cpp +++ b/tools/archive-tool/archiveTool.cpp @@ -180,6 +180,7 @@ createBackendStorage(std::shared_ptr nodeConfig, const s nodeConfig->blockDBPath(), option, nodeConfig->enableStatistics()); blockStorage = StorageInitializer::build(std::move(blockDB), dataEncryption); } + blockStorage = storage; } else { @@ -193,6 +194,7 @@ createBackendStorage(std::shared_ptr nodeConfig, const s blockStorage = std::make_shared( std::unique_ptr(blockRocksDB), dataEncryption); } + blockStorage = storage; } } else if (boost::iequals(nodeConfig->storageType(), "TiKV")) @@ -531,7 +533,7 @@ void reimportBlocks(auto archiveStorage, TransactionalStorageInterface::Ptr loca h256s topics; for (const auto& k : logEntryJson["topics"]) { - topics.push_back(h256(k.asString())); + topics.emplace_back(k.asString()); } auto address = logEntryJson["address"].asString(); auto addr = bytes(address.data(), address.data() + address.size());