Skip to content

Commit

Permalink
Batch send to priority queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lmondada committed Sep 21, 2023
1 parent b660ace commit 32a8f53
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
22 changes: 11 additions & 11 deletions src/optimiser/taso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ where
drop(tx_result);

// Queue the initial circuit
tx_work.send((initial_circ_hash, circ.clone())).unwrap();
tx_work
.send(vec![(initial_circ_hash, circ.clone())])
.unwrap();

// A counter of circuits seen.
let mut circ_cnt = 1;
Expand All @@ -295,34 +297,32 @@ where
match msg {
Ok(hashed_circs) => {
jobs_completed += 1;
for (circ_hash, circ) in hashed_circs {
for (circ_hash, circ) in &hashed_circs {
circ_cnt += 1;
if circ_cnt % 1000 == 0 {
// TODO: Add a minimum time between logs
log_progress::<_,u64,usize>(log_config.progress_log.as_mut(), circ_cnt, None, &seen_hashes)
.expect("Failed to write to progress log");
}
if seen_hashes.contains(&circ_hash) {
if !seen_hashes.insert(*circ_hash) {
continue;
}
seen_hashes.insert(circ_hash);

let cost = (self.cost)(&circ);
let cost = (self.cost)(circ);

// Check if we got a new best circuit
if cost < best_circ_cost {
best_circ = circ.clone();
best_circ_cost = cost;
log_best(best_circ_cost, log_candidates.as_mut()).unwrap();
}

// Fill the workqueue with data from pq
if tx_work.send((circ_hash, circ)).is_err() {
eprintln!("All our workers panicked. Stopping optimisation.");
break;
};
jobs_sent += 1;
}
// Fill the workqueue with data from pq
if tx_work.send(hashed_circs).is_err() {
eprintln!("All our workers panicked. Stopping optimisation.");
break;
};

// If there is no more data to process, we are done.
//
Expand Down
16 changes: 10 additions & 6 deletions src/optimiser/taso/hugr_pchannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ where
///
/// Get back a channel on which to queue hugrs with their hash, and
/// a channel on which to receive the output.
pub fn init(cost_fn: C, queue_capacity: usize) -> (Sender<Item>, Receiver<Item>) {
pub fn init(cost_fn: C, queue_capacity: usize) -> (Sender<Vec<Item>>, Receiver<Item>) {
let (ins, inr) = crossbeam_channel::unbounded();
let (outs, outr) = crossbeam_channel::bounded(0);
Self::run(inr, outs, cost_fn, queue_capacity);
Expand All @@ -36,7 +36,7 @@ where

/// Run the queuer as a thread.
fn run(
in_channel_orig: Receiver<(u64, Hugr)>,
in_channel_orig: Receiver<Vec<(u64, Hugr)>>,
out_channel_orig: Sender<(u64, Hugr)>,
cost_fn: C,
queue_capacity: usize,
Expand All @@ -54,8 +54,10 @@ where
if pq.is_empty() {
// Nothing queued to go out. Wait for input.
match in_channel.recv() {
Ok((hash, circ)) => {
pq.push_with_hash_unchecked(circ, hash);
Ok(new_circs) => {
for (hash, circ) in new_circs {
pq.push_with_hash_unchecked(circ, hash);
}
}
// The sender has closed the channel, we can stop.
Err(_) => break,
Expand All @@ -64,8 +66,10 @@ where
select! {
recv(in_channel) -> result => {
match result {
Ok((hash, circ)) => {
pq.push_with_hash_unchecked(circ, hash);
Ok(new_circs) => {
for (hash, circ) in new_circs {
pq.push_with_hash_unchecked(circ, hash);
}
}
// The sender has closed the channel, we can stop.
Err(_) => break,
Expand Down

0 comments on commit 32a8f53

Please sign in to comment.