Cdap 20858 add launch mode metric 6 10 #15402

Merged · 2 commits · Nov 2, 2023
DataprocMetric.java (new file)
@@ -0,0 +1,112 @@
/*
 * Copyright © 2023 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.cdap.runtime.spi.common;

import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import javax.annotation.Nullable;

/**
 * Dataproc related metric.
 */
public class DataprocMetric {
  private final String region;
  private final String metricName;
  @Nullable
  private final Exception exception;
  @Nullable
  private final LaunchMode launchMode;

  private DataprocMetric(String metricName, String region, @Nullable Exception exception,
      @Nullable LaunchMode launchMode) {
    this.metricName = metricName;
    this.region = region;
    this.exception = exception;
    this.launchMode = launchMode;
  }

  public String getMetricName() {
    return metricName;
  }

  public String getRegion() {
    return region;
  }

  @Nullable
  public Exception getException() {
    return exception;
  }

  @Nullable
  public LaunchMode getLaunchMode() {
    return launchMode;
  }

  /**
   * Returns a builder to create a DataprocMetric.
   *
   * @param metricName metric name
   * @return Builder to create a DataprocMetric
   */
  public static Builder builder(String metricName) {
    return new Builder(metricName);
  }

  /**
   * Builder for a DataprocMetric.
   */
  public static class Builder {
    private final String metricName;
    private String region;
    @Nullable
    private Exception exception;
    @Nullable
    private LaunchMode launchMode;

    private Builder(String metricName) {
      this.metricName = metricName;
    }

    public Builder setRegion(String region) {
      this.region = region;
      return this;
    }

    public Builder setException(@Nullable Exception e) {
      this.exception = e;
      return this;
    }

    public Builder setLaunchMode(@Nullable LaunchMode launchMode) {
      this.launchMode = launchMode;
      return this;
    }

    /**
     * Returns a DataprocMetric.
     *
     * @return DataprocMetric.
     */
    public DataprocMetric build() {
      if (region == null) {
        // region should always be set unless there is a bug in the code
        throw new IllegalStateException("Dataproc metric is missing the region");
      }
      return new DataprocMetric(metricName, region, exception, launchMode);
    }
  }
}
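A minimal usage sketch of the builder above; the region value is illustrative, while the metric name is the one this PR emits elsewhere in the diff:

DataprocMetric metric = DataprocMetric.builder("provisioner.submitJob.response.count")
    .setRegion("us-central1")          // example region, not from the diff
    .setLaunchMode(LaunchMode.CLUSTER) // optional; tag is omitted when unset
    .build();

Note that build() throws IllegalStateException when setRegion() was never called, so a metric missing its region fails fast at construction time instead of being emitted untagged.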
DataprocUtils.java

@@ -304,7 +304,20 @@ public static String getSystemProjectId() {
   **/
  public static void emitMetric(ProvisionerContext context, String region,
      String metricName, @Nullable Exception e) {
    emitMetric(context,
        DataprocMetric.builder(metricName).setRegion(region).setException(e).build());
  }

  public static void emitMetric(ProvisionerContext context, String region, String metricName) {
    emitMetric(context, region, metricName, null);
  }

  /**
   * Emit a dataproc metric.
   **/
  public static void emitMetric(ProvisionerContext context, DataprocMetric dataprocMetric) {
    StatusCode.Code statusCode;
    Exception e = dataprocMetric.getException();
    if (e == null) {
      statusCode = StatusCode.Code.OK;
    } else {
@@ -316,16 +329,14 @@ public static void emitMetric(ProvisionerContext context, String region,
        statusCode = StatusCode.Code.INTERNAL;
      }
    }
-    Map<String, String> tags = ImmutableMap.<String, String>builder()
-        .put("reg", region)
-        .put("sc", statusCode.toString())
-        .build();
-    ProvisionerMetrics metrics = context.getMetrics(tags);
-    metrics.count(metricName, 1);
-  }
-
-  public static void emitMetric(ProvisionerContext context, String region, String metricName) {
-    emitMetric(context, region, metricName, null);
+    ImmutableMap.Builder<String, String> tags = ImmutableMap.<String, String>builder()
+        .put("reg", dataprocMetric.getRegion())
+        .put("sc", statusCode.toString());
+    if (dataprocMetric.getLaunchMode() != null) {
+      tags.put("lchmode", dataprocMetric.getLaunchMode().name());
+    }
+    ProvisionerMetrics metrics = context.getMetrics(tags.build());
+    metrics.count(dataprocMetric.getMetricName(), 1);
  }

  /**
@@ -451,6 +462,12 @@ private static void updateTemporaryHoldOnGcsObject(Storage storage, String bucke
    }
  }

  /**
   * Get a user friendly message pointing to an external troubleshooting doc.
   *
   * @param troubleshootingDocsUrl Url for the troubleshooting doc
   * @return user friendly message pointing to an external troubleshooting doc.
   */
  public static String getTroubleshootingHelpMessage(@Nullable String troubleshootingDocsUrl) {
    if (Strings.isNullOrEmpty(troubleshootingDocsUrl)) {
      return "";
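Taken together, the overloads above all funnel into emitMetric(ProvisionerContext, DataprocMetric), and the "lchmode" tag is only added when a launch mode was actually set. A sketch of the resulting behavior, assuming a ProvisionerContext named context and illustrative tag values:

DataprocMetric metric = DataprocMetric.builder("provisioner.submitJob.response.count")
    .setRegion("us-central1")          // example value
    .setLaunchMode(LaunchMode.CLUSTER)
    .build();
DataprocUtils.emitMetric(context, metric);
// emits the counter with tags {reg=us-central1, sc=OK, lchmode=CLUSTER};
// callers of the older region/metricName overloads never set a launch mode,
// so their emitted tag set ({reg=..., sc=...}) is unchanged by this PR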
DataprocRuntimeJobManager.java

@@ -57,6 +57,7 @@
import io.cdap.cdap.runtime.spi.CacheableLocalFile;
import io.cdap.cdap.runtime.spi.ProgramRunInfo;
import io.cdap.cdap.runtime.spi.VersionInfo;
import io.cdap.cdap.runtime.spi.common.DataprocMetric;
import io.cdap.cdap.runtime.spi.common.DataprocUtils;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException;
@@ -293,6 +294,12 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
          cdapVersionInfo.getFix());
    }

    LaunchMode launchMode = LaunchMode.valueOf(
        provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase());
    DataprocMetric.Builder submitJobMetric =
        DataprocMetric.builder("provisioner.submitJob.response.count")
            .setRegion(region)
            .setLaunchMode(launchMode);
    try {
      // step 1: build twill.jar and launcher.jar and add them to files to be copied to gcs
      if (disableLocalCaching) {
@@ -339,7 +346,7 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
      }

      // step 3: build the hadoop job request to be submitted to dataproc
-      SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles);
+      SubmitJobRequest request = getSubmitJobRequest(runtimeJobInfo, uploadedFiles, launchMode);

      // step 4: submit hadoop job to dataproc
      try {
@@ -351,15 +358,13 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
          LOG.warn("The dataproc job {} already exists. Ignoring resubmission of the job.",
              request.getJob().getReference().getJobId());
        }
-      DataprocUtils.emitMetric(provisionerContext, region,
-          "provisioner.submitJob.response.count");
+      DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build());
    } catch (Exception e) {
      String errorMessage = String.format("Error while launching job %s on cluster %s.",
          getJobId(runInfo), clusterName);
      // delete all uploaded gcs files in case of exception
      DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath);
-      DataprocUtils.emitMetric(provisionerContext, region,
-          "provisioner.submitJob.response.count", e);
+      DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build());
      // ResourceExhaustedException indicates Dataproc agent running on master node isn't emitting heartbeat.
      // This usually indicates master VM crashing due to OOM.
      if (e instanceof ResourceExhaustedException) {
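The launch path above builds the metric once before the try block and reuses the same builder on both outcomes, so success and failure land in the same time series and differ only in the status tag. Condensed, with submitJob() as a hypothetical stand-in for the upload and submit steps:

DataprocMetric.Builder submitJobMetric =
    DataprocMetric.builder("provisioner.submitJob.response.count")
        .setRegion(region)
        .setLaunchMode(launchMode);
try {
  submitJob();  // hypothetical stand-in for steps 1-4 of launch()
  DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build());
} catch (Exception e) {
  DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build());
  throw e;  // the real code wraps the failure; simplified here
}

Resolving launchMode once in launch() also lets it be passed into getSubmitJobRequest() below instead of being re-parsed from provisioner properties there.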
@@ -539,7 +544,7 @@ private LocalFile uploadCacheableFile(String bucket, String targetFilePath,
    try {
      LOG.debug("Uploading a file of size {} bytes from {} to gs://{}/{}",
          localFile.getSize(), localFile.getURI(), bucket, targetFilePath);
-      uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo,
+      uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo,
          Storage.BlobWriteOption.generationMatch(),
          Storage.BlobWriteOption.metagenerationMatch());
    } catch (StorageException e) {
@@ -595,7 +600,7 @@ private LocalFile uploadFile(String bucket, String targetFilePath,
          localFile.getSize(), localFile.getURI(), bucket, targetFilePath,
          bucketObj.getLocationType(), bucketObj.getLocation());
    try {
-      uploadToGCSUtil(localFile, storage, targetFilePath, blobInfo,
+      uploadToGcsUtil(localFile, storage, targetFilePath, blobInfo,
          Storage.BlobWriteOption.doesNotExist());
    } catch (StorageException e) {
      if (e.getCode() != HttpURLConnection.HTTP_PRECON_FAILED) {
@@ -608,7 +613,7 @@
        // Overwrite the file
        Blob existingBlob = storage.get(blobId);
        BlobInfo newBlobInfo = existingBlob.toBuilder().setContentType(contentType).build();
-        uploadToGCSUtil(localFile, storage, targetFilePath, newBlobInfo,
+        uploadToGcsUtil(localFile, storage, targetFilePath, newBlobInfo,
            Storage.BlobWriteOption.generationNotMatch());
      } else {
        LOG.debug("Skip uploading file {} to gs://{}/{} because it exists.",
@@ -632,11 +637,11 @@ private long getCustomTime() {
  /**
   * Uploads the file to GCS Bucket.
   */
-  private void uploadToGCSUtil(LocalFile localFile, Storage storage, String targetFilePath,
+  private void uploadToGcsUtil(LocalFile localFile, Storage storage, String targetFilePath,
      BlobInfo blobInfo,
      Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException {
    long start = System.nanoTime();
-    uploadToGCS(localFile.getURI(), storage, blobInfo, blobWriteOptions);
+    uploadToGcs(localFile.getURI(), storage, blobInfo, blobWriteOptions);
    long end = System.nanoTime();
    LOG.debug("Successfully uploaded file {} to gs://{}/{} in {} ms.",
        localFile.getURI(), bucket, targetFilePath, TimeUnit.NANOSECONDS.toMillis(end - start));
@@ -645,7 +650,7 @@ private void uploadToGCSUtil(LocalFile localFile, Storage storage, String target
  /**
   * Uploads the file to GCS bucket.
   */
-  private void uploadToGCS(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo,
+  private void uploadToGcs(java.net.URI localFileUri, Storage storage, BlobInfo blobInfo,
      Storage.BlobWriteOption... blobWriteOptions) throws IOException, StorageException {
    try (InputStream inputStream = openStream(localFileUri);
        WriteChannel writer = storage.writer(blobInfo, blobWriteOptions)) {
@@ -678,11 +683,9 @@ private InputStream openStream(URI uri) throws IOException {
   * Creates and returns dataproc job submit request.
   */
  private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
-      List<LocalFile> localFiles) throws IOException {
+      List<LocalFile> localFiles, LaunchMode launchMode) throws IOException {
    String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR);

-    LaunchMode launchMode = LaunchMode.valueOf(
-        provisionerProperties.getOrDefault("launchMode", LaunchMode.CLUSTER.name()).toUpperCase());
    HadoopJob.Builder hadoopJobBuilder = HadoopJob.newBuilder()
        // set main class
        .setMainClass(DataprocJobMain.class.getName())
@@ -750,6 +753,17 @@ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
        .build();
  }

  /**
   * Get the list of arguments to pass to the runtime job on the command line.
   * The DataprocJobMain argument is [class-name] [spark-compat] [list of archive files...]
   *
   * @param runtimeJobInfo information about the runtime job
   * @param localFiles files to localize
   * @param sparkCompat spark compat version
   * @param applicationJarLocalizedName localized application jar name
   * @param launchMode launch mode for the job
   * @return list of arguments to pass to the runtime job on the command line
   */
  @VisibleForTesting
  public static List<String> getArguments(RuntimeJobInfo runtimeJobInfo, List<LocalFile> localFiles,
      String sparkCompat, String applicationJarLocalizedName,
@@ -770,6 +784,12 @@ public static List<String> getArguments(RuntimeJobInfo runtimeJobInfo, List<Loca
    return arguments;
  }
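Per the format documented in the javadoc above ([class-name] [spark-compat] [list of archive files...]), a built argument list might resemble the following; every value is illustrative, since the body that assembles the list is folded out of the diff:

List<String> arguments = Arrays.asList(
    "io.cdap.cdap.example.RuntimeJob", // class-name (hypothetical)
    "spark3_2.12",                     // spark-compat (example value)
    "twill.jar", "launcher.jar");      // archive files (example values)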

  /**
   * Get the property map that should be set for the Dataproc Hadoop Job.
   *
   * @param runtimeJobInfo information about the runtime job
   * @return property map that should be set for the Dataproc Hadoop Job
   */
  @VisibleForTesting
  public static Map<String, String> getProperties(RuntimeJobInfo runtimeJobInfo) {
    ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo();
@@ -867,9 +887,9 @@ private String getPath(String... pathSubComponents) {
  /**
   * Returns job name from run info. namespace, application, program, run(36 characters) Example:
   * namespace_application_program_8e1cb2ce-a102-48cf-a959-c4f991a2b475
-   * <p>
-   * The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-).
-   * The maximum length is 100 characters.
+   *
+   * <p>The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-).
+   * The maximum length is 100 characters.</p>
   *
   * @throws IllegalArgumentException if provided id does not comply with naming restrictions
   */
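The naming rule in this javadoc is concrete enough to check mechanically. A hypothetical validation sketch derived purely from the documented constraints (the folded method body may implement this differently):

// assumes java.util.regex.Pattern; the regex encodes: letters, digits,
// underscores, or hyphens only, with a maximum length of 100 characters
private static final Pattern JOB_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]{1,100}");

static void validateJobId(String jobId) {
  if (!JOB_ID_PATTERN.matcher(jobId).matches()) {
    throw new IllegalArgumentException("Invalid Dataproc job id: " + jobId);
  }
}

// e.g. "namespace_application_program_8e1cb2ce-a102-48cf-a959-c4f991a2b475" passes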