From 41576000317ef30b79a730f7cd5dd69ee257978e Mon Sep 17 00:00:00 2001 From: hlin09 Date: Fri, 20 Mar 2015 15:52:40 -0400 Subject: [PATCH 1/3] Fix 237 by including private function checks in package namespaces. --- pkg/R/utils.R | 10 +++++----- pkg/inst/tests/test_utils.R | 9 ++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/R/utils.R b/pkg/R/utils.R index 9edc22c97..a09d1fa0a 100644 --- a/pkg/R/utils.R +++ b/pkg/R/utils.R @@ -349,13 +349,13 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { topEnv <- parent.env(.GlobalEnv) # Search in function environment, and function's enclosing environments # up to global environment. There is no need to look into package environments - # above the global or namespace environment that is not SparkR below the global, - # as they are assumed to be loaded on workers. + # above the global, as they are assumed to be loaded on workers. while (!identical(func.env, topEnv)) { - # Namespaces other than "SparkR" will not be searched. + # Only examine functions in non-namespace environments or private functions in + # package namespaces. if (!isNamespace(func.env) || - (getNamespaceName(func.env) == "SparkR" && - !(nodeChar %in% getNamespaceExports("SparkR")))) { # Only include SparkR internals. + !(nodeChar %in% getNamespaceExports(getNamespaceName(func.env))) + ) { # Set parameter 'inherits' to FALSE since we do not need to search in # attached package environments. if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE), diff --git a/pkg/inst/tests/test_utils.R b/pkg/inst/tests/test_utils.R index e9582db9d..f92e1d126 100644 --- a/pkg/inst/tests/test_utils.R +++ b/pkg/inst/tests/test_utils.R @@ -89,12 +89,15 @@ test_that("cleanClosure on R functions", { # Test for function (and variable) definitions. f <- function(x) { + privateCallRes <- unlist(convertJListToRList(x)) # Call package private functions. g <- function(y) { y * 2 } - g(x) + g(privateCallRes) } newF <- cleanClosure(f) env <- environment(newF) - expect_equal(length(ls(env)), 0) # "y" and "g" should not be included. + expect_equal(ls(env), "convertJListToRList") # Only "convertJListToRList". No "y" or "g". + actual <- get("convertJListToRList", envir = env, inherits = FALSE) + expect_equal(actual, convertJListToRList) # Test for overriding variables in base namespace (Issue: SparkR-196). nums <- as.list(1:10) @@ -113,7 +116,7 @@ test_that("cleanClosure on R functions", { a <- matrix(nrow=10, ncol=10, data=rnorm(100)) aBroadcast <- broadcast(sc, a) normMultiply <- function(x) { norm(aBroadcast$value) * x } - newnormMultiply <- SparkR:::cleanClosure(normMultiply) + newnormMultiply <- cleanClosure(normMultiply) env <- environment(newnormMultiply) expect_equal(ls(env), "aBroadcast") expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast) From be2a935e1208c3e0371a5389e34e1b9c14b06ee5 Mon Sep 17 00:00:00 2001 From: hlin09 Date: Sat, 21 Mar 2015 20:03:27 -0400 Subject: [PATCH 2/3] More fixes on 237 and 238. --- pkg/R/RDD.R | 11 ++--------- pkg/R/utils.R | 21 ++++++++++++++------- pkg/inst/tests/test_utils.R | 33 +++++++++++++++++++++++++-------- pkg/inst/worker/worker.R | 6 +++++- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index fe9d69298..d1ae82c2e 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -123,17 +123,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), return(rdd@env$jrdd_val) } - computeFunc <- function(split, part) { - rdd@func(split, part) - } - packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL) broadcastArr <- lapply(ls(.broadcastNames), function(name) { get(name, .broadcastNames) }) - serializedFuncArr <- serialize(computeFunc, connection = NULL) + serializedFuncArr <- serialize(rdd@func, connection = NULL) prev_jrdd <- rdd@prev_jrdd @@ -530,10 +526,7 @@ setMethod("lapplyPartitionsWithIndex", signature(X = "RDD", FUN = "function"), function(X, FUN) { FUN <- cleanClosure(FUN) - closureCapturingFunc <- function(split, part) { - FUN(split, part) - } - PipelinedRDD(X, closureCapturingFunc) + PipelinedRDD(X, FUN) }) #' @rdname lapplyPartitionsWithIndex diff --git a/pkg/R/utils.R b/pkg/R/utils.R index a09d1fa0a..2804eeaca 100644 --- a/pkg/R/utils.R +++ b/pkg/R/utils.R @@ -371,14 +371,21 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { break } # Function has not been examined, record it and recursively clean its closure. - assign(nodeChar, - if (is.null(funcList[[1]])) { - list(obj) - } else { - append(funcList, obj) - }, - envir = checkedFuncs) + newFuncList <- if (is.null(funcList[[1]])) { + list(obj) + } else { + append(funcList, obj) + } + assign(nodeChar, newFuncList, envir = checkedFuncs) obj <- cleanClosure(obj, checkedFuncs) + parent.env(environment(obj)) <- newEnv + # Remove examined functions. + newFuncList[[length(newFuncList)]] <- NULL + if (length(newFuncList) > 0) { + assign(nodeChar, newFuncList, envir = checkedFuncs) + } else { + remove(list = nodeChar, envir = checkedFuncs) + } } assign(nodeChar, obj, envir = newEnv) break diff --git a/pkg/inst/tests/test_utils.R b/pkg/inst/tests/test_utils.R index f92e1d126..5d1485c63 100644 --- a/pkg/inst/tests/test_utils.R +++ b/pkg/inst/tests/test_utils.R @@ -66,38 +66,55 @@ test_that("cleanClosure on R functions", { field <- matrix(2) defUse <- 3 g <- function(x) { x + y } + h <- function(x) { f(x) } f <- function(x) { defUse <- base::as.integer(x) + 1 # Test for access operators `::`. lapply(x, g) + 1 # Test for capturing function call "g"'s closure as a argument of lapply. l$field[1,1] <- 3 # Test for access operators `$`. res <- defUse + l$field[1,] # Test for def-use chain of "defUse", and "" symbol. f(res) # Test for recursive calls. + h(res) # f should be examined again in h's env. More recursive call f -> h -> f ... } newF <- cleanClosure(f) env <- environment(newF) - expect_equal(length(ls(env)), 3) # Only "g", "l" and "f". No "base", "field" or "defUse". + expect_equal(length(ls(env)), 4) # Only "g", "l", "f" and "h". No "base", "field" or "defUse". expect_true("g" %in% ls(env)) expect_true("l" %in% ls(env)) expect_true("f" %in% ls(env)) + expect_true("h" %in% ls(env)) expect_equal(get("l", envir = env, inherits = FALSE), l) - # "y" should be in the environemnt of g. newG <- get("g", envir = env, inherits = FALSE) + newH <- get("h", envir = env, inherits = FALSE) + # "y" should be in the environemnt of g. env <- environment(newG) - expect_equal(length(ls(env)), 1) + expect_equal(ls(env), "y") actual <- get("y", envir = env, inherits = FALSE) expect_equal(actual, y) + # "f" should be in h's env. + env <- environment(newH) + expect_equal(ls(env), "f") + actual <- get("f", envir = env, inherits = FALSE) + expect_equal(actual, f) # Test for function (and variable) definitions. - f <- function(x) { - privateCallRes <- unlist(convertJListToRList(x)) # Call package private functions. + f <- function(x, y) { + privateCallRes <- unlist(joinTaggedList(x, y)) # Call package private functions. g <- function(y) { y * 2 } g(privateCallRes) } newF <- cleanClosure(f) env <- environment(newF) - expect_equal(ls(env), "convertJListToRList") # Only "convertJListToRList". No "y" or "g". - actual <- get("convertJListToRList", envir = env, inherits = FALSE) - expect_equal(actual, convertJListToRList) + expect_equal(ls(env), "joinTaggedList") # Only "joinTaggedList". No "y" or "g". + actual <- get("joinTaggedList", envir = env, inherits = FALSE) + expect_equal(actual, joinTaggedList) + env <- environment(actual) + # Private "genCompactLists" and "mergeCompactLists" are called by "joinTaggedList". + expect_true("genCompactLists" %in% ls(env)) + expect_true("mergeCompactLists" %in% ls(env)) + actual <- get("genCompactLists", envir = env, inherits = FALSE) + expect_equal(actual, genCompactLists) + actual <- get("mergeCompactLists", envir = env, inherits = FALSE) + expect_equal(actual, mergeCompactLists) # Test for overriding variables in base namespace (Issue: SparkR-196). nums <- as.list(1:10) diff --git a/pkg/inst/worker/worker.R b/pkg/inst/worker/worker.R index bb49cd84a..4b489941b 100644 --- a/pkg/inst/worker/worker.R +++ b/pkg/inst/worker/worker.R @@ -27,7 +27,11 @@ for (pkg in packageNames) { funcLen <- SparkR:::readInt(inputCon) computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen)) env <- environment(computeFunc) -parent.env(env) <- .GlobalEnv # Attach under global environment. +if (length(packageNames) > 0) { + parent.env(env) <- getNamespace(packageNames[1]) # Attach under package namespace environment. +} else { + parent.env(env) <- .GlobalEnv # Attach under global environment. +} # Read and set broadcast variables numBroadcastVars <- SparkR:::readInt(inputCon) From 93a3262202bd28d64ec4317e74fb294b7cdaf326 Mon Sep 17 00:00:00 2001 From: hlin09 Date: Tue, 24 Mar 2015 20:12:56 -0400 Subject: [PATCH 3/3] More fix on cleanClosure. --- pkg/NAMESPACE | 1 + pkg/R/RDD.R | 5 ++-- pkg/R/utils.R | 37 +++++++++++++++---------- pkg/inst/tests/test_utils.R | 8 ++++-- pkg/src/Makefile | 4 +-- pkg/src/Makefile.win | 4 +-- pkg/src/{string_hash_code.c => utils.c} | 19 +++++++++---- 7 files changed, 48 insertions(+), 30 deletions(-) rename pkg/src/{string_hash_code.c => utils.c} (87%) diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 34548cdc0..da75c53f0 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -85,4 +85,5 @@ export("sparkR.init") export("sparkR.stop") export("print.jobj") useDynLib(SparkR, stringHashCode) +useDynLib(SparkR, getMissingArg) importFrom(methods, setGeneric, setMethod, setOldClass) diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index d1ae82c2e..f15e7f837 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -66,7 +66,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) { # This transformation is the first in its stage: - .Object@func <- func + .Object@func <- cleanClosure(func) .Object@prev_jrdd <- getJRDD(prev) .Object@env$prev_serializedMode <- prev@env$serializedMode # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD @@ -75,7 +75,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) pipelinedFunc <- function(split, iterator) { func(split, prev@func(split, iterator)) } - .Object@func <- pipelinedFunc + .Object@func <- cleanClosure(pipelinedFunc) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline # Get the serialization mode of the parent RDD .Object@env$prev_serializedMode <- prev@env$prev_serializedMode @@ -525,7 +525,6 @@ setMethod("mapPartitions", setMethod("lapplyPartitionsWithIndex", signature(X = "RDD", FUN = "function"), function(X, FUN) { - FUN <- cleanClosure(FUN) PipelinedRDD(X, FUN) }) diff --git a/pkg/R/utils.R b/pkg/R/utils.R index 2804eeaca..323f17e0c 100644 --- a/pkg/R/utils.R +++ b/pkg/R/utils.R @@ -291,52 +291,54 @@ numToInt <- function(num) { # param # node The current AST node in the traversal. # oldEnv The original function environment. +# newEnv A new function environment to store necessary function dependencies, an output argument. # defVars An Accumulator of variables names defined in the function's calling environment, # including function argument and local variable names. # checkedFunc An environment of function objects examined during cleanClosure. It can # be considered as a "name"-to-"list of functions" mapping. -# newEnv A new function environment to store necessary function dependencies, an output argument. -processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { +processClosure <- function(node, newEnv, oldEnv = environment(), defVars = initAccumulator(), + checkedFuncs = new.env()) { nodeLen <- length(node) if (nodeLen > 1 && typeof(node) == "language") { # Recursive case: current AST node is an internal node, check for its children. if (length(node[[1]]) > 1) { for (i in 1:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[i]], newEnv, oldEnv, defVars, checkedFuncs) } } else { # if node[[1]] is length of 1, check for some R special functions. nodeChar <- as.character(node[[1]]) if (nodeChar == "{" || nodeChar == "(") { # Skip start symbol. for (i in 2:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[i]], newEnv, oldEnv, defVars, checkedFuncs) } } else if (nodeChar == "<-" || nodeChar == "=" || nodeChar == "<<-") { # Assignment Ops. + for (i in 3:nodeLen) { + processClosure(node[[i]], newEnv, oldEnv, defVars, checkedFuncs) + } defVar <- node[[2]] if (length(defVar) == 1 && typeof(defVar) == "symbol") { # Add the defined variable name into defVars. addItemToAccumulator(defVars, as.character(defVar)) } else { - processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv) - } - for (i in 3:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[2]], newEnv, oldEnv, defVars, checkedFuncs) } } else if (nodeChar == "function") { # Function definition. # Add parameter names. newArgs <- names(node[[2]]) lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) }) for (i in 3:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[i]], newEnv, oldEnv, defVars, checkedFuncs) } + defVars$counter <- defVars$counter - length(newArgs) } else if (nodeChar == "$") { # Skip the field. - processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[2]], newEnv, oldEnv, defVars, checkedFuncs) } else if (nodeChar == "::" || nodeChar == ":::") { - processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[3]], newEnv, oldEnv, defVars, checkedFuncs) } else { for (i in 1:nodeLen) { - processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv) + processClosure(node[[i]], newEnv, oldEnv, defVars, checkedFuncs) } } } @@ -344,7 +346,7 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { (typeof(node) == "symbol" || typeof(node) == "language")) { # Base case: current AST node is a leaf node and a symbol or a function call. nodeChar <- as.character(node) - if (!nodeChar %in% defVars$data) { # Not a function parameter or local variable. + if (!nodeChar %in% defVars$data[1:defVars$counter]) { # Not a function parameter or local variable. func.env <- oldEnv topEnv <- parent.env(.GlobalEnv) # Search in function environment, and function's enclosing environments @@ -360,7 +362,12 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) { # attached package environments. if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE), error = function(e) { FALSE })) { - obj <- get(nodeChar, envir = func.env, inherits = FALSE) + obj <- tryCatch(get(nodeChar, envir = func.env, inherits = FALSE), + error = function(e) { print(e); .Call("getMissingArg") }) + if (missing(obj)) { + assign(nodeChar, .Call("getMissingArg"), envir = newEnv) + break + } if (is.function(obj)) { # If the node is a function call. funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F, ifnotfound = list(list(NULL)))[[1]] @@ -421,7 +428,7 @@ cleanClosure <- function(func, checkedFuncs = new.env()) { addItemToAccumulator(defVars, argNames[i]) } # Recursively examine variables in the function body. - processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv) + processClosure(func.body, newEnv, oldEnv, defVars, checkedFuncs) environment(func) <- newEnv } func diff --git a/pkg/inst/tests/test_utils.R b/pkg/inst/tests/test_utils.R index 5d1485c63..52f33eb08 100644 --- a/pkg/inst/tests/test_utils.R +++ b/pkg/inst/tests/test_utils.R @@ -97,14 +97,18 @@ test_that("cleanClosure on R functions", { expect_equal(actual, f) # Test for function (and variable) definitions. + z <- c(1, 2) f <- function(x, y) { privateCallRes <- unlist(joinTaggedList(x, y)) # Call package private functions. - g <- function(y) { y * 2 } + g <- function(y, z) { y * 2 } + z <- z * 2 # Write after read. g(privateCallRes) } newF <- cleanClosure(f) env <- environment(newF) - expect_equal(ls(env), "joinTaggedList") # Only "joinTaggedList". No "y" or "g". + expect_equal(length(ls(env)), 2) # Only "joinTaggedList" and "z". No "y" or "g". + expect_true("joinTaggedList" %in% ls(env)) + expect_true("z" %in% ls(env)) actual <- get("joinTaggedList", envir = env, inherits = FALSE) expect_equal(actual, joinTaggedList) env <- environment(actual) diff --git a/pkg/src/Makefile b/pkg/src/Makefile index 5e8a77cd4..7bf02a09c 100644 --- a/pkg/src/Makefile +++ b/pkg/src/Makefile @@ -41,8 +41,8 @@ $(MAVEN_TARGET_NAME): pom.xml $(SCALA_FILES) $(RESOURCE_FILES) mvn -Dhadoop.version=$(SPARK_HADOOP_VERSION) -Dspark.version=$(SPARK_VERSION) -DskipTests $(MAVEN_YARN_FLAG) -Dyarn.version=$(SPARK_YARN_VERSION) clean package shade:shade cp -f $(MAVEN_TARGET_NAME) ../inst/$(JAR_NAME) -sharelib: string_hash_code.c - R CMD SHLIB -o SparkR.so string_hash_code.c +sharelib: utils.c + R CMD SHLIB -o SparkR.so utils.c clean: $(BUILD_TOOL) clean diff --git a/pkg/src/Makefile.win b/pkg/src/Makefile.win index c2f4b367a..a3d723af1 100644 --- a/pkg/src/Makefile.win +++ b/pkg/src/Makefile.win @@ -24,8 +24,8 @@ $(MAVEN_TARGET_NAME): $(SCALA_FILES) $(RESOURCE_FILES) mvn.bat -Dhadoop.version=$(SPARK_HADOOP_VERSION) -Dspark.version=$(SPARK_VERSION) -Dyarn.version=$(SPARK_YARN_VERSION) -DskipTests clean package shade:shade cp -f $(MAVEN_TARGET_NAME) ../inst/$(JAR_NAME) -sharelib: string_hash_code.c - R CMD SHLIB -o SparkR.dll string_hash_code.c +sharelib: utils.c + R CMD SHLIB -o SparkR.dll utils.c clean: mvn.bat clean diff --git a/pkg/src/string_hash_code.c b/pkg/src/utils.c similarity index 87% rename from pkg/src/string_hash_code.c rename to pkg/src/utils.c index 831c863fc..f1a03e3a8 100644 --- a/pkg/src/string_hash_code.c +++ b/pkg/src/utils.c @@ -1,9 +1,3 @@ -/* - * A C function for R extension which implements the Java String hash algorithm. - * Refer to http://en.wikipedia.org/wiki/Java_hashCode%28%29#The_java.lang.String_hash_function - * - */ - #include #include @@ -12,6 +6,11 @@ #define IS_SCALAR(x, type) (TYPEOF(x) == (type) && XLENGTH(x) == 1) #endif +/* + * A C function for R extension which implements the Java String hash algorithm. + * Refer to http://en.wikipedia.org/wiki/Java_hashCode%28%29#The_java.lang.String_hash_function + * + */ SEXP stringHashCode(SEXP string) { const char* str; R_xlen_t len, i; @@ -30,3 +29,11 @@ SEXP stringHashCode(SEXP string) { return ScalarInteger(hashCode); } + +/* + * Get the value of missing argument symbol. + */ +SEXP getMissingArg() { + return R_MissingArg; +} +