diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
index eabde5c156b..faca6d8e3c7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
 
 import java.io.{File, IOException}
 import java.net.{URI, URISyntaxException}
-import java.util.concurrent.{Callable, ConcurrentLinkedQueue, ExecutorCompletionService, Future, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{Callable, ConcurrentLinkedQueue, ExecutorCompletionService, Future, ThreadPoolExecutor, TimeUnit}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -123,20 +123,11 @@ object MultiFileReaderThreadPool extends Logging {
 
   private def initThreadPool(
       maxThreads: Int,
-      keepAliveSeconds: Long = 60): ThreadPoolExecutor = synchronized {
+      keepAliveSeconds: Int = 60): ThreadPoolExecutor = synchronized {
     if (threadPool.isEmpty) {
-      val threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat(s"multithreaded file reader worker-%d")
-        .setDaemon(true)
-        .build()
-
-      val threadPoolExecutor = new ThreadPoolExecutor(
-        maxThreads, // corePoolSize: max number of threads to create before queuing the tasks
-        maxThreads, // maximumPoolSize: because we use LinkedBlockingDeque, this is not used
-        keepAliveSeconds,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue[Runnable],
-        threadFactory)
+      val threadPoolExecutor =
+        TrampolineUtil.newDaemonCachedThreadPool("multithreaded file reader worker", maxThreads,
+          keepAliveSeconds)
       threadPoolExecutor.allowCoreThreadTimeOut(true)
       logDebug(s"Using $maxThreads for the multithreaded reader thread pool")
       threadPool = Some(threadPoolExecutor)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index b9fc5ffc889..7e75940869b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -38,7 +38,7 @@ import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.filecache.FileCache
 import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter}
-import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ReaderUtils, ShimFilePartitionReaderFactory, SparkShimImpl}
+import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
@@ -683,12 +683,10 @@ private case class GpuParquetFileFilterHandler(
           conf.unset(encryptConf)
         }
       }
-      val fileHadoopConf =
-        ReaderUtils.getHadoopConfForReaderThread(new Path(file.filePath.toString), conf)
       val footer: ParquetMetadata = try {
         footerReader match {
           case ParquetFooterReaderType.NATIVE =>
-            val serialized = withResource(readAndFilterFooter(file, fileHadoopConf,
+            val serialized = withResource(readAndFilterFooter(file, conf,
               readDataSchema, filePath)) { tableFooter =>
               if (tableFooter.getNumColumns <= 0) {
                 // Special case because java parquet reader does not like having 0 columns.
@@ -712,7 +710,7 @@ private case class GpuParquetFileFilterHandler(
             }
           }
         case _ =>
-          readAndSimpleFilterFooter(file, fileHadoopConf, filePath)
+          readAndSimpleFilterFooter(file, conf, filePath)
       }
     } catch {
       case e if GpuParquetCrypto.isColumnarCryptoException(e) =>
@@ -739,9 +737,9 @@ private case class GpuParquetFileFilterHandler(
       val blocks = if (pushedFilters.isDefined) {
         withResource(new NvtxRange("getBlocksWithFilter", NvtxColor.CYAN)) { _ =>
           // Use the ParquetFileReader to perform dictionary-level filtering
-          ParquetInputFormat.setFilterPredicate(fileHadoopConf, pushedFilters.get)
+          ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
           //noinspection ScalaDeprecation
-          withResource(new ParquetFileReader(fileHadoopConf, footer.getFileMetaData, filePath,
+          withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath,
             footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader =>
             parquetReader.getRowGroups
           }
@@ -1551,14 +1549,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
     val filePathString: String = filePath.toString
     val remoteItems = new ArrayBuffer[CopyRange](blocks.length)
     var totalBytesToCopy = 0L
-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     withResource(new ArrayBuffer[LocalCopy](blocks.length)) { localItems =>
       blocks.foreach { block =>
         block.getColumns.asScala.foreach { column =>
           val columnSize = column.getTotalSize
           val outputOffset = totalBytesToCopy + startPos
           val channel = FileCache.get.getDataRangeChannel(filePathString,
-            column.getStartingPos, columnSize, fileHadoopConf)
+            column.getStartingPos, columnSize, conf)
           if (channel.isDefined) {
             localItems += LocalCopy(channel.get, columnSize, outputOffset)
           } else {
@@ -1589,14 +1586,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
       return 0L
     }
 
-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     val coalescedRanges = coalesceReads(remoteCopies)
 
     val totalBytesCopied = PerfIO.readToHostMemory(
-      fileHadoopConf, out.buffer, filePath.toUri,
+      conf, out.buffer, filePath.toUri,
      coalescedRanges.map(r => IntRangeWithOffset(r.offset, r.length, r.outputOffset))
     ).getOrElse {
-      withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
+      withResource(filePath.getFileSystem(conf).open(filePath)) { in =>
        val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
        coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
          acc + copyDataRange(blockCopy, in, out, copyBuffer)
@@ -1608,7 +1604,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
         metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES, NoopMetric) += 1
         metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES_SIZE, NoopMetric) += range.length
         val cacheToken = FileCache.get.startDataRangeCache(
-          filePathString, range.offset, range.length, fileHadoopConf)
+          filePathString, range.offset, range.length, conf)
         // If we get a filecache token then we can complete the caching by providing the data.
         // If we do not get a token then we should not cache this data.
         cacheToken.foreach { token =>
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
index c6db132e9a0..9e03dc26141 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.rapids.execution
 
+import java.util.concurrent.ThreadPoolExecutor
+
 import org.json4s.JsonAST
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkMasterRegex, SparkUpgradeException, TaskContext}
@@ -219,6 +221,13 @@ object TrampolineUtil {
     }
   }
 
+  def newDaemonCachedThreadPool(
+      prefix: String,
+      maxThreadNumber: Int,
+      keepAliveSeconds: Int): ThreadPoolExecutor = {
+    org.apache.spark.util.ThreadUtils.newDaemonCachedThreadPool(prefix, maxThreadNumber,
+      keepAliveSeconds)
+  }
 
   def postEvent(sc: SparkContext, sparkEvent: SparkListenerEvent): Unit = {
     sc.listenerBus.post(sparkEvent)
diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala
deleted file mode 100644
index 0fa04858b3f..00000000000
--- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*** spark-rapids-shim-json-lines
-{"spark": "311"}
-{"spark": "312"}
-{"spark": "313"}
-{"spark": "320"}
-{"spark": "321"}
-{"spark": "321cdh"}
-{"spark": "322"}
-{"spark": "323"}
-{"spark": "324"}
-{"spark": "330"}
-{"spark": "330cdh"}
-{"spark": "331"}
-{"spark": "332"}
-{"spark": "332cdh"}
-{"spark": "333"}
-{"spark": "334"}
-{"spark": "340"}
-{"spark": "341"}
-{"spark": "342"}
-{"spark": "343"}
-{"spark": "350"}
-{"spark": "351"}
-{"spark": "400"}
-spark-rapids-shim-json-lines ***/
-package com.nvidia.spark.rapids.shims
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-object ReaderUtils {
-  def getHadoopConfForReaderThread(filePath: Path, conf: Configuration): Configuration = {
-    conf
-  }
-}
diff --git a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala
deleted file mode 100644
index 2f4360424fb..00000000000
--- a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/ReaderUtils.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*** spark-rapids-shim-json-lines
-{"spark": "330db"}
-{"spark": "332db"}
-{"spark": "341db"}
-spark-rapids-shim-json-lines ***/
-package com.nvidia.spark.rapids.shims
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.internal.Logging
-
-object ReaderUtils extends Logging {
-
-  private lazy val isUnityCatalogEnabled = com.databricks.unity.UnityConf.isEnabled
-
-  /*
-   * Databricks has the Unity Catalog that allows accessing files across multiple metastores and
-   * catalogs. When our readers run in different threads, the credentials don't get setup
-   * properly. Here we get the Hadoop configuration associated specifically with that file which
-   * seems to contain the necessary credentials. This conf will be used when creating the
-   * Hadoop Filesystem, which with Unity ends up being a special Credentials file system.
-   */
-  def getHadoopConfForReaderThread(filePath: Path, conf: Configuration): Configuration = {
-    if (isUnityCatalogEnabled) {
-      try {
-        com.databricks.unity.ClusterDefaultSAM.createDelegateHadoopConf(filePath, conf)
-      } catch {
-        case a: AssertionError =>
-          // ignore this and just return the regular conf, it might be a filesystem not supported
-          // and I don't have a good way to check this
-          logWarning("Assertion error calling createDelegateHadoopConf, skipping.", a)
-          conf
-      }
-    } else {
-      conf
-    }
-  }
-}
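Note (not part of the patch): the hand-built executor removed from MultiFileReaderThreadPool is replaced by the new TrampolineUtil.newDaemonCachedThreadPool wrapper, which forwards to Spark's org.apache.spark.util.ThreadUtils.newDaemonCachedThreadPool. The Scala sketch below shows the executor that helper is believed to construct, to illustrate that the pool behavior should be unchanged; the object and method names in the sketch are illustrative only, while the thread-name prefix and the 60-second keep-alive default come from the diff.

    import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object ThreadPoolSketch {
      // Roughly the executor that the Spark helper (and therefore the new TrampolineUtil wrapper)
      // is expected to hand back: up to maxThreads daemon workers over an unbounded queue, with
      // idle core threads reclaimed after keepAliveSeconds, matching the deleted hand-rolled code.
      def equivalentReaderPool(maxThreads: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
        val counter = new AtomicInteger(0)
        val factory = new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, s"multithreaded file reader worker-${counter.getAndIncrement()}")
            t.setDaemon(true)
            t
          }
        }
        val pool = new ThreadPoolExecutor(
          maxThreads,                 // corePoolSize
          maxThreads,                 // maximumPoolSize (unused with an unbounded queue)
          keepAliveSeconds, TimeUnit.SECONDS,
          new LinkedBlockingQueue[Runnable],
          factory)
        pool.allowCoreThreadTimeOut(true)
        pool
      }
    }

Routing the call through TrampolineUtil keeps the reference to ThreadUtils, which is not public Spark API, inside the org.apache.spark.sql.rapids.execution package, following the plugin's existing pattern for reaching Spark internals.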