The query engine of RisingWave supports two types of queries: highly concurrent point queries and ad-hoc queries. The characteristics of these two different kinds of queries are summarized as follows:
| | Point Queries | Ad-hoc Queries |
|---|---|---|
| Latency | Several ms | ms to minutes |
| QPS | 1,000 ~ 100,000 | 100 ~ 1,000 |
| SQL | Simple | Arbitrarily complex |
| Result Set | Small | Small, medium, or large |
| Use Scenarios | Dashboards | Ad-hoc analysis |
Our distributed query processing engine is designed for complex ad-hoc queries and cannot meet the latency and QPS requirements of point queries. In this article, we introduce local execution mode for point queries.
Let's use the above SQL as an example:
The key changes from the distributed mode are:
- The exchange executor is executed directly by the local query execution rather than by the distributed scheduler. This means we no longer have asynchronous execution, monitoring, etc.
- The RPC is issued directly by the exchange executor, not by the scheduler.
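A minimal Rust sketch of an exchange executor issuing its own RPCs, assuming a hypothetical `ComputeClient` trait in place of a real RPC stub. `DataChunk`, `ComputeClient`, `LocalExchangeExecutor`, and `StaticClient` are illustrative names, not RisingWave's actual types, and the sources are polled one after another for simplicity rather than concurrently.

```rust
use std::collections::HashMap;

/// Toy stand-in for a data chunk: just a vector of integer rows.
#[derive(Debug, Clone, PartialEq)]
pub struct DataChunk {
    pub rows: Vec<i64>,
}

/// Hypothetical client for a compute node.
pub trait ComputeClient {
    /// Runs a plan fragment remotely and returns its output chunks.
    fn execute(&self, fragment: &str) -> Result<Vec<DataChunk>, String>;
}

/// Local-mode exchange: the executor owns its sources and calls them itself,
/// instead of a scheduler launching tasks and reporting their locations.
pub struct LocalExchangeExecutor<C: ComputeClient> {
    /// Each source is (client for a node, fragment to run on that node).
    pub sources: Vec<(C, String)>,
}

impl<C: ComputeClient> LocalExchangeExecutor<C> {
    pub fn execute(&self) -> Result<Vec<DataChunk>, String> {
        let mut output = Vec::new();
        for (client, fragment) in &self.sources {
            // One direct call per source; an error is returned immediately
            // because there is no scheduler-side monitoring or retry.
            let chunks = client.execute(fragment)?;
            output.extend(chunks);
        }
        Ok(output)
    }
}

/// Fake client serving canned results, used in place of a real RPC stub.
pub struct StaticClient {
    pub results: HashMap<String, Vec<i64>>,
}

impl ComputeClient for StaticClient {
    fn execute(&self, fragment: &str) -> Result<Vec<DataChunk>, String> {
        self.results
            .get(fragment)
            .map(|rows| vec![DataChunk { rows: rows.clone() }])
            .ok_or_else(|| format!("unknown fragment: {fragment}"))
    }
}
```

Because the executor calls the remote node itself, a failure surfaces directly from `execute` instead of being reported to a scheduler.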
The following is the plan and execution of the above SQL in local mode:
As explained above, the lookup join and exchange phases are executed directly on the frontend. The pushdown (filter/table, on both the build and probe sides) is issued by the executors rather than by the scheduler.
The overall process is quite similar to distributed processing, with a few differences:
- We only use the heuristic optimizer for it, and only a limited set of rules is applied.
- The scheduler is not involved; the physical plan is executed immediately in the current thread (coroutine).
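Executing the physical plan immediately in the current thread can be sketched as a pull-based executor tree that the caller drains to completion. This is an illustration under simplifying assumptions: the `Executor` trait, `ValuesExecutor`, `FilterExecutor`, and `run_local_plan` are hypothetical, and the synchronous `next_chunk` stands in for the asynchronous streams a coroutine-based engine would use.

```rust
/// Hypothetical pull-based executor interface.
pub trait Executor {
    /// Returns the next chunk of rows, or `None` when the input is exhausted.
    fn next_chunk(&mut self) -> Option<Vec<i64>>;
}

/// Leaf executor yielding pre-built chunks (stands in for a table scan).
pub struct ValuesExecutor {
    chunks: std::vec::IntoIter<Vec<i64>>,
}

impl ValuesExecutor {
    pub fn new(chunks: Vec<Vec<i64>>) -> Self {
        Self { chunks: chunks.into_iter() }
    }
}

impl Executor for ValuesExecutor {
    fn next_chunk(&mut self) -> Option<Vec<i64>> {
        self.chunks.next()
    }
}

/// Keeps only the rows of each child chunk that satisfy `pred`.
pub struct FilterExecutor<F: Fn(i64) -> bool> {
    pub child: Box<dyn Executor>,
    pub pred: F,
}

impl<F: Fn(i64) -> bool> Executor for FilterExecutor<F> {
    fn next_chunk(&mut self) -> Option<Vec<i64>> {
        let chunk = self.child.next_chunk()?;
        Some(chunk.into_iter().filter(|v| (self.pred)(*v)).collect())
    }
}

/// Local mode: drive the root executor to completion on the calling thread.
/// No stages are split out, no tasks are created, nothing is handed to a scheduler.
pub fn run_local_plan(mut root: Box<dyn Executor>) -> Vec<i64> {
    let mut rows = Vec::new();
    while let Some(chunk) = root.next_chunk() {
        rows.extend(chunk);
    }
    rows
}
```

For example, a filter keeping even values over the chunks `[1, 2, 3]` and `[4, 5]` returns `[2, 4]` to the caller as soon as the loop finishes.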
To reduce latency as much as possible, local execution mode does not go through the query management described in batch query manager.
As mentioned in the first paragraph, the main use cases for local execution mode are known in advance (dashboards and reporting), so currently we simply expose a session configuration (`query_mode`) to users. In the future, we may let the optimizer determine the mode if required.
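A possible shape for the session-level switch, assuming the setting arrives as a key/value pair (for example from a `SET query_mode = ...` statement). `QueryMode`, `Session`, `set`, and `route` are hypothetical names; the point is only that the execution path is chosen from the session variable, not by the optimizer.

```rust
use std::str::FromStr;

/// Hypothetical mirror of the `query_mode` session setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryMode {
    Local,
    Distributed,
}

impl FromStr for QueryMode {
    type Err = String;

    /// Parses the setting value case-insensitively.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "local" => Ok(QueryMode::Local),
            "distributed" => Ok(QueryMode::Distributed),
            other => Err(format!("invalid query_mode: {other}")),
        }
    }
}

pub struct Session {
    pub query_mode: QueryMode,
}

impl Session {
    /// Applies a session setting; an invalid value leaves the old mode in place.
    pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        if key.eq_ignore_ascii_case("query_mode") {
            self.query_mode = value.parse()?;
            Ok(())
        } else {
            Err(format!("unknown setting: {key}"))
        }
    }

    /// Picks the execution path purely from the session variable.
    pub fn route(&self) -> &'static str {
        match self.query_mode {
            QueryMode::Local => "local execution on frontend",
            QueryMode::Distributed => "distributed scheduler",
        }
    }
}
```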
In distributed mode, we have several steps to execute a computing task and fetch its results:
There are some problems with the above process in local mode:
- We need at least two RPCs to fetch the task execution result, which increases query overhead.
- The task lifecycle management APIs are unnecessary for local mode.
- We may need to add several new APIs for task monitoring and failure detection.
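The round-trip overhead can be made concrete with a toy compute node that counts incoming calls. `create_task`, `get_data`, and `execute` are hypothetical method names modeling the two interaction styles, not the actual service definitions, and the formatted string stands in for real query output.

```rust
use std::collections::HashMap;

/// Toy compute node that counts the RPCs it receives.
#[derive(Default)]
pub struct FakeComputeNode {
    pub rpc_count: u32,
    next_id: u64,
    /// Tasks created but not yet drained: per-task state that the
    /// two-step style forces the node to keep between calls.
    pending: HashMap<u64, String>,
}

impl FakeComputeNode {
    fn run(plan: &str) -> String {
        format!("result of {plan}")
    }

    /// Two-step style, call 1: create a task and remember its output.
    pub fn create_task(&mut self, plan: &str) -> u64 {
        self.rpc_count += 1;
        self.next_id += 1;
        self.pending.insert(self.next_id, Self::run(plan));
        self.next_id
    }

    /// Two-step style, call 2: fetch (and forget) a task's output.
    pub fn get_data(&mut self, task_id: u64) -> Option<String> {
        self.rpc_count += 1;
        self.pending.remove(&task_id)
    }

    /// One-step style: run the plan and return its output in the same call.
    pub fn execute(&mut self, plan: &str) -> String {
        self.rpc_count += 1;
        Self::run(plan)
    }

    pub fn pending_tasks(&self) -> usize {
        self.pending.len()
    }
}

pub fn fetch_distributed(node: &mut FakeComputeNode, plan: &str) -> Option<String> {
    let task_id = node.create_task(plan);
    node.get_data(task_id)
}

pub fn fetch_local(node: &mut FakeComputeNode, plan: &str) -> String {
    node.execute(plan)
}
```

Both paths return the same result, but the two-step path costs two round trips and keeps task state on the node in between.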
For local mode, we will add a new RPC API:
```protobuf
rpc Execute(ExecuteRequest) returns (ExecuteResponse);

message ExecuteRequest {
  batch_plan.PlanFragment plan = 2;
  uint64 epoch = 3;
}

message ExecuteResponse {
  common.Status status = 1;
  data.DataChunk record_batch = 2;
}
```
This is quite similar to the distributed execution APIs, but with some differences:
- The response is returned directly in the RPC call, without the need for another call.
- No task lifecycle management or monitoring is involved; if the task fails, we simply remove it and return the error directly in the response.
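A sketch of how a compute node might serve the `Execute` RPC, assuming simplified Rust mirrors of the proto messages: a function pointer taking the epoch stands in for `PlanFragment`, and a `Vec<i64>` stands in for `DataChunk`. `LocalTaskService`, `Status`, and their methods are hypothetical. The task is registered only while it runs and is removed on both success and failure, so no state outlives the call.

```rust
use std::collections::HashMap;

/// Simplified mirror of `ExecuteRequest`.
pub struct ExecuteRequest {
    /// Stand-in for `batch_plan.PlanFragment`: produces rows at a given epoch.
    pub plan: fn(u64) -> Result<Vec<i64>, String>,
    pub epoch: u64,
}

/// Stand-in for `common.Status`.
#[derive(Debug, PartialEq)]
pub enum Status {
    Ok,
    Error(String),
}

/// Simplified mirror of `ExecuteResponse`.
#[derive(Debug, PartialEq)]
pub struct ExecuteResponse {
    pub status: Status,
    /// Stand-in for `data.DataChunk`.
    pub record_batch: Vec<i64>,
}

#[derive(Default)]
pub struct LocalTaskService {
    next_id: u64,
    /// In-flight tasks only (task id -> epoch).
    running: HashMap<u64, u64>,
}

impl LocalTaskService {
    pub fn execute(&mut self, req: ExecuteRequest) -> ExecuteResponse {
        self.next_id += 1;
        let id = self.next_id;
        self.running.insert(id, req.epoch);
        let result = (req.plan)(req.epoch);
        // Success or failure, the task is dropped here: no monitoring,
        // no retries, and no follow-up status call.
        self.running.remove(&id);
        match result {
            Ok(rows) => ExecuteResponse { status: Status::Ok, record_batch: rows },
            Err(msg) => ExecuteResponse { status: Status::Error(msg), record_batch: Vec::new() },
        }
    }

    pub fn running_tasks(&self) -> usize {
        self.running.len()
    }
}
```

In a real asynchronous server the `running` map would hold concurrently executing requests; since this sketch is synchronous, it is always empty once `execute` returns.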