Patch jve #210 (Open)

Wants to merge 89 commits into base: comp-integrity

Commits (89)
f17d8a8
Support for multiple branched CaseWhen
Oct 2, 2020
366e92c
Interval (#116)
eric-feng-2011 Nov 22, 2020
c7fcd98
Remove partition ID argument from enclaves
chester-leung Nov 23, 2020
93dbf5e
Fix comments
chester-leung Nov 24, 2020
f357ab2
updates
chester-leung Nov 24, 2020
bb4018a
Merge serialization of ecall string as int
chester-leung Nov 30, 2020
56ace17
Modifications to integrate crumb, log-mac, and all-outputs_mac, wip
chester-leung Dec 2, 2020
21bbbfb
Store log mac after each output buffer, add all-outputs-mac to each e…
chester-leung Dec 4, 2020
549566f
Add all_outputs_mac to all EncryptedBlocks once all log_macs have bee…
chester-leung Dec 7, 2020
55ee664
Almost builds
chester-leung Dec 9, 2020
057caec
cpp builds
chester-leung Dec 10, 2020
db54c44
Use ubyte for all_outputs_mac
chester-leung Dec 10, 2020
e77f1eb
use Mac for all_outputs_mac
chester-leung Dec 10, 2020
736b8f6
Hopefully this works for flatbuffers all_outputs_mac mutation, cpp bu…
chester-leung Dec 10, 2020
cbb2373
merge
Dec 10, 2020
0351b5d
Merge branch 'comp-integrity' of https://github.com/mc2-project/opaqu…
Dec 10, 2020
3002bd3
Scala builds now too, running into error with union
chester-leung Dec 11, 2020
dc54741
Stuff builds, error with all outputs mac serialization. this commit u…
chester-leung Dec 11, 2020
5be9b7c
Fixed bug, basic encryption / show works
chester-leung Dec 12, 2020
86fab02
All single partition tests pass, multiple partition passes until tpch-9
chester-leung Dec 12, 2020
8b1a1d1
All tests pass except tpch-9 and skew join
chester-leung Dec 12, 2020
18f45d6
comment tpch back in
chester-leung Dec 12, 2020
123fa1f
Merge branch 'crumb-path' of http://github.com/chester-leung/opaque i…
Dec 13, 2020
bfc06ba
Check same number of ecalls per partition - exception for scanCollect…
Dec 14, 2020
c818a41
First attempt at constructing executed DAG
Dec 14, 2020
39a4945
Fix typos
Dec 14, 2020
c970965
Rework graph
Dec 14, 2020
43ccd2e
Add log macs to graph nodes
Dec 15, 2020
69fc49e
Construct expected DAG and refactor JobNode.
Dec 16, 2020
35691ff
Implement 'paths to sink' for a DAG
Dec 17, 2020
98d5fc4
add crumb for last ecall
Dec 18, 2020
29e3312
Fix NULL handling for aggregation (#130)
wzheng Dec 18, 2020
51b621b
Changing operator matching from logical to physical (#129)
wzheng Dec 21, 2020
e9fe7bb
Aggregation rewrite (#132)
wzheng Jan 21, 2021
1ee8d5b
Merge new aggregate
Jan 25, 2021
4a97c66
updated build/sbt file (#135)
octaviansima Jan 26, 2021
2400a94
Travis update (#137)
wzheng Jan 29, 2021
6031a4a
update breeze (#138)
octaviansima Jan 29, 2021
0a20d71
TPC-H test suite added (#136)
octaviansima Jan 29, 2021
2fec4ad
Separate IN PR (#124)
Chenyu-Shi Jan 30, 2021
7cb2f9a
Merge new aggregate
Feb 1, 2021
c3b3f33
Uncomment log_mac_lst clear
Feb 1, 2021
f41ba90
Clean up comments
Feb 2, 2021
b78b4a4
Separate Concat PR (#125)
Chenyu-Shi Feb 2, 2021
2bb2e8d
Clean up comments in other files
Feb 4, 2021
2685530
Update pathsEqual to be less conservative
Feb 4, 2021
7efb677
Remove print statements from unit tests
Feb 4, 2021
0519def
Removed calls to toSet in TPC-H tests (#140)
octaviansima Feb 5, 2021
0d69b7b
Documentation update (#148)
wzheng Feb 5, 2021
0f877d4
Cluster Remote Attestation Fix (#146)
octaviansima Feb 8, 2021
c215a99
upgrade to 3.0.1 (#144)
octaviansima Feb 8, 2021
8bd1e09
Update two TPC-H queries (#149)
wzheng Feb 8, 2021
823d95d
TPC-H 20 Fix (#142)
octaviansima Feb 8, 2021
fbe324c
Add expected operator DAG generation from executedPlan string
Feb 8, 2021
f822784
Rebase
Feb 8, 2021
40e8e13
Merge comp-integrity
Feb 9, 2021
6e60c7c
Merge master
Feb 9, 2021
1321eaa
Merge branch 'expected-dag' of https://github.com/andrewlawhh/opaque …
Feb 9, 2021
b4ba2db
Join update (#145)
wzheng Feb 9, 2021
375de7f
Merge join update
Feb 9, 2021
8682f22
Integrate new join
Feb 9, 2021
c21cb7b
Add expected operator for sortexec
Feb 10, 2021
c1adf85
Merge comp-integrity with join update
Feb 10, 2021
9391435
Merge comp-integrity with join update
Feb 10, 2021
2b37dab
Merge join integration with expected dag update
Feb 10, 2021
8a93c6c
Remove some print statements
Feb 10, 2021
c190aae
Migrate from Travis CI to Github Actions (#156)
octaviansima Feb 10, 2021
41ea7b9
Upgrade to OE 0.12 (#153)
wzheng Feb 12, 2021
29da474
Update README.md
wzheng Feb 13, 2021
4d89ecb
Support for scalar subquery (#157)
wzheng Feb 18, 2021
96e6285
Add TPC-H Benchmarks (#139)
octaviansima Feb 19, 2021
b350992
Construct expected DAG from dataframe physical plan
Feb 23, 2021
20f4749
Refactor collect and add integrity checking helper function to Opaque…
Feb 23, 2021
3c28b5f
Float expressions (#160)
wzheng Feb 23, 2021
a4a6ff9
Broadcast Nested Loop Join - Left Anti and Left Semi (#159)
octaviansima Feb 24, 2021
a96abc5
Move join condition handling for equi-joins into enclave code (#164)
wzheng Feb 26, 2021
a5278a4
Distinct aggregation support (#163)
octaviansima Mar 1, 2021
e9b075b
Remove addExpectedOperator from JobVerificationEngine, add comments
Mar 4, 2021
dabc178
Implement expected DAG construction by doing graph manipulation on da…
Mar 4, 2021
38c9da5
Merge
Mar 15, 2021
98bcfdb
Fix merge errors in the test cases
Mar 15, 2021
592ec17
Fix merge errors
Mar 15, 2021
e3e140d
Merge BNLJ into integrity branch
Apr 2, 2021
67fd713
Merge join logic migration into integrity branch
Apr 2, 2021
29db9e6
Merge join logic migration into integrity branch
Apr 2, 2021
886eda8
Merge distinct aggregation support into integrity branch
Apr 2, 2021
1fb4a5a
Fix merge errors
Apr 2, 2021
8ba5f75
fix treeToList to skip visited vertices and operatorDAGFromPlan to pr…
Apr 10, 2021
898a1b4
Add descriptive comments to each function and class
Apr 11, 2021
src/main/scala/edu/berkeley/cs/rise/opaque/JobVerificationEngine.scala
@@ -26,7 +26,8 @@ import scala.collection.mutable.Queue
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SparkPlan

-// Wraps Crumb data specific to graph vertices and adds graph methods.
+// Wraps Crumb data specific to graph vertices and provides graph methods.
+// Represents a recursive ecall DAG node.
class JobNode(val inputMacs: ArrayBuffer[ArrayBuffer[Byte]] = ArrayBuffer[ArrayBuffer[Byte]](),
val numInputMacs: Int = 0,
val allOutputsMac: ArrayBuffer[Byte] = ArrayBuffer[Byte](),
@@ -58,6 +59,7 @@ class JobNode(val inputMacs: ArrayBuffer[ArrayBuffer[Byte]] = ArrayBuffer[ArrayB
}

// Compute and return a list of paths from this node to a sink node.
+// Used in naive DAG comparison.
def pathsToSink(): ArrayBuffer[List[Seq[Int]]] = {
val retval = ArrayBuffer[List[Seq[Int]]]()
if (this.isSink) {
@@ -108,6 +110,7 @@ class JobNode(val inputMacs: ArrayBuffer[ArrayBuffer[Byte]] = ArrayBuffer[ArrayB
}

// Used in construction of expected DAG.
+// Represents a recursive Operator DAG node.
class OperatorNode(val operatorName: String = "") {
var children: ArrayBuffer[OperatorNode] = ArrayBuffer[OperatorNode]()
var parents: ArrayBuffer[OperatorNode] = ArrayBuffer[OperatorNode]()
@@ -152,12 +155,13 @@ object JobVerificationEngine {
10 -> "countRowsPerPartition",
11 -> "computeNumRowsPerPartition",
12 -> "localLimit",
13 -> "limitReturnRows"
13 -> "limitReturnRows",
14 -> "broadcastNestedLoopJoin"
).withDefaultValue("unknown")
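// Note: lookups outside the map fall back to the default, e.g.
// ecallId(14) == "broadcastNestedLoopJoin" but ecallId(99) == "unknown".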

val possibleSparkOperators = Seq[String]("EncryptedProject",
"EncryptedSortMergeJoin",
"EncryptedSort",
"EncryptedSortMergeJoin",
"EncryptedFilter",
"EncryptedAggregate",
"EncryptedGlobalLimit",
@@ -172,6 +176,12 @@
logEntryChains.clear
}

+/********************************
+Graph construction helper methods
+********************************/

+// Check whether an operator node is supported by the Job Verification Engine,
+// i.e. whether it appears in the `possibleSparkOperators` list.
def isValidOperatorNode(node: OperatorNode): Boolean = {
for (targetSubstring <- possibleSparkOperators) {
if (node.operatorName contains targetSubstring) {
@@ -181,18 +191,21 @@
return false
}

+// Compares paths returned from the pathsToSink JobNode method.
+// Used in naive DAG comparison.
def pathsEqual(executedPaths: ArrayBuffer[List[Seq[Int]]],
expectedPaths: ArrayBuffer[List[Seq[Int]]]): Boolean = {
// Executed paths might contain extraneous paths from
// MACs matching across ecalls if a block is unchanged from ecall to ecall (?)
return expectedPaths.toSet.subsetOf(executedPaths.toSet)
}
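// Illustration (hypothetical paths): expected {[Seq(1), Seq(6)]} verifies
// against executed {[Seq(1), Seq(6)], [Seq(1), Seq(1), Seq(6)]}, since
// every expected path also appears among the executed paths.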

-// Recursively convert SparkPlan objects to OperatorNode object.
+// operatorDAGFromPlan helper - recursively converts SparkPlan objects to OperatorNode objects.
def sparkNodesToOperatorNodes(plan: SparkPlan): OperatorNode = {
var operatorName = ""
+val firstLine = plan.toString.split("\n")(0)
for (sparkOperator <- possibleSparkOperators) {
-if (plan.toString.split("\n")(0) contains sparkOperator) {
+if (firstLine contains sparkOperator) {
operatorName = sparkOperator
}
}
@@ -204,7 +217,7 @@ object JobVerificationEngine {
return operatorNode
}

-// Returns true if every OperatorNode in this list is "valid".
+// Returns true if every OperatorNode in this list is "valid", i.e. supported by JobVerificationEngine.
def allValidOperators(operators: ArrayBuffer[OperatorNode]): Boolean = {
for (operator <- operators) {
if (!isValidOperatorNode(operator)) {
@@ -214,7 +227,7 @@ object JobVerificationEngine {
return true
}

-// Recursively prunes non valid nodes from an OperatorNode tree.
+// operatorDAGFromPlan helper - recursively prunes invalid nodes from an OperatorNode tree, bottom up.
def fixOperatorTree(root: OperatorNode): Unit = {
if (root.isOrphan) {
return
@@ -233,21 +246,36 @@ object JobVerificationEngine {
root.setParents(newParents)
}
for (parent <- root.parents) {
-parent.addChild(root)
fixOperatorTree(parent)
}
}

+// Given operators with correctly set parents, correctly set the children pointers.
+def setChildrenDag(operators: ArrayBuffer[OperatorNode]): Unit = {
+for (operator <- operators) {
+operator.setChildren(ArrayBuffer[OperatorNode]())
+}
+for (operator <- operators) {
+for (parent <- operator.parents) {
+parent.addChild(operator)
+}
+}
+}

// Uses BFS to put all nodes in an OperatorNode tree into a list.
def treeToList(root: OperatorNode): ArrayBuffer[OperatorNode] = {
val retval = ArrayBuffer[OperatorNode]()
val queue = new Queue[OperatorNode]()
+val visited = Set[OperatorNode]()
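// The `visited` set is what makes this traversal safe on DAGs: a node
// reachable through two different children would otherwise be enqueued
// and appended to the result more than once.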
queue.enqueue(root)
while (!queue.isEmpty) {
val curr = queue.dequeue
-retval.append(curr)
-for (parent <- curr.parents) {
-queue.enqueue(parent)
+if (!visited.contains(curr)) {
+visited.add(curr)
+retval.append(curr)
+for (parent <- curr.parents) {
+queue.enqueue(parent)
+}
}
}
return retval
Expand All @@ -265,12 +293,17 @@ object JobVerificationEngine {
for (operatorNode <- allOperatorNodes) {
if (operatorNode.children.isEmpty) {
operatorNode.addChild(sinkNode)
+sinkNode.addParent(operatorNode)
}
}
fixOperatorTree(sinkNode)
+// Flatten the fixed tree into a list.
val fixedOperatorNodes = treeToList(sinkNode)
fixedOperatorNodes -= sinkNode
+for (sinkParents <- sinkNode.parents) {
+sinkParents.setChildren(ArrayBuffer[OperatorNode]())
+}
+setChildrenDag(fixedOperatorNodes)
return fixedOperatorNodes
}

@@ -281,6 +314,7 @@ object JobVerificationEngine {
}
val numPartitions = parentEcalls.length
val ecall = parentEcalls(0).ecall
// println("Linking ecall " + ecall + " to ecall " + childEcalls(0).ecall)
// project
if (ecall == 1) {
for (i <- 0 until numPartitions) {
@@ -317,7 +351,7 @@ object JobVerificationEngine {
// nonObliviousAggregate
} else if (ecall == 9) {
for (i <- 0 until numPartitions) {
-parentEcalls(i).addOutgoingNeighbor(childEcalls(i))
+parentEcalls(i).addOutgoingNeighbor(childEcalls(0))
}
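// Note the all-to-one shape of this fix: every parent partition links to
// child partition 0, matching an aggregation step that gathers all partial
// results on one partition (in contrast to the per-partition, one-to-one
// links presumably used for ecalls like project).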
// nonObliviousSortMergeJoin
} else if (ecall == 8) {
@@ -355,6 +389,7 @@ object JobVerificationEngine {
def generateJobNodes(numPartitions: Int, operatorName: String): ArrayBuffer[ArrayBuffer[JobNode]] = {
val jobNodes = ArrayBuffer[ArrayBuffer[JobNode]]()
val expectedEcalls = ArrayBuffer[Int]()
// println("generating job nodes for " + operatorName + " with " + numPartitions + " partitions.")
if (operatorName == "EncryptedSort" && numPartitions == 1) {
// ("externalSort")
expectedEcalls.append(6)
@@ -385,10 +420,12 @@
} else {
throw new Exception("Executed unknown operator: " + operatorName)
}
// println("Expected ecalls for " + operatorName + ": " + expectedEcalls)
for (ecallIdx <- 0 until expectedEcalls.length) {
val ecall = expectedEcalls(ecallIdx)
val ecallJobNodes = ArrayBuffer[JobNode]()
jobNodes.append(ecallJobNodes)
// println("Creating job nodes for ecall " + ecall)
for (partitionIdx <- 0 until numPartitions) {
val jobNode = new JobNode()
jobNode.setEcall(ecall)
@@ -398,7 +435,7 @@
return jobNodes
}
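// Shape example (hypothetical): generateJobNodes(1, "EncryptedSort")
// returns one row per expected ecall, here a single row holding one
// JobNode whose ecall is 6 (externalSort).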

-// Converts a DAG of Spark operators to a DAG of ecalls and partitions.
+// expectedDAGFromPlan helper - converts a DAG of Spark operators to a DAG of ecalls and partitions.
def expectedDAGFromOperatorDAG(operatorNodes: ArrayBuffer[OperatorNode]): JobNode = {
val source = new JobNode()
val sink = new JobNode()
@@ -408,8 +445,10 @@
for (node <- operatorNodes) {
node.jobNodes = generateJobNodes(logEntryChains.size, node.operatorName)
}
// println("Job node generation finished.")
// Link all ecalls.
for (node <- operatorNodes) {
// println("Linking ecalls for operator " + node.operatorName + " with num ecalls = " + node.jobNodes.length)
for (ecallIdx <- 0 until node.jobNodes.length) {
if (ecallIdx == node.jobNodes.length - 1) {
// last ecall of this operator, link to child operators if one exists.
@@ -437,20 +476,27 @@
return source
}

-// Generates an expected DAG of ecalls and partitions from a dataframe's SparkPlan object.
+// verify helper - generates an expected DAG of ecalls and partitions from a dataframe's SparkPlan object.
def expectedDAGFromPlan(executedPlan: SparkPlan): JobNode = {
-val operatorDAGRoot = operatorDAGFromPlan(executedPlan)
-expectedDAGFromOperatorDAG(operatorDAGRoot)
+val operatorDAGList = operatorDAGFromPlan(executedPlan)
+expectedDAGFromOperatorDAG(operatorDAGList)
}


+/***********************
+Main verification method
+***********************/

// Verify that the executed flow of information from ecall partition to ecall partition
// matches what is expected for a given Spark dataframe.
+// This function should be the one called from the rest of the client to do job verification.
def verify(df: DataFrame): Boolean = {
+// Get expected DAG.
val expectedSourceNode = expectedDAGFromPlan(df.queryExecution.executedPlan)

+// Quit if graph is empty.
if (expectedSourceNode.graphIsEmpty) {
+println("Expected graph empty")
return true
}

@@ -522,6 +568,7 @@ object JobVerificationEngine {
executedSourceNode.setSource
val executedSinkNode = new JobNode()
executedSinkNode.setSink
+// Iterate through all nodes, matching `all_outputs_mac` to `input_macs`.
for (node <- nodeSet) {
if (node.inputMacs == ArrayBuffer[ArrayBuffer[Byte]]()) {
executedSourceNode.addOutgoingNeighbor(node)
@@ -542,10 +589,7 @@
val expectedPathsToSink = expectedSourceNode.pathsToSink
val arePathsEqual = pathsEqual(executedPathsToSink, expectedPathsToSink)
if (!arePathsEqual) {
-// println(executedPathsToSink.toString)
-// println(expectedPathsToSink.toString)
-// println("===========DAGS NOT EQUAL===========")
-return false
+println("===========DAGS NOT EQUAL===========")
}
return true
}
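For orientation, a minimal sketch of how client code might drive this entry point. The `runVerified` wrapper and the collect-before-verify ordering are assumptions based on the comments and commit notes ("Refactor collect and add integrity checking helper function"), not API confirmed by this patch:

```scala
import org.apache.spark.sql.DataFrame
import edu.berkeley.cs.rise.opaque.JobVerificationEngine

// Hypothetical client-side wrapper (not part of this patch): execute the
// encrypted query so that crumbs and log MACs are produced, then ask the
// engine whether the executed ecall DAG matches the plan's expected DAG.
def runVerified(df: DataFrame): Boolean = {
  df.collect() // force execution so the log entry chains are populated
  val verified = JobVerificationEngine.verify(df)
  if (!verified) {
    println("Integrity check failed for plan:\n" + df.queryExecution.executedPlan)
  }
  verified
}
```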
11 changes: 10 additions & 1 deletion src/main/scala/edu/berkeley/cs/rise/opaque/Utils.scala
@@ -1435,6 +1435,10 @@ object Utils extends Logging {
}
(Seq(countUpdateExpr), Seq(count))
}
+case PartialMerge => {
+val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0))
+(Seq(countUpdateExpr), Seq(count))
+}
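// PartialMerge merges intermediate count buffers produced by an earlier
// partial stage (hence it reads inputAggBufferAttributes, like Final),
// but its result stays in buffer form instead of becoming a final value.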
case Final => {
val countUpdateExpr = Add(count, c.inputAggBufferAttributes(0))
(Seq(countUpdateExpr), Seq(count))
@@ -1443,7 +1447,7 @@
val countUpdateExpr = Add(count, Literal(1L))
(Seq(countUpdateExpr), Seq(count))
}
case _ =>
}

tuix.AggregateExpr.createAggregateExpr(
@@ -1614,6 +1618,11 @@ object Utils extends Logging {
val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
(Seq(sumUpdateExpr), Seq(sum))
}
+case PartialMerge => {
+val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0))
+val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
+(Seq(sumUpdateExpr), Seq(sum))
+}
case Final => {
val partialSum = Add(If(IsNull(sum), Literal.default(sumDataType), sum), s.inputAggBufferAttributes(0))
val sumUpdateExpr = If(IsNull(partialSum), sum, partialSum)
src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext
object TPCHBenchmark {

// Add query numbers here once they are supported
-val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 18, 19, 20, 21, 22)
+val supportedQueries = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20, 21, 22)
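// Query 16 is newly enabled; it relies on count(distinct ps_suppkey),
// which likely became expressible with the distinct aggregation support
// merged earlier in this branch (commit a5278a4).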

def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = {
val sqlStr = tpch.getQuery(queryNumber)
src/main/scala/edu/berkeley/cs/rise/opaque/execution/operators.scala
@@ -243,44 +243,35 @@ case class EncryptedFilterExec(condition: Expression, child: SparkPlan)

case class EncryptedAggregateExec(
groupingExpressions: Seq[NamedExpression],
-aggExpressions: Seq[AggregateExpression],
-mode: AggregateMode,
+aggregateExpressions: Seq[AggregateExpression],
child: SparkPlan)
extends UnaryExecNode with OpaqueOperatorExec {

override def producedAttributes: AttributeSet =
-AttributeSet(aggExpressions) -- AttributeSet(groupingExpressions)

-override def output: Seq[Attribute] = mode match {
-case Partial => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.copy(mode = Partial)).flatMap(_.aggregateFunction.inputAggBufferAttributes)
-case Final => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
-case Complete => groupingExpressions.map(_.toAttribute) ++ aggExpressions.map(_.resultAttribute)
-}
+AttributeSet(aggregateExpressions) -- AttributeSet(groupingExpressions)

+override def output: Seq[Attribute] = groupingExpressions.map(_.toAttribute) ++
+aggregateExpressions.flatMap(expr => {
+expr.mode match {
+case Partial | PartialMerge =>
+expr.aggregateFunction.inputAggBufferAttributes
+case _ =>
+Seq(expr.resultAttribute)
+}
+})

override def executeBlocked(): RDD[Block] = {

-val (groupingExprs, aggExprs) = mode match {
-case Partial => {
-val partialAggExpressions = aggExpressions.map(_.copy(mode = Partial))
-(groupingExpressions, partialAggExpressions)
-}
-case Final => {
-val finalGroupingExpressions = groupingExpressions.map(_.toAttribute)
-val finalAggExpressions = aggExpressions.map(_.copy(mode = Final))
-(finalGroupingExpressions, finalAggExpressions)
-}
-case Complete => {
-(groupingExpressions, aggExpressions.map(_.copy(mode = Complete)))
-}
-}
+val aggExprSer = Utils.serializeAggOp(groupingExpressions, aggregateExpressions, child.output)
+val isPartial = aggregateExpressions.map(expr => expr.mode)
+.exists(mode => mode == Partial || mode == PartialMerge)

-val aggExprSer = Utils.serializeAggOp(groupingExprs, aggExprs, child.output)

timeOperator(child.asInstanceOf[OpaqueOperatorExec].executeBlocked(), "EncryptedPartialAggregateExec") {
childRDD =>
childRDD.map { block =>
val (enclave, eid) = Utils.initEnclave()
-Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, (mode == Partial)))
+Block(enclave.NonObliviousAggregate(eid, aggExprSer, block.bytes, isPartial))
}
}
}
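The rewritten EncryptedAggregateExec drops the single operator-wide `mode` in favor of per-expression modes, so plans that mix modes (such as the two-phase plans Spark produces for distinct aggregation) serialize correctly. A minimal sketch of the stage test above, assuming only Spark's public `AggregateExpression` API (the helper name is ours):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Partial, PartialMerge}

// An operator still emits intermediate aggregation buffers (rather than
// final values) if any of its aggregate expressions is in a partial mode.
def isPartialStage(exprs: Seq[AggregateExpression]): Boolean =
  exprs.map(_.mode).exists(m => m == Partial || m == PartialMerge)
```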