From 3449c8a772899ff1752ea42efa89f1ddc049cc6a Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Sun, 8 Dec 2024 15:45:53 -0600 Subject: [PATCH 1/5] Fixes a leak for the empty nlj iterator (#11832) Signed-off-by: Alessandro Bellina --- .../GpuBroadcastNestedLoopJoinExecBase.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala index 578c1106eb1..b939a8c4155 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala @@ -658,11 +658,15 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( localJoinType match { case LeftOuter if spillableBuiltBatch.numRows == 0 => - new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, - true) + withResource(spillableBuiltBatch) { _ => + new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, + true) + } case RightOuter if spillableBuiltBatch.numRows == 0 => - new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, - false) + withResource(spillableBuiltBatch) { _ => + new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, + false) + } case _ => new CrossJoinIterator( spillableBuiltBatch, From d1466b778c6f111f0e6b2f424c8c1d31c72df2b0 Mon Sep 17 00:00:00 2001 From: Tim Liu Date: Mon, 9 Dec 2024 09:59:51 +0800 Subject: [PATCH 2/5] Optimize Databricks Jenkins scripts [skip ci] (#11817) * Optimize Databricks Jenkins scripts Remove duplicate try/catch/container script blocks Move default Databricks parameters into the common Groovy library Signed-off-by: timl * Fix merge conflict Fix merge conflict with https://github.com/NVIDIA/spark-rapids/pull/11819/files#diff-6c8e5cceR72 Signed-off-by: Tim Liu --------- Signed-off-by: timl Signed-off-by: Tim Liu --- .../Jenkinsfile-blossom.premerge-databricks | 90 +++++++------------ 1 file changed, 30 insertions(+), 60 deletions(-) diff --git a/jenkins/Jenkinsfile-blossom.premerge-databricks b/jenkins/Jenkinsfile-blossom.premerge-databricks index 147b6a40e98..abf799371b3 100644 --- a/jenkins/Jenkinsfile-blossom.premerge-databricks +++ b/jenkins/Jenkinsfile-blossom.premerge-databricks @@ -68,7 +68,6 @@ pipeline { DATABRICKS_PUBKEY = credentials("SPARK_DATABRICKS_PUBKEY") DATABRICKS_DRIVER = DbUtils.getDriver("$DB_TYPE") DATABRICKS_WORKER = DbUtils.getWorker("$DB_TYPE") - INIT_SCRIPTS_DIR = "/databricks/init_scripts/${BUILD_TAG}" TEST_TYPE = 'pre-commit' } @@ -111,12 +110,16 @@ pipeline { BASE_SPARK_VERSION = DbUtils.getSparkVer("$DB_RUNTIME") BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS = DbUtils.getInstallVer("$DB_RUNTIME") INIT_SCRIPTS = DbUtils.getInitScripts("$DB_RUNTIME") + INIT_SCRIPTS_DIR = "/databricks/init_scripts/${BUILD_TAG}-${DB_RUNTIME}" + EXTRA_ENVS = "TEST_MODE=$TEST_MODE" } steps { script { - unstash('source_tree') - databricksBuild() - deleteDir() // cleanup content if no error + container('cpu') { + unstash('source_tree') + databricksBuild() + deleteDir() // cleanup content if no error + } } } } @@ -134,79 +137,46 @@ String getDbType() { void databricksBuild() { def CLUSTER_ID = '' def SPARK_MAJOR = BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS.replace('.', '') - def ws_path = 
"$INIT_SCRIPTS_DIR-$DB_TYPE" + def dbStep = '' try { stage("Create $SPARK_MAJOR DB") { - script { - container('cpu') { - sh "rm -rf spark-rapids-ci.tgz" - sh "tar -zcf spark-rapids-ci.tgz *" - def CREATE_PARAMS = " -r $DATABRICKS_RUNTIME -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN" + - " -s $DB_TYPE -n CI-${BUILD_TAG}-${BASE_SPARK_VERSION} -k \"$DATABRICKS_PUBKEY\" -i $IDLE_TIMEOUT" + - " -d $DATABRICKS_DRIVER -o $DATABRICKS_WORKER -e $NUM_WORKERS" - - // handle init scripts if exist - if (env.INIT_SCRIPTS) { - // foo.sh,bar.sh --> /path/foo.sh,/path/bar.sh - CREATE_PARAMS += " -f " + DbUtils.uploadFiles(this, env.INIT_SCRIPTS, ws_path) - } - - CLUSTER_ID = sh(script: "python3 ./jenkins/databricks/create.py $CREATE_PARAMS", - returnStdout: true).trim() - echo CLUSTER_ID - } - } + dbStep = 'CREATE' + // Add the init_script parameter, e.g. oo.sh,bar.sh --> /path/foo.sh,/path/bar.sh + def input_params = env.INIT_SCRIPTS ? " -f " + DbUtils.uploadFiles(this, env.INIT_SCRIPTS, env.INIT_SCRIPTS_DIR) : '' + def CREATE_PARAMS = DbUtils.getParameters(this, dbStep, input_params) + CLUSTER_ID = sh(script: "python3 ./jenkins/databricks/create.py $CREATE_PARAMS", returnStdout: true).trim() + echo CLUSTER_ID } stage("Build against $SPARK_MAJOR DB") { - script { - container('cpu') { - withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) { - def BUILD_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -z ./spark-rapids-ci.tgz" + - " -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/build.sh -d /home/ubuntu/build.sh" + - " -v $BASE_SPARK_VERSION -i $BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS" - - // add retry for build step to try - // mitigate the issue of downloading dependencies while maven/sonatype is quite unstable - retry(3) { - sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS" - } - } + sh "rm -rf spark-rapids-ci.tgz" + sh "tar -zcf spark-rapids-ci.tgz * .git" + dbStep = 'BUILD' + withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) { + def BUILD_PARAMS = DbUtils.getParameters(this, dbStep, "-c $CLUSTER_ID") + retry(3) { + sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS" } } + sh "rm spark-rapids-ci.tgz" } // TODO: Temporarily skip tests on Databricks 14.3 until the test failures are fixed if (env.DB_RUNTIME != '14.3') { stage("Test agaist $SPARK_MAJOR DB") { - script { - container('cpu') { - try { - withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) { - def TEST_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -e TEST_MODE=$TEST_MODE" + - " -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/test.sh -v $BASE_SPARK_VERSION -d /home/ubuntu/test.sh" - if (params.SPARK_CONF) { - TEST_PARAMS += " -f ${params.SPARK_CONF}" - } - sh "python3 ./jenkins/databricks/run-tests.py $TEST_PARAMS" - } - } finally { - common.publishPytestResult(this, "${STAGE_NAME}") - } - } + dbStep = 'TEST' + withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) { + def TEST_PARAMS = DbUtils.getParameters(this, dbStep, "-c $CLUSTER_ID") + sh "python3 ./jenkins/databricks/run-tests.py $TEST_PARAMS" } } } - } finally { if (CLUSTER_ID) { - container('cpu') { - retry(3) { - if (env.INIT_SCRIPTS) { - DbUtils.cleanUp(this, ws_path) - } - sh "python3 ./jenkins/databricks/shutdown.py -s $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -d" - } + (dbStep == 'TEST') ? 
common.publishPytestResult(this, "Test against $SPARK_MAJOR DB") : '' + retry(3) { + env.INIT_SCRIPTS ? DbUtils.cleanUp(this, env.INIT_SCRIPTS_DIR) : '' + sh "python3 ./jenkins/databricks/shutdown.py -s $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -d" } } } From 0dbef90ffd55de5099ba3e9f3de89a7d159d3e0a Mon Sep 17 00:00:00 2001 From: YanxuanLiu Date: Mon, 9 Dec 2024 13:33:37 +0800 Subject: [PATCH 3/5] Fix bug: populate cache deps [skip ci] (#11811) * correct arg of get_buildvers.py Signed-off-by: YanxuanLiu * output fail info Signed-off-by: YanxuanLiu * fail the script when error occur Signed-off-by: YanxuanLiu * test error Signed-off-by: YanxuanLiu * test error Signed-off-by: YanxuanLiu * split command to avoid masking error Signed-off-by: YanxuanLiu --------- Signed-off-by: YanxuanLiu --- .../mvn-verify-check/populate-daily-cache.sh | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/.github/workflows/mvn-verify-check/populate-daily-cache.sh b/.github/workflows/mvn-verify-check/populate-daily-cache.sh index b93cd0b6b49..d4e9b07d1a7 100755 --- a/.github/workflows/mvn-verify-check/populate-daily-cache.sh +++ b/.github/workflows/mvn-verify-check/populate-daily-cache.sh @@ -14,22 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -x -max_retry=3; delay=30; i=1 +set -e +set -o pipefail + if [[ $SCALA_VER == '2.12' ]]; then pom='pom.xml' elif [[ $SCALA_VER == '2.13' ]]; then pom='scala2.13/pom.xml' fi + +max_retry=3; delay=30; i=1 while true; do + buildvers=($(python build/get_buildvers.py no_snapshots $pom | tr -d ',')) && { - python build/get_buildvers.py "no_snapshots.buildvers" $pom | tr -d ',' | \ - xargs -n 1 -I {} bash -c \ - "mvn $COMMON_MVN_FLAGS --file $pom -Dbuildver={} de.qaware.maven:go-offline-maven-plugin:resolve-dependencies" - + for buildver in "${buildvers[@]}"; do + mvn $COMMON_MVN_FLAGS --file $pom -Dbuildver=$buildver de.qaware.maven:go-offline-maven-plugin:resolve-dependencies + done + } && { # compile base versions to cache scala compiler and compiler bridge - mvn $COMMON_MVN_FLAGS --file $pom \ - process-test-resources -pl sql-plugin-api -am + mvn $COMMON_MVN_FLAGS --file $pom process-test-resources -pl sql-plugin-api -am } && break || { if [[ $i -le $max_retry ]]; then echo "mvn command failed. Retry $i/$max_retry."; ((i++)); sleep $delay; ((delay=delay*2)) @@ -37,4 +40,4 @@ while true; do echo "mvn command failed. Exit 1"; exit 1 fi } -done \ No newline at end of file +done From 45cdac34667638b4d29e0ec5aab663d2588e3f26 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Dec 2024 10:10:03 -0800 Subject: [PATCH 4/5] Fix for lead/lag window test failures. (#11823) Fixes #11807. `test_lead_lag_for_structs_with_arrays` in `window_function_test` fails intermittently because of non-deterministic data ordering. Window function tests are sensitive to data ordering. With certain values of DATAGEN_SEED, there are repeated values of partitioning/ordering keys, causing the window function to return different values on CPU and GPU. This commit fixes the test so that the ordering is deterministic. 
Signed-off-by: MithunR --- .../src/main/python/window_function_test.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 653eaffa940..7695c1adc9d 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -971,14 +971,12 @@ def do_it(spark): def test_lead_lag_for_structs_with_arrays(a_b_gen, struct_gen): data_gen = [ ('a', RepeatSeqGen(a_b_gen, length=20)), - ('b', IntegerGen(nullable=False, special_cases=[])), + ('b', UniqueLongGen(nullable=False)), ('c', struct_gen)] - # By default for many operations a range of unbounded to unbounded is used - # This will not work until https://github.com/NVIDIA/spark-rapids/issues/216 - # is fixed. + # For many operations, a range of unbounded to unbounded is used by default. - # Ordering needs to include c because with nulls and especially on booleans - # it is possible to get a different ordering when it is ambiguous. + # Ordering needs to include `b` because with nulls and especially on booleans, + # it is possible to get a different result when the ordering is ambiguous. base_window_spec = Window.partitionBy('a').orderBy('b') def do_it(spark): From 96a58d121a5af7ef956196a6141fe7777277e95e Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Mon, 9 Dec 2024 19:47:01 -0600 Subject: [PATCH 5/5] Fix leak in isTimeStamp (#11845) Signed-off-by: Kuhu Shukla --- .../org/apache/spark/sql/rapids/datetimeExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 0f382a7b6e6..d08c598cba4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -707,7 +707,9 @@ object GpuToTimestamp { case _ => // this is the incompatibleDateFormats case where we do not guarantee compatibility with // Spark and assume that all non-null inputs are valid - ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt) + withResource(Scalar.fromBool(true)) { s => + ColumnVector.fromScalar(s, col.getRowCount.toInt) + } } }
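
Context for the two leak fixes in this series (patches 1 and 5): both route a short-lived resource through withResource, which closes the resource once the body finishes, including when the body throws. Below is a minimal, self-contained sketch of that pattern for readers unfamiliar with it. The simplified withResource helper and the Probe class are illustrative stand-ins only, not the plugin's actual Arm implementation or cudf types.

    // Minimal sketch of the withResource (loan) pattern used by the leak fixes above.
    // NOTE: this simplified helper and Probe are hypothetical stand-ins, not spark-rapids code.
    object WithResourceSketch {
      // Run `body` with `resource`, and close the resource afterwards even if `body` throws.
      def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
        try body(resource) finally resource.close()

      // Toy resource that records whether close() was called.
      final class Probe extends AutoCloseable {
        var closed = false
        override def close(): Unit = closed = true
      }

      def main(args: Array[String]): Unit = {
        val probe = new Probe
        // The resource is released as soon as the body returns (or throws), which is the
        // behavior the fixes rely on to avoid leaking the built batch and the boolean scalar.
        val answer = withResource(probe) { _ => 42 }
        assert(probe.closed && answer == 42)
      }
    }

In both patches the change is the same shape: a value that was previously created and never closed on one code path (the spillable built batch in the empty-join case, the boolean Scalar in isTimeStamp) is now created inside, or wrapped by, a withResource block so its lifetime is bounded by the expression that uses it.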