
perf: nexmark q0 #8712

Open
Tracked by #7289
kwannoel opened this issue Mar 22, 2023 · 22 comments

Comments

@kwannoel
Contributor

kwannoel commented Mar 22, 2023

Background

In a recent benchmark, Flink had an average throughput of 1M rows/s, while RW averaged 850K rows/s. RW needs roughly a 17% improvement to match Flink. Thanks to @huangjw806 for spotting this.

The flamegraph can be found here, under Artifacts.

query

    CREATE SINK nexmark_q0
    AS
    SELECT auction, bidder, price, date_time
    FROM bid
    WITH ( connector = 'blackhole', type = 'append-only');

plan

   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
 └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4] }
   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 5:Int32) as $expr4, _row_id] }
     └─StreamFilter { predicate: (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 5 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_rw_kafka_timestamp", "_row_id"] }

Here are screenshots of the flamegraph, highlighting cost centers.

[Flamegraph screenshots]

@github-actions github-actions bot added this to the release-0.19 milestone Mar 22, 2023
@kwannoel
Contributor Author

Please add to the screenshots if you find something interesting in the flamegraph.

@lmatz
Contributor

lmatz commented Mar 22, 2023

https://github.com/risingwavelabs/risingwave/blob/main/src/common/src/array/mod.rs#L500-L514

[Flamegraph screenshot]

append_datum_n itself takes a non-negligible amount of time; does that make sense?
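
For context, here is a minimal, self-contained sketch of the kind of per-datum append path that can show up like this; the types and method names below are simplified stand-ins for illustration, not the actual risingwave ArrayBuilder API:

    // Simplified stand-in types; not the real risingwave ArrayBuilder API.
    enum Datum {
        Null,
        Int64(i64),
    }

    #[derive(Default)]
    struct Int64ArrayBuilder {
        values: Vec<i64>,
        validity: Vec<bool>,
    }

    impl Int64ArrayBuilder {
        // Per-datum path: an enum match plus a validity-bitmap update for every
        // single value, so the cost is paid once per row per column.
        fn append_datum(&mut self, datum: &Datum) {
            match datum {
                Datum::Null => {
                    self.values.push(0);
                    self.validity.push(false);
                }
                Datum::Int64(v) => {
                    self.values.push(*v);
                    self.validity.push(true);
                }
            }
        }

        // Appending a datum n times still goes through the per-datum path,
        // so the dispatch cost scales with the number of values.
        fn append_datum_n(&mut self, n: usize, datum: &Datum) {
            for _ in 0..n {
                self.append_datum(datum);
            }
        }
    }

    fn main() {
        let mut builder = Int64ArrayBuilder::default();
        builder.append_datum_n(3, &Datum::Int64(42));
        builder.append_datum(&Datum::Null);
        assert_eq!(builder.values.len(), 4);
    }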

@lmatz
Contributor

lmatz commented Mar 22, 2023

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/parser/json_parser.rs#L122

Do to_ascii_lowercase() once in advance, outside the closure/loop?

Edit:
Done in #8718
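
Roughly the shape of that change, as a hedged sketch (the column names, map-based rows, and function names here are illustrative, not the actual json_parser.rs code):

    use std::collections::HashMap;

    // Before: lowercasing inside the per-row lookup allocates a new String
    // for every column of every row.
    fn lookup_per_row(columns: &[String], row: &HashMap<String, i64>) -> Vec<Option<i64>> {
        columns
            .iter()
            .map(|name| row.get(&name.to_ascii_lowercase()).copied())
            .collect()
    }

    // After: lowercase each column name once, in advance, and reuse it for all rows.
    fn lookup_all_rows(columns: &[String], rows: &[HashMap<String, i64>]) -> Vec<Vec<Option<i64>>> {
        let lowered: Vec<String> = columns.iter().map(|c| c.to_ascii_lowercase()).collect();
        rows.iter()
            .map(|row| lowered.iter().map(|name| row.get(name).copied()).collect())
            .collect()
    }

    fn main() {
        let columns = vec!["Price".to_string()];
        let rows = vec![HashMap::from([("price".to_string(), 10_i64)])];
        assert_eq!(lookup_per_row(&columns, &rows[0]), vec![Some(10)]);
        assert_eq!(lookup_all_rows(&columns, &rows), vec![vec![Some(10)]]);
    }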

@lmatz
Contributor

lmatz commented Mar 22, 2023

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/parser/mod.rs#L200

Vec::with_capacity() instead of vec![]?

Edit:
Done in #8718
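
As a tiny sketch (illustrative names, not the actual parser code): when the number of elements is known up front, pre-allocating avoids the repeated reallocation that vec![] plus push incurs.

    fn collect_offsets(num_rows: usize) -> Vec<usize> {
        // Before: `let mut offsets = vec![];` grows and reallocates as we push.
        // After: reserve once, since the final length is known in advance.
        let mut offsets = Vec::with_capacity(num_rows);
        for i in 0..num_rows {
            offsets.push(i);
        }
        offsets
    }

    fn main() {
        assert_eq!(collect_offsets(3), vec![0, 1, 2]);
    }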

@lmatz
Contributor

lmatz commented Mar 23, 2023

I feel we need to find a good strategy for deciding when to compact in the ProjectExecutor:
https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/project.rs#L113

In this case, since the expression is very cheap to compute, compact itself introduces 11% overhead.

Edit:
Probably not; after all, we only need to output the visible rows to an external sink, so we have to do some compaction somewhere before the final stage.
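
To make the trade-off concrete, here is a simplified sketch of what compact means for a chunk in this discussion (stand-in types, not the actual risingwave DataChunk API): rows filtered out upstream are only hidden behind a visibility bitmap, and compact physically rebuilds every column without them, which is the cost being measured.

    // Stand-in chunk type for illustration; not the real risingwave DataChunk.
    struct DataChunk {
        columns: Vec<Vec<i64>>,        // column-oriented payload
        visibility: Option<Vec<bool>>, // None means every row is visible
    }

    impl DataChunk {
        // Rebuild all columns, keeping only visible rows. This allocation and
        // copy is the overhead discussed above when the projection itself is cheap.
        fn compact(self) -> DataChunk {
            let DataChunk { columns, visibility } = self;
            let Some(vis) = visibility else {
                return DataChunk { columns, visibility: None };
            };
            let columns = columns
                .into_iter()
                .map(|col| {
                    col.into_iter()
                        .zip(vis.iter())
                        .filter_map(|(v, keep)| keep.then_some(v))
                        .collect()
                })
                .collect();
            DataChunk { columns, visibility: None }
        }
    }

    fn main() {
        let chunk = DataChunk {
            columns: vec![vec![1, 2, 3]],
            visibility: Some(vec![true, false, true]),
        };
        let compacted = chunk.compact();
        assert_eq!(compacted.columns, vec![vec![1_i64, 3]]);
    }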

@kwannoel
Contributor Author

kwannoel commented Mar 23, 2023

Probably not; after all, we only need to output the visible rows to an external sink, so we have to do some compaction somewhere before the final stage.

Not sure I understand why compaction is required in this case. Why can't we just output the visible rows to the external sink?

@lmatz
Contributor

lmatz commented Mar 23, 2023

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/sink/remote.rs#L276-L294

It kind of depends on the output format requirement:

If it asks for JSON, then we iterate through each row of the chunk, so we can choose not to compact.
But if it asks for the stream chunk format, then we have to compact.

Since this is a blackhole sink, we can save the compaction; you are right!

So whether to compact becomes something to determine at the optimization stage?

@kwannoel
Contributor Author

kwannoel commented Mar 23, 2023

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/sink/remote.rs#L276-L294

It kind of depends on the output format requirement:

If it asks for JSON, then we iterate through each row of the chunk, so we can choose not to compact. But if it asks for the stream chunk format, then we have to compact.

Since this is a blackhole sink, we can save the compaction; you are right!

Hmm, even for StreamChunk we could defer it to to_protobuf?

    SinkPayloadFormat::StreamChunk => {
        let prost_stream_chunk = chunk.to_protobuf();
        let binary_data = Message::encode_to_vec(&prost_stream_chunk);
        Payload::StreamChunkPayload(StreamChunkPayload { binary_data })
    }

@lmatz
Contributor

lmatz commented Mar 23, 2023

I think so, but this prost_stream_chunk does not seem to be aware of visibility.

@kwannoel
Contributor Author

But typically, will any system consume StreamChunk at all? 👀
I thought it was only used internally.

@lmatz
Contributor

lmatz commented Mar 23, 2023

But typically, will any system consume StreamChunk at all? 👀

I guess not, so we can save the final compact if it is connected to the sink/MV executor.

@lmatz
Contributor

lmatz commented Mar 23, 2023

For this particular query, we need to be cautious because of this two-Project issue.
It's a bug that has not been fixed.

The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

@kwannoel
Contributor Author

So whether to compact becomes something to determine at the optimization stage?

Makes sense to me.

@kwannoel
Contributor Author

For this particular query, we need to be cautious because of this two-Project issue. It's a bug that has not been fixed.

The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

Linking it: #8577

@lmatz
Contributor

lmatz commented Mar 23, 2023

#8577 (comment)

Which means we actually do not need to have StreamRowIdGen { row_id_index: 5 }, right? @st1page

OK, there is not much overhead from it in the flamegraph, though.

@lmatz
Contributor

lmatz commented Mar 23, 2023

I guess SourceStreamChunkRowWriter is not efficient enough for insert-only sources 🤔

@st1page
Contributor

st1page commented Mar 24, 2023

I feel we need to find a good strategy for deciding when to compact in the ProjectExecutor:
https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/project.rs#L113
In this case, since the expression is very cheap to compute, compact itself introduces 11% overhead.

Done in #8758

But typically, will any system consume StreamChunk at all? 👀

I am not sure whether we can sink the chunk into a system using the Arrow format 🤔

For this particular query, we need to be cautious because of this two-Project issue.
It's a bug that has not been fixed.
The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

I think it cannot help with the compact performance issue, because the chunk will be compacted in any Project anyway. If we have a plan ProjA->ProjB, the chunk will be compacted in ProjA, and then ProjB::compact() will not be costly.

@kwannoel
Contributor Author

kwannoel commented Mar 24, 2023

I think the idea I have is slightly different: it's more about avoiding compact even when there's a filter, as in the case of q0.

When sinking, we don't need to build a new chunk; instead we build a protobuf- or JSON-encoded chunk. We can delay the top-most compact call until this point, saving the cost of building a new chunk and relying on the protobuf/JSON building step to remove invisible rows.

@st1page's approach is still needed as a general optimization for deciding when to compact. This approach is complementary: it always eliminates the top-most compact when sinking, regardless of selectivity, to avoid unnecessarily building a new chunk.

q0's compact call can be optimized via this complementary approach.

Implementing this as an optimization requires a bit of refactoring to add should_compact as a plan-level attribute (or other suggestions?). A simple thing to do for now is to just disable compact for the top-most Project when sinking, since that's usually the most common case.
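
A minimal sketch of this idea, assuming simplified stand-in types rather than the actual SinkExecutor / remote sink code (names like visible_rows and encode_for_sink are hypothetical): keep the visibility bitmap on the chunk and let the sink's row-wise encoding skip invisible rows, so the top-most compact never has to materialize a new chunk.

    // Illustrative types only; not the real risingwave StreamChunk API.
    struct StreamChunk {
        rows: Vec<Vec<i64>>,           // row-oriented for simplicity
        visibility: Option<Vec<bool>>, // None means every row is visible
    }

    impl StreamChunk {
        // Iterate visible rows only; no compacted chunk is ever allocated.
        fn visible_rows(&self) -> impl Iterator<Item = &Vec<i64>> + '_ {
            self.rows.iter().enumerate().filter_map(move |(i, row)| {
                match &self.visibility {
                    Some(vis) if !vis[i] => None,
                    _ => Some(row),
                }
            })
        }
    }

    // The sink encodes visible rows directly (a debug string stands in for the
    // JSON/protobuf encoding), so the upstream compact() call becomes unnecessary.
    fn encode_for_sink(chunk: &StreamChunk) -> Vec<String> {
        chunk.visible_rows().map(|row| format!("{:?}", row)).collect()
    }

    fn main() {
        let chunk = StreamChunk {
            rows: vec![vec![1, 10], vec![2, 20], vec![3, 30]],
            visibility: Some(vec![true, false, true]),
        };
        assert_eq!(encode_for_sink(&chunk), vec!["[1, 10]", "[3, 30]"]);
    }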

@st1page
Contributor

st1page commented Mar 24, 2023

When sinking, we don't need to build a new chunk; instead we build a protobuf- or JSON-encoded chunk. We can delay the top-most compact call until this point, saving the cost of building a new chunk and relying on the protobuf/JSON building step to remove invisible rows.

Strongly +1.

Implementing this as an optimization requires a bit of refactoring to add should_compact as a plan-level attribute (or other suggestions?). A simple thing to do for now is to just disable compact for the top-most Project when sinking, since that's usually the most common case.

I think it is not a plan-level attribute. Currently, we compact the input chunk just to simplify the executors' implementation, but in fact every executor should handle visibility properly, e.g.:

  • For Project, the executor should trade off between constructing a new chunk and computing over the redundant rows (because our expression framework cannot accept visibility).
  • For Agg, the executor can simply ignore the invisible rows.
  • For SinkExecutor, it should make sure the visibility is properly handled by the SinkImpl, as you say.

The issue here is that we pass the chunk's visibility to the SinkExecutor, which means it has the chance to do this optimization, but it does not. So we need to do the optimization in the SinkExecutor.

@lmatz
Contributor

lmatz commented Mar 28, 2023

[Throughput screenshot]

The peak throughput reaches 1M rows/s.
Luckily, the imbalanced source throughput problem didn't happen today.

I guess the two remaining things are:

  1. Avoid the unnecessary compact.
  2. Optimize SourceStreamChunkRowWriter, or rather add a customized code path for insert-only sources 🤔 At least it could avoid the data type match for every Datum, I suppose (see the sketch below).
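
For point 2, a rough sketch of the direction under stated assumptions (hypothetical, simplified types; not the real SourceStreamChunkRowWriter): resolve the column's type once and then append through a typed path, instead of matching on the data type for every Datum of every row.

    // Hypothetical, simplified types; not the actual SourceStreamChunkRowWriter.
    enum Datum {
        Int64(i64),
        Utf8(String),
    }

    enum ColumnBuilder {
        Int64(Vec<i64>),
        Utf8(Vec<String>),
    }

    impl ColumnBuilder {
        // Generic path: a type match for every Datum of every row.
        fn append_datum(&mut self, datum: Datum) {
            match (self, datum) {
                (ColumnBuilder::Int64(col), Datum::Int64(v)) => col.push(v),
                (ColumnBuilder::Utf8(col), Datum::Utf8(s)) => col.push(s),
                _ => panic!("type mismatch"),
            }
        }
    }

    // Specialized insert-only path: dispatch on the column type once, then
    // append in a tight, monomorphic loop with no per-datum match.
    fn append_i64_column(col: &mut Vec<i64>, values: impl IntoIterator<Item = i64>) {
        col.extend(values);
    }

    fn main() {
        let mut generic = ColumnBuilder::Int64(Vec::new());
        for v in 0..3 {
            generic.append_datum(Datum::Int64(v));
        }
        let mut strings = ColumnBuilder::Utf8(Vec::new());
        strings.append_datum(Datum::Utf8("bid".to_string()));

        let mut specialized = Vec::new();
        append_i64_column(&mut specialized, 0..3);
        assert_eq!(specialized, vec![0, 1, 2]);
    }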

@lmatz
Contributor

lmatz commented May 28, 2024

link #14815
