Skip to content

Commit

Permalink
CDAP-20890 : updating the stage spec based on new connector name
Browse files Browse the repository at this point in the history
  • Loading branch information
sahusanket committed Nov 16, 2023
1 parent 815f131 commit be48d88
Showing 1 changed file with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.condition.Condition;
import io.cdap.cdap.etl.common.Constants;
import io.cdap.cdap.etl.common.PipelinePhase;
Expand Down Expand Up @@ -382,12 +383,48 @@ private PipelinePhase dagToPipeline(Dag dag, Map<String, String> connectors,

// add other plugin types
StageSpec spec = specs.get(stageName);
phaseBuilder.addStage(spec);
phaseBuilder.addStage(updateStageSpec(spec, conditionConnectors));
}

return phaseBuilder.build();
}

private StageSpec updateStageSpec(StageSpec spec, Map<String, String> conditionConnectors) {
//Modify input schema with new name
Map<String, Schema> originalSchemas = spec.getInputSchemas();
Map<String, Schema> newSchemas = new HashMap<>();
for (Map.Entry<String, Schema> entry : originalSchemas.entrySet()) {
if (conditionConnectors.values().contains(entry.getKey())) {
newSchemas.put(conditionConnectors.get(entry.getKey()), originalSchemas.get(entry.getKey()));
} else {
newSchemas.put(entry.getKey(), originalSchemas.get(entry.getKey()));
}
}

StageSpec.Builder builder = StageSpec.builder(spec.getName(), spec.getPlugin())
.addInputSchemas(newSchemas)
.setErrorSchema(spec.getErrorSchema())
.setMaxPreviewRecords(spec.getMaxPreviewRecords())
.setOutputSchema(spec.getOutputSchema())
.setPluginProperties(spec.getPlugin().getProperties())
.setStageLoggingEnabled(spec.isStageLoggingEnabled())
.setProcessTimingEnabled(spec.isProcessTimingEnabled())
.setMaxPreviewRecords(spec.getMaxPreviewRecords());

Map<String, StageSpec.Port> outputPorts = spec.getOutputPorts();

//Modify out put stage name new name
for (String outputStageName : outputPorts.keySet()) {
String newOutputName = outputStageName;
if (conditionConnectors.values().contains(outputStageName)) {
newOutputName = conditionConnectors.get(outputStageName);
}
builder.addOutput(newOutputName, outputPorts.get(outputStageName).getPort(),
outputPorts.get(outputStageName).getSchema());
}

return builder.build();
}

@VisibleForTesting
static String getPhaseName(Dag dag) {
return getPhaseName(dag.getSources(), dag.getSinks(), String.valueOf(dag.hashCode()));
Expand Down

0 comments on commit be48d88

Please sign in to comment.