
source split change may use stale cache #15591

Closed
xxchan opened this issue Mar 11, 2024 · 2 comments
Labels: component/connector, type/bug (Something isn't working)

Comments

@xxchan
Member

xxchan commented Mar 11, 2024

> For existing splits, we should use the latest offset from the cache. `target_splits` is from meta and contains the initial offset.

I'm unsure whether this is always correct when scaling. The idea is similar to the documentation on the function `cache_may_stale`. The safest approach would be to always recover state from the state store.

Originally posted by @BugenZhao in #14172 (comment)
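
To make the concern concrete, here is a minimal sketch of the "prefer the cached offset" rule from the quoted comment. The `Split` type and the `resolve_offsets` function are hypothetical names used only for illustration; they are not taken from the RisingWave codebase, and the real split and offset types are richer than a plain `u64`.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified split: an id plus the next offset to read.
#[derive(Clone, Debug, PartialEq)]
struct Split {
    id: String,
    offset: u64,
}

/// For each split assigned by meta, prefer the offset found in the local
/// cache; otherwise fall back to the initial offset carried by the target.
fn resolve_offsets(target_splits: &[Split], cache: &HashMap<String, u64>) -> Vec<Split> {
    target_splits
        .iter()
        .map(|s| Split {
            id: s.id.clone(),
            // Assumption: whatever is in the cache is trusted as-is.
            offset: cache.get(&s.id).copied().unwrap_or(s.offset),
        })
        .collect()
}
```

Under this rule, a cache entry left behind for a split that was moved to another actor and later moved back would win over the target offset, which is the stale-cache scenario the comment worries about.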

Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@xxchan
Member Author

xxchan commented Sep 18, 2024

When scaling out, the partition of the existing executors will likely shrink and become a subset of the previous one.

This problem does not occur for source, since we ensure the in-memory state always matches the assigned splits:

```rust
if split_changed {
    tracing::info!(
        actor_id = self.actor_ctx.id,
        state = ?target_state,
        "apply split change"
    );

    core.updated_splits_in_epoch
        .retain(|split_id, _| target_state.contains_key(split_id));

    let dropped_splits = core
        .latest_split_info
        .extract_if(|split_id, _| !target_state.contains_key(split_id))
        .map(|(_, split)| split)
        .collect_vec();

    if should_trim_state && !dropped_splits.is_empty() {
        // trim dropped splits' state
        core.split_state_store.trim_state(&dropped_splits).await?;
    }

    core.latest_split_info = target_state;
```
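
The pruning step in the excerpt above can be sketched in isolation roughly as follows. This is a simplified stand-in, not the executor's actual code: `Core` here is a hypothetical struct with plain `HashMap<String, u64>` fields, the state-store trimming is omitted, and the dropped splits are collected with stable `HashMap` APIs instead of `extract_if`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the executor's in-memory split state.
struct Core {
    latest_split_info: HashMap<String, u64>,
    updated_splits_in_epoch: HashMap<String, u64>,
}

/// Prune the in-memory state down to the assigned set and return the ids
/// of the splits that were dropped (sorted, for deterministic output).
fn apply_split_change(core: &mut Core, target_state: HashMap<String, u64>) -> Vec<String> {
    // Forget per-epoch updates for splits that are no longer assigned.
    core.updated_splits_in_epoch
        .retain(|id, _| target_state.contains_key(id));

    // Collect the splits that were held before but are not in the target.
    let mut dropped: Vec<String> = core
        .latest_split_info
        .keys()
        .filter(|id| !target_state.contains_key(*id))
        .cloned()
        .collect();
    dropped.sort();

    // After this assignment the in-memory state equals the assigned set.
    core.latest_split_info = target_state;
    dropped
}
```

Because the in-memory maps are replaced or filtered on every split change, no entry for an unassigned split survives in this sketch, so a split that is later re-assigned cannot pick up an old in-memory offset.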

xxchan closed this as completed Sep 18, 2024