Use parse_url kernel for QUERY literal and column key #10211

Merged · 5 commits · Jan 26, 2024
4 changes: 4 additions & 0 deletions docs/compatibility.md
@@ -537,6 +537,10 @@ The following regular expression patterns are not yet supported on the GPU and w

Work is ongoing to increase the range of regular expressions that can run on the GPU.

## URL Parsing

`parse_url` with QUERY and a key can produce different results on the CPU and the GPU. In Spark, the `key` argument to `parse_url` can act like a regular expression, but the GPU matches the key exactly. When the key is a literal, the GPU plan checks whether the key contains regex special characters and falls back to the CPU if it does; when the key is a column, no such fallback is possible. For example, `parse_url("http://foo/bar?abc=BAD&a.c=GOOD", QUERY, "a.c")` returns "BAD" on the CPU but "GOOD" on the GPU. See the Spark issue: https://issues.apache.org/jira/browse/SPARK-44500
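The discrepancy comes from the CPU embedding the key in a regular expression while the GPU matches it literally. A minimal Python sketch of the two behaviors (the pattern shape mirrors Spark's query-key regex, but this stand-in is illustrative, not the actual implementation):

```python
import re

url_query = "abc=BAD&a.c=GOOD"
key = "a.c"

# CPU-like behavior: the key is embedded in a regex, so "." matches any
# character; the pattern shape follows Spark's "(&|^)key=([^&]*)".
cpu_match = re.search(r"(&|^)" + key + "=([^&]*)", url_query)
cpu_value = cpu_match.group(2) if cpu_match else None

# GPU-like behavior: the key is compared literally, so only "a.c" matches.
gpu_value = None
for pair in url_query.split("&"):
    k, _, v = pair.partition("=")
    if k == key:
        gpu_value = v
        break

print(cpu_value)  # "BAD"  (the regex-like key "a.c" matches "abc" first)
print(gpu_value)  # "GOOD" (exact match on the "a.c" parameter)
```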

## Timestamps

Spark stores timestamps internally relative to the JVM time zone. Converting an arbitrary timestamp
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -10817,7 +10817,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>only support partToExtract = PROTOCOL | HOST;<br/>Literal value only</em></td>
<td><em>PS<br/>only support partToExtract = PROTOCOL | HOST | QUERY;<br/>Literal value only</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -10838,7 +10838,7 @@
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
38 changes: 30 additions & 8 deletions integration_tests/src/main/python/url_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@
r'(:[0-9]{1,3}){0,1}(/[a-zA-Z0-9]{1,3}){0,3}(\?[a-zA-Z0-9]{1,3}=[a-zA-Z0-9]{1,3}){0,1}(#([a-zA-Z0-9]{1,3})){0,1}'

url_pattern_with_key = r'((http|https|ftp|file)://)(([a-z]{1,3}\.){0,3}([a-z]{1,3})\.([a-z]{1,3}))' \
r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?key=[a-z]{1,3}){0,1}(#([a-z]{1,3})){0,1}'
r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?[a-c]{1,3}=[a-z]{1,3}(&[a-c]{1,3}=[a-z]{1,3}){0,3}){0,1}(#([a-z]{1,3})){0,1}'

edge_cases = [
"[email protected]/path?query=1#Ref",
@@ -150,8 +150,8 @@

supported_parts = ['PROTOCOL', 'HOST', 'QUERY']
unsupported_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']
supported_with_key_parts = ['PROTOCOL', 'HOST']
unsupported_with_key_parts = ['QUERY', 'PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']
supported_with_key_parts = ['PROTOCOL', 'HOST', 'QUERY']
unsupported_with_key_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']

@pytest.mark.parametrize('data_gen', [url_gen, edge_cases_gen], ids=idfn)
@pytest.mark.parametrize('part', supported_parts, ids=idfn)
@@ -166,16 +166,38 @@ def test_parse_url_unsupported_fallback(part):
lambda spark: unary_op_df(spark, url_gen).selectExpr("a", "parse_url(a, '" + part + "')"),
'ParseUrl')

def test_parse_url_query_with_key():
url_gen = StringGen(url_pattern_with_key)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, url_gen)
.selectExpr("a", "parse_url(a, 'QUERY', 'abc')", "parse_url(a, 'QUERY', 'a')")
)

def test_parse_url_query_with_key_column():
url_gen = StringGen(url_pattern_with_key)
key_gen = StringGen('[a-d]{1,3}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, url_gen, key_gen)
.selectExpr("a", "parse_url(a, 'QUERY', b)")
)

@pytest.mark.parametrize('key', ['a?c', '*'], ids=idfn)
@allow_non_gpu('ProjectExec', 'ParseUrl')
def test_parse_url_query_with_key_regex_fallback(key):
url_gen = StringGen(url_pattern_with_key)
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, url_gen)
.selectExpr("a", "parse_url(a, 'QUERY', '" + key + "')"),
'ParseUrl')

@pytest.mark.parametrize('part', supported_with_key_parts, ids=idfn)
def test_parse_url_with_key(part):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')"))



@allow_non_gpu('ProjectExec', 'ParseUrl')
@pytest.mark.parametrize('part', unsupported_with_key_parts, ids=idfn)
def test_parse_url_query_with_key_fallback(part):
def test_parse_url_with_key_fallback(part):
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')"),
'ParseUrl')
'ParseUrl')
@@ -445,7 +445,7 @@ object GpuOverrides extends Logging {
"\f", "\\a", "\\e", "\\cx", "[", "]", "^", "&", ".", "*", "\\d", "\\D", "\\h", "\\H", "\\s",
"\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R",
"?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")

val regexMetaChars = ".$^[]\\|?*+(){}"
/**
* Provides a way to log an info message about how long an operation took in milliseconds.
*/
@@ -3252,18 +3252,32 @@ object GpuOverrides extends Logging {
ExprChecks.projectOnly(TypeSig.STRING, TypeSig.STRING,
Seq(ParamCheck("url", TypeSig.STRING, TypeSig.STRING),
ParamCheck("partToExtract", TypeSig.lit(TypeEnum.STRING).withPsNote(
TypeEnum.STRING, "only support partToExtract = PROTOCOL | HOST"), TypeSig.STRING)),
TypeEnum.STRING, "only support partToExtract = PROTOCOL | HOST | QUERY"),
TypeSig.STRING)),
// Should really be an OptionalParam
Some(RepeatingParamCheck("key", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING))),
Some(RepeatingParamCheck("key", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new ExprMeta[ParseUrl](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (a.failOnError) {
willNotWorkOnGpu("Fail on error is not supported on GPU when parsing urls.")
}

extractStringLit(a.children(1)).map(_.toUpperCase) match {
case Some("QUERY") if (a.children.size == 3) =>
willNotWorkOnGpu("Part to extract QUERY with key is not supported on GPU")
// In Spark, the key in parse_url could act like a regex, but GPU will match the key
// exactly. When the key is a literal, GPU will check if the key contains regex special
// characters and fall back to CPU if it does; we cannot fall back when the key is a column.
// see Spark issue: https://issues.apache.org/jira/browse/SPARK-44500
case Some("QUERY") if (a.children.size == 3) => {
extractLit(a.children(2)).foreach { key =>
if (key.value != null) {
val keyStr = key.value.asInstanceOf[UTF8String].toString
if (regexMetaChars.exists(keyStr.contains(_))) {
willNotWorkOnGpu(s"Key $keyStr could act like a regex which is not " +
"supported on GPU")
}
}
}
}
case Some(part) if GpuParseUrl.isSupportedPart(part) =>
case Some(other) =>
willNotWorkOnGpu(s"Part to extract $other is not supported on GPU")
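The tagging rule added above can be sketched in Python (a hypothetical stand-in for the Scala check, using the same metacharacter set as `regexMetaChars`):

```python
REGEX_META_CHARS = set(".$^[]\\|?*+(){}")

def key_needs_cpu_fallback(key):
    # A literal QUERY key containing any regex metacharacter could behave
    # like a pattern on the CPU, so the plan must fall back to the CPU
    # rather than risk a CPU/GPU result mismatch.
    return key is not None and any(c in REGEX_META_CHARS for c in key)

print(key_needs_cpu_fallback("abc"))  # False: safe to run on the GPU
print(key_needs_cpu_fallback("a.c"))  # True: "." makes the key regex-like
```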
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ import java.sql.SQLException

import scala.collection.mutable.ListBuffer

import com.nvidia.spark.rapids.GpuOverrides.regexMetaChars
import com.nvidia.spark.rapids.RegexParser.toReadableString

/**
@@ -684,7 +685,6 @@ sealed class RegexRewriteFlags(val emptyRepetition: Boolean)
RegexSplitMode if performing a split (string_split)
*/
class CudfRegexTranspiler(mode: RegexMode) {
private val regexMetaChars = ".$^[]\\|?*+(){}"
private val regexPunct = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
private val escapeChars = Map('n' -> '\n', 'r' -> '\r', 't' -> '\t', 'f' -> '\f', 'a' -> '\u0007',
'b' -> '\b', 'e' -> '\u001b')
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,8 +81,18 @@ case class GpuParseUrl(children: Seq[Expression])
// return a null columnvector
return GpuColumnVector.columnVectorFromNull(col.getRowCount.toInt, StringType)
}
throw new UnsupportedOperationException(s"$this is not supported partToExtract=$part. " +
s"Only PROTOCOL, HOST and QUERY without a key are supported")
val keyStr = key.getValue.asInstanceOf[UTF8String].toString
ParseURI.parseURIQueryWithLiteral(col.getBase, keyStr)
}

def doColumnar(col: GpuColumnVector, partToExtract: GpuScalar,
key: GpuColumnVector): ColumnVector = {
val part = partToExtract.getValue.asInstanceOf[UTF8String].toString
if (part != QUERY) {
// return a null columnvector
return GpuColumnVector.columnVectorFromNull(col.getRowCount.toInt, StringType)
}
ParseURI.parseURIQueryWithColumn(col.getBase, key.getBase)
}

override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
Expand All @@ -109,6 +119,8 @@ case class GpuParseUrl(children: Seq[Expression])
(urls, parts, keys) match {
case (urlCv: GpuColumnVector, partScalar: GpuScalar, keyScalar: GpuScalar) =>
GpuColumnVector.from(doColumnar(urlCv, partScalar, keyScalar), dataType)
case (urlCv: GpuColumnVector, partScalar: GpuScalar, keyCv: GpuColumnVector) =>
GpuColumnVector.from(doColumnar(urlCv, partScalar, keyCv), dataType)
case _ =>
throw new
UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
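The new scalar-key and column-key paths dispatch on the key's runtime type. A rough pure-Python analogue, using `urllib` as an illustrative stand-in for the cuDF `ParseURI` kernels (exact key matching, as the GPU implements it):

```python
from urllib.parse import urlsplit, parse_qs

def extract_query(url, key):
    # Exact (non-regex) key matching, mirroring the GPU semantics.
    vals = parse_qs(urlsplit(url).query).get(key)
    return vals[0] if vals else None

def parse_url_query(urls, keys):
    if isinstance(keys, list):
        # Column-key path (parseURIQueryWithColumn): one key per row.
        return [extract_query(u, k) for u, k in zip(urls, keys)]
    # Literal-key path (parseURIQueryWithLiteral): one key for all rows.
    return [extract_query(u, keys) for u in urls]

urls = ["http://foo/bar?a=1&b=2", "http://foo/bar?a=1&b=2"]
print(parse_url_query(urls, "a"))         # ['1', '1']
print(parse_url_query(urls, ["a", "b"]))  # ['1', '2']
```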
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
@@ -386,7 +386,7 @@ Or,S,`or`,None,AST,rhs,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Or,S,`or`,None,AST,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
ParseUrl,S,`parse_url`,None,project,url,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
ParseUrl,S,`parse_url`,None,project,partToExtract,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
ParseUrl,S,`parse_url`,None,project,key,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
ParseUrl,S,`parse_url`,None,project,key,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
ParseUrl,S,`parse_url`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
PercentRank,S,`percent_rank`,None,window,ordering,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS
PercentRank,S,`percent_rank`,None,window,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA