Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate StreamChunkWithState #14524

Merged
merged 47 commits into from
Jan 25, 2024

Conversation

Rossil2012
Copy link
Contributor

@Rossil2012 Rossil2012 commented Jan 12, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Resolve #14384.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@Rossil2012 Rossil2012 changed the title Kanzhen/deprecate chunk with state refactor: deprecate StreamChunkWithState Jan 12, 2024
@Rossil2012 Rossil2012 marked this pull request as ready for review January 15, 2024 06:22
@Rossil2012 Rossil2012 requested a review from tabVersion January 15, 2024 06:22
Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is on the right track

src/common/src/catalog/mod.rs Outdated Show resolved Hide resolved
src/connector/src/parser/mod.rs Outdated Show resolved Hide resolved
src/source/src/source_desc.rs Outdated Show resolved Hide resolved
src/source/src/source_desc.rs Outdated Show resolved Hide resolved
src/stream/src/executor/source/mod.rs Outdated Show resolved Hide resolved
src/stream/src/executor/source/fs_source_executor.rs Outdated Show resolved Hide resolved
@BugenZhao
Copy link
Member

Is the PR ready for review?

@BugenZhao BugenZhao self-requested a review January 16, 2024 03:02
@tabVersion tabVersion marked this pull request as draft January 16, 2024 04:35
@tabVersion
Copy link
Contributor

Is the PR ready for review?

No, still lack the "write to state table" part.

Comment on lines 52 to 64
pub fn get_split_offset_mapping_from_chunk(
chunk: &StreamChunk,
partition_idx: usize,
offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
let mut split_offset_mapping = HashMap::new();
for (_, row) in chunk.rows() {
let split_id = row.datum_at(partition_idx).unwrap().into_utf8().into();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping.insert(split_id, offset.to_string());
}
Some(split_offset_mapping)
}
Copy link
Member

@xxchan xxchan Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an additional round of iteration compared with StreamChunkWithState. We might need to benchmark.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also do it in parser, just moving the logic here. It is inevitable to iter over each chunk to get the latest offsets.

Copy link
Member

@xxchan xxchan Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I know that. I emphasize "additional" here. i.e., we need to iterate here, but still need to iterate in the original place for other things.. So it's not "just moving"

@lmatz
Copy link
Contributor

lmatz commented Jan 23, 2024

The committer owns the performance too, please check https://www.notion.so/risingwave-labs/Manually-Build-Image-and-Run-Performance-Longevity-Test-b784f1eae1cf48889b2645d020b6b7d3?pvs=4

Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM

src/connector/src/source/manager.rs Outdated Show resolved Hide resolved
src/connector/src/source/nexmark/source/reader.rs Outdated Show resolved Hide resolved
@Rossil2012
Copy link
Contributor Author

Before refactor:
截屏2024-01-25 10 39 52
After refactor:
截屏2024-01-25 10 40 58

Is the performance degradation acceptable?

@BugenZhao
Copy link
Member

Is the performance degradation acceptable?

LGTM

@BugenZhao
Copy link
Member

Another question: previously we only record the offset of the last record for each split every time when yielding a chunk. Now we make it per-record.

  • Do we ensure that the offset for each record corresponds exactly?
  • Is there or will there be any downstream relying on this invariant?

@BugenZhao BugenZhao requested a review from StrikeW January 25, 2024 03:46
@Rossil2012
Copy link
Contributor Author

  • Do we ensure that the offset for each record corresponds exactly?

Not sure about what "corresponds" indicates. For now, we ensure the offset for each columns is distinct, and the latest offset of each split in one chunk is equal to the split_offset_mapping before refactor.

  • Is there or will there be any downstream relying on this invariant?

IIUC, there are 2 cases that offsets are used. In Source Executor, offsets of partitions are persisted and used in recovery. In Fetch Executor, offsets are used to track if the file on reading reaches the end so we can move on another file then. And this refactor can cover both cases.

@StrikeW
Copy link
Contributor

StrikeW commented Jan 25, 2024

I'd like to take a look.

@@ -42,3 +49,51 @@ pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
}
bail!("barrier reader closed unexpectedly");
}

pub fn get_split_offset_mapping_from_chunk(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we still rely on the split_offset_mapping to update source internal state, do you plan to refactor that part in future? We should ensure the semantic is same as before I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split_offset_mapping is actually the latest offset of each partitions, we cannot avoid this round of iteration to compute it.

Copy link
Contributor

@StrikeW StrikeW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General LGTM

@Rossil2012 Rossil2012 added this pull request to the merge queue Jan 25, 2024
Merged via the queue into main with commit 372c2d7 Jan 25, 2024
26 of 27 checks passed
@Rossil2012 Rossil2012 deleted the kanzhen/deprecate_chunk_with_state branch January 25, 2024 11:14
@BugenZhao
Copy link
Member

Not sure about what "corresponds" indicates. For now, we ensure the offset for each columns is distinct

I assume you meant to say rows instead of columns. That's exactly what I was going to ask!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

refactor: deprecate StreamChunkWithState for source state
6 participants