Skip to content

Commit

Permalink
feat: Tracing instrumentation and structured logging (#115)
Browse files Browse the repository at this point in the history
- Adds some minimal `tracing` instrumentation. The idea is to do some
multithreaded perf debugging in the future based on that.
- Leverages the same infrastructure to refactor the TASO logger: events
are now logged into the global logger, and the executable subscribes
to either general events (for stdout) or more granular data (for the
logfile).
  • Loading branch information
aborgna-q authored Sep 22, 2023
1 parent f7e9237 commit 6378d33
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 216 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ chrono = { version ="0.4.30" }
bytemuck = "1.14.0"
stringreader = "0.1.1"
crossbeam-channel = "0.5.8"
tracing = { workspace = true }

[features]
pyo3 = [
Expand Down Expand Up @@ -79,3 +80,4 @@ itertools = { version = "0.11.0" }
tket-json-rs = { git = "https://github.com/CQCL/tket-json-rs", rev = "619db15d3", features = [
"tket2ops",
] }
tracing = "0.1.37"
238 changes: 50 additions & 188 deletions src/optimiser/taso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,28 @@
mod eq_circ_class;
mod hugr_pchannel;
mod hugr_pqueue;
pub mod log;
mod qtz_circuit;
mod worker;

use crossbeam_channel::select;
pub use eq_circ_class::{load_eccs_json_file, EqCircClass};

use std::io;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use std::{fs, io};

use fxhash::FxHashSet;
use hugr::Hugr;

use crate::circuit::CircuitHash;
use crate::json::save_tk1_json_writer;
use crate::optimiser::taso::hugr_pchannel::HugrPriorityChannel;
use crate::optimiser::taso::hugr_pqueue::{Entry, HugrPQ};
use crate::optimiser::taso::worker::TasoWorker;
use crate::rewrite::strategy::RewriteStrategy;
use crate::rewrite::Rewriter;
use hugr_pqueue::{Entry, HugrPQ};

use self::hugr_pchannel::HugrPriorityChannel;

/// Output sinks used by the TASO optimiser for logging.
///
/// Every sink is optional; the derived `Default` leaves all of them unset.
#[derive(Default)]
pub struct LogConfig<'w> {
    /// Sink for the final optimised circuit.
    final_circ_json: Option<Box<dyn io::Write + 'w>>,
    /// Sink for the log of successive best candidate circuits.
    circ_candidates_csv: Option<Box<dyn io::Write + 'w>>,
    /// Sink for the optimisation progress log.
    progress_log: Option<Box<dyn io::Write + 'w>>,
}

impl<'w> LogConfig<'w> {
    /// Build a logging configuration with all three sinks set.
    ///
    /// The writers are, in order:
    /// - `best_circ_json`: for the final optimised circuit, in TK1 JSON format,
    /// - `circ_candidates_csv`: for a log of the successive best candidate circuits,
    /// - `progress_log`: for a log of the progress of the optimisation.
    pub fn new(
        best_circ_json: impl io::Write + 'w,
        circ_candidates_csv: impl io::Write + 'w,
        progress_log: impl io::Write + 'w,
    ) -> Self {
        // Erase the concrete writer types up front so the struct literal stays flat.
        let final_circ_json: Box<dyn io::Write + 'w> = Box::new(best_circ_json);
        let circ_candidates_csv: Box<dyn io::Write + 'w> = Box::new(circ_candidates_csv);
        let progress_log: Box<dyn io::Write + 'w> = Box::new(progress_log);
        LogConfig {
            final_circ_json: Some(final_circ_json),
            circ_candidates_csv: Some(circ_candidates_csv),
            progress_log: Some(progress_log),
        }
    }
}
use self::log::TasoLogger;

/// The TASO optimiser.
///
Expand Down Expand Up @@ -116,7 +90,7 @@ where
pub fn optimise_with_log(
&self,
circ: &Hugr,
log_config: LogConfig,
log_config: TasoLogger,
timeout: Option<u64>,
n_threads: NonZeroUsize,
) -> Hugr {
Expand All @@ -126,37 +100,13 @@ where
}
}

