From ba1badd2adfd175c6680dff90e14b8aaa6cecd78 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 21 Mar 2023 09:08:18 -0700 Subject: [PATCH] [SPARK-42813][K8S] Print application info when waitAppCompletion is false ### What changes were proposed in this pull request? In K8s cluster mode, 1. when `spark.kubernetes.submission.waitAppCompletion=false`, print the application information when `spark-submit` exits, as it did before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174) 2. add `appId` to the output message ### Why are the changes needed? In K8s cluster mode, when `spark.kubernetes.submission.waitAppCompletion=false`, before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), `spark-submit` exited quickly after printing the basic application information. ``` logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") ``` After [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), that part of the code is unreachable, so nothing is printed. This PR also proposes to add `appId` to the output message, to make it consistent with the surrounding context (if you look at `LoggingPodStatusWatcherImpl`, this message is the exception; `... application $appId ...` is used in other places) and with YARN. https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1311-L1318 ### Does this PR introduce _any_ user-facing change? Yes. The changes are: 1) when `spark.kubernetes.submission.waitAppCompletion=false`, the user can see the application information when `spark-submit` exits. 2) the final `spark-submit` output now contains the application ID, which is consistent with the surrounding context and with other resource managers like YARN. ### How was this patch tested? Pass CI, including the new test added to `ClientSuite`. Closes #40444 from pan3793/SPARK-42813. 
Authored-by: Cheng Pan Signed-off-by: Dongjoon Hyun --- .../submit/KubernetesClientApplication.scala | 9 +++++- .../k8s/submit/LoggingPodStatusWatcher.scala | 12 +++---- .../spark/deploy/k8s/submit/ClientSuite.scala | 32 +++++++++++++++++-- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 14d3c4d1f42f4..9f9b5655e26fc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -180,8 +180,8 @@ private[spark] class Client( throw e } + val sId = Client.submissionId(conf.namespace, driverPodName) if (conf.get(WAIT_FOR_APP_COMPLETION)) { - val sId = Seq(conf.namespace, driverPodName).mkString(":") breakable { while (true) { val podWithName = kubernetesClient @@ -202,10 +202,17 @@ private[spark] class Client( } } } + } else { + logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId into Kubernetes") } } } +private[spark] object Client { + def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName" +} + /** * Main class and entry point of application submission in KUBERNETES mode. 
*/ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 81d9145725393..bc8b023b5ecdf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) this.notifyAll() } - override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) { - logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + override def watchOrStop(sId: String): Boolean = { + logInfo(s"Waiting for application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId to finish...") val interval = conf.get(REPORT_INTERVAL) synchronized { while (!podCompleted && !resourceTooOldReceived) { @@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) logInfo( pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } .getOrElse("No containers were found in the driver pod.")) - logInfo(s"Application ${conf.appName} with submission ID $sId finished") + logInfo(s"Application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId finished") } podCompleted - } else { - logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") - // Always act like the application has completed since we don't want to wait for app completion - true } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala 
index a8c25ab5002c0..8c2be6c142d74 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{Config, _} +import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.deploy.k8s.submit.Client.submissionId import org.apache.spark.util.Utils class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods) when(namedPods.create()).thenReturn(podWithOwnerReference()) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) - when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true) + val sId = submissionId(kconf.namespace, POD_NAME) + when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) @@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, loggingPodStatusWatcher) submissionClient.run() - verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver") + verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME)) + } + + test("SPARK-42813: Print application info when waitAppCompletion is false") { + val appName = "SPARK-42813" + val logAppender = new LogAppender + withLogAppender(logAppender) { + val sparkConf = new SparkConf(loadDefaults = false) + .set("spark.app.name", appName) + .set(WAIT_FOR_APP_COMPLETION, false) + kconf = 
KubernetesTestConf.createDriverConf(sparkConf = sparkConf, + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)) + .thenReturn(BUILT_KUBERNETES_SPEC) + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + } + val appId = KubernetesTestConf.APP_ID + val sId = submissionId(kconf.namespace, POD_NAME) + assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains( + s"Deployed Spark application $appName with application ID $appId " + + s"and submission ID $sId into Kubernetes")) } }