diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c5988993e424..9b6fba9ee632 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1862,7 +1862,7 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h ##### Guardrails for materialization of subqueries Druid stores the subquery rows in temporary tables that live in the Java heap. It is a good practice to avoid large subqueries in Druid. -Therefore there are guardrails that are built in Druid to prevent the queries from generating subquery results which can exhaust the heap +Therefore, there are guardrails that are built in Druid to prevent the queries from generating subquery results which can exhaust the heap space. They can be set on a cluster level or modified per query level as desired. Note the following guardrails that can be set by the cluster admin to limit the subquery results: @@ -1871,6 +1871,15 @@ Note the following guardrails that can be set by the cluster admin to limit the Note that limiting the subquery by bytes is a newer feature therefore it is experimental as it materializes the results differently. +`maxSubqueryBytes` can be configured to the following values: +1. 'disabled' - It is the default setting out of the box. It disables the subquery's from the byte based limit, and effectively disables this feature. +2. 'auto' - Druid automatically decides the optimal byte based limit based upon the heap space available and the max number of concurrent queries. +3. A positive long value - User can manually specify the number of bytes that the results of the subqueries of a single query can occupy on the heap. + +Due to the conversion between the Java objects and the Frame's format, setting `maxSubqueryBytes` can become slow if the subquery starts generating +rows in the order of magnitude of around 10 million and above. In those scenarios, disable the `maxSubqueryBytes` settings for such queries, assess the +number of rows that the subqueries generate and override the `maxSubqueryRows` to appropriate value. + If you choose to modify or set any of the above limits, you must also think about the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries to accommodate the subquery results. There is no formula to calculate the correct value. Trial and error is the best approach. @@ -1895,7 +1904,7 @@ Druid uses Jetty to serve HTTP requests. Each query being processed consumes a s |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| |`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. Queries that exceed this limit will fail. This is an advance configuration that allows to protect in case Broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used. Human-readable format is supported, see [here](human-readable-byte.md). |Long.MAX_VALUE| |`druid.server.http.maxSubqueryRows`|Maximum number of rows from all subqueries per query. Druid stores the subquery rows in temporary tables that live in the Java heap. `druid.server.http.maxSubqueryRows` is a guardrail to prevent the system from exhausting available heap. When a subquery exceeds the row limit, Druid throws a resource limit exceeded exception: "Subquery generated results beyond maximum."

It is a good practice to avoid large subqueries in Druid. However, if you choose to raise the subquery row limit, you must also increase the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries to accommodate the subquery results.