/// Optimise a circuit with TASO, logging to a fixed set of files.
///
/// Three files are created, in this order, at paths relative to the current
/// working directory:
/// - `final_circ.json`: the final optimised circuit, in TK1 JSON format,
/// - `best_circs.csv`: a log of the successive best candidate circuits,
/// - `taso-optimisation.log`: a log of the progress of the optimisation.
///
/// An optional timeout (in seconds) can be provided.
///
/// # Errors
///
/// Returns the I/O error of the first file that could not be created; the
/// optimisation is not started in that case.
pub fn optimise_with_default_log(
    &self,
    circ: &Hugr,
    timeout: Option<u64>,
    n_threads: NonZeroUsize,
) -> io::Result<Hugr> {
    // Arguments are evaluated left to right, so the files are created in the
    // documented order and the first failure short-circuits via `?`.
    let log_config = LogConfig::new(
        fs::File::create("final_circ.json")?,
        fs::File::create("best_circs.csv")?,
        fs::File::create("taso-optimisation.log")?,
    );
    let optimised = self.optimise_with_log(circ, log_config, timeout, n_threads);
    Ok(optimised)
}

fn taso(&self, circ: &Hugr, mut log_config: LogConfig, timeout: Option<u64>) -> Hugr {
#[tracing::instrument(target = "taso::metrics", skip(self, circ, logger))]
fn taso(&self, circ: &Hugr, mut logger: TasoLogger, timeout: Option<u64>) -> Hugr {
let start_time = Instant::now();

let mut log_candidates = log_config.circ_candidates_csv.map(csv::Writer::from_writer);

let mut best_circ = circ.clone();
let mut best_circ_cost = (self.cost)(circ);
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);

// Hash of seen circuits. Do not store circuits as this map gets huge
let mut seen_hashes: FxHashSet<_> = FromIterator::from_iter([(circ.circuit_hash())]);
Expand All @@ -167,26 +117,19 @@ where
pq.push(circ.clone());

let mut circ_cnt = 1;
let mut timeout_flag = false;
while let Some(Entry { circ, cost, .. }) = pq.pop() {
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);
}

let rewrites = self.rewriter.get_rewrites(&circ);
for new_circ in self.strategy.apply_rewrites(rewrites, &circ) {
let new_circ_hash = new_circ.circuit_hash();
circ_cnt += 1;
if circ_cnt % 1000 == 0 {
log_progress(
log_config.progress_log.as_mut(),
circ_cnt,
Some(&pq),
&seen_hashes,
)
.expect("Failed to write to progress log");
}
logger.log_progress(circ_cnt, Some(pq.len()), seen_hashes.len());
if seen_hashes.contains(&new_circ_hash) {
continue;
}
Expand All @@ -201,41 +144,31 @@ where

if let Some(timeout) = timeout {
if start_time.elapsed().as_secs() > timeout {
println!("Timeout");
timeout_flag = true;
break;
}
}
}

log_processing_end(circ_cnt, false);

log_final(
&best_circ,
log_config.progress_log.as_mut(),
log_config.final_circ_json.as_mut(),
&self.cost,
)
.expect("Failed to write to progress log and/or final circuit JSON");

logger.log_processing_end(circ_cnt, best_circ_cost, false, timeout_flag);
best_circ
}

/// Run the TASO optimiser on a circuit, using multiple threads.
///
/// This is the multi-threaded version of [`taso`]. See [`TasoOptimiser`] for
/// more details.
#[tracing::instrument(target = "taso::metrics", skip(self, circ, logger))]
fn taso_multithreaded(
&self,
circ: &Hugr,
mut log_config: LogConfig,
mut logger: TasoLogger,
timeout: Option<u64>,
n_threads: NonZeroUsize,
) -> Hugr {
let n_threads: usize = n_threads.get();
const PRIORITY_QUEUE_CAPACITY: usize = 10_000;

let mut log_candidates = log_config.circ_candidates_csv.map(csv::Writer::from_writer);

// multi-consumer priority channel for queuing circuits to be processed by the workers
let (tx_work, rx_work) =
HugrPriorityChannel::init((self.cost).clone(), PRIORITY_QUEUE_CAPACITY * n_threads);
Expand All @@ -245,20 +178,21 @@ where
let initial_circ_hash = circ.circuit_hash();
let mut best_circ = circ.clone();
let mut best_circ_cost = (self.cost)(&best_circ);
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);

