diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 1d9118ec2c..c260d5dbe2 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -3698,3 +3699,22 @@ TEST (node, container_info) ASSERT_NO_THROW (node1.container_info ()); ASSERT_NO_THROW (node2.container_info ()); } + +TEST (node, bounded_backlog) +{ + nano::test::system system; + + nano::node_config node_config; + node_config.max_backlog = 10; + node_config.backlog_scan.enable = false; + auto & node = *system.add_node (node_config); + + const int howmany_blocks = 64; + const int howmany_chains = 16; + + auto chains = nano::test::setup_chains (system, node, howmany_chains, howmany_blocks, nano::dev::genesis_key, /* do not confirm */ false); + + node.backlog_scan.trigger (); + + ASSERT_TIMELY_EQ (20s, node.ledger.block_count (), 11); // 10 + genesis +} diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 9d6e2124ab..cb635aecd1 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -67,7 +67,125 @@ TEST (toml, diff_equal) ASSERT_TRUE (other.empty ()); } -TEST (toml, daemon_config_update_array) +TEST (toml, optional_child) +{ + std::stringstream ss; + ss << R"toml( + [child] + val=1 + )toml"; + + nano::tomlconfig t; + t.read (ss); + auto c1 = t.get_required_child ("child"); + int val = 0; + c1.get_required ("val", val); + ASSERT_EQ (val, 1); + auto c2 = t.get_optional_child ("child2"); + ASSERT_FALSE (c2); +} + +/** Config settings passed via CLI overrides the config file settings. This is solved +using an override stream. */ +TEST (toml, dot_child_syntax) +{ + std::stringstream ss_override; + ss_override << R"toml( + node.a = 1 + node.b = 2 + )toml"; + + std::stringstream ss; + ss << R"toml( + [node] + b=5 + c=3 + )toml"; + + nano::tomlconfig t; + t.read (ss_override, ss); + + auto node = t.get_required_child ("node"); + uint16_t a, b, c; + node.get ("a", a); + ASSERT_EQ (a, 1); + node.get ("b", b); + ASSERT_EQ (b, 2); + node.get ("c", c); + ASSERT_EQ (c, 3); +} + +TEST (toml, base_override) +{ + std::stringstream ss_base; + ss_base << R"toml( + node.peering_port=7075 + )toml"; + + std::stringstream ss_override; + ss_override << R"toml( + node.peering_port=8075 + node.too_big=70000 + )toml"; + + nano::tomlconfig t; + t.read (ss_override, ss_base); + + // Query optional existent value + uint16_t port = 0; + t.get_optional ("node.peering_port", port); + ASSERT_EQ (port, 8075); + ASSERT_FALSE (t.get_error ()); + + // Query optional non-existent value, make sure we get default and no errors + port = 65535; + t.get_optional ("node.peering_port_non_existent", port); + ASSERT_EQ (port, 65535); + ASSERT_FALSE (t.get_error ()); + t.get_error ().clear (); + + // Query required non-existent value, make sure it errors + t.get_required ("node.peering_port_not_existent", port); + ASSERT_EQ (port, 65535); + ASSERT_TRUE (t.get_error ()); + ASSERT_EQ (t.get_error (), nano::error_config::missing_value); + t.get_error ().clear (); + + // Query uint16 that's too big, make sure we have an error + t.get_required ("node.too_big", port); + ASSERT_TRUE (t.get_error ()); + ASSERT_EQ (t.get_error (), nano::error_config::invalid_value); +} + +TEST (toml, put) +{ + nano::tomlconfig config; + nano::tomlconfig config_node; + // Overwrite value and add to child node + config_node.put ("port", "7074"); + config_node.put ("port", "7075"); + config.put_child ("node", config_node); + uint16_t port; + config.get_required ("node.port", port); + ASSERT_EQ (port, 7075); + ASSERT_FALSE (config.get_error ()); +} + +TEST (toml, array) +{ + nano::tomlconfig config; + nano::tomlconfig config_node; + config.put_child ("node", config_node); + config_node.push ("items", "item 1"); + config_node.push ("items", "item 2"); + int i = 1; + config_node.array_entries_required ("items", [&i] (std::string item) { + ASSERT_EQ (item, std::string ("item ") + std::to_string (i)); + i++; + }); +} + +TEST (toml_config, daemon_config_update_array) { nano::tomlconfig t; std::filesystem::path data_path ("."); @@ -79,7 +197,7 @@ TEST (toml, daemon_config_update_array) } /** Empty rpc config file should match a default config object */ -TEST (toml, rpc_config_deserialize_defaults) +TEST (toml_config, rpc_config_deserialize_defaults) { std::stringstream ss; @@ -111,12 +229,13 @@ TEST (toml, rpc_config_deserialize_defaults) } /** Empty config file should match a default config object */ -TEST (toml, daemon_config_deserialize_defaults) +TEST (toml_config, daemon_config_deserialize_defaults) { std::stringstream ss; ss << R"toml( [node] [node.backlog_scan] + [node.bounded_backlog] [node.bootstrap] [node.bootstrap_server] [node.block_processor] @@ -197,10 +316,17 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_EQ (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads); ASSERT_EQ (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks); + ASSERT_EQ (conf.node.max_backlog, defaults.node.max_backlog); + ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp); + ASSERT_EQ (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable); ASSERT_EQ (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size); ASSERT_EQ (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit); - ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp); + + ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); + ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); + ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); + ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); ASSERT_EQ (conf.node.websocket_config.address, defaults.node.websocket_config.address); @@ -294,126 +420,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue); } -TEST (toml, optional_child) -{ - std::stringstream ss; - ss << R"toml( - [child] - val=1 - )toml"; - - nano::tomlconfig t; - t.read (ss); - auto c1 = t.get_required_child ("child"); - int val = 0; - c1.get_required ("val", val); - ASSERT_EQ (val, 1); - auto c2 = t.get_optional_child ("child2"); - ASSERT_FALSE (c2); -} - -/** Config settings passed via CLI overrides the config file settings. This is solved -using an override stream. */ -TEST (toml, dot_child_syntax) -{ - std::stringstream ss_override; - ss_override << R"toml( - node.a = 1 - node.b = 2 - )toml"; - - std::stringstream ss; - ss << R"toml( - [node] - b=5 - c=3 - )toml"; - - nano::tomlconfig t; - t.read (ss_override, ss); - - auto node = t.get_required_child ("node"); - uint16_t a, b, c; - node.get ("a", a); - ASSERT_EQ (a, 1); - node.get ("b", b); - ASSERT_EQ (b, 2); - node.get ("c", c); - ASSERT_EQ (c, 3); -} - -TEST (toml, base_override) -{ - std::stringstream ss_base; - ss_base << R"toml( - node.peering_port=7075 - )toml"; - - std::stringstream ss_override; - ss_override << R"toml( - node.peering_port=8075 - node.too_big=70000 - )toml"; - - nano::tomlconfig t; - t.read (ss_override, ss_base); - - // Query optional existent value - uint16_t port = 0; - t.get_optional ("node.peering_port", port); - ASSERT_EQ (port, 8075); - ASSERT_FALSE (t.get_error ()); - - // Query optional non-existent value, make sure we get default and no errors - port = 65535; - t.get_optional ("node.peering_port_non_existent", port); - ASSERT_EQ (port, 65535); - ASSERT_FALSE (t.get_error ()); - t.get_error ().clear (); - - // Query required non-existent value, make sure it errors - t.get_required ("node.peering_port_not_existent", port); - ASSERT_EQ (port, 65535); - ASSERT_TRUE (t.get_error ()); - ASSERT_EQ (t.get_error (), nano::error_config::missing_value); - t.get_error ().clear (); - - // Query uint16 that's too big, make sure we have an error - t.get_required ("node.too_big", port); - ASSERT_TRUE (t.get_error ()); - ASSERT_EQ (t.get_error (), nano::error_config::invalid_value); -} - -TEST (toml, put) -{ - nano::tomlconfig config; - nano::tomlconfig config_node; - // Overwrite value and add to child node - config_node.put ("port", "7074"); - config_node.put ("port", "7075"); - config.put_child ("node", config_node); - uint16_t port; - config.get_required ("node.port", port); - ASSERT_EQ (port, 7075); - ASSERT_FALSE (config.get_error ()); -} - -TEST (toml, array) -{ - nano::tomlconfig config; - nano::tomlconfig config_node; - config.put_child ("node", config_node); - config_node.push ("items", "item 1"); - config_node.push ("items", "item 2"); - int i = 1; - config_node.array_entries_required ("items", [&i] (std::string item) { - ASSERT_EQ (item, std::string ("item ") + std::to_string (i)); - i++; - }); -} - /** Deserialize a node config with non-default values */ -TEST (toml, daemon_config_deserialize_no_defaults) +TEST (toml_config, daemon_config_deserialize_no_defaults) { std::stringstream ss; @@ -462,6 +470,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_queued_requests = 999 request_aggregator_threads = 999 max_unchecked_blocks = 999 + max_backlog = 999 frontiers_confirmation = "always" enable_upnp = false @@ -470,6 +479,12 @@ TEST (toml, daemon_config_deserialize_no_defaults) batch_size = 999 rate_limit = 999 + [node.bounded_backlog] + enable = false + batch_size = 999 + max_queued_notifications = 999 + scan_rate = 999 + [node.block_processor] max_peer_queue = 999 max_system_queue = 999 @@ -679,6 +694,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.io_threads, defaults.node.io_threads); ASSERT_NE (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier); ASSERT_NE (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks); + ASSERT_NE (conf.node.max_backlog, defaults.node.max_backlog); ASSERT_NE (conf.node.network_threads, defaults.node.network_threads); ASSERT_NE (conf.node.background_threads, defaults.node.background_threads); ASSERT_NE (conf.node.secondary_work_peers, defaults.node.secondary_work_peers); @@ -704,10 +720,16 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.work_threads, defaults.node.work_threads); ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_NE (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads); + ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp); + ASSERT_NE (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable); ASSERT_NE (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size); ASSERT_NE (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit); - ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp); + + ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); + ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); + ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); + ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); ASSERT_NE (conf.node.websocket_config.address, defaults.node.websocket_config.address); @@ -804,7 +826,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) } /** There should be no required values **/ -TEST (toml, daemon_config_no_required) +TEST (toml_config, daemon_config_no_required) { std::stringstream ss; @@ -835,7 +857,7 @@ TEST (toml, daemon_config_no_required) } /** Deserialize an rpc config with non-default values */ -TEST (toml, rpc_config_deserialize_no_defaults) +TEST (toml_config, rpc_config_deserialize_no_defaults) { std::stringstream ss; @@ -878,7 +900,7 @@ TEST (toml, rpc_config_deserialize_no_defaults) } /** There should be no required values **/ -TEST (toml, rpc_config_no_required) +TEST (toml_config, rpc_config_no_required) { std::stringstream ss; @@ -900,7 +922,7 @@ TEST (toml, rpc_config_no_required) } /** Deserialize a node config with incorrect values */ -TEST (toml, daemon_config_deserialize_errors) +TEST (toml_config, daemon_config_deserialize_errors) { { std::stringstream ss; @@ -932,7 +954,7 @@ TEST (toml, daemon_config_deserialize_errors) } } -TEST (toml, daemon_read_config) +TEST (toml_config, daemon_read_config) { auto path (nano::unique_path ()); std::filesystem::create_directories (path); @@ -976,7 +998,7 @@ TEST (toml, daemon_read_config) } } -TEST (toml, log_config_defaults) +TEST (toml_config, log_config_defaults) { std::stringstream ss; @@ -1002,7 +1024,7 @@ TEST (toml, log_config_defaults) ASSERT_EQ (confg.file.rotation_count, defaults.file.rotation_count); } -TEST (toml, log_config_no_defaults) +TEST (toml_config, log_config_no_defaults) { std::stringstream ss; @@ -1044,7 +1066,7 @@ TEST (toml, log_config_no_defaults) ASSERT_NE (confg.file.rotation_count, defaults.file.rotation_count); } -TEST (toml, log_config_no_required) +TEST (toml_config, log_config_no_required) { std::stringstream ss; @@ -1065,7 +1087,7 @@ TEST (toml, log_config_no_required) ASSERT_FALSE (toml.get_error ()) << toml.get_error ().get_message (); } -TEST (toml, merge_config_files) +TEST (toml_config, merge_config_files) { nano::network_params network_params{ nano::network_constants::active_network }; nano::tomlconfig default_toml; diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 85136b98b7..b3f5312577 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -83,6 +83,7 @@ enum class type local_block_broadcaster, monitor, confirming_set, + bounded_backlog, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 8281754014..2022b29083 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -85,6 +85,7 @@ enum class type active_elections_cancelled, active_elections_cemented, backlog_scan, + bounded_backlog, backlog, unchecked, election_scheduler, @@ -203,6 +204,7 @@ enum class detail unchecked, local, forced, + election, // message specific not_a_type, @@ -569,6 +571,14 @@ enum class detail blocks_by_account, account_info_by_hash, + // bounded backlog, + gathered_targets, + performing_rollbacks, + no_targets, + rollback_missing_block, + rollback_skipped, + loop_scan, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 10b6c0d333..503b0281f8 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -100,6 +100,15 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::backlog_scan: thread_role_name_string = "Backlog scan"; break; + case nano::thread_role::name::bounded_backlog: + thread_role_name_string = "Bounded backlog"; + break; + case nano::thread_role::name::bounded_backlog_scan: + thread_role_name_string = "Bounded b scan"; + break; + case nano::thread_role::name::bounded_backlog_notifications: + thread_role_name_string = "Bounded b notif"; + break; case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index c98248ceae..420aa0d8ea 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -38,6 +38,9 @@ enum class name db_parallel_traversal, unchecked, backlog_scan, + bounded_backlog, + bounded_backlog_scan, + bounded_backlog_notifications, vote_generator_queue, telemetry, bootstrap, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 7fa1263972..13a182adff 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -24,6 +24,8 @@ add_library( blockprocessor.cpp bucketing.hpp bucketing.cpp + bounded_backlog.hpp + bounded_backlog.cpp bootstrap_weights_beta.hpp bootstrap_weights_live.hpp bootstrap/account_sets.hpp diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index de314b20e9..544de7973c 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -51,7 +51,7 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n case nano::block_source::local: return config.priority_local; default: - return 1; + return config.priority_system; } }; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 50ae78c2ef..1adec9ca62 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -24,6 +24,7 @@ enum class block_source unchecked, local, forced, + election, }; std::string_view to_string (block_source); @@ -47,6 +48,7 @@ class block_processor_config final size_t priority_live{ 1 }; size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; + size_t priority_system{ 32 }; size_t batch_size{ 256 }; size_t max_queued_notifications{ 8 }; diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index 2a71ffb08e..e6a2295cb8 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -50,6 +50,16 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi condition.notify_all (); }); + // Unblock rolled back accounts as the dependency is no longer valid + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + nano::lock_guard lock{ mutex }; + for (auto const & block : blocks) + { + debug_assert (block != nullptr); + accounts.unblock (block->account ()); + } + }); + accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ()); } diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp new file mode 100644 index 0000000000..4a800a60d3 --- /dev/null +++ b/nano/node/bounded_backlog.cpp @@ -0,0 +1,564 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, + node{ node_a }, + ledger{ ledger_a }, + bucketing{ bucketing_a }, + backlog_scan{ backlog_scan_a }, + block_processor{ block_processor_a }, + confirming_set{ confirming_set_a }, + stats{ stats_a }, + logger{ logger_a }, + scan_limiter{ config.bounded_backlog.scan_rate }, + workers{ 1, nano::thread_role::name::bounded_backlog_notifications } +{ + // Activate accounts with unconfirmed blocks + backlog_scan.batch_activated.add ([this] (auto const & batch) { + auto transaction = ledger.tx_begin_read (); + for (auto const & info : batch) + { + activate (transaction, info.account, info.account_info, info.conf_info); + } + }); + + // Erase accounts with all confirmed blocks + backlog_scan.batch_scanned.add ([this] (auto const & batch) { + nano::lock_guard guard{ mutex }; + for (auto const & info : batch) + { + if (info.conf_info.height == info.account_info.block_count) + { + index.erase (info.account); + } + } + }); + + // Track unconfirmed blocks + block_processor.batch_processed.add ([this] (auto const & batch) { + auto transaction = ledger.tx_begin_read (); + for (auto const & [result, context] : batch) + { + if (result == nano::block_status::progress) + { + auto const & block = context.block; + insert (transaction, *block); + } + } + }); + + // Remove rolled back blocks from the backlog + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + nano::lock_guard guard{ mutex }; + for (auto const & block : blocks) + { + index.erase (block->hash ()); + } + }); + + // Remove cemented blocks from the backlog + confirming_set.batch_cemented.add ([this] (auto const & batch) { + nano::lock_guard guard{ mutex }; + for (auto const & context : batch) + { + index.erase (context.block->hash ()); + } + }); +} + +nano::bounded_backlog::~bounded_backlog () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); + debug_assert (!scan_thread.joinable ()); + debug_assert (!workers.alive ()); +} + +void nano::bounded_backlog::start () +{ + debug_assert (!thread.joinable ()); + + if (!config.bounded_backlog.enable) + { + return; + } + + workers.start (); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::bounded_backlog); + run (); + } }; + + scan_thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::bounded_backlog_scan); + run_scan (); + } }; +} + +void nano::bounded_backlog::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } + if (scan_thread.joinable ()) + { + scan_thread.join (); + } + workers.stop (); +} + +size_t nano::bounded_backlog::index_size () const +{ + nano::lock_guard guard{ mutex }; + return index.size (); +} + +void nano::bounded_backlog::activate (nano::secure::transaction & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) +{ + debug_assert (conf_info.frontier != account_info.head); + + // Insert blocks into the index starting from the account head block + auto block = ledger.any.block_get (transaction, account_info.head); + while (block) + { + // We reached the confirmed frontier, no need to track more blocks + if (block->hash () == conf_info.frontier) + { + break; + } + // Check if the block is already in the backlog, avoids unnecessary ledger lookups + if (contains (block->hash ())) + { + break; + } + + bool inserted = insert (transaction, *block); + + // If the block was not inserted, we already have it in the backlog + if (!inserted) + { + break; + } + + transaction.refresh_if_needed (); + + block = ledger.any.block_get (transaction, block->previous ()); + } +} + +void nano::bounded_backlog::update (nano::secure::transaction const & transaction, nano::block_hash const & hash) +{ + // Erase if the block is either confirmed or missing + if (!ledger.unconfirmed_exists (transaction, hash)) + { + nano::lock_guard guard{ mutex }; + index.erase (hash); + } +} + +bool nano::bounded_backlog::insert (nano::secure::transaction const & transaction, nano::block const & block) +{ + auto const [priority_balance, priority_timestamp] = ledger.block_priority (transaction, block); + auto const bucket_index = bucketing.bucket_index (priority_balance); + + nano::lock_guard guard{ mutex }; + + return index.insert (block, bucket_index, priority_timestamp); +} + +bool nano::bounded_backlog::predicate () const +{ + debug_assert (!mutex.try_lock ()); + // Both ledger and tracked backlog must be over the threshold + return ledger.backlog_count () > config.max_backlog && index.size () > config.max_backlog; +} + +void nano::bounded_backlog::run () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, 1s, [this] { + return stopped || predicate (); + }); + + if (stopped) + { + return; + } + + // Wait until all notification about the previous rollbacks are processed + while (workers.queued_tasks () >= config.bounded_backlog.max_queued_notifications) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); + if (stopped) + { + return; + } + } + + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop); + + // Calculate the number of targets to rollback + uint64_t const backlog = ledger.backlog_count (); + uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0; + + auto targets = gather_targets (std::min (target_count, static_cast (config.bounded_backlog.batch_size))); + if (!targets.empty ()) + { + lock.unlock (); + + stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ()); + auto processed = perform_rollbacks (targets, target_count); + + lock.lock (); + + // Erase rolled back blocks from the index + for (auto const & hash : processed) + { + index.erase (hash); + } + } + else + { + // Cooldown, this should not happen in normal operation + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets); + condition.wait_for (lock, 100ms, [this] { + return stopped.load (); + }); + } + } +} + +bool nano::bounded_backlog::should_rollback (nano::block_hash const & hash) const +{ + if (node.vote_cache.contains (hash)) + { + return false; + } + if (node.vote_router.contains (hash)) + { + return false; + } + if (node.active.recently_confirmed.exists (hash)) + { + return false; + } + if (node.scheduler.contains (hash)) + { + return false; + } + if (node.confirming_set.contains (hash)) + { + return false; + } + if (node.local_block_broadcaster.contains (hash)) + { + return false; + } + return true; +} + +std::deque nano::bounded_backlog::perform_rollbacks (std::deque const & targets, size_t max_rollbacks) +{ + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::performing_rollbacks); + + auto transaction = ledger.tx_begin_write (nano::store::writer::bounded_backlog); + + std::deque processed; + for (auto const & hash : targets) + { + // Skip the rollback if the block is being used by the node, this should be race free as it's checked while holding the ledger write lock + if (!should_rollback (hash)) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_skipped); + continue; + } + + // Here we check that the block is still OK to rollback, there could be a delay between gathering the targets and performing the rollbacks + if (auto block = ledger.any.block_get (transaction, hash)) + { + logger.debug (nano::log::type::bounded_backlog, "Rolling back: {}, account: {}", hash.to_string (), block->account ().to_account ()); + + std::deque> rollback_list; + bool error = ledger.rollback (transaction, hash, rollback_list); + stats.inc (nano::stat::type::bounded_backlog, error ? nano::stat::detail::rollback_failed : nano::stat::detail::rollback); + + for (auto const & rollback : rollback_list) + { + processed.push_back (rollback->hash ()); + } + + // Notify observers of the rolled back blocks on a background thread, avoid dispatching notifications when holding ledger write transaction + workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] { + // TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse + block_processor.rolled_back.notify (rollback_list, root); + }); + + // Return early if we reached the maximum number of rollbacks + if (processed.size () >= max_rollbacks) + { + break; + } + } + else + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_missing_block); + processed.push_back (hash); + } + } + + return processed; +} + +size_t nano::bounded_backlog::bucket_threshold () const +{ + return config.max_backlog / bucketing.size (); +} + +std::deque nano::bounded_backlog::gather_targets (size_t max_count) const +{ + debug_assert (!mutex.try_lock ()); + + std::deque targets; + + // Start rolling back from lowest index buckets first + for (auto bucket : bucketing.bucket_indices ()) + { + // Only start rolling back if the bucket is over the threshold of unconfirmed blocks + if (index.size (bucket) > bucket_threshold ()) + { + auto const count = std::min (max_count, config.bounded_backlog.batch_size); + + auto const top = index.top (bucket, count, [this] (auto const & hash) { + // Only rollback if the block is not being used by the node + return should_rollback (hash); + }); + + for (auto const & entry : top) + { + targets.push_back (entry); + } + } + } + + return targets; +} + +void nano::bounded_backlog::run_scan () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + auto wait = [&] (auto count) { + while (!scan_limiter.should_pass (count)) + { + condition.wait_for (lock, 100ms); + if (stopped) + { + return; + } + } + }; + + nano::block_hash last = 0; + while (!stopped) + { + wait (config.bounded_backlog.batch_size); + + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop_scan); + + auto batch = index.next (last, config.bounded_backlog.batch_size); + if (batch.empty ()) // If batch is empty, we iterated over all accounts in the index + { + break; + } + + lock.unlock (); + { + auto transaction = ledger.tx_begin_read (); + for (auto const & hash : batch) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::scanned); + update (transaction, hash); + last = hash; + } + } + lock.lock (); + } + } +} + +bool nano::bounded_backlog::contains (nano::block_hash const & hash) const +{ + nano::lock_guard guard{ mutex }; + return index.contains (hash); +} + +nano::container_info nano::bounded_backlog::container_info () const +{ + nano::lock_guard guard{ mutex }; + nano::container_info info; + info.put ("backlog", index.size ()); + info.put ("notifications", workers.queued_tasks ()); + info.add ("index", index.container_info ()); + return info; +} + +/* + * backlog_index + */ + +bool nano::backlog_index::insert (nano::block const & block, nano::bucket_index bucket, nano::priority_timestamp priority) +{ + auto const hash = block.hash (); + auto const account = block.account (); + + entry new_entry{ + .hash = hash, + .account = account, + .bucket = bucket, + .priority = priority, + }; + + auto [it, inserted] = blocks.emplace (new_entry); + if (inserted) + { + size_by_bucket[bucket]++; + return true; + } + return false; +} + +bool nano::backlog_index::erase (nano::account const & account) +{ + auto const [begin, end] = blocks.get ().equal_range (account); + for (auto it = begin; it != end;) + { + size_by_bucket[it->bucket]--; + it = blocks.get ().erase (it); + } + return begin != end; +} + +bool nano::backlog_index::erase (nano::block_hash const & hash) +{ + if (auto existing = blocks.get ().find (hash); existing != blocks.get ().end ()) + { + size_by_bucket[existing->bucket]--; + blocks.get ().erase (existing); + return true; + } + return false; +} + +std::deque nano::backlog_index::top (nano::bucket_index bucket, size_t count, filter_callback const & filter) const +{ + // Highest timestamp, lowest priority, iterate in descending order + priority_key const starting_key{ bucket, std::numeric_limits::max () }; + + std::deque results; + auto begin = blocks.get ().lower_bound (starting_key); + for (auto it = begin; it != blocks.get ().end () && it->bucket == bucket && results.size () < count; ++it) + { + if (filter (it->hash)) + { + results.push_back (it->hash); + } + } + return results; +} + +std::deque nano::backlog_index::next (nano::block_hash last, size_t count) const +{ + std::deque results; + + auto it = blocks.get ().upper_bound (last); + auto end = blocks.get ().end (); + + for (; it != end && results.size () < count; ++it) + { + results.push_back (it->hash); + } + return results; +} + +bool nano::backlog_index::contains (nano::block_hash const & hash) const +{ + return blocks.get ().contains (hash); +} + +size_t nano::backlog_index::size () const +{ + return blocks.size (); +} + +size_t nano::backlog_index::size (nano::bucket_index bucket) const +{ + if (auto it = size_by_bucket.find (bucket); it != size_by_bucket.end ()) + { + return it->second; + } + return 0; +} + +nano::container_info nano::backlog_index::container_info () const +{ + auto collect_bucket_sizes = [&] () { + nano::container_info info; + for (auto [bucket, count] : size_by_bucket) + { + info.put (std::to_string (bucket), count); + } + return info; + }; + + nano::container_info info; + info.put ("blocks", blocks); + info.add ("sizes", collect_bucket_sizes ()); + return info; +} + +/* + * bounded_backlog_config + */ + +nano::error nano::bounded_backlog_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("enable", enable, "Enable the bounded backlog. \ntype:bool"); + toml.put ("batch_size", batch_size, "Maximum number of blocks to rollback per iteration. \ntype:uint64"); + toml.put ("max_queued_notifications", max_queued_notifications, "Maximum number of queued background tasks before cooldown. \ntype:uint64"); + toml.put ("scan_rate", scan_rate, "Rate limit for refreshing the backlog index. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::bounded_backlog_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("enable", enable); + toml.get ("batch_size", batch_size); + toml.get ("max_queued_notifications", max_queued_notifications); + toml.get ("scan_rate", scan_rate); + + return toml.get_error (); +} \ No newline at end of file diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp new file mode 100644 index 0000000000..807bfe69d6 --- /dev/null +++ b/nano/node/bounded_backlog.hpp @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class backlog_index +{ +public: + struct priority_key + { + nano::bucket_index bucket; + nano::priority_timestamp priority; + + auto operator<=> (priority_key const &) const = default; + }; + + struct entry + { + nano::block_hash hash; + nano::account account; + nano::bucket_index bucket; + nano::priority_timestamp priority; + + backlog_index::priority_key priority_key () const + { + return { bucket, priority }; + } + }; + +public: + backlog_index () = default; + + bool insert (nano::block const & block, nano::bucket_index, nano::priority_timestamp); + + bool erase (nano::account const & account); + bool erase (nano::block_hash const & hash); + + using filter_callback = std::function; + std::deque top (nano::bucket_index, size_t count, filter_callback const &) const; + + std::deque next (nano::block_hash last, size_t count) const; + + bool contains (nano::block_hash const & hash) const; + size_t size () const; + size_t size (nano::bucket_index) const; + + nano::container_info container_info () const; + +private: + // clang-format off + class tag_hash {}; + class tag_hash_ordered {}; + class tag_account {}; + class tag_priority {}; + + using ordered_blocks = boost::multi_index_container, // Allows for fast lookup + mi::member>, + mi::ordered_unique, // Allows for sequential scan + mi::member>, + mi::hashed_non_unique, + mi::member>, + mi::ordered_non_unique, + mi::const_mem_fun, std::greater<>> // DESC order + >>; + // clang-format on + + ordered_blocks blocks; + + // Keep track of the size of the backlog in number of unconfirmed blocks per bucket + std::unordered_map size_by_bucket; +}; + +class bounded_backlog_config +{ +public: + nano::error deserialize (nano::tomlconfig &); + nano::error serialize (nano::tomlconfig &) const; + +public: + bool enable{ true }; + size_t batch_size{ 32 }; + size_t max_queued_notifications{ 128 }; + size_t scan_rate{ 64 }; +}; + +class bounded_backlog +{ +public: + bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); + ~bounded_backlog (); + + void start (); + void stop (); + + size_t index_size () const; + size_t bucket_threshold () const; + bool contains (nano::block_hash const &) const; + + nano::container_info container_info () const; + +private: // Dependencies + nano::node_config const & config; + nano::node & node; + nano::ledger & ledger; + nano::bucketing & bucketing; + nano::backlog_scan & backlog_scan; + nano::block_processor & block_processor; + nano::confirming_set & confirming_set; + nano::stats & stats; + nano::logger & logger; + +private: + void activate (nano::secure::transaction &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &); + void update (nano::secure::transaction const &, nano::block_hash const &); + bool insert (nano::secure::transaction const &, nano::block const &); + + bool predicate () const; + void run (); + std::deque gather_targets (size_t max_count) const; + bool should_rollback (nano::block_hash const &) const; + + std::deque perform_rollbacks (std::deque const & targets, size_t max_rollbacks); + + void run_scan (); + +private: + nano::backlog_index index; + + nano::rate_limiter scan_limiter; + + std::atomic stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; + std::thread scan_thread; + + nano::thread_pool workers; +}; +} \ No newline at end of file diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 1e760bb938..ee64237059 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -399,7 +399,7 @@ void nano::election::confirm_if_quorum (nano::unique_lock & lock_a) { debug_assert (lock_a.owns_lock ()); auto tally_l (tally_impl ()); - debug_assert (!tally_l.empty ()); + release_assert (!tally_l.empty ()); auto winner (tally_l.begin ()); auto block_l (winner->second); auto const & winner_hash_l (block_l->hash ()); @@ -425,6 +425,8 @@ void nano::election::confirm_if_quorum (nano::unique_lock & lock_a) } if (final_weight >= node.online_reps.delta ()) { + // In some edge cases block might get rolled back while the election is confirming, reprocess it to ensure it's present in the ledger + node.block_processor.add (block_l, nano::block_source::election); confirm_once (lock_a); debug_assert (!lock_a.owns_lock ()); } diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 414c3e6c3c..e477f8414a 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -9,7 +9,9 @@ namespace nano { class account_sets_config; class active_elections; +class backlog_scan; class block_processor; +class bounded_backlog; class bucketing; class bootstrap_config; class bootstrap_server; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index b2a41a747f..31715309a7 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -103,6 +103,12 @@ void nano::local_block_broadcaster::stop () nano::join_or_pass (thread); } +bool nano::local_block_broadcaster::contains (nano::block_hash const & hash) const +{ + nano::lock_guard lock{ mutex }; + return local_blocks.get ().contains (hash); +} + size_t nano::local_block_broadcaster::size () const { nano::lock_guard lock{ mutex }; diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index a88ed25f95..567244491f 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -59,6 +59,7 @@ class local_block_broadcaster final void start (); void stop (); + bool contains (nano::block_hash const &) const; size_t size () const; nano::container_info container_info () const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 22ae40f352..5e87808671 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy wallets (wallets_store.init_error (), *this), backlog_scan_impl{ std::make_unique (config.backlog_scan, ledger, stats) }, backlog_scan{ *backlog_scan_impl }, + backlog_impl{ std::make_unique (config, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, + backlog{ *backlog_impl }, bootstrap_server_impl{ std::make_unique (config.bootstrap_server, store, ledger, network_params.network, stats) }, bootstrap_server{ *bootstrap_server_impl }, bootstrap_impl{ std::make_unique (config, block_processor, ledger, network, stats, logger) }, @@ -651,6 +654,7 @@ void nano::node::start () scheduler.start (); aggregator.start (); backlog_scan.start (); + backlog.start (); bootstrap_server.start (); bootstrap.start (); websocket.start (); @@ -683,6 +687,7 @@ void nano::node::stop () distributed_work.stop (); backlog_scan.stop (); bootstrap.stop (); + backlog.stop (); rep_crawler.stop (); unchecked.stop (); block_processor.stop (); @@ -1211,6 +1216,7 @@ nano::container_info nano::node::container_info () const info.add ("message_processor", message_processor.container_info ()); info.add ("bandwidth", outbound_limiter.container_info ()); info.add ("backlog_scan", backlog_scan.container_info ()); + info.add ("bounded_backlog", backlog.container_info ()); return info; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 8ade202c4c..56c6727802 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -33,7 +33,6 @@ namespace nano { class active_elections; -class backlog_scan; class bandwidth_limiter; class confirming_set; class message_processor; @@ -203,6 +202,8 @@ class node final : public std::enable_shared_from_this nano::wallets wallets; std::unique_ptr backlog_scan_impl; nano::backlog_scan & backlog_scan; + std::unique_ptr backlog_impl; + nano::bounded_backlog & backlog; std::unique_ptr bootstrap_server_impl; nano::bootstrap_server & bootstrap_server; std::unique_ptr bootstrap_impl; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 8fb095afd8..f6ccdf563f 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -140,6 +140,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32"); toml.put ("request_aggregator_threads", request_aggregator_threads, "Number of threads to dedicate to request aggregator. Defaults to using all cpu threads, up to a maximum of 4"); toml.put ("max_unchecked_blocks", max_unchecked_blocks, "Maximum number of unchecked blocks to store in memory. Defaults to 65536. \ntype:uint64,[0..]"); + toml.put ("max_backlog", max_backlog, "Maximum number of unconfirmed blocks to keep in the ledger. If this limit is exceeded, the node will start dropping low-priority unconfirmed blocks.\ntype:uint64"); toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw"); toml.put ("enable_upnp", enable_upnp, "Enable or disable automatic UPnP port forwarding. This feature only works if the node is directly connected to a router (not inside a docker container, etc.).\ntype:bool"); @@ -262,6 +263,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const backlog_scan.serialize (backlog_scan_l); toml.put_child ("backlog_scan", backlog_scan_l); + nano::tomlconfig bounded_backlog_l; + bounded_backlog.serialize (bounded_backlog_l); + toml.put_child ("bounded_backlog", bounded_backlog_l); + return toml.get_error (); } @@ -401,6 +406,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) backlog_scan.deserialize (config_l); } + if (toml.has_key ("bounded_backlog")) + { + auto config_l = toml.get_required_child ("bounded_backlog"); + bounded_backlog.deserialize (config_l); + } + /* * Values */ @@ -552,6 +563,7 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) toml.get ("request_aggregator_threads", request_aggregator_threads); toml.get ("max_unchecked_blocks", max_unchecked_blocks); + toml.get ("max_backlog", max_backlog); auto rep_crawler_weight_minimum_l (rep_crawler_weight_minimum.to_string_dec ()); if (toml.has_key ("rep_crawler_weight_minimum")) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index ed4ca6fa3e..4871b98711 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -128,11 +129,14 @@ class node_config uint32_t max_queued_requests{ 512 }; unsigned request_aggregator_threads{ std::min (nano::hardware_concurrency (), 4u) }; // Max 4 threads if available unsigned max_unchecked_blocks{ 65536 }; + std::size_t max_backlog{ 100000 }; std::chrono::seconds max_pruning_age{ !network_params.network.is_beta_network () ? std::chrono::seconds (24 * 60 * 60) : std::chrono::seconds (5 * 60) }; // 1 day; 5 minutes for beta network uint64_t max_pruning_depth{ 0 }; nano::rocksdb_config rocksdb_config; nano::lmdb_config lmdb_config; bool enable_upnp{ true }; + +public: nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; nano::block_processor_config block_processor; @@ -147,6 +151,7 @@ class node_config nano::confirming_set_config confirming_set; nano::monitor_config monitor; nano::backlog_scan_config backlog_scan; + nano::bounded_backlog_config bounded_backlog; public: /** Entry is ignored if it cannot be parsed as a valid address:port */ diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 6d1bbc9b9b..60d39ab1c3 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -792,6 +792,11 @@ void nano::ledger::initialize (nano::generate_cache_flags const & generate_cache cache.pruned_count = store.pruned.count (transaction); } +bool nano::ledger::unconfirmed_exists (secure::transaction const & transaction, nano::block_hash const & hash) +{ + return any.block_exists (transaction, hash) && !confirmed.block_exists (transaction, hash); +} + nano::uint128_t nano::ledger::account_receivable (secure::transaction const & transaction_a, nano::account const & account_a, bool only_confirmed_a) { nano::uint128_t result (0); @@ -1537,6 +1542,13 @@ uint64_t nano::ledger::pruned_count () const return cache.pruned_count; } +uint64_t nano::ledger::backlog_count () const +{ + auto blocks = cache.block_count.load (); + auto cemented = cache.cemented_count.load (); + return (blocks > cemented) ? blocks - cemented : 0; +} + nano::container_info nano::ledger::container_info () const { nano::container_info info; diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index c9a987be72..91e1b8155d 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -43,6 +43,7 @@ class ledger final /** Start read-only transaction */ secure::read_transaction tx_begin_read () const; + bool unconfirmed_exists (secure::transaction const &, nano::block_hash const &); nano::uint128_t account_receivable (secure::transaction const &, nano::account const &, bool = false); /** * Returns the cached vote weight for the given representative. @@ -83,6 +84,7 @@ class ledger final uint64_t block_count () const; uint64_t account_count () const; uint64_t pruned_count () const; + uint64_t backlog_count () const; // Returned priority balance is maximum of block balance and previous block balance to handle full account balance send cases // Returned timestamp is the previous block timestamp or the current timestamp if there's no previous block diff --git a/nano/secure/ledger_set_any.cpp b/nano/secure/ledger_set_any.cpp index 67b14fda19..4eed6292b0 100644 --- a/nano/secure/ledger_set_any.cpp +++ b/nano/secure/ledger_set_any.cpp @@ -123,11 +123,19 @@ std::optional nano::ledger_set_any::block_balance (secure::transac bool nano::ledger_set_any::block_exists (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } return ledger.store.block.exists (transaction, hash); } bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } if (ledger.store.pruned.exists (transaction, hash)) { return true; @@ -137,6 +145,10 @@ bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & t std::shared_ptr nano::ledger_set_any::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return nullptr; + } return ledger.store.block.get (transaction, hash); } diff --git a/nano/secure/ledger_set_confirmed.cpp b/nano/secure/ledger_set_confirmed.cpp index 8702d61b4f..655f04edf8 100644 --- a/nano/secure/ledger_set_confirmed.cpp +++ b/nano/secure/ledger_set_confirmed.cpp @@ -45,10 +45,6 @@ uint64_t nano::ledger_set_confirmed::account_height (secure::transaction const & std::optional nano::ledger_set_confirmed::block_balance (secure::transaction const & transaction, nano::block_hash const & hash) const { - if (hash.is_zero ()) - { - return std::nullopt; - } auto block = block_get (transaction, hash); if (!block) { @@ -64,6 +60,10 @@ bool nano::ledger_set_confirmed::block_exists (secure::transaction const & trans bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } if (ledger.store.pruned.exists (transaction, hash)) { return true; @@ -73,6 +73,10 @@ bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction con std::shared_ptr nano::ledger_set_confirmed::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return nullptr; + } auto block = ledger.store.block.get (transaction, hash); if (!block) { diff --git a/nano/secure/transaction.hpp b/nano/secure/transaction.hpp index 8a3ebd9c1d..0a1bd90a41 100644 --- a/nano/secure/transaction.hpp +++ b/nano/secure/transaction.hpp @@ -27,6 +27,9 @@ class transaction // Conversion operator to const nano::store::transaction& virtual operator const nano::store::transaction & () const = 0; + + // Certain transactions may need to be refreshed if they are held for a long time + virtual bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) = 0; }; class write_transaction final : public transaction @@ -69,7 +72,7 @@ class write_transaction final : public transaction renew (); } - bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override { auto now = std::chrono::steady_clock::now (); if (now - start > max_age) @@ -119,9 +122,9 @@ class read_transaction final : public transaction txn.refresh (); } - void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override { - txn.refresh_if_needed (max_age); + return txn.refresh_if_needed (max_age); } auto timestamp () const diff --git a/nano/store/transaction.cpp b/nano/store/transaction.cpp index 0d1d6a9008..6f475bf06c 100644 --- a/nano/store/transaction.cpp +++ b/nano/store/transaction.cpp @@ -83,13 +83,15 @@ void nano::store::read_transaction::refresh () renew (); } -void nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age) +bool nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age) { auto now = std::chrono::steady_clock::now (); if (now - start > max_age) { refresh (); + return true; } + return false; } /* diff --git a/nano/store/transaction.hpp b/nano/store/transaction.hpp index 23459a9258..0697e1b1e8 100644 --- a/nano/store/transaction.hpp +++ b/nano/store/transaction.hpp @@ -66,7 +66,7 @@ class read_transaction final : public transaction void reset (); void renew (); void refresh (); - void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }); + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }); private: std::unique_ptr impl; diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp index 0171685b87..0c395212c4 100644 --- a/nano/store/write_queue.hpp +++ b/nano/store/write_queue.hpp @@ -17,6 +17,7 @@ enum class writer confirmation_height, pruning, voting_final, + bounded_backlog, testing // Used in tests to emulate a write lock };