Merge branch 'branch-25.02' into trunc
ttnghia committed Dec 10, 2024
2 parents 9e7fb91 + 4fbecbc commit 9d79c33
Showing 5 changed files with 57 additions and 80 deletions.
21 changes: 12 additions & 9 deletions .github/workflows/mvn-verify-check/populate-daily-cache.sh
@@ -14,27 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

set -x
max_retry=3; delay=30; i=1
set -e
set -o pipefail

if [[ $SCALA_VER == '2.12' ]]; then
pom='pom.xml'
elif [[ $SCALA_VER == '2.13' ]]; then
pom='scala2.13/pom.xml'
fi

max_retry=3; delay=30; i=1
while true; do
buildvers=($(python build/get_buildvers.py no_snapshots $pom | tr -d ',')) &&
{
python build/get_buildvers.py "no_snapshots.buildvers" $pom | tr -d ',' | \
xargs -n 1 -I {} bash -c \
"mvn $COMMON_MVN_FLAGS --file $pom -Dbuildver={} de.qaware.maven:go-offline-maven-plugin:resolve-dependencies"

for buildver in "${buildvers[@]}"; do
mvn $COMMON_MVN_FLAGS --file $pom -Dbuildver=$buildver de.qaware.maven:go-offline-maven-plugin:resolve-dependencies
done
} && {
# compile base versions to cache scala compiler and compiler bridge
mvn $COMMON_MVN_FLAGS --file $pom \
process-test-resources -pl sql-plugin-api -am
mvn $COMMON_MVN_FLAGS --file $pom process-test-resources -pl sql-plugin-api -am
} && break || {
if [[ $i -le $max_retry ]]; then
echo "mvn command failed. Retry $i/$max_retry."; ((i++)); sleep $delay; ((delay=delay*2))
else
echo "mvn command failed. Exit 1"; exit 1
fi
}
done
done
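For context, the updated script keeps the same retry-with-exponential-backoff shape as before (3 attempts, delay doubling from 30s); only the per-buildver for loop is replaced by an xargs pipeline. A minimal Scala sketch of that retry shape, with illustrative names that are not part of this repo:

```scala
object RetryWithBackoff {
  // Mirrors the shell loop above: run the body, return on success, otherwise log,
  // sleep, double the delay, and let the last failure propagate once attempts run out.
  def retry[T](maxRetry: Int, initialDelayMs: Long)(body: => T): T = {
    var attempt = 1
    var delay = initialDelayMs
    while (true) {
      try {
        return body
      } catch {
        case e: Exception if attempt <= maxRetry =>
          println(s"command failed: ${e.getMessage}. Retry $attempt/$maxRetry.")
          Thread.sleep(delay)
          attempt += 1
          delay *= 2
      }
    }
    sys.error("unreachable") // while (true) never falls through
  }
}

// Hypothetical usage: RetryWithBackoff.retry(3, 30000L) { runMvn(buildver) }
```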
10 changes: 4 additions & 6 deletions integration_tests/src/main/python/window_function_test.py
@@ -974,14 +974,12 @@ def do_it(spark):
def test_lead_lag_for_structs_with_arrays(a_b_gen, struct_gen):
data_gen = [
('a', RepeatSeqGen(a_b_gen, length=20)),
('b', IntegerGen(nullable=False, special_cases=[])),
('b', UniqueLongGen(nullable=False)),
('c', struct_gen)]
# By default for many operations a range of unbounded to unbounded is used
# This will not work until https://github.com/NVIDIA/spark-rapids/issues/216
# is fixed.
# For many operations, a range of unbounded to unbounded is used by default.

# Ordering needs to include c because with nulls and especially on booleans
# it is possible to get a different ordering when it is ambiguous.
# Ordering needs to include `b` because with nulls and especially on booleans,
# it is possible to get a different result when the ordering is ambiguous.
base_window_spec = Window.partitionBy('a').orderBy('b')

def do_it(spark):
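The comment change above is about determinism: `b` is now generated with UniqueLongGen, so the orderBy key is unique within each partition; otherwise lead/lag can legitimately return different (but equally valid) rows on CPU and GPU. A small Scala sketch of the same window shape, assuming a local SparkSession and made-up data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

object LeadLagOrderingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lead-lag-sketch").getOrCreate()
    import spark.implicits._

    // 'a' is the partition key, 'b' is a unique ordering key, 'c' is the payload.
    val df = Seq((1, 1L, "x"), (1, 2L, "y"), (2, 3L, "z")).toDF("a", "b", "c")

    // Same shape as the test's base_window_spec: partition by 'a' and order by the
    // unique column 'b', so every row in a partition has exactly one possible position.
    val w = Window.partitionBy("a").orderBy("b")

    df.select(col("a"), col("b"),
        lead("c", 1).over(w).as("lead_c"),
        lag("c", 1).over(w).as("lag_c"))
      .show()

    spark.stop()
  }
}
```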
90 changes: 30 additions & 60 deletions jenkins/Jenkinsfile-blossom.premerge-databricks
@@ -68,7 +68,6 @@ pipeline {
DATABRICKS_PUBKEY = credentials("SPARK_DATABRICKS_PUBKEY")
DATABRICKS_DRIVER = DbUtils.getDriver("$DB_TYPE")
DATABRICKS_WORKER = DbUtils.getWorker("$DB_TYPE")
INIT_SCRIPTS_DIR = "/databricks/init_scripts/${BUILD_TAG}"
TEST_TYPE = 'pre-commit'
}

@@ -111,12 +110,16 @@
BASE_SPARK_VERSION = DbUtils.getSparkVer("$DB_RUNTIME")
BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS = DbUtils.getInstallVer("$DB_RUNTIME")
INIT_SCRIPTS = DbUtils.getInitScripts("$DB_RUNTIME")
INIT_SCRIPTS_DIR = "/databricks/init_scripts/${BUILD_TAG}-${DB_RUNTIME}"
EXTRA_ENVS = "TEST_MODE=$TEST_MODE"
}
steps {
script {
unstash('source_tree')
databricksBuild()
deleteDir() // cleanup content if no error
container('cpu') {
unstash('source_tree')
databricksBuild()
deleteDir() // cleanup content if no error
}
}
}
}
@@ -134,79 +137,46 @@ String getDbType() {
void databricksBuild() {
def CLUSTER_ID = ''
def SPARK_MAJOR = BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS.replace('.', '')
def ws_path = "$INIT_SCRIPTS_DIR-$DB_TYPE"
def dbStep = ''
try {
stage("Create $SPARK_MAJOR DB") {
script {
container('cpu') {
sh "rm -rf spark-rapids-ci.tgz"
sh "tar -zcf spark-rapids-ci.tgz *"
def CREATE_PARAMS = " -r $DATABRICKS_RUNTIME -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN" +
" -s $DB_TYPE -n CI-${BUILD_TAG}-${BASE_SPARK_VERSION} -k \"$DATABRICKS_PUBKEY\" -i $IDLE_TIMEOUT" +
" -d $DATABRICKS_DRIVER -o $DATABRICKS_WORKER -e $NUM_WORKERS"

// handle init scripts if exist
if (env.INIT_SCRIPTS) {
// foo.sh,bar.sh --> /path/foo.sh,/path/bar.sh
CREATE_PARAMS += " -f " + DbUtils.uploadFiles(this, env.INIT_SCRIPTS, ws_path)
}

CLUSTER_ID = sh(script: "python3 ./jenkins/databricks/create.py $CREATE_PARAMS",
returnStdout: true).trim()
echo CLUSTER_ID
}
}
dbStep = 'CREATE'
// Add the init_script parameter, e.g. foo.sh,bar.sh --> /path/foo.sh,/path/bar.sh
def input_params = env.INIT_SCRIPTS ? " -f " + DbUtils.uploadFiles(this, env.INIT_SCRIPTS, env.INIT_SCRIPTS_DIR) : ''
def CREATE_PARAMS = DbUtils.getParameters(this, dbStep, input_params)
CLUSTER_ID = sh(script: "python3 ./jenkins/databricks/create.py $CREATE_PARAMS", returnStdout: true).trim()
echo CLUSTER_ID
}

stage("Build against $SPARK_MAJOR DB") {
script {
container('cpu') {
withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) {
def BUILD_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -z ./spark-rapids-ci.tgz" +
" -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/build.sh -d /home/ubuntu/build.sh" +
" -v $BASE_SPARK_VERSION -i $BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS"

// add retry for build step to try
// mitigate the issue of downloading dependencies while maven/sonatype is quite unstable
retry(3) {
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"
}
}
sh "rm -rf spark-rapids-ci.tgz"
sh "tar -zcf spark-rapids-ci.tgz * .git"
dbStep = 'BUILD'
withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) {
def BUILD_PARAMS = DbUtils.getParameters(this, dbStep, "-c $CLUSTER_ID")
retry(3) {
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"
}
}
sh "rm spark-rapids-ci.tgz"
}

// TODO: Temporarily skip tests on Databricks 14.3 until the test failures are fixed
if (env.DB_RUNTIME != '14.3') {
stage("Test against $SPARK_MAJOR DB") {
script {
container('cpu') {
try {
withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) {
def TEST_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -e TEST_MODE=$TEST_MODE" +
" -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/test.sh -v $BASE_SPARK_VERSION -d /home/ubuntu/test.sh"
if (params.SPARK_CONF) {
TEST_PARAMS += " -f ${params.SPARK_CONF}"
}
sh "python3 ./jenkins/databricks/run-tests.py $TEST_PARAMS"
}
} finally {
common.publishPytestResult(this, "${STAGE_NAME}")
}
}
dbStep = 'TEST'
withCredentials([file(credentialsId: 'SPARK_DATABRICKS_PRIVKEY', variable: 'DATABRICKS_PRIVKEY')]) {
def TEST_PARAMS = DbUtils.getParameters(this, dbStep, "-c $CLUSTER_ID")
sh "python3 ./jenkins/databricks/run-tests.py $TEST_PARAMS"
}
}
}

} finally {
if (CLUSTER_ID) {
container('cpu') {
retry(3) {
if (env.INIT_SCRIPTS) {
DbUtils.cleanUp(this, ws_path)
}
sh "python3 ./jenkins/databricks/shutdown.py -s $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -d"
}
(dbStep == 'TEST') ? common.publishPytestResult(this, "Test against $SPARK_MAJOR DB") : ''
retry(3) {
env.INIT_SCRIPTS ? DbUtils.cleanUp(this, env.INIT_SCRIPTS_DIR) : ''
sh "python3 ./jenkins/databricks/shutdown.py -s $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -d"
}
}
}
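The refactor above moves per-stage parameter construction into DbUtils.getParameters and hoists the container('cpu') wrapper to the caller, but the overall shape is unchanged: create the cluster, build, optionally test, then always publish results and shut the cluster down in the finally block. A schematic of that flow in Scala, with illustrative function parameters rather than the pipeline's real API:

```scala
object ClusterLifecycleSketch {
  // Create a cluster, run build and (optionally) test on it, then always clean up,
  // publishing test results only if the test stage was actually reached.
  def runOnCluster(create: () => String,
                   build: String => Unit,
                   test: Option[String => Unit],
                   publishResults: () => Unit,
                   shutdown: String => Unit): Unit = {
    var clusterId = ""
    var reachedTest = false
    try {
      clusterId = create()
      build(clusterId)
      test.foreach { t => reachedTest = true; t(clusterId) }
    } finally {
      if (clusterId.nonEmpty) {
        if (reachedTest) publishResults()
        shutdown(clusterId)
      }
    }
  }
}
```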
@@ -706,7 +706,9 @@ object GpuToTimestamp {
case _ =>
// this is the incompatibleDateFormats case where we do not guarantee compatibility with
// Spark and assume that all non-null inputs are valid
ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt)
withResource(Scalar.fromBool(true)) { s =>
ColumnVector.fromScalar(s, col.getRowCount.toInt)
}
}
}

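The fix above stops leaking the Boolean Scalar: previously it was passed straight into ColumnVector.fromScalar and never closed. withResource is the plugin's resource-management helper; a simplified stand-in (not the project's exact definition) showing what it guarantees:

```scala
object WithResourceSketch {
  // Close the resource after the block runs, whether it returns normally or throws,
  // so the Scalar above is released once the ColumnVector has been built from it.
  def withResource[R <: AutoCloseable, V](resource: R)(block: R => V): V = {
    try {
      block(resource)
    } finally {
      resource.close()
    }
  }
}
```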
@@ -658,11 +658,15 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(

localJoinType match {
case LeftOuter if spillableBuiltBatch.numRows == 0 =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
true)
withResource(spillableBuiltBatch) { _ =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
true)
}
case RightOuter if spillableBuiltBatch.numRows == 0 =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
false)
withResource(spillableBuiltBatch) { _ =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
false)
}
case _ =>
new CrossJoinIterator(
spillableBuiltBatch,
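Same idea in the join hunk above: when the built side is empty, EmptyOuterNestedLoopJoinIterator only needs the batch's data types and never consumes its rows, so wrapping construction in withResource releases spillableBuiltBatch immediately instead of leaking it. A tiny self-contained illustration using scala.util.Using in place of the plugin's helper, with made-up types:

```scala
import scala.util.Using

// Hypothetical stand-in for the spillable built-side batch; only the schema is read.
final case class FakeBatch(dataTypes: Seq[String]) extends AutoCloseable {
  override def close(): Unit = println("built-side batch closed")
}

object EmptyOuterJoinSketch {
  def emptyOuterIterator(built: FakeBatch): Iterator[Seq[String]] =
    Using.resource(built) { b =>
      // The returned iterator captures only the schema, so the batch can be
      // closed before the iterator is handed back to the caller.
      Iterator.single(b.dataTypes)
    }
}
```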
