Skip to content

Commit

Permalink
Adjust the measurement of the writer progress based on the bytes written
Browse files Browse the repository at this point in the history
Versions of the parquet crate after 38 don't perform direct writes to
the underlying writer, but instead create rowgroup writers, so the
mechanism for understanding the number of bytes "written"
  • Loading branch information
rtyler committed Dec 17, 2023
1 parent fc90494 commit 91172ac
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 29 deletions.
25 changes: 0 additions & 25 deletions src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,37 +116,13 @@ pub(crate) struct InMemoryWriteableCursor {
}

impl InMemoryWriteableCursor {
/*
/// Consume this instance and return the underlying buffer as long as there are no other
/// references to this instance.
pub fn into_inner(self) -> Option<Vec<u8>> {
Arc::try_unwrap(self.buffer)
.ok()
.and_then(|mutex| mutex.into_inner().ok())
.map(|cursor| cursor.into_inner())
}*/

/// Returns a clone of the underlying buffer
pub fn data(&self) -> Vec<u8> {
let inner = self.buffer.lock().unwrap();
inner.get_ref().to_vec()
}

/// Returns a length of the underlying buffer
pub fn len(&self) -> usize {
let inner = self.buffer.lock().unwrap();
inner.get_ref().len()
}

/*
/// Returns true if the underlying buffer contains no elements
pub fn is_empty(&self) -> bool {
let inner = self.buffer.lock().unwrap();
inner.get_ref().is_empty()
}*/
}

#[allow(deprecated)]
impl Write for InMemoryWriteableCursor {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut inner = self.buffer.lock().unwrap();
Expand All @@ -159,7 +135,6 @@ impl Write for InMemoryWriteableCursor {
}
}

#[allow(deprecated)]
impl Seek for InMemoryWriteableCursor {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let mut inner = self.buffer.lock().unwrap();
Expand Down
12 changes: 9 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ pub async fn start_ingest(

// The run loop
loop {
debug!("running the runloop");
// Consume the next message from the stream.
// Timeout if the next message is not received before the next flush interval.
let duration = ingest_processor.consume_timeout_duration();
Expand Down Expand Up @@ -417,6 +418,7 @@ pub async fn start_ingest(
// Startup can take significant time,
// so re-initialize the latency timer after consuming the first message.
if consumed == 0 {
debug!("Latency timer reset ");
ingest_processor.latency_timer = Instant::now();
}

Expand All @@ -442,7 +444,7 @@ pub async fn start_ingest(
}
}
Err(_) => {
log::debug!("Latency timer expired.");
log::error!("Latency timer expired.");
// Set the latency timer expired flag to indicate that
// that the latency timer should be reset after flush checks.
latency_timer_expired = true;
Expand Down Expand Up @@ -506,6 +508,7 @@ pub async fn start_ingest(
// Reset it to now so we don't run flush checks again
// until the next appropriate interval.
if latency_timer_expired {
debug!("latency timer expired, resetting");
ingest_processor.latency_timer = Instant::now();
}

Expand Down Expand Up @@ -1107,10 +1110,13 @@ impl IngestProcessor {
/// Returns a boolean indicating whether a record batch should be written based on current state.
fn should_complete_record_batch(&self) -> bool {
let elapsed_millis = self.latency_timer.elapsed().as_millis();
debug!("latency_timer {:?}", self.latency_timer);
debug!("elapsed_millis: {:?}", elapsed_millis);
debug!("Value buffers has {} items", self.value_buffers.len());

let should = self.value_buffers.len() > 0
&& (self.value_buffers.len() == self.opts.max_messages_per_batch
|| elapsed_millis >= (self.opts.allowed_latency * 1000) as u128);
&& (self.value_buffers.len() == self.opts.max_messages_per_batch)
|| (elapsed_millis >= (self.opts.allowed_latency * 1000) as u128);

debug!(
"Should complete record batch - latency test: {} >= {}",
Expand Down
13 changes: 12 additions & 1 deletion src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,18 @@ impl DataWriter {
/// Returns the current byte length of the in memory buffer.
/// This may be used by the caller to decide when to finalize the file write.
pub fn buffer_len(&self) -> usize {
self.arrow_writers.values().map(|w| w.cursor.len()).sum()
self.arrow_writers
.values()
.map(|w| {
let l: i64 = w
.arrow_writer
.flushed_row_groups()
.iter()
.map(|rg| rg.total_byte_size())
.sum();
l as usize
})
.sum()
}

/// Writes the existing parquet bytes to storage and resets internal state to handle another file.
Expand Down
1 change: 1 addition & 0 deletions tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ pub fn wait_until_file_created(path: &FilePath) {

let now = Local::now();
let poll_time = now - start_time;
std::thread::sleep(Duration::from_secs(1));

if poll_time > chrono::Duration::seconds(180) {
panic!("File was not created before timeout");
Expand Down

0 comments on commit 91172ac

Please sign in to comment.