Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement container db folder cleanup on startup #746

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions client/service-container-chain/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub struct ContainerChainSpawner<
/// State
pub state: Arc<Mutex<ContainerChainSpawnerState>>,

/// Before the first assignment, there is a db cleanup process that removes folders of container
/// chains that we are no longer assigned to.
pub db_folder_cleanup_done: bool,

/// Async callback that enables collation on the orchestrator chain
pub collate_on_tanssi:
Arc<dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync>,
Expand Down Expand Up @@ -692,6 +696,19 @@ impl<

/// Handle `CcSpawnMsg::UpdateAssignment`
async fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
if !self.db_folder_cleanup_done {
self.db_folder_cleanup_done = true;

// Disabled when running with --keep-db
let keep_db = self.params.container_chain_cli.base.keep_db;
if !keep_db {
let mut chains_to_keep = HashSet::new();
chains_to_keep.extend(current);
chains_to_keep.extend(next);
self.db_folder_cleanup(&chains_to_keep);
}
}

let HandleUpdateAssignmentResult {
chains_to_stop,
chains_to_start,
Expand Down Expand Up @@ -767,6 +784,64 @@ impl<
self.spawn(para_id, start_collation).await;
}
}

fn db_folder_cleanup(&self, chains_to_keep: &HashSet<ParaId>) {
// "containers" folder
let mut base_path = self
.params
.container_chain_cli
.base
.base
.shared_params
.base_path
.as_ref()
.expect("base_path is always set")
.to_owned();

// "containers/chains"
base_path.push("chains");

// Inside chains folder we have container folders such as
// containers/chains/simple_container_2000/
// containers/chains/frontier_container_2001/
// But this is not the para id, it's the chain id which we have set to include the para id, but that's not mandatory.
// To get the para id we need to look for the paritydb folder:
// containers/chains/frontier_container_2001/paritydb/full-container-2001/
let mut chain_folders = sort_container_folders_by_para_id(&base_path);

// Keep chains that we are assigned to
for para_id in chains_to_keep {
chain_folders.remove(&Some(*para_id));
}

// Print nice log message when removing folders
if !chain_folders.is_empty() {
let chain_folders_fmt = chain_folders
.iter()
.flat_map(|(para_id, vec_paths)| {
let para_id_fmt = if let Some(para_id) = para_id {
para_id.to_string()
} else {
"None".to_string()
};
vec_paths
.iter()
.map(move |path| format!("\n{}: {}", para_id_fmt, path.display()))
})
.collect::<String>();
log::info!(
"db_folder_cleanup: removing container folders: (para_id, path):{}",
chain_folders_fmt
);
}

// Remove, ignoring errors
for (_para_id, folders) in chain_folders {
for folder in folders {
let _ = std::fs::remove_dir_all(&folder);
}
}
}
}

struct HandleUpdateAssignmentResult {
Expand Down Expand Up @@ -1117,6 +1192,99 @@ fn check_paritydb_lock_held(db_path: &Path) -> Result<bool, std::io::Error> {
Ok(lock_held)
}

fn sort_container_folders_by_para_id(
chains_folder_path: &Path,
) -> HashMap<Option<ParaId>, Vec<PathBuf>> {
let mut h = HashMap::new();

let entry_iter = std::fs::read_dir(chains_folder_path);
let entry_iter = match entry_iter {
Ok(x) => x,
Err(_e) => return h,
};

for entry in entry_iter {
let entry = match entry {
Ok(x) => x,
Err(_e) => continue,
};

let path = entry.path();
if path.is_dir() {
if let Ok(para_id) = process_container_folder_get_para_id(path.clone()) {
h.entry(para_id).or_default().push(path);
}
}
}

h
}

fn process_container_folder_get_para_id(path: PathBuf) -> std::io::Result<Option<ParaId>> {
// Build the path to the paritydb directory
let paritydb_path = path.join("paritydb");

// Check if the paritydb directory exists and is a directory
if !paritydb_path.is_dir() {
// If not, associate the path with `None` in the hashmap
return Ok(None);
}

// Read the entries in the paritydb directory
let entry_iter = std::fs::read_dir(&paritydb_path)?;

let mut para_id: Option<ParaId> = None;

// Iterate over each entry in the paritydb directory
for entry in entry_iter {
let entry = entry?;
let sub_path = entry.path();

// Only consider directories
if !sub_path.is_dir() {
continue;
}

let sub_path_file_name = match sub_path.file_name().and_then(|s| s.to_str()) {
Some(x) => x,
None => {
continue;
}
};

// That follow this pattern
if !sub_path_file_name.starts_with("full-container-") {
continue;
}

if let Some(id) = parse_para_id_from_folder_name(sub_path_file_name) {
if para_id.is_some() {
// If there is more than one folder with a para id, assume this folder is
// corrupted and ignore it, keep it for manual deletion
return Err(std::io::Error::new(std::io::ErrorKind::Other, ""));
}
para_id = Some(id);
}
}

Ok(para_id)
}

// Input:
// full-container-2000
// Output:
// Some(2000)
fn parse_para_id_from_folder_name(folder_name: &str) -> Option<ParaId> {
// Find last '-' in string
let idx = folder_name.rfind('-')?;
// +1 to skip the '-'
let id_str = &folder_name[idx + 1..];
// Try to parse as u32, in case of error return None
let id = id_str.parse::<u32>().ok()?;

Some(id.into())
}

#[cfg(test)]
mod tests {
use {super::*, std::path::PathBuf};
Expand Down Expand Up @@ -1568,4 +1736,16 @@ mod tests {
)
)
}

#[test]
fn para_id_from_folder_name() {
assert_eq!(parse_para_id_from_folder_name(""), None,);
assert_eq!(parse_para_id_from_folder_name("full"), None,);
assert_eq!(parse_para_id_from_folder_name("full-container"), None,);
assert_eq!(parse_para_id_from_folder_name("full-container-"), None,);
assert_eq!(
parse_para_id_from_folder_name("full-container-2000"),
Some(ParaId::from(2000)),
);
}
}
3 changes: 3 additions & 0 deletions container-chains/nodes/frontier/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,9 @@ fn rpc_provider_mode(cli: Cli, profile_id: u64) -> Result<()> {
phantom: PhantomData,
},
state: Default::default(),
// db cleanup task disabled here because it uses collator assignment to decide
// which folders to keep and this is not a collator, this is an rpc node
db_folder_cleanup_done: true,
collate_on_tanssi: Arc::new(|| {
panic!("Called collate_on_tanssi outside of Tanssi node")
}),
Expand Down
3 changes: 3 additions & 0 deletions container-chains/nodes/simple/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,9 @@ fn rpc_provider_mode(cli: Cli, profile_id: u64) -> Result<()> {
phantom: PhantomData,
},
state: Default::default(),
// db cleanup task disabled here because it uses collator assignment to decide
// which folders to keep and this is not a collator, this is an rpc node
db_folder_cleanup_done: true,
collate_on_tanssi: Arc::new(|| {
panic!("Called collate_on_tanssi outside of Tanssi node")
}),
Expand Down
2 changes: 2 additions & 0 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ async fn start_node_impl(
phantom: PhantomData,
},
state: Default::default(),
db_folder_cleanup_done: false,
collate_on_tanssi,
collation_cancellation_constructs: None,
};
Expand Down Expand Up @@ -826,6 +827,7 @@ pub async fn start_solochain_node(
phantom: PhantomData,
},
state: Default::default(),
db_folder_cleanup_done: false,
collate_on_tanssi,
collation_cancellation_constructs: None,
};
Expand Down
Loading