// Hash of seen circuits. Do not store circuits as this map gets huge
let mut seen_hashes: FxHashSet<_> = FromIterator::from_iter([(initial_circ_hash)]);

// Each worker waits for circuits to scan for rewrites using all the
// patterns and sends the results back to main.
let joins: Vec<_> = (0..n_threads)
.map(|_| {
worker::spawn_pattern_matching_thread(
.map(|i| {
TasoWorker::spawn(
rx_work.clone(),
tx_result.clone(),
self.rewriter.clone(),
self.strategy.clone(),
Some(format!("taso-worker-{i}")),
)
})
.collect();
Expand All @@ -283,46 +217,47 @@ where
let mut jobs_completed = 0usize;
// TODO: Report dropped jobs in the queue, so we can check for termination.

// Deadline for the optimization timeout
// Deadline for the optimisation timeout
let timeout_event = match timeout {
None => crossbeam_channel::never(),
Some(t) => crossbeam_channel::at(Instant::now() + Duration::from_secs(t)),
};

// Process worker results until we have seen all the circuits, or we run
// out of time.
let mut timeout_flag = false;
loop {
select! {
recv(rx_result) -> msg => {
match msg {
Ok(hashed_circs) => {
jobs_completed += 1;
for (circ_hash, circ) in &hashed_circs {
circ_cnt += 1;
if circ_cnt % 1000 == 0 {
// TODO: Add a minimum time between logs
log_progress::<_,u64,usize>(log_config.progress_log.as_mut(), circ_cnt, None, &seen_hashes)
.expect("Failed to write to progress log");
}
if !seen_hashes.insert(*circ_hash) {
continue;
let send_result = tracing::trace_span!(target: "taso::metrics", "recv_result").in_scope(|| {
jobs_completed += 1;
for (circ_hash, circ) in &hashed_circs {
circ_cnt += 1;
logger.log_progress(circ_cnt, None, seen_hashes.len());
if seen_hashes.contains(circ_hash) {
continue;
}
seen_hashes.insert(*circ_hash);

let cost = (self.cost)(circ);

// Check if we got a new best circuit
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
logger.log_best(best_circ_cost);
}
jobs_sent += 1;
}

let cost = (self.cost)(circ);

// Check if we got a new best circuit
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
}
jobs_sent += 1;
}
// Fill the workqueue with data from pq
if tx_work.send(hashed_circs).is_err() {
// Fill the workqueue with data from pq
tx_work.send(hashed_circs)
});
if send_result.is_err() {
eprintln!("All our workers panicked. Stopping optimisation.");
break;
};
}

// If there is no more data to process, we are done.
//
Expand All @@ -338,25 +273,17 @@ where
}
}
recv(timeout_event) -> _ => {
println!("Timeout");
timeout_flag = true;
break;
}
}
}

log_processing_end(circ_cnt, true);
logger.log_processing_end(circ_cnt, best_circ_cost, true, timeout_flag);

// Drop the channel so the threads know to stop.
drop(tx_work);
let _ = joins; // joins.into_iter().for_each(|j| j.join().unwrap());

log_final(
&best_circ,
log_config.progress_log.as_mut(),
log_config.final_circ_json.as_mut(),
&self.cost,
)
.expect("Failed to write to progress log and/or final circuit JSON");
joins.into_iter().for_each(|j| j.join().unwrap());

best_circ
}
Expand All @@ -381,68 +308,3 @@ mod taso_default {
}
}
}

