From 26481e7aaa23dc539e3a7cf59ba1cbb5e57d1a92 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 17 Dec 2024 14:31:45 +0100 Subject: [PATCH] Stop publication cache task on disconnected channels --- zenoh-ext/src/publication_cache.rs | 78 ++++++++++++++++-------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index bdc1d3d11..efe832236 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -260,34 +260,53 @@ impl PublicationCache { tokio::select! { // on publication received by the local subscriber, store it sample = sub_recv.recv_async() => { - if let Ok(sample) = sample { - let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix { - prefix.join(&sample.key_expr()).unwrap().into() - } else { - sample.key_expr().clone() - }; - - if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { - if queue.len() >= history { - queue.pop_front(); - } - queue.push_back(sample); - } else if cache.len() >= limit { - tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource", - pub_key_expr); - } else { - let mut queue: VecDeque = VecDeque::new(); - queue.push_back(sample); - cache.insert(queryable_key_expr.into(), queue); + let Ok(sample) = sample else { + return; + }; + + let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix { + prefix.join(&sample.key_expr()).unwrap().into() + } else { + sample.key_expr().clone() + }; + + if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { + if queue.len() >= history { + queue.pop_front(); } + queue.push_back(sample); + } else if cache.len() >= limit { + tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource", + pub_key_expr); + } else { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(sample); + cache.insert(queryable_key_expr.into(), queue); } }, // on query, reply with cached content query = quer_recv.recv_async() => { - if let Ok(query) = query { - if !query.key_expr().as_str().contains('*') { - if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) { + let Ok(query) = query else { + return; + }; + + if !query.key_expr().as_str().contains('*') { + if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) { + for sample in queue { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if !time_range.contains(timestamp.get_time().to_system_time()){ + continue; + } + } + if let Err(e) = query.reply_sample(sample.clone()).await { + tracing::warn!("Error replying to query: {}", e); + } + } + } + } else { + for (key_expr, queue) in cache.iter() { + if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) { for sample in queue { if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { if !time_range.contains(timestamp.get_time().to_system_time()){ @@ -299,21 +318,6 @@ impl PublicationCache { } } } - } else { - for (key_expr, queue) in cache.iter() { - if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) { - for sample in queue { - if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { - if !time_range.contains(timestamp.get_time().to_system_time()){ - continue; - } - } - if let Err(e) = query.reply_sample(sample.clone()).await { - tracing::warn!("Error replying to query: {}", e); - } - } - } - } } } },