Skip to content

Commit

Permalink
add hdfs support
Browse files Browse the repository at this point in the history
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
binmahone committed Jun 25, 2024
1 parent 4d2652f commit d67af5e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids

import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.util.Random

import scala.collection.mutable
Expand Down Expand Up @@ -118,7 +118,7 @@ object DumpUtils extends Logging {

/**
 * Writes `table` through a ParquetDumper to a path generated from `filePrefix`.
 *
 * @param table the table to dump
 * @param filePrefix prefix handed to `genPath` to build the output path
 * @return the generated path the table was written to
 */
private def dumpToParquetFileImp(table: Table, filePrefix: String): String = {
val path = genPath(filePrefix)
withResource(new ParquetDumper(path, table)) { dumper =>
withResource(new ParquetDumper(new FileOutputStream(path), table)) { dumper =>
dumper.writeTable(table)
path
}
Expand Down Expand Up @@ -146,9 +146,9 @@ object DumpUtils extends Logging {
}

// parquet dumper
class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
class ParquetDumper(private[this] val outputStream: OutputStream, table: Table)
extends HostBufferConsumer
with AutoCloseable {
private[this] val outputStream = new FileOutputStream(path)
private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

Expand Down
34 changes: 19 additions & 15 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object GpuMetric extends Logging {
val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time"

/**
 * Extracts the underlying SQLMetric from a GpuMetric.
 *
 * @param input the metric to unwrap
 * @return the `sqlMetric` held by a WrappedGpuMetric
 * @throws IllegalArgumentException if `input` is any other kind of GpuMetric
 */
def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
case w: WrappedGpuMetric => w.sqlMetric
case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}")
}

Expand Down Expand Up @@ -250,6 +250,7 @@ trait GpuExec extends SparkPlan {
@transient lazy val loreDumpOperator: Option[String] = RapidsConf.LORE_DUMP_OPERATOR.get(conf)
@transient lazy val loreDumpLOREIds: String = RapidsConf.LORE_DUMP_LORE_IDS.get(conf)
@transient lazy val loreDumpPartitions: String = RapidsConf.LORE_DUMP_PARTITIONS.get(conf)
@transient lazy val loreDumpPath: String = RapidsConf.LORE_DUMP_PATH.get(conf)

// For LORE DumpedExecReplayer, the spark plan is deserialized from the plan.meta file, so
// some of the transient fields will be null, and we need to work around this
Expand Down Expand Up @@ -295,9 +296,9 @@ trait GpuExec extends SparkPlan {
*/
def outputBatching: CoalesceGoal = null

private [this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))
private[this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))

private [this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
private[this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
if (level >= metricsConf) {
WrappedGpuMetric(f)
} else {
Expand Down Expand Up @@ -355,7 +356,7 @@ trait GpuExec extends SparkPlan {
lazy val allMetrics: Map[String, GpuMetric] = Map(
NUM_OUTPUT_ROWS -> createMetric(outputRowsLevel, DESCRIPTION_NUM_OUTPUT_ROWS),
NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES)) ++
additionalMetrics
additionalMetrics

def gpuLongMetric(name: String): GpuMetric = allMetrics(name)

Expand Down Expand Up @@ -413,8 +414,8 @@ trait GpuExec extends SparkPlan {

final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val hadoopConf = new SerializableConfiguration(sparkSession.sparkContext.hadoopConfiguration)
def getOutputStream(filePath: String): FSDataOutputStream = {
val hadoopPath = new Path(filePath)

def getOutputStream(hadoopPath: Path): FSDataOutputStream = {
val fs = hadoopPath.getFileSystem(hadoopConf.value)
fs.create(hadoopPath, true)
}
Expand All @@ -434,7 +435,7 @@ trait GpuExec extends SparkPlan {
// dump plan node
val planBytes = serializeObject(this)
val fos = getOutputStream(
s"file:/tmp/lore/lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta")
new Path(new Path(loreDumpPath), s"lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta"))
fos.write(planBytes)
fos.close()
}
Expand Down Expand Up @@ -476,21 +477,24 @@ trait GpuExec extends SparkPlan {
val cbTypes = GpuColumnVector.extractTypes(cb)
val bytes = serializeObject(cbTypes)
val fos = getOutputStream(
s"file:/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/col_types.meta")
new Path(new Path(loreDumpPath),
s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/col_types.meta"))
fos.write(bytes)
fos.close()
}

// dump data for column batch under the configured LORE dump path (loreDumpPath)
withResource(GpuColumnVector.from(cb)) { table =>
val path = s"/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/cb_data.parquet"
withResource(new ParquetDumper(path, table)) { dumper =>
val fos = getOutputStream(
new Path(new Path(loreDumpPath),
s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/cb_data.parquet"))
// ParquetDumper will close the output stream
withResource(new ParquetDumper(fos, table)) { dumper =>
dumper.writeTable(table)
path
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,13 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.stringConf
.createWithDefault("0")

// Internal string conf (key "spark.rapids.LORE.pathPrefix") holding the URI path used when
// dumping with LORE. Falls back to "file:/tmp/lore/" when the key is not set.
val LORE_DUMP_PATH = conf("spark.rapids.LORE.pathPrefix")
.doc("Specifies a URI path to use when dumping with LORE, the default path is: " +
"file:/tmp/lore/")
.internal()
.stringConf
.createWithDefault("file:/tmp/lore/")

val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
.doc("Enables profiling and specifies a URI path to use when writing profile data")
.internal()
Expand Down Expand Up @@ -2536,6 +2543,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val loreDumpPartitions: String = get(LORE_DUMP_PARTITIONS)

lazy val loreDumpPath: String = get(LORE_DUMP_PATH)

lazy val profilePath: Option[String] = get(PROFILE_PATH)

lazy val profileExecutors: String = get(PROFILE_EXECUTORS)
Expand Down

0 comments on commit d67af5e

Please sign in to comment.