From 3804da10a4ce3b7d587644960b1076afbea548b9 Mon Sep 17 00:00:00 2001 From: albertshau Date: Wed, 1 Nov 2023 13:48:17 -0700 Subject: [PATCH 1/2] CDAP-20858 add tag for launch mode in Dataproc job submission metric Added a launch mode tag for the Dataproc job submission metric. --- .../runtime/spi/common/DataprocMetric.java | 111 ++++++++++++++++++ .../runtime/spi/common/DataprocUtils.java | 31 +++-- .../runtimejob/DataprocRuntimeJobManager.java | 19 +-- 3 files changed, 143 insertions(+), 18 deletions(-) create mode 100644 cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java new file mode 100644 index 000000000000..738c0fb6616f --- /dev/null +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java @@ -0,0 +1,111 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.runtime.spi.common; + +import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode; +import javax.annotation.Nullable; + +/** + * Dataproc related metric + */ +public class DataprocMetric { + private final String region; + private final String metricName; + @Nullable + private final Exception exception; + @Nullable + private final LaunchMode launchMode; + + private DataprocMetric(String metricName, String region, @Nullable Exception exception, + @Nullable LaunchMode launchMode) { + this.metricName = metricName; + this.region = region; + this.exception = exception; + this.launchMode = launchMode; + } + + public String getMetricName() { + return metricName; + } + + public String getRegion() { + return region; + } + + @Nullable + public Exception getException() { + return exception; + } + + @Nullable + public LaunchMode getLaunchMode() { + return launchMode; + } + + /** + * Returns a builder to create a DataprocMetric. + * + * @param metricName metric name + * @return Builder to create a DataprocMetric + */ + public static Builder builder(String metricName) { + return new Builder(metricName); + } + + /** + * Builder for a DataprocMetric. + */ + public static class Builder { + private final String metricName; + private String region; + @Nullable + private Exception exception; + @Nullable + private LaunchMode launchMode; + + private Builder(String metricName) { + this.metricName = metricName; + } + + public Builder setRegion(String region) { + this.region = region; + return this; + } + + public Builder setException(@Nullable Exception e) { + this.exception = e; + return this; + } + + public Builder setLaunchMode(@Nullable LaunchMode launchMode) { + this.launchMode = launchMode; + return this; + } + + /** + * Returns a DataprocMetric. + * @return DataprocMetric. + */ + public DataprocMetric build() { + if (region == null) { + // region should always be set unless there is a bug in the code + throw new IllegalStateException("Dataproc metric is missing the region"); + } + return new DataprocMetric(metricName, region, exception, launchMode); + } + } +} diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java index ce13b245391e..5efb82abc50f 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java @@ -304,7 +304,20 @@ public static String getSystemProjectId() { **/ public static void emitMetric(ProvisionerContext context, String region, String metricName, @Nullable Exception e) { + emitMetric(context, + DataprocMetric.builder(metricName).setRegion(region).setException(e).build()); + } + + public static void emitMetric(ProvisionerContext context, String region, String metricName) { + emitMetric(context, region, metricName, null); + } + + /** + * Emit a dataproc metric. + **/ + public static void emitMetric(ProvisionerContext context, DataprocMetric dataprocMetric) { StatusCode.Code statusCode; + Exception e = dataprocMetric.getException(); if (e == null) { statusCode = StatusCode.Code.OK; } else { @@ -316,16 +329,14 @@ public static void emitMetric(ProvisionerContext context, String region, statusCode = StatusCode.Code.INTERNAL; } } - Map tags = ImmutableMap.builder() - .put("reg", region) - .put("sc", statusCode.toString()) - .build(); - ProvisionerMetrics metrics = context.getMetrics(tags); - metrics.count(metricName, 1); - } - - public static void emitMetric(ProvisionerContext context, String region, String metricName) { - emitMetric(context, region, metricName, null); + ImmutableMap.Builder tags = ImmutableMap.builder() + .put("reg", dataprocMetric.getRegion()) + .put("sc", statusCode.toString()); + if (dataprocMetric.getLaunchMode() != null) { + tags.put("lchmode", dataprocMetric.getLaunchMode().name()); + } + ProvisionerMetrics metrics = context.getMetrics(tags.build()); + metrics.count(dataprocMetric.getMetricName(), 1); } /** diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java index 9b7d5d22b662..b59b6bf534af 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java @@ -57,6 +57,7 @@ import io.cdap.cdap.runtime.spi.CacheableLocalFile; import io.cdap.cdap.runtime.spi.ProgramRunInfo; import io.cdap.cdap.runtime.spi.VersionInfo; +import io.cdap.cdap.runtime.spi.common.DataprocMetric; import io.cdap.cdap.runtime.spi.common.DataprocUtils; import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext; import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException; @@ -293,6 +294,12 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception { cdapVersionInfo.getFix()); } + LaunchMode launchMode = LaunchMode.valueOf( + provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase()); + DataprocMetric.Builder submitJobMetric = + DataprocMetric.builder("provisioner.submitJob.response.count") + .setRegion(region) + .setLaunchMode(launchMode); try { // step 1: build twill.jar and launcher.jar and add them to files to be copied to gcs if (disableLocalCaching) { @@ -339,7 +346,7 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception { } // step 3: build the hadoop job request to be submitted to dataproc - SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles); + SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles, launchMode); // step 4: submit hadoop job to dataproc try { @@ -351,15 +358,13 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception { LOG.warn("The dataproc job {} already exists. Ignoring resubmission of the job.", request.getJob().getReference().getJobId()); } - DataprocUtils.emitMetric(provisionerContext, region, - "provisioner.submitJob.response.count"); + DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build()); } catch (Exception e) { String errorMessage = String.format("Error while launching job %s on cluster %s.", getJobId(runInfo), clusterName); // delete all uploaded gcs files in case of exception DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath); - DataprocUtils.emitMetric(provisionerContext, region, - "provisioner.submitJob.response.count", e); + DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build()); // ResourceExhaustedException indicates Dataproc agent running on master node isn't emitting heartbeat. // This usually indicates master VM crashing due to OOM. if (e instanceof ResourceExhaustedException) { @@ -678,11 +683,9 @@ private InputStream openStream(URI uri) throws IOException { * Creates and returns dataproc job submit request. */ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo, - List localFiles) throws IOException { + List localFiles, LaunchMode launchMode) throws IOException { String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR); - LaunchMode launchMode = LaunchMode.valueOf( - provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase()); HadoopJob.Builder hadoopJobBuilder = HadoopJob.newBuilder() // set main class .setMainClass(DataprocJobMain.class.getName()) From b6fb2e6b54be0a1776589154b1f545ff3c0eba32 Mon Sep 17 00:00:00 2001 From: albertshau Date: Thu, 2 Nov 2023 09:43:21 -0700 Subject: [PATCH 2/2] CDAP-20858 checkstyle fixes --- .../runtime/spi/common/DataprocMetric.java | 3 +- .../runtime/spi/common/DataprocUtils.java | 6 ++++ .../runtimejob/DataprocRuntimeJobManager.java | 35 ++++++++++++++----- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java index 738c0fb6616f..1e3933bca5e9 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocMetric.java @@ -20,7 +20,7 @@ import javax.annotation.Nullable; /** - * Dataproc related metric + * Dataproc related metric. */ public class DataprocMetric { private final String region; @@ -98,6 +98,7 @@ public Builder setLaunchMode(@Nullable LaunchMode launchMode) { /** * Returns a DataprocMetric. + * * @return DataprocMetric. */ public DataprocMetric build() { diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java index 5efb82abc50f..d29b63817325 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java @@ -462,6 +462,12 @@ private static void updateTemporaryHoldOnGcsObject(Storage storage, String bucke } } + /** + * Get a user friendly message pointing to an external troubleshooting doc. + * + * @param troubleshootingDocsUrl Url for the troubleshooting doc + * @return user friendly message pointing to an external troubleshooting doc. + */ public static String getTroubleshootingHelpMessage(@Nullable String troubleshootingDocsUrl) { if (Strings.isNullOrEmpty(troubleshootingDocsUrl)) { return ""; diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java index b59b6bf534af..0d252dc0462e 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java @@ -544,7 +544,7 @@ private LocalFile uploadCacheableFile(String bucket, String targetFilePath, try { LOG.debug("Uploading a file of size {} bytes from {} to gs://{}/{}", localFile.getSize(), localFile.getURI(), bucket, targetFilePath); - uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo, + uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo, Storage.BlobWriteOption.generationMatch(), Storage.BlobWriteOption.metagenerationMatch()); } catch (StorageException e) { @@ -600,7 +600,7 @@ private LocalFile uploadFile(String bucket, String targetFilePath, localFile.getSize(), localFile.getURI(), bucket, targetFilePath, bucketObj.getLocationType(), bucketObj.getLocation()); try { - uploadToGCSUtil(localFile, storage, targetFilePath, blobInfo, + uploadToGcsUtil(localFile, storage, targetFilePath, blobInfo, Storage.BlobWriteOption.doesNotExist()); } catch (StorageException e) { if (e.getCode() != HttpURLConnection.HTTP_PRECON_FAILED) { @@ -613,7 +613,7 @@ private LocalFile uploadFile(String bucket, String targetFilePath, // Overwrite the file Blob existingBlob = storage.get(blobId); BlobInfo newBlobInfo = existingBlob.toBuilder().setContentType(contentType).build(); - uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo, + uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo, Storage.BlobWriteOption.generationNotMatch()); } else { LOG.debug("Skip uploading file {} to gs://{}/{} because it exists.", @@ -637,11 +637,11 @@ private long getCustomTime() { /** * Uploads the file to GCS Bucket. */ - private void uploadToGCSUtil(LocalFile localFile, Storage storage, String targetFilePath, + private void uploadToGcsUtil(LocalFile localFile, Storage storage, String targetFilePath, BlobInfo blobInfo, Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException { long start = System.nanoTime(); - uploadToGCS(localFile.getURI(), storage, blobInfo, blobWriteOptions); + uploadToGcs(localFile.getURI(), storage, blobInfo, blobWriteOptions); long end = System.nanoTime(); LOG.debug("Successfully uploaded file {} to gs://{}/{} in {} ms.", localFile.getURI(), bucket, targetFilePath, TimeUnit.NANOSECONDS.toMillis(end - start)); @@ -650,7 +650,7 @@ private void uploadToGCSUtil(LocalFile localFile, Storage storage, String target /** * Uploads the file to GCS bucket. */ - private void uploadToGCS(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo, + private void uploadToGcs(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo, Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException { try (InputStream inputStream = openStream(localFileUri); WriteChannel writer = storage.writer(blobInfo, blobWriteOptions)) { @@ -753,6 +753,17 @@ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo, .build(); } + /** + * Get the list of arguments to pass to the runtime job on the command line. + * The DataprocJobMain argument is [class-name] [spark-compat] [list of archive files...] + * + * @param runtimeJobInfo information about the runtime job + * @param localFiles files to localize + * @param sparkCompat spark compat version + * @param applicationJarLocalizedName localized application jar name + * @param launchMode launch mode for the job + * @return list of arguments to pass to the runtime job on the command line + */ @VisibleForTesting public static List getArguments(RuntimeJobInfo runtimeJobInfo, List localFiles, String sparkCompat, String applicationJarLocalizedName, @@ -773,6 +784,12 @@ public static List getArguments(RuntimeJobInfo runtimeJobInfo, List getProperties(RuntimeJobInfo runtimeJobInfo) { ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo(); @@ -870,9 +887,9 @@ private String getPath(String... pathSubComponents) { /** * Returns job name from run info. namespace, application, program, run(36 characters) Example: * namespace_application_program_8e1cb2ce-a102-48cf-a959-c4f991a2b475 - *

- * The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-). - * The maximum length is 100 characters. + * + *

The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-). + * The maximum length is 100 characters.

* * @throws IllegalArgumentException if provided id does not comply with naming restrictions */