diff --git a/CMakeLists.txt b/CMakeLists.txt index 0123485cdf..614d6068dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,7 +45,7 @@ set(CPACK_PACKAGE_VERSION_PATCH "0") if(CI_VERSION_PRE_RELEASE) set(CPACK_PACKAGE_VERSION_PRE_RELEASE "${CI_VERSION_PRE_RELEASE}") else() - set(CPACK_PACKAGE_VERSION_PRE_RELEASE "99") + set(CPACK_PACKAGE_VERSION_PRE_RELEASE "100") endif() if(CI_TAG) diff --git a/nano/lib/lmdbconfig.hpp b/nano/lib/lmdbconfig.hpp index b48f635bd5..30b5515ea3 100644 --- a/nano/lib/lmdbconfig.hpp +++ b/nano/lib/lmdbconfig.hpp @@ -45,6 +45,6 @@ class lmdb_config final /** Sync strategy for the ledger database */ sync_strategy sync{ always }; uint32_t max_databases{ 128 }; - size_t map_size{ 256ULL * 1024 * 1024 * 1024 }; + size_t map_size{ 512ULL * 1024 * 1024 * 1024 }; }; } diff --git a/nano/lib/numbers.hpp b/nano/lib/numbers.hpp index 3021524c4c..4b340033c4 100644 --- a/nano/lib/numbers.hpp +++ b/nano/lib/numbers.hpp @@ -255,6 +255,21 @@ class qualified_root : public uint512_union } }; +class vote_storage_key : public uint512_union +{ +public: + using uint512_union::uint512_union; + + nano::block_hash const & block_hash () const + { + return reinterpret_cast (uint256s[0]); + } + nano::account const & account () const + { + return reinterpret_cast (uint256s[1]); + } +}; + nano::signature sign_message (nano::raw_key const &, nano::public_key const &, nano::uint256_union const &); nano::signature sign_message (nano::raw_key const &, nano::public_key const &, uint8_t const *, size_t); bool validate_message (nano::public_key const &, nano::uint256_union const &, nano::signature const &); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 63c90bddc5..80937b315d 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -53,6 +53,11 @@ enum class type : uint8_t bootstrap_ascending, bootstrap_ascending_accounts, + vote_storage, + vote_storage_broadcast, + vote_storage_replies, + vote_storage_write, + _last // Must be the last enum }; @@ -74,6 +79,7 @@ enum class detail : uint8_t none, success, unknown, + empty, // processing queue queue, @@ -328,6 +334,26 @@ enum class detail : uint8_t deprioritize, deprioritize_failed, + // vote storage + stored, + stored_votes, + frontier, + frontier_empty, + reply_vote, + reply_duplicate, + broadcast_duplicate, + broadcast_vote_rep, + broadcast_vote_random, + broadcast_rep, + broadcast_random, + low_weight, + reply_channel_full, + broadcast_channel_full, + rep_broadcast_channel_full, + random_broadcast_channel_full, + write_error_broadcast, + write_error_reply, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 0824d23733..75b95201d1 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -100,6 +100,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::scheduler_priority: thread_role_name_string = "Sched Priority"; break; + case nano::thread_role::name::vote_storage: + thread_role_name_string = "Vote storage"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 311ae58d1b..88313cfbc0 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -42,6 +42,7 @@ enum class name scheduler_manual, scheduler_optimistic, scheduler_priority, + vote_storage, }; /* diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 4452df575c..a1c7f8fb61 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -166,6 +166,8 @@ add_library( vote_cache.cpp vote_processor.hpp vote_processor.cpp + vote_storage.hpp + vote_storage.cpp voting.hpp voting.cpp wallet.hpp diff --git a/nano/node/bandwidth_limiter.cpp b/nano/node/bandwidth_limiter.cpp index 344cb6a129..cc3aacb05f 100644 --- a/nano/node/bandwidth_limiter.cpp +++ b/nano/node/bandwidth_limiter.cpp @@ -27,7 +27,8 @@ void nano::bandwidth_limiter::reset (std::size_t limit_a, double burst_ratio_a) nano::outbound_bandwidth_limiter::outbound_bandwidth_limiter (nano::outbound_bandwidth_limiter::config config_a) : config_m{ config_a }, limiter_standard (config_m.standard_limit, config_m.standard_burst_ratio), - limiter_bootstrap{ config_m.bootstrap_limit, config_m.bootstrap_burst_ratio } + limiter_bootstrap{ config_m.bootstrap_limit, config_m.bootstrap_burst_ratio }, + limiter_vote_storage{ config_m.vote_storage_limit, config_m.vote_storage_burst_ratio } { } @@ -39,6 +40,8 @@ nano::bandwidth_limiter & nano::outbound_bandwidth_limiter::select_limiter (nano return limiter_bootstrap; case bandwidth_limit_type::standard: break; + case bandwidth_limit_type::vote_storage: + return limiter_vote_storage; default: debug_assert (false); break; @@ -68,6 +71,9 @@ nano::bandwidth_limit_type nano::to_bandwidth_limit_type (const nano::transport: case nano::transport::traffic_type::bootstrap: return nano::bandwidth_limit_type::bootstrap; break; + case nano::transport::traffic_type::vote_storage: + return nano::bandwidth_limit_type::vote_storage; + break; } debug_assert (false); return {}; diff --git a/nano/node/bandwidth_limiter.hpp b/nano/node/bandwidth_limiter.hpp index 5cb13f0eba..5994f2f631 100644 --- a/nano/node/bandwidth_limiter.hpp +++ b/nano/node/bandwidth_limiter.hpp @@ -13,7 +13,9 @@ enum class bandwidth_limit_type /** For all message */ standard, /** For bootstrap (asc_pull_ack, asc_pull_req) traffic */ - bootstrap + bootstrap, + /** For vote storage broadcasts and replies */ + vote_storage, }; nano::bandwidth_limit_type to_bandwidth_limit_type (nano::transport::traffic_type const &); @@ -42,9 +44,14 @@ class outbound_bandwidth_limiter final // standard std::size_t standard_limit; double standard_burst_ratio; + // bootstrap std::size_t bootstrap_limit; double bootstrap_burst_ratio; + + // vote storage + std::size_t vote_storage_limit; + double vote_storage_burst_ratio; }; public: @@ -72,5 +79,6 @@ class outbound_bandwidth_limiter final private: bandwidth_limiter limiter_standard; bandwidth_limiter limiter_bootstrap; + bandwidth_limiter limiter_vote_storage; }; } \ No newline at end of file diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index c84a257b14..49a0148e3f 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3719,6 +3719,223 @@ void nano::json_handler::republish () response_errors (); } +void nano::json_handler::republish_dependencies () +{ + auto value_or_throw = [] (auto && opt, auto ec) { + if (opt.has_value ()) + { + return opt.value (); + } + throw std::system_error (make_error_code (ec)); + }; + + auto get = [&, this] (std::string name) -> std::optional { + auto value = request.get_optional (name); + if (value.is_initialized ()) + { + return value.get (); + } + return std::nullopt; + }; + + auto parse_count = [this] (std::string text) -> uint64_t { + uint64_t result; + if (decode_unsigned (text, result)) + { + throw std::system_error (make_error_code (nano::error_common::invalid_count)); + } + return result; + }; + + auto parse_account = [this] (std::string text) -> nano::account { + nano::account account; + if (account.decode_account (text)) + { + throw std::system_error (make_error_code (nano::error_common::bad_account_number)); + } + return account; + }; + + try + { + auto depth = parse_count (get ("depth").value_or ("6")); + auto count = parse_count (get ("count").value_or ("1")); + auto account = parse_account (value_or_throw (get ("account"), nano::error_common::missing_account)); + + bool broadcast_votes = true; + + auto [num_blocks, num_votes] = republish_dependencies_impl (account, depth, count, broadcast_votes); + + response_l.put ("blocks", num_blocks); + response_l.put ("votes", num_votes); + } + catch (std::system_error const & ex) + { + ec = ex.code (); + } + response_errors (); +} + +std::tuple nano::json_handler::republish_dependencies_impl (nano::account account, size_t depth, size_t count, bool broadcast_votes) +{ + node.logger.info (nano::log::type::rpc, "Republishing blocks for account: {} with depth: {}, count: {}", account.to_account (), depth, count); + + auto transaction = node.store.tx_begin_read (); + + auto const maybe_account_info = node.ledger.account_info (transaction, account); + if (!maybe_account_info) + { + throw std::system_error (make_error_code (nano::error_common::account_not_found)); + } + auto const account_info = maybe_account_info.value (); + + node.logger.debug (nano::log::type::rpc, "Account info: {}", account_info); + + std::set visited; + auto dfs_traversal = [&visited] (size_t max_depth, auto start, auto && visitor) { + struct entry + { + decltype (start) node; + size_t depth; + }; + + std::stack stack; + stack.push (entry{ start, 0 }); + + while (!stack.empty ()) + { + auto current = stack.top (); + stack.pop (); + + auto deps = visitor (current.node); + + if (current.depth < max_depth) + { + for (auto & dep : deps) + { + if (visited.insert (dep->hash ()).second) + { + stack.push (entry{ dep, current.depth + 1 }); + } + } + } + } + }; + + auto block_dependencies = [this] (nano::store::transaction & transaction, std::shared_ptr block) { + auto deps_hashes = node.ledger.dependent_blocks (transaction, *block); + std::vector> deps; + for (auto & hash : deps_hashes) + { + if (!hash.is_zero ()) + { + auto block = node.ledger.block (transaction, hash); + release_assert (block); + deps.push_back (std::move (block)); + } + } + return deps; + }; + + auto flood_messages = [this] (auto messages) { + release_assert (!messages.empty ()); + + auto wait_channels = [] (auto const & channels) { + auto busy = [] (auto const & channels) { + return std::any_of (channels.begin (), channels.end (), [] (auto const & channel) { + return channel->max (nano::transport::traffic_type::vote_storage); + }); + }; + + int attempts = 0; + while (busy (channels) && attempts++ < 10) + { + std::this_thread::sleep_for (std::chrono::milliseconds (100)); + } + }; + + auto peers = node.network.list (); + + node.logger.debug (nano::log::type::rpc, "Flooding {} {} messages to {} peers", + messages.size (), + to_string (messages[0].type ()), + peers.size ()); + + wait_channels (peers); + + for (auto & channel : peers) + { + for (auto & message : messages) + { + channel->send ( + message, [node_s = this->node.shared (), channel] (auto & ec, auto size) { + if (ec) + { + node_s->logger.debug (nano::log::type::rpc, "Error sending to: {} ({})", channel->to_string (), ec.message ()); + } + }, + nano::transport::buffer_drop_policy::no_socket_drop, nano::transport::traffic_type::vote_storage); + } + } + }; + + int num_blocks = 0; + int num_votes = 0; + + auto block_visitor = [&, this] (auto block) { + num_blocks++; + node.logger.debug (nano::log::type::rpc, "Republishing block: {}", block->hash ().to_string ()); + + nano::publish pub{ node.network_params.network, block }; + flood_messages (std::vector{ pub }); + + if (broadcast_votes) + { + auto votes = node.vote_storage.query_hash (block->hash ()); + if (!votes.empty ()) + { + ++num_votes; + node.logger.debug (nano::log::type::rpc, "Republishing {} votes for block: {}", votes.size (), block->hash ().to_string ()); + + std::vector vote_messages; + for (auto & vote : votes) + { + nano::confirm_ack ack{ node.network_params.network, vote }; + vote_messages.push_back (ack); + } + + flood_messages (vote_messages); + } + else + { + node.logger.debug (nano::log::type::rpc, "No votes found for block: {}", block->hash ().to_string ()); + } + } + + return block_dependencies (transaction, block); + }; + + auto current_block = node.ledger.head_block (transaction, account); + debug_assert (current_block); + + std::deque> blocks; + for (int n = 0; n < count && current_block; ++n) + { + blocks.push_front (current_block); + current_block = node.ledger.block (transaction, current_block->previous ()); + } + + for (auto & block : blocks) + { + node.logger.info (nano::log::type::rpc, "Republishing for block: {} with depth: {}", block->hash ().to_string (), depth); + dfs_traversal (depth, block, block_visitor); + } + + node.logger.info (nano::log::type::rpc, "Republishing finished"); + + return { num_blocks, num_votes }; +} + void nano::json_handler::search_pending () { response_l.put ("deprecated", "1"); @@ -5395,6 +5612,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("representatives", &nano::json_handler::representatives); no_arg_funcs.emplace ("representatives_online", &nano::json_handler::representatives_online); no_arg_funcs.emplace ("republish", &nano::json_handler::republish); + no_arg_funcs.emplace ("republish_dependencies", &nano::json_handler::republish_dependencies); no_arg_funcs.emplace ("search_pending", &nano::json_handler::search_pending); no_arg_funcs.emplace ("search_receivable", &nano::json_handler::search_receivable); no_arg_funcs.emplace ("search_pending_all", &nano::json_handler::search_pending_all); diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index cb7da62c72..4be4515949 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -8,6 +8,7 @@ #include #include +#include #include namespace nano @@ -99,6 +100,9 @@ class json_handler : public std::enable_shared_from_this void representatives (); void representatives_online (); void republish (); + void republish_dependencies (); + /// @returns + std::tuple republish_dependencies_impl (nano::account account, size_t depth, size_t count, bool broadcast_votes); void search_pending (); void search_receivable (); void search_pending_all (); @@ -146,6 +150,7 @@ class json_handler : public std::enable_shared_from_this void work_peers_clear (); void work_set (); void work_validate (); + std::string body; nano::node & node; boost::property_tree::ptree request; diff --git a/nano/node/make_store.cpp b/nano/node/make_store.cpp index c3c8c28105..4eec778ffa 100644 --- a/nano/node/make_store.cpp +++ b/nano/node/make_store.cpp @@ -11,3 +11,18 @@ std::unique_ptr nano::make_store (nano::logger & logger, return std::make_unique (logger, add_db_postfix ? path / "data.ldb" : path, constants, txn_tracking_config_a, block_processor_batch_max_time_a, lmdb_config_a, backup_before_upgrade); } + +std::unique_ptr nano::make_vote_store (nano::logger & logger, std::filesystem::path const & path, nano::ledger_constants & constants, bool read_only, bool add_db_postfix, nano::rocksdb_config const & rocksdb_config, nano::txn_tracking_config const & txn_tracking_config_a, std::chrono::milliseconds block_processor_batch_max_time_a, nano::lmdb_config const & lmdb_config_a, bool backup_before_upgrade) +{ + if (rocksdb_config.enable) + { + release_assert (false, "rocksdb not supported"); + } + else + { + nano::lmdb_config lmdb_config = lmdb_config_a; + lmdb_config.sync = nano::lmdb_config::sync_strategy::nosync_unsafe; + + return std::make_unique (logger, add_db_postfix ? path / "votes.ldb" : path, constants, txn_tracking_config_a, block_processor_batch_max_time_a, lmdb_config, backup_before_upgrade); + } +} \ No newline at end of file diff --git a/nano/node/make_store.hpp b/nano/node/make_store.hpp index 5f5b5f9372..fe151eb8af 100644 --- a/nano/node/make_store.hpp +++ b/nano/node/make_store.hpp @@ -23,4 +23,6 @@ class component; namespace nano { std::unique_ptr make_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false); + +std::unique_ptr make_vote_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false); } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 4d47533be6..4619adbd15 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -276,6 +276,15 @@ class network_message_visitor : public nano::message_visitor node.aggregator.add (channel, message_a.roots_hashes); } } + + // Vote storage + if (!message_a.roots_hashes.empty ()) + { + for (auto & [hash, root] : message_a.roots_hashes) + { + node.vote_storage.trigger (hash, channel); + } + } } void confirm_ack (nano::confirm_ack const & message_a) override diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 834cabbd9f..94d3bfaca4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -153,6 +153,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons distributed_work (*this), store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade)), store (*store_impl), + vote_store_impl{ nano::make_vote_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade) }, + vote_store{ *vote_store_impl }, unchecked{ config.max_unchecked_blocks, stats, flags.disable_block_processor_unchecked_deletion }, wallets_store_impl (std::make_unique (application_path_a / "wallets.ldb", config_a.lmdb_config)), wallets_store (*wallets_store_impl), @@ -176,6 +178,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons application_path (application_path_a), port_mapping (*this), rep_crawler (*this), + vote_storage{ *this, vote_store, network, ledger, stats }, vote_processor (active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params), warmed_up (0), block_processor (*this, write_database_queue), @@ -218,6 +221,13 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons scheduler.optimistic.activate (account, account_info, conf_info); }); + observers.vote.add ([this] (std::shared_ptr vote, std::shared_ptr const & channel, nano::vote_code code) { + if (code != nano::vote_code::invalid) + { + vote_storage.vote (vote); + } + }); + if (!init_error ()) { // Notify election schedulers when AEC frees election slot @@ -551,6 +561,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.final_generator, "vote_generator_final")); composite->add_component (node.ascendboot.collect_container_info ("bootstrap_ascending")); composite->add_component (node.unchecked.collect_container_info ("unchecked")); + composite->add_component (node.vote_storage.collect_container_info ("vote_storage")); return composite; } @@ -658,6 +669,7 @@ void nano::node::start () } websocket.start (); telemetry.start (); + vote_storage.start (); } void nano::node::stop () @@ -698,6 +710,7 @@ void nano::node::stop () stats.stop (); epoch_upgrader.stop (); workers.stop (); + vote_storage.stop (); // work pool is not stopped on purpose due to testing setup } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index abd7b3651b..f3fa159693 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -148,6 +149,8 @@ class node final : public std::enable_shared_from_this nano::distributed_work_factory distributed_work; std::unique_ptr store_impl; nano::store::component & store; + std::unique_ptr vote_store_impl; + nano::store::component & vote_store; nano::unchecked_map unchecked; std::unique_ptr wallets_store_impl; nano::wallets_store & wallets_store; @@ -163,6 +166,7 @@ class node final : public std::enable_shared_from_this nano::port_mapping port_mapping; nano::online_reps online_reps; nano::rep_crawler rep_crawler; + nano::vote_storage vote_storage; nano::vote_processor vote_processor; unsigned warmed_up; nano::block_processor block_processor; diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 8d7bb5ceee..f0ec38d026 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -115,6 +115,9 @@ void nano::rep_crawler::ongoing_crawl () { node.keepalive_preconfigured (node.config.preconfigured_peers); } + + sufficient_weight = false; + // Reduce crawl frequency when there's enough total peer weight unsigned next_run_ms = node.network_params.network.is_dev_network () ? 100 : sufficient_weight ? 7000 : 3000; @@ -149,6 +152,11 @@ std::vector> nano::rep_crawler::get_cr return result; } +namespace +{ +std::chrono::seconds const rep_query_timeout{ 30 }; +} + void nano::rep_crawler::query (std::vector> const & channels_a) { auto transaction (node.store.tx_begin_read ()); @@ -186,7 +194,7 @@ void nano::rep_crawler::query (std::vector node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash = hash_root->first] () { + node.workers.add_timed_task (std::chrono::steady_clock::now () + rep_query_timeout, [node_w, hash = hash_root->first] () { if (auto node_l = node_w.lock ()) { auto target_finished_processed (node_l->vote_processor.total_processed + node_l->vote_processor.size ()); @@ -211,7 +219,7 @@ void nano::rep_crawler::throttled_remove (nano::block_hash const & hash_a, uint6 else { std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash_a, target_finished_processed] () { + node.workers.add_timed_task (std::chrono::steady_clock::now () + rep_query_timeout, [node_w, hash_a, target_finished_processed] () { if (auto node_l = node_w.lock ()) { node_l->rep_crawler.throttled_remove (hash_a, target_finished_processed); diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index bb60519824..e1cb85855c 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -20,9 +20,12 @@ nano::request_aggregator::request_aggregator (nano::node_config const & config_a wallets (wallets_a), active (active_a), generator (generator_a), - final_generator (final_generator_a), - thread ([this] () { run (); }) + final_generator (final_generator_a) { + for (auto i = 0; i < 4; ++i) + { + threads.emplace_back ([this] () { run (); }); + } generator.set_reply_action ([this] (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { this->reply_action (vote_a, channel_a); }); @@ -131,9 +134,12 @@ void nano::request_aggregator::stop () stopped = true; } condition.notify_all (); - if (thread.joinable ()) + for (auto & thread: threads) { - thread.join (); + if (thread.joinable ()) + { + thread.join (); + } } } diff --git a/nano/node/request_aggregator.hpp b/nano/node/request_aggregator.hpp index 657b7b56f3..448dfda220 100644 --- a/nano/node/request_aggregator.hpp +++ b/nano/node/request_aggregator.hpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace mi = boost::multi_index; @@ -105,7 +106,7 @@ class request_aggregator final bool started{ false }; nano::condition_variable condition; nano::mutex mutex{ mutex_identifier (mutexes::request_aggregator) }; - std::thread thread; + std::vector threads; friend std::unique_ptr collect_container_info (request_aggregator &, std::string const &); }; diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 4271905534..84010393dc 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -193,12 +193,14 @@ void nano::transport::socket::write_queued_messages () bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const { - return send_queue.size (traffic_type) >= max_queue_size; + // return send_queue.size (traffic_type) >= max_queue_size; + return send_queue.max (traffic_type); } bool nano::transport::socket::full (nano::transport::traffic_type traffic_type) const { - return send_queue.size (traffic_type) >= 2 * max_queue_size; + // return send_queue.size (traffic_type) >= 2 * max_queue_size; + return send_queue.full (traffic_type); } /** Call set_timeout with default_timeout as parameter */ @@ -373,7 +375,7 @@ nano::transport::socket::write_queue::write_queue (std::size_t max_size_a) : bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, callback_t callback, nano::transport::traffic_type traffic_type) { nano::lock_guard guard{ mutex }; - if (queues[traffic_type].size () < 2 * max_size) + if (queues[traffic_type].size () < 2 * calculate_max_size (traffic_type)) { queues[traffic_type].push (entry{ buffer, callback }); return true; // Queued @@ -397,10 +399,29 @@ std::optional nano::transport::sock }; // TODO: This is a very basic prioritization, implement something more advanced and configurable - if (auto item = try_pop (nano::transport::traffic_type::generic)) + if (counter++ % 10 != 0) { - return item; + if (auto item = try_pop (nano::transport::traffic_type::vote_storage)) + { + return item; + } + if (auto item = try_pop (nano::transport::traffic_type::generic)) + { + return item; + } } + else + { + if (auto item = try_pop (nano::transport::traffic_type::generic)) + { + return item; + } + if (auto item = try_pop (nano::transport::traffic_type::vote_storage)) + { + return item; + } + } + if (auto item = try_pop (nano::transport::traffic_type::bootstrap)) { return item; @@ -433,6 +454,25 @@ bool nano::transport::socket::write_queue::empty () const }); } +bool nano::transport::socket::write_queue::max (nano::transport::traffic_type traffic_type) const +{ + return size (traffic_type) >= calculate_max_size (traffic_type); +} + +bool nano::transport::socket::write_queue::full (nano::transport::traffic_type traffic_type) const +{ + return size (traffic_type) >= 2 * calculate_max_size (traffic_type); +} + +std::size_t nano::transport::socket::write_queue::calculate_max_size (nano::transport::traffic_type traffic_type) const +{ + return traffic_type == nano::transport::traffic_type::vote_storage ? max_size * 8 : max_size; +} + +/* + * + */ + boost::asio::ip::network_v6 nano::transport::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) { return boost::asio::ip::make_network_v6 (ip_address, static_cast (network_prefix)); diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index 91f7d008fc..08eb57593b 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -142,11 +142,16 @@ class socket final : public std::enable_shared_from_this> queues; + size_t counter{ 0 }; }; write_queue send_queue; diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp index a4b89e1c7f..cc6874788a 100644 --- a/nano/node/transport/traffic_type.hpp +++ b/nano/node/transport/traffic_type.hpp @@ -9,6 +9,7 @@ enum class traffic_type { generic, /** For bootstrap (asc_pull_ack, asc_pull_req) traffic */ - bootstrap + bootstrap, + vote_storage, }; } \ No newline at end of file diff --git a/nano/node/vote_storage.cpp b/nano/node/vote_storage.cpp new file mode 100644 index 0000000000..0ac11d1f8d --- /dev/null +++ b/nano/node/vote_storage.cpp @@ -0,0 +1,649 @@ +#include +#include +#include + +nano::vote_storage::vote_storage (nano::node & node_a, nano::store::component & vote_store_a, nano::network & network_a, nano::ledger & ledger_a, nano::stats & stats_a) : + node{ node_a }, + vote_store{ vote_store_a }, + network{ network_a }, + ledger{ ledger_a }, + stats{ stats_a }, + store_queue{ stats, nano::stat::type::vote_storage_write, nano::thread_role::name::vote_storage, /* single threaded */ 1, 1024 * 16, 1024 }, + reply_queue{ stats, nano::stat::type::vote_storage_replies, nano::thread_role::name::vote_storage, /* threads */ 1, 1024 * 4, 512 } +{ + store_queue.process_batch = [this] (auto & batch) { + process_batch (batch); + }; + + reply_queue.process_batch = [this] (auto & batch) { + process_batch (batch); + }; +} + +nano::vote_storage::~vote_storage () +{ + // All threads should be stopped before destruction + debug_assert (!store_queue.joinable ()); + debug_assert (!reply_queue.joinable ()); + debug_assert (!thread.joinable ()); +} + +void nano::vote_storage::start () +{ + store_queue.start (); + reply_queue.start (); + + debug_assert (!thread.joinable ()); + + if (!enable_broadcast) + { + return; + } + + thread = std::thread ([this] { + nano::thread_role::set (nano::thread_role::name::vote_storage); + run (); + }); +} + +void nano::vote_storage::stop () +{ + store_queue.stop (); + reply_queue.stop (); + + { + nano::unique_lock lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::vote_storage::vote (std::shared_ptr vote) +{ + if (ignore_255_votes && vote->hashes.size () > 12) + { + return; + } + if (store_final_only && !vote->is_final ()) + { + return; + } + if (ledger.weight (vote->account) < rep_weight_threshold) + { + return; + } + store_queue.add (vote); +} + +void nano::vote_storage::trigger (const nano::block_hash & hash, const std::shared_ptr & channel) +{ + if (!enable_broadcast && !enable_replies) + { + return; + } + + bool const is_pr = node.rep_crawler.is_pr (*channel); + + if (enable_broadcast) + { + if (!trigger_pr_only || is_pr) + { + std::lock_guard guard{ mutex }; + + auto [it, _] = requests.emplace (request_entry{ hash, 0 }); + requests.modify (it, [] (auto & entry) { + ++entry.count; + }); + + while (requests.size () > max_requests) + { + requests.get ().pop_front (); + } + } + } + + if (enable_replies) + { + if (!is_pr) + { + if (!channel->max (nano::transport::traffic_type::vote_storage)) + { + if (!recently_broadcasted.check (hash, channel)) + { + reply_queue.add (reply_entry_t{ hash, channel }); + } + } + } + } +} + +void nano::vote_storage::run () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, std::chrono::seconds{ 1 }, [this] { + return stopped || !requests.empty (); + }); + + if (stopped) + { + return; + } + + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::loop); + + cleanup (); + + if (!requests.empty ()) + { + auto requests_l = requests; // Copy + + lock.unlock (); + + auto broadcasted = run_broadcasts (requests_l); + + lock.lock (); + + for (auto & hash : broadcasted) + { + requests.erase (hash); + } + } + } +} + +void nano::vote_storage::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + erase_if (requests, [this] (auto const & entry) { + return nano::elapsed (entry.time, request_age_cutoff); + }); + + erase_if (requests, [this] (auto const & entry) { + return recently_broadcasted.check (entry.hash); + }); +} + +std::unordered_set nano::vote_storage::run_broadcasts (ordered_requests requests_l) +{ + debug_assert (!requests_l.empty ()); + + size_t constexpr max_broadcasts = 16; + + auto vote_transaction = vote_store.tx_begin_read (); + + std::unordered_set broadcasted; + + for (auto & entry : requests_l.get ()) + { + // std::cout << entry.count << std::endl; + + auto votes = query_hash (vote_transaction, entry.hash); + if (!votes.empty ()) + { + bool recent = recently_broadcasted.check_and_insert (entry.hash); + debug_assert (!recent); // Should be filtered out earlier + + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::process); + + wait_peers (); + broadcast (votes, entry.hash); + + broadcasted.insert (entry.hash); + } + else + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::empty); + } + + if (broadcasted.size () >= max_broadcasts) + { + break; + } + } + + return broadcasted; +} + +void nano::vote_storage::wait_peers () +{ + auto reps = node.rep_crawler.principal_representatives (); + + auto busy_count = [&reps] () { + return std::count_if (reps.begin (), reps.end (), [] (auto const & rep) { + return rep.channel->max (nano::transport::traffic_type::vote_storage); + }); + }; + + while (busy_count () > reps.size () * max_busy_ratio) + { + if (stopped) + { + return; + } + + std::this_thread::sleep_for (std::chrono::milliseconds{ 100 }); + } +} + +void nano::vote_storage::process_batch (decltype (store_queue)::batch_t & batch) +{ + auto vote_transaction = vote_store.tx_begin_write ({ tables::vote_storage }); + + for (auto & vote : batch) + { + auto result = vote_store.vote_storage.put (vote_transaction, vote); + if (result > 0) + { + stats.inc (nano::stat::type::vote_storage_write, nano::stat::detail::stored); + stats.add (nano::stat::type::vote_storage_write, nano::stat::detail::stored_votes, nano::stat::dir::in, result); + } + } +} + +void nano::vote_storage::process_batch (decltype (reply_queue)::batch_t & batch) +{ + auto vote_transaction = vote_store.tx_begin_read (); + // auto ledger_transaction = ledger.store.tx_begin_read (); + + for (auto & [hash, channel] : batch) + { + // Check votes for specific hash + { + auto votes = query_hash (vote_transaction, hash, /* final only */ true); + if (!votes.empty ()) + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::reply); + + reply (votes, hash, channel); + + // if (enable_broadcast) + // { + // broadcast (votes, hash); + // } + } + else + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::empty); + } + } + + // Check votes for frontier + // if (enable_query_frontier) + // { + // auto [frontier_votes, frontier_hash] = query_frontier (ledger_transaction, vote_transaction, hash); + // if (!frontier_votes.empty ()) + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::frontier); + // + // reply (frontier_votes, channel); + // + // if (enable_broadcast) + // { + // broadcast (frontier_votes, frontier_hash); + // } + // } + // else + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::frontier_empty); + // } + // } + } +} + +void nano::vote_storage::reply (const nano::vote_storage::vote_list_t & votes, const nano::block_hash & hash, const std::shared_ptr & channel) +{ + // if (channel->max (nano::transport::traffic_type::vote_storage)) // TODO: Scrutinize this + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::reply_channel_full, nano::stat::dir::out); + // return; + // } + + if (recently_broadcasted.check_and_insert (hash, channel)) + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::reply_duplicate, nano::stat::dir::out); + return; + } + + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::reply, nano::stat::dir::out); + stats.add (nano::stat::type::vote_storage, nano::stat::detail::reply_vote, nano::stat::dir::out, votes.size ()); + + for (auto & vote : votes) + { + nano::confirm_ack message{ node.network_params.network, vote }; + + channel->send ( + message, [this] (auto & ec, auto size) { + if (ec) + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::write_error_reply, nano::stat::dir::out); + } + }, + nano::transport::buffer_drop_policy::no_socket_drop, nano::transport::traffic_type::vote_storage); + } +} + +void nano::vote_storage::broadcast (const nano::vote_storage::vote_list_t & votes, const nano::block_hash & hash) +{ + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast, nano::stat::dir::out); + + if (enable_pr_broadcast) + { + auto pr_nodes = node.rep_crawler.principal_representatives (); + for (auto const & rep : pr_nodes) + { + // if (rep.channel->max (nano::transport::traffic_type::vote_storage)) // TODO: Scrutinize this + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_channel_full, nano::stat::dir::out); + // continue; + // } + + // if (recently_broadcasted (rep.channel, hash)) + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_duplicate, nano::stat::dir::out); + // continue; + // } + + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_rep, nano::stat::dir::out); + stats.add (nano::stat::type::vote_storage, nano::stat::detail::broadcast_vote, nano::stat::dir::out, votes.size ()); + + for (auto & vote : votes) + { + nano::confirm_ack message{ node.network_params.network, vote }; + + rep.channel->send ( + message, [this] (auto & ec, auto size) { + if (ec) + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::write_error_broadcast, nano::stat::dir::out); + } + }, + nano::transport::buffer_drop_policy::no_socket_drop, nano::transport::traffic_type::vote_storage); + } + } + } + + // if (enable_random_broadcast) + // { + // auto random_nodes = network.list (network.fanout ()); + // for (auto const & channel : random_nodes) + // { + // if (channel->max (nano::transport::traffic_type::vote_storage)) // TODO: Scrutinize this + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_channel_full, nano::stat::dir::in); + // continue; + // } + // + // if (recently_broadcasted (channel, hash)) + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_duplicate, nano::stat::dir::in); + // continue; + // } + // + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::broadcast_random, nano::stat::dir::in); + // stats.add (nano::stat::type::vote_storage, nano::stat::detail::broadcast_vote, nano::stat::dir::in, votes.size ()); + // + // for (auto & vote : votes) + // { + // nano::confirm_ack message{ node.network_params.network, vote }; + // + // channel->send ( + // message, [this] (auto & ec, auto size) { + // if (ec) + // { + // stats.inc (nano::stat::type::vote_storage, nano::stat::detail::write_error_broadcast, nano::stat::dir::in); + // } + // }, + // nano::transport::buffer_drop_policy::no_socket_drop, nano::transport::traffic_type::vote_storage); + // } + // } + // } +} + +nano::uint128_t nano::vote_storage::weight (const nano::vote_storage::vote_list_t & votes) const +{ + nano::uint128_t result = 0; + for (auto const & vote : votes) + { + result += ledger.weight (vote->account); + } + return result; +} + +nano::uint128_t nano::vote_storage::weight_final (const nano::vote_storage::vote_list_t & votes) const +{ + nano::uint128_t result = 0; + for (auto const & vote : votes) + { + if (vote->is_final ()) + { + result += ledger.weight (vote->account); + } + } + return result; +} + +nano::vote_storage::vote_list_t nano::vote_storage::filter (const nano::vote_storage::vote_list_t & votes) const +{ + nano::vote_storage::vote_list_t result; + for (auto const & vote : votes) + { + auto should_pass = [this] (auto const & vote) { + if (ignore_255_votes && vote->hashes.size () > 12) + { + return false; + } + return ledger.weight (vote->account) >= rep_weight_threshold; + }; + + if (should_pass (vote)) + { + result.push_back (vote); + } + } + return result; +} + +nano::vote_storage::vote_list_t nano::vote_storage::query_hash (const nano::store::transaction & vote_transaction, const nano::block_hash & hash, bool final_only) +{ + auto votes = vote_store.vote_storage.get (vote_transaction, hash); + if (!votes.empty ()) + { + auto const quorum = node.online_reps.delta (); + + // auto should_pass = [this] (auto const & votes) { + // if (vote_final_weight_threshold > 0 && weight_final (votes) >= vote_final_weight_threshold) + // { + // return true; + // } + // if (vote_weight_threshold > 0 && weight (votes) >= vote_weight_threshold) + // { + // return true; + // } + // return false; + // }; + + // if (should_pass (votes)) + + auto const wgt = final_only ? weight_final (votes) : weight (votes); + if (wgt >= quorum) + { + return filter (votes); + } + else + { + stats.inc (nano::stat::type::vote_storage, nano::stat::detail::low_weight); + } + } + return {}; +} + +nano::vote_storage::vote_list_t nano::vote_storage::query_hash (nano::block_hash hash) +{ + auto vote_transaction = vote_store.tx_begin_read (); + return query_hash (vote_transaction, hash); +} + +std::pair nano::vote_storage::query_frontier (nano::store::transaction const & ledger_transaction, nano::store::transaction const & vote_transaction, const nano::block_hash & hash) +{ + auto account = ledger.account_safe (ledger_transaction, hash); + if (account.is_zero ()) + { + return {}; + } + + auto account_info = ledger.account_info (ledger_transaction, account); + if (!account_info) + { + return {}; + } + + auto frontier = account_info->head; + + const int max_retries = 128; + for (int n = 0; n < max_retries && !frontier.is_zero () && frontier != hash; ++n) + { + auto votes = query_hash (vote_transaction, frontier); + if (!votes.empty ()) + { + return { votes, frontier }; + } + + // TODO: Create `ledger.previous(hash)` helper + auto block = ledger.store.block.get (ledger_transaction, frontier); + if (block) + { + frontier = block->previous (); + } + else + { + frontier = { 0 }; + } + } + + return {}; +} + +std::unique_ptr nano::vote_storage::collect_container_info (const std::string & name) const +{ + nano::lock_guard lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (recently_broadcasted.collect_container_info ("recently_broadcasted")); + composite->add_component (std::make_unique (container_info{ "requests", requests.size (), sizeof (decltype (requests)::value_type) })); + composite->add_component (std::make_unique (container_info{ "store_queue", store_queue.size (), 0 })); + composite->add_component (std::make_unique (container_info{ "broadcast_queue", reply_queue.size (), 0 })); + return composite; +} + +/* + * recently_broadcasted + */ + +bool nano::vote_storage::recently_broadcasted::check (const nano::block_hash & hash) +{ + nano::lock_guard lock{ mutex }; + cleanup (); + return recently_broadcasted_hashes.contains (hash); +} + +bool nano::vote_storage::recently_broadcasted::check_and_insert (const nano::block_hash & hash) +{ + nano::lock_guard lock{ mutex }; + cleanup (); + + if (recently_broadcasted_hashes.contains (hash)) + { + return true; + } + recently_broadcasted_hashes.emplace (hash, std::chrono::steady_clock::now ()); + return false; +} + +bool nano::vote_storage::recently_broadcasted::check (const nano::block_hash & hash, const std::shared_ptr & channel) +{ + nano::lock_guard lock{ mutex }; + cleanup (); + + if (auto it = recently_broadcasted_map.find (channel); it != recently_broadcasted_map.end ()) + { + auto & entries = it->second; + return entries.contains (hash); + } + return false; +} + +bool nano::vote_storage::recently_broadcasted::check_and_insert (const nano::block_hash & hash, const std::shared_ptr & channel) +{ + nano::lock_guard lock{ mutex }; + cleanup (); + + if (auto it = recently_broadcasted_map.find (channel); it != recently_broadcasted_map.end ()) + { + auto & entries = it->second; + if (entries.contains (hash)) + { + return true; + } + } + recently_broadcasted_map[channel].emplace (hash, std::chrono::steady_clock::now ()); + return false; +} + +void nano::vote_storage::recently_broadcasted::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + // Cleanup + if (elapsed (last_cleanup, cleanup_interval)) + { + last_cleanup = std::chrono::steady_clock::now (); + + for (auto & [channel, entries] : recently_broadcasted_map) + { + erase_if (entries, [&] (auto const & entry) { + auto const & [hash, time] = entry; + return nano::elapsed (time, rebroadcast_interval); + }); + } + + erase_if (recently_broadcasted_map, [&] (auto const & pair) { + auto const & [channel, entries] = pair; + if (!channel->alive ()) + { + return true; // Erase + } + if (entries.empty ()) + { + return true; // Erase + } + return false; + }); + + erase_if (recently_broadcasted_hashes, [&] (auto const & pair) { + auto const & [hash, time] = pair; + return nano::elapsed (time, rebroadcast_interval); + }); + } +} + +std::unique_ptr nano::vote_storage::recently_broadcasted::collect_container_info (const std::string & name) const +{ + nano::lock_guard lock{ mutex }; + + auto total_size = std::accumulate (recently_broadcasted_map.begin (), recently_broadcasted_map.end (), std::size_t{ 0 }, [] (auto total, auto const & entry) { + return total + entry.second.size (); + }); + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "map", recently_broadcasted_map.size (), sizeof (decltype (recently_broadcasted_map)::value_type) })); + composite->add_component (std::make_unique (container_info{ "map_total", total_size, sizeof (decltype (recently_broadcasted_map)::value_type::second_type) })); + composite->add_component (std::make_unique (container_info{ "hashes", recently_broadcasted_hashes.size (), sizeof (decltype (recently_broadcasted_hashes)::value_type) })); + return composite; +} \ No newline at end of file diff --git a/nano/node/vote_storage.hpp b/nano/node/vote_storage.hpp new file mode 100644 index 0000000000..94e0aaa242 --- /dev/null +++ b/nano/node/vote_storage.hpp @@ -0,0 +1,171 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class network; +class stats; +class ledger; +class vote; +namespace transport +{ + class channel; +} + +class vote_storage final +{ +public: + using vote_list_t = std::vector>; + +public: + vote_storage (nano::node &, nano::store::component & vote_store, nano::network &, nano::ledger &, nano::stats &); + ~vote_storage (); + + void start (); + void stop (); + + std::unique_ptr collect_container_info (std::string const & name) const; + +public: + /** + * Store vote in database + */ + void vote (std::shared_ptr); + + /** + * Trigger vote broadcast for hash + */ + void trigger (nano::block_hash const &, std::shared_ptr const &); + +public: // Dependencies + nano::node & node; + nano::store::component & vote_store; + nano::network & network; + nano::ledger & ledger; + nano::stats & stats; + +private: + using reply_entry_t = std::pair>; + + nano::processing_queue> store_queue; + nano::processing_queue reply_queue; + +private: + class recently_broadcasted + { + public: + bool check (nano::block_hash const &); + bool check_and_insert (nano::block_hash const &); + bool check (nano::block_hash const &, std::shared_ptr const &); + bool check_and_insert (nano::block_hash const &, std::shared_ptr const &); + + std::unique_ptr collect_container_info (std::string const & name) const; + + static std::chrono::seconds constexpr rebroadcast_interval{ 10 }; + static std::chrono::seconds constexpr cleanup_interval{ rebroadcast_interval / 2 }; + + private: + void cleanup (); + + using recently_broadcasted_entries_t = std::unordered_map; + std::unordered_map, recently_broadcasted_entries_t> recently_broadcasted_map; + + std::unordered_map recently_broadcasted_hashes; + + std::chrono::steady_clock::time_point last_cleanup{ std::chrono::steady_clock::now () }; + mutable nano::mutex mutex; + }; + + vote_storage::recently_broadcasted recently_broadcasted; + +private: + struct request_entry + { + nano::block_hash hash; + size_t count; + std::chrono::steady_clock::time_point time{ std::chrono::steady_clock::now () }; + }; + + // clang-format off + class tag_sequenced {}; + class tag_hash {}; + class tag_count {}; + + using ordered_requests = boost::multi_index_container, + mi::member>, + mi::sequenced>, + mi::ordered_non_unique, + mi::member, std::greater<>> // DESC + >>; + // clang-format on + + static size_t constexpr max_requests = 1024 * 512; + ordered_requests requests; + +private: + void run (); + std::unordered_set run_broadcasts (ordered_requests); + void cleanup (); + void wait_peers (); + + void process_batch (decltype (store_queue)::batch_t &); + void process_batch (decltype (reply_queue)::batch_t &); + + uint128_t weight (vote_list_t const &) const; + uint128_t weight_final (vote_list_t const &) const; + vote_list_t filter (vote_list_t const &) const; + + void reply (vote_list_t const &, nano::block_hash const &, std::shared_ptr const &); + void broadcast (vote_list_t const &, nano::block_hash const &); + + vote_list_t query_hash (nano::store::transaction const & vote_transaction, nano::block_hash const &, bool final_only = false); + /** @returns */ + std::pair query_frontier (nano::store::transaction const & ledger_transaction, nano::store::transaction const & vote_transaction, nano::block_hash const &); + +public: + vote_list_t query_hash (nano::block_hash); + +private: + std::atomic stopped{ false }; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::thread thread; + +private: + // TODO: Use nodeconfig + uint128_t const vote_weight_threshold{ 65600 * nano::Gxrb_ratio }; // Quorum + uint128_t const vote_final_weight_threshold{ 65600 * nano::Gxrb_ratio }; // Quorum + uint128_t const rep_weight_threshold{ 100 * nano::Gxrb_ratio }; + bool const trigger_pr_only{ true }; + bool const store_final_only{ false }; + bool const ignore_255_votes{ true }; + float const max_busy_ratio{ 0.5f }; + std::chrono::seconds const request_age_cutoff{ 30 }; + + static bool constexpr enable_broadcast = true; + static bool constexpr enable_replies = true; + static bool constexpr enable_pr_broadcast = true; + static bool constexpr enable_random_broadcast = false; + static bool constexpr enable_query_frontier = false; +}; +} \ No newline at end of file diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index d34f9daf07..5e0ffd880b 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -223,6 +223,10 @@ nano::keypair::keypair (std::string const & prv_a) ed25519_publickey (prv.bytes.data (), pub.bytes.data ()); } +/* + * account_info + */ + nano::account_info::account_info (nano::block_hash const & head_a, nano::account const & representative_a, nano::block_hash const & open_block_a, nano::amount const & balance_a, nano::seconds_t modified_a, uint64_t block_count_a, nano::epoch epoch_a) : head (head_a), representative (representative_a), @@ -282,6 +286,21 @@ nano::epoch nano::account_info::epoch () const return epoch_m; } +void nano::account_info::operator() (nano::object_stream & obs) const +{ + obs.write ("head", head.to_string ()); + obs.write ("representative", representative.to_account ()); + obs.write ("open_block", open_block.to_string ()); + obs.write ("balance", balance.to_string_dec ()); + obs.write ("modified", std::to_string (modified)); + obs.write ("block_count", std::to_string (block_count)); + obs.write ("epoch", epoch_m); +} + +/* + * pending_info + */ + nano::pending_info::pending_info (nano::account const & source_a, nano::amount const & amount_a, nano::epoch epoch_a) : source (source_a), amount (amount_a), diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 2e183267d4..1347f5f91a 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -120,6 +120,8 @@ class account_info final nano::seconds_t modified{ 0 }; uint64_t block_count{ 0 }; nano::epoch epoch_m{ nano::epoch::epoch_0 }; + + void operator() (nano::object_stream &) const; }; /** diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index c23ffc73b4..0894f4afa4 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -1453,6 +1453,11 @@ std::shared_ptr nano::ledger::forked_block (store::transaction cons return result; } +std::shared_ptr nano::ledger::block (store::transaction const & transaction_a, nano::block_hash const & hash_a) +{ + return store.block.get (transaction_a, hash_a); +} + std::shared_ptr nano::ledger::head_block (store::transaction const & transaction, nano::account const & account) { auto info = store.account.get (transaction, account); diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index 935f57e2cb..b894fe13ba 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -54,6 +54,7 @@ class ledger final nano::uint128_t account_balance (store::transaction const &, nano::account const &, bool = false); nano::uint128_t account_receivable (store::transaction const &, nano::account const &, bool = false); nano::uint128_t weight (nano::account const &); + std::shared_ptr block (store::transaction const &, nano::block_hash const &); std::shared_ptr successor (store::transaction const &, nano::qualified_root const &); std::shared_ptr forked_block (store::transaction const &, nano::block const &); std::shared_ptr head_block (store::transaction const &, nano::account const &); diff --git a/nano/secure/vote.cpp b/nano/secure/vote.cpp index 28002f25f0..296886634b 100644 --- a/nano/secure/vote.cpp +++ b/nano/secure/vote.cpp @@ -137,6 +137,11 @@ std::chrono::milliseconds nano::vote::duration () const return std::chrono::milliseconds{ 1u << (duration_bits () + 4) }; } +bool nano::vote::is_final () const +{ + return is_final_timestamp (timestamp_m); +} + void nano::vote::serialize_json (boost::property_tree::ptree & tree) const { tree.put ("account", account.to_account ()); diff --git a/nano/secure/vote.hpp b/nano/secure/vote.hpp index 9ae7b0689f..da3cde85f6 100644 --- a/nano/secure/vote.hpp +++ b/nano/secure/vote.hpp @@ -42,6 +42,7 @@ class vote final uint64_t timestamp () const; uint8_t duration_bits () const; std::chrono::milliseconds duration () const; + bool is_final () const; static uint64_t constexpr timestamp_mask = { 0xffff'ffff'ffff'fff0ULL }; static nano::seconds_t constexpr timestamp_max = { 0xffff'ffff'ffff'fff0ULL }; diff --git a/nano/store/CMakeLists.txt b/nano/store/CMakeLists.txt index a992ba59bc..468c7bf4e8 100644 --- a/nano/store/CMakeLists.txt +++ b/nano/store/CMakeLists.txt @@ -27,6 +27,8 @@ add_library( lmdb/transaction_impl.hpp lmdb/version.hpp lmdb/wallet_value.hpp + lmdb/vote_storage.hpp + lmdb/vote_storage.cpp online_weight.hpp peer.hpp pending.hpp diff --git a/nano/store/component.cpp b/nano/store/component.cpp index cd68dc2a09..a33dc25cf7 100644 --- a/nano/store/component.cpp +++ b/nano/store/component.cpp @@ -5,7 +5,7 @@ #include #include -nano::store::component::component (nano::store::block & block_store_a, nano::store::frontier & frontier_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a) : +nano::store::component::component (nano::store::block & block_store_a, nano::store::frontier & frontier_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a, nano::store::vote_storage & vote_storage_store_a) : block (block_store_a), frontier (frontier_store_a), account (account_store_a), @@ -15,7 +15,8 @@ nano::store::component::component (nano::store::block & block_store_a, nano::sto peer (peer_store_a), confirmation_height (confirmation_height_store_a), final_vote (final_vote_store_a), - version (version_store_a) + version (version_store_a), + vote_storage (vote_storage_store_a) { } diff --git a/nano/store/component.hpp b/nano/store/component.hpp index a675ca5623..e7d5c93f16 100644 --- a/nano/store/component.hpp +++ b/nano/store/component.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ namespace store class pending; class pruned; class version; + class vote_storage; } class ledger_cache; @@ -52,7 +54,8 @@ namespace store nano::store::peer &, nano::store::confirmation_height &, nano::store::final_vote &, - nano::store::version & + nano::store::version &, + nano::store::vote_storage & ); // clang-format on virtual ~component () = default; @@ -78,6 +81,7 @@ namespace store store::confirmation_height & confirmation_height; store::final_vote & final_vote; store::version & version; + store::vote_storage & vote_storage; virtual unsigned max_block_write_batch_num () const = 0; @@ -100,3 +104,18 @@ namespace store }; } // namespace store } // namespace nano + +namespace nano::store +{ +class vote_storage +{ +public: + virtual std::size_t put (nano::store::write_transaction const &, std::shared_ptr const &) = 0; + virtual std::vector> get (nano::store::transaction const &, nano::block_hash const &) = 0; + // virtual int del (nano::write_transaction const &, nano::block_hash const &) = 0; + // virtual void del (nano::write_transaction const &, nano::vote_storage_key const &) = 0; + // virtual nano::store_iterator begin (nano::transaction const &) const = 0; + virtual nano::store::iterator begin (nano::store::transaction const &, nano::vote_storage_key const &) const = 0; + virtual nano::store::iterator end () const = 0; +}; +} \ No newline at end of file diff --git a/nano/store/db_val.hpp b/nano/store/db_val.hpp index 957492e23e..07edc0178f 100644 --- a/nano/store/db_val.hpp +++ b/nano/store/db_val.hpp @@ -113,6 +113,21 @@ class db_val convert_buffer_to_value (); } + db_val (nano::vote_storage_key const & val_a) : + db_val (sizeof (val_a), const_cast (&val_a)) + { + } + + db_val (nano::vote const & val_a) : + buffer (std::make_shared> ()) + { + { + nano::vectorstream stream (*buffer); + val_a.serialize (stream); + } + convert_buffer_to_value (); + } + explicit operator nano::account_info () const { nano::account_info result; @@ -292,6 +307,21 @@ class db_val return result; } + explicit operator nano::vote () const + { + nano::bufferstream stream (reinterpret_cast (data ()), size ()); + nano::vote result; + bool error = result.deserialize (stream); + (void)error; + debug_assert (!error); + return result; + } + + explicit operator nano::vote_storage_key () const + { + return convert (); + } + operator Val * () const { // Allow passing a temporary to a non-c++ function which doesn't have constness diff --git a/nano/store/lmdb/lmdb.cpp b/nano/store/lmdb/lmdb.cpp index 4dd9e64407..41952f8795 100644 --- a/nano/store/lmdb/lmdb.cpp +++ b/nano/store/lmdb/lmdb.cpp @@ -24,7 +24,8 @@ nano::store::lmdb::component::component (nano::logger & logger_a, std::filesyste peer_store, confirmation_height_store, final_vote_store, - version_store + version_store, + vote_storage_store }, // clang-format on block_store{ *this }, @@ -37,6 +38,7 @@ nano::store::lmdb::component::component (nano::logger & logger_a, std::filesyste confirmation_height_store{ *this }, final_vote_store{ *this }, version_store{ *this }, + vote_storage_store{ *this }, logger{ logger_a }, env (error, path_a, nano::store::lmdb::env::options::make ().set_config (lmdb_config_a).set_use_no_mem_init (true)), mdb_txn_tracker (logger_a, txn_tracking_config_a, block_processor_batch_max_time_a), @@ -44,7 +46,7 @@ nano::store::lmdb::component::component (nano::logger & logger_a, std::filesyste { if (!error) { - debug_assert (path_a.filename () == "data.ldb"); + debug_assert (path_a.filename () == "data.ldb" || path_a.filename () == "votes.ldb"); auto is_fully_upgraded (false); auto is_fresh_db (false); @@ -204,6 +206,15 @@ void nano::store::lmdb::component::open_databases (bool & error_a, store::transa pending_store.pending_handle = pending_store.pending_v0_handle; error_a |= mdb_dbi_open (env.tx (transaction_a), "final_votes", flags, &final_vote_store.final_votes_handle) != 0; error_a |= mdb_dbi_open (env.tx (transaction_a), "blocks", MDB_CREATE, &block_store.blocks_handle) != 0; + + // TODO: This should be done in a cleaner way + { + bool error_b = mdb_dbi_open (env.tx (transaction_a), "vote_storage", flags, &vote_storage_store.handle) != 0; + if (flags & MDB_CREATE) + { + error_a |= error_b; + } + } } bool nano::store::lmdb::component::do_upgrades (store::write_transaction & transaction_a, nano::ledger_constants & constants, bool & needs_vacuuming) @@ -339,6 +350,8 @@ MDB_dbi nano::store::lmdb::component::table_to_dbi (tables table_a) const return confirmation_height_store.confirmation_height_handle; case tables::final_votes: return final_vote_store.final_votes_handle; + case tables::vote_storage: + return vote_storage_store.handle; default: release_assert (false); return peer_store.peers_handle; diff --git a/nano/store/lmdb/lmdb.hpp b/nano/store/lmdb/lmdb.hpp index a1f5dda6cf..4fa3a53f1d 100644 --- a/nano/store/lmdb/lmdb.hpp +++ b/nano/store/lmdb/lmdb.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -50,6 +51,7 @@ class component : public nano::store::component nano::store::lmdb::pending pending_store; nano::store::lmdb::pruned pruned_store; nano::store::lmdb::version version_store; + nano::store::lmdb::vote_storage vote_storage_store; friend class nano::store::lmdb::account; friend class nano::store::lmdb::block; @@ -61,6 +63,7 @@ class component : public nano::store::component friend class nano::store::lmdb::pending; friend class nano::store::lmdb::pruned; friend class nano::store::lmdb::version; + friend class nano::store::lmdb::vote_storage; public: component (nano::logger &, std::filesystem::path const &, nano::ledger_constants & constants, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false); diff --git a/nano/store/lmdb/vote_storage.cpp b/nano/store/lmdb/vote_storage.cpp new file mode 100644 index 0000000000..a544827410 --- /dev/null +++ b/nano/store/lmdb/vote_storage.cpp @@ -0,0 +1,68 @@ +#include +#include + +nano::store::lmdb::vote_storage::vote_storage (nano::store::lmdb::component & store_a) : + store{ store_a } +{ +} + +std::size_t nano::store::lmdb::vote_storage::put (const nano::store::write_transaction & transaction, const std::shared_ptr & vote) +{ + std::size_t result = 0; // Number of inserted votes + + for (auto const hash : vote->hashes) + { + nano::vote_storage_key key{ hash, vote->account }; + + nano::store::lmdb::db_val value; + auto status = store.get (transaction, tables::vote_storage, key, value); + if (store.success (status)) + { + auto existing_vote = static_cast (value); + + debug_assert (existing_vote.account == vote->account); + debug_assert (!existing_vote.validate ()); + + // Replace with final vote + if (!existing_vote.is_final () && vote->is_final ()) + { + auto status2 = store.put (transaction, tables::vote_storage, key, *vote); + store.release_assert_success (status2); + ++result; + } + } + else + { + auto status2 = store.put (transaction, tables::vote_storage, key, *vote); + store.release_assert_success (status2); + ++result; + } + } + + return result; +} + +std::vector> nano::store::lmdb::vote_storage::get (const nano::store::transaction & transaction, const nano::block_hash & hash) +{ + std::vector> result; + + nano::vote_storage_key start{ hash, 0 }; + for (auto i = begin (transaction, start), n = end (); i != n && i->first.block_hash () == hash; ++i) + { + // TODO: It's inefficient to use shared ptr here but it is required in many other places + result.push_back (std::make_shared (i->second)); + debug_assert (!result.back ()->validate ()); + } + + return result; +} + +nano::store::iterator nano::store::lmdb::vote_storage::begin (const nano::store::transaction & transaction, const nano::vote_storage_key & key) const +{ + return store.make_iterator (transaction, tables::vote_storage, key); +} + +nano::store::iterator nano::store::lmdb::vote_storage::end () const +{ + return { nullptr }; +} \ No newline at end of file diff --git a/nano/store/lmdb/vote_storage.hpp b/nano/store/lmdb/vote_storage.hpp new file mode 100644 index 0000000000..8c07b11ae9 --- /dev/null +++ b/nano/store/lmdb/vote_storage.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + +#include + +namespace nano::store::lmdb +{ +class vote_storage : public nano::store::vote_storage +{ +public: + explicit vote_storage (nano::store::lmdb::component &); + + std::size_t put (nano::store::write_transaction const &, std::shared_ptr const &) override; + std::vector> get (nano::store::transaction const &, nano::block_hash const &) override; + // int del (nano::write_transaction const &, nano::block_hash const &) override; + // void del (nano::write_transaction const &, nano::vote_storage_key const &) override; + // nano::store_iterator begin (nano::transaction const &) const override; + nano::store::iterator begin (nano::store::transaction const &, nano::vote_storage_key const &) const override; + nano::store::iterator end () const override; + +private: + nano::store::lmdb::component & store; + +public: + MDB_dbi handle{ 0 }; +}; +} \ No newline at end of file diff --git a/nano/store/rocksdb/rocksdb.cpp b/nano/store/rocksdb/rocksdb.cpp index 099972583c..f526bb70a1 100644 --- a/nano/store/rocksdb/rocksdb.cpp +++ b/nano/store/rocksdb/rocksdb.cpp @@ -34,6 +34,34 @@ class event_listener : public rocksdb::EventListener }; } +namespace +{ +class fake_vote_storage : public nano::store::vote_storage +{ + std::size_t put (nano::store::write_transaction const &, std::shared_ptr const &) + { + release_assert (false); + }; + std::vector> get (nano::store::transaction const &, nano::block_hash const &) + { + release_assert (false); + }; + // int del (nano::write_transaction const &, nano::block_hash const &) override; + // void del (nano::write_transaction const &, nano::vote_storage_key const &) override; + // nano::store_iterator begin (nano::transaction const &) const override; + nano::store::iterator begin (nano::store::transaction const &, nano::vote_storage_key const &) const + { + release_assert (false); + }; + nano::store::iterator end () const + { + release_assert (false); + }; +}; + +static fake_vote_storage fake_vote_storage_instance; +} + nano::store::rocksdb::component::component (nano::logger & logger_a, std::filesystem::path const & path_a, nano::ledger_constants & constants, nano::rocksdb_config const & rocksdb_config_a, bool open_read_only_a) : // clang-format off nano::store::component{ @@ -46,7 +74,8 @@ nano::store::rocksdb::component::component (nano::logger & logger_a, std::filesy peer_store, confirmation_height_store, final_vote_store, - version_store + version_store, + fake_vote_storage_instance }, // clang-format on block_store{ *this }, diff --git a/nano/store/tables.hpp b/nano/store/tables.hpp index e0a03bd020..335e3d4b3e 100644 --- a/nano/store/tables.hpp +++ b/nano/store/tables.hpp @@ -18,7 +18,8 @@ enum class tables peers, pending, pruned, - vote + vote, + vote_storage, }; } // namespace nano