
Commit

Merge pull request #15402 from cdapio/CDAP-20858-add-launch-mode-metric-6-10

CDAP-20858 add launch mode metric 6-10
albertshau authored Nov 2, 2023
2 parents 11c1d61 + b6fb2e6 commit 247147a
Showing 3 changed files with 176 additions and 27 deletions.
DataprocMetric.java (new file)
@@ -0,0 +1,112 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.runtime.spi.common;

import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import javax.annotation.Nullable;

/**
* Dataproc-related metric.
*/
public class DataprocMetric {
private final String region;
private final String metricName;
@Nullable
private final Exception exception;
@Nullable
private final LaunchMode launchMode;

private DataprocMetric(String metricName, String region, @Nullable Exception exception,
@Nullable LaunchMode launchMode) {
this.metricName = metricName;
this.region = region;
this.exception = exception;
this.launchMode = launchMode;
}

public String getMetricName() {
return metricName;
}

public String getRegion() {
return region;
}

@Nullable
public Exception getException() {
return exception;
}

@Nullable
public LaunchMode getLaunchMode() {
return launchMode;
}

/**
* Returns a builder to create a DataprocMetric.
*
* @param metricName metric name
* @return Builder to create a DataprocMetric
*/
public static Builder builder(String metricName) {
return new Builder(metricName);
}

/**
* Builder for a DataprocMetric.
*/
public static class Builder {
private final String metricName;
private String region;
@Nullable
private Exception exception;
@Nullable
private LaunchMode launchMode;

private Builder(String metricName) {
this.metricName = metricName;
}

public Builder setRegion(String region) {
this.region = region;
return this;
}

public Builder setException(@Nullable Exception e) {
this.exception = e;
return this;
}

public Builder setLaunchMode(@Nullable LaunchMode launchMode) {
this.launchMode = launchMode;
return this;
}

/**
* Returns a DataprocMetric.
*
* @return DataprocMetric.
*/
public DataprocMetric build() {
if (region == null) {
// region should always be set unless there is a bug in the code
throw new IllegalStateException("Dataproc metric is missing the region");
}
return new DataprocMetric(metricName, region, exception, launchMode);
}
}
}
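
For context, a minimal sketch of how a caller might use the new builder; the metric name and launch mode constant come from call sites later in this commit, while the region value is illustrative:

    DataprocMetric metric = DataprocMetric.builder("provisioner.submitJob.response.count")
        .setRegion("us-east1")             // illustrative region
        .setLaunchMode(LaunchMode.CLUSTER) // optional; older call sites omit it
        .build();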
DataprocUtils.java
@@ -304,7 +304,20 @@ public static String getSystemProjectId() {
**/
public static void emitMetric(ProvisionerContext context, String region,
String metricName, @Nullable Exception e) {
emitMetric(context,
DataprocMetric.builder(metricName).setRegion(region).setException(e).build());
}

public static void emitMetric(ProvisionerContext context, String region, String metricName) {
emitMetric(context, region, metricName, null);
}

/**
* Emit a dataproc metric.
**/
public static void emitMetric(ProvisionerContext context, DataprocMetric dataprocMetric) {
StatusCode.Code statusCode;
Exception e = dataprocMetric.getException();
if (e == null) {
statusCode = StatusCode.Code.OK;
} else {
@@ -316,16 +329,14 @@ public static void emitMetric(ProvisionerContext context, String region,
statusCode = StatusCode.Code.INTERNAL;
}
}
- Map<String, String> tags = ImmutableMap.<String, String>builder()
-     .put("reg", region)
-     .put("sc", statusCode.toString())
-     .build();
- ProvisionerMetrics metrics = context.getMetrics(tags);
- metrics.count(metricName, 1);
- }
-
- public static void emitMetric(ProvisionerContext context, String region, String metricName) {
-   emitMetric(context, region, metricName, null);
+ ImmutableMap.Builder<String, String> tags = ImmutableMap.<String, String>builder()
+     .put("reg", dataprocMetric.getRegion())
+     .put("sc", statusCode.toString());
+ if (dataprocMetric.getLaunchMode() != null) {
+   tags.put("lchmode", dataprocMetric.getLaunchMode().name());
+ }
+ ProvisionerMetrics metrics = context.getMetrics(tags.build());
+ metrics.count(dataprocMetric.getMetricName(), 1);
}
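
Since the "lchmode" tag is only attached when a launch mode was set, metrics emitted through the older String-based overloads keep their original two-tag shape. A sketch of the resulting tag maps, with illustrative values:

    // emitMetric(context, "us-east1", "provisioner.submitJob.response.count") yields tags:
    //   {reg=us-east1, sc=OK}
    // the builder path with setLaunchMode(LaunchMode.CLUSTER) yields:
    //   {reg=us-east1, sc=OK, lchmode=CLUSTER}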

/**
@@ -451,6 +462,12 @@ private static void updateTemporaryHoldOnGcsObject(Storage storage, String bucke
}
}

/**
* Get a user-friendly message pointing to an external troubleshooting doc.
*
* @param troubleshootingDocsUrl URL for the troubleshooting doc
* @return user-friendly message pointing to an external troubleshooting doc.
*/
public static String getTroubleshootingHelpMessage(@Nullable String troubleshootingDocsUrl) {
if (Strings.isNullOrEmpty(troubleshootingDocsUrl)) {
return "";
DataprocRuntimeJobManager.java
@@ -57,6 +57,7 @@
import io.cdap.cdap.runtime.spi.CacheableLocalFile;
import io.cdap.cdap.runtime.spi.ProgramRunInfo;
import io.cdap.cdap.runtime.spi.VersionInfo;
import io.cdap.cdap.runtime.spi.common.DataprocMetric;
import io.cdap.cdap.runtime.spi.common.DataprocUtils;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException;
@@ -293,6 +294,12 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
cdapVersionInfo.getFix());
}

LaunchMode launchMode = LaunchMode.valueOf(
provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase());
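// The launch mode is parsed once here, defaulting to CLUSTER when the
// "launchMode" provisioner property is unset; it is reused for both the
// submit-job request and the submitJob response metric built below.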
DataprocMetric.Builder submitJobMetric =
DataprocMetric.builder("provisioner.submitJob.response.count")
.setRegion(region)
.setLaunchMode(launchMode);
try {
// step 1: build twill.jar and launcher.jar and add them to files to be copied to gcs
if (disableLocalCaching) {
@@ -339,7 +346,7 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
}

// step 3: build the hadoop job request to be submitted to dataproc
- SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles);
+ SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles, launchMode);

// step 4: submit hadoop job to dataproc
try {
@@ -351,15 +358,13 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
LOG.warn("The dataproc job {} already exists. Ignoring resubmission of the job.",
request.getJob().getReference().getJobId());
}
- DataprocUtils.emitMetric(provisionerContext, region,
-     "provisioner.submitJob.response.count");
+ DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build());
} catch (Exception e) {
String errorMessage = String.format("Error while launching job %s on cluster %s.",
getJobId(runInfo), clusterName);
// delete all uploaded gcs files in case of exception
DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath);
- DataprocUtils.emitMetric(provisionerContext, region,
-     "provisioner.submitJob.response.count", e);
+ DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build());
// ResourceExhaustedException indicates Dataproc agent running on master node isn't emitting heartbeat.
// This usually indicates master VM crashing due to OOM.
if (e instanceof ResourceExhaustedException) {
@@ -539,7 +544,7 @@ private LocalFile uploadCacheableFile(String bucket, String targetFilePath,
try {
LOG.debug("Uploading a file of size {} bytes from {} to gs://{}/{}",
localFile.getSize(), localFile.getURI(), bucket, targetFilePath);
- uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo,
+ uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo,
Storage.BlobWriteOption.generationMatch(),
Storage.BlobWriteOption.metagenerationMatch());
} catch (StorageException e) {
@@ -595,7 +600,7 @@ private LocalFile uploadFile(String bucket, String targetFilePath,
localFile.getSize(), localFile.getURI(), bucket, targetFilePath,
bucketObj.getLocationType(), bucketObj.getLocation());
try {
- uploadToGCSUtil(localFile, storage, targetFilePath, blobInfo,
+ uploadToGcsUtil(localFile, storage, targetFilePath, blobInfo,
Storage.BlobWriteOption.doesNotExist());
} catch (StorageException e) {
if (e.getCode() != HttpURLConnection.HTTP_PRECON_FAILED) {
@@ -608,7 +613,7 @@ private LocalFile uploadFile(String bucket, String targetFilePath,
// Overwrite the file
Blob existingBlob = storage.get(blobId);
BlobInfo newBlobInfo = existingBlob.toBuilder().setContentType(contentType).build();
- uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo,
+ uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo,
Storage.BlobWriteOption.generationNotMatch());
} else {
LOG.debug("Skip uploading file {} to gs://{}/{} because it exists.",
@@ -632,11 +637,11 @@ private long getCustomTime() {
/**
* Uploads the file to GCS Bucket.
*/
- private void uploadToGCSUtil(LocalFile localFile, Storage storage, String targetFilePath,
+ private void uploadToGcsUtil(LocalFile localFile, Storage storage, String targetFilePath,
BlobInfo blobInfo,
Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException {
long start = System.nanoTime();
- uploadToGCS(localFile.getURI(), storage, blobInfo, blobWriteOptions);
+ uploadToGcs(localFile.getURI(), storage, blobInfo, blobWriteOptions);
long end = System.nanoTime();
LOG.debug("Successfully uploaded file {} to gs://{}/{} in {} ms.",
localFile.getURI(), bucket, targetFilePath, TimeUnit.NANOSECONDS.toMillis(end - start));
@@ -645,7 +650,7 @@ private void uploadToGCSUtil(LocalFile localFile, Storage storage, String target
/**
* Uploads the file to GCS bucket.
*/
- private void uploadToGCS(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo,
+ private void uploadToGcs(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo,
Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException {
try (InputStream inputStream = openStream(localFileUri);
WriteChannel writer = storage.writer(blobInfo, blobWriteOptions)) {
@@ -678,11 +683,9 @@ private InputStream openStream(URI uri) throws IOException {
* Creates and returns dataproc job submit request.
*/
private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
-     List<LocalFile> localFiles) throws IOException {
+     List<LocalFile> localFiles, LaunchMode launchMode) throws IOException {
String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR);

- LaunchMode launchMode = LaunchMode.valueOf(
-     provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase());
HadoopJob.Builder hadoopJobBuilder = HadoopJob.newBuilder()
// set main class
.setMainClass(DataprocJobMain.class.getName())
@@ -750,6 +753,17 @@ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
.build();
}

/**
* Get the list of arguments to pass to the runtime job on the command line.
* The DataprocJobMain arguments are [class-name] [spark-compat] [list of archive files...]
*
* @param runtimeJobInfo information about the runtime job
* @param localFiles files to localize
* @param sparkCompat spark compat version
* @param applicationJarLocalizedName localized application jar name
* @param launchMode launch mode for the job
* @return list of arguments to pass to the runtime job on the command line
*/
@VisibleForTesting
public static List<String> getArguments(RuntimeJobInfo runtimeJobInfo, List<LocalFile> localFiles,
String sparkCompat, String applicationJarLocalizedName,
@@ -770,6 +784,12 @@ public static List<String> getArguments(RuntimeJobInfo runtimeJobInfo, List<Loca
return arguments;
}
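
As a rough illustration of the command line this produces, following the [class-name] [spark-compat] [archive files...] format documented above (every concrete value below is hypothetical, not taken from the commit):

    // DataprocJobMain might be invoked with arguments along the lines of:
    //   io.cdap.cdap.example.SomeRuntimeJobClass spark3_2.12 twill.jar application.jar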

/**
* Get the property map that should be set for the Dataproc Hadoop Job.
*
* @param runtimeJobInfo information about the runtime job
* @return property map that should be set for the Dataproc Hadoop Job
*/
@VisibleForTesting
public static Map<String, String> getProperties(RuntimeJobInfo runtimeJobInfo) {
ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo();
@@ -867,9 +887,9 @@ private String getPath(String... pathSubComponents) {
/**
* Returns job name from run info: namespace, application, program, run (36 characters). Example:
* namespace_application_program_8e1cb2ce-a102-48cf-a959-c4f991a2b475
- * <p>
- * The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-).
- * The maximum length is 100 characters.
+ *
+ * <p>The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-).
+ * The maximum length is 100 characters.</p>
*
* @throws IllegalArgumentException if provided id does not comply with naming restrictions
*/
Expand Down
