diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 14d3c4d1f42f4..9f9b5655e26fc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -180,8 +180,8 @@ private[spark] class Client( throw e } + val sId = Client.submissionId(conf.namespace, driverPodName) if (conf.get(WAIT_FOR_APP_COMPLETION)) { - val sId = Seq(conf.namespace, driverPodName).mkString(":") breakable { while (true) { val podWithName = kubernetesClient @@ -202,10 +202,17 @@ private[spark] class Client( } } } + } else { + logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId into Kubernetes") } } } +private[spark] object Client { + def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName" +} + /** * Main class and entry point of application submission in KUBERNETES mode. */ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 81d9145725393..bc8b023b5ecdf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) this.notifyAll() } - override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) { - logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + override def watchOrStop(sId: String): Boolean = { + logInfo(s"Waiting for application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId to finish...") val interval = conf.get(REPORT_INTERVAL) synchronized { while (!podCompleted && !resourceTooOldReceived) { @@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) logInfo( pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } .getOrElse("No containers were found in the driver pod.")) - logInfo(s"Application ${conf.appName} with submission ID $sId finished") + logInfo(s"Application ${conf.appName} with application ID ${conf.appId} " + + s"and submission ID $sId finished") } podCompleted - } else { - logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") - // Always act like the application has completed since we don't want to wait for app completion - true } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index a8c25ab5002c0..8c2be6c142d74 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{Config, _} +import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.deploy.k8s.submit.Client.submissionId import org.apache.spark.util.Utils class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods) when(namedPods.create()).thenReturn(podWithOwnerReference()) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) - when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true) + val sId = submissionId(kconf.namespace, POD_NAME) + when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) @@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, loggingPodStatusWatcher) submissionClient.run() - verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver") + verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME)) + } + + test("SPARK-42813: Print application info when waitAppCompletion is false") { + val appName = "SPARK-42813" + val logAppender = new LogAppender + withLogAppender(logAppender) { + val sparkConf = new SparkConf(loadDefaults = false) + .set("spark.app.name", appName) + .set(WAIT_FOR_APP_COMPLETION, false) + kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)) + .thenReturn(BUILT_KUBERNETES_SPEC) + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + } + val appId = KubernetesTestConf.APP_ID + val sId = submissionId(kconf.namespace, POD_NAME) + assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains( + s"Deployed Spark application $appName with application ID $appId " + + s"and submission ID $sId into Kubernetes")) } }