diff --git a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala deleted file mode 100644 index 3a1caf8..0000000 --- a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import scala.concurrent.ExecutionContextExecutor -// import java.util.concurrent.{ Executors, ExecutorService } -import akka.event.LoggingAdapter -import akka.{ actor ⇒ a, dispatch ⇒ d, event ⇒ e } -import java.util.concurrent.ConcurrentHashMap -import akka.ConfigurationException -import com.typesafe.config.{ Config, ConfigFactory } -import akka.dispatch.ExecutionContexts -import java.util.concurrent.ConcurrentSkipListSet -import java.util.Comparator - -class DispatchersImpl(settings: Settings, log: LoggingAdapter, prerequisites: d.DispatcherPrerequisites) extends Dispatchers { - - def lookup(selector: DispatcherSelector): ExecutionContextExecutor = - scala.scalajs.concurrent.JSExecutionContext.queue -/* selector match { - case DispatcherDefault(_) ⇒ defaultGlobalDispatcher - case DispatcherFromConfig(path, _) ⇒ lookup(path) - case DispatcherFromExecutor(ex: ExecutionContextExecutor, _) ⇒ ex - case DispatcherFromExecutor(ex, _) ⇒ d.ExecutionContexts.fromExecutor(ex) - case DispatcherFromExecutionContext(ec: ExecutionContextExecutor, _) ⇒ ec - case DispatcherFromExecutionContext(ec, _) ⇒ throw new UnsupportedOperationException("I thought all ExecutionContexts are also Executors?") // FIXME - } -*/ - def shutdown(): Unit = { - // val i = allCreatedServices.iterator() - // while (i.hasNext) i.next().shutdown() - // allCreatedServices.clear() - } - - import Dispatchers._ - - // val cachingConfig = new d.CachingConfig(settings.config) - - val defaultDispatcherConfig: Config = - idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId)) - - /** - * The one and only default dispatcher. - */ - def defaultGlobalDispatcher: ExecutionContextExecutor = lookup(DefaultDispatcherId) - - private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] - // private val allCreatedServices = new ConcurrentSkipListSet[ExecutorService](new Comparator[ExecutorService] { - // override def compare(left: ExecutorService, right: ExecutorService): Int = { - // val l = if (left == null) 0 else left.hashCode - // val r = if (right == null) 0 else right.hashCode - // if (l < r) -1 else if (l > r) 1 else 0 - // } - // }) - - /** - * Returns a dispatcher as specified in configuration. Please note that this - * method _may_ create and return a NEW dispatcher, _every_ call. - * - * Throws ConfigurationException if the specified dispatcher cannot be found in the configuration. - */ - def lookup(id: String): ExecutionContextExecutor = - lookupConfigurator(id).dispatcher() match { - // case es: ExecutorService ⇒ - // allCreatedServices.add(es) - // es - case ece ⇒ ece - } - - /** - * Checks that the configuration provides a section for the given dispatcher. - * This does not guarantee that no ConfigurationException will be thrown when - * using this dispatcher, because the details can only be checked by trying - * to instantiate it, which might be undesirable when just checking. - */ - def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) // || cachingConfig.hasPath(id) - - private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { - dispatcherConfigurators.get(id) match { - case null ⇒ - // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. - // That shouldn't happen often and in case it does the actual ExecutorService isn't - // created until used, i.e. cheap. - val newConfigurator = - new DispatcherConfigurator(config(id), prerequisites) - // if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) - // else throw new ConfigurationException(s"Dispatcher [$id] not configured") - - dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { - case null ⇒ newConfigurator - case existing ⇒ existing - } - - case existing ⇒ existing - } - } - - /** - * INTERNAL API - */ - private[akka] def config(id: String): Config = { - import scala.collection.JavaConverters._ - def simpleName = id.substring(id.lastIndexOf('.') + 1) - idConfig(id) - .withFallback(settings.config.getConfig(id)) - .withFallback( // ConfigFactory.parseMap(Map("name" → simpleName).asJava)) - ConfigFactory.parseString(s"{ name=$simpleName }") - ) - .withFallback(defaultDispatcherConfig) - } - - private def idConfig(id: String): Config = { - import scala.collection.JavaConverters._ - // ConfigFactory.parseMap(Map("id" → id).asJava) - ConfigFactory.parseString(s"{ id=$id }") - } - - /** - * INTERNAL API - * - * Creates a MessageDispatcherConfigurator from a Config. - * - * The Config must also contain a `id` property, which is the identifier of the dispatcher. - * - * Throws: IllegalArgumentException if the value of "type" is not valid - * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator - */ - private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { - if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) - - cfg.getString("type") match { - case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites) - // case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) - case fqn ⇒ - val args = List(classOf[Config] → cfg) - prerequisites.dynamicAccess.createInstanceFor[DispatcherConfigurator](fqn, args).recover({ - case exception ⇒ - throw new ConfigurationException( - ("Cannot instantiate DispatcherConfigurator type [%s], defined in [%s], " + - "make sure it has constructor with [com.typesafe.config.Config] and " + - "[akka.dispatch.DispatcherPrerequisites] parameters") - .format(fqn, cfg.getString("id")), exception) - }).get - } - } -} - -/** - * Base class to be used for hooking in new dispatchers into Dispatchers. - */ -abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: d.DispatcherPrerequisites) { - - // val config: Config = new d.CachingConfig(_config) - - /** - * Returns an instance of MessageDispatcher given the configuration. - * Depending on the needs the implementation may return a new instance for - * each invocation or return the same instance every time. - */ - def dispatcher(): ExecutionContextExecutor - - def configureExecutor(): d.ExecutorServiceConfigurator = { - def configurator(executor: String): d.ExecutorServiceConfigurator = executor match { - case "event-loop-executor" ⇒ new d.EventLoopExecutorConfigurator(new Config, prerequisites) - } - configurator("event-loop-executor") - } -/* - def configureExecutor(): d.ExecutorServiceConfigurator = { - def configurator(executor: String): d.ExecutorServiceConfigurator = executor match { - case null | "" | "fork-join-executor" ⇒ new d.ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) - case "thread-pool-executor" ⇒ new d.ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) - case fqcn ⇒ - val args = List( - classOf[Config] → config, - classOf[d.DispatcherPrerequisites] → prerequisites) - prerequisites.dynamicAccess.createInstanceFor[d.ExecutorServiceConfigurator](fqcn, args).recover({ - case exception ⇒ throw new IllegalArgumentException( - ("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " + - "make sure it has an accessible constructor with a [%s,%s] signature") - .format(fqcn, config.getString("id"), classOf[Config], classOf[d.DispatcherPrerequisites]), exception) - }).get - } - - config.getString("executor") match { - case "default-executor" ⇒ new d.DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback"))) - case other ⇒ configurator(other) - } - } -*/ -} - -/** - * Configurator for creating [[akka.dispatch.Dispatcher]]. - * Returns the same dispatcher instance for for each invocation - * of the `dispatcher()` method. - */ -class DispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { - - private val instance = ExecutionContexts.fromExecutorService( - configureExecutor().createExecutorServiceFactory(config.getString("id")/*, prerequisites.threadFactory*/) - .createExecutorService) - - /** - * Returns the same dispatcher instance for each invocation - */ - override def dispatcher(): ExecutionContextExecutor = instance -} -/* -/** - * Configurator for creating [[akka.dispatch.PinnedDispatcher]]. - * Returns new dispatcher instance for for each invocation - * of the `dispatcher()` method. - */ -class PinnedDispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { - - private val threadPoolConfig: d.ThreadPoolConfig = configureExecutor() match { - case e: d.ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig - case other ⇒ - prerequisites.eventStream.publish( - e.Logging.Warning( - "PinnedDispatcherConfigurator", - this.getClass, - "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format( - config.getString("id")))) - d.ThreadPoolConfig() - } - - private val factory = threadPoolConfig.createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory) - - /** - * Creates new dispatcher for each invocation. - */ - override def dispatcher(): ExecutionContextExecutor = ExecutionContexts.fromExecutorService(factory.createExecutorService) - -} -*/ diff --git a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala new file mode 100644 index 0000000..d7690d5 --- /dev/null +++ b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.actor.typed.internal + +import scala.reflect.ClassTag + +import org.slf4j.LoggerFactory +import org.slf4j.event.Level + +import akka.actor.typed +import akka.actor.typed._ +import akka.actor.typed.LogOptions +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.InternalApi +import akka.util.LineNumbers + +/** + * Provides the impl of any behavior that could nest another behavior + * + * INTERNAL API + */ +@InternalApi +private[akka] object InterceptorImpl { + + def apply[O, I](interceptor: () => BehaviorInterceptor[O, I], nestedBehavior: Behavior[I]): Behavior[O] = { + BehaviorImpl.DeferredBehavior[O] { ctx => + val interceptorBehavior = new InterceptorImpl[O, I](interceptor(), nestedBehavior) + interceptorBehavior.preStart(ctx) + } + } +} + +/** + * Provides the impl of any behavior that could nest another behavior + * + * INTERNAL API + */ +@InternalApi +private[akka] final class InterceptorImpl[O, I]( + val interceptor: BehaviorInterceptor[O, I], + val nestedBehavior: Behavior[I]) + extends ExtensibleBehavior[O] { + + import BehaviorInterceptor._ + + private val preStartTarget: PreStartTarget[I] = new PreStartTarget[I] { + override def start(ctx: TypedActorContext[_]): Behavior[I] = { + Behavior.start[I](nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]]) + } + override def toString: String = s"PreStartTarget($nestedBehavior)" + } + + private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] { + override def apply(ctx: TypedActorContext[_], msg: I): Behavior[I] = + Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], msg) + + override def signalRestart(ctx: TypedActorContext[_]): Unit = + Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], PreRestart) + + override def toString: String = s"ReceiveTarget($nestedBehavior)" + } + + private val signalTarget = new SignalTarget[I] { + override def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[I] = + Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], signal) + override def toString: String = s"SignalTarget($nestedBehavior)" + } + + // invoked pre-start to start/de-duplicate the initial behavior stack + def preStart(ctx: typed.TypedActorContext[O]): Behavior[O] = { + val started = interceptor.aroundStart(ctx, preStartTarget) + deduplicate(started, ctx) + } + + def replaceNested(newNested: Behavior[I]): Behavior[O] = + new InterceptorImpl(interceptor, newNested) + + override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = { + // TODO performance optimization could maybe to avoid isAssignableFrom if interceptMessageClass is Class[Object]? + val interceptMessageClass = interceptor.interceptMessageClass + val result = + if ((interceptMessageClass ne null) && interceptor.interceptMessageClass.isInstance(msg)) + interceptor.aroundReceive(ctx, msg, receiveTarget) + else + receiveTarget.apply(ctx, msg.asInstanceOf[I]) + deduplicate(result, ctx) + } + + override def receiveSignal(ctx: typed.TypedActorContext[O], signal: Signal): Behavior[O] = { + val interceptedResult = interceptor.aroundSignal(ctx, signal, signalTarget) + deduplicate(interceptedResult, ctx) + } + + private def deduplicate(interceptedResult: Behavior[I], ctx: TypedActorContext[O]): Behavior[O] = { + val started = Behavior.start(interceptedResult, ctx.asInstanceOf[TypedActorContext[I]]) + if (started == BehaviorImpl.UnhandledBehavior || started == BehaviorImpl.SameBehavior || !Behavior.isAlive(started)) { + started.unsafeCast[O] + } else { + // returned behavior could be nested in setups, so we need to start before we deduplicate + val duplicateInterceptExists = Behavior.existsInStack(started) { + case i: InterceptorImpl[O, I] + if interceptor.isSame(i.interceptor.asInstanceOf[BehaviorInterceptor[Any, Any]]) => + true + case _ => false + } + + if (duplicateInterceptExists) started.unsafeCast[O] + else new InterceptorImpl[O, I](interceptor, started) + } + } + + override def toString(): String = s"Interceptor($interceptor, $nestedBehavior)" +} + +/** + * Fire off any incoming message to another actor before receiving it ourselves. + * + * INTERNAL API + */ +@InternalApi +private[akka] final case class MonitorInterceptor[T: ClassTag](actorRef: ActorRef[T]) + extends BehaviorInterceptor[T, T] { + import BehaviorInterceptor._ + + override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { + actorRef ! msg + target(ctx, msg) + } + + // only once to the same actor in the same behavior stack + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { + case MonitorInterceptor(`actorRef`) => true + case _ => false + } + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object LogMessagesInterceptor { + def apply[T](opts: LogOptions): BehaviorInterceptor[T, T] = { + new LogMessagesInterceptor(opts).asInstanceOf[BehaviorInterceptor[T, T]] + } + + private val LogMessageTemplate = "actor [{}] received message: {}" + private val LogSignalTemplate = "actor [{}] received signal: {}" +} + +/** + * Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves. + * + * INTERNAL API + */ +@InternalApi +private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends BehaviorInterceptor[Any, Any] { + + import BehaviorInterceptor._ + import LogMessagesInterceptor._ + + private val logger = opts.getLogger.orElse(LoggerFactory.getLogger(getClass)) + + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = { + log(LogMessageTemplate, msg, ctx) + target(ctx, msg) + } + + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = { + log(LogSignalTemplate, signal, ctx) + target(ctx, signal) + } + + private def log(template: String, messageOrSignal: Any, context: TypedActorContext[Any]): Unit = { + if (opts.enabled) { + val selfPath = context.asScala.self.path + opts.level match { + case Level.ERROR => logger.error(template, selfPath, messageOrSignal) + case Level.WARN => logger.warn(template, selfPath, messageOrSignal) + case Level.INFO => logger.info(template, selfPath, messageOrSignal) + case Level.DEBUG => logger.debug(template, selfPath, messageOrSignal) + case Level.TRACE => logger.trace(template, selfPath, messageOrSignal) + case other => throw new IllegalArgumentException(s"Unknown log level [$other].") + } + } + } + + // only once in the same behavior stack + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { + case a: LogMessagesInterceptor => a.opts == opts + case _ => false + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object TransformMessagesInterceptor { + + private final val _notMatchIndicator: Any = new AnyRef + private final val _any2NotMatchIndicator = (_: Any) => _notMatchIndicator + private final def any2NotMatchIndicator[T] = _any2NotMatchIndicator.asInstanceOf[Any => T] +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class TransformMessagesInterceptor[O: ClassTag, I](matcher: PartialFunction[O, I]) + extends BehaviorInterceptor[O, I] { + import BehaviorInterceptor._ + import TransformMessagesInterceptor._ + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { + // If they use the same pf instance we can allow it, to have one way to workaround defining + // "recursive" narrowed behaviors. + case TransformMessagesInterceptor(`matcher`) => true + case TransformMessagesInterceptor(otherMatcher) => + // there is no safe way to allow this + throw new IllegalStateException( + "transformMessages can only be used one time in the same behavior stack. " + + s"One defined in ${LineNumbers(matcher)}, and another in ${LineNumbers(otherMatcher)}") + case _ => false + } + + def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] = { + matcher.applyOrElse(msg, any2NotMatchIndicator) match { + case result if _notMatchIndicator == result => Behaviors.unhandled + case transformed => target(ctx, transformed) + } + } + + override def toString: String = s"TransformMessages(${LineNumbers(matcher)})" +} diff --git a/akka-js-actor/js/src/main/resources/application.conf b/akka-js-actor/js/src/main/resources/application.conf index 16123ee..04a23e3 100644 --- a/akka-js-actor/js/src/main/resources/application.conf +++ b/akka-js-actor/js/src/main/resources/application.conf @@ -61,7 +61,6 @@ akka { tail-chopping-router { interval = 10 milliseconds } - routees { paths = [] } @@ -75,7 +74,6 @@ akka { backoff-rate = 0.1. messages-per-resize = 10 } - optimal-size-exploring-resizer { enabled = off lower-bound = 1 @@ -93,10 +91,24 @@ akka { } default-dispatcher { + # Must be one of the following + # Dispatcher or a FQCN to a class inheriting + # MessageDispatcherConfigurator with a public constructor with + # both com.typesafe.config.Config parameter and + # akka.dispatch.DispatcherPrerequisites parameters. type = "Dispatcher" + + # Which kind of ExecutorService to use for this dispatcher + # Valid options: + # - "default-executor" requires a "default-executor" section + # - "macrotask-executor" + # - "event-loop-executor" + # - "queue-executor" + # - A FQCN of a class extending ExecutorServiceConfigurator executor = "default-executor" + default-executor { - fallback = "fork-join-executor" + fallback = "macrotask-executor" } fork-join-executor { parallelism-min = 8 @@ -124,6 +136,14 @@ akka { mailbox-requirement = "" } + internal-dispatcher { + type = "Dispatcher" + executor = "macrotask-executor" + shutdown-timeout = 1s + throughput = 5 + throughput-deadline-time = 0ms + } + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" @@ -330,8 +350,8 @@ akka { creation-timeout = 20s initial-input-buffer-size = 4 max-input-buffer-size = 16 + dispatcher = "akka.actor.default-dispatcher" blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - dispatcher = "" subscription-timeout { mode = cancel timeout = 5s diff --git a/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala b/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala index f447ca6..9b7c49b 100644 --- a/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala +++ b/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala @@ -17,7 +17,7 @@ class JSDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { def getRuntimeClass[A](name: String): InstantiatableClass = { Reflect.lookupInstantiatableClass(name).getOrElse { - throw new InstantiationError(s"JSDynamicAccess $name is not js instantiable class") + throw new ClassNotFoundException(s"Class [$name] does not exist or is not a JS instantiable class") } } @@ -27,10 +27,11 @@ class JSDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { dyn.declaredConstructors.find(_.parameterTypes == constructorClasses).map{ ctor => ctor.newInstance(args.map(_._2): _*).asInstanceOf[A] }.getOrElse{ - throw new InstantiationError("error trying to get instance for " + dyn.runtimeClass.getName + "\n" + dyn.toString) + throw new InstantiationError("Error trying to get instance for " + dyn.runtimeClass.getName + "\n" + dyn.toString) } } catch { - case err: Exception => err.printStackTrace() + case err: Exception => + err.printStackTrace() throw err } } diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala index 34b4039..cb302e3 100644 --- a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala @@ -6,5 +6,16 @@ package akka.dispatch import java.util.ArrayDeque -class AbstractBoundedNodeQueue[T](capacity: Int) - extends ArrayDeque[T](capacity) {} +class AbstractBoundedNodeQueue[T](capacity: Int) extends ArrayDeque[T](capacity) { + override def offerFirst(e: T): Boolean = { + if (size == capacity) false + else super.offerFirst(e) + } + + override def offerLast(e: T): Boolean = { + if (size == capacity) false + else super.offerLast(e) + } + + override def add(e: T): Boolean = offerLast(e) +} diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 0cf4c61..6c62a27 100644 --- a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -4,11 +4,16 @@ package akka.dispatch -import akka.event.Logging.{ Debug, Error, LogEventException } +import akka.event.Logging.{Debug, Error, LogEventException} import akka.actor._ +import akka.annotation.InternalStableApi import akka.dispatch.sysmsg._ -import akka.event.{ BusLogging, EventStream } - /**import com.typesafe.config.{ ConfigFactory, Config } */ +import akka.event.{BusLogging, EventStream} +import akka.util.unused + +import java.util +import java.util.concurrent.{AbstractExecutorService, Callable, ExecutorService, Future, TimeUnit} +/**import com.typesafe.config.{ ConfigFactory, Config } */ import akka.util.{ /** @note IMPLEMENT IN SCALA.JS Unsafe,*/ Index } import akka.event.EventStream import com.typesafe.config.Config @@ -20,51 +25,57 @@ import scala.concurrent.duration.FiniteDuration import scala.scalajs.js.timers._ import scala.util.control.NonFatal import scala.util.Try -import java.{ util ⇒ ju } +import java.{ util => ju } import java.util.concurrent.Executor -final case class Envelope private (val message: Any, val sender: ActorRef) +final case class Envelope private (message: Any, sender: ActorRef) { + + def copy(message: Any = message, sender: ActorRef = sender) = { + Envelope(message, sender) + } +} object Envelope { def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = { - if (message == null) throw new InvalidMessageException("Message is null") + if (message == null) { + if (sender eq Actor.noSender) + throw InvalidMessageException(s"Message is null.") + else + throw InvalidMessageException(s"Message sent from [$sender] is null.") + } new Envelope(message, if (sender ne Actor.noSender) sender else system.deadLetters) } } -final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable /** @note IMPLEMENT IN SCALA.JS extends Batchable */ { - final def isBatchable: Boolean = runnable match { - case _ ⇒ false - } +final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () => Unit) extends Runnable /** @note IMPLEMENT IN SCALA.JS extends Batchable */ { + final def isBatchable: Boolean = false /** @note IMPLEMENT IN SCALA.JS akka.dispatch.internal.ScalaBatchable.isBatchable(runnable) */ def run(): Unit = try runnable.run() catch { - case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) + case NonFatal(e) => eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally cleanup() } - - /** * INTERNAL API */ - private[akka] trait LoadMetrics { self: Executor ⇒ - def atFullThrottle(): Boolean - } +private[akka] trait LoadMetrics { self: Executor => + def atFullThrottle(): Boolean +} /** * INTERNAL API */ private[akka] object MessageDispatcher { - val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher + val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher val SCHEDULED = 1 - val RESCHEDULED = 2 + val RESCHEDULED = 2 - // dispatcher debugging helper using println (see below) - // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) + // dispatcher debugging helper using println (see below) + // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) final val debug = false // Deliberately without type ascription to make it a compile-time constant - lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] { + lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] { override def compare(a: ActorRef, b: ActorRef): Int = a.compareTo(b) }) def printActors(): Unit = @@ -75,12 +86,12 @@ private[akka] object MessageDispatcher { } { val status = if (a.isTerminated) " (terminated)" else " (alive)" val messages = a match { - case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages" - case _ ⇒ " " + a.getClass + case r: ActorRefWithCell => " " + r.underlying.numberOfMessages + " messages" + case _ => " " + a.getClass } val parent = a match { - case i: InternalActorRef ⇒ ", parent: " + i.getParent - case _ ⇒ "" + case i: InternalActorRef => ", parent: " + i.getParent + case _ => "" } println(" -> " + a + status + messages + parent) } @@ -89,249 +100,262 @@ private[akka] object MessageDispatcher { abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends /** @note IMPLEMENT IN SCALA.JS AbstractMessageDispatcher with BatchingExecutor with */ ExecutionContextExecutor { - import MessageDispatcher._ - /** @note IMPLEMENT IN SCALA.JS + import MessageDispatcher._ + /** @note IMPLEMENT IN SCALA.JS import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } - */ - import configurator.prerequisites - - val mailboxes = prerequisites.mailboxes - val eventStream = prerequisites.eventStream - - @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! - @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! + */ + import configurator.prerequisites - /** @note IMPLEMENT IN SCALA.JS @tailrec */ private final def addInhabitants(add: Long): Long = { - val c = inhabitants - val r = c + add + val mailboxes = prerequisites.mailboxes + val eventStream = prerequisites.eventStream - if (r < 0) { - // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to - // go below zero means that there is an imbalance and we might as well throw the exception - val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!") - reportFailure(e) - throw e - } + @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! + @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! + private final def addInhabitants(add: Long): Long = { /** @note IMPLEMENT IN SCALA.JS - if (Unsafe.instance.compareAndSwapLong(this, inhabitantsOffset, c, r)) r else addInhabitants(add) - */ - _inhabitantsDoNotCallMeDirectly = r - r - } + val old = Unsafe.instance.getAndAddLong(this, inhabitantsOffset, add) + val ret = old + add + */ + _inhabitantsDoNotCallMeDirectly += add + if (_inhabitantsDoNotCallMeDirectly < 0) { + // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to + // go below zero means that there is an imbalance and we might as well throw the exception + val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!") + reportFailure(e) + throw e + } + _inhabitantsDoNotCallMeDirectly + } /** @note IMPLEMENT IN SCALA.JS final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset) - */ + */ final def inhabitants: Long = _inhabitantsDoNotCallMeDirectly /** @note IMPLEMENT IN SCALA.JS private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset) private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update) - */ + */ private final def shutdownSchedule: Int = _shutdownScheduleDoNotCallMeDirectly private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = { _shutdownScheduleDoNotCallMeDirectly = update true } - /** - * Creates and returns a mailbox for the given actor. - */ - protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox - - /** - * Identifier of this dispatcher, corresponds to the full key - * of the dispatcher configuration. - */ - def id: String - - /** - * Attaches the specified actor instance to this dispatcher, which includes - * scheduling it to run for the first time (Create() is expected to have - * been enqueued by the ActorCell upon mailbox creation). - */ - final def attach(actor: ActorCell): Unit = { - register(actor) - registerForExecution(actor.mailbox, false, true) - } - - /** - * Detaches the specified actor instance from this dispatcher - */ - final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown() + /** + * Creates and returns a mailbox for the given actor. + */ + protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox + + /** + * Identifier of this dispatcher, corresponds to the full key + * of the dispatcher configuration. + */ + def id: String + + /** + * Attaches the specified actor instance to this dispatcher, which includes + * scheduling it to run for the first time (Create() is expected to have + * been enqueued by the ActorCell upon mailbox creation). + */ + final def attach(actor: ActorCell): Unit = { + register(actor) + registerForExecution(actor.mailbox, false, true) + } + + /** + * Detaches the specified actor instance from this dispatcher + */ + final def detach(actor: ActorCell): Unit = + try unregister(actor) + finally ifSensibleToDoSoThenScheduleShutdown() + + /** @note IMPLEMENT IN SCALA.JS + final protected def resubmitOnBlock: Boolean = true // We want to avoid starvation + */ /** THIS COMES FROM BatchingExecutor */ override def execute(runnable: Runnable): Unit = unbatchedExecute(runnable) /** END */ - final /** @note IMPLEMENT IN SCALA.JS override */ protected def unbatchedExecute(r: Runnable): Unit = { - val invocation = TaskInvocation(eventStream, r, taskCleanup) - addInhabitants(+1) - try { - executeTask(invocation) - } catch { - case t: Throwable ⇒ - addInhabitants(-1) - throw t - } - } - - override def reportFailure(t: Throwable): Unit = t match { - /** @note IMPLEMENT IN SCALA.JS case e: LogEventException ⇒ eventStream.publish(e.event) */ - case _ ⇒ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage)) - } - - @tailrec - private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = { - if (inhabitants <= 0) shutdownSchedule match { - case UNSCHEDULED ⇒ - if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction() - else ifSensibleToDoSoThenScheduleShutdown() - case SCHEDULED ⇒ - if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) () - else ifSensibleToDoSoThenScheduleShutdown() - case RESCHEDULED ⇒ - } - } - - private def scheduleShutdownAction(): Unit = { - // IllegalStateException is thrown if scheduler has been shutdown - try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { - override def execute(runnable: Runnable): Unit = scalajs.js.timers.setTimeout(0) { runnable.run() } // @note IMPLEMENT IN SCALA.JS runnable.run() - override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) - }) catch { - case _: IllegalStateException ⇒ shutdown() - } - } - - private final val taskCleanup: () ⇒ Unit = () ⇒ if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown() - - /** - * If you override it, you must call it. But only ever once. See "attach" for only invocation. - * - * INTERNAL API - */ - protected[akka] def register(actor: ActorCell) { - /**@note IMPLEMENT IN SCALA.JS if (debug) actors.put(this, actor.self) */ - addInhabitants(+1) + final /** @note IMPLEMENT IN SCALA.JS override */ protected def unbatchedExecute(r: Runnable): Unit = { + val invocation = TaskInvocation(eventStream, r, taskCleanup) + addInhabitants(+1) + try { + executeTask(invocation) + } catch { + case t: Throwable => + addInhabitants(-1) + throw t + } + } + + override def reportFailure(t: Throwable): Unit = t match { + /** @note IMPLEMENT IN SCALA.JS case e: LogEventException => eventStream.publish(e.event) */ + case _ => eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage)) + } + + @tailrec + private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = { + if (inhabitants <= 0) shutdownSchedule match { + case UNSCHEDULED => + if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction() + else ifSensibleToDoSoThenScheduleShutdown() + case SCHEDULED => + if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) () + else ifSensibleToDoSoThenScheduleShutdown() + case RESCHEDULED => + case unexpected => + throw new IllegalArgumentException(s"Unexpected actor class marker: $unexpected") // will not happen, for exhaustiveness check + } + } + + private def scheduleShutdownAction(): Unit = { + // IllegalStateException is thrown if scheduler has been shutdown + try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext { + override def execute(runnable: Runnable): Unit = runnable.run() + override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t) + }) catch { + case _: IllegalStateException => + shutdown() + // Since there is no scheduler anymore, restore the state to UNSCHEDULED. + // When this dispatcher is used again, + // shutdown is only attempted if the state is UNSCHEDULED + // (as per ifSensibleToDoSoThenScheduleShutdown above) + updateShutdownSchedule(SCHEDULED, UNSCHEDULED) + } + } + + private final val taskCleanup: () => Unit = () => if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown() + + /** + * If you override it, you must call it. But only ever once. See "attach" for only invocation. + * + * INTERNAL API + */ + protected[akka] def register(actor: ActorCell) { + /**@note IMPLEMENT IN SCALA.JS if (debug) actors.put(this, actor.self) */ + addInhabitants(+1) + } + + /** + * If you override it, you must call it. But only ever once. See "detach" for the only invocation + * + * INTERNAL API + */ + protected[akka] def unregister(actor: ActorCell) { + /**@note IMPLEMENT IN SCALA.JS if (debug) actors.remove(this, actor.self) */ + addInhabitants(-1) + val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox) + mailBox.becomeClosed() + mailBox.cleanUp() } - /** - * If you override it, you must call it. But only ever once. See "detach" for the only invocation - * - * INTERNAL API - */ - protected[akka] def unregister(actor: ActorCell) { - /**@note IMPLEMENT IN SCALA.JS if (debug) actors.remove(this, actor.self) */ - addInhabitants(-1) - val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox) - mailBox.becomeClosed() - mailBox.cleanUp() - } - - private val shutdownAction = new Runnable { - @tailrec - final def run() { - shutdownSchedule match { - case SCHEDULED ⇒ - try { - if (inhabitants == 0) shutdown() //Warning, racy - } finally { - while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {} - } - case RESCHEDULED ⇒ - if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction() - else run() - case UNSCHEDULED ⇒ + private val shutdownAction = new Runnable { + @tailrec + final def run() { + shutdownSchedule match { + case SCHEDULED => + try { + if (inhabitants == 0) shutdown() //Warning, racy + } finally { + while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {} + } + case RESCHEDULED => + if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction() + else run() + case UNSCHEDULED => + case unexpected => + throw new IllegalArgumentException(s"Unexpected actor class marker: $unexpected") // will not happen, for exhaustiveness check } - } - } - - /** - * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, - * defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in - * reference.conf - * - * INTERNAL API - */ - protected[akka] def shutdownTimeout: FiniteDuration - - /** - * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference - */ - protected[akka] def suspend(actor: ActorCell): Unit = { - val mbox = actor.mailbox - if ((mbox.actor eq actor) && (mbox.dispatcher eq this)) - mbox.suspend() - } - - /* - * After the call to this method, the dispatcher must begin any new message processing for the specified reference - */ - protected[akka] def resume(actor: ActorCell): Unit = { - val mbox = actor.mailbox - if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) - registerForExecution(mbox, false, false) - } - - /** - * Will be called when the dispatcher is to queue an invocation for execution - * - * INTERNAL API - */ - protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) - - /** - * Will be called when the dispatcher is to queue an invocation for execution - * - * INTERNAL API - */ - protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) - - /** - * Suggest to register the provided mailbox for execution - * - * INTERNAL API - */ - protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean - - // TODO check whether this should not actually be a property of the mailbox - /** - * INTERNAL API - */ - protected[akka] def throughput: Int - - /** - * INTERNAL API - */ - protected[akka] def throughputDeadlineTime: Duration - - /** - * INTERNAL API - */ - @inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0 - - /** - * INTERNAL API - */ + } + } + + /** + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, + * defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in + * reference.conf + * + * INTERNAL API + */ + protected[akka] def shutdownTimeout: FiniteDuration + + /** + * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference + */ + protected[akka] def suspend(actor: ActorCell): Unit = { + val mbox = actor.mailbox + if ((mbox.actor eq actor) && (mbox.dispatcher eq this)) + mbox.suspend() + } + + /* + * After the call to this method, the dispatcher must begin any new message processing for the specified reference + */ + protected[akka] def resume(actor: ActorCell): Unit = { + val mbox = actor.mailbox + if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) + registerForExecution(mbox, false, false) + } + + /** + * Will be called when the dispatcher is to queue an invocation for execution + * + * INTERNAL API + */ + protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) + + /** + * Will be called when the dispatcher is to queue an invocation for execution + * + * INTERNAL API + */ + protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) + + /** + * Suggest to register the provided mailbox for execution + * + * INTERNAL API + */ + protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean + + // TODO check whether this should not actually be a property of the mailbox + /** + * INTERNAL API + */ + protected[akka] def throughput: Int + + /** + * INTERNAL API + */ + protected[akka] def throughputDeadlineTime: Duration + + /** + * INTERNAL API + */ + @inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0 + + /** + * INTERNAL API + */ protected[akka] def executeTask(invocation: TaskInvocation) - /** - * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached - * Must be idempotent - * - * INTERNAL API - */ - protected[akka] def shutdown(): Unit + /** + * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached + * Must be idempotent + * + * INTERNAL API + */ + @InternalStableApi + protected[akka] def shutdown(): Unit } - /** * An ExecutorServiceConfigurator is a class that given some prerequisites and a configuration can create instances of ExecutorService */ -abstract class ExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceFactoryProvider - +abstract class ExecutorServiceConfigurator(@unused config: Config, @unused prerequisites: DispatcherPrerequisites) + extends ExecutorServiceFactoryProvider /** * Base class to be used for hooking in new dispatchers into Dispatchers. @@ -352,8 +376,56 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def configureExecutor(): ExecutorServiceConfigurator = { def configurator(executor: String): ExecutorServiceConfigurator = executor match { - case "event-loop-executor" ⇒ new EventLoopExecutorConfigurator(new Config, prerequisites) + case null | "" | "macrotask-executor" => new MacrotaskExecutorConfigurator(new Config, prerequisites) + case "event-loop-executor" => new EventLoopExecutorConfigurator(new Config, prerequisites) + case "queue-executor" => new QueueExecutorConfigurator(new Config, prerequisites) + case fqcn => + val args = List(classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites) + prerequisites.dynamicAccess + .createInstanceFor[ExecutorServiceConfigurator](fqcn, args) + .recover { + case exception => + throw new IllegalArgumentException( + ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], + make sure it has an accessible constructor with a [%s,%s] signature""") + .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), + exception) + } + .get + } + + config.getString("executor") match { + case "default-executor" => + new DefaultExecutorServiceConfigurator( + config.getConfig("default-executor"), + prerequisites, + configurator(config.getString("default-executor.fallback"))) + case other => configurator(other) } - configurator("event-loop-executor") } } + +class DefaultExecutorServiceConfigurator( + config: Config, + prerequisites: DispatcherPrerequisites, + fallback: ExecutorServiceConfigurator) + extends ExecutorServiceConfigurator(config, prerequisites) { + val provider: ExecutorServiceFactoryProvider = + prerequisites.defaultExecutionContext match { + case Some(ec) => + prerequisites.eventStream.publish( + Debug( + "DefaultExecutorServiceConfigurator", + this.getClass, + s"Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor.")) + + new ExecutionContextExecutorServiceDelegate(ec) with ExecutorServiceFactory with ExecutorServiceFactoryProvider { + def createExecutorServiceFactory(id: String): ExecutorServiceFactory = this + def createExecutorService: ExecutorService = this + } + case None => fallback + } + + def createExecutorServiceFactory(id: String): ExecutorServiceFactory = + provider.createExecutorServiceFactory(id) +} diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala index 9436493..a80c1ed 100644 --- a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala @@ -68,8 +68,8 @@ class Dispatcher( /** * INTERNAL API */ - protected[akka] def executeTask(invocation: TaskInvocation) { - executorService execute invocation + protected[akka] def executeTask(invocation: TaskInvocation): Unit = { + executorService.execute(invocation) } /** @@ -100,7 +100,7 @@ class Dispatcher( protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { - executorService execute mbox + executorService.execute(mbox) mbox.setAsIdle() true } else false @@ -114,7 +114,7 @@ object PriorityGenerator { /** * Creates a PriorityGenerator that uses the supplied function as priority generator */ - def apply(priorityFunction: Any ⇒ Int): PriorityGenerator = new PriorityGenerator { + def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { def gen(message: Any): Int = priorityFunction(message) } } diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala index 74b475c..72d18bf 100644 --- a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala @@ -5,20 +5,20 @@ package akka.dispatch import java.util.concurrent.TimeUnit - -import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory } -import com.typesafe.config.{ ConfigFactory, Config } -import akka.actor.{ Scheduler, DynamicAccess, ActorSystem } -import com.typesafe.config.Config -import akka.actor.{ Scheduler, ActorSystem } +import java.util.concurrent.{ConcurrentHashMap, ThreadFactory} +import com.typesafe.config.{Config, ConfigFactory, ConfigValueType} +import akka.actor.{ActorSystem, DynamicAccess, Scheduler} +import akka.actor.{ActorSystem, Scheduler} import akka.event.Logging.Warning -import akka.event.{ EventStream, LoggingAdapter } +import akka.event.{EventStream, LoggingAdapter} + import scala.concurrent.duration.Duration import akka.ConfigurationException import akka.actor.Deploy -/** @note IMPLEMENT IN SCALA.JS +import akka.annotation.{DoNotInherit, InternalApi} +import org.akkajs.shocon import akka.util.Helpers.ConfigOps -*/ + import scala.concurrent.ExecutionContext /** @@ -37,14 +37,16 @@ trait DispatcherPrerequisites { /** * INTERNAL API */ +@InternalApi private[akka] final case class DefaultDispatcherPrerequisites( - val threadFactory: ThreadFactory, - val eventStream: EventStream, - val scheduler: Scheduler, - val dynamicAccess: DynamicAccess, - val settings: ActorSystem.Settings, - val mailboxes: Mailboxes, - val defaultExecutionContext: Option[ExecutionContext]) extends DispatcherPrerequisites + threadFactory: ThreadFactory, + eventStream: EventStream, + scheduler: Scheduler, + dynamicAccess: DynamicAccess, + settings: ActorSystem.Settings, + mailboxes: Mailboxes, + defaultExecutionContext: Option[ExecutionContext]) + extends DispatcherPrerequisites object Dispatchers { /** @@ -56,8 +58,8 @@ object Dispatchers { /** * INTERNAL API */ + @InternalApi private[akka] final val InternalDispatcherId = "akka.actor.internal-dispatcher" - } /** @@ -67,8 +69,12 @@ object Dispatchers { * * Look in `akka.actor.default-dispatcher` section of the reference.conf * for documentation of dispatcher options. + * + * Not for user instantiation or extension */ -class Dispatchers(val settings: ActorSystem.Settings, +@DoNotInherit +class Dispatchers @InternalApi private[akka] ( + val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites, logger: LoggingAdapter) { @@ -77,8 +83,11 @@ class Dispatchers(val settings: ActorSystem.Settings, /** @note IMPLEMENT IN SCALA.JS val cachingConfig = new CachingConfig(settings.config) */ + + val cachingConfig: Config = settings.config + val defaultDispatcherConfig: Config = - idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId)) + idConfig(DefaultDispatcherId).withFallback(cachingConfig.getConfig(DefaultDispatcherId)) /** * The one and only default dispatcher. @@ -99,9 +108,7 @@ class Dispatchers(val settings: ActorSystem.Settings, * @throws ConfigurationException if the specified dispatcher cannot be found in the configuration */ - def lookup(id: String): MessageDispatcher = - //new DispatcherConfigurator(new Config, prerequisites).dispatcher() - lookupConfigurator(id).dispatcher() + def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() /** * Checks that the configuration provides a section for the given dispatcher. @@ -113,19 +120,20 @@ class Dispatchers(val settings: ActorSystem.Settings, private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { dispatcherConfigurators.get(id) match { - case null ⇒ + case null => // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. // That shouldn't happen often and in case it does the actual ExecutorService isn't // created until used, i.e. cheap. - val newConfigurator = new DispatcherConfigurator(config(id), prerequisites) - //if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) - //else throw new ConfigurationException(s"Dispatcher [$id] not configured") + + val newConfigurator: MessageDispatcherConfigurator = + if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) + else throw new ConfigurationException(s"Dispatcher [$id] not configured") dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { - case null ⇒ newConfigurator - case existing ⇒ existing + case null => newConfigurator + case existing => existing } - case existing ⇒ existing + case existing => existing } } @@ -145,18 +153,19 @@ class Dispatchers(val settings: ActorSystem.Settings, def registerConfigurator(id: String, configurator: MessageDispatcherConfigurator): Boolean = dispatcherConfigurators.putIfAbsent(id, configurator) == null */ + /** * INTERNAL API */ private[akka] def config(id: String): Config = { - config(id, settings.config.getConfig(id)) + config(id, cachingConfig.getConfig(id)) } /** * INTERNAL API */ private[akka] def config(id: String, appConfig: Config): Config = { - import scala.collection.JavaConverters._ + import akka.util.ccompat.JavaConverters._ def simpleName = id.substring(id.lastIndexOf('.') + 1) idConfig(id) .withFallback(appConfig) @@ -165,13 +174,8 @@ class Dispatchers(val settings: ActorSystem.Settings, } private def idConfig(id: String): Config = { - com.typesafe.config.Config( - org.akkajs.shocon.Config.Object( - Map( - "id" -> org.akkajs.shocon.Config.StringLiteral(id) - ) - ) - ) + import akka.util.ccompat.JavaConverters._ + ConfigFactory.parseMap(Map[String, Any]("id" -> id).asJava) } /** @@ -199,31 +203,32 @@ class Dispatchers(val settings: ActorSystem.Settings, * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator */ private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { - /** IMPLEMENT IN SCALA.JS - if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) + if (!cfg.hasPath("id")) + throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) cfg.getString("type") match { - case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites) - case "BalancingDispatcher" ⇒ + case "Dispatcher" => new DispatcherConfigurator(cfg, prerequisites) + /** IMPLEMENT IN SCALA.JS + case "BalancingDispatcher" => // FIXME remove this case in 2.4 throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " + "During a migration period you can still use BalancingDispatcher by specifying the full class name: " + classOf[BalancingDispatcherConfigurator].getName) - case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) - case fqn ⇒ + case "PinnedDispatcher" => new PinnedDispatcherConfigurator(cfg, prerequisites) + */ + case fqn => val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) - prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ - case exception ⇒ - throw new ConfigurationException( - ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + - "make sure it has constructor with [com.typesafe.config.Config] and " + - "[akka.dispatch.DispatcherPrerequisites] parameters") - .format(fqn, cfg.getString("id")), exception) - }).get + prerequisites.dynamicAccess + .createInstanceFor[MessageDispatcherConfigurator](fqn, args) + .recover { + case exception => + throw new ConfigurationException( + ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + + "make sure it has constructor with [com.typesafe.config.Config] and " + + "[akka.dispatch.DispatcherPrerequisites] parameters") + .format(fqn, cfg.getString("id")), exception) + }.get } - */ - // WE ONLY HAVE ONE DISPATCHER, I'M AFRAID - new DispatcherConfigurator(cfg, prerequisites) } } @@ -237,11 +242,11 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi private val instance = new Dispatcher( this, - /** @note IMPLEMENT IN SCALA.JS config.getString("id"), */ scala.util.Random.nextString(4), - /** @note IMPLEMENT IN SCALA.JS config.getInt("throughput"), */ 10, // TODO: MAKE IT CONFIGURABLE - /** @note IMPLEMENT IN SCALA.JS config.getNanosDuration("throughput-deadline-time"), */ Duration.fromNanos(1000), + config.getString("id"), + config.getInt("throughput"), + ConfigOps(config).getNanosDuration("throughput-deadline-time") /** @note IMPLEMENT IN SCALA.JS config.getNanosDuration("throughput-deadline-time") */, configureExecutor(), - /** @note IMPLEMENT IN SCALA.JS config.getMillisDuration("shutdown-timeout")) */ Duration.create(1, TimeUnit.SECONDS)) + ConfigOps(config).getMillisDuration("shutdown-timeout") /** @note IMPLEMENT IN SCALA.JS config.getMillisDuration("shutdown-timeout") */) /** * Returns the same dispatcher instance for each invocation @@ -252,7 +257,7 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi /** * INTERNAL API */ -/** @note IMPLEMENT IN SCALA.J +/** @note IMPLEMENT IN SCALA.JS private[akka] object BalancingDispatcherConfigurator { private val defaultRequirement = ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics") @@ -314,8 +319,8 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer extends MessageDispatcherConfigurator(config, prerequisites) { private val threadPoolConfig: ThreadPoolConfig = configureExecutor() match { - case e: ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig - case other ⇒ + case e: ThreadPoolExecutorConfigurator => e.threadPoolConfig + case other => prerequisites.eventStream.publish( Warning("PinnedDispatcherConfigurator", this.getClass, diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutorServiceDelegate.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutor.scala similarity index 100% rename from akka-js-actor/js/src/main/scala/akka/dispatch/ExecutorServiceDelegate.scala rename to akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutor.scala diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala index 4fb1561..f729a55 100644 --- a/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala @@ -1,8 +1,11 @@ package akka.dispatch import com.typesafe.config.Config + import java.util.concurrent.ExecutorService +import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation +@EnableReflectiveInstantiation class EventLoopExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { def createExecutorServiceFactory(id: String): ExecutorServiceFactory = diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala new file mode 100644 index 0000000..a53da60 --- /dev/null +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala @@ -0,0 +1,14 @@ +package akka.dispatch + +import java.util +import java.util.concurrent.{AbstractExecutorService, TimeUnit} +import scala.concurrent.ExecutionContext + +private[akka] class ExecutionContextExecutorServiceDelegate(ec: ExecutionContext) extends AbstractExecutorService { + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false + def execute(runnable: Runnable): Unit = ec.execute(runnable) + def isShutdown: Boolean = false + def isTerminated: Boolean = false + def shutdown(): Unit = () + def shutdownNow(): util.List[Runnable] = util.Collections.emptyList() +} diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala new file mode 100644 index 0000000..7b12b2f --- /dev/null +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala @@ -0,0 +1,17 @@ +package akka.dispatch + +import com.typesafe.config.Config +import org.scalajs.macrotaskexecutor.MacrotaskExecutor + +import java.util.concurrent.ExecutorService +import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation + +@EnableReflectiveInstantiation +class MacrotaskExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + + private lazy val factory = new ExecutorServiceFactory { + def createExecutorService: ExecutorService = new ExecutionContextExecutorServiceDelegate(MacrotaskExecutor) + } + + def createExecutorServiceFactory(id: String): ExecutorServiceFactory = factory +} diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala new file mode 100644 index 0000000..7942caa --- /dev/null +++ b/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala @@ -0,0 +1,17 @@ +package akka.dispatch + +import com.typesafe.config.Config + +import java.util.concurrent.ExecutorService +import scala.scalajs.concurrent.JSExecutionContext +import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation + +@EnableReflectiveInstantiation +class QueueExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + + private lazy val factory = new ExecutorServiceFactory { + def createExecutorService: ExecutorService = new ExecutionContextExecutorServiceDelegate(JSExecutionContext.queue) + } + + def createExecutorServiceFactory(id: String): ExecutorServiceFactory = factory +} diff --git a/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala b/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala new file mode 100644 index 0000000..87d5795 --- /dev/null +++ b/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala @@ -0,0 +1,13 @@ +package java.util.concurrent + +import java.util + +abstract class AbstractExecutorService extends ExecutorService { + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = ??? + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = ??? + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = ??? + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = ??? + def submit[T](task: Callable[T]): Future[T] = ??? + def submit(task: Runnable): Future[_] = ??? + def submit[T](task: Runnable, result: T): Future[T] = ??? +} diff --git a/akka-js-stream-testkit/js/src/test/resources/application.conf b/akka-js-stream-testkit/js/src/test/resources/application.conf index 263879c..b5fcae1 100644 --- a/akka-js-stream-testkit/js/src/test/resources/application.conf +++ b/akka-js-stream-testkit/js/src/test/resources/application.conf @@ -96,7 +96,7 @@ akka { type = "Dispatcher" executor = "default-executor" default-executor { - fallback = "fork-join-executor" + fallback = "macrotask-executor" } fork-join-executor { parallelism-min = 8 @@ -124,6 +124,14 @@ akka { mailbox-requirement = "" } + internal-dispatcher { + type = "Dispatcher" + executor = "macrotask-executor" + shutdown-timeout = 1s + throughput = 5 + throughput-deadline-time = 0ms + } + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" @@ -336,8 +344,8 @@ akka { creation-timeout = 20s initial-input-buffer-size = 4 max-input-buffer-size = 16 + dispatcher = "akka.actor.default-dispatcher" blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - dispatcher = "" subscription-timeout { mode = cancel timeout = 5s diff --git a/akka-js-testkit/js/src/main/resources/application.conf b/akka-js-testkit/js/src/main/resources/application.conf index 44c0257..b623079 100644 --- a/akka-js-testkit/js/src/main/resources/application.conf +++ b/akka-js-testkit/js/src/main/resources/application.conf @@ -95,7 +95,7 @@ akka { type = "Dispatcher" executor = "default-executor" default-executor { - fallback = "fork-join-executor" + fallback = "macrotask-executor" } fork-join-executor { parallelism-min = 8 @@ -123,6 +123,14 @@ akka { mailbox-requirement = "" } + internal-dispatcher { + type = "Dispatcher" + executor = "macrotask-executor" + shutdown-timeout = 1s + throughput = 5 + throughput-deadline-time = 0ms + } + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" @@ -330,8 +338,8 @@ akka { creation-timeout = 20s initial-input-buffer-size = 4 max-input-buffer-size = 16 + dispatcher = "akka.actor.default-dispatcher" blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - dispatcher = "" subscription-timeout { mode = cancel timeout = 5s diff --git a/akka-js-testkit/js/src/test/resources/application.conf b/akka-js-testkit/js/src/test/resources/application.conf index 263879c..b5fcae1 100644 --- a/akka-js-testkit/js/src/test/resources/application.conf +++ b/akka-js-testkit/js/src/test/resources/application.conf @@ -96,7 +96,7 @@ akka { type = "Dispatcher" executor = "default-executor" default-executor { - fallback = "fork-join-executor" + fallback = "macrotask-executor" } fork-join-executor { parallelism-min = 8 @@ -124,6 +124,14 @@ akka { mailbox-requirement = "" } + internal-dispatcher { + type = "Dispatcher" + executor = "macrotask-executor" + shutdown-timeout = 1s + throughput = 5 + throughput-deadline-time = 0ms + } + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" @@ -336,8 +344,8 @@ akka { creation-timeout = 20s initial-input-buffer-size = 4 max-input-buffer-size = 16 + dispatcher = "akka.actor.default-dispatcher" blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - dispatcher = "" subscription-timeout { mode = cancel timeout = 5s diff --git a/build.sbt b/build.sbt index 1312dd4..d19179a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,11 @@ -val akkaJsVersion = "2.2.6.14" +val akkaJsVersion = "2.2.6.14-SNAPSHOT" val akkaOriginalVersion = "v2.6.14" +val scalaVersions = Seq("2.12.15", "2.13.8") + val commonSettings = Seq( - scalaVersion := "2.13.2", - crossScalaVersions := Seq("2.12.13", "2.13.5"), + crossScalaVersions := scalaVersions, + scalaVersion := scalaVersions.last, organization := "org.akka-js", scalacOptions ++= Seq( "-deprecation", @@ -227,7 +229,8 @@ lazy val akkaJsActor = crossProject(JSPlatform) scalaJSLinkerConfig ~= (_.withCheckIR(true)), libraryDependencies ++= { Seq( - "org.akka-js" %%% "shocon" % "1.0.0", + "org.akka-js" %%% "shocon" % "1.1.0-SNAPSHOT", + "org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0", "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.1" ) }, @@ -311,7 +314,8 @@ lazy val akkaJsTestkit = crossProject(JSPlatform) .jsSettings(useAnnotationAdderPluginSettings : _*) .jsSettings( libraryDependencies ++= Seq( - "org.scalatest" %%% "scalatest" % "3.1.1" withSources () + "org.scalatest" %%% "scalatest" % "3.1.1" withSources (), + "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0" ), scalaJSStage in Global := FastOptStage, publishArtifact in (Test, packageBin) := true, @@ -449,7 +453,8 @@ lazy val akkaJsStreamTestkit = crossProject(JSPlatform) ).jsSettings( scalaJSStage in Global := FastOptStage, libraryDependencies ++= Seq( - "org.scalatest" %%% "scalatest" % "3.1.1" withSources () + "org.scalatest" %%% "scalatest" % "3.1.1" withSources (), + "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0" ), publishArtifact in (Test, packageBin) := true, //scalaJSOptimizerOptions ~= { _.withDisableOptimizer(true) }, @@ -595,6 +600,7 @@ lazy val akkaJsStreamTestkit = crossProject(JSPlatform) publishArtifact in (Test, packageBin) := true, libraryDependencies ++= Seq( "org.scalatest" %%% "scalatest" % "3.1.1" withSources (), + "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0", "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.1" ), //scalaJSOptimizerOptions ~= { _.withDisableOptimizer(true) }, diff --git a/config/annotation_adder.config b/config/annotation_adder.config index eef1410..5e66d6f 100644 --- a/config/annotation_adder.config +++ b/config/annotation_adder.config @@ -8,5 +8,6 @@ akka.testkit.TestEventListener scala.scalajs.reflect.annotation.EnableReflective akka.testkit.EchoActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation akka.testkit.BlackholeActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation akka.testkit.ForwardActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation +akka.testkit.CallingThreadDispatcherConfigurator scala.scalajs.reflect.annotation.EnableReflectiveInstantiation akka.typed.DefaultLoggingFilter scala.scalajs.reflect.annotation.EnableReflectiveInstantiation akka.typed.DefaultLogger scala.scalajs.reflect.annotation.EnableReflectiveInstantiation diff --git a/project/plugins.sbt b/project/plugins.sbt index 79d6f02..218e5cc 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ -val scalaJsVersion = "1.0.1" +val scalaJsVersion = "1.8.0" -addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.0.0") +addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.1.0") addSbtPlugin("org.scala-js" % "sbt-scalajs" % scalaJsVersion) addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")