Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into updateTaskPending…
Browse files Browse the repository at this point in the history
…SegmentMapping
  • Loading branch information
AmatyaAvadhanula committed Oct 10, 2023
2 parents 7274912 + 23605c1 commit 0dbbc1e
Show file tree
Hide file tree
Showing 159 changed files with 15,207 additions and 1,959 deletions.
13 changes: 11 additions & 2 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1862,7 +1862,7 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h

##### Guardrails for materialization of subqueries
Druid stores the subquery rows in temporary tables that live in the Java heap. It is a good practice to avoid large subqueries in Druid.
Therefore there are guardrails that are built in Druid to prevent the queries from generating subquery results which can exhaust the heap
Therefore, there are guardrails that are built in Druid to prevent the queries from generating subquery results which can exhaust the heap
space. They can be set on a cluster level or modified per query level as desired.
Note the following guardrails that can be set by the cluster admin to limit the subquery results:

Expand All @@ -1871,6 +1871,15 @@ Note the following guardrails that can be set by the cluster admin to limit the

Note that limiting the subquery by bytes is a newer feature therefore it is experimental as it materializes the results differently.

`maxSubqueryBytes` can be configured to the following values:
1. 'disabled' - It is the default setting out of the box. It disables the subquery's from the byte based limit, and effectively disables this feature.
2. 'auto' - Druid automatically decides the optimal byte based limit based upon the heap space available and the max number of concurrent queries.
3. A positive long value - User can manually specify the number of bytes that the results of the subqueries of a single query can occupy on the heap.

Due to the conversion between the Java objects and the Frame's format, setting `maxSubqueryBytes` can become slow if the subquery starts generating
rows in the order of magnitude of around 10 million and above. In those scenarios, disable the `maxSubqueryBytes` settings for such queries, assess the
number of rows that the subqueries generate and override the `maxSubqueryRows` to appropriate value.

If you choose to modify or set any of the above limits, you must also think about the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries to accommodate the subquery results.
There is no formula to calculate the correct value. Trial and error is the best approach.

Expand All @@ -1895,7 +1904,7 @@ Druid uses Jetty to serve HTTP requests. Each query being processed consumes a s
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. Queries that exceed this limit will fail. This is an advance configuration that allows to protect in case Broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used. Human-readable format is supported, see [here](human-readable-byte.md). |Long.MAX_VALUE|
|`druid.server.http.maxSubqueryRows`|Maximum number of rows from all subqueries per query. Druid stores the subquery rows in temporary tables that live in the Java heap. `druid.server.http.maxSubqueryRows` is a guardrail to prevent the system from exhausting available heap. When a subquery exceeds the row limit, Druid throws a resource limit exceeded exception: "Subquery generated results beyond maximum."<br /><br />It is a good practice to avoid large subqueries in Druid. However, if you choose to raise the subquery row limit, you must also increase the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries to accommodate the subquery results.<br /><br />There is no formula to calculate the correct value. Trial and error is the best approach.|100000|
|`druid.server.http.maxSubqueryBytes`|Maximum number of bytes from all subqueries per query. Since the results are stored on the Java heap, `druid.server.http.maxSubqueryBytes` is a guardrail like `druid.server.http.maxSubqueryRows` to prevent the heap space from exhausting. When a subquery exceeds the byte limit, Druid throws a resource limit exceeded exception. A negative value for the guardrail indicates that Druid won't guardrail by memory. Check the docs for `druid.server.http.maxSubqueryRows` to see how to set the optimal value for a cluster. This is an experimental feature for now as this materializes the results in a different format.|-1|
|`druid.server.http.maxSubqueryBytes`|Maximum number of bytes from all subqueries per query. Since the results are stored on the Java heap, `druid.server.http.maxSubqueryBytes` is a guardrail like `druid.server.http.maxSubqueryRows` to prevent the heap space from exhausting. When a subquery exceeds the byte limit, Druid throws a resource limit exceeded exception. A negative value for the guardrail indicates that Druid won't guardrail by memory. This can be set to 'disabled' which disables the results from being limited via the byte limit, 'auto' which sets this value automatically taking free heap space into account, or a positive long value depicting the number of bytes per query's subqueries' results can occupy. This is an experimental feature for now as this materializes the results in a different format.|'disabled'|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete(Only values greater than zero are valid).|`PT30S`|
|`druid.server.http.unannouncePropagationDelay`|How long to wait for ZooKeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.md) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
Expand Down
2 changes: 2 additions & 0 deletions docs/development/extensions-core/lookups-cached-global.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter|
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|

