Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Enqueue Route #13

Merged
merged 8 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
es-priority-mailbox {
mailbox-type = "de.upb.cs.swt.delphi.webapi.ElasticPriorityMailbox"
}

akka.actor.deployment {
/espriomailboxactor {
mailbox = es-priority-mailbox
}
}

akka {
http {
server {
remote-address-header = on
}
}
}

# Use this dispatcher for actors that make blocking calls to the Elasticsearch database
elasticsearch-handling-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 4
# This thread pool is intended for development purposes, and should be increased for production
}
throughput = 1
}
8 changes: 8 additions & 0 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.event.{BusLogging, LoggingAdapter}

trait AppLogging {
def log(implicit system: ActorSystem): LoggingAdapter = new BusLogging(system.eventStream, this.getClass.getName, this.getClass, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
13 changes: 10 additions & 3 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package de.upb.cs.swt.delphi.webapi

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
import com.sksamuel.elastic4s.http.ElasticDsl._

/**
* @author Ben Hermann
*/
class Configuration(val bindHost: String = "0.0.0.0",
class Configuration( //Server and Elasticsearch configuration
val bindHost: String = "0.0.0.0",
val bindPort: Int = 8080,
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200"))) {
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")),
val esProjectIndex: IndexAndType = "delphi" / "project",

//Actor system configuration
val elasticActorPoolSize: Int = 8
) {

}
41 changes: 41 additions & 0 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.{Actor, ActorLogging, Props}
import com.sksamuel.elastic4s.IndexAndType
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.HttpClient
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{

implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
val client = HttpClient(configuration.elasticsearchClientUri)

override def preStart(): Unit = log.info("Search actor started")
override def postStop(): Unit = log.info("Search actor shut down")
context.setReceiveTimeout(2 seconds)

override def receive = {
case Enqueue(id) => getSource(id)
case Retrieve(id) => getSource(id)
}

private def getSource(id: String) = {
log.info("Executing get on entry {}", id)
def source = client.execute{
get(id).from(index)
}.await match {
case Right(res) => res.body.get
case Left(_) => Option.empty
}
sender().tell(source, context.self)
}
}

object ElasticActor{
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
.withMailbox("es-priority-mailbox")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.{Actor, ActorLogging, Props, Terminated}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage

class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{

private val index = configuration.esProjectIndex
private var elasticRouter = {
val routees = Vector.fill(configuration.elasticActorPoolSize) {
val r = context.actorOf(ElasticActor.props(configuration, index))
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(), routees)
}

override def preStart(): Unit = log.info("Actor manager started")
override def postStop(): Unit = log.info("Actor manager shut down")

override def receive = {
case em: ElasticMessage => {
log.info("Forwarding request {} to ElasticActor", em)
elasticRouter.route(em, sender())
}
case Terminated(id) => {
elasticRouter.removeRoutee(id)
val r = context.actorOf(ElasticActor.props(configuration, index))
context watch r
elasticRouter = elasticRouter.addRoutee(r)
}
}
}

object ElasticActorManager{
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
.withMailbox("es-priority-mailbox")

sealed trait ElasticMessage

final case class Retrieve(id: String) extends ElasticMessage
final case class Enqueue(id: String) extends ElasticMessage
}
20 changes: 0 additions & 20 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.ActorSystem
import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox}
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
import com.typesafe.config.Config

class ElasticPriorityMailbox (settings: ActorSystem.Settings, config: Config)
extends UnboundedStablePriorityMailbox(
PriorityGenerator{
case Retrieve(_) => 5
case Enqueue(_) => 1
case _ => 2
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package de.upb.cs.swt.delphi.webapi


import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.Timers
import akka.http.scaladsl.model.RemoteAddress
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage

import scala.concurrent.duration._
import scala.collection.mutable

//Limits the number of requests any given IP can make by tracking how many requests an IP has made within a given
// window of time, and timing out any IP that exceeds a threshold by rejecting any further request for a period of time
class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers {

private val window = 1 second
private val threshold = 10
private val timeout = 2 hours

private var recentIPs: mutable.Map[String, Int] = mutable.Map()
private var blockedIPs: mutable.Set[String] = mutable.Set()

override def preStart(): Unit = {
log.info("Request limiter started")
timers.startPeriodicTimer(ClearTimer, ClearLogs, window)
}
override def postStop(): Unit = log.info("Request limiter shut down")

override def receive = {
case Validate(rawIp, message) => {
val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown")
//First, reject IPs marked as blocked
if (blockedIPs.contains(ip)) {
rejectRequest()
} else {
//Check if this IP has made any requests recently
if (recentIPs.contains(ip)) {
//If so, increment their counter and test if they have exceeded the request threshold
recentIPs.update(ip, recentIPs(ip) + 1)
if (recentIPs(ip) > threshold) {
//If the threshold has been exceeded, mark this IP as blocked and reject it, and set up a message to unblock it after a period
blockedIPs += ip
log.info("Blocked IP {} due to exceeding request frequency threshold", ip)
timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout)
rejectRequest()
} else {
//Else, forward this message
nextActor forward message
}
} else {
//Else, register their request in the map and pass it to the next actor
recentIPs += (ip -> 1)
nextActor forward message
}
}
}
case ClearLogs =>
recentIPs.clear()
case Forgive(ip) => {
blockedIPs -= ip
log.info("Forgave IP {} after timeout", ip)
}
}

//Rejects requests from blocked IPs
private def rejectRequest() =
sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" +
"As a result, you have been timed out.\n" +
"Please wait a while or register an account with us to continue using this service."
}

object ElasticRequestLimiter{
def props(configuration: Configuration, nextActor: ActorRef) : Props = Props(new ElasticRequestLimiter(configuration, nextActor))

final case class Validate(rawIp: RemoteAddress, message: ElasticMessage)
final case object ClearLogs
final case class Forgive(ip: String)

final case object ClearTimer
final case class ForgiveTimer(ip: String)
}
41 changes: 36 additions & 5 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package de.upb.cs.swt.delphi.webapi

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.http.scaladsl.server.HttpApp
import akka.pattern.ask
import akka.util.Timeout
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate
import spray.json._

/**
* Web server configuration for Delphi web API.
*/
object Server extends HttpApp with JsonSupport {
object Server extends HttpApp with JsonSupport with AppLogging {

private val configuration = new Configuration()
private val system = ActorSystem("delphi-webapi")
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
implicit val timeout = Timeout(5, TimeUnit.SECONDS)

override def routes =
path("version") { version } ~
path("features") { features } ~
pathPrefix("search" / Remaining) { query => search(query) } ~
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) }
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }


private def version = {
Expand All @@ -34,9 +48,25 @@ object Server extends HttpApp with JsonSupport {

def retrieve(identifier: String) = {
get {
complete(
ElasticClient.getSource(identifier)
)
pass { //TODO: Require authentication here
complete(
(actorManager ? Retrieve(identifier)).mapTo[String]
)
} ~ extractClientIP{ ip =>
complete(
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
)
}
}
}

def enqueue(identifier: String) = {
get {
pass { //TODO: Require authorization here
complete(
(actorManager ? Enqueue(identifier)).mapTo[String]
)
}
}
}

Expand All @@ -51,6 +81,7 @@ object Server extends HttpApp with JsonSupport {
def main(args: Array[String]): Unit = {
val configuration = new Configuration()
Server.startServer(configuration.bindHost, configuration.bindPort)
system.terminate()
}


Expand Down