-
Notifications
You must be signed in to change notification settings - Fork 599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: deprecate StreamChunkWithState #14524
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is on the right track
Is the PR ready for review? |
No, still lack the "write to state table" part. |
pub fn get_split_offset_mapping_from_chunk( | ||
chunk: &StreamChunk, | ||
partition_idx: usize, | ||
offset_idx: usize, | ||
) -> Option<HashMap<SplitId, String>> { | ||
let mut split_offset_mapping = HashMap::new(); | ||
for (_, row) in chunk.rows() { | ||
let split_id = row.datum_at(partition_idx).unwrap().into_utf8().into(); | ||
let offset = row.datum_at(offset_idx).unwrap().into_utf8(); | ||
split_offset_mapping.insert(split_id, offset.to_string()); | ||
} | ||
Some(split_offset_mapping) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an additional round of iteration compared with StreamChunkWithState. We might need to benchmark.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also do it in parser, just moving the logic here. It is inevitable to iter over each chunk to get the latest offsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I know that. I emphasize "additional" here. i.e., we need to iterate here, but still need to iterate in the original place for other things.. So it's not "just moving"
The committer owns the performance too, please check https://www.notion.so/risingwave-labs/Manually-Build-Image-and-Run-Performance-Longevity-Test-b784f1eae1cf48889b2645d020b6b7d3?pvs=4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
LGTM |
Another question: previously we only record the offset of the last record for each split every time when yielding a chunk. Now we make it per-record.
|
Not sure about what "corresponds" indicates. For now, we ensure the offset for each columns is distinct, and the latest offset of each split in one chunk is equal to the split_offset_mapping before refactor.
IIUC, there are 2 cases that offsets are used. In Source Executor, offsets of partitions are persisted and used in recovery. In Fetch Executor, offsets are used to track if the file on reading reaches the end so we can move on another file then. And this refactor can cover both cases. |
I'd like to take a look. |
@@ -42,3 +49,51 @@ pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) { | |||
} | |||
bail!("barrier reader closed unexpectedly"); | |||
} | |||
|
|||
pub fn get_split_offset_mapping_from_chunk( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we still rely on the split_offset_mapping
to update source internal state, do you plan to refactor that part in future? We should ensure the semantic is same as before I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
split_offset_mapping
is actually the latest offset of each partitions, we cannot avoid this round of iteration to compute it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General LGTM
I assume you meant to say rows instead of columns. That's exactly what I was going to ask! |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Resolve #14384.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.