diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 9144fa63d4..2eb645c2e2 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -67,66 +67,6 @@ class WorkflowCompiler( .foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) } } }) - - // assign the sinks to toAddSink operators' external output ports - subPlan - .topologicalIterator() - .map(subPlan.getOperator) - .flatMap { physicalOp => - physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort)) - } - .filter({ - case (physicalOp, (_, (outputPort, _, _))) => - toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal - }) - .foreach({ - case (physicalOp, (_, (outputPort, _, schema))) => - val storage = ResultStorage.getOpResultStorage(context.workflowId) - val storageKey = physicalOp.id.logicalOpId - - // due to the size limit of single document in mongoDB (16MB) - // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. - val storageType = { - if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY - else OpResultStorage.defaultStorageMode - } - if (!storage.contains(storageKey)) { - // get the schema for result storage in certain mode - val sinkStorageSchema: Option[Schema] = - if (storageType == OpResultStorage.MONGODB) { - // use the output schema on the first output port as the schema for storage - Some(schema.right.get) - } else { - None - } - storage.create( - s"${context.executionId}_", - storageKey, - storageType, - sinkStorageSchema - ) - // add the sink collection name to the JSON array of sinks - val storageNode = objectMapper.createObjectNode() - storageNode.put("storageType", storageType) - storageNode.put("storageKey", s"${context.executionId}_$storageKey") - sinksPointers.add(storageNode) - } - - val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( - context.workflowId, - context.executionId, - storageKey.id, - outputPort.mode - ) - val sinkLink = - PhysicalLink( - physicalOp.id, - outputPort.id, - sinkPhysicalOp.id, - PortIdentity(internal = true) - ) - physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) - }) } match { case Success(_) => @@ -138,6 +78,66 @@ class WorkflowCompiler( } ) + // assign the sinks to toAddSink operators' external output ports + physicalPlan + .topologicalIterator() + .map(physicalPlan.getOperator) + .flatMap { physicalOp => + physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort)) + } + .filter({ + case (physicalOp, (_, (outputPort, _, _))) => + toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal + }) + .foreach({ + case (physicalOp, (_, (outputPort, _, schema))) => + val storage = ResultStorage.getOpResultStorage(context.workflowId) + val storageKey = physicalOp.id.logicalOpId + + // due to the size limit of single document in mongoDB (16MB) + // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage. + val storageType = { + if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY + else OpResultStorage.defaultStorageMode + } + if (!storage.contains(storageKey)) { + // get the schema for result storage in certain mode + val sinkStorageSchema: Option[Schema] = + if (storageType == OpResultStorage.MONGODB) { + // use the output schema on the first output port as the schema for storage + Some(schema.right.get) + } else { + None + } + storage.create( + s"${context.executionId}_", + storageKey, + storageType, + sinkStorageSchema + ) + // add the sink collection name to the JSON array of sinks + val storageNode = objectMapper.createObjectNode() + storageNode.put("storageType", storageType) + storageNode.put("storageKey", s"${context.executionId}_$storageKey") + sinksPointers.add(storageNode) + } + + val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp( + context.workflowId, + context.executionId, + storageKey.id, + outputPort.mode + ) + val sinkLink = + PhysicalLink( + physicalOp.id, + outputPort.id, + sinkPhysicalOp.id, + PortIdentity(internal = true) + ) + physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink) + }) + // update execution entry in MySQL to have pointers to the mongo collections resultsJSON.set("results", sinksPointers) ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) {