Skip to content

Commit

Permalink
Add Partial Delta Lake Support for Databricks 13.3 (#9644)
Browse files Browse the repository at this point in the history
* delta lake changes

* Signing off

Signed-off-by: Raza Jafri <[email protected]>

* fixed 341db delta parent

* delta lake changes for AtomicCreateTableAsSelectExec

* added commit tags

* addressed review comments

* added scala 2.13 pom

* removed unnecessary change

* fixed merge conflicts

* fixed line length

* updated udf-compiler pom.xml

* updated sql-plugin pom.xml

* fixed multiple pom.xml

* updated udf-compiler pom.xml

* Singing off

Signed-off-by: Raza Jafri <[email protected]>

* Revert "updated udf-compiler pom.xml"

This reverts commit e2fd85c.

* Revert "fixed multiple pom.xml"

This reverts commit 7a87438.

* Revert "updated sql-plugin pom.xml"

This reverts commit 6c1259d.

* Revert "updated udf-compiler pom.xml"

This reverts commit 7f34d35.

* Fixed params to GpuAtomicCreateTableAsSelectExec

* Fixed GpuAtomicReplaceTableAsSelectExec params

* addressed review comments

* xfail test_delta_atomic_create_table_as_select and test_delta_atomic_replace_table_as_select

* allow WriteFilesExec on CPU

---------

Signed-off-by: Raza Jafri <[email protected]>
Signed-off-by: Raza Jafri <[email protected]>
Co-authored-by: raza <[email protected]>
  • Loading branch information
razajafri and raza authored Nov 14, 2023
1 parent 0f0a0ed commit df9fb5a
Show file tree
Hide file tree
Showing 37 changed files with 4,744 additions and 57 deletions.
1 change: 1 addition & 0 deletions delta-lake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and directory contains the corresponding support code.
| Databricks 10.4 | Databricks 10.4 | `delta-spark321db` |
| Databricks 11.3 | Databricks 11.3 | `delta-spark330db` |
| Databricks 12.2 | Databricks 12.2 | `delta-spark332db` |
| Databricks 13.3 | Databricks 13.3 | `delta-spark341db` |

Delta Lake is not supported on all Spark versions, and for Spark versions where it is not
supported the `delta-stub` project is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.databricks.sql.managedcatalog.UnityCatalogV2Proxy
import com.databricks.sql.transaction.tahoe.{DeltaLog, DeltaOptions, DeltaParquetFileFormat}
import com.databricks.sql.transaction.tahoe.catalog.{DeltaCatalog, DeltaTableV2}
import com.databricks.sql.transaction.tahoe.commands.{DeleteCommand, DeleteCommandEdge, MergeIntoCommand, MergeIntoCommandEdge, UpdateCommand, UpdateCommandEdge, WriteIntoDelta}
import com.databricks.sql.transaction.tahoe.rapids.{GpuDeltaCatalog, GpuDeltaLog, GpuWriteIntoDelta}
import com.databricks.sql.transaction.tahoe.rapids.{GpuDeltaLog, GpuWriteIntoDelta}
import com.databricks.sql.transaction.tahoe.sources.{DeltaDataSource, DeltaSourceUtils}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.delta.shims.DeltaLogShim
Expand All @@ -38,15 +38,15 @@ import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExecV1, AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, OverwriteByExpressionExecV1}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.ExternalSource
import org.apache.spark.sql.sources.{CreatableRelationProvider, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Common implementation of the DeltaProvider interface for all Databricks versions.
*/
object DatabricksDeltaProvider extends DeltaProviderImplBase {
trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
override def getCreatableRelationRules: Map[Class[_ <: CreatableRelationProvider],
CreatableRelationProviderRule[_ <: CreatableRelationProvider]] = {
Seq(
Expand Down Expand Up @@ -116,6 +116,15 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
catalogClass == classOf[DeltaCatalog] || catalogClass == classOf[UnityCatalogV2Proxy]
}

private def getWriteOptions(options: Any): Map[String, String] = {
// For Databricks 13.3 AtomicCreateTableAsSelectExec writeOptions is a Map[String, String]
// while in all the other versions it's a CaseInsensitiveMap
options match {
case c: CaseInsensitiveStringMap => c.asCaseSensitiveMap().asScala.toMap
case _ => options.asInstanceOf[Map[String, String]]
}
}

override def tagForGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): Unit = {
Expand All @@ -131,22 +140,7 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
meta.willNotWorkOnGpu(s"table provider '$provider' is not a Delta Lake provider")
}
RapidsDeltaUtils.tagForDeltaWrite(meta, cpuExec.query.schema, None,
cpuExec.writeOptions.asCaseSensitiveMap().asScala.toMap, cpuExec.session)
}

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
getWriteOptions(cpuExec.writeOptions), cpuExec.session)
}

override def tagForGpu(
Expand All @@ -164,23 +158,7 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
meta.willNotWorkOnGpu(s"table provider '$provider' is not a Delta Lake provider")
}
RapidsDeltaUtils.tagForDeltaWrite(meta, cpuExec.query.schema, None,
cpuExec.writeOptions.asCaseSensitiveMap().asScala.toMap, cpuExec.session)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
getWriteOptions(cpuExec.writeOptions), cpuExec.session)
}

private case class DeltaWriteV1Config(
Expand Down Expand Up @@ -360,13 +338,4 @@ class DeltaCreatableRelationProviderMeta(
}

override def convertToGpu(): GpuCreatableRelationProvider = new GpuDeltaDataSource(conf)
}

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DatabricksDeltaProvider
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.expressions.JoinedProjection
import com.databricks.sql.transaction.tahoe.DeltaColumnMapping
import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields
import com.databricks.sql.transaction.tahoe.util.JsonUtils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
Expand Down Expand Up @@ -47,6 +46,4 @@ object ShimJoinedProjection {

object ShimJsonUtils {
def fromJson[T: Manifest](json: String): T = JsonUtils.fromJson[T](json)
}

trait ShimUsesMetadataFields extends UsesMetadataFields
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ object DeltaShufflePartitionsUtil {
c.child
case _ => p
}
case ShuffleExchangeExec(_, child, shuffleOrigin)
if !shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
child
case s: ShuffleExchangeExec if !s.shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
s.child
case CoalesceExec(_, child) =>
child
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark321DBProvider
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.databricks.sql.transaction.tahoe.rapids.GpuDeltaCatalog
import com.nvidia.spark.rapids.{AtomicCreateTableAsSelectExecMeta, AtomicReplaceTableAsSelectExecMeta, GpuExec}

import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}

object DeltaSpark321DBProvider extends DatabricksDeltaProviderBase {

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields

trait ShimUsesMetadataFields extends UsesMetadataFields
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark330DBProvider
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.databricks.sql.transaction.tahoe.rapids.GpuDeltaCatalog
import com.nvidia.spark.rapids.{AtomicCreateTableAsSelectExecMeta, AtomicReplaceTableAsSelectExecMeta, GpuExec}

import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}

object DeltaSpark330DBProvider extends DatabricksDeltaProviderBase {

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields

trait ShimUsesMetadataFields extends UsesMetadataFields
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark332DBProvider
}
Loading

0 comments on commit df9fb5a

Please sign in to comment.