Skip to content

Commit

Permalink
PruningOptions -> RetentionPolicies
Browse files Browse the repository at this point in the history
Make configuration easier: require a default `epochs_to_keep`, with optional per-table overrides. If no config is supplied, no retention policy is applied at all.

Strum-ify the pruning config: the pruner.rs file defines the prunable tables. When the TOML file is parsed into the Rust config, it will error if it names any table variant not supported by the indexer code. Additionally, there is a check to make sure that prunable tables actually exist in the db.
  • Loading branch information
wlmyng committed Oct 4, 2024
1 parent 2e97e71 commit da281ff
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 60 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_indexer::config::PruningOptions;
pub use sui_indexer::config::RetentionConfig;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
Expand Down Expand Up @@ -151,7 +151,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
pub async fn serve_executor(
executor: Arc<dyn RestStateReader + Send + Sync>,
snapshot_config: Option<SnapshotLagConfig>,
epochs_to_keep: Option<u64>,
retention_config: Option<RetentionConfig>,
data_ingestion_path: PathBuf,
) -> ExecutorCluster {
let database = TempDb::new().unwrap();
Expand Down Expand Up @@ -184,7 +184,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
Some(snapshot_config.clone()),
Some(PruningOptions { epochs_to_keep }),
retention_config,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
)
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
9 changes: 5 additions & 4 deletions crates/sui-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-networ

Start a local network using the `sui` binary:
```sh
cargo run --bin sui -- start --with-faucet --force-regenesis
cargo run --bin sui -- start --with-faucet --force-regenesis
```

If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:
Expand Down Expand Up @@ -65,11 +65,12 @@ cargo run --bin sui-indexer -- --db-url "<DATABASE_URL>" --rpc-client-url "https
```
cargo run --bin sui-indexer -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --rpc-server-worker
```
More flags info can be found in this [file](https://github.com/MystenLabs/sui/blob/main/crates/sui-indexer/src/lib.rs#L83-L123).
More flags info can be found in this [file](src/main.rs#L41).

### DB reset
Run this command under `sui/crates/sui-indexer`, which will wipe DB; In case of schema changes in `.sql` files, this will also update corresponding `schema.rs` file.
When making db-related changes, you may find yourself having to run migrations and reset dbs often. The commands below are how you can invoke these actions.
```sh
diesel database reset --database-url="<DATABASE_URL>"
cargo run --bin sui-indexer -- --database-url "<DATABASE_URL>" reset-database --force
```

## Steps to run locally (TiDB)
Expand Down
226 changes: 222 additions & 4 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use std::{net::SocketAddr, path::PathBuf};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use strum::IntoEnumIterator;
use sui_json_rpc::name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;

/// The primary purpose of objects_history is to serve consistency query.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
Expand Down Expand Up @@ -208,8 +214,93 @@ pub enum Command {

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
    /// Path to a TOML file containing the retention policy configuration: a default
    /// `epochs_to_keep` plus optional per-table overrides (see `RetentionConfig`).
    /// When this is not provided, no retention policy is applied at all.
    #[arg(long)]
    pub pruning_config_path: Option<PathBuf>,
}

/// Represents the default retention policy and overrides for prunable tables. Instantiated only if
/// `PruningOptions` is provided on indexer start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Default retention policy for all tables.
    pub epochs_to_keep: u64,
    /// A map of tables to their respective retention policies that will override the default.
    /// Prunable tables not named here will use the default retention policy.
    /// Keys deserialize via `PrunableTable`, so an unrecognized table name in the TOML file
    /// is rejected at parse time (see `test_invalid_pruning_config_file`).
    #[serde(default)]
    pub overrides: HashMap<PrunableTable, u64>,
}

impl PruningOptions {
    /// Load the default retention policy and any per-table overrides from the TOML file at
    /// `pruning_config_path`, if one was supplied. Returns `None` when no path was given.
    ///
    /// Panics if the file cannot be read, fails to parse, or contains a zero retention value —
    /// this runs at indexer startup, where failing fast on a bad config is the desired behavior.
    pub fn load_from_file(&self) -> Option<RetentionConfig> {
        let path = self.pruning_config_path.as_deref()?;

        let contents = std::fs::read_to_string(path)
            .expect("Failed to read default retention policy and overrides from file");
        let config: RetentionConfig = toml::de::from_str(&contents)
            .expect("Failed to parse into RetentionConfig struct");

        // A retention of 0 epochs would mean "keep nothing", which is never intended.
        assert!(
            config.epochs_to_keep > 0,
            "Default retention must be greater than 0"
        );
        assert!(
            config.overrides.values().all(|&epochs| epochs > 0),
            "All retention overrides must be greater than 0"
        );

        Some(config)
    }
}

impl RetentionConfig {
    /// Create a new `RetentionConfig` with the specified default retention and overrides. Call
    /// `retention_policies()` on the instance to expand this into the full per-table mapping,
    /// applying the default retention policy to all tables that do not have an override.
    pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
        Self {
            epochs_to_keep,
            overrides,
        }
    }

    /// Testing-only constructor: uses `epochs_to_keep` as the default retention and
    /// pre-populates the standard `objects_history` override.
    pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
        let mut overrides = HashMap::new();
        overrides.insert(
            PrunableTable::ObjectsHistory,
            OBJECTS_HISTORY_EPOCHS_TO_KEEP,
        );

        // Fix: pass the populated map. Previously a fresh `HashMap::new()` was passed here,
        // leaving `overrides` dead and relying on `retention_policies()` to re-apply the
        // objects_history default implicitly.
        Self::new(epochs_to_keep, overrides)
    }

    /// Consumes this struct to produce a full mapping of every prunable table and its retention
    /// policy. By default, every prunable table will have the default retention policy from
    /// `epochs_to_keep`. Some tables like `objects_history` will observe a different default
    /// retention policy. These default values are overridden by any entries in `overrides`.
    pub fn retention_policies(self) -> HashMap<PrunableTable, u64> {
        let RetentionConfig {
            epochs_to_keep,
            mut overrides,
        } = self;

        for table in PrunableTable::iter() {
            let default_retention = match table {
                // objects_history only serves consistency queries; a short retention suffices.
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                _ => epochs_to_keep,
            };

            // Explicit overrides win; everything else gets the (per-table) default.
            overrides.entry(table).or_insert(default_retention);
        }

        overrides
    }
}

