From 895eab4756a33dc0a56103e331fb6214ad8877a9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 25 Sep 2024 13:00:51 +0800 Subject: [PATCH] fix notify fix notify fmt --- e2e_test/subscription/main.py | 2 ++ src/frontend/src/observer/observer_manager.rs | 31 ++++++++++++++++--- src/frontend/src/session/cursor_manager.rs | 24 +++++++------- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 87c2b6e7f4ab..6107d072fc82 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -275,6 +275,8 @@ def test_cursor_with_table_alter(): row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) + assert row == [] + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4,4],row[0],"Insert") execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 920c36ab887d..cc0c4cd93b7f 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -92,8 +92,19 @@ impl ObserverState for FrontendObserverNode { let table_ids = deltas .version_deltas .iter() - .flat_map(|version_deltas| version_deltas.change_log_delta.keys()) - .map(|table_id| TableId::new(*table_id)) + .flat_map(|version_deltas| &version_deltas.change_log_delta) + .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() { + Some(new_log) => { + let new_value_empty = new_log.new_value.is_empty(); + let old_value_empty = new_log.old_value.is_empty(); + if !new_value_empty || !old_value_empty { + Some(TableId::new(table_id.clone())) + } else { + None + } + } + None => None, + }) .collect_vec(); self.handle_hummock_snapshot_notification(deltas); self.handle_cursor_notify(table_ids); @@ -203,8 +214,14 @@ impl ObserverState for FrontendObserverNode { let hummock_version = FrontendHummockVersion::from_protobuf(hummock_version.unwrap()); let table_ids = hummock_version .table_change_log - .keys() - .cloned() + .iter() + .filter_map(|(table_id, change_log)| { + if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() { + None + } else { + Some(table_id.clone()) + } + }) .collect_vec(); self.hummock_snapshot_manager.init(hummock_version); self.handle_cursor_notify(table_ids); @@ -496,6 +513,9 @@ impl FrontendObserverNode { } fn handle_cursor_notify(&self, table_ids: Vec) { + if table_ids.is_empty() { + return; + } for session in self.sessions_map.read().values() { session .get_cursor_manager() @@ -505,6 +525,9 @@ impl FrontendObserverNode { } fn handle_cursor_remove_table_ids(&self, table_ids: Vec) { + if table_ids.is_empty() { + return; + } for session in self.sessions_map.read().values() { session .get_cursor_manager() diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 9118579dddf5..616eb2a56938 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -31,7 +31,7 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; -use tokio::sync::watch::{channel, Receiver, Sender}; +use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::{mpsc, oneshot, RwLock}; use super::SessionImpl; @@ -921,9 +921,8 @@ enum CursorNotifyState { RemoveTables(Vec), NotifyTables(Vec), } -pub type CursorNotifyMapRef = Arc, Receiver<()>)>>>; pub struct CursorNotifies { - cursor_notify_map: CursorNotifyMapRef, + cursor_notify_map: Arc>>>, sender: mpsc::UnboundedSender, shutdown_tx: Option>, } @@ -932,9 +931,7 @@ impl CursorNotifies { pub fn new() -> Self { let (sender, mut receiver) = mpsc::unbounded_channel(); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let cursor_notify_map = Arc::new(RwLock::new( - HashMap::, Receiver<()>)>::new(), - )); + let cursor_notify_map = Arc::new(RwLock::new(HashMap::>::new())); let cursor_notify_map_clone = cursor_notify_map.clone(); tokio::spawn(async move { loop { @@ -953,8 +950,9 @@ impl CursorNotifies { Some(CursorNotifyState::NotifyTables(table_ids)) => { let cursor_notify_map_clone_read = cursor_notify_map_clone.read().await; for table_id in table_ids { - if let Some((tx,_rx)) = cursor_notify_map_clone_read.get(&table_id.table_id()) { - tx.send(()).unwrap(); + if let Some(tx) = cursor_notify_map_clone_read.get(&table_id.table_id()) { + // Maybe there's no cursor. + tx.send(()).ok(); } } } @@ -976,18 +974,18 @@ impl CursorNotifies { pub async fn wait_next_epoch(&self, id: u32, timeout_seconds: Option) -> Result<()> { let mut rx = { let mut cursor_notify_map_write = self.cursor_notify_map.write().await; - let (_tx, rx) = cursor_notify_map_write.entry(id).or_insert_with(|| { - let (tx, rx) = channel(()); - (tx, rx) + let tx = cursor_notify_map_write.entry(id).or_insert_with(|| { + let (tx, _rx) = channel(10); + tx }); - rx.clone() + tx.subscribe() }; let mut timeout_interval = tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); timeout_interval.tick().await; tokio::select! { - result = rx.changed() => { + result = rx.recv() => { result.map_err(|_| { ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", id)) })?;