Skip to content
This repository has been archived by the owner on Mar 21, 2022. It is now read-only.

Commit

Permalink
Improve Broadcaster responsiveness
Browse files Browse the repository at this point in the history
* Web Socket messages are now sent asynchronously.
* Scoreboard refreshes are now requested in a separate thread from the
  rest of the Web Socket updates, so having slow sockets won't impede
  the scoreboard from updating. Also, to improve responsiveness during a
  rejudge, scoreboards will only update up to once every 10 seconds (by
  default, configurable).

Fixed #13
  • Loading branch information
lhchavez committed May 2, 2015
1 parent ccaa3a5 commit 014eef4
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 93 deletions.
1 change: 1 addition & 0 deletions common/src/main/scala/com/omegaup/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ case class RoutingConfig(

case class ScoreboardRefreshConfig(
disabled: Boolean = false,
interval: Int = 10000,
token: String = "secret",
url: String = "http://localhost/api/scoreboard/refresh/"
)
Expand Down
224 changes: 131 additions & 93 deletions grader/src/main/scala/com/omegaup/broadcaster/Broadcaster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ import org.eclipse.jetty.websocket.servlet._
import scala.collection.JavaConversions._
import scala.collection.{mutable,immutable}

class QueuedElement(val contest: String, val broadcast: Boolean, val targetUser: Long, val userOnly: Boolean) {}
class QueuedRun(contest: String, broadcast: Boolean, targetUser: Long, userOnly: Boolean, val ctx: RunContext)
extends QueuedElement(contest, broadcast, targetUser, userOnly) {}
class QueuedMessage(contest: String, broadcast: Boolean, targetUser: Long, userOnly: Boolean, val message: String)
extends QueuedElement(contest, broadcast, targetUser, userOnly) {}
class QueuedMessage(val contest: String, val broadcast: Boolean, val
targetUser: Long, val userOnly: Boolean, val message: String)

class Broadcaster(implicit var serviceCtx: Context) extends Object with
ServiceInterface with Runnable with Log with Using {
ServiceInterface with Log with Using {
private val PathRE = "^/([a-zA-Z0-9_-]+)/?".r
// A collection of subscribers.
private val subscribers = new mutable.HashMap[String, mutable.ArrayBuffer[BroadcasterSession]]
private val subscriberLock = new Object
private val PoisonPill = new QueuedElement(null, true, -1, false)
private val queue = new LinkedBlockingQueue[QueuedElement]
private val broadcastThread = new Thread(this, "BroadcastThread")
private val scoreboardQueue = new mutable.HashSet[String]
private var scoreboardPoisonPill: Boolean = false
private val MessagePoisonPill = new QueuedMessage(null, true, -1, false, null)
private val messageQueue = new LinkedBlockingQueue[QueuedMessage]
private val broadcastThread = new Thread(new BroadcastHandler, "BroadcastThread")
private val scoreboardThread = new Thread(new ScoreboardHandler, "ScoreboardThread")
private val server = new org.eclipse.jetty.server.Server

override def start() = {
Expand All @@ -56,6 +56,7 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with

log.info("Registering port {}", broadcasterConnector.getLocalPort)

scoreboardThread.start
broadcastThread.start

log.info("Broadcaster started")
Expand Down Expand Up @@ -109,16 +110,47 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with
def update()(implicit ctx: RunContext): Unit = {
ctx.run.contest match {
case Some(contest) => {
ctx.broadcastQueued
queue.put(new QueuedRun(contest.alias, false,
ctx.run.user match {
messageQueue.put(new QueuedMessage(
contest = contest.alias,
broadcast = false,
targetUser = ctx.run.user match {
case Some(user) => user.id
case None => -1
}, false, ctx))
}
case None => {
ctx.finish
},
userOnly = false,
message = Serialization.writeString(UpdateRunMessage("/run/update/",
RunDetails(
username = ctx.run.user.map(_.username),
contest_alias = Some(contest.alias),
alias = ctx.run.problem.alias,
guid = ctx.run.guid,
runtime = ctx.run.runtime,
memory = ctx.run.memory,
score = ctx.run.score,
contest_score = ctx.run.contest_score,
status = ctx.run.status.toString,
verdict = ctx.run.verdict.toString,
submit_delay = ctx.run.submit_delay,
time = ctx.run.time.getTime / 1000,
language = ctx.run.language.toString
)
))
))
requestScoreboardUpdate(contest.alias)
}

case None => {}
}
ctx.finish
}

def requestScoreboardUpdate(alias: String): Unit = {
if (serviceCtx.config.grader.scoreboard_refresh.disabled) {
return
}
scoreboardQueue.synchronized {
scoreboardQueue.add(alias)
scoreboardQueue.notify
}
}

Expand All @@ -129,104 +161,105 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with
targetUser: Long = -1,
userOnly: Boolean = false
): BroadcastOutputMessage = {
queue.put(new QueuedMessage(contest, broadcast, targetUser, userOnly, message))
messageQueue.put(new QueuedMessage(contest, broadcast, targetUser, userOnly, message))
new BroadcastOutputMessage(status = "ok")
}

private def runLoop(elm: QueuedElement): Unit = {
val message = elm match {
case m: QueuedRun => {
m.ctx.broadcastDequeued
val run = m.ctx.run

if (!serviceCtx.config.grader.scoreboard_refresh.disabled) {
m.ctx.trace(EventCategory.GraderRefresh) {
try {
log.info("Scoreboard refresh {}",
Https.post[ScoreboardRefreshResponse](
serviceCtx.config.grader.scoreboard_refresh.url,
Map(
"token" -> serviceCtx.config.grader.scoreboard_refresh.token,
"alias" -> elm.contest,
"run" -> run.id.toString
),
runner = false
)
)
} catch {
case e: Exception => log.error(e, "Scoreboard refresh")
private class ScoreboardHandler extends Object with Runnable {
override def run(): Unit = {
while (true) {
try {
val contests = scoreboardQueue.synchronized {
if (scoreboardQueue.isEmpty) {
scoreboardQueue.wait
}
val aliases = scoreboardQueue.toList
scoreboardQueue.clear
aliases
}
if (scoreboardPoisonPill) {
log.info("Scoreboard thread finished normally")
return
}
val t0 = System.currentTimeMillis
runLoop(contests)
val sleepTime = serviceCtx.config.grader.scoreboard_refresh.interval -
(System.currentTimeMillis - t0)
if (sleepTime > 0) {
Thread.sleep(sleepTime)
}
} catch {
case e: Exception => log.error(e, "Scoreboard runLoop")
}

m.ctx.finish

Serialization.writeString(UpdateRunMessage("/run/update/",
RunDetails(
username = run.user.map(_.username),
contest_alias = Some(elm.contest),
alias = run.problem.alias,
guid = run.guid,
runtime = run.runtime,
memory = run.memory,
score = run.score,
contest_score = run.contest_score,
status = run.status.toString,
verdict = run.verdict.toString,
submit_delay = run.submit_delay,
time = run.time.getTime / 1000,
language = run.language.toString
)
))
}

case m: QueuedMessage => {
m.message
}
}

val notifyList = subscriberLock.synchronized {
if (subscribers.contains(elm.contest)) {
subscribers(elm.contest)
.filter(subscriber =>
(
elm.broadcast ||
subscriber.admin ||
elm.targetUser == subscriber.user
) && (
!elm.userOnly ||
!subscriber.admin
)
private def runLoop(contests: Iterable[String]): Unit = {
contests.foreach(contest => {
try {
val result = Https.post[ScoreboardRefreshResponse](
serviceCtx.config.grader.scoreboard_refresh.url,
Map(
"token" -> serviceCtx.config.grader.scoreboard_refresh.token,
"alias" -> contest
),
runner = false
)
} else {
null
}
log.info("Scoreboard refresh {}", result)
} catch {
case e: Exception => log.error(e, "Scoreboard refresh")
}
})
}

if (notifyList != null)
notifyList.foreach(_.send(message))
}

override def run(): Unit = {
while (true) {
try {
val elm = queue.take
if (elm == PoisonPill) {
log.info("Broadcaster thread finished normally")
return
private class BroadcastHandler extends Object with Runnable {
override def run(): Unit = {
while (true) {
try {
val elm = messageQueue.take
if (elm == MessagePoisonPill) {
log.info("Broadcaster thread finished normally")
return
}
runLoop(elm)
} catch {
case e: Exception => log.error(e, "Broadcast runLoop")
}
runLoop(elm)
} catch {
case e: Exception => log.error(e, "runLoop")
}
}

private def runLoop(m: QueuedMessage): Unit = {
val message = m.message

val notifyList = subscriberLock.synchronized {
if (subscribers.contains(m.contest)) {
subscribers(m.contest)
.filter(subscriber =>
(
m.broadcast ||
subscriber.admin ||
m.targetUser == subscriber.user
) && (
!m.userOnly ||
!subscriber.admin
)
)
} else {
null
}
}

if (notifyList != null)
notifyList.foreach(_.send(message))
}
}

class BroadcasterSession(val user: Int, val contest: String, val admin: Boolean, val session: Session) {
def send(message: String): Unit = {
if (!session.isOpen) return
try {
session.getRemote.sendString(message)
session.getRemote.sendStringByFuture(message)
} catch {
case e: Exception => {
log.error(e, "Failed to send a message")
Expand Down Expand Up @@ -368,11 +401,16 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with
override def stop(): Unit = {
log.info("Broadcaster stopping")
server.stop
queue.put(PoisonPill)
scoreboardQueue.synchronized {
scoreboardPoisonPill = true
scoreboardQueue.notify
}
messageQueue.put(MessagePoisonPill)
}

override def join(): Unit = {
server.join
scoreboardThread.join
broadcastThread.join
log.info("Broadcaster stopped")
}
Expand Down

0 comments on commit 014eef4

Please sign in to comment.