Merge branch 'branch-23.10' into gpucoredump
jlowe committed Oct 2, 2023
2 parents (320fe1d + dbc4479), commit d519bb7
Showing 200 changed files with 5,514 additions and 2,072 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/auto-merge.yml
@@ -18,7 +18,7 @@ name: auto-merge HEAD to BASE
on:
  pull_request_target:
    branches:
-     - branch-23.08
+     - branch-23.10
    types: [closed]

jobs:
@@ -29,13 +29,13 @@ jobs:
    steps:
      - uses: actions/checkout@v3
        with:
-         ref: branch-23.08 # force to fetch from latest upstream instead of PR ref
+         ref: branch-23.10 # force to fetch from latest upstream instead of PR ref

      - name: auto-merge job
        uses: ./.github/workflows/auto-merge
        env:
          OWNER: NVIDIA
          REPO_NAME: spark-rapids
-         HEAD: branch-23.08
-         BASE: branch-23.10
+         HEAD: branch-23.10
+         BASE: branch-23.12
          AUTOMERGE_TOKEN: ${{ secrets.AUTOMERGE_TOKEN }} # use to merge PR
3 changes: 3 additions & 0 deletions .github/workflows/blossom-ci.yml
@@ -42,6 +42,7 @@ jobs:
jbrennan333, \
jlowe,\
krajendrannv,\
+ kuhushukla,\
mythrocks,\
nartal1,\
nvdbaranec,\
@@ -68,6 +69,8 @@ jobs:
YanxuanLiu,\
cindyyuanjiang,\
thirtiseven,\
+ winningsix,\
+ viadea,\
', format('{0},', github.actor)) && github.event.comment.body == 'build'
steps:
- name: Check if comment is issued by authorized person
5 changes: 3 additions & 2 deletions CONTRIBUTING.md
@@ -144,8 +144,9 @@ specifying the environment variable `BUILD_PARALLEL=<n>`.
### Building against different CUDA Toolkit versions
- You can build against different versions of the CUDA Toolkit by using one of the following profiles:
- * `-Pcuda11` (CUDA 11.0/11.1/11.2, default)
+ You can build against different versions of the CUDA Toolkit by modifying the variable `cuda.version`:
+ * `-Dcuda.version=cuda11` (CUDA 11.x, default)
+ * `-Dcuda.version=cuda12` (CUDA 12.x)
### Building a Distribution for a Single Spark Release
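As context for the CONTRIBUTING.md hunk above: CUDA Toolkit selection moves from a fixed Maven profile (`-Pcuda11`) to the `cuda.version` property. A rough example of passing that property on the command line (the surrounding Maven goals are assumed here, not taken from this diff):

mvn clean package -Dcuda.version=cuda12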
2 changes: 1 addition & 1 deletion aggregator/pom.xml
@@ -534,7 +534,7 @@
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
- <artifactId>rapids-4-spark-delta-stub_${scala.binary.version}</artifactId>
+ <artifactId>rapids-4-spark-delta-24x_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
4 changes: 2 additions & 2 deletions api_validation/pom.xml
@@ -100,7 +100,7 @@
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>spark-rapids-jni</artifactId>
- <classifier>${cuda.version}</classifier>
+ <classifier>${jni.classifier}</classifier>
<scope>provided</scope>
</dependency>
<!-- use aggregator jar because accessing internal classes -->
@@ -115,7 +115,7 @@
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <classifier>${cuda.version}</classifier>
+ <classifier>${jni.classifier}</classifier>
<scope>provided</scope>
</dependency>
</dependencies>
@@ -16,14 +16,15 @@

package com.nvidia.spark.rapids.delta

- import com.databricks.sql.transaction.tahoe.DeltaLog
+ import com.databricks.sql.transaction.tahoe.{DeltaLog, DeltaParquetFileFormat}
import com.databricks.sql.transaction.tahoe.commands.{DeleteCommand, DeleteCommandEdge, MergeIntoCommand, MergeIntoCommandEdge, UpdateCommand, UpdateCommandEdge}
import com.databricks.sql.transaction.tahoe.sources.DeltaDataSource
import com.nvidia.spark.rapids._

import org.apache.spark.sql.SparkSession
+ import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.command.RunnableCommand
- import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
+ import org.apache.spark.sql.execution.datasources.{FileFormat, SaveIntoDataSourceCommand}
import org.apache.spark.sql.rapids.ExternalSource
import org.apache.spark.sql.sources.CreatableRelationProvider

@@ -72,6 +73,25 @@ object DeltaProviderImpl extends DeltaProviderImplBase {
.disabledByDefault("Delta Lake update support is experimental")
).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap
}

+ override def isSupportedFormat(format: Class[_ <: FileFormat]): Boolean = {
+ format == classOf[DeltaParquetFileFormat]
+ }
+
+ override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
+ val format = meta.wrapped.relation.fileFormat
+ if (format.getClass == classOf[DeltaParquetFileFormat]) {
+ GpuReadParquetFileFormat.tagSupport(meta)
+ GpuDeltaParquetFileFormat.tagSupportForGpuFileSourceScan(meta)
+ } else {
+ meta.willNotWorkOnGpu(s"format ${format.getClass} is not supported")
+ }
+ }
+
+ override def getReadFileFormat(format: FileFormat): FileFormat = {
+ val cpuFormat = format.asInstanceOf[DeltaParquetFileFormat]
+ GpuDeltaParquetFileFormat.convertToGpu(cpuFormat)
+ }
}

class DeltaCreatableRelationProviderMeta(
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.databricks.sql.transaction.tahoe.{DeltaColumnMapping, DeltaColumnMappingMode, NoMapping}
import com.nvidia.spark.rapids.{GpuMetric, GpuParquetMultiFilePartitionReaderFactory, GpuReadParquetFileFormat}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

abstract class GpuDeltaParquetFileFormatBase extends GpuReadParquetFileFormat {
val columnMappingMode: DeltaColumnMappingMode
val referenceSchema: StructType

def prepareSchema(inputSchema: StructType): StructType = {
DeltaColumnMapping.createPhysicalSchema(inputSchema, referenceSchema, columnMappingMode)
}

override def createMultiFileReaderFactory(
broadcastedConf: Broadcast[SerializableConfiguration],
pushedFilters: Array[Filter],
fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {
GpuParquetMultiFilePartitionReaderFactory(
fileScan.conf,
broadcastedConf,
prepareSchema(fileScan.relation.dataSchema),
prepareSchema(fileScan.requiredSchema),
prepareSchema(fileScan.readPartitionSchema),
pushedFilters,
fileScan.rapidsConf,
fileScan.allMetrics,
fileScan.queryUsesInputFile,
fileScan.alluxioPathsMap)
}

override def buildReaderWithPartitionValuesAndMetrics(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
: PartitionedFile => Iterator[InternalRow] = {
super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
prepareSchema(dataSchema),
prepareSchema(partitionSchema),
prepareSchema(requiredSchema),
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
}

override def supportFieldName(name: String): Boolean = {
if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
}
}
@@ -33,11 +33,11 @@ object RapidsDeltaUtils {
options: Map[String, String],
spark: SparkSession): Unit = {
FileFormatChecks.tag(meta, schema, DeltaFormatType, WriteFileOp)
- DeltaLogShim.fileFormat(deltaLog) match {
- case _: DeltaParquetFileFormat =>
- GpuParquetFileFormat.tagGpuSupport(meta, spark, options, schema)
- case f =>
- meta.willNotWorkOnGpu(s"file format $f is not supported")
+ val format = DeltaLogShim.fileFormat(deltaLog)
+ if (format.getClass == classOf[DeltaParquetFileFormat]) {
+ GpuParquetFileFormat.tagGpuSupport(meta, spark, options, schema)
+ } else {
+ meta.willNotWorkOnGpu(s"file format $format is not supported")
}
checkIncompatibleConfs(meta, schema, deltaLog, spark.sessionState.conf, options)
}
@@ -21,10 +21,10 @@ import scala.util.Try
import com.nvidia.spark.rapids._

import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.delta.DeltaLog
+ import org.apache.spark.sql.delta.{DeltaLog, DeltaParquetFileFormat}
import org.apache.spark.sql.delta.rapids.DeltaRuntimeShim
import org.apache.spark.sql.delta.sources.DeltaDataSource
- import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
+ import org.apache.spark.sql.execution.datasources.{FileFormat, SaveIntoDataSourceCommand}
import org.apache.spark.sql.rapids.ExternalSource
import org.apache.spark.sql.rapids.execution.UnshimmedTrampolineUtil
import org.apache.spark.sql.sources.CreatableRelationProvider
@@ -44,6 +44,10 @@ abstract class DeltaIOProvider extends DeltaProviderImplBase {
})
).map(r => (r.getClassFor.asSubclass(classOf[CreatableRelationProvider]), r)).toMap
}

+ override def isSupportedFormat(format: Class[_ <: FileFormat]): Boolean = {
+ format == classOf[DeltaParquetFileFormat]
+ }
}

class DeltaCreatableRelationProviderMeta(
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.nvidia.spark.rapids.{GpuMetric, GpuParquetMultiFilePartitionReaderFactory, GpuReadParquetFileFormat}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, NoMapping}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

trait GpuDeltaParquetFileFormat extends GpuReadParquetFileFormat {
val columnMappingMode: DeltaColumnMappingMode
val referenceSchema: StructType

def prepareSchema(inputSchema: StructType): StructType = {
DeltaColumnMapping.createPhysicalSchema(inputSchema, referenceSchema, columnMappingMode)
}

override def createMultiFileReaderFactory(
broadcastedConf: Broadcast[SerializableConfiguration],
pushedFilters: Array[Filter],
fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {
GpuParquetMultiFilePartitionReaderFactory(
fileScan.conf,
broadcastedConf,
prepareSchema(fileScan.relation.dataSchema),
prepareSchema(fileScan.requiredSchema),
prepareSchema(fileScan.readPartitionSchema),
pushedFilters,
fileScan.rapidsConf,
fileScan.allMetrics,
fileScan.queryUsesInputFile,
fileScan.alluxioPathsMap)
}

override def buildReaderWithPartitionValuesAndMetrics(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
: PartitionedFile => Iterator[InternalRow] = {
super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
prepareSchema(dataSchema),
prepareSchema(partitionSchema),
prepareSchema(requiredSchema),
filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)
}

override def supportFieldName(name: String): Boolean = {
if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
}
}
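The trait above leaves only the column-mapping mode and the reference schema abstract; prepareSchema and the Parquet reader overrides are shared. As a minimal, hypothetical sketch of what a concrete per-Delta-version implementation might look like (the real classes, such as the GpuDelta20xParquetFileFormat constructed in the Delta20xProvider hunk below, are not shown in this excerpt and may add further behavior):

package com.nvidia.spark.rapids.delta

import org.apache.spark.sql.delta.DeltaColumnMappingMode
import org.apache.spark.sql.types.StructType

// Hypothetical example only: a concrete format supplies the two abstract vals that
// prepareSchema() uses to map logical column names to the physical Parquet names.
case class GpuDeltaExampleParquetFileFormat(
    columnMappingMode: DeltaColumnMappingMode,
    referenceSchema: StructType) extends GpuDeltaParquetFileFormat

A provider's getReadFileFormat would construct such a class from the CPU DeltaParquetFileFormat, as the Delta20xProvider change below does with cpuFormat.columnMappingMode and cpuFormat.referenceSchema.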
@@ -32,11 +32,11 @@ object RapidsDeltaUtils {
options: Map[String, String],
spark: SparkSession): Unit = {
FileFormatChecks.tag(meta, schema, DeltaFormatType, WriteFileOp)
- DeltaRuntimeShim.fileFormatFromLog(deltaLog) match {
- case _: DeltaParquetFileFormat =>
- GpuParquetFileFormat.tagGpuSupport(meta, spark, options, schema)
- case f =>
- meta.willNotWorkOnGpu(s"file format $f is not supported")
+ val format = DeltaRuntimeShim.fileFormatFromLog(deltaLog)
+ if (format.getClass == classOf[DeltaParquetFileFormat]) {
+ GpuParquetFileFormat.tagGpuSupport(meta, spark, options, schema)
+ } else {
+ meta.willNotWorkOnGpu(s"file format $format is not supported")
}
checkIncompatibleConfs(meta, deltaLog, spark.sessionState.conf, options)
}
@@ -16,11 +16,14 @@

package com.nvidia.spark.rapids.delta.delta20x

- import com.nvidia.spark.rapids.{GpuOverrides, RunnableCommandRule}
+ import com.nvidia.spark.rapids.{GpuOverrides, GpuReadParquetFileFormat, RunnableCommandRule, SparkPlanMeta}
import com.nvidia.spark.rapids.delta.DeltaIOProvider

+ import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.commands.{DeleteCommand, MergeIntoCommand, UpdateCommand}
+ import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.command.RunnableCommand
+ import org.apache.spark.sql.execution.datasources.FileFormat

object Delta20xProvider extends DeltaIOProvider {

@@ -41,4 +44,18 @@ object Delta20xProvider extends DeltaIOProvider {
.disabledByDefault("Delta Lake update support is experimental")
).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap
}

+ override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
+ val format = meta.wrapped.relation.fileFormat
+ if (format.getClass == classOf[DeltaParquetFileFormat]) {
+ GpuReadParquetFileFormat.tagSupport(meta)
+ } else {
+ meta.willNotWorkOnGpu(s"format ${format.getClass} is not supported")
+ }
+ }
+
+ override def getReadFileFormat(format: FileFormat): FileFormat = {
+ val cpuFormat = format.asInstanceOf[DeltaParquetFileFormat]
+ GpuDelta20xParquetFileFormat(cpuFormat.columnMappingMode, cpuFormat.referenceSchema)
+ }
}
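Taken together, the provider hooks added in this commit follow a tag-then-convert pattern: isSupportedFormat filters on the exact FileFormat class, tagSupportForGpuFileSourceScan can veto the replacement via willNotWorkOnGpu, and getReadFileFormat builds the GPU-side format for scans that survive tagging. The sketch below is hypothetical glue code, not part of this commit; the helper name, the canThisBeReplaced check, and the planner wiring are assumptions, while the hook signatures and the meta accessors are taken from the hunks above.

import com.nvidia.spark.rapids.SparkPlanMeta
import com.nvidia.spark.rapids.delta.DeltaIOProvider

import org.apache.spark.sql.execution.FileSourceScanExec

object DeltaScanReplacementSketch {
  // Hypothetical: how a planner rule could drive the provider hooks added in this commit.
  def replaceDeltaScan(provider: DeltaIOProvider, meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
    val cpuFormat = meta.wrapped.relation.fileFormat
    if (provider.isSupportedFormat(cpuFormat.getClass)) {
      // Tagging may call meta.willNotWorkOnGpu(...) and keep the scan on the CPU.
      provider.tagSupportForGpuFileSourceScan(meta)
      if (meta.canThisBeReplaced) {
        // Only scans that pass tagging get the GPU-side FileFormat.
        val gpuFormat = provider.getReadFileFormat(cpuFormat)
        // ... a GpuFileSourceScanExec would then be built around gpuFormat ...
      }
    }
  }
}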
(Diff truncated: the remaining changed files are not shown here.)
