Hive hash (#30)
* Add HiveHash support on GPU

Signed-off-by: Firestarman <[email protected]>

* Add integration tests

Signed-off-by: Firestarman <[email protected]>

* more tests

Signed-off-by: Firestarman <[email protected]>

---------

Signed-off-by: Firestarman <[email protected]>
Co-authored-by: Firestarman <[email protected]>
wjxiz1992 and firestarman authored Jun 27, 2024
1 parent 4615429 commit e0380b9
Showing 64 changed files with 533 additions and 89 deletions.
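
For context, HiveHash is the hash function Hive uses to assign rows to buckets, and it differs from Spark's default Murmur3Hash; that mismatch is why bucketed Hive writes previously fell back to the CPU. The plain-Python sketch below is an illustrative approximation of the hashing rules the GPU implementation is expected to match (only a few types are shown, the helper names are invented for this example, and the actual GPU kernel is not part of this diff):

def _to_int32(v):
    # Wrap to a signed 32-bit integer, mimicking JVM int arithmetic.
    v &= 0xFFFFFFFF
    return v - (1 << 32) if v >= (1 << 31) else v

def hive_hash_int(v):
    # byte/short/int/date values hash to the value itself
    return _to_int32(v)

def hive_hash_long(v):
    # fold the two 32-bit halves of a 64-bit value together
    v &= 0xFFFFFFFFFFFFFFFF
    return _to_int32(v ^ (v >> 32))

def hive_hash_bool(v):
    return 1 if v else 0

def hive_hash_string(s):
    # byte-wise polynomial hash over the UTF-8 encoding (JVM bytes are signed)
    h = 0
    for b in s.encode('utf-8'):
        b = b - 256 if b > 127 else b
        h = _to_int32(31 * h + b)
    return h

def hive_hash_row(col_hashes):
    # Column hashes are combined per row; a null column contributes 0.
    h = 0
    for ch in col_hashes:
        h = _to_int32(31 * h + ch)
    return h

def bucket_id(row_hash, num_buckets):
    # Hive places a row into bucket (hash & Integer.MAX_VALUE) % num_buckets.
    return (row_hash & 0x7FFFFFFF) % num_buckets

The integration tests below rely on this determinism: a table bucketed on the CPU and one bucketed on the GPU must place every row into the same bucket file, so the tests compare the two tables bucket by bucket.
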
90 changes: 66 additions & 24 deletions integration_tests/src/main/python/datasourcev2_write_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,36 +11,78 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_fallback_collect
from data_gen import *
from asserts import assert_gpu_fallback_collect, assert_equal_with_local_sort
from data_gen import gen_df
from marks import *
from pyspark.sql.types import *
from spark_session import is_hive_available, is_spark_330_or_later, with_cpu_session
from spark_session import is_hive_available, is_spark_330_or_later, with_cpu_session, with_gpu_session
from hive_parquet_write_test import _hive_bucket_gens, _hive_array_gens, _hive_struct_gens
from hive_write_test import _restricted_timestamp
from hive_parquet_write_test import read_single_bucket

_hive_write_conf = {
"hive.enforce.bucketing": "true",
"hive.exec.dynamic.partition": "true",
"hive.exec.dynamic.partition.mode": "nonstrict"}


@pytest.mark.skipif(not (is_hive_available() and is_spark_330_or_later()),
reason="Must have Hive on Spark 3.3+")
@pytest.mark.parametrize('file_format', ['parquet', 'orc'])
def test_write_hive_bucketed_table(spark_tmp_table_factory, file_format):
def gen_table(spark):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(_hive_bucket_gens)]
types_sql_str = ','.join('{} {}'.format(
name, gen.data_type.simpleString()) for name, gen in gen_list)
col_names_str = ','.join(name for name, gen in gen_list)
data_table = spark_tmp_table_factory.get()
gen_df(spark, gen_list).createOrReplaceTempView(data_table)
return data_table, types_sql_str, col_names_str

(input_data, input_schema, input_cols_str) = with_cpu_session(gen_table)
num_buckets = 4

def write_hive_table(spark, out_table):
spark.sql(
"create table {0} ({1}) stored as {2} clustered by ({3}) into {4} buckets".format(
out_table, input_schema, file_format, input_cols_str, num_buckets))
spark.sql(
"insert into {0} select * from {1}".format(out_table, input_data))

cpu_table = spark_tmp_table_factory.get()
gpu_table = spark_tmp_table_factory.get()
with_cpu_session(lambda spark: write_hive_table(spark, cpu_table), _hive_write_conf)
with_gpu_session(lambda spark: write_hive_table(spark, gpu_table), _hive_write_conf)
cur_bucket_id = 0
while cur_bucket_id < num_buckets:
# Verify the result bucket by bucket
ret_cpu = read_single_bucket(cpu_table, cur_bucket_id)
ret_gpu = read_single_bucket(gpu_table, cur_bucket_id)
assert_equal_with_local_sort(ret_cpu, ret_gpu)
cur_bucket_id += 1


@ignore_order
@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec')
@pytest.mark.skipif(not (is_hive_available() and is_spark_330_or_later()), reason="Must have Hive on Spark 3.3+")
@pytest.mark.parametrize('fileFormat', ['parquet', 'orc'])
def test_write_hive_bucketed_table_fallback(spark_tmp_table_factory, fileFormat):
"""
fallback because GPU does not support Hive hash partition
"""
table = spark_tmp_table_factory.get()
@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,SortExec,WriteFilesExec')
@pytest.mark.skipif(not (is_hive_available() and is_spark_330_or_later()),
reason="Must have Hive on Spark 3.3+")
@pytest.mark.parametrize('file_format', ['parquet', 'orc'])
@pytest.mark.parametrize('gen', [_restricted_timestamp()] + _hive_array_gens + _hive_struct_gens)
def test_write_hive_bucketed_unsupported_types_fallback(spark_tmp_table_factory, file_format, gen):
out_table = spark_tmp_table_factory.get()

def create_hive_table(spark):
spark.sql("""create table {0} (a bigint, b bigint, c bigint)
stored as {1}
clustered by (b) into 3 buckets""".format(table, fileFormat))
return None

conf = {"hive.enforce.bucketing": "true",
"hive.exec.dynamic.partition": "true",
"hive.exec.dynamic.partition.mode": "nonstrict"}
with_cpu_session(create_hive_table, conf = conf)
spark.sql("create table {0} (a {1}) stored as {2} clustered by (a) into 3 buckets".format(
out_table, gen.data_type.simpleString(), file_format))
data_table = spark_tmp_table_factory.get()
gen_df(spark, [('a', gen)], length=10).createOrReplaceTempView(data_table)
return data_table

input_table = with_cpu_session(create_hive_table, _hive_write_conf)
assert_gpu_fallback_collect(
lambda spark: spark.sql("insert into {} values (1, 2, 3)".format(table)),
lambda spark: spark.sql(
"insert into {0} select * from {1}".format(out_table, input_table)),
'DataWritingCommandExec',
conf = conf)
_hive_write_conf)
105 changes: 100 additions & 5 deletions integration_tests/src/main/python/hive_parquet_write_test.py
@@ -14,20 +14,23 @@

import pytest

from asserts import assert_gpu_and_cpu_sql_writes_are_equal_collect
from asserts import assert_gpu_and_cpu_sql_writes_are_equal_collect, assert_equal_with_local_sort
from conftest import is_databricks_runtime
from data_gen import *
from hive_write_test import _restricted_timestamp
from marks import allow_non_gpu, ignore_order
from spark_session import with_cpu_session, is_before_spark_320, is_spark_350_or_later
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_spark_350_or_later, is_before_spark_330, is_spark_330_or_later

# Disable the meta conversion from Hive write to FrameData write in Spark, to test
# "GpuInsertIntoHiveTable" for Parquet write.
_write_to_hive_conf = {"spark.sql.hive.convertMetastoreParquet": False}

_hive_basic_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen,
DateGen(start=date(1590, 1, 1)), _restricted_timestamp(),
_hive_bucket_gens = [
boolean_gen, byte_gen, short_gen, int_gen, long_gen, string_gen, float_gen, double_gen,
DateGen(start=date(1590, 1, 1))]

_hive_basic_gens = _hive_bucket_gens + [
_restricted_timestamp(),
DecimalGen(precision=19, scale=1, nullable=True),
DecimalGen(precision=23, scale=5, nullable=True),
DecimalGen(precision=36, scale=3, nullable=True)]
@@ -58,6 +61,20 @@
fallback_nodes = ['ProjectExec'] if is_databricks_runtime() or is_spark_350_or_later() else []


def read_single_bucket(table, bucket_id):
# Bucket id string format: f"_$id%05d" + ".c$fileCounter%03d".
# fileCounter is always 0 in this test. For example, '_00002.c000' corresponds to
# bucket id 2.
# We leverage this bucket segment in the file path to filter the rows belonging to a bucket.
bucket_segment = '_' + "{}".format(bucket_id).rjust(5, '0') + '.c000'
return with_cpu_session(
lambda spark: spark.sql("select * from {}".format(table))
.withColumn('file_name', f.input_file_name())
.filter(f.col('file_name').contains(bucket_segment))
.drop('file_name') # need to drop the file_name column for comparison.
.collect())
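
For illustration (this snippet is not part of the patch): for bucket_id = 2 the helper builds the segment '_00002.c000', which matches writer output files such as part-00000-<task-uuid>_00002.c000.snappy.parquet (the file name here is hypothetical, following Spark's usual bucketed-output naming).

# Quick sanity check of the bucket-segment construction used above.
bucket_id = 2
bucket_segment = '_' + "{}".format(bucket_id).rjust(5, '0') + '.c000'
assert bucket_segment == '_00002.c000'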


@allow_non_gpu(*(non_utc_allow + fallback_nodes))
@ignore_order(local=True)
@pytest.mark.parametrize("is_ctas", [True, False], ids=['CTAS', 'CTTW'])
@@ -174,3 +191,81 @@ def write_to_hive_sql(spark, output_table):
spark_tmp_table_factory,
write_to_hive_sql,
_write_to_hive_conf)


@pytest.mark.skipif(is_before_spark_330(),
reason="InsertIntoHiveTable supports bucketed write since Spark 330")
def test_insert_hive_bucketed_table(spark_tmp_table_factory):
def gen_table(spark):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(_hive_bucket_gens)]
types_sql_str = ','.join('{} {}'.format(
name, gen.data_type.simpleString()) for name, gen in gen_list)
col_names_str = ','.join(name for name, gen in gen_list)
data_table = spark_tmp_table_factory.get()
gen_df(spark, gen_list).createOrReplaceTempView(data_table)
return data_table, types_sql_str, col_names_str

(input_data, input_schema, input_cols_str) = with_cpu_session(gen_table)
num_buckets = 4

def insert_hive_table(spark, out_table):
spark.sql(
"CREATE TABLE {} ({}) STORED AS PARQUET CLUSTERED BY ({}) INTO {} BUCKETS".format(
out_table, input_schema, input_cols_str, num_buckets))
spark.sql(
"INSERT OVERWRITE {} SELECT * FROM {}".format(out_table, input_data))

cpu_table = spark_tmp_table_factory.get()
gpu_table = spark_tmp_table_factory.get()
with_cpu_session(lambda spark: insert_hive_table(spark, cpu_table), _write_to_hive_conf)
with_gpu_session(lambda spark: insert_hive_table(spark, gpu_table), _write_to_hive_conf)
for cur_bucket_id in range(num_buckets):
# Verify the result bucket by bucket
ret_cpu = read_single_bucket(cpu_table, cur_bucket_id)
ret_gpu = read_single_bucket(gpu_table, cur_bucket_id)
assert_equal_with_local_sort(ret_cpu, ret_gpu)


@pytest.mark.skipif(is_spark_330_or_later(),
reason = "InsertIntoHiveTable supports bucketed write since Spark 330")
@pytest.mark.parametrize("hive_hash", [True, False])
def test_insert_hive_bucketed_table_before_330(spark_tmp_table_factory, hive_hash):
num_buckets = 4

def insert_hive_table(spark, out_table):
data_table = spark_tmp_table_factory.get()
two_col_df(spark, int_gen, long_gen).createOrReplaceTempView(data_table)
spark.sql(
"""CREATE TABLE {} (a int, b long) STORED AS PARQUET
CLUSTERED BY (a) INTO {} BUCKETS""".format(out_table, num_buckets))
spark.sql(
"INSERT OVERWRITE {} SELECT * FROM {}".format(out_table, data_table))

all_confs = copy_and_update(_write_to_hive_conf, {
"hive.enforce.bucketing": False, # allow the write with bucket spec
"hive.enforce.sorting": False, # allow the write with bucket spec
"spark.rapids.sql.format.write.forceHiveHashForBucketing": hive_hash
})
cpu_table = spark_tmp_table_factory.get()
gpu_table = spark_tmp_table_factory.get()
with_cpu_session(lambda spark: insert_hive_table(spark, cpu_table), all_confs)
with_gpu_session(lambda spark: insert_hive_table(spark, gpu_table), all_confs)

all_cpu_rows = with_cpu_session(
lambda spark: spark.sql("select * from {}".format(cpu_table)).collect())
all_gpu_rows = with_cpu_session(
lambda spark: spark.sql("select * from {}".format(gpu_table)).collect())
assert_equal_with_local_sort(all_cpu_rows, all_gpu_rows)

for cur_bucket_id in range(num_buckets):
ret_cpu = read_single_bucket(cpu_table, cur_bucket_id)
ret_gpu = read_single_bucket(gpu_table, cur_bucket_id)
if hive_hash:
# The GPU writes a properly bucketed table, but the CPU does not, because
# InsertIntoHiveTable supports bucketed writes only since Spark 330.
# So the GPU behaves differently from vanilla Spark here.
assert len(ret_gpu) > 0 and len(ret_cpu) == 0
else:
# Both GPU and CPU write the data, but without any bucketing.
assert len(ret_gpu) == 0 and len(ret_cpu) == 0

44 changes: 13 additions & 31 deletions integration_tests/src/main/python/hive_write_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -119,39 +119,21 @@ def test_optimized_hive_ctas_options_fallback(gens, storage_with_opts, spark_tmp
spark_tmp_table_factory.get(), opts_string, storage, data_table)),
fallback_class)

@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec')
@pytest.mark.skipif(not (is_hive_available() and is_spark_33X() and not is_databricks122_or_later()),
@ignore_order
@pytest.mark.skipif(not (is_hive_available() and is_spark_330_or_later() and not is_databricks122_or_later()),
reason="Requires Hive and Spark 3.3.X to write bucketed Hive tables")
@pytest.mark.parametrize("gens", [_basic_gens], ids=idfn)
@pytest.mark.parametrize("storage", ["PARQUET", "ORC"], ids=idfn)
def test_optimized_hive_bucketed_fallback_33X(gens, storage, spark_tmp_table_factory):
def test_optimized_hive_ctas_bucketed_table(storage, spark_tmp_table_factory):
in_table = spark_tmp_table_factory.get()
with_cpu_session(lambda spark: three_col_df(spark, int_gen, int_gen, int_gen).createOrReplaceTempView(in_table))
assert_gpu_fallback_collect(
lambda spark: spark.sql(
"""CREATE TABLE {} STORED AS {}
CLUSTERED BY (b) INTO 3 BUCKETS
AS SELECT * FROM {}""".format(spark_tmp_table_factory.get(), storage, in_table)),
"DataWritingCommandExec")

# Since Spark 3.4.0, the internal "SortExec" will be pulled out by default
# from the FileFormatWriter. Then it is visible in the planning stage.
@allow_non_gpu("DataWritingCommandExec", "SortExec", "WriteFilesExec")
@pytest.mark.skipif(not (is_hive_available() and (is_spark_340_or_later() or is_databricks122_or_later())),
reason="Requires Hive and Spark 3.4+ to write bucketed Hive tables with SortExec pulled out")
@pytest.mark.parametrize("gens", [_basic_gens], ids=idfn)
@pytest.mark.parametrize("storage", ["PARQUET", "ORC"], ids=idfn)
@pytest.mark.parametrize("planned_write", [True, False], ids=idfn)
def test_optimized_hive_bucketed_fallback(gens, storage, planned_write, spark_tmp_table_factory):
in_table = spark_tmp_table_factory.get()
with_cpu_session(lambda spark: three_col_df(spark, int_gen, int_gen, int_gen).createOrReplaceTempView(in_table))
assert_gpu_fallback_collect(
lambda spark: spark.sql(
"""CREATE TABLE {} STORED AS {}
CLUSTERED BY (b) INTO 3 BUCKETS
AS SELECT * FROM {}""".format(spark_tmp_table_factory.get(), storage, in_table)),
"ExecutedCommandExec",
{"spark.sql.optimizer.plannedWrite.enabled": planned_write})
# The supported types for Hive hash are all checked in datasourcev2_write_test, so here we just
# verify the basic functionality using only int_gen.
with_cpu_session(lambda spark: three_col_df(
spark, int_gen, int_gen, int_gen).createOrReplaceTempView(in_table))
assert_gpu_and_cpu_sql_writes_are_equal_collect(
spark_tmp_table_factory,
lambda spark, out_table: """CREATE TABLE {} STORED AS {}
CLUSTERED BY (b) INTO 3 BUCKETS AS SELECT * FROM {}""".format(
out_table, storage, in_table))

def test_hive_copy_ints_to_long(spark_tmp_table_factory):
do_hive_copy(spark_tmp_table_factory, int_gen, "INT", "BIGINT")
@@ -324,7 +324,8 @@ final class InsertIntoHadoopFsRelationCommandMeta(

override def tagSelfForGpuInternal(): Unit = {
if (GpuBucketingUtils.isHiveHashBucketing(cmd.options)) {
GpuBucketingUtils.tagForHiveBucketingWrite(this, cmd.bucketSpec, cmd.outputColumns, false)
GpuBucketingUtils.tagForHiveBucketingWrite(this, cmd.bucketSpec, cmd.outputColumns,
conf.isForceHiveHashForBucketedWrite)
} else {
BucketIdMetaUtils.tagForBucketingWrite(this, cmd.bucketSpec, cmd.outputColumns)
}
@@ -3201,6 +3202,16 @@ object GpuOverrides extends Logging {
def convertToGpu(): GpuExpression =
GpuXxHash64(childExprs.map(_.convertToGpu()), a.seed)
}),
expr[HiveHash](
"hive hash operator",
ExprChecks.projectOnly(TypeSig.INT, TypeSig.INT,
repeatingParamCheck = Some(RepeatingParamCheck("input",
TypeSig.commonCudfTypes + TypeSig.NULL - TypeSig.TIMESTAMP,
TypeSig.all))),
(a, conf, p, r) => new ExprMeta[HiveHash](a, conf, p, r) {
def convertToGpu(): GpuExpression =
GpuHiveHash(childExprs.map(_.convertToGpu()))
}),
expr[Contains](
"Contains",
ExprChecks.binaryProject(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1859,6 +1859,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024)

val FORCE_HIVE_HASH_FOR_BUCKETED_WRITE =
conf("spark.rapids.sql.format.write.forceHiveHashForBucketing")
.doc("Hive write commands before Spark 330 use Murmur3Hash for bucketed write. " +
"When enabled, HiveHash will be always used for this instead of Murmur3. This is " +
"used to align with some customized Spark binaries before 330.")
.internal()
.booleanConf
.createWithDefault(false)

val SHUFFLE_MULTITHREADED_MAX_BYTES_IN_FLIGHT =
conf("spark.rapids.shuffle.multiThreaded.maxBytesInFlight")
.doc(
@@ -3146,6 +3155,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)

lazy val isForceHiveHashForBucketedWrite: Boolean = get(FORCE_HIVE_HASH_FOR_BUCKETED_WRITE)

lazy val isDetectDeltaLogQueries: Boolean = get(DETECT_DELTA_LOG_QUERIES)

lazy val isDetectDeltaCheckpointQueries: Boolean = get(DETECT_DELTA_CHECKPOINT_QUERIES)
@@ -45,7 +45,7 @@ object GpuHiveFileFormat extends Logging {
val insertCmd = meta.wrapped
// Bucketing write
GpuBucketingUtils.tagForHiveBucketingWrite(meta, insertCmd.table.bucketSpec,
insertCmd.outputColumns, false)
insertCmd.outputColumns, meta.conf.isForceHiveHashForBucketedWrite)

// Infer the file format from the serde string, similar as what Spark does in
// RelationConversions for Hive.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsH
hadoopConf: Configuration,
fileFormat: ColumnarFileFormat,
outputLocation: String,
forceHiveHashForBucketing: Boolean,
customPartitionLocations: Map[TablePartitionSpec,String] = Map.empty,
partitionAttributes: Seq[Attribute] = Nil,
bucketSpec: Option[BucketSpec] = None,
@@ -65,6 +66,7 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsH
statsTrackers = Seq(gpuWriteJobStatsTracker(hadoopConf)),
options = options,
useStableSort = false, // TODO: Fetch from RapidsConf.
forceHiveHashForBucketing = forceHiveHashForBucketing,
concurrentWriterPartitionFlushSize = 0L // TODO: Fetch from RapidsConf.
)
}