Discussion: allow users to control backfilling throughput of different streams involved in one query #14559
Comments
Solution 3: This interface can also be used to plan join ordering (#14140). It will be slightly more involved, so solution 2 can work in the meantime. We can support solution 2 first (solution 1 is already sort of supported, with a min throughput of 1).
You mean that right now RW can already query how many rows a table has ingested, and flow-control the source operator of each upstream independently?
For sources, yes; we currently support rate limiting.
Then we can alter the source rate limit for
Oh I see, does it work for
Yeah, those have an associated rate limit as well, which should be alterable via
Great! I will write a doc about how it can mitigate OOM and boost performance.
The backfilling order fundamentally depends on the functional dependencies among the tables involved in the join,
Discussed offline with @lmatz: rate limit can't handle the case where there's lots of historical data, since the upstream rate limit does not control the downstream's table scan throughput; it only affects the upstream update side. We need to support modifying individual rate limits for each table scan to support solution 1. For now, the only option is to rate limit the entire stream job's backfill, rather than individual table scans. In terms of implementation, there are a few parts:
v1.6.0 Fact Table Last: https://buildkite.com/risingwave-test/tpch-benchmark/builds/980#018d35bc-85d8-4e1a-898b-39aa62d88960
What do the numbers mean? Throughput? Or duration?
Throughput; I modified the original table. The observation is that backfilling the fact table last can still be a good heuristic, but I think we'd better still give users control of the ordering in the near term.
This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
Increased priority because multiple use cases have emerged.
The Issue
Before memtable spilling and other backpressure-based mechanisms were implemented, OOM sometimes occurred in join queries during the backfilling stage, and rarely occurred after backfilling.
It is suspected that the difference in data ingestion patterns during and after backfilling is what makes the difference.
For example, during normal processing, it is almost always the case that the dimension table does not change too often while the fact table changes constantly.
Case (1): a row from the fact table produces at most one row in the join, so we don't expect a large amount of intermediate results to be generated in the join operator.
Case (2): a row from the dimension table can potentially produce a huge number of intermediate results. Even worse, these results may be further amplified by the subsequent joins in a multi-join query.
However, during backfilling, we ingest both fact tables and dimension tables as quickly as possible, so the total bandwidth for source ingestion is roughly evenly split among all the tables. Sometimes there are multiple dimension tables, so the source throughput of the fact table is only 1/N of the total. Therefore, we expect case (2) to happen a lot during this phase.
After backfilling, we expect the fact table to contribute the majority of the ingested data, and therefore many more occurrences of case (1) than of case (2).
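The asymmetry between the two cases can be seen in a toy sketch of a streaming hash join (this is illustrative Python, not RW code; the data and function names are made up for the example):

```python
# Toy illustration of join-output amplification. Assumed setup: the join
# key is unique on the dimension side, so a fact row matches at most one
# dimension row, while one dimension row matches every buffered fact row.
fact = [("f1", "d1"), ("f2", "d1"), ("f3", "d1"), ("f4", "d2")]  # (fact_id, dim_key)
dim = {"d1": "red", "d2": "blue"}                                # dim_key -> attribute

def join_fact_row(row):
    """Case (1): a new fact row produces at most one output row."""
    fid, key = row
    return [(fid, dim[key])] if key in dim else []

def join_dim_row(key, value):
    """Case (2): a new dimension row fans out to every matching fact row,
    so one input row can produce many intermediate results."""
    return [(fid, value) for (fid, k) in fact if k == key]

print(len(join_fact_row(("f5", "d2"))))    # 1: no amplification
print(len(join_dim_row("d1", "crimson")))  # 3: one input row, three outputs
```

During backfill, dimension-side inputs arrive at full speed, so the case (2) path dominates and the intermediate state can balloon.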
Why It's Worth Addressing
Although memtable spilling together with other backpressure-based mechanisms should have alleviated the problem a lot,
it is still beneficial to avoid driving the system into a stressful state in the first place, rather than relying on mechanisms that act as a last resort against OOM.
Besides, these mechanisms may cost a lot of performance if they are triggered often.
There is some interesting discussion of this topic under the name "metastable failures": https://www.usenix.org/conference/osdi22/presentation/huang-lexiang
My takeaway is that when a system is stuck in an extremely stressful state, mechanisms that work as expected under normal conditions may fail or even backfire.
Although there is no direct evidence that RW would trigger this kind of failure, it is better to keep RW out of stressful states and prevent such failures with a small amount of effort.
Potential Solutions
Suppose we have a multi-join query that involves all the tables below:
we tend to first ingest the data from the tables that have the smallest in-degrees, e.g. region and part in this case. After they have been ingested, we remove them from the graph and continue looking for the tables that now have the smallest in-degrees. Of course, the real-world query and the dependencies between tables may not be this clear to RW,
and RW, as a system, is in no position to learn this kind of information automatically from users' queries.
Therefore, it would be better to give users control over the ordering.
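The peeling heuristic above can be sketched as a Kahn-style traversal. This is a minimal illustration assuming a simplified TPC-H-like schema, with edges pointing from each table to the tables that reference it (the edge set and round structure are assumptions for the example, not RW behavior):

```python
from collections import defaultdict

# Simplified TPC-H-like reference graph: an edge u -> v means v references u,
# so tables with in-degree 0 (region, part) are candidates to backfill first.
EDGES = {
    "region":   ["nation"],
    "nation":   ["supplier", "customer"],
    "part":     ["partsupp"],
    "supplier": ["partsupp"],
    "partsupp": ["lineitem"],
    "customer": ["orders"],
    "orders":   ["lineitem"],
    "lineitem": [],
}

def backfill_rounds(edges):
    """Repeatedly peel off the tables with the current minimum in-degree,
    as described in the text."""
    indeg = defaultdict(int)
    for src in edges:
        indeg[src] += 0
        for dst in edges[src]:
            indeg[dst] += 1
    remaining, rounds = set(edges), []
    while remaining:
        lowest = min(indeg[t] for t in remaining)
        batch = sorted(t for t in remaining if indeg[t] == lowest)
        rounds.append(batch)
        for t in batch:
            remaining.discard(t)
            for dst in edges[t]:
                indeg[dst] -= 1
    return rounds

print(backfill_rounds(EDGES)[0])  # ['part', 'region'] are ingested first
```

With this edge direction, the fact table (lineitem) naturally ends up in the last round, matching the "fact table last" heuristic discussed above.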
Solution 1: Manual Flow Control of Individual Source
This only requires a primitive that RW already has: flow control.
RW already supports
SET RW_STREAMING_RATE_LIMIT to XXX
By setting it to 0, the source throughput of the next materialized view becomes 0. Then, users manually turn on the flow of each source, which requires a way to control the throughput of each individual source operator involved in a query.
Users monitor how many rows have been ingested by querying RW. Either an RW catalog or a state table of sources can serve this purpose.
If one source is done, e.g. the number of ingested rows has passed a certain threshold (entirely up to the user), the user can move on to the next source.
Since this is advanced functionality that we don't expect to be used frequently, ease of use can be compromised.
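The workflow might look like the following. This is a hypothetical sketch: per the discussion above, only the session-level rate limit exists today, so the per-source ALTER syntax, the source names, and the progress catalog shown here are illustrative, not confirmed RW API.

```sql
-- Hypothetical sketch; per-source rate-limit syntax is illustrative only.
SET RW_STREAMING_RATE_LIMIT TO 0;           -- pause ingestion for the next MV
CREATE MATERIALIZED VIEW mv AS SELECT ...;  -- all sources start fully throttled

ALTER SOURCE region_src SET (RATE_LIMIT = 10000);  -- open one source
-- poll ingestion progress (via a catalog or source state table) until it
-- passes the user's chosen threshold, then open the next source, and so on
ALTER SOURCE part_src SET (RATE_LIMIT = 10000);
```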
Solution 2: Custom Backfilling Orders
Add a
with
option to
create materialized view
that specifies the backfilling order, e.g. requiring that RW must ingest 10K rows from t4 before it can ingest from t1.
t2 and t3 do not appear in the option, which means their order does not matter. They will be ingested together after RW has ingested 25,000 rows from t1.
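A hypothetical clause consistent with the description above might look like the following; the option name and value format are purely illustrative, not a proposed or existing syntax:

```sql
-- Hypothetical syntax; option name and value format are illustrative only.
CREATE MATERIALIZED VIEW mv WITH (
    backfill_order = 't4:10000, t1:25000'
) AS
SELECT ...
FROM t1 JOIN t2 ON ... JOIN t3 ON ... JOIN t4 ON ...;
-- Read as: ingest 10,000 rows from t4 first, then 25,000 rows from t1,
-- then ingest the remaining tables (t2, t3) together.
```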
Comments
Both actions are triggered based on row counts specified by users. This is because RW may not know the total size of the upstream, e.g. when it is a source, or a table that is itself still backfilling.
Both of them are sort of an enhanced version of
SET RW_STREAMING_RATE_LIMIT to XXX
as they incorporate domain knowledge from users. The benefit is more stability without compromising performance.