diff --git a/src/cursor.rs b/src/cursor.rs index 2b1ab40..2858e9d 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -116,37 +116,13 @@ pub(crate) struct InMemoryWriteableCursor { } impl InMemoryWriteableCursor { - /* - /// Consume this instance and return the underlying buffer as long as there are no other - /// references to this instance. - pub fn into_inner(self) -> Option> { - Arc::try_unwrap(self.buffer) - .ok() - .and_then(|mutex| mutex.into_inner().ok()) - .map(|cursor| cursor.into_inner()) - }*/ - /// Returns a clone of the underlying buffer pub fn data(&self) -> Vec { let inner = self.buffer.lock().unwrap(); inner.get_ref().to_vec() } - - /// Returns a length of the underlying buffer - pub fn len(&self) -> usize { - let inner = self.buffer.lock().unwrap(); - inner.get_ref().len() - } - - /* - /// Returns true if the underlying buffer contains no elements - pub fn is_empty(&self) -> bool { - let inner = self.buffer.lock().unwrap(); - inner.get_ref().is_empty() - }*/ } -#[allow(deprecated)] impl Write for InMemoryWriteableCursor { fn write(&mut self, buf: &[u8]) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); @@ -159,7 +135,6 @@ impl Write for InMemoryWriteableCursor { } } -#[allow(deprecated)] impl Seek for InMemoryWriteableCursor { fn seek(&mut self, pos: SeekFrom) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index d8f2729..b165125 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -382,6 +382,7 @@ pub async fn start_ingest( // The run loop loop { + debug!("running the runloop"); // Consume the next message from the stream. // Timeout if the next message is not received before the next flush interval. let duration = ingest_processor.consume_timeout_duration(); @@ -417,6 +418,7 @@ pub async fn start_ingest( // Startup can take significant time, // so re-initialize the latency timer after consuming the first message. if consumed == 0 { + debug!("Latency timer reset "); ingest_processor.latency_timer = Instant::now(); } @@ -442,7 +444,7 @@ pub async fn start_ingest( } } Err(_) => { - log::debug!("Latency timer expired."); + log::error!("Latency timer expired."); // Set the latency timer expired flag to indicate that // that the latency timer should be reset after flush checks. latency_timer_expired = true; @@ -506,6 +508,7 @@ pub async fn start_ingest( // Reset it to now so we don't run flush checks again // until the next appropriate interval. if latency_timer_expired { + debug!("latency timer expired, resetting"); ingest_processor.latency_timer = Instant::now(); } @@ -1107,10 +1110,13 @@ impl IngestProcessor { /// Returns a boolean indicating whether a record batch should be written based on current state. fn should_complete_record_batch(&self) -> bool { let elapsed_millis = self.latency_timer.elapsed().as_millis(); + debug!("latency_timer {:?}", self.latency_timer); + debug!("elapsed_millis: {:?}", elapsed_millis); + debug!("Value buffers has {} items", self.value_buffers.len()); let should = self.value_buffers.len() > 0 - && (self.value_buffers.len() == self.opts.max_messages_per_batch - || elapsed_millis >= (self.opts.allowed_latency * 1000) as u128); + && (self.value_buffers.len() == self.opts.max_messages_per_batch) + || (elapsed_millis >= (self.opts.allowed_latency * 1000) as u128); debug!( "Should complete record batch - latency test: {} >= {}", diff --git a/src/writer.rs b/src/writer.rs index ce02642..a2ad0bf 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -430,7 +430,18 @@ impl DataWriter { /// Returns the current byte length of the in memory buffer. /// This may be used by the caller to decide when to finalize the file write. pub fn buffer_len(&self) -> usize { - self.arrow_writers.values().map(|w| w.cursor.len()).sum() + self.arrow_writers + .values() + .map(|w| { + let l: i64 = w + .arrow_writer + .flushed_row_groups() + .iter() + .map(|rg| rg.total_byte_size()) + .sum(); + l as usize + }) + .sum() } /// Writes the existing parquet bytes to storage and resets internal state to handle another file. diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 9cf941d..335bc42 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -352,6 +352,7 @@ pub fn wait_until_file_created(path: &FilePath) { let now = Local::now(); let poll_time = now - start_time; + std::thread::sleep(Duration::from_secs(1)); if poll_time > chrono::Duration::seconds(180) { panic!("File was not created before timeout");