diff --git a/docs/compatibility.md b/docs/compatibility.md index 2644c873e98..343fb6bb4fe 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -537,6 +537,10 @@ The following regular expression patterns are not yet supported on the GPU and w Work is ongoing to increase the range of regular expressions that can run on the GPU. +## URL Parsing + +`parse_url` QUERY with a column key could produce different results on CPU and GPU. In Spark, the `key` in `parse_url` could act like a regex, but GPU will match the key exactly. If key is literal, GPU will check if key contains regex special characters and fallback to CPU if it does, but if key is column, it will not be able to fallback. For example, `parse_url("http://foo/bar?abc=BAD&a.c=GOOD", QUERY, "a.c")` will return "BAD" on CPU, but "GOOD" on GPU. See the Spark issue: https://issues.apache.org/jira/browse/SPARK-44500 + ## Timestamps Spark stores timestamps internally relative to the JVM time zone. Converting an arbitrary timestamp diff --git a/docs/supported_ops.md b/docs/supported_ops.md index c23349467b9..345403f9218 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -10817,7 +10817,7 @@ are limited. -PS
only support partToExtract = PROTOCOL | HOST;
Literal value only
+PS
only support partToExtract = PROTOCOL | HOST | QUERY;
Literal value only
@@ -10838,7 +10838,7 @@ are limited. -PS
Literal value only
+S diff --git a/integration_tests/src/main/python/url_test.py b/integration_tests/src/main/python/url_test.py index ba31126736a..ca6bae1853f 100644 --- a/integration_tests/src/main/python/url_test.py +++ b/integration_tests/src/main/python/url_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ r'(:[0-9]{1,3}){0,1}(/[a-zA-Z0-9]{1,3}){0,3}(\?[a-zA-Z0-9]{1,3}=[a-zA-Z0-9]{1,3}){0,1}(#([a-zA-Z0-9]{1,3})){0,1}' url_pattern_with_key = r'((http|https|ftp|file)://)(([a-z]{1,3}\.){0,3}([a-z]{1,3})\.([a-z]{1,3}))' \ - r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?key=[a-z]{1,3}){0,1}(#([a-z]{1,3})){0,1}' + r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?[a-c]{1,3}=[a-z]{1,3}(&[a-c]{1,3}=[a-z]{1,3}){0,3}){0,1}(#([a-z]{1,3})){0,1}' edge_cases = [ "userinfo@spark.apache.org/path?query=1#Ref", @@ -150,8 +150,8 @@ supported_parts = ['PROTOCOL', 'HOST', 'QUERY'] unsupported_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO'] -supported_with_key_parts = ['PROTOCOL', 'HOST'] -unsupported_with_key_parts = ['QUERY', 'PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO'] +supported_with_key_parts = ['PROTOCOL', 'HOST', 'QUERY'] +unsupported_with_key_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO'] @pytest.mark.parametrize('data_gen', [url_gen, edge_cases_gen], ids=idfn) @pytest.mark.parametrize('part', supported_parts, ids=idfn) @@ -166,16 +166,38 @@ def test_parse_url_unsupported_fallback(part): lambda spark: unary_op_df(spark, url_gen).selectExpr("a", "parse_url(a, '" + part + "')"), 'ParseUrl') +def test_parse_url_query_with_key(): + url_gen = StringGen(url_pattern_with_key) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, url_gen) + .selectExpr("a", "parse_url(a, 'QUERY', 'abc')", "parse_url(a, 'QUERY', 'a')") + ) + +def 
test_parse_url_query_with_key_column(): + url_gen = StringGen(url_pattern_with_key) + key_gen = StringGen('[a-d]{1,3}') + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, url_gen, key_gen) + .selectExpr("a", "parse_url(a, 'QUERY', b)") + ) + +@pytest.mark.parametrize('key', ['a?c', '*'], ids=idfn) +@allow_non_gpu('ProjectExec', 'ParseUrl') +def test_parse_url_query_with_key_regex_fallback(key): + url_gen = StringGen(url_pattern_with_key) + assert_gpu_fallback_collect( + lambda spark: unary_op_df(spark, url_gen) + .selectExpr("a", "parse_url(a, 'QUERY', '" + key + "')"), + 'ParseUrl') + @pytest.mark.parametrize('part', supported_with_key_parts, ids=idfn) def test_parse_url_with_key(part): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')")) - - @allow_non_gpu('ProjectExec', 'ParseUrl') @pytest.mark.parametrize('part', unsupported_with_key_parts, ids=idfn) -def test_parse_url_query_with_key_fallback(part): +def test_parse_url_with_key_fallback(part): assert_gpu_fallback_collect( lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')"), - 'ParseUrl') \ No newline at end of file + 'ParseUrl') diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index b330eb5b52d..2f509239577 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -445,7 +445,7 @@ object GpuOverrides extends Logging { "\f", "\\a", "\\e", "\\cx", "[", "]", "^", "&", ".", "*", "\\d", "\\D", "\\h", "\\H", "\\s", "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">") - + val regexMetaChars = ".$^[]\\|?*+(){}" /** * Provides a way to log an info message about how 
long an operation took in milliseconds. */ @@ -3252,18 +3252,32 @@ object GpuOverrides extends Logging { ExprChecks.projectOnly(TypeSig.STRING, TypeSig.STRING, Seq(ParamCheck("url", TypeSig.STRING, TypeSig.STRING), ParamCheck("partToExtract", TypeSig.lit(TypeEnum.STRING).withPsNote( - TypeEnum.STRING, "only support partToExtract = PROTOCOL | HOST"), TypeSig.STRING)), + TypeEnum.STRING, "only support partToExtract = PROTOCOL | HOST | QUERY"), + TypeSig.STRING)), // Should really be an OptionalParam - Some(RepeatingParamCheck("key", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING))), + Some(RepeatingParamCheck("key", TypeSig.STRING, TypeSig.STRING))), (a, conf, p, r) => new ExprMeta[ParseUrl](a, conf, p, r) { override def tagExprForGpu(): Unit = { if (a.failOnError) { willNotWorkOnGpu("Fail on error is not supported on GPU when parsing urls.") } - + extractStringLit(a.children(1)).map(_.toUpperCase) match { - case Some("QUERY") if (a.children.size == 3) => - willNotWorkOnGpu("Part to extract QUERY with key is not supported on GPU") + // In Spark, the key in parse_url could act like a regex, but GPU will match the key + // exactly. When key is literal, GPU will check if the key contains regex special + // characters and fall back to CPU if it does, but we are not able to fall back when key is column. 
+ // see Spark issue: https://issues.apache.org/jira/browse/SPARK-44500 + case Some("QUERY") if (a.children.size == 3) => { + extractLit(a.children(2)).foreach { key => + if (key.value != null) { + val keyStr = key.value.asInstanceOf[UTF8String].toString + if (regexMetaChars.exists(keyStr.contains(_))) { + willNotWorkOnGpu(s"Key $keyStr could act like a regex which is not " + + "supported on GPU") + } + } + } + } case Some(part) if GpuParseUrl.isSupportedPart(part) => case Some(other) => willNotWorkOnGpu(s"Part to extract $other is not supported on GPU") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala index 3c46ffde5a4..815314e7639 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RegexParser.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,6 +19,7 @@ import java.sql.SQLException import scala.collection.mutable.ListBuffer +import com.nvidia.spark.rapids.GpuOverrides.regexMetaChars import com.nvidia.spark.rapids.RegexParser.toReadableString /** @@ -684,7 +685,6 @@ sealed class RegexRewriteFlags(val emptyRepetition: Boolean) RegexSplitMode if performing a split (string_split) */ class CudfRegexTranspiler(mode: RegexMode) { - private val regexMetaChars = ".$^[]\\|?*+(){}" private val regexPunct = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" private val escapeChars = Map('n' -> '\n', 'r' -> '\r', 't' -> '\t', 'f' -> '\f', 'a' -> '\u0007', 'b' -> '\b', 'e' -> '\u001b') diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuParseUrl.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuParseUrl.scala index 8b6769bf810..6a72c9e76c2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuParseUrl.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuParseUrl.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,8 +81,18 @@ case class GpuParseUrl(children: Seq[Expression]) // return a null columnvector return GpuColumnVector.columnVectorFromNull(col.getRowCount.toInt, StringType) } - throw new UnsupportedOperationException(s"$this is not supported partToExtract=$part. 
" + - s"Only PROTOCOL, HOST and QUERY without a key are supported") + val keyStr = key.getValue.asInstanceOf[UTF8String].toString + ParseURI.parseURIQueryWithLiteral(col.getBase, keyStr) + } + + def doColumnar(col: GpuColumnVector, partToExtract: GpuScalar, + key: GpuColumnVector): ColumnVector = { + val part = partToExtract.getValue.asInstanceOf[UTF8String].toString + if (part != QUERY) { + // return a null columnvector + return GpuColumnVector.columnVectorFromNull(col.getRowCount.toInt, StringType) + } + ParseURI.parseURIQueryWithColumn(col.getBase, key.getBase) } override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { @@ -109,6 +119,8 @@ case class GpuParseUrl(children: Seq[Expression]) (urls, parts, keys) match { case (urlCv: GpuColumnVector, partScalar: GpuScalar, keyScalar: GpuScalar) => GpuColumnVector.from(doColumnar(urlCv, partScalar, keyScalar), dataType) + case (urlCv: GpuColumnVector, partScalar: GpuScalar, keyCv: GpuColumnVector) => + GpuColumnVector.from(doColumnar(urlCv, partScalar, keyCv), dataType) case _ => throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this") diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 2dbc386656d..e38af29c3ed 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -386,7 +386,7 @@ Or,S,`or`,None,AST,rhs,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Or,S,`or`,None,AST,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ParseUrl,S,`parse_url`,None,project,url,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA ParseUrl,S,`parse_url`,None,project,partToExtract,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA -ParseUrl,S,`parse_url`,None,project,key,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA +ParseUrl,S,`parse_url`,None,project,key,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA 
ParseUrl,S,`parse_url`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA PercentRank,S,`percent_rank`,None,window,ordering,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS PercentRank,S,`percent_rank`,None,window,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA