Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicong-Huang committed Dec 20, 2024
1 parent 69d8290 commit b603b8d
Showing 1 changed file with 2 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,8 @@ class DataProcessingSpec
.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
results = workflow.logicalPlan.getTerminalOperatorIds
.map(sinkOpId =>
(sinkOpId, workflow.logicalPlan.getUpstreamOps(sinkOpId).head.operatorIdentifier)
)
.filter {
case (_, upstreamOpId) => resultStorage.contains(upstreamOpId)
}
.map {
case (sinkOpId, upstreamOpId) =>
(sinkOpId, resultStorage.get(upstreamOpId).get().toList)
}
.filter(terminalOpId => resultStorage.contains(terminalOpId))
.map(terminalOpId => terminalOpId -> resultStorage.get(terminalOpId).get().toList)
.toMap
completion.setDone()
}
Expand Down

0 comments on commit b603b8d

Please sign in to comment.