Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fouleggs sample application #105

Merged
merged 16 commits into from
Aug 2, 2018
Merged
11 changes: 10 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ lazy val sampleapp = (project in file("sampleapp"))
)
.dependsOn(adbms)

lazy val fouleggs = (project in file("fouleggs"))
.settings(
Common.commonSettings,
organization := Common.organization,
scalaVersion := Common.scalaVersion,
libraryDependencies ++= Dependencies.akkaActorDependencies
)
.dependsOn(adbms)

lazy val root = (project in file("."))
.aggregate(adbms, sampleapp)
.aggregate(adbms, sampleapp, fouleggs)
.settings(
version := Common.frameworkVersion,
scalaVersion := Common.scalaVersion
Expand Down
19 changes: 19 additions & 0 deletions fouleggs/src/de/up/hpi/informationsystems/fouleggs/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package de.up.hpi.informationsystems.fouleggs

import akka.actor.Terminated
import de.up.hpi.informationsystems.fouleggs.dactors.SystemInitializer

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

object Main extends App {

println("Starting system")
SystemInitializer.initializer ! SystemInitializer.Startup(5 seconds)

sys.addShutdownHook({
println("Received shutdown signal from JVM")
SystemInitializer.initializer ! SystemInitializer.Shutdown
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package de.up.hpi.informationsystems.fouleggs.dactors

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import de.up.hpi.informationsystems.adbms.Dactor
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessagingProtocol
import de.up.hpi.informationsystems.adbms.record.Record
import de.up.hpi.informationsystems.adbms.record.ColumnCellMapping._
import de.up.hpi.informationsystems.adbms.relation.Relation
import de.up.hpi.informationsystems.adbms.protocols.RequestResponseProtocol

object AdminSession {

final case object Up

object AddCastToFilm {
final case class Request(personId: Int, filmId: Int, roleName: String) extends RequestResponseProtocol.Request
final case class Success(result: Relation) extends RequestResponseProtocol.Success
final case class Failure(e: Throwable) extends RequestResponseProtocol.Failure
}

def props: Props = Props[AdminSession]
}

/**
* Provides top level functionalities
*/
class AdminSession extends Actor with ActorLogging {
override def receive: Receive = commonBehaviour

def commonBehaviour: Receive = {
case AdminSession.Up => sender() ! akka.actor.Status.Success
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the point of this one. If it's for a health-check, then you could use Identify and ActorIdentity or just rely on the parent-child-relationship and listen in the SystemInitializer for a Terminated message.

Nothing, you have to change now.

case AdminSession.AddCastToFilm.Request(personId, filmId, roleName) =>
addCastToFilm(personId, filmId, roleName)
case DefaultMessagingProtocol.SelectAllFromRelation.Success(rel) => log.info(rel.toString)
}

def addCastToFilm(personId: Int, filmId: Int, roleName: String): Unit = {
log.info(s"Adding person $personId as $roleName to film $filmId")
val functor: ActorRef = context.system.actorOf(CastAndFilmographyFunctor.props(personId, filmId, roleName, self))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

context.become(waitingForSuccess(functor) orElse commonBehaviour)
}

def waitingForSuccess(from: ActorRef): Receive = {
case akka.actor.Status.Success if sender == from => {
context.become(commonBehaviour)
log.info("Connected cast to film")

val empire = Dactor.dactorSelection(context.system, classOf[Film], 1)
val mark = Dactor.dactorSelection(context.system, classOf[Person], 1)

empire ! DefaultMessagingProtocol.SelectAllFromRelation.Request(Film.Cast.name)
mark ! DefaultMessagingProtocol.SelectAllFromRelation.Request(Person.Filmography.name)
}
}
}

object CastAndFilmographyFunctor {
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props =
Props(new CastAndFilmographyFunctor(personId, filmId, roleName, backTo))
}

class CastAndFilmographyFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor {

val sub1: ActorRef = context.system.actorOf(AddFilmFunctor.props(personId, filmId, roleName, self))
val sub2: ActorRef = context.system.actorOf(AddCastFunctor.props(personId, filmId, roleName, self))

override def receive: Receive = waitingForAck(Seq(sub1, sub2))

def waitingForAck(pending: Seq[ActorRef]): Receive = {
case akka.actor.Status.Success =>
val remainingACKs = pending.filterNot(_ == sender())

if(remainingACKs.isEmpty) {
backTo ! akka.actor.Status.Success
context.stop(self)
} else {
context.become(waitingForAck(remainingACKs))
}

case akka.actor.Status.Failure(e) =>
backTo ! akka.actor.Status.Failure(e)
context.stop(self)
}
}

object AddFilmFunctor {
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props =
Props(new AddFilmFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef))
}

class AddFilmFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor {

override def receive: Receive = waitingForFilmInfo orElse commonBehaviour

Dactor.dactorSelection(context.system, classOf[Film], filmId) ! DefaultMessagingProtocol.SelectAllFromRelation.Request("film_info")

def waitingForFilmInfo: Receive = {
case DefaultMessagingProtocol.SelectAllFromRelation.Failure(e) => fail(e)
case DefaultMessagingProtocol.SelectAllFromRelation.Success(relation: Relation) =>
val filmInfoOption: Option[Record] = relation.records.toOption match {
case Some(records: Seq[Record]) => records.headOption
case _ => None
}
filmInfoOption match {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alignment ...

case None => fail(new RuntimeException("Received empty film info"))

case Some(filmInfo: Record) =>
val newFilmRecord: Record = Person.Filmography.newRecord(
Person.Filmography.filmId ~> filmId &
Person.Filmography.roleName ~> roleName &
Person.Filmography.filmName ~> filmInfo(Film.Info.title) &
Person.Filmography.filmRelease ~> filmInfo(Film.Info.release)
).build()
Dactor.dactorSelection(context.system, classOf[Person], personId) ! DefaultMessagingProtocol.InsertIntoRelation("filmography", Seq(newFilmRecord))
context.become(waitingForInsertAck orElse commonBehaviour)
}
}

def waitingForInsertAck: Receive = {
case akka.actor.Status.Success =>
backTo ! akka.actor.Status.Success
context.stop(self) // because this is our last state
}

def commonBehaviour: Receive = {
case akka.actor.Status.Failure(e) => fail(e)
}

private def fail(e: Throwable): Unit = {
backTo ! akka.actor.Status.Failure(e)
context.stop(self)
}
}

object AddCastFunctor {
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props =
Props(new AddCastFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef))
}

class AddCastFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor {

override def receive: Receive = waitingForPersonInfo orElse commonBehaviour

// very first message has to be sent outside of Receives
Dactor.dactorSelection(context.system, classOf[Person], personId) ! DefaultMessagingProtocol.SelectAllFromRelation.Request("person_info")

def waitingForPersonInfo: Receive = {
case DefaultMessagingProtocol.SelectAllFromRelation.Failure(e) => fail(e)
case DefaultMessagingProtocol.SelectAllFromRelation.Success(relation: Relation) => {
val personInfoOption: Option[Record] = relation.records.toOption match {
case Some(records: Seq[Record]) => records.headOption
case _ => None
}
personInfoOption match {
case None => fail(new RuntimeException("Received empty personInfo"))

case Some(personInfo: Record) =>
val newCastRecord: Record = Film.Cast.newRecord(
Film.Cast.firstName ~> personInfo(Person.Info.firstName) &
Film.Cast.lastName ~> personInfo(Person.Info.lastName) &
Film.Cast.roleName ~> roleName &
Film.Cast.personId ~> personId
).build()
Dactor.dactorSelection(context.system, classOf[Film], filmId) ! DefaultMessagingProtocol.InsertIntoRelation("film_cast", Seq(newCastRecord))
context.become(waitingForInsertAck orElse commonBehaviour)
}
}
}

def waitingForInsertAck: Receive = {
case akka.actor.Status.Success =>
backTo ! akka.actor.Status.Success
context.stop(self)
}

def commonBehaviour: Receive = {
case akka.actor.Status.Failure(e) => fail(e)
}

private def fail(e: Throwable): Unit = {
backTo ! akka.actor.Status.Failure(e)
context.stop(self)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package de.up.hpi.informationsystems.fouleggs.dactors

import java.time.ZonedDateTime

import akka.actor.Actor
import de.up.hpi.informationsystems.adbms.Dactor
import de.up.hpi.informationsystems.adbms.definition.ColumnDef.UntypedColumnDef
import de.up.hpi.informationsystems.adbms.definition.{ColumnDef, RelationDef}
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessageHandling
import de.up.hpi.informationsystems.adbms.relation.{MutableRelation, RowRelation, SingleRowRelation}

object Film {
// implicit default values
import de.up.hpi.informationsystems.adbms.definition.ColumnTypeDefaults._

object Info extends RelationDef {
// val filmId: ColumnDef[Int] = ColumnDef[Int]("film_id") should be same as the Dactors id and therefor superfluous
val title: ColumnDef[String] = ColumnDef[String]("film_title", "Untitled")
val description: ColumnDef[String] = ColumnDef[String]("film_descr")
val release: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("film_release")

override val name: String = "film_info"
override val columns: Set[UntypedColumnDef] = Set(title, description, release)
}

object Cast extends RelationDef {
val personId: ColumnDef[Int] = ColumnDef[Int]("person_id")
val firstName: ColumnDef[String] = ColumnDef[String]("first_name")
val lastName: ColumnDef[String] = ColumnDef[String]("last_name")
val roleName: ColumnDef[String] = ColumnDef[String]("role_name")

override val name: String = "film_cast"
override val columns: Set[UntypedColumnDef] = Set(personId, firstName, lastName, roleName)
}

class FilmBase(id: Int) extends Dactor(id) {
/**
* Returns a map of relation definition and corresponding relational store.
*
* @return map of relation definition and corresponding relational store
*/
override protected val relations: Map[RelationDef, MutableRelation] =
Map(Film.Info -> SingleRowRelation(Film.Info)) ++
Map(Film.Cast -> RowRelation(Film.Cast))

override def receive: Receive = Actor.emptyBehavior
}
}

class Film(id: Int) extends Film.FilmBase(id) with DefaultMessageHandling

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package de.up.hpi.informationsystems.fouleggs.dactors

import java.time.ZonedDateTime

import akka.actor.Actor
import de.up.hpi.informationsystems.adbms.Dactor
import de.up.hpi.informationsystems.adbms.definition.ColumnDef.UntypedColumnDef
import de.up.hpi.informationsystems.adbms.definition.{ColumnDef, RelationDef}
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessageHandling
import de.up.hpi.informationsystems.adbms.relation.{MutableRelation, RowRelation, SingleRowRelation}

object Person {
// Relation Definitions
import de.up.hpi.informationsystems.adbms.definition.ColumnTypeDefaults._

object Info extends RelationDef {
val personId: ColumnDef[Int] = ColumnDef[Int]("person_id")
val firstName: ColumnDef[String] = ColumnDef[String]("first_name")
val lastName: ColumnDef[String] = ColumnDef[String]("last_name")
val birthday: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("birthday")

override val name: String = "person_info"
override val columns: Set[UntypedColumnDef] = Set(personId, firstName, lastName, birthday)
}

object Filmography extends RelationDef {
val filmId: ColumnDef[Int] = ColumnDef[Int]("film_id")
val filmName: ColumnDef[String] = ColumnDef[String]("film_name")
val filmRelease: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("film_release")
val roleName: ColumnDef[String] = ColumnDef[String]("role_name")

override val name: String = "filmography"
override val columns: Set[UntypedColumnDef] = Set(filmId, filmName, filmRelease, roleName)
}

// Dactor Definition
class PersonBase(id: Int) extends Dactor(id) {
/**
* Returns a map of relation definition and corresponding relational store.
*
* @return map of relation definition and corresponding relational store
*/
override protected val relations: Map[RelationDef, MutableRelation] =
Map(Person.Info -> SingleRowRelation(Person.Info)) ++
Map(Person.Filmography -> RowRelation(Person.Filmography))

override def receive: Receive = Actor.emptyBehavior
}
}

class Person(id: Int) extends Person.PersonBase(id) with DefaultMessageHandling

Loading