Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
fix ingration tests hanging up
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Nov 24, 2023
1 parent fc2ea91 commit 106c34b
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use tokio::sync::Notify;
use tracing::debug;
use tokio::time::timeout;
use tracing::{debug, warn};

use capture::config::{Config, KafkaConfig};
use capture::server::serve;
Expand Down Expand Up @@ -175,9 +176,14 @@ impl EphemeralTopic {
impl Drop for EphemeralTopic {
fn drop(&mut self) {
debug!("dropping EphemeralTopic {}...", self.topic_name);
_ = self.consumer.unassign();
futures::executor::block_on(delete_topic(self.topic_name.clone()));
debug!("dropped topic");
self.consumer.unsubscribe();
match futures::executor::block_on(timeout(
Duration::from_secs(10),
delete_topic(self.topic_name.clone()),
)) {
Ok(_) => debug!("dropped topic"),
Err(err) => warn!("failed to drop topic: {}", err),
}
}
}

Expand Down

0 comments on commit 106c34b

Please sign in to comment.