doc: add doc on rate limit and backpressure #16450

Closed without merging (2 commits). New file: `docs/rate-limit-and-backpressure.md` (128 additions)

# Rate Limit and Backpressure

Here we compare two concepts in RisingWave that are used to keep barrier latency low: rate limit and backpressure.

## Why keep Barrier Latency Low?

1. Commands such as `Drop, Cancel, Alter` depend on barriers.
When barrier latency is high, these commands also take a long time to run. [1]
2. High barrier latency means low data freshness: users get stale results.

[1] This could potentially be resolved by decoupling `Drop, Cancel`: https://github.com/risingwavelabs/risingwave/issues/13396

## Why can Barrier Latency be High?

The data between barriers takes a long time to process.
This can be due to high amplification, UDF latency, or other reasons.

By default, Barriers have an interval of 1s.
If the records between those barriers take longer than 256ms to process, the barrier latency will increase by the excess.

> **Review comment (Contributor):** Can you elaborate more on why 256ms? There seems to be little context here.
>
> **Review comment (Member):** +1, didn't get it.


## How does Backpressure affect Barrier Latency?

### A simple scenario without channels

In our system, we have Actors which run various parts of a stream job.

Let's consider a cluster with a single stream job, where this job has just 2 actors:
```text
Backfill (A) -> Stream Materialize (B)
```

If B is busy, it simply won't request new Stream Messages (Barriers and StreamChunks) from A.
This is called backpressure: A's stream is backpressured by B, i.e. it makes no further progress, until B is ready again.
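
To make the pull-based nature of backpressure concrete, here is a minimal sketch of the idea (this is not RisingWave's actual actor code; the `Message` type, the 250ms cost, and the use of `tokio`/`futures` are assumptions for illustration). B only polls for the next message once it has finished processing the current one, so a slow B automatically slows down consumption of A's stream:

```rust
use std::time::Duration;

use futures::StreamExt;

enum Message {
    Barrier(u64),
    Chunk(Vec<i64>),
}

async fn process(msg: Message) {
    match msg {
        // Barriers are cheap to process.
        Message::Barrier(id) => println!("processed barrier {id}"),
        // Pretend chunks are expensive, e.g. 250ms each.
        Message::Chunk(_) => tokio::time::sleep(Duration::from_millis(250)).await,
    }
}

#[tokio::main]
async fn main() {
    // A: the upstream stream of messages.
    let mut a = futures::stream::iter([
        Message::Barrier(1),
        Message::Chunk(vec![1, 2, 3]),
        Message::Barrier(2),
    ]);

    // B: `next().await` is only called after `process` returns, so B's
    // processing time directly limits how fast A's messages are consumed.
    while let Some(msg) = a.next().await {
        process(msg).await;
    }
}
```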

Now consider if B had the following sequence of events:
```text
1. Scheduled Barrier X (<1ms processing time)
2. StreamChunk (250ms processing time)
3. Scheduled Barrier Y (<1ms processing time)
```

The barrier latency for Y will be close to 0ms, since the barrier interval in this example is 256ms,
and the time between X and Y is less than 256ms.

> **Review comment (Member):** Does this example assume `barrier_interval` to be 256ms? Why not the default value, 1000ms?


Consider another case where B has the following sequence of events:
```text
1. Scheduled Barrier X (<1ms processing time)
2. StreamChunk (256ms processing time)
3. StreamChunk (250ms processing time)
4. Scheduled Barrier Y (<1ms processing time)
```

Now barrier latency will be around 250ms.

However, in the case above with Backfill and Stream Materialize, this is unlikely to occur.
Backfill should always let the upstream side (barriers) bypass, so we would see stream chunk (3) only after barrier (4).

### Adding Channels to the equation

Now consider a case where we have `join`.

```text
StreamScan A -- HashJoin X
/
/
StreamScan B
```

Now both A and B will have a channel between themselves and X.
The channel provides a buffer between actors: since A and B are not directly connected to X, they could be on different machines.
This buffer isolates X from the network latency of fetching Stream Messages from A and B.

If X is busy, it will first backpressure the channel.
However, the channel will not immediately backpressure A and B.
It will only do so once it is full.

This means that the volume of stream chunks buffered between barriers can grow up to the channel capacity.

> **Review comment (Member):** As we process streaming data in batches, there have to be some records buffered in each executor (typically within the chunk builder as local variables). I suppose these also act as channels in this sense.

The barrier latency is then the time taken to process the messages buffered ahead of the barrier.

Besides channels, some executors also buffer chunks internally, for instance within the `ChunkBuilder`.
Barrier latency has to account for these buffers as well.

_example.toml_
```toml
[streaming.developer]
stream_exchange_initial_permits = 2048
```

> **Review comment (Member):** Suggested change:
>
> ```toml
> # The initial permits that a channel holds, i.e. the maximum row count that can be buffered in the channel.
> stream_exchange_initial_permits = 2048
> ```
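
To illustrate that a channel only backpressures its producer once the buffer is full, here is a minimal sketch using a bounded in-process channel (the real exchange uses row-based permits and runs across the network, so the capacity of 4 chunks, the 100ms processing cost, and the use of `tokio::sync::mpsc` are assumptions for illustration only):

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Assume the channel can buffer 4 chunks before backpressure kicks in.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(4);

    let producer = tokio::spawn(async move {
        for chunk_id in 0..16u64 {
            // Fast while the buffer has room; once it is full, each send
            // waits for the slow consumer to free a slot (backpressure).
            tx.send(chunk_id).await.unwrap();
            println!("produced chunk {chunk_id}");
        }
    });

    // The slow consumer: 100ms per chunk.
    while let Some(chunk_id) = rx.recv().await {
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("consumed chunk {chunk_id}");
    }

    producer.await.unwrap();
}
```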

### Considering Amplification and UDF latency

When there's high amplification or UDF latency in some downstream actor, upstream will naturally be backpressured.

> **Review comment (Member):** "Amplification" is not a widely-used term, although we all know it. Let's add some introduction of it:
>
> Amplification refers to a common phenomenon in streaming. For example, consider the following join:
>
> ```sql
> SELECT * FROM users JOIN orders ON orders.user_id = users.id
> ```
>
> We know that each user often has multiple orders, and a minority of users may have thousands of orders. If a field in the `users` table is updated and then reflected in the results of the above join query, there could be thousands of changed rows. Such amplification can have unexpected performance impacts on streaming jobs, and therefore requires special attention.

Over time, a stream graph will hit its max capacity (all actor channels are full).

The barrier latency is then proportional to the time it takes to process stream chunks between barriers.

This is roughly the number of chunks buffered in a channel, multiplied by the time taken to process the join or the UDF call for each chunk.
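
As a back-of-the-envelope illustration (all numbers below are assumptions, not measurements): suppose a full channel holds 2048 rows (the default permits above), chunks hold 256 rows, and each chunk costs 50ms of join or UDF work. Then a barrier entering that channel waits behind roughly 8 chunks:

```rust
fn main() {
    let buffered_rows = 2048.0;    // assume a full channel, e.g. stream_exchange_initial_permits
    let rows_per_chunk = 256.0;    // assumed chunk size
    let per_chunk_cost_ms = 50.0;  // assumed join / UDF cost per chunk

    let buffered_chunks = buffered_rows / rows_per_chunk;         // 8 chunks
    let barrier_latency_ms = buffered_chunks * per_chunk_cost_ms; // ~400ms for this hop alone
    println!("estimated barrier latency: {barrier_latency_ms} ms");
}
```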

## What is Rate Limit

Rate Limit simply limits the rate at which records are processed.
It only rate limits the snapshot read / source read side of an actor.
This means that barriers are not rate limited.
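
Here is a conceptual sketch of that behaviour (this is not RisingWave's actual executor code; the `Message` type, the one-second refill window, and the per-row budget are assumptions for illustration). Data chunks are held back until there is budget for their rows, while barriers are always admitted immediately:

```rust
use std::time::Duration;

enum Message {
    Barrier(u64),
    Chunk(Vec<i64>),
}

struct RateLimiter {
    rows_per_second: usize,
    budget: usize, // rows still allowed before we must wait
}

impl RateLimiter {
    fn new(rows_per_second: usize) -> Self {
        Self { rows_per_second, budget: rows_per_second }
    }

    /// Suspends until `msg` may be forwarded. Barriers are never delayed.
    async fn admit(&mut self, msg: &Message) {
        if let Message::Chunk(rows) = msg {
            while self.budget < rows.len() {
                // Out of budget: wait for the next window, then refill.
                tokio::time::sleep(Duration::from_secs(1)).await;
                self.budget += self.rows_per_second;
            }
            self.budget -= rows.len();
        }
    }
}

#[tokio::main]
async fn main() {
    let mut limiter = RateLimiter::new(10); // e.g. 10 rows/s
    let messages = [
        Message::Chunk((0..25).collect()), // throttled: waits for budget
        Message::Barrier(1),               // admitted immediately
        Message::Chunk((0..5).collect()),
    ];
    for msg in &messages {
        limiter.admit(msg).await;
        // ...forward `msg` downstream here...
    }
}
```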

## Rate Limit to keep barrier latency low

### Backfill

A newly created MV undergoes Backfilling, which fills it with historical data and fresh data concurrently.

When the MV has joins or UDF calls with high latency, the barrier latency can spike.

After creating new jobs, we naturally have more channels to fill before backpressure takes effect.
This might also be a reason why we see a temporary increase in barrier latency.

In such a case, we can rate limit the Backfill actor's snapshot read side to keep barrier latency low.

Caveat: This will slow down the backfill process, but it will keep barrier latency low.

### Source

Another case is to rate limit the source executor if it's causing high barrier latency for similar reasons as above (some downstream actor is the bottleneck).

Caveat: It could potentially lead to loss in data freshness.

Example case:
The source's throughput fluctuates between 1-100 rows/s with an average of 10 rows/s, and the downstream has a max processing rate of 10 rows/s.
We rate limit the source to 10 rows/s.
Otherwise, when the source hits 100 rows/s, it will read that data into the channel,
and barrier latency will spike because the downstream can't keep up.
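
Plugging in some assumed numbers (the channel capacity of 2048 rows is an assumption, in line with the permits example above) shows how quickly this degrades without a rate limit:

```rust
fn main() {
    let burst_rate = 100.0;        // rows/s read from the source during a spike
    let downstream_rate = 10.0;    // rows/s the bottleneck can process
    let channel_capacity = 2048.0; // assumed rows buffered before backpressure

    // During the spike the buffer grows at ~90 rows/s and fills in ~23s...
    let fill_secs = channel_capacity / (burst_rate - downstream_rate);

    // ...after which a barrier has to wait behind a full channel of rows:
    let barrier_latency_secs = channel_capacity / downstream_rate;

    println!("buffer full after ~{fill_secs:.0}s, barrier latency ~{barrier_latency_secs:.0}s");
}
```

With the source rate limited to 10 rows/s, the buffer never fills up, so barriers keep flowing with low latency, at the cost of reading the burst more slowly.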