/// Serialisable record of an improvement in circuit size seen during the
/// TASO execution.
///
/// Pairs the reported size with the local time at which the record was built.
//
// TODO: Replace this fixed logging. Report back intermediate results.
#[derive(serde::Serialize, Clone, Debug)]
struct BestCircSer {
    /// Size reported for the new best circuit.
    circ_len: usize,
    /// Creation time of this record, formatted as an RFC 3339 string.
    time: String,
}

impl BestCircSer {
    /// Create a record for `circ_len`, stamped with the current local time.
    fn new(circ_len: usize) -> Self {
        Self {
            circ_len,
            time: chrono::Local::now().to_rfc3339(),
        }
    }
}

/// Record a new best circuit size in the candidates CSV log.
///
/// Does nothing (and prints nothing) when no CSV writer is configured.
/// Otherwise announces the new best size on stdout, appends a timestamped
/// row to the CSV and flushes it so the row is visible immediately.
///
/// # Errors
///
/// Returns an error if serialising the row or flushing the writer fails.
fn log_best<W: io::Write>(cbest: usize, wtr: Option<&mut csv::Writer<W>>) -> io::Result<()> {
    let Some(wtr) = wtr else {
        return Ok(());
    };
    println!("new best of size {}", cbest);
    // Propagate serialisation failures instead of panicking: the function
    // already returns `io::Result`, and `csv::Error` converts into `io::Error`.
    wtr.serialize(BestCircSer::new(cbest))?;
    wtr.flush()
}

/// Print the end-of-processing summary to stdout.
///
/// Reports the number of circuits tried and, when `needs_joining` is set,
/// additionally prints a line announcing the join step.
fn log_processing_end(circuit_count: usize, needs_joining: bool) {
    println!("END");
    println!("Tried {circuit_count} circuits");
    if !needs_joining {
        return;
    }
    println!("Joining");
}

/// Write a progress snapshot to the progress log, if one is configured.
///
/// The snapshot lists the number of circuits counted so far, the current
/// queue size (only when a queue is supplied) and the number of hashes seen.
/// With no writer this is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Returns the first I/O error raised while writing.
fn log_progress<W: io::Write, P: Ord, C>(
    wr: Option<&mut W>,
    circ_cnt: usize,
    pq: Option<&HugrPQ<P, C>>,
    seen_hashes: &FxHashSet<u64>,
) -> io::Result<()> {
    let Some(out) = wr else {
        return Ok(());
    };
    writeln!(out, "{circ_cnt} circuits...")?;
    if let Some(queue) = pq {
        writeln!(out, "Queue size: {} circuits", queue.len())?;
    }
    writeln!(out, "Total seen: {} circuits", seen_hashes.len())
}

/// Write the end-of-run outputs: the final cost and the final circuit.
///
/// - If `log` is set, appends an `END RESULT` line with `cost(best_circ)`.
/// - If `final_circ` is set, saves `best_circ` to it via `save_tk1_json_writer`.
///
/// Either sink may be `None`, in which case that output is skipped.
///
/// # Errors
///
/// Returns an error if writing to the log fails or if saving the circuit
/// fails. A save failure is reported as an `io::Error` of kind `Other`
/// carrying the debug rendering of the underlying error.
fn log_final<W1: io::Write, W2: io::Write>(
    best_circ: &Hugr,
    log: Option<&mut W1>,
    final_circ: Option<&mut W2>,
    cost: impl Fn(&Hugr) -> usize,
) -> io::Result<()> {
    if let Some(log) = log {
        writeln!(log, "END RESULT: {}", cost(best_circ))?;
    }
    if let Some(circ_writer) = final_circ {
        // Surface the failure through the declared `io::Result` rather than
        // panicking here, so the caller decides how to handle it.
        save_tk1_json_writer(best_circ, circ_writer)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{err:?}")))?;
    }
    Ok(())
}
Loading

0 comments on commit 6378d33

Please sign in to comment.