Skip to content

Commit

Permalink
fix notify
Browse files Browse the repository at this point in the history
fix notify

fmt
  • Loading branch information
xxhZs committed Sep 25, 2024
1 parent a5ba069 commit 895eab4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 17 deletions.
2 changes: 2 additions & 0 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ def test_cursor_with_table_alter():
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4,4],row[0],"Insert")
execute_insert("insert into t1 values(5,5,5)",conn)
execute_insert("flush",conn)
Expand Down
31 changes: 27 additions & 4 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,19 @@ impl ObserverState for FrontendObserverNode {
let table_ids = deltas
.version_deltas
.iter()
.flat_map(|version_deltas| version_deltas.change_log_delta.keys())
.map(|table_id| TableId::new(*table_id))
.flat_map(|version_deltas| &version_deltas.change_log_delta)
.filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
Some(new_log) => {
let new_value_empty = new_log.new_value.is_empty();
let old_value_empty = new_log.old_value.is_empty();
if !new_value_empty || !old_value_empty {
Some(TableId::new(table_id.clone()))
} else {
None
}
}
None => None,
})
.collect_vec();
self.handle_hummock_snapshot_notification(deltas);
self.handle_cursor_notify(table_ids);
Expand Down Expand Up @@ -203,8 +214,14 @@ impl ObserverState for FrontendObserverNode {
let hummock_version = FrontendHummockVersion::from_protobuf(hummock_version.unwrap());
let table_ids = hummock_version
.table_change_log
.keys()
.cloned()
.iter()
.filter_map(|(table_id, change_log)| {
if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
None
} else {
Some(table_id.clone())
}
})
.collect_vec();
self.hummock_snapshot_manager.init(hummock_version);
self.handle_cursor_notify(table_ids);
Expand Down Expand Up @@ -496,6 +513,9 @@ impl FrontendObserverNode {
}

fn handle_cursor_notify(&self, table_ids: Vec<TableId>) {
if table_ids.is_empty() {
return;
}
for session in self.sessions_map.read().values() {
session
.get_cursor_manager()
Expand All @@ -505,6 +525,9 @@ impl FrontendObserverNode {
}

fn handle_cursor_remove_table_ids(&self, table_ids: Vec<TableId>) {
if table_ids.is_empty() {
return;
}
for session in self.sessions_map.read().values() {
session
.get_cursor_manager()
Expand Down
24 changes: 11 additions & 13 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::broadcast::{channel, Sender};
use tokio::sync::{mpsc, oneshot, RwLock};

use super::SessionImpl;
Expand Down Expand Up @@ -921,9 +921,8 @@ enum CursorNotifyState {
RemoveTables(Vec<TableId>),
NotifyTables(Vec<TableId>),
}
pub type CursorNotifyMapRef = Arc<RwLock<HashMap<u32, (Sender<()>, Receiver<()>)>>>;
pub struct CursorNotifies {
cursor_notify_map: CursorNotifyMapRef,
cursor_notify_map: Arc<RwLock<HashMap<u32, Sender<()>>>>,
sender: mpsc::UnboundedSender<CursorNotifyState>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
Expand All @@ -932,9 +931,7 @@ impl CursorNotifies {
pub fn new() -> Self {
let (sender, mut receiver) = mpsc::unbounded_channel();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let cursor_notify_map = Arc::new(RwLock::new(
HashMap::<u32, (Sender<()>, Receiver<()>)>::new(),
));
let cursor_notify_map = Arc::new(RwLock::new(HashMap::<u32, Sender<()>>::new()));
let cursor_notify_map_clone = cursor_notify_map.clone();
tokio::spawn(async move {
loop {
Expand All @@ -953,8 +950,9 @@ impl CursorNotifies {
Some(CursorNotifyState::NotifyTables(table_ids)) => {
let cursor_notify_map_clone_read = cursor_notify_map_clone.read().await;
for table_id in table_ids {
if let Some((tx,_rx)) = cursor_notify_map_clone_read.get(&table_id.table_id()) {
tx.send(()).unwrap();
if let Some(tx) = cursor_notify_map_clone_read.get(&table_id.table_id()) {
// Maybe there's no cursor.
tx.send(()).ok();
}
}
}
Expand All @@ -976,18 +974,18 @@ impl CursorNotifies {
pub async fn wait_next_epoch(&self, id: u32, timeout_seconds: Option<u64>) -> Result<()> {
let mut rx = {
let mut cursor_notify_map_write = self.cursor_notify_map.write().await;
let (_tx, rx) = cursor_notify_map_write.entry(id).or_insert_with(|| {
let (tx, rx) = channel(());
(tx, rx)
let tx = cursor_notify_map_write.entry(id).or_insert_with(|| {
let (tx, _rx) = channel(10);
tx
});
rx.clone()
tx.subscribe()
};
let mut timeout_interval =
tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX)));
timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
timeout_interval.tick().await;
tokio::select! {
result = rx.changed() => {
result = rx.recv() => {
result.map_err(|_| {
ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", id))
})?;
Expand Down

0 comments on commit 895eab4

Please sign in to comment.