diff --git a/dozer-core/src/executor/mod.rs b/dozer-core/src/executor/mod.rs index a2e72a9976..45f86bb5c1 100644 --- a/dozer-core/src/executor/mod.rs +++ b/dozer-core/src/executor/mod.rs @@ -166,9 +166,14 @@ fn start_source( Ok(_) => {} // Channel disconnection means the source listener has quit. // Maybe it quit gracefully so we don't need to panic. - Err(ExecutionError::CannotSendToChannel) => {} - // Other errors result in panic. - Err(e) => std::panic::panic_any(e), + Err(e) => { + if let ExecutionError::Source(e) = &e { + if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() { + return; + } + } + std::panic::panic_any(e); + } }) .map_err(ExecutionError::CannotSpawnWorkerThread)?; diff --git a/dozer-ingestion/src/connectors/object_store/connector.rs b/dozer-ingestion/src/connectors/object_store/connector.rs index 10b3f791cd..2c68f1db19 100644 --- a/dozer-ingestion/src/connectors/object_store/connector.rs +++ b/dozer-ingestion/src/connectors/object_store/connector.rs @@ -119,22 +119,14 @@ impl Connector for ObjectStoreConnector { } Some(evt) => { match evt { - IngestionMessageKind::SnapshottingStarted => { - ingestor_clone - .handle_message(IngestionMessage::new_snapshotting_started( - 0, seq_no, - )) - .map_err(ConnectorError::IngestorError) - .unwrap(); - } - IngestionMessageKind::SnapshottingDone => { - ingestor_clone - .handle_message(IngestionMessage::new_snapshotting_done( - 0, seq_no, - )) - .map_err(ConnectorError::IngestorError) - .unwrap(); - } + IngestionMessageKind::SnapshottingStarted => ingestor_clone + .handle_message(IngestionMessage::new_snapshotting_started( + 0, seq_no, + )) + .map_err(ConnectorError::IngestorError)?, + IngestionMessageKind::SnapshottingDone => ingestor_clone + .handle_message(IngestionMessage::new_snapshotting_done(0, seq_no)) + .map_err(ConnectorError::IngestorError)?, IngestionMessageKind::OperationEvent { table_index, op } => { ingestor_clone .handle_message(IngestionMessage::new_op( @@ -143,14 +135,14 @@ impl Connector for ObjectStoreConnector { table_index, op, )) - .map_err(ConnectorError::IngestorError) - .unwrap(); + .map_err(ConnectorError::IngestorError)? } } seq_no += 1; } } } + Ok::<_, ConnectorError>(()) }); // sender sending out message for pipeline diff --git a/dozer-ingestion/src/connectors/object_store/table_reader.rs b/dozer-ingestion/src/connectors/object_store/table_reader.rs index 5c29529812..8f54fc86ca 100644 --- a/dozer-ingestion/src/connectors/object_store/table_reader.rs +++ b/dozer-ingestion/src/connectors/object_store/table_reader.rs @@ -103,13 +103,16 @@ impl TableReader { }, }; - sender + if sender .send(Ok(Some(IngestionMessageKind::OperationEvent { table_index, op: evt, }))) .await - .unwrap(); + .is_err() + { + break; + } } }