From addf2de42d8406f96fee430c6f847d44f82e1b37 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 17 Dec 2023 21:38:25 +0000 Subject: [PATCH] Adjust the measurement of the writer progress based on the bytes written Versions of the parquet crate after 38 don't perform direct writes to the underlying writer, but instead create rowgroup writers, so the mechanism for understanding the number of bytes "written" --- src/cursor.rs | 25 ------------------------- src/lib.rs | 12 +++++++++--- src/writer.rs | 13 ++++++++++++- tests/helpers/mod.rs | 1 + 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/src/cursor.rs b/src/cursor.rs index 2b1ab40..2858e9d 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -116,37 +116,13 @@ pub(crate) struct InMemoryWriteableCursor { } impl InMemoryWriteableCursor { - /* - /// Consume this instance and return the underlying buffer as long as there are no other - /// references to this instance. - pub fn into_inner(self) -> Option> { - Arc::try_unwrap(self.buffer) - .ok() - .and_then(|mutex| mutex.into_inner().ok()) - .map(|cursor| cursor.into_inner()) - }*/ - /// Returns a clone of the underlying buffer pub fn data(&self) -> Vec { let inner = self.buffer.lock().unwrap(); inner.get_ref().to_vec() } - - /// Returns a length of the underlying buffer - pub fn len(&self) -> usize { - let inner = self.buffer.lock().unwrap(); - inner.get_ref().len() - } - - /* - /// Returns true if the underlying buffer contains no elements - pub fn is_empty(&self) -> bool { - let inner = self.buffer.lock().unwrap(); - inner.get_ref().is_empty() - }*/ } -#[allow(deprecated)] impl Write for InMemoryWriteableCursor { fn write(&mut self, buf: &[u8]) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); @@ -159,7 +135,6 @@ impl Write for InMemoryWriteableCursor { } } -#[allow(deprecated)] impl Seek for InMemoryWriteableCursor { fn seek(&mut self, pos: SeekFrom) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index d8f2729..b165125 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -382,6 +382,7 @@ pub async fn start_ingest( // The run loop loop { + debug!("running the runloop"); // Consume the next message from the stream. // Timeout if the next message is not received before the next flush interval. let duration = ingest_processor.consume_timeout_duration(); @@ -417,6 +418,7 @@ pub async fn start_ingest( // Startup can take significant time, // so re-initialize the latency timer after consuming the first message. if consumed == 0 { + debug!("Latency timer reset "); ingest_processor.latency_timer = Instant::now(); } @@ -442,7 +444,7 @@ pub async fn start_ingest( } } Err(_) => { - log::debug!("Latency timer expired."); + log::error!("Latency timer expired."); // Set the latency timer expired flag to indicate that // that the latency timer should be reset after flush checks. latency_timer_expired = true; @@ -506,6 +508,7 @@ pub async fn start_ingest( // Reset it to now so we don't run flush checks again // until the next appropriate interval. if latency_timer_expired { + debug!("latency timer expired, resetting"); ingest_processor.latency_timer = Instant::now(); } @@ -1107,10 +1110,13 @@ impl IngestProcessor { /// Returns a boolean indicating whether a record batch should be written based on current state. fn should_complete_record_batch(&self) -> bool { let elapsed_millis = self.latency_timer.elapsed().as_millis(); + debug!("latency_timer {:?}", self.latency_timer); + debug!("elapsed_millis: {:?}", elapsed_millis); + debug!("Value buffers has {} items", self.value_buffers.len()); let should = self.value_buffers.len() > 0 - && (self.value_buffers.len() == self.opts.max_messages_per_batch - || elapsed_millis >= (self.opts.allowed_latency * 1000) as u128); + && (self.value_buffers.len() == self.opts.max_messages_per_batch) + || (elapsed_millis >= (self.opts.allowed_latency * 1000) as u128); debug!( "Should complete record batch - latency test: {} >= {}", diff --git a/src/writer.rs b/src/writer.rs index ce02642..a2ad0bf 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -430,7 +430,18 @@ impl DataWriter { /// Returns the current byte length of the in memory buffer. /// This may be used by the caller to decide when to finalize the file write. pub fn buffer_len(&self) -> usize { - self.arrow_writers.values().map(|w| w.cursor.len()).sum() + self.arrow_writers + .values() + .map(|w| { + let l: i64 = w + .arrow_writer + .flushed_row_groups() + .iter() + .map(|rg| rg.total_byte_size()) + .sum(); + l as usize + }) + .sum() } /// Writes the existing parquet bytes to storage and resets internal state to handle another file. diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 9cf941d..335bc42 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -352,6 +352,7 @@ pub fn wait_until_file_created(path: &FilePath) { let now = Local::now(); let poll_time = now - start_time; + std::thread::sleep(Duration::from_secs(1)); if poll_time > chrono::Duration::seconds(180) { panic!("File was not created before timeout");