Skip to content

Commit

Permalink
[SPARK-42813][K8S] Print application info when waitAppCompletion is f…
Browse files Browse the repository at this point in the history
…alse

### What changes were proposed in this pull request?

On K8s cluster mode,

1. when `spark.kubernetes.submission.waitAppCompletion=false`, print the application information on `spark-submit` exit, as it did before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174)

2. add `appId` in the output message

### Why are the changes needed?

On K8s cluster mode, when `spark.kubernetes.submission.waitAppCompletion=false`,
before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), the `spark-submit` will exit quickly w/ the basic application information.
```
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
```

After [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), those part of code is unreachable, so nothing is output.

This PR also proposes to add `appId` in the output message, to make it consistent w/ the context (if you look at the `LoggingPodStatusWatcherImpl`, this is kind of an exception, `... application $appId ...` is used in other places), and YARN.

https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1311-L1318

### Does this PR introduce _any_ user-facing change?

Yes, changes contain
1) when `spark.kubernetes.submission.waitAppCompletion=false`, the user can see the app information when `spark-submit` exit.
2) the end of `spark-submit` information contains app id now, which is consistent w/ the context and other resource managers like YARN.

### How was this patch tested?

Pass CI.

Closes apache#40444 from pan3793/SPARK-42813.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
pan3793 authored and dongjoon-hyun committed Mar 21, 2023
1 parent c04e0de commit ba1badd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ private[spark] class Client(
throw e
}

val sId = Client.submissionId(conf.namespace, driverPodName)
if (conf.get(WAIT_FOR_APP_COMPLETION)) {
val sId = Seq(conf.namespace, driverPodName).mkString(":")
breakable {
while (true) {
val podWithName = kubernetesClient
Expand All @@ -202,10 +202,17 @@ private[spark] class Client(
}
}
}
} else {
logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId into Kubernetes")
}
}
}

private[spark] object Client {
def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName"
}

/**
* Main class and entry point of application submission in KUBERNETES mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
this.notifyAll()
}

override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
override def watchOrStop(sId: String): Boolean = {
logInfo(s"Waiting for application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
while (!podCompleted && !resourceTooOldReceived) {
Expand All @@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
logInfo(s"Application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId finished")
}
podCompleted
} else {
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
// Always act like the application has completed since we don't want to wait for app completion
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{Config, _}
import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.submit.Client.submissionId
import org.apache.spark.util.Utils

class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Expand Down Expand Up @@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
when(namedPods.create()).thenReturn(podWithOwnerReference())
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
val sId = submissionId(kconf.namespace, POD_NAME)
when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
Expand Down Expand Up @@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME))
}

test("SPARK-42813: Print application info when waitAppCompletion is false") {
val appName = "SPARK-42813"
val logAppender = new LogAppender
withLogAppender(logAppender) {
val sparkConf = new SparkConf(loadDefaults = false)
.set("spark.app.name", appName)
.set(WAIT_FOR_APP_COMPLETION, false)
kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
when(driverBuilder.buildFromFeatures(kconf, kubernetesClient))
.thenReturn(BUILT_KUBERNETES_SPEC)
val submissionClient = new Client(
kconf,
driverBuilder,
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
}
val appId = KubernetesTestConf.APP_ID
val sId = submissionId(kconf.namespace, POD_NAME)
assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
s"Deployed Spark application $appName with application ID $appId " +
s"and submission ID $sId into Kubernetes"))
}
}

0 comments on commit ba1badd

Please sign in to comment.