#[derive(Args, Debug, Clone)]
Expand Down Expand Up @@ -290,7 +381,9 @@ impl Default for RestoreConfig {
#[cfg(test)]
mod test {
use super::*;
use std::io::Write;
use tap::Pipe;
use tempfile::NamedTempFile;

fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
where
Expand Down Expand Up @@ -354,4 +447,129 @@ mod test {
// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}

#[test]
fn pruning_options_with_objects_history_override() {
    // Persist a retention policy with two table-level overrides to a temporary TOML file.
    let toml_content = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;
    let mut config_file = NamedTempFile::new().unwrap();
    config_file.write_all(toml_content.as_bytes()).unwrap();

    let options = PruningOptions {
        pruning_config_path: Some(config_file.path().to_path_buf()),
    };
    let retention_config = options.load_from_file().unwrap();

    // The parsed config should reflect exactly what the file specified.
    assert_eq!(retention_config.epochs_to_keep, 5);
    assert_eq!(retention_config.overrides.len(), 2);
    assert_eq!(
        retention_config
            .overrides
            .get(&PrunableTable::ObjectsHistory)
            .copied(),
        Some(10)
    );
    assert_eq!(
        retention_config
            .overrides
            .get(&PrunableTable::Transactions)
            .copied(),
        Some(20)
    );

    // Expanding into full policies must cover every prunable table, with the default
    // retention applied wherever no override was given.
    let retention_policies = retention_config.retention_policies();
    for table in PrunableTable::iter() {
        let Some(retention) = retention_policies.get(&table).copied() else {
            panic!("Expected a retention policy for table {:?}", table);
        };

        let expected = match table {
            PrunableTable::ObjectsHistory => 10,
            PrunableTable::Transactions => 20,
            _ => 5,
        };
        assert_eq!(retention, expected);
    }
}

#[test]
fn pruning_options_no_objects_history_override() {
    // No override for objects_history here: it must fall back to its dedicated default
    // rather than the file's epochs_to_keep.
    let toml_content = r#"
epochs_to_keep = 5
[overrides]
tx_affected_addresses = 10
transactions = 20
"#;
    let mut config_file = NamedTempFile::new().unwrap();
    config_file.write_all(toml_content.as_bytes()).unwrap();

    let options = PruningOptions {
        pruning_config_path: Some(config_file.path().to_path_buf()),
    };
    let retention_config = options.load_from_file().unwrap();

    // The parsed config should reflect exactly what the file specified.
    assert_eq!(retention_config.epochs_to_keep, 5);
    assert_eq!(retention_config.overrides.len(), 2);
    assert_eq!(
        retention_config
            .overrides
            .get(&PrunableTable::TxAffectedAddresses)
            .copied(),
        Some(10)
    );
    assert_eq!(
        retention_config
            .overrides
            .get(&PrunableTable::Transactions)
            .copied(),
        Some(20)
    );

    let retention_policies = retention_config.retention_policies();
    for table in PrunableTable::iter() {
        let Some(retention) = retention_policies.get(&table).copied() else {
            panic!("Expected a retention policy for table {:?}", table);
        };

        let expected = match table {
            // objects_history keeps its shorter dedicated default when not overridden.
            PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
            PrunableTable::TxAffectedAddresses => 10,
            PrunableTable::Transactions => 20,
            _ => 5,
        };
        assert_eq!(retention, expected);
    }
}

#[test]
fn test_invalid_pruning_config_file() {
    // `invalid_table` is not a PrunableTable variant, so deserialization must be rejected
    // at parse time rather than silently ignored.
    let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
invalid_table = 30
"#;

    let err = toml::from_str::<RetentionConfig>(toml_str)
        .expect_err("Expected an error, but parsing succeeded");
    assert!(
        err.to_string().contains("unknown variant `invalid_table`"),
        "Error message doesn't mention the invalid table"
    );
}
}
Loading

0 comments on commit da281ff

Please sign in to comment.