Skip to content

Commit

Permalink
Handle broken pipes error (#581)
Browse files Browse the repository at this point in the history
* Handle broken pipes error

* Update changelog
  • Loading branch information
rmn-boiko authored Apr 9, 2024
1 parent af973c9 commit 3beb0f7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Fixed
- Handle broken pipe panic

## [0.172.1] - 2024-04-08
### Fixed
- Add precondition flow checks
Expand Down
7 changes: 7 additions & 0 deletions src/app/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::path::PathBuf;

use kamu::domain::engine::normalize_logs;
use kamu::domain::*;
use kamu_data_utils::data::format::WriterError;
use opendatafabric::DatasetRefPattern;
use thiserror::Error;

Expand Down Expand Up @@ -148,6 +149,12 @@ impl From<MultiTenantRefUnexpectedError> for CLIError {
}
}

impl From<WriterError> for CLIError {
fn from(e: WriterError) -> Self {
Self::failure(e)
}
}

/////////////////////////////////////////////////////////////////////////////////////////
// TODO: Replace with traits that distinguish critical and non-critical errors
/////////////////////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 6 additions & 3 deletions src/app/cli/src/output/records_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::display::array_value_to_string;
use kamu_data_utils::data::format::WriterError;
pub use kamu_data_utils::data::format::{
CsvWriter,
CsvWriterBuilder,
Expand Down Expand Up @@ -324,7 +325,7 @@ impl<W> RecordsWriter for TableWriter<W>
where
W: std::io::Write,
{
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), std::io::Error> {
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), WriterError> {
if !self.header_written {
let mut header = Vec::new();
for field in records.schema().fields() {
Expand All @@ -351,7 +352,7 @@ where
Ok(())
}

fn finish(&mut self) -> Result<(), std::io::Error> {
fn finish(&mut self) -> Result<(), WriterError> {
// BUG: Header doesn't render when there are no data rows in the table,
// so we add an empty row
if self.rows_written == 0 {
Expand All @@ -361,7 +362,9 @@ where
}
}

self.table.print(&mut self.out)?;
self.table
.print(&mut self.out)
.map_err(WriterError::IoError)?;
Ok(())
}
}
68 changes: 51 additions & 17 deletions src/utils/data-utils/src/data/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,47 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::io::Write;
use std::fmt::{self, Display};
use std::io::{ErrorKind, Write};

use arrow::error::ArrowError;
pub use datafusion::arrow::csv::{Writer as CsvWriter, WriterBuilder as CsvWriterBuilder};
pub use datafusion::arrow::json::{
ArrayWriter as JsonArrayWriter,
LineDelimitedWriter as JsonLineDelimitedWriter,
};
use datafusion::arrow::record_batch::RecordBatch;
use thiserror::Error;

/////////////////////////////////////////////////////////////////////////////////////////

type Error = std::io::Error;

pub trait RecordsWriter {
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), Error>;
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), WriterError>;

fn write_batches(&mut self, record_batches: &[RecordBatch]) -> Result<(), Error> {
fn write_batches(&mut self, record_batches: &[RecordBatch]) -> Result<(), WriterError> {
for records in record_batches {
self.write_batch(records)?;
}
Ok(())
}

fn finish(&mut self) -> Result<(), Error> {
fn finish(&mut self) -> Result<(), WriterError> {
Ok(())
}

fn handle_writer_result(
&self,
writer_result: Result<(), ArrowError>,
) -> Result<(), WriterError> {
if let Err(err) = writer_result {
match err {
ArrowError::IoError(_, io_err) => match io_err.kind() {
ErrorKind::BrokenPipe => (),
_ => return Err(WriterError::IoError(io_err)),
},
err => return Err(WriterError::ArrowError(err)),
};
}
Ok(())
}
}
Expand All @@ -40,9 +57,9 @@ pub trait RecordsWriter {
/////////////////////////////////////////////////////////////////////////////////////////

impl<W: Write> RecordsWriter for CsvWriter<W> {
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), Error> {
CsvWriter::write(self, records).unwrap();
Ok(())
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), WriterError> {
let writer_result = CsvWriter::write(self, records);
self.handle_writer_result(writer_result)
}
}

Expand All @@ -51,12 +68,12 @@ impl<W: Write> RecordsWriter for CsvWriter<W> {
/////////////////////////////////////////////////////////////////////////////////////////

impl<W: Write> RecordsWriter for JsonArrayWriter<W> {
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), Error> {
JsonArrayWriter::write(self, records).unwrap();
Ok(())
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), WriterError> {
let writer_result = JsonArrayWriter::write(self, records);
self.handle_writer_result(writer_result)
}

fn finish(&mut self) -> Result<(), Error> {
fn finish(&mut self) -> Result<(), WriterError> {
JsonArrayWriter::finish(self).unwrap();
Ok(())
}
Expand All @@ -67,13 +84,30 @@ impl<W: Write> RecordsWriter for JsonArrayWriter<W> {
/////////////////////////////////////////////////////////////////////////////////////////

impl<W: Write> RecordsWriter for JsonLineDelimitedWriter<W> {
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), Error> {
JsonLineDelimitedWriter::write(self, records).unwrap();
Ok(())
fn write_batch(&mut self, records: &RecordBatch) -> Result<(), WriterError> {
let writer_result = JsonLineDelimitedWriter::write(self, records);
self.handle_writer_result(writer_result)
}

fn finish(&mut self) -> Result<(), Error> {
fn finish(&mut self) -> Result<(), WriterError> {
JsonLineDelimitedWriter::finish(self).unwrap();
Ok(())
}
}

#[derive(Debug, Error)]
pub enum WriterError {
IoError(std::io::Error),
ArrowError(ArrowError),
}

impl Display for WriterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::IoError(io_err) => write!(f, "Cannot write output, io error occurred : {io_err}"),
Self::ArrowError(arrow_err) => {
write!(f, "Cannot write output, arrow error occurred : {arrow_err}")
}
}
}
}

0 comments on commit 3beb0f7

Please sign in to comment.