-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(subscription): support blocking cursor #18675
Conversation
fix notify fmt
a7749eb
to
c413039
Compare
if table_ids.is_empty() { | ||
return; | ||
} | ||
for session in self.sessions_map.read().values() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently one CursorNotifier per session and within each CursorNotifier, we have one channel per table. Assume we have N sessions and M tables in total, we can end up creating N*M channels. Not to mention that we have a task in CursorNotifier to maintain the per table channel. These seem inefficient and complicated. IMO, a better way to implement the notification is:
- Create one and only one watch channel to listen on table changed log updates.
- When cursor needs to wait for changelog epoch changes for a table, it only needs to subscribe to this channel.
Implementation-wise, maybe we can store HummockSnapshotManager in each cursor instance and implement a wait_for_epoch similar to the method here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I think the current cursor implementation in frontend can be greatly simplified. The most complicated part in the current cursor is the state machine. The async feature of rust is a very powerful tool to generate state machine. So for a cursor, it can be implemented as a stream
#[try_stream(...)]
async fn cursor_stream(...) {
if need_snapshot {
for row in create_snapshot_iter(table_id) { yield row; }
}
let mut current_epoch = ...;
loop {
for row in change_log(current_epoch, table_id) { yield row; }
current_epoch = snapshot_manager.wait_next_epoch(current_epoch, table_id);
}
}
When a cursor is created, we can create such stream and hold it in the session. On a fetch
, we just call cursor_stream.next().await
, and the timeout can be applied with simply timeout(timeout_duration, cursor_stream.next()).await
.
We can be aware of table drop in snapshot_manager
, when we see that the table_id
does not exist in the state_table_info
of FrontendHummockVersion
anymore, and then changes on catalog in this PR can be totally avoided.
@@ -471,16 +487,31 @@ impl SubscriptionCursor { | |||
ans.push(row); | |||
} | |||
None => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does None
here always mean no data? I saw that we will return None when field mismatches here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, none means that there is no data for this call to next_row
if table_ids.is_empty() { | ||
return; | ||
} | ||
for session in self.sessions_map.read().values() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I think the current cursor implementation in frontend can be greatly simplified. The most complicated part in the current cursor is the state machine. The async feature of rust is a very powerful tool to generate state machine. So for a cursor, it can be implemented as a stream
#[try_stream(...)]
async fn cursor_stream(...) {
if need_snapshot {
for row in create_snapshot_iter(table_id) { yield row; }
}
let mut current_epoch = ...;
loop {
for row in change_log(current_epoch, table_id) { yield row; }
current_epoch = snapshot_manager.wait_next_epoch(current_epoch, table_id);
}
}
When a cursor is created, we can create such stream and hold it in the session. On a fetch
, we just call cursor_stream.next().await
, and the timeout can be applied with simply timeout(timeout_duration, cursor_stream.next()).await
.
We can be aware of table drop in snapshot_manager
, when we see that the table_id
does not exist in the state_table_info
of FrontendHummockVersion
anymore, and then changes on catalog in this PR can be totally avoided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#18208
In this pr, we support blocking subscription cursor and all cursor timeout.
the stmt of our fetch cursor becomes
fetch next/num from cursor_name with (timeout = 'xx');
about subscription cursor's
timeout
:timeout
time to execute. This time we will returns all the values currently fetched.timeout
about query cursor's
timeout
:only support 1
When
with (timeout = 'xx')
is not set,The query timeout is u64::max
The block timeout is 0
timeout
is a string in interval format.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Support
fetch next/num from cursor_name with (timeout = 'xx');
About subscription cursor's
timeout
:timeout
time to execute. This time we will returns all the values currently fetched.timeout
About query cursor's
timeout
:only support 1
When
with (timeout = 'xx')
is not set,In the case of 1, this corresponds to timeout = u64::max
In the case of 2, this corresponds to timeout = 0
timeout
is a string in interval format.