Skip to content

Commit

Permalink
Merge branch 'master' into chris-normalize-workflow-runtime-statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
kunwp1 authored Dec 19, 2024
2 parents 0b20866 + 5524099 commit 5dbe0a4
Show file tree
Hide file tree
Showing 46 changed files with 155 additions and 305 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
KILLED,
RUNNING
}
import edu.uci.ics.amber.operator.sink.IncrementalOutputMode.{SET_DELTA, SET_SNAPSHOT}
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
import edu.uci.ics.amber.operator.sink.IncrementalOutputMode
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.web.model.websocket.event.{
PaginatedResultEvent,
TexeraWebSocketEvent,
WebResultUpdateEvent
}
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
import edu.uci.ics.texera.web.service.ExecutionResultService.WebResultUpdate
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
import edu.uci.ics.texera.workflow.LogicalPlan

Expand All @@ -43,28 +41,17 @@ import scala.concurrent.duration.DurationInt

object ExecutionResultService {

val defaultPageSize: Int = 5

// convert Tuple from engine's format to JSON format
def webDataFromTuple(
mode: WebOutputMode,
table: List[Tuple],
chartType: Option[String]
): WebDataUpdate = {
val tableInJson = table.map(t => t.asKeyValuePairJson())
WebDataUpdate(mode, tableInJson, chartType)
}
private val defaultPageSize: Int = 5

/**
* convert Tuple from engine's format to JSON format
*/
private def tuplesToWebData(
mode: WebOutputMode,
table: List[Tuple],
chartType: Option[String]
table: List[Tuple]
): WebDataUpdate = {
val tableInJson = table.map(t => t.asKeyValuePairJson())
WebDataUpdate(mode, tableInJson, chartType)
WebDataUpdate(mode, tableInJson)
}

/**
Expand All @@ -75,41 +62,40 @@ object ExecutionResultService {
*
* Produces the WebResultUpdate to send to frontend from a result update from the engine.
*/
def convertWebResultUpdate(
private def convertWebResultUpdate(
sink: ProgressiveSinkOpDesc,
oldTupleCount: Int,
newTupleCount: Int
): WebResultUpdate = {
val webOutputMode: WebOutputMode = {
(sink.getOutputMode, sink.getChartType) match {
// visualization sinks use its corresponding mode
case (SET_SNAPSHOT, Some(_)) => SetSnapshotMode()
case (SET_DELTA, Some(_)) => SetDeltaMode()
// Non-visualization sinks use pagination mode
case (_, None) => PaginationMode()
sink.getOutputMode match {
// currently, only table outputs are using these modes
case OutputMode.SET_DELTA => SetDeltaMode()
case OutputMode.SET_SNAPSHOT => PaginationMode()

// currently, only visualizations are using single snapshot mode
case OutputMode.SINGLE_SNAPSHOT => SetSnapshotMode()
}
}

val storage =
ResultStorage.getOpResultStorage(sink.getContext.workflowId).get(sink.getUpstreamId.get)
val webUpdate = (webOutputMode, sink.getOutputMode) match {
case (PaginationMode(), SET_SNAPSHOT) =>
val webUpdate = webOutputMode match {
case PaginationMode() =>
val numTuples = storage.getCount
val maxPageIndex =
Math.ceil(numTuples / ExecutionResultService.defaultPageSize.toDouble).toInt
Math.ceil(numTuples / defaultPageSize.toDouble).toInt
WebPaginationUpdate(
PaginationMode(),
newTupleCount,
(1 to maxPageIndex).toList
)
case (SetSnapshotMode(), SET_SNAPSHOT) =>
tuplesToWebData(webOutputMode, storage.get().toList, sink.getChartType)
case (SetDeltaMode(), SET_DELTA) =>
case SetSnapshotMode() =>
tuplesToWebData(webOutputMode, storage.get().toList)
case SetDeltaMode() =>
val deltaList = storage.getAfter(oldTupleCount).toList
tuplesToWebData(webOutputMode, deltaList, sink.getChartType)
tuplesToWebData(webOutputMode, deltaList)

// currently not supported mode combinations
// (PaginationMode, SET_DELTA) | (DataSnapshotMode, SET_DELTA) | (DataDeltaMode, SET_SNAPSHOT)
case _ =>
throw new RuntimeException(
"update mode combination not supported: " + (webOutputMode, sink.getOutputMode)
Expand Down Expand Up @@ -152,8 +138,8 @@ object ExecutionResultService {
dirtyPageIndices: List[Int]
) extends WebResultUpdate

case class WebDataUpdate(mode: WebOutputMode, table: List[ObjectNode], chartType: Option[String])
extends WebResultUpdate
case class WebDataUpdate(mode: WebOutputMode, table: List[ObjectNode]) extends WebResultUpdate

}

/**
Expand Down Expand Up @@ -227,7 +213,7 @@ class ExecutionResultService(

addSubscription(
workflowStateStore.resultStore.registerDiffHandler((oldState, newState) => {
val buf = mutable.HashMap[String, WebResultUpdate]()
val buf = mutable.HashMap[String, ExecutionResultService.WebResultUpdate]()
val allTableStats = mutable.Map[String, Map[String, Map[String, Any]]]()
newState.resultInfo
.filter(info => {
Expand Down Expand Up @@ -332,7 +318,7 @@ class ExecutionResultService(
.toInt
val mode = sink.getOutputMode
val changeDetector =
if (mode == IncrementalOutputMode.SET_SNAPSHOT) {
if (mode == OutputMode.SET_SNAPSHOT) {
UUID.randomUUID.toString
} else ""
(id, OperatorResultMetadata(count, changeDetector))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package edu.uci.ics.texera.workflow

import edu.uci.ics.amber.operator.sink.SinkOpDesc
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.visualization.VisualizationOperator
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.PortIdentity

Expand Down Expand Up @@ -61,14 +60,9 @@ object SinkInjectionTransformer {
sinkOp.setUpstreamPort(edge.get.fromPortId.id)

// set output mode for visualization operator
(upstream.get, sinkOp) match {
// match the combination of a visualization operator followed by a sink operator
case (viz: VisualizationOperator, sink: ProgressiveSinkOpDesc) =>
sink.setOutputMode(viz.outputMode())
sink.setChartType(viz.chartType())
case _ =>
//skip
}
val outputPort =
upstream.get.operatorInfo.outputPorts.find(port => port.id == edge.get.fromPortId).get
sinkOp.setOutputMode(outputPort.mode)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.Workflow
import edu.uci.ics.amber.engine.common.Utils.objectMapper
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.visualization.VisualizationConstants
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.OutputPort.OutputMode.SINGLE_SNAPSHOT
import edu.uci.ics.amber.workflow.PhysicalLink
import edu.uci.ics.amber.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
Expand Down Expand Up @@ -202,7 +202,7 @@ class WorkflowCompiler(
// due to the size limit of single document in mongoDB (16MB)
// for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage.
val storageType = {
if (sink.getChartType.contains(VisualizationConstants.HTML_VIZ)) OpResultStorage.MEMORY
if (sink.getOutputMode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
else OpResultStorage.defaultStorageMode
}
if (!reuseStorageSet.contains(storageKey) || !storage.contains(storageKey)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { WebDataUpdate, WebPaginationUpdate } from "../../types/execute-workflow.interface";
import { Point, OperatorPredicate } from "../../types/workflow-common.interface";
import { PaginatedResultEvent } from "../../types/workflow-websocket.interface";
import { IndexableObject } from "ng-zorro-antd/core/types";

export const mockData: IndexableObject[] = [
Expand Down Expand Up @@ -44,7 +43,6 @@ export const mockData: IndexableObject[] = [

export const mockResultSnapshotUpdate: WebDataUpdate = {
mode: { type: "SetSnapshotMode" },
chartType: undefined,
table: mockData,
};

Expand All @@ -54,13 +52,6 @@ export const mockResultPaginationUpdate: WebPaginationUpdate = {
totalNumTuples: mockData.length,
};

export const paginationResponse: PaginatedResultEvent = {
requestID: "requestID",
operatorID: "operator-sink",
pageIndex: 1,
table: mockData,
};

export const mockResultOperator: OperatorPredicate = {
operatorID: "operator-sink",
operatorType: "ViewResults",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package edu.uci.ics.amber.compiler.util
import edu.uci.ics.amber.compiler.model.LogicalPlan
import edu.uci.ics.amber.operator.sink.SinkOpDesc
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.visualization.VisualizationOperator
import edu.uci.ics.amber.virtualidentity.OperatorIdentity
import edu.uci.ics.amber.workflow.PortIdentity

Expand Down Expand Up @@ -62,14 +61,9 @@ object SinkInjectionTransformer {
sinkOp.setUpstreamPort(edge.get.fromPortId.id)

// set output mode for visualization operator
(upstream.get, sinkOp) match {
// match the combination of a visualization operator followed by a sink operator
case (viz: VisualizationOperator, sink: ProgressiveSinkOpDesc) =>
sink.setOutputMode(viz.outputMode())
sink.setChartType(viz.chartType())
case _ =>
//skip
}
val outputPort =
upstream.get.operatorInfo.outputPorts.find(port => port.id == edge.get.fromPortId).get
sinkOp.setOutputMode(outputPort.mode)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,23 @@ message InputPort {
repeated PortIdentity dependencies = 4;
}



message OutputPort {
enum OutputMode {
// outputs complete result set snapshot for each update
SET_SNAPSHOT = 0;
// outputs incremental result set delta for each update
SET_DELTA = 1;
// outputs a single snapshot for the entire execution,
// used explicitly to support visualization operators that may exceed the memory limit
// TODO: remove this mode after we have a better solution for output size limit
SINGLE_SNAPSHOT = 2;
}
PortIdentity id = 1 [(scalapb.field).no_box = true];
string displayName = 2;
bool blocking = 3;
OutputMode mode = 4;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public OperatorInfo operatorInfo() {
"Performs a filter operation",
OperatorGroupConstants.CLEANING_GROUP(),
asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList<PortIdentity>()).toSeq()))).toList(),
asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(),
asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(),
false,
false,
true,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import edu.uci.ics.amber.core.workflow.SchemaPropagationFunc;
import edu.uci.ics.amber.operator.metadata.OperatorGroupConstants;
import edu.uci.ics.amber.operator.metadata.OperatorInfo;
import edu.uci.ics.amber.operator.sink.IncrementalOutputMode;
import edu.uci.ics.amber.operator.sink.ProgressiveUtils;
import edu.uci.ics.amber.operator.sink.SinkOpDesc;
import edu.uci.ics.amber.operator.util.OperatorDescriptorUtils;
Expand All @@ -27,7 +26,7 @@
import java.util.ArrayList;
import java.util.function.Function;

import static edu.uci.ics.amber.operator.sink.IncrementalOutputMode.SET_SNAPSHOT;

import static java.util.Collections.singletonList;
import static scala.jdk.javaapi.CollectionConverters.asScala;

Expand All @@ -36,11 +35,7 @@ public class ProgressiveSinkOpDesc extends SinkOpDesc {
// use SET_SNAPSHOT as the default output mode
// this will be set internally by the workflow compiler
@JsonIgnore
private IncrementalOutputMode outputMode = SET_SNAPSHOT;

// whether this sink corresponds to a visualization result, default is no
@JsonIgnore
private Option<String> chartType = Option.empty();
private OutputPort.OutputMode outputMode = OutputPort.OutputMode$.MODULE$.fromValue(0);


// corresponding upstream operator ID and output port, will be set by workflow compiler
Expand Down Expand Up @@ -73,7 +68,7 @@ public PhysicalOp getPhysicalOp(WorkflowIdentity workflowId, ExecutionIdentity e

// SET_SNAPSHOT:
Schema outputSchema;
if (this.outputMode.equals(SET_SNAPSHOT)) {
if (this.outputMode.equals(OutputPort.OutputMode$.MODULE$.fromValue(0))) {
if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr().getName())) {
// input is insert/retract delta: the flag column is removed in output
outputSchema = Schema.builder().add(inputSchema)
Expand Down Expand Up @@ -101,7 +96,7 @@ public OperatorInfo operatorInfo() {
"View the results",
OperatorGroupConstants.UTILITY_GROUP(),
asScala(singletonList(new InputPort(new PortIdentity(0, false), "", false, asScala(new ArrayList<PortIdentity>()).toSeq()))).toList(),
asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false))).toList(),
asScala(singletonList(new OutputPort(new PortIdentity(0, false), "", false, OutputPort.OutputMode$.MODULE$.fromValue(0)))).toList(),
false,
false,
false,
Expand All @@ -114,7 +109,7 @@ public Schema getOutputSchema(Schema[] schemas) {
Schema inputSchema = schemas[0];

// SET_SNAPSHOT:
if (this.outputMode.equals(SET_SNAPSHOT)) {
if (this.outputMode.equals(OutputPort.OutputMode$.MODULE$.fromValue(0))) {
if (inputSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr().getName())) {
// input is insert/retract delta: the flag column is removed in output
return Schema.builder().add(inputSchema)
Expand All @@ -130,26 +125,15 @@ public Schema getOutputSchema(Schema[] schemas) {
}

@JsonIgnore
public IncrementalOutputMode getOutputMode() {
public OutputPort.OutputMode getOutputMode() {
return outputMode;
}

@JsonIgnore
public void setOutputMode(IncrementalOutputMode outputMode) {
public void setOutputMode(OutputPort.OutputMode outputMode) {
this.outputMode = outputMode;
}

@JsonIgnore
public Option<String> getChartType() {
return this.chartType;
}

@JsonIgnore
public void setChartType(String chartType) {
this.chartType = Option.apply(chartType);
}


@JsonIgnore
public Option<OperatorIdentity> getUpstreamId() {
return upstreamId;
Expand Down
Loading

0 comments on commit 5dbe0a4

Please sign in to comment.