diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cef9ee95379..7e001198fbf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ Notable updates: - [forest daemon] Support for NV18. [#2558](https://github.com/ChainSafe/forest/pull/2558) [#2579](https://github.com/ChainSafe/forest/pull/2579) +- [forest daemon] Automatic database garbage collection. + [#2638](https://github.com/ChainSafe/forest/pull/2638) ### Changed diff --git a/Cargo.lock b/Cargo.lock index 482880eda003..c857bf9ea822 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3268,6 +3268,7 @@ dependencies = [ "atty", "base64 0.21.0", "boa_engine", + "chrono", "cid", "clap", "convert_case 0.6.0", @@ -3454,7 +3455,6 @@ dependencies = [ "forest_networks", "forest_shim", "forest_utils", - "futures", "fvm_ipld_amt 0.5.1", "fvm_ipld_blockstore", "fvm_ipld_car", @@ -3575,9 +3575,16 @@ version = "0.6.0" dependencies = [ "ahash 0.8.3", "anyhow", + "chrono", "cid", + "flume", + "forest_blocks", + "forest_ipld", "forest_libp2p_bitswap", + "forest_utils", + "fs_extra", "fvm_ipld_blockstore", + "human-repr", "lazy_static", "libipld", "log", @@ -3585,10 +3592,14 @@ dependencies = [ "parity-db", "parking_lot 0.12.1", "prometheus", + "rand 0.8.5", "rocksdb", "serde", + "serde_yaml", "tempfile", "thiserror", + "tokio", + "uuid", ] [[package]] @@ -3665,6 +3676,7 @@ version = "0.6.0" dependencies = [ "anyhow", "cid", + "flume", "forest_blocks", "forest_state_manager", "forest_utils", @@ -3722,6 +3734,7 @@ dependencies = [ "async-recursion", "async-trait", "cid", + "forest_blocks", "forest_db", "forest_json", "forest_utils", @@ -4203,14 +4216,17 @@ dependencies = [ "async-trait", "atty", "blake2b_simd", + "chrono", "cid", "const_format", "cs_serde_bytes", "digest 0.10.6", + "flume", "futures", "fvm_ipld_blockstore", "fvm_ipld_encoding 0.2.3", "fvm_ipld_encoding 0.3.3", + "human-repr", "hyper", "hyper-rustls", "libc", @@ -8741,6 +8757,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_yaml" +version = "0.9.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82e6c8c047aa50a7328632d067bcae6ef38772a79e28daf32f735e0e4f3dd10" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "serialization_tests" version = "0.6.0" @@ -10014,6 +10043,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsafe-libyaml" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad2024452afd3874bf539695e04af6732ba06517424dbf958fdb16a01f3bef6c" + [[package]] name = "unsigned-varint" version = "0.7.1" diff --git a/Cargo.toml b/Cargo.toml index b3938ed1b1a0..e26e314918f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ bls-signatures = { version = "0.13", default-features = false, features = ["blst byteorder = "1.4.3" bytes = "1.2" cfg-if = "1" -chrono = { version = "0.4", default-features = false, features = [] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } cid = { version = "0.8", default-features = false, features = ["std"] } clap = { version = "4.0", features = ["derive"] } console-subscriber = { version = "0.1", features = ["parking_lot"] } @@ -124,6 +124,7 @@ serde_ipld_dagcbor = "0.2" serde_json = "1.0" serde_tuple = "0.5" serde_with = { version = "2.0.1", features = ["chrono_0_4"] } +serde_yaml = "0.9" sha2 = { version = "0.10.5", default-features = false } tempfile = "3.4" thiserror = "1.0" diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index 08a7e06e9e82..d9dca5499713 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -26,7 +26,6 @@ forest_metrics.workspace = true forest_networks.workspace = true forest_shim.workspace = true forest_utils.workspace = true -futures.workspace = true fvm_ipld_amt.workspace = true fvm_ipld_blockstore.workspace = true fvm_ipld_car.workspace = true diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index a1341d033498..ff43d69c87a7 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -1,28 +1,24 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{collections::VecDeque, num::NonZeroUsize, path::Path, sync::Arc, time::SystemTime}; +use std::{num::NonZeroUsize, path::Path, sync::Arc, time::SystemTime}; use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Result; use async_stream::stream; use bls_signatures::Serialize as SerializeBls; -use cid::{ - multihash::{Code, Code::Blake2b256}, - Cid, -}; +use cid::{multihash::Code::Blake2b256, Cid}; use digest::Digest; use forest_beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use forest_blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; use forest_interpreter::BlockMessages; -use forest_ipld::{recurse_links_hash, CidHashSet}; +use forest_ipld::{should_save_block_to_snapshot, walk_snapshot}; use forest_libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use forest_message::{ChainMessage, Message as MessageTrait, SignedMessage}; use forest_metrics::metrics; use forest_networks::ChainConfig; use forest_shim::{ address::Address, - clock::EPOCHS_IN_DAY, crypto::{Signature, SignatureType}, econ::TokenAmount, executor::Receipt, @@ -36,7 +32,6 @@ use forest_utils::{ }, io::Checksum, }; -use futures::Future; use fvm_ipld_amt::Amtv0 as Amt; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_car::CarHeader; @@ -562,7 +557,7 @@ where // Walks over tipset and historical data, sending all blocks visited into the // car writer. - Self::walk_snapshot(tipset, recent_roots, |cid| { + walk_snapshot(tipset, recent_roots, |cid| { let tx_clone = tx.clone(); async move { let block = self @@ -570,15 +565,10 @@ where .get(&cid)? .ok_or_else(|| Error::Other(format!("Cid {cid} not found in blockstore")))?; - // Don't include identity CIDs. - // We only include raw and dagcbor, for now. - // Raw for "code" CIDs. - if u64::from(Code::Identity) != cid.hash().code() - && (cid.codec() == fvm_shared::IPLD_RAW - || cid.codec() == fvm_ipld_encoding::DAG_CBOR) - { + if should_save_block_to_snapshot(&cid) { tx_clone.send_async((cid, block.clone())).await?; } + Ok(block) } }) @@ -604,60 +594,6 @@ where let digest = writer.lock().await.get_mut().finalize(); Ok(digest) } - - /// Walks over tipset and state data and loads all blocks not yet seen. - /// This is tracked based on the callback function loading blocks. - pub async fn walk_snapshot( - tipset: &Tipset, - recent_roots: ChainEpoch, - mut load_block: F, - ) -> Result<(), Error> - where - F: FnMut(Cid) -> T + Send, - T: Future, anyhow::Error>> + Send, - { - let mut seen = CidHashSet::default(); - let mut blocks_to_walk: VecDeque = tipset.cids().to_vec().into(); - let mut current_min_height = tipset.epoch(); - let incl_roots_epoch = tipset.epoch() - recent_roots; - - while let Some(next) = blocks_to_walk.pop_front() { - if !seen.insert(&next) { - continue; - } - - let data = load_block(next).await?; - - let h = BlockHeader::unmarshal_cbor(&data)?; - - if current_min_height > h.epoch() { - current_min_height = h.epoch(); - if current_min_height % EPOCHS_IN_DAY == 0 { - info!(target: "chain_api", "export at: {}", current_min_height); - } - } - - if h.epoch() > incl_roots_epoch { - recurse_links_hash(&mut seen, *h.messages(), &mut load_block).await?; - } - - if h.epoch() > 0 { - for p in h.parents().cids() { - blocks_to_walk.push_back(*p); - } - } else { - for p in h.parents().cids() { - load_block(*p).await?; - } - } - - if h.epoch() == 0 || h.epoch() > incl_roots_epoch { - recurse_links_hash(&mut seen, *h.state_root(), &mut load_block).await?; - } - } - - Ok(()) - } } pub(crate) type TipsetCache = Mutex>>; diff --git a/forest/cli/Cargo.toml b/forest/cli/Cargo.toml index fc4d721b8cfd..567e09bd0d92 100644 --- a/forest/cli/Cargo.toml +++ b/forest/cli/Cargo.toml @@ -13,6 +13,7 @@ anyhow.workspace = true atty = "0.2" base64.workspace = true boa_engine = { version = "0.16.0", features = ["console"] } +chrono.workspace = true cid.workspace = true clap.workspace = true convert_case = "0.6.0" diff --git a/forest/cli/src/cli/db_cmd.rs b/forest/cli/src/cli/db_cmd.rs index b6d75a68cbf3..d1b9b8bbc0c0 100644 --- a/forest/cli/src/cli/db_cmd.rs +++ b/forest/cli/src/cli/db_cmd.rs @@ -1,17 +1,21 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use chrono::Utc; use clap::Subcommand; use forest_cli_shared::{chain_path, cli::Config}; -use forest_db::db_engine::db_path; +use forest_db::db_engine::db_root; +use forest_rpc_client::db_ops::db_gc; use log::error; -use crate::cli::prompt_confirm; +use crate::cli::{handle_rpc_err, prompt_confirm}; #[derive(Debug, Subcommand)] pub enum DBCommands { /// Show DB stats Stats, + /// Run DB garbage collection + GC, /// DB Clean up Clean { /// Answer yes to all forest-cli yes/no questions without prompting @@ -21,19 +25,33 @@ pub enum DBCommands { } impl DBCommands { - pub fn run(&self, config: &Config) -> anyhow::Result<()> { + pub async fn run(&self, config: &Config) -> anyhow::Result<()> { match self { Self::Stats => { use human_repr::HumanCount; - let dir = db_path(&chain_path(config)); + let dir = db_root(&chain_path(config)); println!("Database path: {}", dir.display()); let size = fs_extra::dir::get_size(dir).unwrap_or_default(); println!("Database size: {}", size.human_count_bytes()); Ok(()) } + Self::GC => { + let start = Utc::now(); + + db_gc((), &config.client.rpc_token) + .await + .map_err(handle_rpc_err)?; + + println!( + "DB GC completed. took {}s", + (Utc::now() - start).num_seconds() + ); + + Ok(()) + } Self::Clean { force } => { - let dir = db_path(&chain_path(config)); + let dir = db_root(&chain_path(config)); if !dir.is_dir() { println!( "Aborted. Database path {} is not a valid directory", diff --git a/forest/cli/src/cli/snapshot_cmd.rs b/forest/cli/src/cli/snapshot_cmd.rs index 0af8b09e622e..ea6aa814ae17 100644 --- a/forest/cli/src/cli/snapshot_cmd.rs +++ b/forest/cli/src/cli/snapshot_cmd.rs @@ -11,9 +11,9 @@ use forest_chain::ChainStore; use forest_cli_shared::cli::{ default_snapshot_dir, is_car_or_tmp, snapshot_fetch, SnapshotServer, SnapshotStore, }; -use forest_db::db_engine::open_db; +use forest_db::db_engine::{db_root, open_proxy_db}; use forest_genesis::{forest_load_car, read_genesis_header}; -use forest_ipld::{recurse_links_hash, CidHashSet}; +use forest_ipld::{recurse_links_hash, CidHashSet, DEFAULT_RECENT_STATE_ROOTS}; use forest_rpc_api::chain_api::ChainExportParams; use forest_rpc_client::chain_ops::*; use forest_utils::{io::parser::parse_duration, net::FetchProgress, retry}; @@ -35,12 +35,6 @@ pub(crate) const OUTPUT_PATH_DEFAULT_FORMAT: &str = pub enum SnapshotCommands { /// Export a snapshot of the chain to `` Export { - /// Tipset to start the export from, default is the chain head - #[arg(short, long)] - tipset: Option, - /// Specify the number of recent state roots to include in the export. - #[arg(short, long, default_value = "2000")] - recent_stateroots: i64, /// Snapshot output path. Default to /// `forest_snapshot_{chain}_{year}-{month}-{day}_height_{height}.car` /// Date is in ISO 8601 date format. @@ -150,8 +144,6 @@ impl SnapshotCommands { pub async fn run(&self, config: Config) -> anyhow::Result<()> { match self { Self::Export { - tipset, - recent_stateroots, output_path, skip_checksum, dry_run, @@ -161,7 +153,7 @@ impl SnapshotCommands { Err(_) => cli_error_and_die("Could not get network head", 1), }; - let epoch = tipset.unwrap_or(chain_head.epoch()); + let epoch = chain_head.epoch(); let now = OffsetDateTime::now_utc(); @@ -196,7 +188,7 @@ impl SnapshotCommands { let params = ChainExportParams { epoch, - recent_roots: *recent_stateroots, + recent_roots: DEFAULT_RECENT_STATE_ROOTS, output_path, tipset_keys: TipsetKeysJson(chain_head.key().clone()), skip_checksum: *skip_checksum, @@ -393,8 +385,13 @@ async fn validate( if confirm { let tmp_chain_data_path = TempDir::new()?; - let db_path = tmp_chain_data_path.path().join(&config.chain.name); - let db = open_db(&db_path, config.db_config())?; + let db_path = db_root( + tmp_chain_data_path + .path() + .join(&config.chain.name) + .as_path(), + ); + let db = open_proxy_db(db_path, config.db_config().clone())?; let genesis = read_genesis_header( config.client.genesis_file.as_ref(), @@ -413,7 +410,7 @@ async fn validate( let cids = { let file = tokio::fs::File::open(&snapshot).await?; let reader = FetchProgress::fetch_from_file(file).await?; - forest_load_car(chain_store.blockstore(), reader.compat()).await? + forest_load_car(chain_store.blockstore().clone(), reader.compat()).await? }; let ts = chain_store.tipset_from_keys(&TipsetKeys::new(cids))?; diff --git a/forest/cli/src/subcommand.rs b/forest/cli/src/subcommand.rs index 78b7efcd4a02..292e28a33974 100644 --- a/forest/cli/src/subcommand.rs +++ b/forest/cli/src/subcommand.rs @@ -20,7 +20,7 @@ pub(super) async fn process(command: Subcommand, config: Config) -> anyhow::Resu Subcommand::State(cmd) => cmd.run(config), Subcommand::Config(cmd) => cmd.run(&config, &mut std::io::stdout()), Subcommand::Send(cmd) => cmd.run(config).await, - Subcommand::DB(cmd) => cmd.run(&config), + Subcommand::DB(cmd) => cmd.run(&config).await, Subcommand::Snapshot(cmd) => cmd.run(config).await, Subcommand::Attach(cmd) => cmd.run(config), Subcommand::Shutdown(cmd) => cmd.run(config).await, diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index c9a4aae2be37..fd38e06132ab 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -17,7 +17,8 @@ use forest_cli_shared::{ }, }; use forest_db::{ - db_engine::{db_path, open_db, Db}, + db_engine::{db_root, open_proxy_db}, + rolling::{DbGarbageCollector, RollingDB}, Store, }; use forest_genesis::{get_network_name_from_genesis, import_chain, read_genesis_header}; @@ -67,7 +68,7 @@ fn unblock_parent_process() -> anyhow::Result<()> { } /// Starts daemon process -pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { +pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { if config.chain.name == "calibnet" { forest_shim::address::set_current_network(forest_shim::address::Network::Testnet); } @@ -119,8 +120,7 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { let keystore = Arc::new(RwLock::new(keystore)); let chain_data_path = chain_path(&config); - - let db = open_db(&db_path(&chain_data_path), config.db_config())?; + let db = open_proxy_db(db_root(&chain_data_path), config.db_config().clone())?; let mut services = JoinSet::new(); @@ -133,8 +133,7 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { "Prometheus server started at {}", config.client.metrics_address ); - - let db_directory = forest_db::db_engine::db_path(&chain_data_path); + let db_directory = forest_db::db_engine::db_root(&chain_path(&config)); let db = db.clone(); services.spawn(async { forest_metrics::init_prometheus(prometheus_listener, db_directory, db) @@ -162,6 +161,23 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { )?); chain_store.set_genesis(&genesis_header)?; + let db_garbage_collector = { + let db = db.clone(); + let chain_store = chain_store.clone(); + let get_tipset = move || chain_store.heaviest_tipset().as_ref().clone(); + Arc::new(DbGarbageCollector::new(db, get_tipset)) + }; + + #[allow(clippy::redundant_async_block)] + services.spawn({ + let db_garbage_collector = db_garbage_collector.clone(); + async move { db_garbage_collector.collect_loop_passive().await } + }); + #[allow(clippy::redundant_async_block)] + services.spawn({ + let db_garbage_collector = db_garbage_collector.clone(); + async move { db_garbage_collector.collect_loop_event().await } + }); let publisher = chain_store.publisher(); @@ -292,6 +308,7 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { let rpc_state_manager = Arc::clone(&state_manager); let rpc_chain_store = Arc::clone(&chain_store); + let gc_event_tx = db_garbage_collector.get_tx(); services.spawn(async move { info!("JSON-RPC endpoint started at {}", config.client.rpc_address); // XXX: The JSON error message are a nightmare to print. @@ -304,11 +321,11 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { sync_state, network_send, network_name, - beacon: rpc_state_manager.beacon_schedule(), /* TODO: the RPCState can fetch - * this itself from the - * StateManager */ + // TODO: the RPCState can fetch this itself from the StateManager + beacon: rpc_state_manager.beacon_schedule(), chain_store: rpc_chain_store, new_mined_block_tx: tipset_sink, + gc_event_tx, }), rpc_listen, FOREST_VERSION_STRING.as_str(), diff --git a/forest/daemon/src/main.rs b/forest/daemon/src/main.rs index ce35a58da01f..1f2ff3931069 100644 --- a/forest/daemon/src/main.rs +++ b/forest/daemon/src/main.rs @@ -16,7 +16,7 @@ static GLOBAL: MiMalloc = MiMalloc; mod cli; mod daemon; -use std::{cmp::max, fs::File, process, sync::Arc, time::Duration}; +use std::{cmp::max, fs::File, process, time::Duration}; use anyhow::Context; use clap::Parser; @@ -26,10 +26,10 @@ use forest_cli_shared::{ cli::{check_for_unknown_keys, cli_error_and_die, ConfigPath, DaemonConfig}, logger, }; -use forest_db::{db_engine::Db, Store}; +use forest_db::Store; use forest_utils::io::ProgressBar; use lazy_static::lazy_static; -use log::{error, info, warn}; +use log::{info, warn}; use raw_sync::{ events::{Event, EventInit}, Timeout, @@ -170,21 +170,12 @@ fn main() -> anyhow::Result<()> { if let Some(loki_task) = loki_task { rt.spawn(loki_task); } - let db: Db = rt.block_on(daemon::start(opts, cfg))?; - + let db = rt.block_on(daemon::start(opts, cfg))?; info!("Shutting down tokio..."); rt.shutdown_timeout(Duration::from_secs(10)); - db.flush()?; - let db_weak_ref = Arc::downgrade(&db.db); drop(db); - if db_weak_ref.strong_count() != 0 { - error!( - "Dangling reference to DB detected: {}. Tracking issue: https://github.com/ChainSafe/forest/issues/1891", - db_weak_ref.strong_count() - ); - } info!("Forest finish shutdown"); } } diff --git a/ipld/Cargo.toml b/ipld/Cargo.toml index 869735d3e8ae..a1334a9702af 100644 --- a/ipld/Cargo.toml +++ b/ipld/Cargo.toml @@ -13,6 +13,7 @@ anyhow.workspace = true async-recursion = "1.0" async-trait.workspace = true cid.workspace = true +forest_blocks.workspace = true fvm_ipld_encoding.workspace = true fvm_shared = { workspace = true, default-features = false } indexmap.workspace = true diff --git a/ipld/src/util.rs b/ipld/src/util.rs index d9368745fa1f..29eec737798b 100644 --- a/ipld/src/util.rs +++ b/ipld/src/util.rs @@ -1,10 +1,11 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::future::Future; +use std::{collections::VecDeque, future::Future}; use cid::Cid; -use fvm_ipld_encoding::from_slice; +use forest_blocks::{BlockHeader, Tipset}; +use fvm_ipld_encoding::{from_slice, Cbor}; use crate::{CidHashSet, Ipld}; @@ -78,3 +79,70 @@ where Ok(()) } + +pub const DEFAULT_RECENT_STATE_ROOTS: i64 = 2000; + +/// Walks over tipset and state data and loads all blocks not yet seen. +/// This is tracked based on the callback function loading blocks. +pub async fn walk_snapshot( + tipset: &Tipset, + recent_roots: i64, + mut load_block: F, +) -> anyhow::Result<()> +where + F: FnMut(Cid) -> T + Send, + T: Future>> + Send, +{ + let mut seen = CidHashSet::default(); + let mut blocks_to_walk: VecDeque = tipset.cids().to_vec().into(); + let mut current_min_height = tipset.epoch(); + let incl_roots_epoch = tipset.epoch() - recent_roots; + + while let Some(next) = blocks_to_walk.pop_front() { + if !seen.insert(&next) { + continue; + } + + let data = load_block(next).await?; + + let h = BlockHeader::unmarshal_cbor(&data)?; + + if current_min_height > h.epoch() { + current_min_height = h.epoch(); + } + + if h.epoch() > incl_roots_epoch { + recurse_links_hash(&mut seen, *h.messages(), &mut load_block).await?; + } + + if h.epoch() > 0 { + for p in h.parents().cids() { + blocks_to_walk.push_back(*p); + } + } else { + for p in h.parents().cids() { + load_block(*p).await?; + } + } + + if h.epoch() == 0 || h.epoch() > incl_roots_epoch { + recurse_links_hash(&mut seen, *h.state_root(), &mut load_block).await?; + } + } + + Ok(()) +} + +pub fn should_save_block_to_snapshot(cid: &Cid) -> bool { + // Don't include identity CIDs. + // We only include raw and dagcbor, for now. + // Raw for "code" CIDs. + if cid.hash().code() == u64::from(cid::multihash::Code::Identity) { + false + } else { + matches!( + cid.codec(), + fvm_shared::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR + ) + } +} diff --git a/node/db/Cargo.toml b/node/db/Cargo.toml index 2a36e67a2d2d..a4565f5ceb10 100644 --- a/node/db/Cargo.toml +++ b/node/db/Cargo.toml @@ -25,9 +25,16 @@ paritydb = ["dep:parity-db"] [dependencies] ahash.workspace = true anyhow.workspace = true +chrono.workspace = true cid.workspace = true +flume.workspace = true +forest_blocks.workspace = true +forest_ipld.workspace = true forest_libp2p_bitswap.workspace = true +forest_utils.workspace = true +fs_extra.workspace = true fvm_ipld_blockstore.workspace = true +human-repr.workspace = true lazy_static.workspace = true libipld.workspace = true log.workspace = true @@ -35,11 +42,15 @@ num_cpus.workspace = true parking_lot.workspace = true prometheus = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_yaml.workspace = true thiserror.workspace = true +tokio = { workspace = true, features = ["sync"] } +uuid = { version = "1.3", features = ["v4"] } # optional parity-db = { version = "0.4.6", default-features = false, optional = true } rocksdb = { version = "0.20", default-features = false, optional = true } [dev-dependencies] +rand.workspace = true tempfile.workspace = true diff --git a/node/db/src/lib.rs b/node/db/src/lib.rs index a12d9b3b7726..de2665fa8a1b 100644 --- a/node/db/src/lib.rs +++ b/node/db/src/lib.rs @@ -17,6 +17,9 @@ pub mod rocks_config; pub use errors::Error; pub use memory::MemoryDB; +#[cfg(any(feature = "paritydb", feature = "rocksdb"))] +pub mod rolling; + /// Store interface used as a KV store implementation pub trait Store { /// Read single value from data store and return `None` if key doesn't @@ -31,24 +34,11 @@ pub trait Store { K: AsRef<[u8]>, V: AsRef<[u8]>; - /// Delete value at key. - fn delete(&self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>; - /// Returns `Ok(true)` if key exists in store fn exists(&self, key: K) -> Result where K: AsRef<[u8]>; - /// Read slice of keys and return a vector of optional values. - fn bulk_read(&self, keys: &[K]) -> Result>>, Error> - where - K: AsRef<[u8]>, - { - keys.iter().map(|key| self.read(key)).collect() - } - /// Write slice of KV pairs. fn bulk_write( &self, @@ -59,14 +49,6 @@ pub trait Store { .try_for_each(|(key, value)| self.write(key.into(), value.into())) } - /// Bulk delete keys from the data store. - fn bulk_delete(&self, keys: &[K]) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - keys.iter().try_for_each(|key| self.delete(key)) - } - /// Flush writing buffer if there is any. Default implementation is blank fn flush(&self) -> Result<(), Error> { Ok(()) @@ -89,13 +71,6 @@ impl Store for &BS { (*self).write(key, value) } - fn delete(&self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - (*self).delete(key) - } - fn exists(&self, key: K) -> Result where K: AsRef<[u8]>, @@ -103,26 +78,12 @@ impl Store for &BS { (*self).exists(key) } - fn bulk_read(&self, keys: &[K]) -> Result>>, Error> - where - K: AsRef<[u8]>, - { - (*self).bulk_read(keys) - } - fn bulk_write( &self, values: impl IntoIterator>, impl Into>)>, ) -> Result<(), Error> { (*self).bulk_write(values) } - - fn bulk_delete(&self, keys: &[K]) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - (*self).bulk_delete(keys) - } } /// Traits for collecting DB stats @@ -132,35 +93,35 @@ pub trait DBStatistics { } } -#[cfg(feature = "rocksdb")] +#[cfg(any(feature = "paritydb", feature = "rocksdb"))] pub mod db_engine { use std::path::{Path, PathBuf}; + use crate::rolling::*; + + #[cfg(feature = "rocksdb")] pub type Db = crate::rocks::RocksDb; + #[cfg(feature = "paritydb")] + pub type Db = crate::parity_db::ParityDb; + #[cfg(feature = "rocksdb")] pub type DbConfig = crate::rocks_config::RocksDbConfig; + #[cfg(feature = "paritydb")] + pub type DbConfig = crate::parity_db_config::ParityDbConfig; - pub fn db_path(path: &Path) -> PathBuf { - path.join("rocksdb") - } + #[cfg(feature = "rocksdb")] + const DIR_NAME: &str = "rocksdb"; + #[cfg(feature = "paritydb")] + const DIR_NAME: &str = "paritydb"; - pub fn open_db(path: &std::path::Path, config: &DbConfig) -> anyhow::Result { - crate::rocks::RocksDb::open(path, config).map_err(Into::into) + pub fn db_root(chain_data_root: &Path) -> PathBuf { + chain_data_root.join(DIR_NAME) } -} - -#[cfg(feature = "paritydb")] -pub mod db_engine { - use std::path::{Path, PathBuf}; - - pub type Db = crate::parity_db::ParityDb; - pub type DbConfig = crate::parity_db_config::ParityDbConfig; - pub fn db_path(path: &Path) -> PathBuf { - path.join("paritydb") + pub(crate) fn open_db(path: &Path, config: &DbConfig) -> anyhow::Result { + Db::open(path, config).map_err(Into::into) } - pub fn open_db(path: &std::path::Path, config: &DbConfig) -> anyhow::Result { - use crate::parity_db::ParityDb; - ParityDb::open(path.to_owned(), config) + pub fn open_proxy_db(db_root: PathBuf, db_config: DbConfig) -> anyhow::Result { + RollingDB::load_or_create(db_root, db_config) } } diff --git a/node/db/src/memory.rs b/node/db/src/memory.rs index 0bfb598108c3..020453ac49a2 100644 --- a/node/db/src/memory.rs +++ b/node/db/src/memory.rs @@ -30,14 +30,6 @@ impl Store for MemoryDB { Ok(()) } - fn delete(&self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - self.db.write().remove(key.as_ref()); - Ok(()) - } - fn read(&self, key: K) -> Result>, Error> where K: AsRef<[u8]>, diff --git a/node/db/src/parity_db.rs b/node/db/src/parity_db.rs index a3aac9770701..33bb9aa91a7b 100644 --- a/node/db/src/parity_db.rs +++ b/node/db/src/parity_db.rs @@ -50,8 +50,8 @@ impl ParityDb { }) } - pub fn open(path: PathBuf, config: &ParityDbConfig) -> anyhow::Result { - let opts = Self::to_options(path, config)?; + pub fn open(path: impl Into, config: &ParityDbConfig) -> anyhow::Result { + let opts = Self::to_options(path.into(), config)?; Ok(Self { db: Arc::new(Db::open_or_create(&opts)?), statistics_enabled: opts.stats, @@ -106,14 +106,6 @@ impl Store for ParityDb { // ``` } - fn delete(&self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - let tx = [(0, key.as_ref(), None)]; - self.db.commit(tx).map_err(Error::from) - } - fn exists(&self, key: K) -> Result where K: AsRef<[u8]>, diff --git a/node/db/src/rocks.rs b/node/db/src/rocks.rs index 4867d2772330..c1f02331c005 100644 --- a/node/db/src/rocks.rs +++ b/node/db/src/rocks.rs @@ -234,13 +234,6 @@ impl Store for RocksDb { Ok(self.db.put_opt(key, value, &WRITE_OPT_NO_WAL)?) } - fn delete(&self, key: K) -> Result<(), Error> - where - K: AsRef<[u8]>, - { - Ok(self.db.delete(key)?) - } - fn exists(&self, key: K) -> Result where K: AsRef<[u8]>, diff --git a/node/db/src/rolling/gc.rs b/node/db/src/rolling/gc.rs new file mode 100644 index 000000000000..f7e6666e226b --- /dev/null +++ b/node/db/src/rolling/gc.rs @@ -0,0 +1,246 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! +//! The current implementation of the garbage collector is a concurrent, +//! semi-space one. +//! +//! ## Design goals +//! Implement a correct GC algorithm that is simple and efficient for forest +//! scenarios. +//! +//! ## GC algorithm +//! We chose the `semi-space` GC algorithm for simplicity and sufficiency +//! Besides `semi-space`, `mark-and-sweep` was also considered and evaluated. +//! However, it's not feasible because of the limitations of the underlying DB +//! we use, more specifically, limitations in iterating the DB and retrieving the original key. See +//! +//! ## GC workflow +//! 1. Walk back from the current heaviest tipset to the genesis block, collect +//! all the blocks that are reachable from the snapshot +//! 2. writes blocks that are absent from the `current` database to it +//! 3. delete `old` database(s) +//! 4. sets `current` database to a newly created one +//! +//! ## Correctness +//! This algorithm considers all blocks that are visited during the snapshot +//! export task reachable, and ensures they are all transferred and kept in the +//! current DB space. A snapshot can be used to bootstrap a node from +//! scratch thus the algorithm is considered appropriate when the post-GC +//! database contains blocks that are sufficient for exporting a snapshot +//! +//! ## Disk usage +//! During `walk_snapshot`, data from the `old` DB is duplicated in the +//! `current` DB, which uses extra disk space of up to 100% of the snapshot file +//! size +//! +//! ## Memory usage +//! During the data carry-over process, a memory buffer with a fixed capacity is +//! used to speed up the database write operation +//! +//! ## Scheduling +//! 1. GC is triggered automatically when total DB size is greater than 2x of +//! the last reachable data size +//! 2. GC can be triggered manually by `forest-cli db gc` command +//! 3. There's a global GC lock to ensure at most one GC job is running +//! +//! ## Performance +//! GC performance is typically 1x-1.5x of `snapshot export`, depending on +//! number of write operations to the `current` DB space. +//! +//! ### Look up performance +//! DB lookup performance is almost on-par between from single DB and two DBs. +//! Time cost of `forest-cli snapshot export --dry-run` on DO droplet with 16GiB +//! ram is between `9000s` to `11000s` for both scenarios, no significant +//! performance regression has been observed +//! +//! ### Write performance +//! DB write performance is typically on par with `snapshot import`. Note that +//! when the `current` DB space is very large, it tends to trigger DB re-index +//! more frequently, each DB re-index could pause the GC process for a few +//! minutes. The same behaviour is observed during snapshot import as well. +//! +//! ### Sample mainnet log +//! ``` +//! 2023-03-16T19:50:40.323860Z INFO forest_db::rolling::gc: Garbage collection started at epoch 2689660 +//! 2023-03-16T22:27:36.484245Z INFO forest_db::rolling::gc: Garbage collection finished at epoch 2689660, took 9416s, reachable data size: 135.71GB +//! 2023-03-16T22:27:38.793717Z INFO forest_db::rolling::impls: Deleted database under /root/.local/share/forest/mainnet/paritydb/14d0f80992374fb8b20e3b1bd70d5d7b, size: 139.01GB +//! ``` + +use std::{ + sync::atomic::{self, AtomicU64, AtomicUsize}, + time::Duration, +}; + +use chrono::Utc; +use forest_blocks::Tipset; +use forest_ipld::util::*; +use forest_utils::db::{BlockstoreBufferedWriteExt, DB_KEY_BYTES}; +use fvm_ipld_blockstore::Blockstore; +use human_repr::HumanCount; +use tokio::sync::Mutex; + +use super::*; + +pub struct DbGarbageCollector +where + F: Fn() -> Tipset + Send + Sync + 'static, +{ + db: RollingDB, + get_tipset: F, + lock: Mutex<()>, + gc_tx: flume::Sender>>, + gc_rx: flume::Receiver>>, + last_reachable_bytes: AtomicU64, +} + +impl DbGarbageCollector +where + F: Fn() -> Tipset + Send + Sync + 'static, +{ + pub fn new(db: RollingDB, get_tipset: F) -> Self { + let (gc_tx, gc_rx) = flume::unbounded(); + + Self { + db, + get_tipset, + lock: Default::default(), + gc_tx, + gc_rx, + last_reachable_bytes: AtomicU64::new(0), + } + } + + pub fn get_tx(&self) -> flume::Sender>> { + self.gc_tx.clone() + } + + /// This loop automatically triggers `collect_once` when the total DB size + /// is greater than 2x of the last reachable data size + pub async fn collect_loop_passive(&self) -> anyhow::Result<()> { + loop { + // Check every 10 mins + tokio::time::sleep(Duration::from_secs(10 * 60)).await; + + // Bypass size checking during import + let tipset = (self.get_tipset)(); + if tipset.epoch() == 0 { + continue; + } + + // Bypass size checking when lock is held + { + let lock = self.lock.try_lock(); + if lock.is_err() { + continue; + } + } + + if let (Ok(total_size), Ok(current_size), last_reachable_bytes) = ( + self.db.total_size_in_bytes(), + self.db.current_size_in_bytes(), + self.last_reachable_bytes.load(atomic::Ordering::Relaxed), + ) { + let should_collect = if last_reachable_bytes > 0 { + total_size > (gc_trigger_factor() * last_reachable_bytes as f64) as _ + } else { + total_size > 0 && current_size * 3 > total_size + }; + + if should_collect { + if let Err(err) = self.collect_once(tipset).await { + warn!("Garbage collection failed: {err}"); + } + } + } + } + } + + /// This loop listens on events emitted by `forest-cli db gc` and triggers + /// `collect_once` + pub async fn collect_loop_event(self: &Arc) -> anyhow::Result<()> { + while let Ok(responder) = self.gc_rx.recv_async().await { + let this = self.clone(); + let tipset = (self.get_tipset)(); + tokio::spawn(async move { + let result = this.collect_once(tipset).await; + if let Err(e) = responder.send(result) { + warn!("{e}"); + } + }); + } + + Ok(()) + } + + /// ## GC workflow + /// 1. Walk back from the current heaviest tipset to the genesis block, + /// collect all the blocks that are reachable from the snapshot + /// 2. writes blocks that are absent from the `current` database to it + /// 3. delete `old` database(s) + /// 4. sets `current` database to a newly created one + async fn collect_once(&self, tipset: Tipset) -> anyhow::Result<()> { + let guard = self.lock.try_lock(); + if guard.is_err() { + anyhow::bail!("Another garbage collection task is in progress."); + } + + let start = Utc::now(); + let reachable_bytes = Arc::new(AtomicUsize::new(0)); + + info!("Garbage collection started at epoch {}", tipset.epoch()); + let db = &self.db; + // 128MB + const BUFFER_CAPCITY_BYTES: usize = 128 * 1024 * 1024; + let (tx, rx) = flume::bounded(100); + #[allow(clippy::redundant_async_block)] + let write_task = tokio::spawn({ + let db = db.current(); + async move { db.buffered_write(rx, BUFFER_CAPCITY_BYTES).await } + }); + walk_snapshot(&tipset, DEFAULT_RECENT_STATE_ROOTS, |cid| { + let db = db.clone(); + let tx = tx.clone(); + let reachable_bytes = reachable_bytes.clone(); + async move { + let block = db + .get(&cid)? + .ok_or_else(|| anyhow::anyhow!("Cid {cid} not found in blockstore"))?; + + let pair = (cid, block.clone()); + reachable_bytes.fetch_add(DB_KEY_BYTES + pair.1.len(), atomic::Ordering::Relaxed); + if !db.current().has(&cid)? { + tx.send_async(pair).await?; + } + + Ok(block) + } + }) + .await?; + drop(tx); + write_task.await??; + + let reachable_bytes = reachable_bytes.load(atomic::Ordering::Relaxed); + self.last_reachable_bytes + .store(reachable_bytes as _, atomic::Ordering::Relaxed); + info!( + "Garbage collection finished at epoch {}, took {}s, reachable data size: {}", + tipset.epoch(), + (Utc::now() - start).num_seconds(), + reachable_bytes.human_count_bytes(), + ); + + db.next_current()?; + Ok(()) + } +} + +fn gc_trigger_factor() -> f64 { + const DEFAULT_GC_TRIGGER_FACTOR: f64 = 2.0; + + if let Ok(factor) = std::env::var("FOREST_GC_TRIGGER_FACTOR") { + factor.parse().unwrap_or(DEFAULT_GC_TRIGGER_FACTOR) + } else { + DEFAULT_GC_TRIGGER_FACTOR + } +} diff --git a/node/db/src/rolling/impls.rs b/node/db/src/rolling/impls.rs new file mode 100644 index 000000000000..cc000c9ea7e2 --- /dev/null +++ b/node/db/src/rolling/impls.rs @@ -0,0 +1,338 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use cid::Cid; +use forest_libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; +use forest_utils::db::file_backed_obj::FileBackedObject; +use fvm_ipld_blockstore::Blockstore; +use human_repr::HumanCount; +use parking_lot::RwLock; +use uuid::Uuid; + +use super::*; +use crate::*; + +impl Blockstore for RollingDB { + fn has(&self, k: &Cid) -> anyhow::Result { + for db in self.db_queue().iter() { + if Blockstore::has(db, k)? { + return Ok(true); + } + } + + Ok(false) + } + + fn get(&self, k: &Cid) -> anyhow::Result>> { + for db in self.db_queue().iter() { + if let Some(v) = Blockstore::get(db, k)? { + return Ok(Some(v)); + } + } + + Ok(None) + } + + fn put( + &self, + mh_code: cid::multihash::Code, + block: &fvm_ipld_blockstore::Block, + ) -> anyhow::Result + where + Self: Sized, + D: AsRef<[u8]>, + { + Blockstore::put(&self.current(), mh_code, block) + } + + fn put_many(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator)>, + { + Blockstore::put_many(&self.current(), blocks) + } + + fn put_many_keyed(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator, + { + Blockstore::put_many_keyed(&self.current(), blocks) + } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { + Blockstore::put_keyed(&self.current(), k, block) + } +} + +impl Store for RollingDB { + fn read(&self, key: K) -> Result>, crate::Error> + where + K: AsRef<[u8]>, + { + for db in self.db_queue().iter() { + if let Some(v) = Store::read(db, key.as_ref())? { + return Ok(Some(v)); + } + } + + Ok(None) + } + + fn exists(&self, key: K) -> Result + where + K: AsRef<[u8]>, + { + for db in self.db_queue().iter() { + if Store::exists(db, key.as_ref())? { + return Ok(true); + } + } + + Ok(false) + } + + fn write(&self, key: K, value: V) -> Result<(), crate::Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + Store::write(&self.current(), key, value) + } + + fn bulk_write( + &self, + values: impl IntoIterator>, impl Into>)>, + ) -> Result<(), crate::Error> { + Store::bulk_write(&self.current(), values) + } + + fn flush(&self) -> Result<(), crate::Error> { + Store::flush(&self.current()) + } +} + +impl BitswapStoreRead for RollingDB { + fn contains(&self, cid: &Cid) -> anyhow::Result { + for db in self.db_queue().iter() { + if BitswapStoreRead::contains(db, cid)? { + return Ok(true); + } + } + + Ok(false) + } + + fn get(&self, cid: &Cid) -> anyhow::Result>> { + for db in self.db_queue().iter() { + if let Some(v) = BitswapStoreRead::get(db, cid)? { + return Ok(Some(v)); + } + } + + Ok(None) + } +} + +impl BitswapStoreReadWrite for RollingDB { + type Params = ::Params; + + fn insert(&self, block: &libipld::Block) -> anyhow::Result<()> { + BitswapStoreReadWrite::insert(&self.current(), block) + } +} + +impl DBStatistics for RollingDB { + fn get_statistics(&self) -> Option { + DBStatistics::get_statistics(&self.current()) + } +} + +impl FileBackedObject for DbIndex { + fn serialize(&self) -> anyhow::Result> { + Ok(serde_yaml::to_string(self)?.as_bytes().to_vec()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + Ok(serde_yaml::from_slice(bytes)?) + } +} + +impl Drop for RollingDB { + fn drop(&mut self) { + if let Err(err) = self.flush() { + warn!( + "Error flushing rolling db under {}: {err}", + self.db_root.display() + ); + } + } +} + +impl RollingDB { + pub fn load_or_create(db_root: PathBuf, db_config: DbConfig) -> anyhow::Result { + if !db_root.exists() { + std::fs::create_dir_all(db_root.as_path())?; + } + let (db_index, current, old) = load_dbs(&db_root, &db_config)?; + + Ok(Self { + db_root: db_root.into(), + db_config: db_config.into(), + db_index: RwLock::new(db_index).into(), + current: RwLock::new(current).into(), + old: RwLock::new(old).into(), + }) + } + + /// Sets `current` as `old`, and sets a new DB as `current`, finally delete + /// the dangling `old` DB. + pub(crate) fn next_current(&self) -> anyhow::Result<()> { + let new_db_name = Uuid::new_v4().simple().to_string(); + let db = open_db(&self.db_root.join(&new_db_name), &self.db_config)?; + *self.old.write() = self.current.read().clone(); + *self.current.write() = db; + let mut db_index = self.db_index.write(); + let db_index_inner_mut = db_index.inner_mut(); + let old_db_path = self.db_root.join(&db_index_inner_mut.old); + db_index_inner_mut.old = db_index_inner_mut.current.clone(); + db_index_inner_mut.current = new_db_name; + db_index.sync()?; + delete_db(&old_db_path); + + Ok(()) + } + + pub fn total_size_in_bytes(&self) -> anyhow::Result { + Ok(fs_extra::dir::get_size(self.db_root.as_path())?) + } + + pub fn current_size_in_bytes(&self) -> anyhow::Result { + Ok(fs_extra::dir::get_size( + self.db_root + .as_path() + .join(self.db_index.read().inner().current.as_str()), + )?) + } + + pub fn current(&self) -> Db { + self.current.read().clone() + } + + fn db_queue(&self) -> [Db; 2] { + [self.current.read().clone(), self.old.read().clone()] + } +} + +fn load_dbs(db_root: &Path, db_config: &DbConfig) -> anyhow::Result<(FileBacked, Db, Db)> { + let mut db_index = FileBacked::load_from_file_or_create( + db_root.join("db_index.yaml"), + Default::default, + None, + )?; + let db_index_mut: &mut DbIndex = db_index.inner_mut(); + if db_index_mut.current.is_empty() { + db_index_mut.current = Uuid::new_v4().simple().to_string(); + } + if db_index_mut.old.is_empty() { + db_index_mut.old = Uuid::new_v4().simple().to_string(); + } + let current = open_db(&db_root.join(&db_index_mut.current), db_config)?; + let old = open_db(&db_root.join(&db_index_mut.old), db_config)?; + db_index.sync()?; + Ok((db_index, current, old)) +} + +fn delete_db(db_path: &Path) { + let size = fs_extra::dir::get_size(db_path).unwrap_or_default(); + if let Err(err) = std::fs::remove_dir_all(db_path) { + warn!( + "Error deleting database under {}, size: {}. {err}", + db_path.display(), + size.human_count_bytes() + ); + } else { + info!( + "Deleted database under {}, size: {}", + db_path.display(), + size.human_count_bytes() + ); + } +} + +#[cfg(test)] +mod tests { + use std::{thread::sleep, time::Duration}; + + use anyhow::*; + use cid::{multihash::MultihashDigest, Cid}; + use forest_libp2p_bitswap::BitswapStoreRead; + use fvm_ipld_blockstore::Blockstore; + use rand::Rng; + use tempfile::TempDir; + + use super::*; + + #[test] + fn rolling_db_behaviour_tests() -> Result<()> { + let db_root = TempDir::new()?; + println!("Creating rolling db under {}", db_root.path().display()); + let rolling_db = RollingDB::load_or_create(db_root.path().into(), Default::default())?; + println!("Generating random blocks"); + let pairs: Vec<_> = (0..1000) + .map(|_| { + let mut bytes = [0; 1024]; + rand::rngs::OsRng.fill(&mut bytes); + let cid = + Cid::new_v0(cid::multihash::Code::Sha2_256.digest(bytes.as_slice())).unwrap(); + (cid, bytes.to_vec()) + }) + .collect(); + + let split_index = 500; + + for (i, (k, block)) in pairs.iter().enumerate() { + if i == split_index { + sleep(Duration::from_millis(1)); + println!("Creating a new current db"); + rolling_db.next_current()?; + println!("Created a new current db"); + } + rolling_db.put_keyed(k, block)?; + } + + for (i, (k, block)) in pairs.iter().enumerate() { + ensure!(rolling_db.contains(k)?, "{i}"); + ensure!( + Blockstore::get(&rolling_db, k)?.unwrap().as_slice() == block, + "{i}" + ); + } + + rolling_db.next_current()?; + + for (i, (k, _)) in pairs.iter().enumerate() { + if i < split_index { + ensure!(!rolling_db.contains(k)?, "{i}"); + } else { + ensure!(rolling_db.contains(k)?, "{i}"); + } + } + + drop(rolling_db); + + let rolling_db = RollingDB::load_or_create(db_root.path().into(), Default::default())?; + for (i, (k, _)) in pairs.iter().enumerate() { + if i < split_index { + ensure!(!rolling_db.contains(k)?); + } else { + ensure!(rolling_db.contains(k)?); + } + } + + Ok(()) + } +} diff --git a/node/db/src/rolling/mod.rs b/node/db/src/rolling/mod.rs new file mode 100644 index 000000000000..e59657bd240f --- /dev/null +++ b/node/db/src/rolling/mod.rs @@ -0,0 +1,50 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! The state of the Filecoin Blockchain is a persistent, directed acyclic +//! graph. Data in this graph is never mutated nor explicitly deleted but may +//! become unreachable over time. +//! +//! This module contains a concurrent, semi-space garbage collector. The garbage +//! collector is guaranteed to be non-blocking and can be expected to run with a +//! fixed memory overhead and require disk space proportional to the size of the +//! reachable graph. For example, if the size of the reachable graph is 100GiB, +//! expect this garbage collector to use 3x100GiB = 300GiB of storage. + +mod gc; +pub use gc::*; +mod impls; + +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use forest_utils::db::file_backed_obj::FileBacked; +use log::{info, warn}; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; + +use crate::db_engine::{open_db, Db, DbConfig}; + +/// This DB wrapper is specially designed for supporting the concurrent, +/// semi-space GC algorithm that is implemented in [DbGarbageCollector], +/// containing a reference to the `old` DB space and a reference to the +/// `current` DB space. Both underlying key-vale DB are supposed to contain only +/// block data as value and its content-addressed CID as key +#[derive(Clone)] +pub struct RollingDB { + db_root: Arc, + db_config: Arc, + db_index: Arc>>, + /// The current writable DB + current: Arc>, + /// The old writable DB + old: Arc>, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +struct DbIndex { + current: String, + old: String, +} diff --git a/node/db/tests/mem_test.rs b/node/db/tests/mem_test.rs index 2449c1346743..729295d609eb 100644 --- a/node/db/tests/mem_test.rs +++ b/node/db/tests/mem_test.rs @@ -29,26 +29,8 @@ fn mem_db_does_not_exist() { subtests::does_not_exist(&db); } -#[test] -fn mem_db_delete() { - let db = MemoryDB::default(); - subtests::delete(&db); -} - #[test] fn mem_db_bulk_write() { let db = MemoryDB::default(); subtests::bulk_write(&db); } - -#[test] -fn mem_db_bulk_read() { - let db = MemoryDB::default(); - subtests::bulk_read(&db); -} - -#[test] -fn mem_db_bulk_delete() { - let db = MemoryDB::default(); - subtests::bulk_delete(&db); -} diff --git a/node/db/tests/parity_test.rs b/node/db/tests/parity_test.rs index 38dd0d6c8cfe..0b422e9e9a57 100644 --- a/node/db/tests/parity_test.rs +++ b/node/db/tests/parity_test.rs @@ -35,27 +35,9 @@ mod paritydb_tests { subtests::does_not_exist(&*db); } - #[test] - fn db_delete() { - let db = TempParityDB::new(); - subtests::delete(&*db); - } - #[test] fn db_bulk_write() { let db = TempParityDB::new(); subtests::bulk_write(&*db); } - - #[test] - fn db_bulk_read() { - let db = TempParityDB::new(); - subtests::bulk_read(&*db); - } - - #[test] - fn db_bulk_delete() { - let db = TempParityDB::new(); - subtests::bulk_delete(&*db); - } } diff --git a/node/db/tests/rocks_test.rs b/node/db/tests/rocks_test.rs index 18934db82633..4db645b8c532 100644 --- a/node/db/tests/rocks_test.rs +++ b/node/db/tests/rocks_test.rs @@ -36,27 +36,9 @@ mod rocksdb_tests { subtests::does_not_exist(&*db); } - #[test] - fn db_delete() { - let db = TempRocksDB::new(); - subtests::delete(&*db); - } - #[test] fn db_bulk_write() { let db = TempRocksDB::new(); subtests::bulk_write(&*db); } - - #[test] - fn db_bulk_read() { - let db = TempRocksDB::new(); - subtests::bulk_read(&*db); - } - - #[test] - fn db_bulk_delete() { - let db = TempRocksDB::new(); - subtests::bulk_delete(&*db); - } } diff --git a/node/db/tests/subtests/mod.rs b/node/db/tests/subtests/mod.rs index 926b8b1d8079..9909382342bf 100644 --- a/node/db/tests/subtests/mod.rs +++ b/node/db/tests/subtests/mod.rs @@ -43,20 +43,6 @@ where assert!(!res); } -pub fn delete(db: &DB) -where - DB: Store, -{ - let key = [0]; - let value = [1]; - db.write(key, value).unwrap(); - let res = db.exists(key).unwrap(); - assert!(res); - db.delete(key).unwrap(); - let res = db.exists(key).unwrap(); - assert!(!res); -} - pub fn bulk_write(db: &DB) where DB: Store, @@ -68,43 +54,3 @@ where assert!(res); } } - -pub fn bulk_read(db: &DB) -where - DB: Store, -{ - let keys = [[0], [1], [2]]; - let values = [[0], [1], [2]]; - let kvs: Vec<_> = keys - .iter() - .zip(values.iter()) - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect(); - db.bulk_write(kvs).unwrap(); - let results = db.bulk_read(&keys).unwrap(); - for (result, value) in results.iter().zip(values.iter()) { - match result { - Some(v) => assert_eq!(v, value), - None => panic!("No values found!"), - } - } -} - -pub fn bulk_delete(db: &DB) -where - DB: Store, -{ - let keys = [[0], [1], [2]]; - let values = [[0], [1], [2]]; - let kvs: Vec<_> = keys - .iter() - .zip(values.iter()) - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect(); - db.bulk_write(kvs).unwrap(); - db.bulk_delete(&keys).unwrap(); - for k in keys.iter() { - let res = db.exists(*k).unwrap(); - assert!(!res); - } -} diff --git a/node/rpc-api/src/data_types.rs b/node/rpc-api/src/data_types.rs index c5f8cb92f7a9..3e864724e8d3 100644 --- a/node/rpc-api/src/data_types.rs +++ b/node/rpc-api/src/data_types.rs @@ -42,6 +42,7 @@ where pub network_name: String, pub new_mined_block_tx: flume::Sender>, pub beacon: Arc>, + pub gc_event_tx: flume::Sender>>, } #[derive(Debug, Serialize, Deserialize)] diff --git a/node/rpc-api/src/lib.rs b/node/rpc-api/src/lib.rs index a91f503b6609..5795dc83eaa6 100644 --- a/node/rpc-api/src/lib.rs +++ b/node/rpc-api/src/lib.rs @@ -89,6 +89,9 @@ pub static ACCESS_MAP: Lazy> = Lazy::new(|| { access.insert(net_api::NET_CONNECT, Access::Write); access.insert(net_api::NET_DISCONNECT, Access::Write); + // DB API + access.insert(db_api::DB_GC, Access::Write); + access }); @@ -411,3 +414,10 @@ pub mod net_api { pub type NetDisconnectParams = (String,); pub type NetDisconnectResult = (); } + +/// DB API +pub mod db_api { + pub const DB_GC: &str = "Filecoin.DatabaseGarbageCollection"; + pub type DBGCParams = (); + pub type DBGCResult = (); +} diff --git a/node/rpc-client/src/db_ops.rs b/node/rpc-client/src/db_ops.rs new file mode 100644 index 000000000000..d142b690b3e1 --- /dev/null +++ b/node/rpc-client/src/db_ops.rs @@ -0,0 +1,11 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use forest_rpc_api::db_api::*; +use jsonrpc_v2::Error; + +use crate::call; + +pub async fn db_gc(params: DBGCParams, auth_token: &Option) -> Result { + call(DB_GC, params, auth_token).await +} diff --git a/node/rpc-client/src/lib.rs b/node/rpc-client/src/lib.rs index 5addee1f3a3e..37d17438033c 100644 --- a/node/rpc-client/src/lib.rs +++ b/node/rpc-client/src/lib.rs @@ -5,6 +5,7 @@ pub mod auth_ops; pub mod chain_ops; pub mod common_ops; +pub mod db_ops; pub mod mpool_ops; pub mod net_ops; pub mod state_ops; diff --git a/node/rpc/src/db_api.rs b/node/rpc/src/db_api.rs new file mode 100644 index 000000000000..e2c8975ec70c --- /dev/null +++ b/node/rpc/src/db_api.rs @@ -0,0 +1,17 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use forest_beacon::Beacon; +use forest_rpc_api::{data_types::RPCState, db_api::*}; +use fvm_ipld_blockstore::Blockstore; +use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; + +pub(crate) async fn db_gc( + data: Data>, + Params(_): Params, +) -> Result { + let (tx, rx) = flume::bounded(1); + data.gc_event_tx.send_async(tx).await?; + rx.recv_async().await??; + Ok(()) +} diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index d6dd285aea66..b37aa56775dd 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -5,6 +5,7 @@ mod auth_api; mod beacon_api; mod chain_api; mod common_api; +mod db_api; mod gas_api; mod mpool_api; mod net_api; @@ -21,8 +22,8 @@ use axum::routing::{get, post}; use forest_beacon::Beacon; use forest_chain::Scale; use forest_rpc_api::{ - auth_api::*, beacon_api::*, chain_api::*, common_api::*, data_types::RPCState, gas_api::*, - mpool_api::*, net_api::*, state_api::*, sync_api::*, wallet_api::*, + auth_api::*, beacon_api::*, chain_api::*, common_api::*, data_types::RPCState, db_api::*, + gas_api::*, mpool_api::*, net_api::*, state_api::*, sync_api::*, wallet_api::*, }; use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JSONRPCError, Server}; @@ -126,6 +127,8 @@ where .with_method(NET_PEERS, net_api::net_peers::) .with_method(NET_CONNECT, net_api::net_connect::) .with_method(NET_DISCONNECT, net_api::net_disconnect::) + // DB API + .with_method(DB_GC, db_api::db_gc::) .finish_unwrapped(), ); diff --git a/node/rpc/src/sync_api.rs b/node/rpc/src/sync_api.rs index d6b1cf27fca0..e5d1c25bc53d 100644 --- a/node/rpc/src/sync_api.rs +++ b/node/rpc/src/sync_api.rs @@ -151,6 +151,7 @@ mod tests { .unwrap() }; let (new_mined_block_tx, _) = flume::bounded(5); + let (gc_event_tx, _) = flume::unbounded(); let state = Arc::new(RPCState { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), @@ -162,6 +163,7 @@ mod tests { chain_store: cs_for_chain, beacon, new_mined_block_tx, + gc_event_tx, }); (state, network_rx) } diff --git a/scripts/calibnet_health_check.sh b/scripts/calibnet_health_check.sh index 40a95a32d9f0..b28a169b125f 100755 --- a/scripts/calibnet_health_check.sh +++ b/scripts/calibnet_health_check.sh @@ -45,6 +45,17 @@ $FOREST_CLI_PATH chain validate-tipset-checkpoints echo "Waiting for sync and check health" timeout 30m $FOREST_CLI_PATH --chain calibnet sync wait && $FOREST_CLI_PATH --chain calibnet db stats + +# Admin token used when interacting with wallet +ADMIN_TOKEN=$(cat admin_token) +# Set environment variable +export FULLNODE_API_INFO="$ADMIN_TOKEN:/ip4/127.0.0.1/tcp/1234/http" + +echo "Running database garbage collection" +du -hS ~/.local/share/forest/calibnet +$FOREST_CLI_PATH --chain calibnet db gc +du -hS ~/.local/share/forest/calibnet + echo "Exporting snapshot" $FOREST_CLI_PATH --chain calibnet snapshot export @@ -76,10 +87,6 @@ echo "Wallet tests" # Amount to send to FIL_AMT=500 -# Admin token used when interacting with wallet -ADMIN_TOKEN=$(cat admin_token) -# Set environment variable -export FULLNODE_API_INFO="$ADMIN_TOKEN:/ip4/127.0.0.1/tcp/1234/http" echo "Importing preloaded wallet key" $FOREST_CLI_PATH --chain calibnet wallet import preloaded_wallet.key diff --git a/scripts/gen_coverage_report.sh b/scripts/gen_coverage_report.sh index d7cec82f33a1..e7569b0468da 100755 --- a/scripts/gen_coverage_report.sh +++ b/scripts/gen_coverage_report.sh @@ -35,6 +35,8 @@ cov forest --chain calibnet --encrypt-keystore false --import-snapshot "$SNAPSHO cov forest-cli sync wait cov forest-cli sync status cov forest-cli chain validate-tipset-checkpoints +cov forest-cli --chain calibnet db gc +cov forest-cli --chain calibnet db stats cov forest-cli snapshot export cov forest-cli attach --exec 'showPeers()' cov forest-cli net listen diff --git a/utils/forest_utils/Cargo.toml b/utils/forest_utils/Cargo.toml index 9d7fe194e604..53913abf6c43 100644 --- a/utils/forest_utils/Cargo.toml +++ b/utils/forest_utils/Cargo.toml @@ -12,13 +12,16 @@ anyhow.workspace = true async-trait.workspace = true atty.workspace = true blake2b_simd.workspace = true +chrono.workspace = true cid.workspace = true const_format = "0.2.26" digest.workspace = true +flume.workspace = true futures.workspace = true fvm_ipld_blockstore.workspace = true fvm_ipld_encoding.workspace = true fvm_ipld_encoding3.workspace = true +human-repr.workspace = true hyper-rustls.workspace = true hyper.workspace = true libc = "0.2" diff --git a/utils/forest_utils/src/db/file_backed_obj.rs b/utils/forest_utils/src/db/file_backed_obj.rs index 0522bead53d2..663faed1ee36 100644 --- a/utils/forest_utils/src/db/file_backed_obj.rs +++ b/utils/forest_utils/src/db/file_backed_obj.rs @@ -26,6 +26,11 @@ impl FileBacked { &self.inner } + /// Gets a mutable borrow of the inner object + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + /// Sets the inner object and try sync to file pub fn set_inner(&mut self, inner: T) -> anyhow::Result<()> { self.inner = inner; @@ -90,13 +95,13 @@ impl FileBacked { } /// Syncs the object to the file - fn sync(&self) -> anyhow::Result<()> { + pub fn sync(&self) -> anyhow::Result<()> { let bytes = self.inner().serialize()?; Ok(std::fs::write(&self.path, bytes)?) } /// Try to sync to file if there is some sync period, otherwise syncs - fn try_sync(&mut self) -> anyhow::Result<()> { + pub fn try_sync(&mut self) -> anyhow::Result<()> { if let Some(sync_period) = self.sync_period { let now = SystemTime::now(); if let Some(last_sync) = self.last_sync { diff --git a/utils/forest_utils/src/db/mod.rs b/utils/forest_utils/src/db/mod.rs index 4b77c501e4f2..a57a5608df43 100644 --- a/utils/forest_utils/src/db/mod.rs +++ b/utils/forest_utils/src/db/mod.rs @@ -3,14 +3,23 @@ pub mod file_backed_obj; +use async_trait::async_trait; +use chrono::Utc; use cid::{ multihash::{Code, MultihashDigest}, Cid, }; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::{from_slice, to_vec, DAG_CBOR}; +use human_repr::HumanCount; +use log::info; use serde::{de::DeserializeOwned, ser::Serialize}; +/// DB key size in bytes for estimating reachable data size. Use parity-db value +/// for simplicity. The actual value for other underlying DB might be slightly +/// different but that is negligible for calculating the total reachable data +/// size +pub const DB_KEY_BYTES: usize = 32; /// Extension methods for inserting and retrieving IPLD data with CIDs pub trait BlockstoreExt: Blockstore { /// Get typed object from block store by CID @@ -68,3 +77,40 @@ pub trait BlockstoreExt: Blockstore { } impl BlockstoreExt for T {} + +/// Extension methods for buffered write with manageable limit of RAM usage +#[async_trait] +pub trait BlockstoreBufferedWriteExt: Blockstore + Sized { + async fn buffered_write( + &self, + rx: flume::Receiver<(Cid, Vec)>, + buffer_capacity_bytes: usize, + ) -> anyhow::Result<()> { + let start = Utc::now(); + let mut total_bytes = 0; + let mut total_entries = 0; + let mut estimated_buffer_bytes = 0; + let mut buffer = vec![]; + while let Ok((key, value)) = rx.recv_async().await { + // Key is stored in 32 bytes in paritydb + estimated_buffer_bytes += DB_KEY_BYTES + value.len(); + total_bytes += DB_KEY_BYTES + value.len(); + total_entries += 1; + buffer.push((key, value)); + if estimated_buffer_bytes >= buffer_capacity_bytes { + self.put_many_keyed(std::mem::take(&mut buffer))?; + estimated_buffer_bytes = 0; + } + } + self.put_many_keyed(buffer)?; + info!( + "Buffered write completed: total entries: {total_entries}, total size: {}, took: {}s", + total_bytes.human_count_bytes(), + (Utc::now() - start).num_seconds() + ); + + Ok(()) + } +} + +impl BlockstoreBufferedWriteExt for T {} diff --git a/utils/forest_utils/src/lib.rs b/utils/forest_utils/src/lib.rs index a733224354b8..a472ea9986a3 100644 --- a/utils/forest_utils/src/lib.rs +++ b/utils/forest_utils/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT + pub extern crate const_format; pub mod db; diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index a1fd705f129a..b0c1f22cc2d7 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -12,6 +12,7 @@ testing = [] [dependencies] anyhow.workspace = true cid.workspace = true +flume.workspace = true forest_blocks.workspace = true forest_state_manager.workspace = true forest_utils.workspace = true diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 810e8d18c985..c5664c12a610 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -7,7 +7,10 @@ use anyhow::bail; use cid::Cid; use forest_blocks::{BlockHeader, Tipset, TipsetKeys}; use forest_state_manager::StateManager; -use forest_utils::{db::BlockstoreExt, net::FetchProgress}; +use forest_utils::{ + db::{BlockstoreBufferedWriteExt, BlockstoreExt}, + net::FetchProgress, +}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_car::{load_car, CarReader}; use log::{debug, info}; @@ -117,12 +120,12 @@ where info!("Downloading file..."); let url = Url::parse(path)?; let reader = FetchProgress::fetch_from_url(url).await?; - load_and_retrieve_header(sm.blockstore(), reader, skip_load).await? + load_and_retrieve_header(sm.blockstore().clone(), reader, skip_load).await? } else { info!("Reading file..."); let file = File::open(&path).await?; let reader = FetchProgress::fetch_from_file(file).await?; - load_and_retrieve_header(sm.blockstore(), reader, skip_load).await? + load_and_retrieve_header(sm.blockstore().clone(), reader, skip_load).await? }; info!("Loaded .car file in {}s", stopwatch.elapsed().as_secs()); @@ -162,12 +165,12 @@ where /// Loads car file into database, and returns the block header CIDs from the CAR /// header. async fn load_and_retrieve_header( - store: &DB, + store: DB, reader: FetchProgress, skip_load: bool, ) -> anyhow::Result> where - DB: Blockstore, + DB: Blockstore + Send + Sync + 'static, R: AsyncRead + Send + Unpin, { let mut compat = reader.compat(); @@ -184,22 +187,20 @@ where pub async fn forest_load_car(store: DB, reader: R) -> anyhow::Result> where R: futures::AsyncRead + Send + Unpin, - DB: Blockstore, + DB: Blockstore + Send + Sync + 'static, { // 1GB const BUFFER_CAPCITY_BYTES: usize = 1024 * 1024 * 1024; + let (tx, rx) = flume::bounded(100); + #[allow(clippy::redundant_async_block)] + let write_task = + tokio::spawn(async move { store.buffered_write(rx, BUFFER_CAPCITY_BYTES).await }); let mut car_reader = CarReader::new(reader).await?; - let mut estimated_size = 0; - let mut buffer = vec![]; while let Some(block) = car_reader.next_block().await? { - estimated_size += 64 + block.data.len(); - buffer.push((block.cid, block.data)); - if estimated_size >= BUFFER_CAPCITY_BYTES { - store.put_many_keyed(std::mem::take(&mut buffer))?; - estimated_size = 0; - } + tx.send_async((block.cid, block.data)).await?; } - store.put_many_keyed(buffer)?; + drop(tx); + write_task.await??; Ok(car_reader.header.roots) } diff --git a/utils/statediff/src/main.rs b/utils/statediff/src/main.rs index 4ec7860d6c43..fbabb9edb2e9 100644 --- a/utils/statediff/src/main.rs +++ b/utils/statediff/src/main.rs @@ -4,7 +4,7 @@ use cid::Cid; use clap::Parser; use directories::ProjectDirs; use forest_cli_shared::cli::HELP_MESSAGE; -use forest_db::db_engine::{db_path, open_db, DbConfig}; +use forest_db::db_engine::{db_root, open_proxy_db}; use forest_statediff::print_state_diff; impl crate::Subcommand { @@ -19,7 +19,7 @@ impl crate::Subcommand { let dir = ProjectDirs::from("com", "ChainSafe", "Forest") .ok_or(anyhow::Error::msg("no such path"))?; let chain_path = dir.data_dir().join(chain); - let blockstore = open_db(&db_path(&chain_path), &DbConfig::default())?; + let blockstore = open_proxy_db(db_root(&chain_path), Default::default())?; if let Err(err) = print_state_diff(&blockstore, pre, post, *depth) { eprintln!("Failed to print state diff: {err}");