From 4174e14b7c737883ad52d9cbc54d160b31a91a3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Agust=C3=ADn=20Borgna?= <121866228+aborgna-q@users.noreply.github.com> Date: Thu, 5 Oct 2023 16:39:37 +0200 Subject: [PATCH] feat: Embarrassingly parallel TASO (#149) Use the `CircuitChunks` splitter for running TASO on different parts of the circuit. - Adds a `--split-circ` flag to the taso optimizer bin, to split the circuit before running instead of sharing the full circuit between threads. - Adds a `node_cost` parameter to the splitter to split circuits according to our cost functions. I'm getting some really good results with this, we should test if they are correct :P Optimizing `barenco_tof_10.json` (192 CX gates): - Single thread, 60s timeout: - min cost: 166 CX gates - peak memory usage: 8.48 GB - 2 threads (96-CX chunks), 60s timeout: - min cost: 141 CX gates - peak memory usage: 8.06 GB - 2 threads (96-CX chunks), 30s timeout followed by single thread for 30s: - min cost: 126 CX gates - peak memory usage: 8.01 GB (these are noisy results, as I'm running them on a non-idle laptop) --- Edit: Updated measurements Optimizing barenco_tof_10.json (192 CX gates): - Single thread, 60s timeout: - min cost: 138 CX gates - peak memory usage: 3.20 GB - 2 threads (96-CX chunks), 60s timeout: - min cost: 126 CX gates - peak memory usage: 3.06 GB - 2 threads (96-CX chunks), 30s timeout followed by single thread for 30s: - min cost: 124 CX gates - peak memory usage: 3.05 GB --- Cargo.toml | 2 +- pyrs/src/optimiser.rs | 17 ++++++ src/optimiser/taso.rs | 98 +++++++++++++++++++++++++++++------ src/optimiser/taso/log.rs | 8 +-- taso-optimiser/src/main.rs | 15 +++++- taso-optimiser/src/tracing.rs | 12 +++-- 6 files changed, 125 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1283ca25..37f63f63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ fxhash = "0.2.1" rmp-serde = { version = "1.1.2", optional = true } delegate = "0.10.0" csv = { version = "1.2.2" } -chrono = { version ="0.4.30" } +chrono = { version = "0.4.30" } bytemuck = "1.14.0" stringreader = "0.1.1" crossbeam-channel = "0.5.8" diff --git a/pyrs/src/optimiser.rs b/pyrs/src/optimiser.rs index bd7b6a4c..a6efab45 100644 --- a/pyrs/src/optimiser.rs +++ b/pyrs/src/optimiser.rs @@ -1,5 +1,6 @@ //! PyO3 wrapper for the TASO circuit optimiser. +use std::io::BufWriter; use std::{fs, num::NonZeroUsize, path::PathBuf}; use pyo3::prelude::*; @@ -42,16 +43,31 @@ impl PyDefaultTasoOptimiser { /// /// Returns an optimised circuit and optionally log the progress to a CSV /// file. + /// + /// # Parameters + /// + /// * `circ`: The circuit to optimise. + /// * `timeout`: The timeout in seconds. + /// * `n_threads`: The number of threads to use. + /// * `split_circ`: Whether to split the circuit into chunks before + /// processing. + /// + /// If this option is set, the optimise will divide the circuit into + /// `n_threads` chunks and optimise each on a separate thread. + /// * `log_progress`: The path to a CSV file to log progress to. + /// pub fn optimise( &self, circ: PyObject, timeout: Option, n_threads: Option, + split_circ: Option, log_progress: Option, ) -> PyResult { let taso_logger = log_progress .map(|file_name| { let log_file = fs::File::create(file_name).unwrap(); + let log_file = BufWriter::new(log_file); TasoLogger::new(log_file) }) .unwrap_or_default(); @@ -61,6 +77,7 @@ impl PyDefaultTasoOptimiser { taso_logger, timeout, n_threads.unwrap_or(NonZeroUsize::new(1).unwrap()), + split_circ.unwrap_or(false), ) }) } diff --git a/src/optimiser/taso.rs b/src/optimiser/taso.rs index 1cfa0568..a15e8c5d 100644 --- a/src/optimiser/taso.rs +++ b/src/optimiser/taso.rs @@ -21,19 +21,24 @@ mod worker; use crossbeam_channel::select; pub use eq_circ_class::{load_eccs_json_file, EqCircClass}; use fxhash::FxHashSet; +use hugr::hugr::HugrError; pub use log::TasoLogger; use std::num::NonZeroUsize; use std::time::{Duration, Instant}; +use std::{mem, thread}; use hugr::Hugr; +use crate::circuit::cost::CircuitCost; use crate::circuit::CircuitHash; use crate::optimiser::taso::hugr_pchannel::{HugrPriorityChannel, PriorityChannelLog}; use crate::optimiser::taso::hugr_pqueue::{Entry, HugrPQ}; use crate::optimiser::taso::worker::TasoWorker; +use crate::passes::CircuitChunks; use crate::rewrite::strategy::RewriteStrategy; use crate::rewrite::Rewriter; +use crate::Circuit; /// The TASO optimiser. /// @@ -75,16 +80,19 @@ impl TasoOptimiser where R: Rewriter + Send + Clone + 'static, S: RewriteStrategy + Send + Sync + Clone + 'static, - S::Cost: serde::Serialize, + S::Cost: serde::Serialize + Send + Sync, { /// Run the TASO optimiser on a circuit. /// /// A timeout (in seconds) can be provided. - pub fn optimise(&self, circ: &Hugr, timeout: Option, n_threads: NonZeroUsize) -> Hugr - where - S::Cost: Send + Sync + Clone, - { - self.optimise_with_log(circ, Default::default(), timeout, n_threads) + pub fn optimise( + &self, + circ: &Hugr, + timeout: Option, + n_threads: NonZeroUsize, + split_circuit: bool, + ) -> Hugr { + self.optimise_with_log(circ, Default::default(), timeout, n_threads, split_circuit) } /// Run the TASO optimiser on a circuit with logging activated. @@ -96,10 +104,13 @@ where log_config: TasoLogger, timeout: Option, n_threads: NonZeroUsize, - ) -> Hugr - where - S::Cost: Send + Sync + Clone, - { + split_circuit: bool, + ) -> Hugr { + if split_circuit && n_threads.get() > 1 { + return self + .split_run(circ, log_config, timeout, n_threads) + .unwrap(); + } match n_threads.get() { 1 => self.taso(circ, log_config, timeout), _ => self.taso_multithreaded(circ, log_config, timeout, n_threads), @@ -177,10 +188,7 @@ where mut logger: TasoLogger, timeout: Option, n_threads: NonZeroUsize, - ) -> Hugr - where - S::Cost: Send + Sync + Clone, - { + ) -> Hugr { let n_threads: usize = n_threads.get(); const PRIORITY_QUEUE_CAPACITY: usize = 10_000; @@ -276,6 +284,66 @@ where best_circ } + + /// Split the circuit into chunks and process each in a separate thread. + #[tracing::instrument(target = "taso::metrics", skip(self, circ, logger))] + fn split_run( + &self, + circ: &Hugr, + mut logger: TasoLogger, + timeout: Option, + n_threads: NonZeroUsize, + ) -> Result { + let circ_cost = self.cost(circ); + let max_chunk_cost = circ_cost.clone().div_cost(n_threads); + logger.log(format!( + "Splitting circuit with cost {:?} into chunks of at most {max_chunk_cost:?}.", + circ_cost.clone() + )); + let mut chunks = + CircuitChunks::split_with_cost(circ, max_chunk_cost, |op| self.strategy.op_cost(op)); + + logger.log_best(circ_cost.clone()); + + let (joins, rx_work): (Vec<_>, Vec<_>) = chunks + .iter_mut() + .enumerate() + .map(|(i, chunk)| { + let (tx, rx) = crossbeam_channel::unbounded(); + let taso = self.clone(); + let chunk = mem::take(chunk); + let chunk_cx_cost = chunk.circuit_cost(|op| self.strategy.op_cost(op)); + logger.log(format!("Chunk {i} has {chunk_cx_cost:?} CX gates",)); + let join = thread::Builder::new() + .name(format!("chunk-{}", i)) + .spawn(move || { + let res = + taso.optimise(&chunk, timeout, NonZeroUsize::new(1).unwrap(), false); + tx.send(res).unwrap(); + }) + .unwrap(); + (join, rx) + }) + .unzip(); + + for i in 0..chunks.len() { + let res = rx_work[i] + .recv() + .unwrap_or_else(|_| panic!("Worker thread panicked")); + chunks[i] = res; + } + + let best_circ = chunks.reassemble()?; + let best_circ_cost = self.cost(&best_circ); + if best_circ_cost.clone() < circ_cost { + logger.log_best(best_circ_cost.clone()); + } + + logger.log_processing_end(n_threads.get(), best_circ_cost, true, false); + joins.into_iter().for_each(|j| j.join().unwrap()); + + Ok(best_circ) + } } #[cfg(feature = "portmatching")] @@ -360,7 +428,7 @@ mod tests { #[rstest] fn rz_rz_cancellation(rz_rz: Hugr, taso_opt: DefaultTasoOptimiser) { - let opt_rz = taso_opt.optimise(&rz_rz, None, 1.try_into().unwrap()); + let opt_rz = taso_opt.optimise(&rz_rz, None, 1.try_into().unwrap(), false); let cmds = opt_rz .commands() .map(|cmd| { diff --git a/src/optimiser/taso/log.rs b/src/optimiser/taso/log.rs index 9a44f717..e88cbe41 100644 --- a/src/optimiser/taso/log.rs +++ b/src/optimiser/taso/log.rs @@ -80,15 +80,15 @@ impl<'w> TasoLogger<'w> { } } - /// Internal function to log general events, normally printed to stdout. + /// Log general events, normally printed to stdout. #[inline] - fn log(&self, msg: impl AsRef) { + pub fn log(&self, msg: impl AsRef) { tracing::info!(target: LOG_TARGET, "{}", msg.as_ref()); } - /// Internal function to log information on the progress of the optimization. + /// Log verbose information on the progress of the optimization. #[inline] - fn progress(&self, msg: impl AsRef) { + pub fn progress(&self, msg: impl AsRef) { tracing::info!(target: PROGRESS_TARGET, "{}", msg.as_ref()); } } diff --git a/taso-optimiser/src/main.rs b/taso-optimiser/src/main.rs index 1d06c646..abb887c6 100644 --- a/taso-optimiser/src/main.rs +++ b/taso-optimiser/src/main.rs @@ -78,6 +78,12 @@ struct CmdLineArgs { help = "The number of threads to use. By default, use a single thread." )] n_threads: Option, + /// Number of threads (default=1) + #[arg( + long = "split-circ", + help = "Split the circuit into chunks and optimize each one in a separate thread. Use `-j` to specify the number of threads to use." + )] + split_circ: bool, } fn main() -> Result<(), Box> { @@ -86,7 +92,7 @@ fn main() -> Result<(), Box> { // Setup tracing subscribers for stdout and file logging. // // We need to keep the object around to keep the logging active. - let _tracer = Tracer::setup_tracing(opts.logfile); + let _tracer = Tracer::setup_tracing(opts.logfile, opts.split_circ); let input_path = Path::new(&opts.input); let output_path = Path::new(&opts.output); @@ -115,8 +121,13 @@ fn main() -> Result<(), Box> { .unwrap_or(NonZeroUsize::new(1).unwrap()); println!("Using {n_threads} threads"); + if opts.split_circ && n_threads.get() > 1 { + println!("Splitting circuit into {n_threads} chunks."); + } + println!("Optimising..."); - let opt_circ = optimiser.optimise_with_log(&circ, taso_logger, opts.timeout, n_threads); + let opt_circ = + optimiser.optimise_with_log(&circ, taso_logger, opts.timeout, n_threads, opts.split_circ); println!("Saving result"); save_tk1_json_file(&opt_circ, output_path)?; diff --git a/taso-optimiser/src/tracing.rs b/taso-optimiser/src/tracing.rs index 0e90f0a5..30172e3a 100644 --- a/taso-optimiser/src/tracing.rs +++ b/taso-optimiser/src/tracing.rs @@ -32,11 +32,11 @@ pub struct Tracer { impl Tracer { /// Setup tracing subscribers for stdout and file logging. - pub fn setup_tracing(logfile: Option) -> Self { + pub fn setup_tracing(logfile: Option, show_threads: bool) -> Self { let mut tracer = Self::default(); tracing_subscriber::registry() - .with(tracer.stdout_layer()) - .with(logfile.map(|f| tracer.logfile_layer(f))) + .with(tracer.stdout_layer(show_threads)) + .with(logfile.map(|f| tracer.logfile_layer(f, show_threads))) .init(); tracer } @@ -48,7 +48,7 @@ impl Tracer { } /// Clean log with the most important events. - fn stdout_layer(&mut self) -> impl Layer + fn stdout_layer(&mut self, show_threads: bool) -> impl Layer where S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, { @@ -56,10 +56,11 @@ impl Tracer { .without_time() .with_target(false) .with_level(false) + .with_thread_names(show_threads) .with_filter(filter_fn(log_filter)) } - fn logfile_layer(&mut self, logfile: PathBuf) -> impl Layer + fn logfile_layer(&mut self, logfile: PathBuf, show_threads: bool) -> impl Layer where S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, { @@ -67,6 +68,7 @@ impl Tracer { self.logfile = Some(guard); tracing_subscriber::fmt::layer() .with_ansi(false) + .with_thread_names(show_threads) .with_writer(non_blocking) .with_filter(filter_fn(verbose_filter)) }