diff --git a/common/src/main/scala/com/omegaup/Config.scala b/common/src/main/scala/com/omegaup/Config.scala index d3d245c..a9751f7 100644 --- a/common/src/main/scala/com/omegaup/Config.scala +++ b/common/src/main/scala/com/omegaup/Config.scala @@ -42,6 +42,7 @@ case class RoutingConfig( case class ScoreboardRefreshConfig( disabled: Boolean = false, + interval: Int = 10000, token: String = "secret", url: String = "http://localhost/api/scoreboard/refresh/" ) diff --git a/grader/src/main/scala/com/omegaup/broadcaster/Broadcaster.scala b/grader/src/main/scala/com/omegaup/broadcaster/Broadcaster.scala index 058b383..a328469 100644 --- a/grader/src/main/scala/com/omegaup/broadcaster/Broadcaster.scala +++ b/grader/src/main/scala/com/omegaup/broadcaster/Broadcaster.scala @@ -17,21 +17,21 @@ import org.eclipse.jetty.websocket.servlet._ import scala.collection.JavaConversions._ import scala.collection.{mutable,immutable} -class QueuedElement(val contest: String, val broadcast: Boolean, val targetUser: Long, val userOnly: Boolean) {} -class QueuedRun(contest: String, broadcast: Boolean, targetUser: Long, userOnly: Boolean, val ctx: RunContext) - extends QueuedElement(contest, broadcast, targetUser, userOnly) {} -class QueuedMessage(contest: String, broadcast: Boolean, targetUser: Long, userOnly: Boolean, val message: String) - extends QueuedElement(contest, broadcast, targetUser, userOnly) {} +class QueuedMessage(val contest: String, val broadcast: Boolean, val + targetUser: Long, val userOnly: Boolean, val message: String) class Broadcaster(implicit var serviceCtx: Context) extends Object with - ServiceInterface with Runnable with Log with Using { + ServiceInterface with Log with Using { private val PathRE = "^/([a-zA-Z0-9_-]+)/?".r // A collection of subscribers. private val subscribers = new mutable.HashMap[String, mutable.ArrayBuffer[BroadcasterSession]] private val subscriberLock = new Object - private val PoisonPill = new QueuedElement(null, true, -1, false) - private val queue = new LinkedBlockingQueue[QueuedElement] - private val broadcastThread = new Thread(this, "BroadcastThread") + private val scoreboardQueue = new mutable.HashSet[String] + private var scoreboardPoisonPill: Boolean = false + private val MessagePoisonPill = new QueuedMessage(null, true, -1, false, null) + private val messageQueue = new LinkedBlockingQueue[QueuedMessage] + private val broadcastThread = new Thread(new BroadcastHandler, "BroadcastThread") + private val scoreboardThread = new Thread(new ScoreboardHandler, "ScoreboardThread") private val server = new org.eclipse.jetty.server.Server override def start() = { @@ -56,6 +56,7 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with log.info("Registering port {}", broadcasterConnector.getLocalPort) + scoreboardThread.start broadcastThread.start log.info("Broadcaster started") @@ -109,16 +110,47 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with def update()(implicit ctx: RunContext): Unit = { ctx.run.contest match { case Some(contest) => { - ctx.broadcastQueued - queue.put(new QueuedRun(contest.alias, false, - ctx.run.user match { + messageQueue.put(new QueuedMessage( + contest = contest.alias, + broadcast = false, + targetUser = ctx.run.user match { case Some(user) => user.id case None => -1 - }, false, ctx)) - } - case None => { - ctx.finish + }, + userOnly = false, + message = Serialization.writeString(UpdateRunMessage("/run/update/", + RunDetails( + username = ctx.run.user.map(_.username), + contest_alias = Some(contest.alias), + alias = ctx.run.problem.alias, + guid = ctx.run.guid, + runtime = ctx.run.runtime, + memory = ctx.run.memory, + score = ctx.run.score, + contest_score = ctx.run.contest_score, + status = ctx.run.status.toString, + verdict = ctx.run.verdict.toString, + submit_delay = ctx.run.submit_delay, + time = ctx.run.time.getTime / 1000, + language = ctx.run.language.toString + ) + )) + )) + requestScoreboardUpdate(contest.alias) } + + case None => {} + } + ctx.finish + } + + def requestScoreboardUpdate(alias: String): Unit = { + if (serviceCtx.config.grader.scoreboard_refresh.disabled) { + return + } + scoreboardQueue.synchronized { + scoreboardQueue.add(alias) + scoreboardQueue.notify } } @@ -129,104 +161,105 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with targetUser: Long = -1, userOnly: Boolean = false ): BroadcastOutputMessage = { - queue.put(new QueuedMessage(contest, broadcast, targetUser, userOnly, message)) + messageQueue.put(new QueuedMessage(contest, broadcast, targetUser, userOnly, message)) new BroadcastOutputMessage(status = "ok") } - private def runLoop(elm: QueuedElement): Unit = { - val message = elm match { - case m: QueuedRun => { - m.ctx.broadcastDequeued - val run = m.ctx.run - - if (!serviceCtx.config.grader.scoreboard_refresh.disabled) { - m.ctx.trace(EventCategory.GraderRefresh) { - try { - log.info("Scoreboard refresh {}", - Https.post[ScoreboardRefreshResponse]( - serviceCtx.config.grader.scoreboard_refresh.url, - Map( - "token" -> serviceCtx.config.grader.scoreboard_refresh.token, - "alias" -> elm.contest, - "run" -> run.id.toString - ), - runner = false - ) - ) - } catch { - case e: Exception => log.error(e, "Scoreboard refresh") + private class ScoreboardHandler extends Object with Runnable { + override def run(): Unit = { + while (true) { + try { + val contests = scoreboardQueue.synchronized { + if (scoreboardQueue.isEmpty) { + scoreboardQueue.wait } + val aliases = scoreboardQueue.toList + scoreboardQueue.clear + aliases } + if (scoreboardPoisonPill) { + log.info("Scoreboard thread finished normally") + return + } + val t0 = System.currentTimeMillis + runLoop(contests) + val sleepTime = serviceCtx.config.grader.scoreboard_refresh.interval - + (System.currentTimeMillis - t0) + if (sleepTime > 0) { + Thread.sleep(sleepTime) + } + } catch { + case e: Exception => log.error(e, "Scoreboard runLoop") } - - m.ctx.finish - - Serialization.writeString(UpdateRunMessage("/run/update/", - RunDetails( - username = run.user.map(_.username), - contest_alias = Some(elm.contest), - alias = run.problem.alias, - guid = run.guid, - runtime = run.runtime, - memory = run.memory, - score = run.score, - contest_score = run.contest_score, - status = run.status.toString, - verdict = run.verdict.toString, - submit_delay = run.submit_delay, - time = run.time.getTime / 1000, - language = run.language.toString - ) - )) - } - - case m: QueuedMessage => { - m.message } } - val notifyList = subscriberLock.synchronized { - if (subscribers.contains(elm.contest)) { - subscribers(elm.contest) - .filter(subscriber => - ( - elm.broadcast || - subscriber.admin || - elm.targetUser == subscriber.user - ) && ( - !elm.userOnly || - !subscriber.admin - ) + private def runLoop(contests: Iterable[String]): Unit = { + contests.foreach(contest => { + try { + val result = Https.post[ScoreboardRefreshResponse]( + serviceCtx.config.grader.scoreboard_refresh.url, + Map( + "token" -> serviceCtx.config.grader.scoreboard_refresh.token, + "alias" -> contest + ), + runner = false ) - } else { - null - } + log.info("Scoreboard refresh {}", result) + } catch { + case e: Exception => log.error(e, "Scoreboard refresh") + } + }) } - - if (notifyList != null) - notifyList.foreach(_.send(message)) } - override def run(): Unit = { - while (true) { - try { - val elm = queue.take - if (elm == PoisonPill) { - log.info("Broadcaster thread finished normally") - return + private class BroadcastHandler extends Object with Runnable { + override def run(): Unit = { + while (true) { + try { + val elm = messageQueue.take + if (elm == MessagePoisonPill) { + log.info("Broadcaster thread finished normally") + return + } + runLoop(elm) + } catch { + case e: Exception => log.error(e, "Broadcast runLoop") } - runLoop(elm) - } catch { - case e: Exception => log.error(e, "runLoop") } } + + private def runLoop(m: QueuedMessage): Unit = { + val message = m.message + + val notifyList = subscriberLock.synchronized { + if (subscribers.contains(m.contest)) { + subscribers(m.contest) + .filter(subscriber => + ( + m.broadcast || + subscriber.admin || + m.targetUser == subscriber.user + ) && ( + !m.userOnly || + !subscriber.admin + ) + ) + } else { + null + } + } + + if (notifyList != null) + notifyList.foreach(_.send(message)) + } } class BroadcasterSession(val user: Int, val contest: String, val admin: Boolean, val session: Session) { def send(message: String): Unit = { if (!session.isOpen) return try { - session.getRemote.sendString(message) + session.getRemote.sendStringByFuture(message) } catch { case e: Exception => { log.error(e, "Failed to send a message") @@ -368,11 +401,16 @@ class Broadcaster(implicit var serviceCtx: Context) extends Object with override def stop(): Unit = { log.info("Broadcaster stopping") server.stop - queue.put(PoisonPill) + scoreboardQueue.synchronized { + scoreboardPoisonPill = true + scoreboardQueue.notify + } + messageQueue.put(MessagePoisonPill) } override def join(): Unit = { server.join + scoreboardThread.join broadcastThread.join log.info("Broadcaster stopped") }