```json
Expand All @@ -367,6 +368,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
"valueColumn":"the_new_dim_value",
"tsColumn":"timestamp_column",
"pollPeriod":600000,
"jitterSeconds": 120,
"maxHeapPercentage": 10
}
```
Expand Down
4 changes: 1 addition & 3 deletions docs/multi-stage-query/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ an [UnknownError](./reference.md#error_UnknownError) with a message including "N

## `SELECT` Statement

- `SELECT` from a Druid datasource does not include unpublished real-time data.

- `GROUPING SETS` and `UNION ALL` are not implemented. Queries using these features return a
- `GROUPING SETS` are not implemented. Queries using these features return a
[QueryNotSupported](reference.md#error_QueryNotSupported) error.

- For some `COUNT DISTINCT` queries, you'll encounter a [QueryNotSupported](reference.md#error_QueryNotSupported) error
Expand Down
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` |
| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources, which will be queried for results in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments will be downloaded from deep storage. If this value is `REALTIME`, results will also be included from realtime tasks. | `NONE` |

## Joins

Expand Down
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies|
|`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0|
|`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s|
Expand Down Expand Up @@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||

### Real-time

Expand Down
2 changes: 2 additions & 0 deletions docs/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Unless otherwise noted, the following parameters apply to all query types.
|`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details.|
|`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.|
|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.|
|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.|
|`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process|
|`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process|
|`enableParallelMerge`|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
Expand All @@ -36,12 +34,12 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -71,12 +69,10 @@ public SqlAggFunction calciteFunction()
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
Project project,
InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
Expand All @@ -88,13 +84,8 @@ public Aggregation toDruidAggregation(
// fetch sum column expression
DruidExpression sumColumn = Expressions.toDruidExpression(
plannerContext,
rowSignature,
Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
aggregateCall.getArgList().get(0)
)
inputAccessor.getInputRowSignature(),
inputAccessor.getField(aggregateCall.getArgList().get(0))
);

if (sumColumn == null) {
Expand All @@ -114,12 +105,7 @@ public Aggregation toDruidAggregation(
Integer size = null;

if (aggregateCall.getArgList().size() >= 2) {
RexNode sizeArg = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
aggregateCall.getArgList().get(1)
);
RexNode sizeArg = inputAccessor.getField(aggregateCall.getArgList().get(1));

size = ((Number) RexLiteral.value(sizeArg)).intValue();
}
Expand All @@ -128,25 +114,15 @@ public Aggregation toDruidAggregation(
Integer scale = null;

if (aggregateCall.getArgList().size() >= 3) {
RexNode scaleArg = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
aggregateCall.getArgList().get(2)
);
RexNode scaleArg = inputAccessor.getField(aggregateCall.getArgList().get(2));

scale = ((Number) RexLiteral.value(scaleArg)).intValue();
}

Boolean useStrictNumberParsing = null;

if (aggregateCall.getArgList().size() >= 4) {
RexNode useStrictNumberParsingArg = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
aggregateCall.getArgList().get(3)
);
RexNode useStrictNumberParsingArg = inputAccessor.getField(aggregateCall.getArgList().get(3));

useStrictNumberParsing = RexLiteral.booleanValue(useStrictNumberParsingArg);
}
Expand Down
Loading

0 comments on commit 0dbbc1e

Please sign in to comment.