Skip to content

Commit

Permalink
cargo
Browse files Browse the repository at this point in the history
  • Loading branch information
mcroomp committed Aug 31, 2023
2 parents 1b80dbc + 99438ad commit f33a818
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions src/structs/lepton_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use log::{info, warn};
use std::cmp;
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
use std::mem::swap;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{channel, Sender};
use std::sync::mpsc::{Receiver, SendError};
use std::thread;
use std::thread::ScopedJoinHandle;
use std::time::Instant;
Expand Down Expand Up @@ -439,6 +439,9 @@ fn run_lepton_decoder_threads<R: Read + Seek, P: Send>(
}));
}

// track if we got an error while trying to send to a thread
let mut error_sending: Option<SendError<Message>> = None;

// now that the threads are waiting for inptut, read the stream and send all the buffers to their respective readers
while reader.stream_position().context(here!())? < last_data_position - 4 {
let thread_marker = reader.read_u8().context(here!())?;
Expand Down Expand Up @@ -483,9 +486,13 @@ fn run_lepton_decoder_threads<R: Read + Seek, P: Send>(
)
})?;

channel_to_sender[thread_id as usize]
.send(Message::WriteBlock(thread_id, buffer))
.context(here!())?;
let e =
channel_to_sender[thread_id as usize].send(Message::WriteBlock(thread_id, buffer));

if let Err(e) = e {
error_sending = Some(e);
break;
}
}
//info!("done sending!");

Expand All @@ -498,11 +505,17 @@ fn run_lepton_decoder_threads<R: Read + Seek, P: Send>(

let mut result = Vec::new();
for i in running_threads.drain(..) {
let thread_result = i.join().unwrap().context(here!())?;
let (result_to_process, source_metrics) = i.join().unwrap().context(here!())?;

metrics.merge_from(source_metrics);

metrics.merge_from(thread_result.1);
result.push(result_to_process);
}

result.push(thread_result.0);
// if there was an error during send, it should have resulted in an error from one of the threads above and
// we wouldn't get here, but as an extra precaution, we check here to make sure we didn't miss anything
if let Some(e) = error_sending {
return Err(e).context(here!());
}

info!(
Expand Down

0 comments on commit f33a818

Please sign in to comment.