Merge pull request #110 from caskdata/fix/update-apps
(CDAP-5254) Updated apps to use the new Spark API and to package guava
bdmogal committed Apr 19, 2016
2 parents: 4358c22 + 245f1c6 · commit: 1ee46b7
Showing 4 changed files with 50 additions and 39 deletions.
14 changes: 9 additions & 5 deletions MovieRecommender/pom.xml
@@ -28,12 +28,11 @@
     <app.main.class>co.cask.cdap.apps.movierecommender.MovieRecommenderApp</app.main.class>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <cdap.version>3.4.0-SNAPSHOT</cdap.version>
-    <spark.core.version>1.4.0</spark.core.version>
+    <spark.version>1.6.1</spark.version>
     <slf4j.version>1.7.5</slf4j.version>
     <guava.version>13.0.1</guava.version>
     <gson.version>2.2.4</gson.version>
     <scala.version>2.10.3</scala.version>
-    <mllib.version>1.4.0</mllib.version>
     <hadoop.version>2.3.0</hadoop.version>
   </properties>

@@ -59,10 +58,15 @@
   </pluginRepositories>

   <dependencies>
+    <dependency>
+      <groupId>co.cask.cdap</groupId>
+      <artifactId>cdap-api-spark</artifactId>
+      <version>${cdap.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
-      <version>${spark.core.version}</version>
+      <version>${spark.version}</version>
       <scope>provided</scope>
       <exclusions>
         <exclusion>
@@ -136,7 +140,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-mllib_2.10</artifactId>
-      <version>${mllib.version}</version>
+      <version>${spark.version}</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.spark</groupId>
@@ -245,7 +249,7 @@
       <artifactId>maven-surefire-plugin</artifactId>
       <version>2.14.1</version>
       <configuration>
-        <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+        <argLine>-Xmx2048m -XX:MaxPermSize=256m -Djava.net.preferIPv4Stack=true</argLine>
       </configuration>
     </plugin>
     <plugin>
59 changes: 30 additions & 29 deletions MovieRecommender/src/main/scala/co/cask/cdap/apps/movierecommender/RecommendationBuilder.scala
@@ -26,39 +26,40 @@
 package co.cask.cdap.apps.movierecommender

 import co.cask.cdap.api.common.Bytes
-import co.cask.cdap.api.spark.{ScalaSparkProgram, SparkContext}
-import org.apache.hadoop.io.Text
-import org.apache.spark.SparkContext._
+import co.cask.cdap.api.spark.{SparkExecutionContext, SparkMain}
+import org.apache.spark.SparkContext
 import org.apache.spark.mllib.recommendation.{ALS, Rating}
-import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.rdd.RDD
 import org.slf4j.{Logger, LoggerFactory}

 import scala.util.control.Exception._

 /**
  * Spark Program which makes recommendation for movies to users
  */
-class RecommendationBuilder extends ScalaSparkProgram {
-  private final val LOG: Logger = LoggerFactory.getLogger(classOf[RecommendationBuilder])
+class RecommendationBuilder extends SparkMain {
+  import RecommendationBuilder._

+  // SPARK-1006, SPARK-958, http://bugs.java.com/view_bug.do?bug_id=4152790. When running in local mode, num iterations
+  // cannot be more than 10 or it causes a StackOverflow error.
   case class Params(
-    numIterations: Int = 20,
+    numIterations: Int = 10,
     lambda: Double = 1.0,
     rank: Int = 10,
     numUserBlocks: Int = -1,
     numProductBlocks: Int = -1,
     implicitPrefs: Boolean = false)

-  override def run(sc: SparkContext) {
-    LOG.info("Running with arguments {}", sc.getRuntimeArguments.get("args"))
-    val params = parseArguments(sc, Params())
+  override def run(implicit sec: SparkExecutionContext) {
+    val sc = new SparkContext
+    LOG.info("Running with arguments {}", sec.getRuntimeArguments.get("args"))
+    val params = parseArguments(sec, Params())
     LOG.info("Processing ratings data with parameters {}", params)

-    val ratingsDataset: NewHadoopRDD[Array[Byte], Text] = sc.readFromStream("ratingsStream", classOf[Text])
-    val userScores = ratingsDataset.values
+    val userScores: RDD[String] = sc.fromStream("ratingsStream")

     val usRDD = userScores.map { e =>
-      val userScore = e.toString.split("::")
+      val userScore = e.split("::")
       new UserScore(userScore(0).toInt, userScore(1).toInt, userScore(2).toInt)
     }.cache()
     val scores = usRDD.collect()
@@ -77,15 +78,13 @@ class RecommendationBuilder extends ScalaSparkProgram {
       }
     }.cache()

-    val originalContext: org.apache.spark.SparkContext = sc.getOriginalSparkContext.
-      asInstanceOf[org.apache.spark.SparkContext]
-    val parallelizedScores = originalContext.parallelize(scores)
+    val parallelizedScores = sc.parallelize(scores)

-    val scoresRDD = parallelizedScores.keyBy(x => Bytes.add(Bytes.toBytes(x.getUserID), Bytes.toBytes(x.getMovieID)))
-    sc.writeToDataset(scoresRDD, "ratings", classOf[Array[Byte]], classOf[UserScore])
+    val scoresRDD = parallelizedScores
+      .keyBy(x => Bytes.add(Bytes.toBytes(x.getUserID), Bytes.toBytes(x.getMovieID)))
+      .saveAsDataset("ratings")

-    val moviesDataset: NewHadoopRDD[Array[Byte], String] = sc.readFromDataset("movies", classOf[Array[Byte]],
-      classOf[String])
+    val moviesDataset: RDD[(Array[Byte], String)] = sc.fromDataset("movies")

     val numRatings = ratingData.count()
     val numUsers = ratingData.map(_.user).distinct().count()
@@ -115,22 +114,20 @@
     val notRatedMovies = userRatedMovies.map(x => (x._1, movies.keys.filter(!x._2.contains(_)).toSeq)).collect()

     for (curUser <- notRatedMovies) {
-      var nr = originalContext.parallelize(curUser._2)
-      var recom = originalContext.parallelize(model.predict(nr.map((curUser._1, _)))
+      val nr = sc.parallelize(curUser._2)
+      sc.parallelize(model.predict(nr.map((curUser._1, _)))
         .collect().sortBy(-_.rating).take(20))
-
-      var recomRDD = recom.keyBy(x => Bytes.add(Bytes.toBytes(x.user), Bytes.toBytes(x.product))).
-        map(x => (x._1, new UserScore(x._2.user, x._2.product, x._2.rating.toInt)))
-
-      sc.writeToDataset(recomRDD, "recommendations", classOf[Array[Byte]], classOf[UserScore])
+        .keyBy(x => Bytes.add(Bytes.toBytes(x.user), Bytes.toBytes(x.product)))
+        .map(x => (x._1, new UserScore(x._2.user, x._2.product, x._2.rating.toInt)))
+        .saveAsDataset("recommendations")
     }

     LOG.debug("Stored predictions in dataset. Done!")
   }

   /** Parse runtime arguments */
-  def parseArguments(sc: SparkContext, defaultParams: Params): Params = {
-    val arguments: String = sc.getRuntimeArguments.get("args")
+  def parseArguments(sec: SparkExecutionContext, defaultParams: Params): Params = {
+    val arguments: String = sec.getRuntimeArguments.get("args")
     val args: Array[String] = if (arguments == null) Array() else arguments.split("\\s")


@@ -150,3 +147,7 @@

   def getBoolean(args: Array[String], idx: Int): Option[Boolean] = catching(classOf[Exception]).opt(args(idx).toBoolean)
 }
+
+object RecommendationBuilder {
+  val LOG: Logger = LoggerFactory.getLogger(classOf[RecommendationBuilder])
+}
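
For readers unfamiliar with the new API used above, the sketch below shows the program shape this migration moves to: extend SparkMain instead of ScalaSparkProgram, receive an implicit SparkExecutionContext, create a plain org.apache.spark.SparkContext, and use the fromStream/saveAsDataset enrichments in place of readFromStream/writeToDataset. This is a minimal illustration based only on the calls visible in this diff, not a full API reference; the class, stream, and dataset names (WordCountBuilder, textStream, counts) are hypothetical.

import co.cask.cdap.api.common.Bytes
import co.cask.cdap.api.spark.{SparkExecutionContext, SparkMain}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * Hypothetical minimal SparkMain program following the same pattern as the
 * updated RecommendationBuilder: read a stream, transform with plain Spark,
 * persist the result to a dataset.
 */
class WordCountBuilder extends SparkMain {

  override def run(implicit sec: SparkExecutionContext) {
    // CDAP supplies the runtime wiring through the implicit
    // SparkExecutionContext; the SparkContext itself is plain Spark.
    val sc = new SparkContext

    // fromStream replaces the old readFromStream(name, classOf[Text]);
    // the element type is now inferred from the declared RDD type.
    val lines: RDD[String] = sc.fromStream("textStream")

    // From here on, ordinary Spark transformations.
    val counts = lines
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // saveAsDataset replaces writeToDataset(rdd, name, keyClass, valueClass);
    // the key/value classes are carried by the RDD's type parameters instead.
    counts
      .map { case (word, count) => (Bytes.toBytes(word), count.toString) }
      .saveAsDataset("counts")
  }
}

Keeping the CDAP-specific surface down to fromStream, fromDataset, and saveAsDataset lets the rest of the job, including the MLlib calls, run against the stock Spark 1.6.1 API, which is also why the explicit getOriginalSparkContext cast in the old code could be deleted.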
5 changes: 5 additions & 0 deletions MovieRecommender/src/test/java/co/cask/cdap/apps/movierecommender/MovieRecommenderAppTest.java
@@ -21,13 +21,15 @@
 import co.cask.cdap.test.SparkManager;
 import co.cask.cdap.test.StreamManager;
 import co.cask.cdap.test.TestBase;
+import co.cask.cdap.test.TestConfiguration;
 import co.cask.common.http.HttpRequest;
 import co.cask.common.http.HttpRequests;
 import co.cask.common.http.HttpResponse;
 import com.google.common.base.Charsets;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +48,9 @@ public class MovieRecommenderAppTest extends TestBase {
   private static final Gson GSON = new Gson();
   private static final Logger LOG = LoggerFactory.getLogger(MovieRecommenderAppTest.class);

+  @ClassRule
+  public static final TestConfiguration TEST_CONFIG = new TestConfiguration("explore.enabled", false);
+
   @Test
   public void testRecommendation() throws Exception {

11 changes: 6 additions & 5 deletions Netlens/pom.xml
@@ -85,6 +85,12 @@
       <version>${async.http.client.version}</version>
     </dependency>

+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
     <!-- Histo math Dependency -->
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -116,11 +122,6 @@
         <artifactId>slf4j-api</artifactId>
         <version>${slf4j.version}</version>
       </dependency>
-      <dependency>
-        <groupId>com.google.guava</groupId>
-        <artifactId>guava</artifactId>
-        <version>${guava.version}</version>
-      </dependency>
     </dependencies>
   </profile>
 </profiles>
