Skip to content

Commit

Permalink
fix: Simplify async drop and ensure init
Browse files Browse the repository at this point in the history
Prevents a bug where the database would not be initialized properly and
cleans up some issues around async processes being cleaned up when
dropped.
  • Loading branch information
johnchildren committed Aug 22, 2024
1 parent 3e29f64 commit c2a0700
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 126 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ nix = "0.26"
reflink-copy = "0.1"
tempfile = "3"
thiserror = "1.0"
tokio = { version = "1.8", features = ["parking_lot", "rt", "sync", "io-util", "process", "macros", "fs"], default-features = false, optional = true }
tokio = { version = "1.39", features = ["parking_lot", "rt", "sync", "io-util", "process", "macros", "fs"], default-features = false, optional = true }
tracing = "0.1"
which = "4.0"

Expand Down
62 changes: 47 additions & 15 deletions src/asynchronous.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use nix::unistd::{Uid, User};
use nix::sys::signal::{self, Signal};
use nix::unistd::{Pid, Uid, User};
use std::convert::TryInto;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
Expand All @@ -7,7 +9,6 @@ use tempfile::TempDir;
use tokio::fs::create_dir_all;
use tokio::io::Lines;
use tokio::process::{ChildStderr, ChildStdout};
use tokio::sync::oneshot::Sender;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::{
io::BufReader,
Expand Down Expand Up @@ -172,29 +173,60 @@ pub struct ProcessGuard {
pub stdout_reader: Option<Lines<BufReader<ChildStdout>>>,
/// Allows users to read stderr by line for debugging.
pub stderr_reader: Option<Lines<BufReader<ChildStderr>>>,
/// Connection string for connecting to the temporary postgresql instance.
pub connection_string: String,
/// Parameters for connecting to the temporary postgresql instance.
///
/// A user shouldn't need to use these and should call `connection_string` instead.
///
/// Port number that Postgresql is serving on
pub port: u32,
/// Database name to connect to.
pub db_name: String,
/// Username to connect as.
pub user_name: String,

// Signal that the postgres process should be killed.
pub(crate) send_done: Option<Sender<()>>,
// Prevent the data directory from being dropped while
// the process is running.
pub(crate) _data_directory: TempDir,
// Prevent socket directory from being dropped while
pub(crate) _data_directory: Arc<TempDir>,
// Prevent the cache directory from being dropped while
// the process is running.
pub(crate) _socket_dir: Arc<TempDir>,
pub(crate) _cache_directory: Arc<TempDir>,
/// Socket directory for connection to the running process.
pub(crate) socket_dir: Arc<TempDir>,
pub(crate) postgres_process: Child,
// Limit the total concurrent processes.
pub(crate) _process_permit: SemaphorePermit<'static>,
}

/// Signal that the process needs to end.
impl Drop for ProcessGuard {
fn drop(&mut self) {
if let Some(sender) = self.send_done.take() {
sender
.send(())
.expect("failed to signal postgresql process should be killed.");
}
signal::kill(
Pid::from_raw(self.postgres_process.id().unwrap().try_into().unwrap()),
Signal::SIGINT,
)
.expect("Failed to signal the child should terminate");
// Wait indefinately until the child is finished.
while let Ok(None) = self.postgres_process.try_wait() {}
}
}

impl ProcessGuard {
/// Get a Postgresql format connection String for the process guard.
///
/// # Panics
///
/// Panics if a string file path cannot be obtained from the socket directory.
#[must_use]
pub fn connection_string(&self) -> String {
format!(
"postgresql:///?host={}&port={}&dbname={}&user={}",
self.socket_dir
.path()
.to_str()
.expect("Failed to convert socket directory to a path"),
self.port,
self.db_name,
self.user_name,
)
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ pub enum TmpPostgrustError {
/// Error when the cache directory cannot be created.
#[error("failed to create cache directory")]
CreateCacheDirFailed(#[source] std::io::Error),
/// Error when the data directory cannot be created.
#[error("failed to create cache directory")]
CreateDataDirFailed(#[source] std::io::Error),
/// Error when `cp` fails for the initialized database.
#[error("updating directory permission to non-root failed")]
UpdatingPermissionsFailed(ProcessCapture),
Expand Down
Loading

0 comments on commit c2a0700

Please sign in to comment.