-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #105 from CodeLionX/feature/fouleggs
Add fouleggs sample application
- Loading branch information
Showing
6 changed files
with
428 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 19 additions & 0 deletions
19
fouleggs/src/de/up/hpi/informationsystems/fouleggs/Main.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package de.up.hpi.informationsystems.fouleggs | ||
|
||
import akka.actor.Terminated | ||
import de.up.hpi.informationsystems.fouleggs.dactors.SystemInitializer | ||
|
||
import scala.concurrent.{Await, Future} | ||
import scala.concurrent.duration._ | ||
import scala.language.postfixOps | ||
|
||
object Main extends App { | ||
|
||
println("Starting system") | ||
SystemInitializer.initializer ! SystemInitializer.Startup(5 seconds) | ||
|
||
sys.addShutdownHook({ | ||
println("Received shutdown signal from JVM") | ||
SystemInitializer.initializer ! SystemInitializer.Shutdown | ||
}) | ||
} |
184 changes: 184 additions & 0 deletions
184
fouleggs/src/de/up/hpi/informationsystems/fouleggs/dactors/AdminSession.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package de.up.hpi.informationsystems.fouleggs.dactors | ||
|
||
import akka.actor.{Actor, ActorLogging, ActorRef, Props} | ||
import de.up.hpi.informationsystems.adbms.Dactor | ||
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessagingProtocol | ||
import de.up.hpi.informationsystems.adbms.record.Record | ||
import de.up.hpi.informationsystems.adbms.record.ColumnCellMapping._ | ||
import de.up.hpi.informationsystems.adbms.relation.Relation | ||
import de.up.hpi.informationsystems.adbms.protocols.RequestResponseProtocol | ||
|
||
object AdminSession { | ||
|
||
final case object Up | ||
|
||
object AddCastToFilm { | ||
final case class Request(personId: Int, filmId: Int, roleName: String) extends RequestResponseProtocol.Request | ||
final case class Success(result: Relation) extends RequestResponseProtocol.Success | ||
final case class Failure(e: Throwable) extends RequestResponseProtocol.Failure | ||
} | ||
|
||
def props: Props = Props[AdminSession] | ||
} | ||
|
||
/** | ||
* Provides top level functionalities | ||
*/ | ||
class AdminSession extends Actor with ActorLogging { | ||
override def receive: Receive = commonBehaviour | ||
|
||
def commonBehaviour: Receive = { | ||
case AdminSession.Up => sender() ! akka.actor.Status.Success | ||
case AdminSession.AddCastToFilm.Request(personId, filmId, roleName) => | ||
addCastToFilm(personId, filmId, roleName) | ||
case DefaultMessagingProtocol.SelectAllFromRelation.Success(rel) => log.info(rel.toString) | ||
} | ||
|
||
def addCastToFilm(personId: Int, filmId: Int, roleName: String): Unit = { | ||
log.info(s"Adding person $personId as $roleName to film $filmId") | ||
val functor: ActorRef = context.system.actorOf(CastAndFilmographyFunctor.props(personId, filmId, roleName, self)) | ||
context.become(waitingForSuccess(functor) orElse commonBehaviour) | ||
} | ||
|
||
def waitingForSuccess(from: ActorRef): Receive = { | ||
case akka.actor.Status.Success if sender == from => { | ||
context.become(commonBehaviour) | ||
log.info("Connected cast to film") | ||
|
||
val empire = Dactor.dactorSelection(context.system, classOf[Film], 1) | ||
val mark = Dactor.dactorSelection(context.system, classOf[Person], 1) | ||
|
||
empire ! DefaultMessagingProtocol.SelectAllFromRelation.Request(Film.Cast.name) | ||
mark ! DefaultMessagingProtocol.SelectAllFromRelation.Request(Person.Filmography.name) | ||
} | ||
} | ||
} | ||
|
||
object CastAndFilmographyFunctor { | ||
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props = | ||
Props(new CastAndFilmographyFunctor(personId, filmId, roleName, backTo)) | ||
} | ||
|
||
class CastAndFilmographyFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor { | ||
|
||
val sub1: ActorRef = context.system.actorOf(AddFilmFunctor.props(personId, filmId, roleName, self)) | ||
val sub2: ActorRef = context.system.actorOf(AddCastFunctor.props(personId, filmId, roleName, self)) | ||
|
||
override def receive: Receive = waitingForAck(Seq(sub1, sub2)) | ||
|
||
def waitingForAck(pending: Seq[ActorRef]): Receive = { | ||
case akka.actor.Status.Success => | ||
val remainingACKs = pending.filterNot(_ == sender()) | ||
|
||
if(remainingACKs.isEmpty) { | ||
backTo ! akka.actor.Status.Success | ||
context.stop(self) | ||
} else { | ||
context.become(waitingForAck(remainingACKs)) | ||
} | ||
|
||
case akka.actor.Status.Failure(e) => | ||
backTo ! akka.actor.Status.Failure(e) | ||
context.stop(self) | ||
} | ||
} | ||
|
||
object AddFilmFunctor { | ||
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props = | ||
Props(new AddFilmFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef)) | ||
} | ||
|
||
class AddFilmFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor { | ||
|
||
override def receive: Receive = waitingForFilmInfo orElse commonBehaviour | ||
|
||
Dactor.dactorSelection(context.system, classOf[Film], filmId) ! DefaultMessagingProtocol.SelectAllFromRelation.Request("film_info") | ||
|
||
def waitingForFilmInfo: Receive = { | ||
case DefaultMessagingProtocol.SelectAllFromRelation.Failure(e) => fail(e) | ||
case DefaultMessagingProtocol.SelectAllFromRelation.Success(relation: Relation) => | ||
val filmInfoOption: Option[Record] = relation.records.toOption match { | ||
case Some(records: Seq[Record]) => records.headOption | ||
case _ => None | ||
} | ||
filmInfoOption match { | ||
case None => fail(new RuntimeException("Received empty film info")) | ||
|
||
case Some(filmInfo: Record) => | ||
val newFilmRecord: Record = Person.Filmography.newRecord( | ||
Person.Filmography.filmId ~> filmId & | ||
Person.Filmography.roleName ~> roleName & | ||
Person.Filmography.filmName ~> filmInfo(Film.Info.title) & | ||
Person.Filmography.filmRelease ~> filmInfo(Film.Info.release) | ||
).build() | ||
Dactor.dactorSelection(context.system, classOf[Person], personId) ! DefaultMessagingProtocol.InsertIntoRelation("filmography", Seq(newFilmRecord)) | ||
context.become(waitingForInsertAck orElse commonBehaviour) | ||
} | ||
} | ||
|
||
def waitingForInsertAck: Receive = { | ||
case akka.actor.Status.Success => | ||
backTo ! akka.actor.Status.Success | ||
context.stop(self) // because this is our last state | ||
} | ||
|
||
def commonBehaviour: Receive = { | ||
case akka.actor.Status.Failure(e) => fail(e) | ||
} | ||
|
||
private def fail(e: Throwable): Unit = { | ||
backTo ! akka.actor.Status.Failure(e) | ||
context.stop(self) | ||
} | ||
} | ||
|
||
object AddCastFunctor { | ||
def props(personId: Int, filmId: Int, roleName: String, backTo: ActorRef): Props = | ||
Props(new AddCastFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef)) | ||
} | ||
|
||
class AddCastFunctor(personId: Int, filmId: Int, roleName: String, backTo: ActorRef) extends Actor { | ||
|
||
override def receive: Receive = waitingForPersonInfo orElse commonBehaviour | ||
|
||
// very first message has to be sent outside of Receives | ||
Dactor.dactorSelection(context.system, classOf[Person], personId) ! DefaultMessagingProtocol.SelectAllFromRelation.Request("person_info") | ||
|
||
def waitingForPersonInfo: Receive = { | ||
case DefaultMessagingProtocol.SelectAllFromRelation.Failure(e) => fail(e) | ||
case DefaultMessagingProtocol.SelectAllFromRelation.Success(relation: Relation) => { | ||
val personInfoOption: Option[Record] = relation.records.toOption match { | ||
case Some(records: Seq[Record]) => records.headOption | ||
case _ => None | ||
} | ||
personInfoOption match { | ||
case None => fail(new RuntimeException("Received empty personInfo")) | ||
|
||
case Some(personInfo: Record) => | ||
val newCastRecord: Record = Film.Cast.newRecord( | ||
Film.Cast.firstName ~> personInfo(Person.Info.firstName) & | ||
Film.Cast.lastName ~> personInfo(Person.Info.lastName) & | ||
Film.Cast.roleName ~> roleName & | ||
Film.Cast.personId ~> personId | ||
).build() | ||
Dactor.dactorSelection(context.system, classOf[Film], filmId) ! DefaultMessagingProtocol.InsertIntoRelation("film_cast", Seq(newCastRecord)) | ||
context.become(waitingForInsertAck orElse commonBehaviour) | ||
} | ||
} | ||
} | ||
|
||
def waitingForInsertAck: Receive = { | ||
case akka.actor.Status.Success => | ||
backTo ! akka.actor.Status.Success | ||
context.stop(self) | ||
} | ||
|
||
def commonBehaviour: Receive = { | ||
case akka.actor.Status.Failure(e) => fail(e) | ||
} | ||
|
||
private def fail(e: Throwable): Unit = { | ||
backTo ! akka.actor.Status.Failure(e) | ||
context.stop(self) | ||
} | ||
} |
51 changes: 51 additions & 0 deletions
51
fouleggs/src/de/up/hpi/informationsystems/fouleggs/dactors/Film.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package de.up.hpi.informationsystems.fouleggs.dactors | ||
|
||
import java.time.ZonedDateTime | ||
|
||
import akka.actor.Actor | ||
import de.up.hpi.informationsystems.adbms.Dactor | ||
import de.up.hpi.informationsystems.adbms.definition.ColumnDef.UntypedColumnDef | ||
import de.up.hpi.informationsystems.adbms.definition.{ColumnDef, RelationDef} | ||
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessageHandling | ||
import de.up.hpi.informationsystems.adbms.relation.{MutableRelation, RowRelation, SingleRowRelation} | ||
|
||
object Film { | ||
// implicit default values | ||
import de.up.hpi.informationsystems.adbms.definition.ColumnTypeDefaults._ | ||
|
||
object Info extends RelationDef { | ||
// val filmId: ColumnDef[Int] = ColumnDef[Int]("film_id") should be same as the Dactors id and therefor superfluous | ||
val title: ColumnDef[String] = ColumnDef[String]("film_title", "Untitled") | ||
val description: ColumnDef[String] = ColumnDef[String]("film_descr") | ||
val release: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("film_release") | ||
|
||
override val name: String = "film_info" | ||
override val columns: Set[UntypedColumnDef] = Set(title, description, release) | ||
} | ||
|
||
object Cast extends RelationDef { | ||
val personId: ColumnDef[Int] = ColumnDef[Int]("person_id") | ||
val firstName: ColumnDef[String] = ColumnDef[String]("first_name") | ||
val lastName: ColumnDef[String] = ColumnDef[String]("last_name") | ||
val roleName: ColumnDef[String] = ColumnDef[String]("role_name") | ||
|
||
override val name: String = "film_cast" | ||
override val columns: Set[UntypedColumnDef] = Set(personId, firstName, lastName, roleName) | ||
} | ||
|
||
class FilmBase(id: Int) extends Dactor(id) { | ||
/** | ||
* Returns a map of relation definition and corresponding relational store. | ||
* | ||
* @return map of relation definition and corresponding relational store | ||
*/ | ||
override protected val relations: Map[RelationDef, MutableRelation] = | ||
Map(Film.Info -> SingleRowRelation(Film.Info)) ++ | ||
Map(Film.Cast -> RowRelation(Film.Cast)) | ||
|
||
override def receive: Receive = Actor.emptyBehavior | ||
} | ||
} | ||
|
||
class Film(id: Int) extends Film.FilmBase(id) with DefaultMessageHandling | ||
|
52 changes: 52 additions & 0 deletions
52
fouleggs/src/de/up/hpi/informationsystems/fouleggs/dactors/Person.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package de.up.hpi.informationsystems.fouleggs.dactors | ||
|
||
import java.time.ZonedDateTime | ||
|
||
import akka.actor.Actor | ||
import de.up.hpi.informationsystems.adbms.Dactor | ||
import de.up.hpi.informationsystems.adbms.definition.ColumnDef.UntypedColumnDef | ||
import de.up.hpi.informationsystems.adbms.definition.{ColumnDef, RelationDef} | ||
import de.up.hpi.informationsystems.adbms.protocols.DefaultMessageHandling | ||
import de.up.hpi.informationsystems.adbms.relation.{MutableRelation, RowRelation, SingleRowRelation} | ||
|
||
object Person { | ||
// Relation Definitions | ||
import de.up.hpi.informationsystems.adbms.definition.ColumnTypeDefaults._ | ||
|
||
object Info extends RelationDef { | ||
val personId: ColumnDef[Int] = ColumnDef[Int]("person_id") | ||
val firstName: ColumnDef[String] = ColumnDef[String]("first_name") | ||
val lastName: ColumnDef[String] = ColumnDef[String]("last_name") | ||
val birthday: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("birthday") | ||
|
||
override val name: String = "person_info" | ||
override val columns: Set[UntypedColumnDef] = Set(personId, firstName, lastName, birthday) | ||
} | ||
|
||
object Filmography extends RelationDef { | ||
val filmId: ColumnDef[Int] = ColumnDef[Int]("film_id") | ||
val filmName: ColumnDef[String] = ColumnDef[String]("film_name") | ||
val filmRelease: ColumnDef[ZonedDateTime] = ColumnDef[ZonedDateTime]("film_release") | ||
val roleName: ColumnDef[String] = ColumnDef[String]("role_name") | ||
|
||
override val name: String = "filmography" | ||
override val columns: Set[UntypedColumnDef] = Set(filmId, filmName, filmRelease, roleName) | ||
} | ||
|
||
// Dactor Definition | ||
class PersonBase(id: Int) extends Dactor(id) { | ||
/** | ||
* Returns a map of relation definition and corresponding relational store. | ||
* | ||
* @return map of relation definition and corresponding relational store | ||
*/ | ||
override protected val relations: Map[RelationDef, MutableRelation] = | ||
Map(Person.Info -> SingleRowRelation(Person.Info)) ++ | ||
Map(Person.Filmography -> RowRelation(Person.Filmography)) | ||
|
||
override def receive: Receive = Actor.emptyBehavior | ||
} | ||
} | ||
|
||
class Person(id: Int) extends Person.PersonBase(id) with DefaultMessageHandling | ||
|
Oops, something went wrong.