diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
index be180b7cba3..d67524555e7 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala
@@ -11,17 +11,15 @@ import org.scalatest.flatspec.AnyFlatSpec
 
 class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
-  "CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in {
+  "CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->filter->join workflow" in {
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
     val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         keywordOpDesc,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -41,12 +39,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
@@ -106,17 +98,15 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
   }
 
-  "CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in {
+  "CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->filter->join workflow" in {
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
     val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         keywordOpDesc,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -136,12 +126,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala
index ea8c3c96a51..c28c3265a20 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala
@@ -13,24 +13,17 @@ import org.scalatest.flatspec.AnyFlatSpec
 
 class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
 
-  "RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword->sink workflow" in {
+  "RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword workflow" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, keywordOpDesc, sink),
+      List(headerlessCsvOpDesc, keywordOpDesc),
       List(
         LogicalLink(
           headerlessCsvOpDesc.operatorIdentifier,
           PortIdentity(0),
           keywordOpDesc.operatorIdentifier,
           PortIdentity(0)
-        ),
-        LogicalLink(
-          keywordOpDesc.operatorIdentifier,
-          PortIdentity(0),
-          sink.operatorIdentifier,
-          PortIdentity(0)
         )
       ),
       new WorkflowContext()
@@ -61,17 +54,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
     }
   }
 
-  "RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join->sink workflow" in {
+  "RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join workflow" in {
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
     val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         headerlessCsvOpDesc2,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -85,12 +76,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
@@ -140,17 +125,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
 
   }
 
-  "RegionPlanGenerator" should "correctly find regions in csv->->filter->join->sink workflow" in {
+  "RegionPlanGenerator" should "correctly find regions in csv->filter->join workflow" in {
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
    val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         keywordOpDesc,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -170,12 +153,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
@@ -206,19 +183,17 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
     }
   }
 //
-  "RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin->sink workflow" in {
+  "RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin workflow" in {
     val buildCsv = TestOperators.headerlessSmallCsvScanOpDesc()
     val probeCsv = TestOperators.smallCsvScanOpDesc()
     val hashJoin1 = TestOperators.joinOpDesc("column-1", "Region")
     val hashJoin2 = TestOperators.joinOpDesc("column-2", "Country")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         buildCsv,
         probeCsv,
         hashJoin1,
-        hashJoin2,
-        sink
+        hashJoin2
       ),
       List(
         LogicalLink(
@@ -244,12 +219,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
           PortIdentity(),
           hashJoin2.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          hashJoin2.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
@@ -284,14 +253,12 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
     val split = new SplitOpDesc()
     val training = new PythonUDFOpDescV2()
     val inference = new DualInputPortsPythonUDFOpDescV2()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         csv,
         split,
         training,
-        inference,
-        sink
+        inference
       ),
       List(
         LogicalLink(
@@ -317,12 +284,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
           PortIdentity(1),
           inference.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          inference.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       new WorkflowContext()
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala
index 1b9a90d978c..a62d37ca284 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/BatchSizePropagationSpec.scala
@@ -102,7 +102,7 @@ class BatchSizePropagationSpec
     }
   }
 
-  "Engine" should "propagate the correct batch size for headerlessCsv->sink workflow" in {
+  "Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
     val expectedBatchSize = 1
     val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
 
@@ -110,18 +110,10 @@ class BatchSizePropagationSpec
     val context = new WorkflowContext(workflowSettings = customWorkflowSettings)
 
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
 
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, sink),
-      List(
-        LogicalLink(
-          headerlessCsvOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(headerlessCsvOpDesc),
+      List(),
       context
     )
@@ -131,7 +123,7 @@ class BatchSizePropagationSpec
     verifyBatchSizeInPartitioning(workflowScheduler, 1)
   }
 
-  "Engine" should "propagate the correct batch size for headerlessCsv->keyword->sink workflow" in {
+  "Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
     val expectedBatchSize = 500
     val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
 
@@ -140,22 +132,15 @@ class BatchSizePropagationSpec
 
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
-    val sink = TestOperators.sinkOpDesc()
 
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, keywordOpDesc, sink),
+      List(headerlessCsvOpDesc, keywordOpDesc),
       List(
         LogicalLink(
           headerlessCsvOpDesc.operatorIdentifier,
           PortIdentity(),
           keywordOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          keywordOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       context
@@ -167,7 +152,7 @@ class BatchSizePropagationSpec
     verifyBatchSizeInPartitioning(workflowScheduler, 500)
   }
 
-  "Engine" should "propagate the correct batch size for csv->keyword->count->sink workflow" in {
+  "Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
     val expectedBatchSize = 100
     val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
 
@@ -178,10 +163,9 @@ class BatchSizePropagationSpec
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
     val countOpDesc =
       TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]())
-    val sink = TestOperators.sinkOpDesc()
 
     val workflow = buildWorkflow(
-      List(csvOpDesc, keywordOpDesc, countOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
@@ -194,12 +178,6 @@ class BatchSizePropagationSpec
           PortIdentity(),
           countOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          countOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       context
@@ -211,7 +189,7 @@ class BatchSizePropagationSpec
     verifyBatchSizeInPartitioning(workflowScheduler, 100)
   }
 
-  "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy->sink workflow" in {
+  "Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
     val expectedBatchSize = 300
     val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
 
@@ -226,10 +204,8 @@ class BatchSizePropagationSpec
         AggregationFunction.AVERAGE,
         List[String]("Country")
       )
-    val sink = TestOperators.sinkOpDesc()
-
     val workflow = buildWorkflow(
-      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
@@ -242,12 +218,6 @@ class BatchSizePropagationSpec
           PortIdentity(),
           averageAndGroupByOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          averageAndGroupByOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       context
@@ -259,7 +229,7 @@ class BatchSizePropagationSpec
     verifyBatchSizeInPartitioning(workflowScheduler, 300)
   }
 
-  "Engine" should "propagate the correct batch size for csv->(csv->)->join->sink workflow" in {
+  "Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
     val expectedBatchSize = 1
     val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
 
@@ -269,14 +239,12 @@ class BatchSizePropagationSpec
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
     val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
 
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         headerlessCsvOpDesc2,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -290,12 +258,6 @@ class BatchSizePropagationSpec
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       context
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
index 11a63cbf2c6..f6c04cb826b 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
@@ -123,67 +123,43 @@ class DataProcessingSpec
     ("localhost", config.getPort.toString, database, table, username, password)
   }
 
-  "Engine" should "execute headerlessCsv->sink workflow normally" in {
+  "Engine" should "execute headerlessCsv workflow normally" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, sink),
-      List(
-        LogicalLink(
-          headerlessCsvOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(headerlessCsvOpDesc),
+      List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(sink.operatorIdentifier)
+    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
 
     assert(results.size == 100)
   }
 
-  "Engine" should "execute headerlessMultiLineDataCsv-->sink workflow normally" in {
+  "Engine" should "execute headerlessMultiLineDataCsv workflow normally" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallMultiLineDataCsvScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, sink),
-      List(
-        LogicalLink(
-          headerlessCsvOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(headerlessCsvOpDesc),
+      List(),
       workflowContext
     )
-    val results = executeWorkflow(workflow)(sink.operatorIdentifier)
+    val results = executeWorkflow(workflow)(headerlessCsvOpDesc.operatorIdentifier)
 
     assert(results.size == 100)
   }
 
-  "Engine" should "execute jsonl->sink workflow normally" in {
+  "Engine" should "execute jsonl workflow normally" in {
     val jsonlOp = TestOperators.smallJSONLScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(jsonlOp, sink),
-      List(
-        LogicalLink(
-          jsonlOp.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(jsonlOp),
+      List(),
       workflowContext
     )
 
-    val results = executeWorkflow(workflow)(sink.operatorIdentifier)
+    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
 
     assert(results.size == 100)
 
     for (result <- results) {
-      val schema = result.asInstanceOf[Tuple].getSchema
+      val schema = result.getSchema
       assert(schema.getAttribute("id").getType == AttributeType.LONG)
       assert(schema.getAttribute("first_name").getType == AttributeType.STRING)
       assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN)
@@ -194,27 +170,19 @@ class DataProcessingSpec
 
   }
 
-  "Engine" should "execute mediumFlattenJsonl->sink workflow normally" in {
+  "Engine" should "execute mediumFlattenJsonl workflow normally" in {
     val jsonlOp = TestOperators.mediumFlattenJSONLScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(jsonlOp, sink),
-      List(
-        LogicalLink(
-          jsonlOp.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(jsonlOp),
+      List(),
       workflowContext
     )
 
-    val results = executeWorkflow(workflow)(sink.operatorIdentifier)
+    val results = executeWorkflow(workflow)(jsonlOp.operatorIdentifier)
 
     assert(results.size == 1000)
 
     for (result <- results) {
-      val schema = result.asInstanceOf[Tuple].getSchema
+      val schema = result.getSchema
       assert(schema.getAttribute("id").getType == AttributeType.LONG)
       assert(schema.getAttribute("first_name").getType == AttributeType.STRING)
       assert(schema.getAttribute("flagged").getType == AttributeType.BOOLEAN)
@@ -225,24 +193,17 @@ class DataProcessingSpec
     }
   }
 
-  "Engine" should "execute headerlessCsv->keyword->sink workflow normally" in {
+  "Engine" should "execute headerlessCsv->keyword workflow normally" in {
     val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(headerlessCsvOpDesc, keywordOpDesc, sink),
+      List(headerlessCsvOpDesc, keywordOpDesc),
       List(
         LogicalLink(
           headerlessCsvOpDesc.operatorIdentifier,
           PortIdentity(),
           keywordOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          keywordOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       workflowContext
@@ -250,42 +211,27 @@ class DataProcessingSpec
     executeWorkflow(workflow)
   }
 
-  "Engine" should "execute csv->sink workflow normally" in {
+  "Engine" should "execute csv workflow normally" in {
     val csvOpDesc = TestOperators.smallCsvScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(csvOpDesc, sink),
-      List(
-        LogicalLink(
-          csvOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(csvOpDesc),
+      List(),
       workflowContext
     )
     executeWorkflow(workflow)
   }
 
-  "Engine" should "execute csv->keyword->sink workflow normally" in {
+  "Engine" should "execute csv->keyword workflow normally" in {
     val csvOpDesc = TestOperators.smallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(csvOpDesc, keywordOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
           PortIdentity(),
           keywordOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          keywordOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       workflowContext
@@ -298,9 +244,8 @@ class DataProcessingSpec
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
     val countOpDesc =
       TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]())
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(csvOpDesc, keywordOpDesc, countOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc, countOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
@@ -313,12 +258,6 @@ class DataProcessingSpec
           PortIdentity(),
           countOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          countOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       workflowContext
@@ -326,7 +265,7 @@ class DataProcessingSpec
     executeWorkflow(workflow)
   }
 
-  "Engine" should "execute csv->keyword->averageAndGroupBy->sink workflow normally" in {
+  "Engine" should "execute csv->keyword->averageAndGroupBy workflow normally" in {
     val csvOpDesc = TestOperators.smallCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
     val averageAndGroupByOpDesc =
@@ -335,9 +274,8 @@ class DataProcessingSpec
         AggregationFunction.AVERAGE,
         List[String]("Country")
       )
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
@@ -350,12 +288,6 @@ class DataProcessingSpec
           PortIdentity(),
           averageAndGroupByOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          averageAndGroupByOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       workflowContext
@@ -363,17 +295,15 @@ class DataProcessingSpec
     executeWorkflow(workflow)
   }
 
-  "Engine" should "execute csv->(csv->)->join->sink workflow normally" in {
+  "Engine" should "execute csv->(csv->)->join workflow normally" in {
     val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
     val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
     val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
       List(
         headerlessCsvOpDesc1,
         headerlessCsvOpDesc2,
-        joinOpDesc,
-        sink
+        joinOpDesc
       ),
       List(
         LogicalLink(
@@ -387,12 +317,6 @@ class DataProcessingSpec
           PortIdentity(),
           joinOpDesc.operatorIdentifier,
           PortIdentity(1)
-        ),
-        LogicalLink(
-          joinOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       ),
       workflowContext
@@ -401,20 +325,17 @@ class DataProcessingSpec
   }
 
   // TODO: use mock data to perform the test, remove dependency on the real AsterixDB
-  //  "Engine" should "execute asterixdb->sink workflow normally" in {
+  //  "Engine" should "execute asterixdb workflow normally" in {
   //
   //    val asterixDBOp = TestOperators.asterixDBSourceOpDesc()
-  //    val sink = TestOperators.sinkOpDesc()
   //    val (id, workflow) = buildWorkflow(
-  //      List(asterixDBOp, sink),
-  //      List(
-  //        OperatorLink(OperatorPort(asterixDBOp.operatorIdentifier, 0), OperatorPort(sink.operatorIdentifier, 0))
-  //      )
+  //      List(asterixDBOp),
+  //      List()
   //    )
   //    executeWorkflow(id, workflow)
   //  }
 
-  "Engine" should "execute mysql->sink workflow normally" in {
+  "Engine" should "execute mysql workflow normally" in {
     val (host, port, database, table, username, password) = initializeInMemoryMySQLInstance()
     val inMemoryMsSQLSourceOpDesc = TestOperators.inMemoryMySQLSourceOpDesc(
       host,
@@ -425,17 +346,9 @@ class DataProcessingSpec
       password
     )
 
-    val sink = TestOperators.sinkOpDesc()
     val workflow = buildWorkflow(
-      List(inMemoryMsSQLSourceOpDesc, sink),
-      List(
-        LogicalLink(
-          inMemoryMsSQLSourceOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      ),
+      List(inMemoryMsSQLSourceOpDesc),
+      List(),
       workflowContext
     )
     executeWorkflow(workflow)
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
index cc668fbc49c..014f3080b98 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
@@ -73,44 +73,29 @@ class PauseSpec
     Await.result(completion)
   }
 
-  "Engine" should "be able to pause csv->sink workflow" in {
+  "Engine" should "be able to pause csv workflow" in {
     val csvOpDesc = TestOperators.mediumCsvScanOpDesc()
-    val sink = TestOperators.sinkOpDesc()
-    logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}, sink-id ${sink.operatorIdentifier}")
+    logger.info(s"csv-id ${csvOpDesc.operatorIdentifier}")
     shouldPause(
-      List(csvOpDesc, sink),
-      List(
-        LogicalLink(
-          csvOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
-        )
-      )
+      List(csvOpDesc),
+      List()
     )
   }
 
-  "Engine" should "be able to pause csv->keyword->sink workflow" in {
+  "Engine" should "be able to pause csv->keyword workflow" in {
     val csvOpDesc = TestOperators.mediumCsvScanOpDesc()
     val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
-    val sink = TestOperators.sinkOpDesc()
     logger.info(
-      s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id ${keywordOpDesc.operatorIdentifier}, sink-id ${sink.operatorIdentifier}"
+      s"csv-id ${csvOpDesc.operatorIdentifier}, keyword-id ${keywordOpDesc.operatorIdentifier}"
    )
     shouldPause(
-      List(csvOpDesc, keywordOpDesc, sink),
+      List(csvOpDesc, keywordOpDesc),
       List(
         LogicalLink(
           csvOpDesc.operatorIdentifier,
           PortIdentity(),
           keywordOpDesc.operatorIdentifier,
           PortIdentity()
-        ),
-        LogicalLink(
-          keywordOpDesc.operatorIdentifier,
-          PortIdentity(),
-          sink.operatorIdentifier,
-          PortIdentity()
         )
       )
     )
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala
index d697a4946e6..6c694b989c1 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/faulttolerance/CheckpointSpec.scala
@@ -24,21 +24,14 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll {
 
   val resultStorage = new OpResultStorage()
   val csvOpDesc = TestOperators.mediumCsvScanOpDesc()
   val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
-  val sink = TestOperators.sinkOpDesc()
   val workflow = buildWorkflow(
-    List(csvOpDesc, keywordOpDesc, sink),
+    List(csvOpDesc, keywordOpDesc),
     List(
       LogicalLink(
         csvOpDesc.operatorIdentifier,
         PortIdentity(),
         keywordOpDesc.operatorIdentifier,
         PortIdentity()
-      ),
-      LogicalLink(
-        keywordOpDesc.operatorIdentifier,
-        PortIdentity(),
-        sink.operatorIdentifier,
-        PortIdentity()
      )
    ),
    new WorkflowContext()
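
Note: every hunk above applies the same refactor. The explicit sink operator descriptor and its trailing LogicalLink are dropped, single-operator workflows now pass an empty link list, and result-reading tests key into executeWorkflow's result map by the terminal logical operator's identifier rather than the sink's. A minimal sketch of the post-change test shape, using only the helper names already present in these specs (TestOperators, buildWorkflow, executeWorkflow, workflowContext); the executeWorkflow signature is inferred from its call sites here, not from the library:

// Sketch only: illustrates the new sink-less test pattern under the
// assumptions stated above.
val csvOpDesc = TestOperators.smallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
val workflow = buildWorkflow(
  List(csvOpDesc, keywordOpDesc), // no sink descriptor appended
  List(
    LogicalLink(
      csvOpDesc.operatorIdentifier,
      PortIdentity(),
      keywordOpDesc.operatorIdentifier,
      PortIdentity()
    ) // no trailing keyword->sink link
  ),
  workflowContext
)
// Results are fetched by the terminal operator's identifier, not a sink's;
// schemas are read directly off each result tuple (no asInstanceOf[Tuple]).
val results = executeWorkflow(workflow)(keywordOpDesc.operatorIdentifier)
assert(results.nonEmpty)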