Skip to content

Commit

Permalink
update mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicong-Huang committed Dec 19, 2024
1 parent 2a24e40 commit 8f05a4b
Showing 1 changed file with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import edu.uci.ics.amber.operator.sink.ProgressiveUtils
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpExec
import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SET_SNAPSHOT
import edu.uci.ics.amber.workflow.OutputPort.OutputMode.{SET_DELTA, SET_SNAPSHOT, SINGLE_SNAPSHOT}
import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity}

object SpecialPhysicalOpFactory {
Expand Down Expand Up @@ -38,21 +38,23 @@ object SpecialPhysicalOpFactory {
val inputSchema = inputSchemas.values.head

// Define outputSchema based on outputMode
val outputSchema = if (outputMode == SET_SNAPSHOT) {
if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) {
// input is insert/retract delta: remove the flag column in the output
Schema
.builder()
.add(inputSchema)
.remove(ProgressiveUtils.insertRetractFlagAttr.getName)
.build()
} else {
// input is insert-only delta: output schema is the same as input schema
val outputSchema = outputMode match {
case SET_SNAPSHOT | SINGLE_SNAPSHOT =>
if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) {
// with insert/retract delta: remove the flag column
Schema
.builder()
.add(inputSchema)
.remove(ProgressiveUtils.insertRetractFlagAttr.getName)
.build()
} else {
// with insert-only delta: output schema is the same as input schema
inputSchema
}

case SET_DELTA =>
// output schema is the same as input schema
inputSchema
}
} else {
// SET_DELTA: output schema is the same as input schema
inputSchema
}

// Create a Scala immutable Map
Expand Down

0 comments on commit 8f05a4b

Please sign in to comment.