Skip to content

Commit

Permalink
Use bounded executor for the DispatchManager executor
Browse files Browse the repository at this point in the history
* bounded thread pool for the DispatchManager's query executor

* inline, update the test-case

* use max to pick a higher value
  • Loading branch information
osscm authored Apr 15, 2024
1 parent b69cc47 commit 5e7e7ff
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
Expand Down Expand Up @@ -114,7 +115,7 @@ public DispatchManager(

this.maxQueryLength = queryManagerConfig.getMaxQueryLength();

this.dispatchExecutor = dispatchExecutor.getExecutor();
this.dispatchExecutor = new BoundedExecutor(dispatchExecutor.getExecutor(), queryManagerConfig.getDispatcherQueryPoolSize());

this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor());
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.max;
import static java.lang.Math.round;
import static java.lang.Runtime.getRuntime;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -55,10 +58,10 @@
})
public class QueryManagerConfig
{
public static final long AVAILABLE_HEAP_MEMORY = Runtime.getRuntime().maxMemory();
public static final long AVAILABLE_HEAP_MEMORY = getRuntime().maxMemory();
public static final int MAX_TASK_RETRY_ATTEMPTS = 126;
public static final int FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT = 1000;

public static final int DISPATCHER_THREADPOOL_MAX_SIZE = max(50, getRuntime().availableProcessors() * 10);
private int scheduleSplitBatchSize = 1000;
private int minScheduleSplitBatchSize = 100;
private int maxConcurrentQueries = 1000;
Expand Down Expand Up @@ -95,6 +98,7 @@ public class QueryManagerConfig
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);
private Optional<DataSize> queryMaxScanPhysicalBytes = Optional.empty();
private int queryReportedRuleStatsLimit = 10;
private int dispatcherQueryPoolSize = DISPATCHER_THREADPOOL_MAX_SIZE;

private int requiredWorkers = 1;
private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);
Expand Down Expand Up @@ -132,7 +136,7 @@ public class QueryManagerConfig

private DataSize faultTolerantExecutionStandardSplitSize = DataSize.of(64, MEGABYTE);
private int faultTolerantExecutionMaxTaskSplitCount = 256;
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(round(AVAILABLE_HEAP_MEMORY * 0.15));
private int faultTolerantExecutionMaxPartitionCount = 50;
private int faultTolerantExecutionMinPartitionCount = 4;
private int faultTolerantExecutionMinPartitionCountForWrite = 50;
Expand Down Expand Up @@ -504,6 +508,19 @@ public QueryManagerConfig setQueryReportedRuleStatsLimit(int queryReportedRuleSt
return this;
}

@Min(1)
public int getDispatcherQueryPoolSize()
{
return dispatcherQueryPoolSize;
}

@Config("query.dispatcher-query-pool-size")
public QueryManagerConfig setDispatcherQueryPoolSize(int dispatcherQueryPoolSize)
{
this.dispatcherQueryPoolSize = dispatcherQueryPoolSize;
return this;
}

@Min(1)
public int getRemoteTaskMaxCallbackThreads()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.execution.QueryManagerConfig.AVAILABLE_HEAP_MEMORY;
import static io.trino.execution.QueryManagerConfig.FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT;
import static java.lang.Math.max;
import static java.lang.Math.round;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
Expand Down Expand Up @@ -66,6 +68,7 @@ public void testDefaults()
.setQueryMaxPlanningTime(new Duration(10, MINUTES))
.setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS))
.setQueryReportedRuleStatsLimit(10)
.setDispatcherQueryPoolSize(max(50, Runtime.getRuntime().availableProcessors() * 10))
.setQueryMaxScanPhysicalBytes(null)
.setRequiredWorkers(1)
.setRequiredWorkersMaxWait(new Duration(5, MINUTES))
Expand Down Expand Up @@ -96,7 +99,7 @@ public void testDefaults()
.setFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(2000)
.setFaultTolerantExecutionStandardSplitSize(DataSize.of(64, MEGABYTE))
.setFaultTolerantExecutionMaxTaskSplitCount(256)
.setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15)))
.setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.ofBytes(round(AVAILABLE_HEAP_MEMORY * 0.15)))
.setFaultTolerantExecutionMaxPartitionCount(50)
.setFaultTolerantExecutionMinPartitionCount(4)
.setFaultTolerantExecutionMinPartitionCountForWrite(50)
Expand Down Expand Up @@ -143,6 +146,7 @@ public void testExplicitPropertyMappings()
.put("query.max-planning-time", "1h")
.put("query.max-cpu-time", "2d")
.put("query.reported-rule-stats-limit", "50")
.put("query.dispatcher-query-pool-size", "151")
.put("query.max-scan-physical-bytes", "1kB")
.put("query-manager.required-workers", "333")
.put("query-manager.required-workers-max-wait", "33m")
Expand Down Expand Up @@ -217,6 +221,7 @@ public void testExplicitPropertyMappings()
.setQueryMaxPlanningTime(new Duration(1, HOURS))
.setQueryMaxCpuTime(new Duration(2, DAYS))
.setQueryReportedRuleStatsLimit(50)
.setDispatcherQueryPoolSize(151)
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, MINUTES))
Expand Down

0 comments on commit 5e7e7ff

Please sign in to comment.