There is no formula to calculate the correct value. Trial and error is the best approach.|100000| -|`druid.server.http.maxSubqueryBytes`|Maximum number of bytes from all subqueries per query. Since the results are stored on the Java heap, `druid.server.http.maxSubqueryBytes` is a guardrail like `druid.server.http.maxSubqueryRows` to prevent the heap space from exhausting. When a subquery exceeds the byte limit, Druid throws a resource limit exceeded exception. A negative value for the guardrail indicates that Druid won't guardrail by memory. Check the docs for `druid.server.http.maxSubqueryRows` to see how to set the optimal value for a cluster. This is an experimental feature for now as this materializes the results in a different format.|-1| +|`druid.server.http.maxSubqueryBytes`|Maximum number of bytes from all subqueries per query. Since the results are stored on the Java heap, `druid.server.http.maxSubqueryBytes` is a guardrail like `druid.server.http.maxSubqueryRows` to prevent the heap space from exhausting. When a subquery exceeds the byte limit, Druid throws a resource limit exceeded exception. A negative value for the guardrail indicates that Druid won't guardrail by memory. This can be set to 'disabled' which disables the results from being limited via the byte limit, 'auto' which sets this value automatically taking free heap space into account, or a positive long value depicting the number of bytes per query's subqueries' results can occupy. This is an experimental feature for now as this materializes the results in a different format.|'disabled'| |`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete(Only values greater than zero are valid).|`PT30S`| |`druid.server.http.unannouncePropagationDelay`|How long to wait for ZooKeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)| |`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.md) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE| diff --git a/docs/multi-stage-query/known-issues.md b/docs/multi-stage-query/known-issues.md index 62a31ecf41af..570a7f58fa4e 100644 --- a/docs/multi-stage-query/known-issues.md +++ b/docs/multi-stage-query/known-issues.md @@ -39,7 +39,7 @@ an [UnknownError](./reference.md#error_UnknownError) with a message including "N ## `SELECT` Statement -- `GROUPING SETS` and `UNION ALL` are not implemented. Queries using these features return a +- `GROUPING SETS` are not implemented. Queries using these features return a [QueryNotSupported](reference.md#error_QueryNotSupported) error. - For some `COUNT DISTINCT` queries, you'll encounter a [QueryNotSupported](reference.md#error_QueryNotSupported) error diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 326753970fbb..1ac3af1127c4 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -52,6 +52,8 @@ Unless otherwise noted, the following parameters apply to all query types. |`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details.| |`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| +|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| +|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| |`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process| |`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process| |`enableParallelMerge`|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details.| diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java index 54acab0a3f87..71b477d16c37 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java @@ -339,7 +339,7 @@ public void testGetMaxSubqueryBytes() assertEquals("auto", context2.getMaxSubqueryMemoryBytes(null)); final QueryContext context3 = new QueryContext(ImmutableMap.of()); - assertEquals("unlimited", context3.getMaxSubqueryMemoryBytes("unlimited")); + assertEquals("disabled", context3.getMaxSubqueryMemoryBytes("disabled")); } @Test diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java index f185a5a53266..6667cd961129 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java @@ -19,14 +19,42 @@ package org.apache.druid.server; +/** + * Utilities for {@link ClientQuerySegmentWalker} + */ public class ClientQuerySegmentWalkerUtils { + + /** + * Guardrail type on the subquery's results + */ public enum SubqueryResultLimit { + /** + * Subqueries limited by the ROW_LIMIT are materialized and kept as arrays (native java objects) on heap. The + * walker ensures that the cumulative number of rows of the results of subqueries of the given query donot exceed + * the limit specified in the context or as the server default + */ ROW_LIMIT, + + /** + * Subqueries limited by the BYTE_LIMIT are materialized as {@link org.apache.druid.frame.Frame}s on heap. Frames + * depict the byte representation of the subquery results and hence the space consumed by the frames can be trivially + * fetched. The walker ensures that the cumulative number of rows of the results of subqueries (materialized as + * Frames in the broker memory) of a given query do not exceed the limit specified in the context or as the server + * default + */ MEMORY_LIMIT } + /** + * Returns the limit type to be used for a given subquery. + * It returns MEMORY_LIMIT only if: + * 1. The user has enabled the 'maxSubqueryBytes' explicitly in the query context or as the server default + * 2. All the other subqueries in the query so far didn't fall back to ROW_BASED limit due to an error while + * executing the query + * In all the other cases, it returns ROW_LIMIT + */ public static SubqueryResultLimit getLimitType(long memoryLimitBytes, boolean cannotMaterializeToFrames) { if (cannotMaterializeToFrames) { diff --git a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java index 541f44744810..88845ef955ea 100644 --- a/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java +++ b/server/src/main/java/org/apache/druid/server/SubqueryGuardrailHelper.java @@ -38,10 +38,10 @@ public class SubqueryGuardrailHelper private static final Logger log = new Logger(SubqueryGuardrailHelper.class); - public static final String UNLIMITED_LIMIT_VALUE = "unlimited"; + public static final String LIMIT_DISABLED_VALUE = "disabled"; public static final String AUTO_LIMIT_VALUE = "auto"; - public static final Long UNLIMITED_LIMIT_REPRESENTATION = -1L; + public static final Long LIMIT_DISABLED_REPRESENTATION = -1L; private final long autoLimitBytes; @@ -70,8 +70,8 @@ public SubqueryGuardrailHelper( public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit) { - if (UNLIMITED_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { - return UNLIMITED_LIMIT_REPRESENTATION; + if (LIMIT_DISABLED_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { + return LIMIT_DISABLED_REPRESENTATION; } if (AUTO_LIMIT_VALUE.equalsIgnoreCase(maxSubqueryLimit)) { return autoLimitBytes; @@ -85,7 +85,7 @@ public long convertSubqueryLimitStringToLong(final String maxSubqueryLimit) throw InvalidInput.exception( e, "Unable to parse the provided maxSubqueryLimit [%s] to a valid number. Valid values for the " - + "maxSubqueryLimits can be 'auto', 'unlimited' or a positive number representing bytes to reserve.", + + "maxSubqueryLimits can be 'auto', 'disabled' or a positive integer representing bytes to reserve.", maxSubqueryLimit ); } diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 04366d30dcdb..7be35469be86 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.common.exception.NoErrorResponseTransformStrategy; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.HumanReadableBytesRange; +import org.apache.druid.server.SubqueryGuardrailHelper; import org.apache.druid.utils.JvmUtils; import org.joda.time.Period; @@ -44,7 +45,6 @@ public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; - public static final String DEFAULT_MAX_SUBQUERY_BYTES = "unlimited"; private static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY = false; @@ -140,7 +140,7 @@ public ServerConfig(boolean enableQueryRequestsQueuing) private int maxSubqueryRows = 100000; @JsonProperty - private String maxSubqueryBytes = DEFAULT_MAX_SUBQUERY_BYTES; + private String maxSubqueryBytes = SubqueryGuardrailHelper.LIMIT_DISABLED_VALUE; @JsonProperty private boolean useNestedForUnknownTypeInSubquery = DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY;