feat: Tracing instrumentation and structured logging #115

Merged · 21 commits · Sep 22, 2023

Commits
fdf95cb
re-enable multithreaded taso
aborgna-q Sep 15, 2023
edeaa6c
Move taso fns to TOptimiser methods
aborgna-q Sep 15, 2023
c01d02e
Some crossbeam (missing joins)
aborgna-q Sep 15, 2023
63c65e9
Don't spin-lock when receiving results
aborgna-q Sep 15, 2023
cf9c9e2
Nicer error handling for missing ECC json
aborgna-q Sep 15, 2023
bf26504
Avoid spinlocks, but don't wait unnecessarily
aborgna-q Sep 15, 2023
44b9a71
Simplify the recv loop
aborgna-q Sep 15, 2023
9e03964
Default to single threaded for the moment.
aborgna-q Sep 15, 2023
4ec01c1
Give the workers their own mini-priority queue
aborgna-q Sep 15, 2023
4298898
drop the centralized queue
aborgna-q Sep 15, 2023
d701735
Use a priority channel, without worker queues
aborgna-q Sep 16, 2023
714b654
Better logging and tracing metrics
aborgna-q Sep 17, 2023
2016513
Traces and non-blocking writers
aborgna-q Sep 17, 2023
123989f
Drop the tracefile for now
aborgna-q Sep 17, 2023
5afe1f7
Merge remote-tracking branch 'origin/main' into feat/parallel-taso
aborgna-q Sep 20, 2023
b660ace
scale priority queue capacity by the number of threads
aborgna-q Sep 20, 2023
9621141
Merge branch 'feat/parallel-taso' into feat/taso-tracing
aborgna-q Sep 20, 2023
f7b45c1
Merge remote-tracking branch 'origin/main' into feat/taso-tracing
aborgna-q Sep 21, 2023
8028140
Use helper `trace_span`
aborgna-q Sep 21, 2023
0bd3a72
Remove duplicated best circuit output
aborgna-q Sep 22, 2023
d28d3d9
Merge remote-tracking branch 'origin/main' into feat/tracing-logs
aborgna-q Sep 22, 2023
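The instrumentation added here leans on two `tracing` primitives, both visible in the diff below: the `#[tracing::instrument]` attribute on the optimiser entry points, and explicit `trace_span!` scopes around hot regions, all emitted under the `taso::metrics` target. A minimal standalone sketch of the pattern (the function and span names here are illustrative, not taken from the PR):

```rust
use tracing::{instrument, trace_span};

// Attribute-style span: opens a span for every call. `skip(data)`
// keeps the large argument out of the recorded span fields.
#[instrument(target = "taso::metrics", skip(data))]
fn process(data: &[u64]) -> u64 {
    // Explicit span around a hot inner region, entered only while
    // the closure runs.
    trace_span!(target: "taso::metrics", "inner_loop")
        .in_scope(|| data.iter().copied().sum())
}
```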
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ chrono = { version ="0.4.30" }
bytemuck = "1.14.0"
stringreader = "0.1.1"
crossbeam-channel = "0.5.8"
tracing = { workspace = true }

[features]
pyo3 = [
@@ -79,3 +80,4 @@ itertools = { version = "0.11.0" }
tket-json-rs = { git = "https://github.com/CQCL/tket-json-rs", rev = "619db15d3", features = [
"tket2ops",
] }
tracing = "0.1.37"
238 changes: 50 additions & 188 deletions src/optimiser/taso.rs
@@ -14,54 +14,28 @@
mod eq_circ_class;
mod hugr_pchannel;
mod hugr_pqueue;
pub mod log;
mod qtz_circuit;
mod worker;

use crossbeam_channel::select;
pub use eq_circ_class::{load_eccs_json_file, EqCircClass};

use std::io;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use std::{fs, io};

use fxhash::FxHashSet;
use hugr::Hugr;

use crate::circuit::CircuitHash;
use crate::json::save_tk1_json_writer;
use crate::optimiser::taso::hugr_pchannel::HugrPriorityChannel;
use crate::optimiser::taso::hugr_pqueue::{Entry, HugrPQ};
use crate::optimiser::taso::worker::TasoWorker;
use crate::rewrite::strategy::RewriteStrategy;
use crate::rewrite::Rewriter;
use hugr_pqueue::{Entry, HugrPQ};

use self::hugr_pchannel::HugrPriorityChannel;

/// Logging configuration for the TASO optimiser.
#[derive(Default)]
pub struct LogConfig<'w> {
final_circ_json: Option<Box<dyn io::Write + 'w>>,
circ_candidates_csv: Option<Box<dyn io::Write + 'w>>,
progress_log: Option<Box<dyn io::Write + 'w>>,
}

impl<'w> LogConfig<'w> {
/// Create a new logging configuration.
///
/// Three writer objects must be provided:
/// - best_circ_json: for the final optimised circuit, in TK1 JSON format,
/// - circ_candidates_csv: for a log of the successive best candidate circuits,
/// - progress_log: for a log of the progress of the optimisation.
pub fn new(
best_circ_json: impl io::Write + 'w,
circ_candidates_csv: impl io::Write + 'w,
progress_log: impl io::Write + 'w,
) -> Self {
Self {
final_circ_json: Some(Box::new(best_circ_json)),
circ_candidates_csv: Some(Box::new(circ_candidates_csv)),
progress_log: Some(Box::new(progress_log)),
}
}
}
use self::log::TasoLogger;

/// The TASO optimiser.
///
@@ -116,7 +90,7 @@ where
pub fn optimise_with_log(
&self,
circ: &Hugr,
log_config: LogConfig,
log_config: TasoLogger,
timeout: Option<u64>,
n_threads: NonZeroUsize,
) -> Hugr {
@@ -126,37 +100,13 @@
}
}

/// Run the TASO optimiser on a circuit with default logging.
///
/// The following files will be created:
/// - `final_circ.json`: the final optimised circuit, in TK1 JSON format,
/// - `best_circs.csv`: a log of the successive best candidate circuits,
/// - `taso-optimisation.log`: a log of the progress of the optimisation.
///
/// If the creation of any of these files fails, an error is returned.
///
/// A timeout (in seconds) can be provided.
pub fn optimise_with_default_log(
&self,
circ: &Hugr,
timeout: Option<u64>,
n_threads: NonZeroUsize,
) -> io::Result<Hugr> {
let final_circ_json = fs::File::create("final_circ.json")?;
let circ_candidates_csv = fs::File::create("best_circs.csv")?;
let progress_log = fs::File::create("taso-optimisation.log")?;
let log_config = LogConfig::new(final_circ_json, circ_candidates_csv, progress_log);
Ok(self.optimise_with_log(circ, log_config, timeout, n_threads))
}

fn taso(&self, circ: &Hugr, mut log_config: LogConfig, timeout: Option<u64>) -> Hugr {
#[tracing::instrument(target = "taso::metrics", skip(self, circ, logger))]
fn taso(&self, circ: &Hugr, mut logger: TasoLogger, timeout: Option<u64>) -> Hugr {
let start_time = Instant::now();

let mut log_candidates = log_config.circ_candidates_csv.map(csv::Writer::from_writer);

let mut best_circ = circ.clone();
let mut best_circ_cost = (self.cost)(circ);
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);

// Hash of seen circuits. Do not store circuits as this map gets huge
let mut seen_hashes: FxHashSet<_> = FromIterator::from_iter([(circ.circuit_hash())]);
@@ -167,26 +117,19 @@ where
pq.push(circ.clone());

let mut circ_cnt = 1;
let mut timeout_flag = false;
while let Some(Entry { circ, cost, .. }) = pq.pop() {
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);
}

let rewrites = self.rewriter.get_rewrites(&circ);
for new_circ in self.strategy.apply_rewrites(rewrites, &circ) {
let new_circ_hash = new_circ.circuit_hash();
circ_cnt += 1;
if circ_cnt % 1000 == 0 {
log_progress(
log_config.progress_log.as_mut(),
circ_cnt,
Some(&pq),
&seen_hashes,
)
.expect("Failed to write to progress log");
}
logger.log_progress(circ_cnt, Some(pq.len()), seen_hashes.len());
if seen_hashes.contains(&new_circ_hash) {
continue;
}
@@ -201,41 +144,31 @@ where

if let Some(timeout) = timeout {
if start_time.elapsed().as_secs() > timeout {
println!("Timeout");
timeout_flag = true;
break;
}
}
}

log_processing_end(circ_cnt, false);

log_final(
&best_circ,
log_config.progress_log.as_mut(),
log_config.final_circ_json.as_mut(),
&self.cost,
)
.expect("Failed to write to progress log and/or final circuit JSON");

logger.log_processing_end(circ_cnt, best_circ_cost, false, timeout_flag);
best_circ
}

/// Run the TASO optimiser on a circuit, using multiple threads.
///
/// This is the multi-threaded version of [`taso`]. See [`TasoOptimiser`] for
/// more details.
#[tracing::instrument(target = "taso::metrics", skip(self, circ, logger))]
fn taso_multithreaded(
&self,
circ: &Hugr,
mut log_config: LogConfig,
mut logger: TasoLogger,
timeout: Option<u64>,
n_threads: NonZeroUsize,
) -> Hugr {
let n_threads: usize = n_threads.get();
const PRIORITY_QUEUE_CAPACITY: usize = 10_000;

let mut log_candidates = log_config.circ_candidates_csv.map(csv::Writer::from_writer);

// multi-consumer priority channel for queuing circuits to be processed by the workers
let (tx_work, rx_work) =
HugrPriorityChannel::init((self.cost).clone(), PRIORITY_QUEUE_CAPACITY * n_threads);
@@ -245,20 +178,21 @@ where
let initial_circ_hash = circ.circuit_hash();
let mut best_circ = circ.clone();
let mut best_circ_cost = (self.cost)(&best_circ);
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
logger.log_best(best_circ_cost);

// Hash of seen circuits. Do not store circuits as this map gets huge
let mut seen_hashes: FxHashSet<_> = FromIterator::from_iter([(initial_circ_hash)]);

// Each worker waits for circuits to scan for rewrites using all the
// patterns and sends the results back to main.
let joins: Vec<_> = (0..n_threads)
.map(|_| {
worker::spawn_pattern_matching_thread(
.map(|i| {
TasoWorker::spawn(
rx_work.clone(),
tx_result.clone(),
self.rewriter.clone(),
self.strategy.clone(),
Some(format!("taso-worker-{i}")),
)
})
.collect();
@@ -283,46 +217,47 @@ where
let mut jobs_completed = 0usize;
// TODO: Report dropped jobs in the queue, so we can check for termination.

// Deadline for the optimization timeout
// Deadline for the optimisation timeout
let timeout_event = match timeout {
None => crossbeam_channel::never(),
Some(t) => crossbeam_channel::at(Instant::now() + Duration::from_secs(t)),
};

// Process worker results until we have seen all the circuits, or we run
// out of time.
let mut timeout_flag = false;
loop {
select! {
recv(rx_result) -> msg => {
match msg {
Ok(hashed_circs) => {
jobs_completed += 1;
for (circ_hash, circ) in &hashed_circs {
circ_cnt += 1;
if circ_cnt % 1000 == 0 {
// TODO: Add a minimum time between logs
log_progress::<_,u64,usize>(log_config.progress_log.as_mut(), circ_cnt, None, &seen_hashes)
.expect("Failed to write to progress log");
}
if !seen_hashes.insert(*circ_hash) {
continue;
let send_result = tracing::trace_span!(target: "taso::metrics", "recv_result").in_scope(|| {
jobs_completed += 1;
for (circ_hash, circ) in &hashed_circs {
circ_cnt += 1;
logger.log_progress(circ_cnt, None, seen_hashes.len());
if seen_hashes.contains(circ_hash) {
continue;
}
seen_hashes.insert(*circ_hash);

let cost = (self.cost)(circ);

// Check if we got a new best circuit
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
logger.log_best(best_circ_cost);
}
jobs_sent += 1;
}

let cost = (self.cost)(circ);

// Check if we got a new best circuit
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
}
jobs_sent += 1;
}
// Fill the workqueue with data from pq
if tx_work.send(hashed_circs).is_err() {
// Fill the workqueue with data from pq
tx_work.send(hashed_circs)
});
if send_result.is_err() {
eprintln!("All our workers panicked. Stopping optimisation.");
break;
};
}

// If there is no more data to process, we are done.
//
@@ -338,25 +273,17 @@ where
}
}
recv(timeout_event) -> _ => {
println!("Timeout");
timeout_flag = true;
break;
}
}
}

log_processing_end(circ_cnt, true);
logger.log_processing_end(circ_cnt, best_circ_cost, true, timeout_flag);

// Drop the channel so the threads know to stop.
drop(tx_work);
let _ = joins; // joins.into_iter().for_each(|j| j.join().unwrap());

log_final(
&best_circ,
log_config.progress_log.as_mut(),
log_config.final_circ_json.as_mut(),
&self.cost,
)
.expect("Failed to write to progress log and/or final circuit JSON");
joins.into_iter().for_each(|j| j.join().unwrap());

best_circ
}
@@ -381,68 +308,3 @@ mod taso_default {
}
}
}

/// A helper struct for logging improvements in circuit size seen during the
/// TASO execution.
//
// TODO: Replace this fixed logging. Report back intermediate results.
#[derive(serde::Serialize, Clone, Debug)]
struct BestCircSer {
circ_len: usize,
time: String,
}

impl BestCircSer {
fn new(circ_len: usize) -> Self {
let time = chrono::Local::now().to_rfc3339();
Self { circ_len, time }
}
}

fn log_best<W: io::Write>(cbest: usize, wtr: Option<&mut csv::Writer<W>>) -> io::Result<()> {
let Some(wtr) = wtr else {
return Ok(());
};
println!("new best of size {}", cbest);
wtr.serialize(BestCircSer::new(cbest)).unwrap();
wtr.flush()
}

fn log_processing_end(circuit_count: usize, needs_joining: bool) {
println!("END");
println!("Tried {circuit_count} circuits");
if needs_joining {
println!("Joining");
}
}

fn log_progress<W: io::Write, P: Ord, C>(
wr: Option<&mut W>,
circ_cnt: usize,
pq: Option<&HugrPQ<P, C>>,
seen_hashes: &FxHashSet<u64>,
) -> io::Result<()> {
if let Some(wr) = wr {
writeln!(wr, "{circ_cnt} circuits...")?;
if let Some(pq) = pq {
writeln!(wr, "Queue size: {} circuits", pq.len())?;
}
writeln!(wr, "Total seen: {} circuits", seen_hashes.len())?;
}
Ok(())
}

fn log_final<W1: io::Write, W2: io::Write>(
best_circ: &Hugr,
log: Option<&mut W1>,
final_circ: Option<&mut W2>,
cost: impl Fn(&Hugr) -> usize,
) -> io::Result<()> {
if let Some(log) = log {
writeln!(log, "END RESULT: {}", cost(best_circ))?;
}
if let Some(circ_writer) = final_circ {
save_tk1_json_writer(best_circ, circ_writer).unwrap();
}
Ok(())
}
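The freestanding helpers deleted above (`log_best`, `log_progress`, `log_processing_end`, `log_final`) are consolidated into the new `TasoLogger` in `src/optimiser/taso/log.rs`, which this diff does not show. Here is a sketch of the interface implied by the call sites, with all names, fields, and signatures inferred rather than verbatim:

```rust
use std::io;

/// Sketch only: the real type lives in `src/optimiser/taso/log.rs`.
#[derive(Default)]
pub struct TasoLogger<'w> {
    circ_candidates_csv: Option<csv::Writer<Box<dyn io::Write + 'w>>>,
}

impl<'w> TasoLogger<'w> {
    /// Record a new best candidate, both as a structured event and,
    /// when a CSV writer is configured, as a CSV row.
    pub fn log_best(&mut self, best_cost: usize) {
        tracing::info!(target: "taso::log", best_cost, "new best candidate");
        // ... serialize a CSV row here if a writer is configured ...
    }

    /// Periodic progress report; the single-threaded loop passes the
    /// queue length, the multi-threaded loop passes `None`.
    pub fn log_progress(&self, circ_cnt: usize, queue_len: Option<usize>, seen_count: usize) {
        if circ_cnt % 1000 == 0 {
            tracing::info!(target: "taso::log", circ_cnt, ?queue_len, seen_count, "progress");
        }
    }
}
```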