
Commit

fix test
Yicong-Huang committed Dec 20, 2024
1 parent ad59019 commit 4e6bc2e
Showing 6 changed files with 68 additions and 270 deletions.
Changed file: CostBasedScheduleGeneratorSpec
@@ -11,17 +11,15 @@ import org.scalatest.flatspec.AnyFlatSpec

class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {

"CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in {
"CostBasedRegionPlanGenerator" should "finish bottom-up search using different pruning techniques with correct number of states explored in csv->->filter->join workflow" in {
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
headerlessCsvOpDesc1,
keywordOpDesc,
- joinOpDesc,
- sink
+ joinOpDesc
),
List(
LogicalLink(
@@ -41,12 +39,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
PortIdentity(),
joinOpDesc.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- joinOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
@@ -106,17 +98,15 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {

}

"CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->->filter->join->sink workflow" in {
"CostBasedRegionPlanGenerator" should "finish top-down search using different pruning techniques with correct number of states explored in csv->->filter->join workflow" in {
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
headerlessCsvOpDesc1,
keywordOpDesc,
- joinOpDesc,
- sink
+ joinOpDesc
),
List(
LogicalLink(
@@ -136,12 +126,6 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {
PortIdentity(),
joinOpDesc.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- joinOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
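For readability, here is a sketch of how the workflow construction in these two tests looks after the change: the join is now the terminal operator, with no sink operator and no join->sink link. The first two LogicalLinks are collapsed in the diff view, so their endpoints below are assumptions inferred from the test name; the spec's existing helpers (TestOperators, buildWorkflow, LogicalLink, PortIdentity, WorkflowContext) are assumed to be in scope.

// Sketch only: the csv->->filter->join workflow after this commit, with no sink operator.
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
val workflow = buildWorkflow(
  List(headerlessCsvOpDesc1, keywordOpDesc, joinOpDesc), // sink is no longer listed
  List(
    // assumed (collapsed in the diff): scan feeds the keyword filter
    LogicalLink(
      headerlessCsvOpDesc1.operatorIdentifier,
      PortIdentity(),
      keywordOpDesc.operatorIdentifier,
      PortIdentity()
    ),
    // assumed (collapsed in the diff): scan feeds the join's build input (port 0)
    LogicalLink(
      headerlessCsvOpDesc1.operatorIdentifier,
      PortIdentity(),
      joinOpDesc.operatorIdentifier,
      PortIdentity()
    ),
    // shown in the diff: a link into the join's probe input (port 1); source assumed to be the filter
    LogicalLink(
      keywordOpDesc.operatorIdentifier,
      PortIdentity(),
      joinOpDesc.operatorIdentifier,
      PortIdentity(1)
    )
    // the former LogicalLink from joinOpDesc to the sink is gone
  ),
  new WorkflowContext()
)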
Changed file: ExpansionGreedyScheduleGeneratorSpec
@@ -13,24 +13,17 @@ import org.scalatest.flatspec.AnyFlatSpec

class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory {

"RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword->sink workflow" in {
"RegionPlanGenerator" should "correctly find regions in headerlessCsv->keyword workflow" in {
val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
- List(headerlessCsvOpDesc, keywordOpDesc, sink),
+ List(headerlessCsvOpDesc, keywordOpDesc),
List(
LogicalLink(
headerlessCsvOpDesc.operatorIdentifier,
PortIdentity(0),
keywordOpDesc.operatorIdentifier,
PortIdentity(0)
- ),
- LogicalLink(
- keywordOpDesc.operatorIdentifier,
- PortIdentity(0),
- sink.operatorIdentifier,
- PortIdentity(0)
)
),
new WorkflowContext()
@@ -61,17 +54,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
}
}

"RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join->sink workflow" in {
"RegionPlanGenerator" should "correctly find regions in csv->(csv->)->join workflow" in {
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
headerlessCsvOpDesc1,
headerlessCsvOpDesc2,
- joinOpDesc,
- sink
+ joinOpDesc
),
List(
LogicalLink(
@@ -85,12 +76,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
PortIdentity(),
joinOpDesc.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- joinOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
@@ -140,17 +125,15 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory

}

"RegionPlanGenerator" should "correctly find regions in csv->->filter->join->sink workflow" in {
"RegionPlanGenerator" should "correctly find regions in csv->->filter->join workflow" in {
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
headerlessCsvOpDesc1,
keywordOpDesc,
- joinOpDesc,
- sink
+ joinOpDesc
),
List(
LogicalLink(
@@ -170,12 +153,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
PortIdentity(),
joinOpDesc.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- joinOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
@@ -206,19 +183,17 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
}
}
//
"RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin->sink workflow" in {
"RegionPlanGenerator" should "correctly find regions in buildcsv->probecsv->hashjoin->hashjoin workflow" in {
val buildCsv = TestOperators.headerlessSmallCsvScanOpDesc()
val probeCsv = TestOperators.smallCsvScanOpDesc()
val hashJoin1 = TestOperators.joinOpDesc("column-1", "Region")
val hashJoin2 = TestOperators.joinOpDesc("column-2", "Country")
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
buildCsv,
probeCsv,
hashJoin1,
- hashJoin2,
- sink
+ hashJoin2
),
List(
LogicalLink(
Expand All @@ -244,12 +219,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
PortIdentity(),
hashJoin2.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- hashJoin2.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
@@ -284,14 +253,12 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
val split = new SplitOpDesc()
val training = new PythonUDFOpDescV2()
val inference = new DualInputPortsPythonUDFOpDescV2()
- val sink = TestOperators.sinkOpDesc()
val workflow = buildWorkflow(
List(
csv,
split,
training,
- inference,
- sink
+ inference
),
List(
LogicalLink(
Expand All @@ -317,12 +284,6 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
PortIdentity(1),
inference.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- inference.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
new WorkflowContext()
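As a concrete picture of the pattern applied throughout this spec, the construction in its first test (headerlessCsv->keyword) is fully visible in the diff above; after the change it reads roughly as follows. This is a sketch that assumes the spec's existing helpers (TestOperators, buildWorkflow, LogicalLink, PortIdentity, WorkflowContext) are in scope.

// Sketch: the keyword search operator is now the terminal operator; no sink is built.
val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
val workflow = buildWorkflow(
  List(headerlessCsvOpDesc, keywordOpDesc),
  List(
    // single link: csv scan output port 0 -> keyword search input port 0
    LogicalLink(
      headerlessCsvOpDesc.operatorIdentifier,
      PortIdentity(0),
      keywordOpDesc.operatorIdentifier,
      PortIdentity(0)
    )
  ),
  new WorkflowContext()
)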
Changed file: BatchSizePropagationSpec
@@ -102,26 +102,18 @@ class BatchSizePropagationSpec
}
}

"Engine" should "propagate the correct batch size for headerlessCsv->sink workflow" in {
"Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)

val context = new WorkflowContext(workflowSettings = customWorkflowSettings)

val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
- val sink = TestOperators.sinkOpDesc()

val workflow = buildWorkflow(
- List(headerlessCsvOpDesc, sink),
- List(
- LogicalLink(
- headerlessCsvOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
- )
- ),
+ List(headerlessCsvOpDesc),
+ List(),
context
)

@@ -131,7 +123,7 @@
verifyBatchSizeInPartitioning(workflowScheduler, 1)
}

"Engine" should "propagate the correct batch size for headerlessCsv->keyword->sink workflow" in {
"Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
val expectedBatchSize = 500

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
@@ -140,22 +132,15 @@

val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia")
- val sink = TestOperators.sinkOpDesc()

val workflow = buildWorkflow(
- List(headerlessCsvOpDesc, keywordOpDesc, sink),
+ List(headerlessCsvOpDesc, keywordOpDesc),
List(
LogicalLink(
headerlessCsvOpDesc.operatorIdentifier,
PortIdentity(),
keywordOpDesc.operatorIdentifier,
PortIdentity()
- ),
- LogicalLink(
- keywordOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
context
@@ -167,7 +152,7 @@
verifyBatchSizeInPartitioning(workflowScheduler, 500)
}

"Engine" should "propagate the correct batch size for csv->keyword->count->sink workflow" in {
"Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
val expectedBatchSize = 100

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
@@ -178,10 +163,9 @@
val keywordOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia")
val countOpDesc =
TestOperators.aggregateAndGroupByDesc("Region", AggregationFunction.COUNT, List[String]())
- val sink = TestOperators.sinkOpDesc()

val workflow = buildWorkflow(
- List(csvOpDesc, keywordOpDesc, countOpDesc, sink),
+ List(csvOpDesc, keywordOpDesc, countOpDesc),
List(
LogicalLink(
csvOpDesc.operatorIdentifier,
@@ -194,12 +178,6 @@
PortIdentity(),
countOpDesc.operatorIdentifier,
PortIdentity()
- ),
- LogicalLink(
- countOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
context
@@ -211,7 +189,7 @@
verifyBatchSizeInPartitioning(workflowScheduler, 100)
}

"Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy->sink workflow" in {
"Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
val expectedBatchSize = 300

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
@@ -226,10 +204,8 @@
AggregationFunction.AVERAGE,
List[String]("Country")
)
- val sink = TestOperators.sinkOpDesc()

val workflow = buildWorkflow(
- List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc, sink),
+ List(csvOpDesc, keywordOpDesc, averageAndGroupByOpDesc),
List(
LogicalLink(
csvOpDesc.operatorIdentifier,
@@ -242,12 +218,6 @@
PortIdentity(),
averageAndGroupByOpDesc.operatorIdentifier,
PortIdentity()
- ),
- LogicalLink(
- averageAndGroupByOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
context
@@ -259,7 +229,7 @@
verifyBatchSizeInPartitioning(workflowScheduler, 300)
}

"Engine" should "propagate the correct batch size for csv->(csv->)->join->sink workflow" in {
"Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
val expectedBatchSize = 1

val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
Expand All @@ -269,14 +239,12 @@ class BatchSizePropagationSpec
val headerlessCsvOpDesc1 = TestOperators.headerlessSmallCsvScanOpDesc()
val headerlessCsvOpDesc2 = TestOperators.headerlessSmallCsvScanOpDesc()
val joinOpDesc = TestOperators.joinOpDesc("column-1", "column-1")
- val sink = TestOperators.sinkOpDesc()

val workflow = buildWorkflow(
List(
headerlessCsvOpDesc1,
headerlessCsvOpDesc2,
- joinOpDesc,
- sink
+ joinOpDesc
),
List(
LogicalLink(
@@ -290,12 +258,6 @@
PortIdentity(),
joinOpDesc.operatorIdentifier,
PortIdentity(1)
- ),
- LogicalLink(
- joinOpDesc.operatorIdentifier,
- PortIdentity(),
- sink.operatorIdentifier,
- PortIdentity()
)
),
context
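The first test in this spec shows the simplest end state: a single scan operator and an empty link list. Below is a sketch of the updated setup, again assuming the spec's existing helpers (WorkflowSettings, WorkflowContext, TestOperators, buildWorkflow); the scheduler setup between construction and verification is collapsed in the diff, so it is only indicated in a comment.

// Sketch: with the sink removed, the scan operator stands alone and no links are needed.
val expectedBatchSize = 1
val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context = new WorkflowContext(workflowSettings = customWorkflowSettings)

val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()

val workflow = buildWorkflow(
  List(headerlessCsvOpDesc), // only the csv scan operator
  List(),                    // empty link list
  context
)

// ... scheduler setup (collapsed in the diff) ...
// verifyBatchSizeInPartitioning(workflowScheduler, 1)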
(Diffs for the remaining changed files in this commit are not shown here.)

