Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce State Related Operators #2858

Open
wants to merge 225 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
225 commits
Select commit Hold shift + click to select a range
3af95b0
init
aglinxinyuan Mar 10, 2024
6c17a01
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 11, 2024
c5208d7
fix format
aglinxinyuan Mar 11, 2024
a4c75c4
Merge remote-tracking branch 'origin/xinyuan-marker' into xinyuan-marker
aglinxinyuan Mar 11, 2024
6830a06
fix format
aglinxinyuan Mar 11, 2024
c40fb17
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 18, 2024
8f3975b
rename
aglinxinyuan Mar 18, 2024
7bb8c7e
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 20, 2024
c1f48bf
fix fmt
aglinxinyuan Mar 20, 2024
7e40798
update
aglinxinyuan Mar 25, 2024
6ac2e71
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 28, 2024
775aff2
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 30, 2024
a2e9f34
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 1, 2024
91beb72
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 2, 2024
e7bab7f
add string serialization and test program
shengquan-ni Apr 4, 2024
e196f74
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 4, 2024
6fb5466
update
aglinxinyuan Apr 5, 2024
772fbf7
update serialization
shengquan-ni Apr 5, 2024
7fd673e
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 8, 2024
289787c
update
aglinxinyuan Apr 12, 2024
4ccf968
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 16, 2024
3904018
update
aglinxinyuan Apr 16, 2024
58d6afd
update
aglinxinyuan Apr 16, 2024
d32b4f1
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 29, 2024
ab8d438
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 11, 2024
49a321d
update
aglinxinyuan May 12, 2024
5020377
update
aglinxinyuan May 13, 2024
20800ea
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 13, 2024
bba1714
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 19, 2024
02f420f
update
aglinxinyuan May 19, 2024
e50ad09
update
aglinxinyuan May 19, 2024
c2ede6b
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 27, 2024
332aa0c
Merge branch 'master' into xinyuan-marker
aglinxinyuan Jul 24, 2024
7c27f69
Merge branch 'master' into xinyuan-marker
aglinxinyuan Jul 27, 2024
6981ea1
fix
aglinxinyuan Jul 28, 2024
2c0c8d4
fix
aglinxinyuan Jul 29, 2024
a4e4982
fix fmt
aglinxinyuan Jul 31, 2024
f5aeeb9
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 3, 2024
e47dc9d
update
aglinxinyuan Aug 3, 2024
8cbb603
update
aglinxinyuan Aug 3, 2024
4d4babc
update
aglinxinyuan Aug 4, 2024
bcad4bf
update
aglinxinyuan Aug 4, 2024
98e98eb
update
aglinxinyuan Aug 4, 2024
7286df5
Revert "update"
aglinxinyuan Aug 4, 2024
b617d62
update
aglinxinyuan Aug 4, 2024
812706f
update
aglinxinyuan Aug 4, 2024
e6e93a6
update
aglinxinyuan Aug 4, 2024
cdf0d88
update
aglinxinyuan Aug 4, 2024
9749cdb
update
aglinxinyuan Aug 4, 2024
82367aa
update
aglinxinyuan Aug 4, 2024
8402750
update
aglinxinyuan Aug 4, 2024
7addf6e
update
aglinxinyuan Aug 5, 2024
81b72c4
update
aglinxinyuan Aug 5, 2024
6c403b8
update
aglinxinyuan Aug 5, 2024
5931bb0
update
aglinxinyuan Aug 6, 2024
c77bc97
update
aglinxinyuan Aug 6, 2024
a64df1f
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 6, 2024
4a984b2
update
aglinxinyuan Aug 6, 2024
85a7e07
update
aglinxinyuan Aug 6, 2024
03a44f6
update
aglinxinyuan Aug 6, 2024
a5a27f6
update
aglinxinyuan Aug 6, 2024
dd623cf
update
aglinxinyuan Aug 6, 2024
ab2f9b0
update
aglinxinyuan Aug 6, 2024
eed3433
update
aglinxinyuan Aug 6, 2024
ca8d36b
update
aglinxinyuan Aug 7, 2024
cb5eecb
update
aglinxinyuan Aug 7, 2024
30d4695
update
aglinxinyuan Aug 7, 2024
4822bbe
update
aglinxinyuan Aug 7, 2024
79856ba
update
aglinxinyuan Aug 7, 2024
ca02b6d
update
aglinxinyuan Aug 7, 2024
1122caf
update
aglinxinyuan Aug 7, 2024
5472101
update
aglinxinyuan Aug 7, 2024
234e83b
update
aglinxinyuan Aug 7, 2024
526b495
update
aglinxinyuan Aug 7, 2024
3773ca1
update
aglinxinyuan Aug 7, 2024
0b19214
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 8, 2024
1ff31d1
update
aglinxinyuan Aug 8, 2024
72c873d
update
aglinxinyuan Aug 8, 2024
b9dd6ab
update
aglinxinyuan Aug 9, 2024
0ee5fd3
update
aglinxinyuan Aug 9, 2024
d0acbbb
update
aglinxinyuan Aug 10, 2024
66b3928
update
aglinxinyuan Aug 10, 2024
78f3259
update
aglinxinyuan Aug 10, 2024
bd83627
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 10, 2024
e177765
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 22, 2024
dfcfa69
fix fmt
aglinxinyuan Aug 22, 2024
daf7615
fix fmt
aglinxinyuan Aug 22, 2024
1add5d2
fix fmt
aglinxinyuan Aug 22, 2024
11381c0
update
aglinxinyuan Aug 22, 2024
484feda
update
aglinxinyuan Aug 22, 2024
eb60922
init
aglinxinyuan Aug 22, 2024
25468fa
update
aglinxinyuan Aug 22, 2024
995b6d5
update
aglinxinyuan Aug 22, 2024
92b3c95
update
aglinxinyuan Aug 22, 2024
ad874a9
update
aglinxinyuan Aug 22, 2024
770deba
update
aglinxinyuan Aug 22, 2024
fc38808
update
aglinxinyuan Aug 22, 2024
df3a3ab
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 3, 2024
3da62d8
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 3, 2024
64197e2
update
aglinxinyuan Sep 5, 2024
b636546
update
aglinxinyuan Sep 5, 2024
7750b7f
update
aglinxinyuan Sep 5, 2024
ab93577
update
aglinxinyuan Sep 5, 2024
d08a830
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 5, 2024
86282de
update
aglinxinyuan Sep 5, 2024
7ceea09
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
4fb386c
update
aglinxinyuan Sep 7, 2024
f3d7336
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
693ac00
update
aglinxinyuan Sep 7, 2024
823919d
update
aglinxinyuan Sep 7, 2024
436e113
update
aglinxinyuan Sep 7, 2024
4773020
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
4a606e4
update
aglinxinyuan Sep 8, 2024
8ddea2c
update
aglinxinyuan Sep 9, 2024
ce832ff
update
aglinxinyuan Sep 9, 2024
ffe9d26
update
aglinxinyuan Sep 9, 2024
a60f562
update
aglinxinyuan Sep 9, 2024
27a40fc
update
aglinxinyuan Sep 9, 2024
a88edd1
update
aglinxinyuan Sep 10, 2024
79a4e8f
update
aglinxinyuan Sep 12, 2024
4c422f2
update
aglinxinyuan Sep 12, 2024
00c1504
update
aglinxinyuan Sep 12, 2024
205d589
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 14, 2024
51dff7d
update
aglinxinyuan Sep 14, 2024
bbffbbe
update
aglinxinyuan Sep 14, 2024
e0d6056
update
aglinxinyuan Sep 14, 2024
fdbc91f
update
aglinxinyuan Sep 14, 2024
e5c9cf5
update
aglinxinyuan Sep 14, 2024
20f9c02
update
aglinxinyuan Sep 14, 2024
f9c201b
update
aglinxinyuan Sep 14, 2024
5254aa7
update
aglinxinyuan Sep 14, 2024
067247f
update
aglinxinyuan Sep 15, 2024
13eafe0
update
aglinxinyuan Sep 15, 2024
1887bfb
update
aglinxinyuan Sep 16, 2024
083e866
update
aglinxinyuan Sep 16, 2024
87e305a
update
aglinxinyuan Sep 16, 2024
59aa9a9
update
aglinxinyuan Sep 16, 2024
d603c41
update
aglinxinyuan Sep 17, 2024
30be7db
fix format
aglinxinyuan Sep 17, 2024
4cc7e76
fix format
aglinxinyuan Sep 17, 2024
dbe435d
fix format
aglinxinyuan Sep 17, 2024
fdee773
fix test
aglinxinyuan Sep 17, 2024
e4594a8
fix test
aglinxinyuan Sep 17, 2024
760b590
fix fmt
aglinxinyuan Sep 17, 2024
a2783f3
fix fmt
aglinxinyuan Sep 17, 2024
809a9af
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 17, 2024
4e8c027
update
aglinxinyuan Sep 17, 2024
fdf68df
update
aglinxinyuan Sep 18, 2024
f0e028e
fix fmt
aglinxinyuan Sep 18, 2024
efa22ff
update
aglinxinyuan Sep 18, 2024
e3ff682
update
aglinxinyuan Sep 18, 2024
f4f5318
fix fmt
aglinxinyuan Sep 18, 2024
2e136f2
fix fmt
aglinxinyuan Sep 18, 2024
cff6f57
fix fmt
aglinxinyuan Sep 18, 2024
6372288
update
aglinxinyuan Sep 18, 2024
990a720
update
aglinxinyuan Sep 18, 2024
6e50be1
update
aglinxinyuan Sep 18, 2024
8889e61
update
aglinxinyuan Sep 18, 2024
183d21d
fix
aglinxinyuan Sep 19, 2024
a0a34a1
fix
aglinxinyuan Sep 19, 2024
3316f47
fix
aglinxinyuan Sep 19, 2024
e0a856b
fix
aglinxinyuan Sep 19, 2024
4eb0564
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 19, 2024
d91c167
fix
aglinxinyuan Sep 19, 2024
600934f
fix
aglinxinyuan Sep 19, 2024
7f511df
fix
aglinxinyuan Sep 19, 2024
47f88e6
fix
aglinxinyuan Sep 19, 2024
bf159cc
fix
aglinxinyuan Sep 19, 2024
a1ead8f
fix
aglinxinyuan Sep 19, 2024
4800620
fix
aglinxinyuan Sep 19, 2024
c454017
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 19, 2024
64b28f2
update
aglinxinyuan Sep 19, 2024
13b4dae
update
aglinxinyuan Sep 19, 2024
d4d2a48
update
aglinxinyuan Sep 19, 2024
38e57cc
update
aglinxinyuan Sep 19, 2024
f5b02cf
update
aglinxinyuan Sep 19, 2024
dbe12a0
update
aglinxinyuan Sep 19, 2024
6903721
update
aglinxinyuan Sep 19, 2024
4045d2f
update
aglinxinyuan Sep 20, 2024
c471697
update
aglinxinyuan Sep 20, 2024
18da3a5
Revert "update"
aglinxinyuan Sep 20, 2024
c60e606
update
aglinxinyuan Sep 20, 2024
96230f7
rename StartOfUpstream
aglinxinyuan Sep 20, 2024
c5632a3
rename EndOfUpstream
aglinxinyuan Sep 20, 2024
7e4d937
fix fmt
aglinxinyuan Sep 20, 2024
12dabe6
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
8989a26
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
1018814
update
aglinxinyuan Sep 20, 2024
485a143
update
aglinxinyuan Sep 20, 2024
a97b49d
update
aglinxinyuan Sep 20, 2024
52196b9
update
aglinxinyuan Sep 20, 2024
bda1ffc
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
6873652
update
aglinxinyuan Sep 20, 2024
fea6543
Merge remote-tracking branch 'origin/xinyuan-state-passing' into xiny…
aglinxinyuan Sep 20, 2024
3fad565
update
aglinxinyuan Sep 20, 2024
b816ad4
update
aglinxinyuan Sep 20, 2024
2f85274
update
aglinxinyuan Sep 20, 2024
6e92ee4
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 21, 2024
901e3ea
update
aglinxinyuan Sep 21, 2024
216dea8
fix fmt
aglinxinyuan Sep 21, 2024
d534956
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 21, 2024
3963b74
init
aglinxinyuan Sep 21, 2024
ab40d37
update
aglinxinyuan Sep 21, 2024
86fed23
update
aglinxinyuan Sep 21, 2024
405f2f5
update
aglinxinyuan Sep 21, 2024
9dfdb1b
update
aglinxinyuan Sep 21, 2024
15f0444
update
aglinxinyuan Sep 23, 2024
99ed947
update
aglinxinyuan Sep 23, 2024
8f0e08a
update
aglinxinyuan Sep 23, 2024
19ac196
update
aglinxinyuan Sep 23, 2024
f22f4e1
update
aglinxinyuan Sep 23, 2024
053eca6
update
aglinxinyuan Sep 23, 2024
5828053
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 23, 2024
efe0281
Merge branch 'xinyuan-state-passing' into xinyuan-state-operators
aglinxinyuan Sep 23, 2024
f189916
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 24, 2024
26f07b3
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 25, 2024
4943693
update
aglinxinyuan Sep 25, 2024
5dea6c6
update
aglinxinyuan Sep 25, 2024
6f6c3e5
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 26, 2024
0f65da3
update
aglinxinyuan Sep 26, 2024
8b0d452
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 26, 2024
ccc931d
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 26, 2024
93783fc
update
aglinxinyuan Sep 26, 2024
de61058
update
aglinxinyuan Sep 26, 2024
2e0bd9f
Merge branch 'master' into xinyuan-state-operators
aglinxinyuan Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ object OperatorGroupConstants {
final val PYTHON_GROUP = "Python"
final val JAVA_GROUP = "Java"
final val R_GROUP = "R"
final val STATE_GROUP = "State"
final val MACHINE_LEARNING_GENERAL_GROUP = "Machine Learning General"

/**
Expand All @@ -43,7 +44,7 @@ object OperatorGroupConstants {
GroupInfo(MACHINE_LEARNING_GENERAL_GROUP)
)
),
GroupInfo(UTILITY_GROUP),
GroupInfo(UTILITY_GROUP, List(GroupInfo(STATE_GROUP))),
GroupInfo(API_GROUP),
GroupInfo(UDF_GROUP, List(GroupInfo(PYTHON_GROUP), GroupInfo(JAVA_GROUP), GroupInfo(R_GROUP))),
GroupInfo(VISUALIZATION_GROUP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ import edu.uci.ics.texera.workflow.operators.visualization.quiverPlot.QuiverPlot
import edu.uci.ics.texera.workflow.operators.visualization.contourPlot.ContourPlotOpDesc
import edu.uci.ics.texera.workflow.operators.visualization.figureFactoryTable.FigureFactoryTableOpDesc
import edu.uci.ics.texera.workflow.operators.visualization.sankeyDiagram.SankeyDiagramOpDesc
import edu.uci.ics.texera.workflow.operators.state.{DataToStateOpDesc, StateToDataOpDesc}
import java.util.UUID
import scala.collection.mutable
import scala.util.Try
Expand All @@ -162,6 +163,8 @@ trait StateTransferFunc
)
@JsonSubTypes(
Array(
new Type(value = classOf[StateToDataOpDesc], name = "StateToData"),
new Type(value = classOf[DataToStateOpDesc], name = "DataToState"),
new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package edu.uci.ics.texera.workflow.operators.state

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.engine.architecture.deploysemantics.{PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo
import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PortIdentity}
import edu.uci.ics.texera.workflow.common.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.tuple.schema.Schema

class DataToStateOpDesc extends LogicalOp {
@JsonProperty(defaultValue = "false")
@JsonSchemaTitle("Pass To All Downstream")
@JsonDeserialize(contentAs = classOf[java.lang.Boolean])
var passToAllDownstream: Option[Boolean] = Option(false)

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecInitInfo((_, _) => {
new DataToStateOpExec(passToAllDownstream.get)
})
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(
SchemaPropagationFunc(inputSchemas => Map(PortIdentity() -> inputSchemas(PortIdentity(1))))
)
.withSuggestedWorkerNum(1)
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
"Data To State",
"Convert Data to State",
OperatorGroupConstants.STATE_GROUP,
inputPorts = List(
InputPort(PortIdentity(), "State"),
InputPort(PortIdentity(1), "Data", dependencies = List(PortIdentity()))
),
outputPorts = List(OutputPort())
)

override def getOutputSchema(schemas: Array[Schema]): Schema = schemas(1)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package edu.uci.ics.texera.workflow.operators.state

import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike
import edu.uci.ics.texera.workflow.common.State
import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor
import edu.uci.ics.texera.workflow.common.tuple.Tuple

import scala.collection.mutable

class DataToStateOpExec(passToAllDownstream: Boolean) extends OperatorExecutor {
private val dataTuples = new mutable.ArrayBuffer[Tuple]()
private var stateTuple: Option[Tuple] = None

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
port match {
case 0 =>
if (stateTuple.isEmpty)
stateTuple = Some(tuple)
case 1 =>
dataTuples += tuple
}
Iterator.empty
}

override def produceStateOnFinish(port: Int): Option[State] =
Some(State(stateTuple, passToAllDownstream))

override def onFinish(port: Int): Iterator[TupleLike] = dataTuples.iterator
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package edu.uci.ics.texera.workflow.operators.state

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.engine.architecture.deploysemantics.{PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo
import edu.uci.ics.amber.engine.common.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.engine.common.workflow.{InputPort, OutputPort, PortIdentity}
import edu.uci.ics.texera.workflow.common.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.tuple.schema.{Attribute, AttributeType, Schema}

class StateToDataOpDesc extends LogicalOp {
@JsonProperty
@JsonSchemaTitle("State output column(s)")
@JsonPropertyDescription(
"Name of the output columns that the first port will produce, if any"
)
var outputColumns: List[Attribute] = List()

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecInitInfo((_, _) => {
new StateToDataOpExec()
})
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(
SchemaPropagationFunc(inputSchemas =>
getOutputSchemas(
operatorInfo.inputPorts.map(port => inputSchemas(port.id)).toArray
).zipWithIndex.map {
case (schema, index) => PortIdentity(index) -> schema
}.toMap
)
)
.withSuggestedWorkerNum(1)
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
"State To Data",
"Convert State to Data",
OperatorGroupConstants.STATE_GROUP,
inputPorts = List(InputPort()),
outputPorts = List(OutputPort(PortIdentity(), "State"), OutputPort(PortIdentity(1), "Data"))
)

override def getOutputSchema(schemas: Array[Schema]): Schema = throw new NotImplementedError()

override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] =
Array(
Schema.builder().add("passToAllDownstream", AttributeType.BOOLEAN).add(outputColumns).build(),
schemas(0)
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package edu.uci.ics.texera.workflow.operators.state

import edu.uci.ics.amber.engine.common.tuple.amber.TupleLike
import edu.uci.ics.amber.engine.common.workflow.PortIdentity
import edu.uci.ics.texera.workflow.common.State
import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor
import edu.uci.ics.texera.workflow.common.tuple.Tuple

import scala.collection.mutable

class StateToDataOpExec extends OperatorExecutor {
private val outputTuples = new mutable.ArrayBuffer[(Tuple, Option[PortIdentity])]()
private var stateTuple: Option[Tuple] = None

override def processState(state: State, port: Int): Option[State] = {
if (stateTuple.isEmpty) {
stateTuple = Some(state.toTuple)
None
} else {
Some(state)
}
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
outputTuples += ((tuple, Some(PortIdentity(1))))
Iterator.empty
}

override def onFinishMultiPort(port: Int): Iterator[(TupleLike, Option[PortIdentity])] = {
if (stateTuple.isDefined) {
outputTuples += ((stateTuple.get, Some(PortIdentity())))
}
outputTuples.iterator
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading