From 106c34b543b22b13ddc2ed19d51d458b274cf42e Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Fri, 24 Nov 2023 16:01:26 +0100 Subject: [PATCH] fix ingration tests hanging up --- capture-server/tests/common.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs index befc7b8..d74b5bf 100644 --- a/capture-server/tests/common.rs +++ b/capture-server/tests/common.rs @@ -18,7 +18,8 @@ use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::util::Timeout; use rdkafka::{Message, TopicPartitionList}; use tokio::sync::Notify; -use tracing::debug; +use tokio::time::timeout; +use tracing::{debug, warn}; use capture::config::{Config, KafkaConfig}; use capture::server::serve; @@ -175,9 +176,14 @@ impl EphemeralTopic { impl Drop for EphemeralTopic { fn drop(&mut self) { debug!("dropping EphemeralTopic {}...", self.topic_name); - _ = self.consumer.unassign(); - futures::executor::block_on(delete_topic(self.topic_name.clone())); - debug!("dropped topic"); + self.consumer.unsubscribe(); + match futures::executor::block_on(timeout( + Duration::from_secs(10), + delete_topic(self.topic_name.clone()), + )) { + Ok(_) => debug!("dropped topic"), + Err(err) => warn!("failed to drop topic: {}", err), + } } }