Skip to content

Commit

Permalink
tor-interface: forward arti stdout lines to TorEvent::LogLine in Arti…
Browse files Browse the repository at this point in the history
…TorClient
  • Loading branch information
morganava committed Nov 29, 2024
1 parent 1543085 commit be7cf50
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 12 deletions.
56 changes: 47 additions & 9 deletions source/gosling/crates/tor-interface/src/arti_process.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// standard
use std::fs;
use std::fs::File;
use std::io::Write;
use std::io::{BufRead, BufReader, Write};
use std::ops::Drop;
use std::process;
use std::process::{Child, ChildStdout, Command, Stdio};
use std::path::Path;
use std::sync::{Mutex, Weak};
use std::time::{Duration, Instant};

#[derive(thiserror::Error, Debug)]
Expand All @@ -19,20 +20,26 @@ pub enum Error {
#[error("provided data directory '{0}' must be an absolute path")]
ArtiDataDirectoryPathNotAbsolute(String),

#[error("failed to create data directory")]
#[error("failed to create data directory: {0}")]
ArtiDataDirectoryCreationFailed(#[source] std::io::Error),

#[error("file exists in provided data directory path '{0}'")]
ArtiDataDirectoryPathExistsAsFile(String),

#[error("failed to create arti.toml file")]
#[error("failed to create arti.toml file: {0}")]
ArtiTomlFileCreationFailed(#[source] std::io::Error),

#[error("failed to write arti.toml file")]
#[error("failed to write arti.toml file: {0}")]
ArtiTomlFileWriteFailed(#[source] std::io::Error),

#[error("failed to start arti process")]
#[error("failed to start arti process: {0}")]
ArtiProcessStartFailed(#[source] std::io::Error),

#[error("unable to take arti process stdout")]
ArtiProcessStdoutTakeFailed(),

#[error("failed to spawn arti process stdout read thread: {0}")]
ArtiStdoutReadThreadSpawnFailed(#[source] std::io::Error),
}

pub(crate) struct ArtiProcess {
Expand All @@ -41,7 +48,7 @@ pub(crate) struct ArtiProcess {
}

impl ArtiProcess {
pub fn new(arti_bin_path: &Path, data_directory: &Path) -> Result<Self, Error> {
pub fn new(arti_bin_path: &Path, data_directory: &Path, stdout_lines: Weak<Mutex<Vec<String>>>) -> Result<Self, Error> {
// verify provided paths are absolute
if arti_bin_path.is_relative() {
return Err(Error::ArtiBinPathNotAbsolute(format!(
Expand Down Expand Up @@ -94,9 +101,8 @@ impl ArtiProcess {
.write_all(arti_toml_content.as_bytes())
.map_err(Error::ArtiTomlFileWriteFailed)?;

let process = Command::new(arti_bin_path.as_os_str())
// TODO: make this pipe() and fwd log events
.stdout(Stdio::inherit())
let mut process = Command::new(arti_bin_path.as_os_str())
.stdout(Stdio::piped())
.stdin(Stdio::null())
.stderr(Stdio::null())
// set working directory to data directory
Expand All @@ -109,6 +115,18 @@ impl ArtiProcess {
.spawn()
.map_err(Error::ArtiProcessStartFailed)?;

// spawn a task to read stdout lines and forward to list
let stdout = BufReader::new(match process.stdout.take() {
Some(stdout) => stdout,
None => return Err(Error::ArtiProcessStdoutTakeFailed()),
});
std::thread::Builder::new()
.name("arti_stdout_reader".to_string())
.spawn(move || {
ArtiProcess::read_stdout_task(&stdout_lines, stdout);
})
.map_err(Error::ArtiStdoutReadThreadSpawnFailed)?;

let connect_string = format!("unix:{rpc_listen}");

Ok(ArtiProcess { process, connect_string })
Expand All @@ -117,6 +135,26 @@ impl ArtiProcess {
pub fn connect_string(&self) -> &str {
self.connect_string.as_str()
}

fn read_stdout_task(
stdout_lines: &std::sync::Weak<Mutex<Vec<String>>>,
mut stdout: BufReader<ChildStdout>,
) {
while let Some(stdout_lines) = stdout_lines.upgrade() {
let mut line = String::default();
// read line
if stdout.read_line(&mut line).is_ok() {
// remove trailing '\n'
line.pop();
// then acquire the lock on the line buffer
let mut stdout_lines = match stdout_lines.lock() {
Ok(stdout_lines) => stdout_lines,
Err(_) => unreachable!(),
};
stdout_lines.push(line);
}
}
}
}

impl Drop for ArtiProcess {
Expand Down
27 changes: 24 additions & 3 deletions source/gosling/crates/tor-interface/src/arti_tor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum ArtiTorClientConfig {
pub struct ArtiTorClient {
daemon: Option<ArtiProcess>,
rpc_conn: RpcConn,
pending_log_lines: Arc<Mutex<Vec<String>>>,
pending_events: Arc<Mutex<Vec<TorEvent>>>,
bootstrapped: bool,
// our list of circuit tokens for the arti daemon
Expand All @@ -65,14 +66,17 @@ pub struct ArtiTorClient {

impl ArtiTorClient {
pub fn new(config: ArtiTorClientConfig) -> Result<Self, tor_provider::Error> {
let pending_log_lines: Arc<Mutex<Vec<String>>> = Default::default();

let (daemon, rpc_conn) = match &config {
ArtiTorClientConfig::BundledArti {
arti_bin_path,
data_directory,
} => {

// launch arti
let daemon =
ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path())
ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines))
.map_err(Error::ArtiProcessCreationFailed)?;

let builder = RpcConnBuilder::from_connect_string(daemon.connect_string()).unwrap();
Expand Down Expand Up @@ -109,6 +113,7 @@ impl ArtiTorClient {
Ok(Self {
daemon: Some(daemon),
rpc_conn,
pending_log_lines,
pending_events,
bootstrapped: false,
circuit_token_counter: 0,
Expand All @@ -120,12 +125,28 @@ impl ArtiTorClient {
impl TorProvider for ArtiTorClient {
fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
std::thread::sleep(std::time::Duration::from_millis(16));
match self.pending_events.lock() {
Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
let mut tor_events = match self.pending_events.lock() {
Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()),
Err(_) => {
unreachable!("another thread panicked while holding this pending_events mutex")
}
};
// take our log lines
let mut log_lines = match self.pending_log_lines.lock() {
Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()),
Err(_) => {
unreachable!("another thread panicked while holding this pending_log_lines mutex")
}
};

// append raw lines as TorEvent
for log_line in log_lines.iter_mut() {
tor_events.push(TorEvent::LogReceived {
line: std::mem::take(log_line),
});
}

Ok(tor_events)
}

fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
Expand Down

0 comments on commit be7cf50

Please sign in to comment.