diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala index 541c1c9712..6b63010894 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala @@ -7,7 +7,7 @@ import edu.uci.ics.amber.operator.sink.ProgressiveUtils import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.workflow.OutputPort.OutputMode -import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT +import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_DELTA, SET_SNAPSHOT, SINGLE_SNAPSHOT} import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} object SpecialPhysicalOpFactory { @@ -38,21 +38,23 @@ object SpecialPhysicalOpFactory { val inputSchema = inputSchemas.values.head // Define outputSchema based on outputMode - val outputSchema = if (outputMode == SET_SNAPSHOT) { - if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { - // input is insert/retract delta: remove the flag column in the output - Schema - .builder() - .add(inputSchema) - .remove(ProgressiveUtils.insertRetractFlagAttr.getName) - .build() - } else { - // input is insert-only delta: output schema is the same as input schema + val outputSchema = outputMode match { + case SET_SNAPSHOT | SINGLE_SNAPSHOT => + if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + // with insert/retract delta: remove the flag column + Schema + .builder() + .add(inputSchema) + .remove(ProgressiveUtils.insertRetractFlagAttr.getName) + .build() + } else { + // with insert-only delta: output schema is the same as input schema + inputSchema + } + + case SET_DELTA => + // output schema is the same as input schema inputSchema - } - } else { - // SET_DELTA: output schema is the same as input schema - inputSchema } // Create a Scala immutable Map