Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[🍒][CDAP-21007][CDAP-21041] Add an option to disable field lineage emission & add dataproc job status in logs when stopped & set the state to error on program completion if cause is not provided to avoid NPE #15676

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.cdap.cdap.app.program.Programs;
import io.cdap.cdap.app.runtime.Arguments;
import io.cdap.cdap.app.runtime.ProgramController;
import io.cdap.cdap.app.runtime.ProgramController.State;
import io.cdap.cdap.app.runtime.ProgramOptions;
import io.cdap.cdap.app.runtime.ProgramRunner;
import io.cdap.cdap.app.runtime.ProgramRunnerFactory;
Expand Down Expand Up @@ -203,7 +204,7 @@
LoggingContextHelper.getLoggingContextWithRunId(programRunId,
systemArgs.asMap()));
// Get the cluster launch type
Cluster cluster = GSON.fromJson(systemArgs.getOption(ProgramOptionConstants.CLUSTER),

Check warning on line 207 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'cluster' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
Cluster.class);

// Get App spec
Expand Down Expand Up @@ -246,7 +247,7 @@
}

// remember the file names in the artifact folder before app regeneration
List<String> pluginFiles = DirUtils.listFiles(pluginDir, File::isFile).stream()

Check warning on line 250 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'pluginFiles' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
.map(File::getName)
.collect(Collectors.toList());

Expand Down Expand Up @@ -356,7 +357,11 @@
// Write an extra state to make sure there is always a terminal state even
// if the program application run failed to write out the state.
programStateWriter.error(programRunId, cause);
programCompletion.completeExceptionally(cause);
if (cause == null) {
programCompletion.complete(State.ERROR);
} else {
programCompletion.completeExceptionally(cause);
}
}
}, Threads.SAME_THREAD_EXECUTOR);

Expand Down Expand Up @@ -454,7 +459,7 @@
* by the {@link RuntimeJobEnvironment#getProperties()} will be set into the returned {@link
* CConfiguration} instance.
*/
private CConfiguration createCConf(RuntimeJobEnvironment runtimeJobEnv,

Check warning on line 462 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJob.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'createCConf' must contain no more than '1' consecutive capital letters.
ProgramOptions programOpts) throws IOException {
CConfiguration cConf = CConfiguration.create();
cConf.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.cdap.cdap.app.runtime.ProgramStateWriter;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.FieldLineage;
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.common.lang.Exceptions;
import io.cdap.cdap.common.lang.InstantiatorFactory;
Expand Down Expand Up @@ -313,7 +314,8 @@
LOG.error("Failed to store the final workflow token of Workflow {}", workflowRunId, t);
}

if (ProgramStatus.COMPLETED != workflowContext.getState().getStatus()) {
if (ProgramStatus.COMPLETED != workflowContext.getState().getStatus()
|| (!cConf.getBoolean(FieldLineage.FIELD_LINEAGE_EMISSION_ENABLED))) {
return;
}

Expand Down Expand Up @@ -560,7 +562,7 @@

private DatasetProperties addLocalDatasetProperty(DatasetProperties properties,
boolean keepLocal) {
String dsDescription = properties.getDescription();

Check warning on line 565 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'dsDescription' declaration and its first usage is 9, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
DatasetProperties.Builder builder = DatasetProperties.builder();
builder.addAll(properties.getProperties());
builder.add(Constants.AppFabric.WORKFLOW_LOCAL_DATASET_PROPERTY, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
* Convenient method to get ZK quorum string from the configuration with
* proper default value.
*/
public static String getZKQuorum(CConfiguration cConf) {

Check warning on line 174 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'getZKQuorum' must contain no more than '1' consecutive capital letters.
String quorum = cConf.get(QUORUM);
if (!Strings.isNullOrEmpty(quorum)) {
return quorum;
Expand All @@ -185,7 +185,7 @@
/**
* HBase configurations.
*/
public static final class HBase {

Check warning on line 188 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'HBase' must contain no more than '1' consecutive capital letters.

public static final String AUTH_KEY_UPDATE_INTERVAL = "hbase.auth.key.update.interval";
public static final String MANAGE_COPROCESSORS = "master.manage.hbase.coprocessors";
Expand Down Expand Up @@ -1170,7 +1170,7 @@
/**
* JVM resource metrics.
*/
public static final class JVMResource {

Check warning on line 1173 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'JVMResource' must contain no more than '1' consecutive capital letters.

public static final String HEAP_USED_MB = "jvm.resource.heap.used.mb";
public static final String HEAP_MAX_MB = "jvm.resource.heap.max.mb";
Expand Down Expand Up @@ -1606,7 +1606,7 @@
/**
* App Fabric.
*/
public static final class SSL {

Check warning on line 1609 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'SSL' must contain no more than '1' consecutive capital letters.

/**
* Enables SSL for external services.
Expand Down Expand Up @@ -2149,7 +2149,7 @@
/**
* Constants for HBase DDL executor.
*/
public static final class HBaseDDLExecutor {

Check warning on line 2152 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'HBaseDDLExecutor' must contain no more than '1' consecutive capital letters.

public static final String EXTENSIONS_DIR = "hbase.ddlexecutor.extension.dir";
}
Expand All @@ -2175,6 +2175,9 @@
OUTGOING,
BOTH
}

public static final String FIELD_LINEAGE_EMISSION_ENABLED =
"metadata.messaging.field.lineage.emission.enabled";
}

/**
Expand Down Expand Up @@ -2318,7 +2321,7 @@
/**
* JMX metrics collector config.
*/
public static final class JMXMetricsCollector {

Check warning on line 2324 in cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.naming.AbbreviationAsWordInNameCheck

Abbreviation in name 'JMXMetricsCollector' must contain no more than '1' consecutive capital letters.

public static final String POLL_INTERVAL_SECS = "jmx.metrics.collector.poll.interval.secs";
public static final String SERVER_PORT = "jmx.metrics.collector.server.port";
Expand Down
9 changes: 9 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2755,6 +2755,15 @@
</description>
</property>

<property>
<name>metadata.messaging.field.lineage.emission.enabled</name>
<value>true</value>
<description>
Enable or disable publishing of field level lineage,
by default set to true.
</description>
</property>

<!-- Metrics Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
/**
* In reuse scenario we can't find "our" cluster by cluster name, so let's put it into the label
*
* @see {@link DataprocProvisioner#getAllocatedClusterName(ProvisionerContext)}

Check warning on line 74 in cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.AtclauseOrderCheck

Javadoc comment at column 11 has parse error. Details: no viable alternative at input '{' while parsing JAVADOC_TAG
*/
public static final String LABEL_RUN_KEY = "cdap-run-key";

Expand Down Expand Up @@ -121,12 +121,13 @@
}

if (jobDetail != null
&& jobDetail.getStatus() == RuntimeJobStatus.FAILED
&& jobDetail.getStatus() != RuntimeJobStatus.COMPLETED
&& (jobDetail instanceof DataprocRuntimeJobDetail)) {
// Status details is specific to dataproc jobs, so it was not added to RuntimeJobDetail spi.
String statusDetails = ((DataprocRuntimeJobDetail) jobDetail).getJobStatusDetails();
if (statusDetails != null) {
LOG.error("Dataproc job failed with the status details: {}", statusDetails);
LOG.error("Dataproc job '{}' with the status details: {}",
jobDetail.getStatus().name(), statusDetails);
}
}
} finally {
Expand Down