Update MULTITHREADED shuffle maxBytesInFlight default to 128MB #9344

Merged
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -48,7 +48,7 @@ Name | Description | Default Value | Applicable at
<a name="python.memory.gpu.pooling.enabled"></a>spark.rapids.python.memory.gpu.pooling.enabled|Should RMM in Python workers act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. When not specified, It will honor the value of config 'spark.rapids.memory.gpu.pooling.enabled'|None|Runtime
<a name="shuffle.enabled"></a>spark.rapids.shuffle.enabled|Enable or disable the RAPIDS Shuffle Manager at runtime. The [RAPIDS Shuffle Manager](rapids-shuffle.md) must already be configured. When set to `false`, the built-in Spark shuffle will be used. |true|Runtime
<a name="shuffle.mode"></a>spark.rapids.shuffle.mode|RAPIDS Shuffle Manager mode. "MULTITHREADED": shuffle file writes and reads are parallelized using a thread pool. "UCX": (requires UCX installation) uses accelerated transports for transferring shuffle blocks. "CACHE_ONLY": use when running a single executor, for short-circuit cached shuffle (for testing purposes).|MULTITHREADED|Startup
<a name="shuffle.multiThreaded.maxBytesInFlight"></a>spark.rapids.shuffle.multiThreaded.maxBytesInFlight|The size limit, in bytes, that the RAPIDS shuffle manager configured in "MULTITHREADED" mode will allow to be deserialized concurrently per task. This is also the maximum amount of memory that will be used per task. This should ideally be at least the same size as the batch size so we don't have to wait to process a single batch.|2147483647|Startup
<a name="shuffle.multiThreaded.maxBytesInFlight"></a>spark.rapids.shuffle.multiThreaded.maxBytesInFlight|The size limit, in bytes, that the RAPIDS shuffle manager configured in "MULTITHREADED" mode will allow to be deserialized concurrently per task. This is also the maximum amount of memory that will be used per task. This should be set larger than Spark's default maxBytesInFlight (48MB). The larger this setting is, the more compressed shuffle chunks are processed concurrently. In practice, care needs to be taken to not go over the amount of off-heap memory that Netty has available. See https://github.com/NVIDIA/spark-rapids/issues/9153.|134217728|Startup
<a name="shuffle.multiThreaded.reader.threads"></a>spark.rapids.shuffle.multiThreaded.reader.threads|The number of threads to use for reading shuffle blocks per executor in the RAPIDS shuffle manager configured in "MULTITHREADED" mode. There are two special values: 0 = feature is disabled, falls back to Spark built-in shuffle reader; 1 = our implementation of Spark's built-in shuffle reader with extra metrics.|20|Startup
<a name="shuffle.multiThreaded.writer.threads"></a>spark.rapids.shuffle.multiThreaded.writer.threads|The number of threads to use for writing shuffle blocks per executor in the RAPIDS shuffle manager configured in "MULTITHREADED" mode. There are two special values: 0 = feature is disabled, falls back to Spark built-in shuffle writer; 1 = our implementation of Spark's built-in shuffle writer with extra metrics.|20|Startup
<a name="shuffle.transport.earlyStart"></a>spark.rapids.shuffle.transport.earlyStart|Enable early connection establishment for RAPIDS Shuffle|true|Startup
7 changes: 7 additions & 0 deletions docs/additional-functionality/rapids-shuffle.md
@@ -57,6 +57,13 @@ configuration can be independently changed for writers and readers using:
`spark.rapids.shuffle.multiThreaded.[writer|reader].threads`. An appropriate value for these
pools is the number of cores in the system divided by the number of executors per machine.

On the reader side, when blocks are received from the network, they are queued onto these threads
for decompression and decoding. The number of bytes allowed in flight per Spark task is
controlled by `spark.rapids.shuffle.multiThreaded.maxBytesInFlight`, which defaults to 128MB
per task. Note that this memory comes from the Netty off-heap pool, which Netty sizes
automatically at startup; its limit can be adjusted by setting
`-Dio.netty.maxDirectMemory=[amount in Bytes]` under `spark.executor.extraJavaOptions`.
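
As a rough sizing sketch only (the task count and headroom factor below are assumptions for illustration, not documented guidance), the Netty direct-memory ceiling can be estimated from the per-task limit and the number of concurrently running tasks per executor:

```scala
// Hypothetical sizing helper: estimate a Netty direct-memory ceiling from the
// per-task in-flight limit. The headroom factor is an assumption, not a
// documented recommendation.
val maxBytesInFlightPerTask = 128L * 1024 * 1024 // spark.rapids.shuffle.multiThreaded.maxBytesInFlight
val concurrentTasksPerExecutor = 16L             // roughly the executor core count (illustrative)
val headroomFactor = 2L
val nettyDirectMemory = maxBytesInFlightPerTask * concurrentTasksPerExecutor * headroomFactor

// Passed to executors so Netty's off-heap pool can cover the expected in-flight bytes:
val extraJavaOptions = s"-Dio.netty.maxDirectMemory=$nettyDirectMemory"
// spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=4294967296
println(extraJavaOptions)
```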

## UCX Mode

---
@@ -1611,14 +1611,17 @@ object RapidsConf {

val SHUFFLE_MULTITHREADED_MAX_BYTES_IN_FLIGHT =
conf("spark.rapids.shuffle.multiThreaded.maxBytesInFlight")
.doc("The size limit, in bytes, that the RAPIDS shuffle manager configured in " +
"\"MULTITHREADED\" mode will allow to be deserialized concurrently per task. This is " +
"also the maximum amount of memory that will be used per task. This should ideally be " +
"at least the same size as the batch size so we don't have to wait to process a " +
"single batch.")
.doc(
"The size limit, in bytes, that the RAPIDS shuffle manager configured in " +
"\"MULTITHREADED\" mode will allow to be deserialized concurrently per task. This is " +
"also the maximum amount of memory that will be used per task. This should be set larger " +
"than Spark's default maxBytesInFlight (48MB). The larger this setting is, the " +
"more compressed shuffle chunks are processed concurrently. In practice, " +
"care needs to be taken to not go over the amount of off-heap memory that Netty has " +
"available. See https://github.com/NVIDIA/spark-rapids/issues/9153.")
.startupOnly()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Integer.MAX_VALUE)
.createWithDefault(128 * 1024 * 1024)

val SHUFFLE_MULTITHREADED_WRITER_THREADS =
conf("spark.rapids.shuffle.multiThreaded.writer.threads")
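
For intuition on what a per-task byte budget like `SHUFFLE_MULTITHREADED_MAX_BYTES_IN_FLIGHT` gates, here is a minimal, self-contained sketch of a bytes-in-flight limiter. It illustrates the general technique only and is not the spark-rapids reader implementation:

```scala
import java.util.concurrent.locks.ReentrantLock

// Illustrative only: block callers until the requested bytes fit under the
// budget, and release the budget once a chunk has been deserialized.
class BytesInFlightLimiter(maxBytesInFlight: Long) {
  private val lock = new ReentrantLock()
  private val freed = lock.newCondition()
  private var inFlight: Long = 0L

  def acquire(bytes: Long): Unit = {
    lock.lock()
    try {
      // A single chunk larger than the budget is still admitted once nothing
      // else is in flight, so oversized chunks cannot deadlock the reader.
      while (inFlight > 0 && inFlight + bytes > maxBytesInFlight) {
        freed.await()
      }
      inFlight += bytes
    } finally lock.unlock()
  }

  def release(bytes: Long): Unit = {
    lock.lock()
    try {
      inFlight -= bytes
      freed.signalAll()
    } finally lock.unlock()
  }
}

// Usage sketch: gate each compressed shuffle chunk before handing it to a
// decompression thread, and release the bytes once the batch has been consumed.
// val limiter = new BytesInFlightLimiter(128L * 1024 * 1024)
// limiter.acquire(chunkBytes); try decode(chunk) finally limiter.release(chunkBytes)
```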