diff --git a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs index 5989a2f496..94c1ccdcc6 100644 --- a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs +++ b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs @@ -137,7 +137,7 @@ fn map_record_batch( ) -> Result, ConnectorError> { let mut buf = Bytes::from(req.records).reader(); // read stream back - let mut reader = StreamReader::try_new(&mut buf, None).unwrap(); + let mut reader = StreamReader::try_new(&mut buf, None)?; let mut records = Vec::new(); while let Some(Ok(batch)) = reader.next() { let b_recs = map_record_batch_to_dozer_records(batch, schema) diff --git a/dozer-ingestion/src/errors.rs b/dozer-ingestion/src/errors.rs index f732f37236..32b0425704 100644 --- a/dozer-ingestion/src/errors.rs +++ b/dozer-ingestion/src/errors.rs @@ -1,5 +1,6 @@ #![allow(clippy::enum_variant_names)] +use deltalake::arrow::error::ArrowError; use dozer_log::errors::{ReaderBuilderError, ReaderError}; use dozer_types::errors::internal::BoxedError; use dozer_types::errors::types::{DeserializationError, SerializationError, TypeError}; @@ -52,6 +53,8 @@ pub enum ConnectorError { #[error("Unsupported grpc adapter: {0} {1}")] UnsupportedGrpcAdapter(String, String), + #[error("Arrow error: {0}")] + Arrow(#[from] ArrowError), #[error("Table not found: {0}")] TableNotFound(String),