diff --git a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala
deleted file mode 100644
index 3a1caf8..0000000
--- a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Copyright (C) 2016-2017 Lightbend Inc.
- */
-package akka.actor.typed
-package internal
-
-import scala.concurrent.ExecutionContextExecutor
-// import java.util.concurrent.{ Executors, ExecutorService }
-import akka.event.LoggingAdapter
-import akka.{ actor ⇒ a, dispatch ⇒ d, event ⇒ e }
-import java.util.concurrent.ConcurrentHashMap
-import akka.ConfigurationException
-import com.typesafe.config.{ Config, ConfigFactory }
-import akka.dispatch.ExecutionContexts
-import java.util.concurrent.ConcurrentSkipListSet
-import java.util.Comparator
-
-class DispatchersImpl(settings: Settings, log: LoggingAdapter, prerequisites: d.DispatcherPrerequisites) extends Dispatchers {
-
- def lookup(selector: DispatcherSelector): ExecutionContextExecutor =
- scala.scalajs.concurrent.JSExecutionContext.queue
-/* selector match {
- case DispatcherDefault(_) ⇒ defaultGlobalDispatcher
- case DispatcherFromConfig(path, _) ⇒ lookup(path)
- case DispatcherFromExecutor(ex: ExecutionContextExecutor, _) ⇒ ex
- case DispatcherFromExecutor(ex, _) ⇒ d.ExecutionContexts.fromExecutor(ex)
- case DispatcherFromExecutionContext(ec: ExecutionContextExecutor, _) ⇒ ec
- case DispatcherFromExecutionContext(ec, _) ⇒ throw new UnsupportedOperationException("I thought all ExecutionContexts are also Executors?") // FIXME
- }
-*/
- def shutdown(): Unit = {
- // val i = allCreatedServices.iterator()
- // while (i.hasNext) i.next().shutdown()
- // allCreatedServices.clear()
- }
-
- import Dispatchers._
-
- // val cachingConfig = new d.CachingConfig(settings.config)
-
- val defaultDispatcherConfig: Config =
- idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
-
- /**
- * The one and only default dispatcher.
- */
- def defaultGlobalDispatcher: ExecutionContextExecutor = lookup(DefaultDispatcherId)
-
- private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
- // private val allCreatedServices = new ConcurrentSkipListSet[ExecutorService](new Comparator[ExecutorService] {
- // override def compare(left: ExecutorService, right: ExecutorService): Int = {
- // val l = if (left == null) 0 else left.hashCode
- // val r = if (right == null) 0 else right.hashCode
- // if (l < r) -1 else if (l > r) 1 else 0
- // }
- // })
-
- /**
- * Returns a dispatcher as specified in configuration. Please note that this
- * method _may_ create and return a NEW dispatcher, _every_ call.
- *
- * Throws ConfigurationException if the specified dispatcher cannot be found in the configuration.
- */
- def lookup(id: String): ExecutionContextExecutor =
- lookupConfigurator(id).dispatcher() match {
- // case es: ExecutorService ⇒
- // allCreatedServices.add(es)
- // es
- case ece ⇒ ece
- }
-
- /**
- * Checks that the configuration provides a section for the given dispatcher.
- * This does not guarantee that no ConfigurationException will be thrown when
- * using this dispatcher, because the details can only be checked by trying
- * to instantiate it, which might be undesirable when just checking.
- */
- def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) // || cachingConfig.hasPath(id)
-
- private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
- dispatcherConfigurators.get(id) match {
- case null ⇒
- // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
- // That shouldn't happen often and in case it does the actual ExecutorService isn't
- // created until used, i.e. cheap.
- val newConfigurator =
- new DispatcherConfigurator(config(id), prerequisites)
- // if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
- // else throw new ConfigurationException(s"Dispatcher [$id] not configured")
-
- dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
- case null ⇒ newConfigurator
- case existing ⇒ existing
- }
-
- case existing ⇒ existing
- }
- }
-
- /**
- * INTERNAL API
- */
- private[akka] def config(id: String): Config = {
- import scala.collection.JavaConverters._
- def simpleName = id.substring(id.lastIndexOf('.') + 1)
- idConfig(id)
- .withFallback(settings.config.getConfig(id))
- .withFallback( // ConfigFactory.parseMap(Map("name" → simpleName).asJava))
- ConfigFactory.parseString(s"{ name=$simpleName }")
- )
- .withFallback(defaultDispatcherConfig)
- }
-
- private def idConfig(id: String): Config = {
- import scala.collection.JavaConverters._
- // ConfigFactory.parseMap(Map("id" → id).asJava)
- ConfigFactory.parseString(s"{ id=$id }")
- }
-
- /**
- * INTERNAL API
- *
- * Creates a MessageDispatcherConfigurator from a Config.
- *
- * The Config must also contain a `id` property, which is the identifier of the dispatcher.
- *
- * Throws: IllegalArgumentException if the value of "type" is not valid
- * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
- */
- private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
- if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
-
- cfg.getString("type") match {
- case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
- // case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
- case fqn ⇒
- val args = List(classOf[Config] → cfg)
- prerequisites.dynamicAccess.createInstanceFor[DispatcherConfigurator](fqn, args).recover({
- case exception ⇒
- throw new ConfigurationException(
- ("Cannot instantiate DispatcherConfigurator type [%s], defined in [%s], " +
- "make sure it has constructor with [com.typesafe.config.Config] and " +
- "[akka.dispatch.DispatcherPrerequisites] parameters")
- .format(fqn, cfg.getString("id")), exception)
- }).get
- }
- }
-}
-
-/**
- * Base class to be used for hooking in new dispatchers into Dispatchers.
- */
-abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: d.DispatcherPrerequisites) {
-
- // val config: Config = new d.CachingConfig(_config)
-
- /**
- * Returns an instance of MessageDispatcher given the configuration.
- * Depending on the needs the implementation may return a new instance for
- * each invocation or return the same instance every time.
- */
- def dispatcher(): ExecutionContextExecutor
-
- def configureExecutor(): d.ExecutorServiceConfigurator = {
- def configurator(executor: String): d.ExecutorServiceConfigurator = executor match {
- case "event-loop-executor" ⇒ new d.EventLoopExecutorConfigurator(new Config, prerequisites)
- }
- configurator("event-loop-executor")
- }
-/*
- def configureExecutor(): d.ExecutorServiceConfigurator = {
- def configurator(executor: String): d.ExecutorServiceConfigurator = executor match {
- case null | "" | "fork-join-executor" ⇒ new d.ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
- case "thread-pool-executor" ⇒ new d.ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
- case fqcn ⇒
- val args = List(
- classOf[Config] → config,
- classOf[d.DispatcherPrerequisites] → prerequisites)
- prerequisites.dynamicAccess.createInstanceFor[d.ExecutorServiceConfigurator](fqcn, args).recover({
- case exception ⇒ throw new IllegalArgumentException(
- ("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " +
- "make sure it has an accessible constructor with a [%s,%s] signature")
- .format(fqcn, config.getString("id"), classOf[Config], classOf[d.DispatcherPrerequisites]), exception)
- }).get
- }
-
- config.getString("executor") match {
- case "default-executor" ⇒ new d.DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
- case other ⇒ configurator(other)
- }
- }
-*/
-}
-
-/**
- * Configurator for creating [[akka.dispatch.Dispatcher]].
- * Returns the same dispatcher instance for for each invocation
- * of the `dispatcher()` method.
- */
-class DispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites)
- extends MessageDispatcherConfigurator(config, prerequisites) {
-
- private val instance = ExecutionContexts.fromExecutorService(
- configureExecutor().createExecutorServiceFactory(config.getString("id")/*, prerequisites.threadFactory*/)
- .createExecutorService)
-
- /**
- * Returns the same dispatcher instance for each invocation
- */
- override def dispatcher(): ExecutionContextExecutor = instance
-}
-/*
-/**
- * Configurator for creating [[akka.dispatch.PinnedDispatcher]].
- * Returns new dispatcher instance for for each invocation
- * of the `dispatcher()` method.
- */
-class PinnedDispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites)
- extends MessageDispatcherConfigurator(config, prerequisites) {
-
- private val threadPoolConfig: d.ThreadPoolConfig = configureExecutor() match {
- case e: d.ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig
- case other ⇒
- prerequisites.eventStream.publish(
- e.Logging.Warning(
- "PinnedDispatcherConfigurator",
- this.getClass,
- "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
- config.getString("id"))))
- d.ThreadPoolConfig()
- }
-
- private val factory = threadPoolConfig.createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory)
-
- /**
- * Creates new dispatcher for each invocation.
- */
- override def dispatcher(): ExecutionContextExecutor = ExecutionContexts.fromExecutorService(factory.createExecutorService)
-
-}
-*/
diff --git a/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala
new file mode 100644
index 0000000..d7690d5
--- /dev/null
+++ b/akka-js-actor-typed/js/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2009-2021 Lightbend Inc.
+ */
+
+package akka.actor.typed.internal
+
+import scala.reflect.ClassTag
+
+import org.slf4j.LoggerFactory
+import org.slf4j.event.Level
+
+import akka.actor.typed
+import akka.actor.typed._
+import akka.actor.typed.LogOptions
+import akka.actor.typed.scaladsl.Behaviors
+import akka.annotation.InternalApi
+import akka.util.LineNumbers
+
+/**
+ * Provides the impl of any behavior that could nest another behavior
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] object InterceptorImpl {
+
+ def apply[O, I](interceptor: () => BehaviorInterceptor[O, I], nestedBehavior: Behavior[I]): Behavior[O] = {
+ BehaviorImpl.DeferredBehavior[O] { ctx =>
+ val interceptorBehavior = new InterceptorImpl[O, I](interceptor(), nestedBehavior)
+ interceptorBehavior.preStart(ctx)
+ }
+ }
+}
+
+/**
+ * Provides the impl of any behavior that could nest another behavior
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class InterceptorImpl[O, I](
+ val interceptor: BehaviorInterceptor[O, I],
+ val nestedBehavior: Behavior[I])
+ extends ExtensibleBehavior[O] {
+
+ import BehaviorInterceptor._
+
+ private val preStartTarget: PreStartTarget[I] = new PreStartTarget[I] {
+ override def start(ctx: TypedActorContext[_]): Behavior[I] = {
+ Behavior.start[I](nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]])
+ }
+ override def toString: String = s"PreStartTarget($nestedBehavior)"
+ }
+
+ private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] {
+ override def apply(ctx: TypedActorContext[_], msg: I): Behavior[I] =
+ Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], msg)
+
+ override def signalRestart(ctx: TypedActorContext[_]): Unit =
+ Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], PreRestart)
+
+ override def toString: String = s"ReceiveTarget($nestedBehavior)"
+ }
+
+ private val signalTarget = new SignalTarget[I] {
+ override def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[I] =
+ Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], signal)
+ override def toString: String = s"SignalTarget($nestedBehavior)"
+ }
+
+ // invoked pre-start to start/de-duplicate the initial behavior stack
+ def preStart(ctx: typed.TypedActorContext[O]): Behavior[O] = {
+ val started = interceptor.aroundStart(ctx, preStartTarget)
+ deduplicate(started, ctx)
+ }
+
+ def replaceNested(newNested: Behavior[I]): Behavior[O] =
+ new InterceptorImpl(interceptor, newNested)
+
+ override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = {
+ // TODO performance optimization: could maybe avoid isAssignableFrom when interceptMessageClass is Class[Object]
+ val interceptMessageClass = interceptor.interceptMessageClass
+ val result =
+ if ((interceptMessageClass ne null) && interceptor.interceptMessageClass.isInstance(msg))
+ interceptor.aroundReceive(ctx, msg, receiveTarget)
+ else
+ receiveTarget.apply(ctx, msg.asInstanceOf[I])
+ deduplicate(result, ctx)
+ }
+
+ override def receiveSignal(ctx: typed.TypedActorContext[O], signal: Signal): Behavior[O] = {
+ val interceptedResult = interceptor.aroundSignal(ctx, signal, signalTarget)
+ deduplicate(interceptedResult, ctx)
+ }
+
+ private def deduplicate(interceptedResult: Behavior[I], ctx: TypedActorContext[O]): Behavior[O] = {
+ val started = Behavior.start(interceptedResult, ctx.asInstanceOf[TypedActorContext[I]])
+ if (started == BehaviorImpl.UnhandledBehavior || started == BehaviorImpl.SameBehavior || !Behavior.isAlive(started)) {
+ started.unsafeCast[O]
+ } else {
+ // returned behavior could be nested in setups, so we need to start before we deduplicate
+ val duplicateInterceptExists = Behavior.existsInStack(started) {
+ case i: InterceptorImpl[O, I]
+ if interceptor.isSame(i.interceptor.asInstanceOf[BehaviorInterceptor[Any, Any]]) =>
+ true
+ case _ => false
+ }
+
+ if (duplicateInterceptExists) started.unsafeCast[O]
+ else new InterceptorImpl[O, I](interceptor, started)
+ }
+ }
+
+ override def toString(): String = s"Interceptor($interceptor, $nestedBehavior)"
+}
+
+/**
+ * Fire off any incoming message to another actor before receiving it ourselves.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final case class MonitorInterceptor[T: ClassTag](actorRef: ActorRef[T])
+ extends BehaviorInterceptor[T, T] {
+ import BehaviorInterceptor._
+
+ override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
+ actorRef ! msg
+ target(ctx, msg)
+ }
+
+ // only once to the same actor in the same behavior stack
+ override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
+ case MonitorInterceptor(`actorRef`) => true
+ case _ => false
+ }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] object LogMessagesInterceptor {
+ def apply[T](opts: LogOptions): BehaviorInterceptor[T, T] = {
+ new LogMessagesInterceptor(opts).asInstanceOf[BehaviorInterceptor[T, T]]
+ }
+
+ private val LogMessageTemplate = "actor [{}] received message: {}"
+ private val LogSignalTemplate = "actor [{}] received signal: {}"
+}
+
+/**
+ * Log all messages for this decorated ReceiveTarget[T] to the logger before receiving them ourselves.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends BehaviorInterceptor[Any, Any] {
+
+ import BehaviorInterceptor._
+ import LogMessagesInterceptor._
+
+ private val logger = opts.getLogger.orElse(LoggerFactory.getLogger(getClass))
+
+ override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = {
+ log(LogMessageTemplate, msg, ctx)
+ target(ctx, msg)
+ }
+
+ override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = {
+ log(LogSignalTemplate, signal, ctx)
+ target(ctx, signal)
+ }
+
+ private def log(template: String, messageOrSignal: Any, context: TypedActorContext[Any]): Unit = {
+ if (opts.enabled) {
+ val selfPath = context.asScala.self.path
+ opts.level match {
+ case Level.ERROR => logger.error(template, selfPath, messageOrSignal)
+ case Level.WARN => logger.warn(template, selfPath, messageOrSignal)
+ case Level.INFO => logger.info(template, selfPath, messageOrSignal)
+ case Level.DEBUG => logger.debug(template, selfPath, messageOrSignal)
+ case Level.TRACE => logger.trace(template, selfPath, messageOrSignal)
+ case other => throw new IllegalArgumentException(s"Unknown log level [$other].")
+ }
+ }
+ }
+
+ // only once in the same behavior stack
+ override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
+ case a: LogMessagesInterceptor => a.opts == opts
+ case _ => false
+ }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] object TransformMessagesInterceptor {
+
+ private final val _notMatchIndicator: Any = new AnyRef
+ private final val _any2NotMatchIndicator = (_: Any) => _notMatchIndicator
+ private final def any2NotMatchIndicator[T] = _any2NotMatchIndicator.asInstanceOf[Any => T]
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final case class TransformMessagesInterceptor[O: ClassTag, I](matcher: PartialFunction[O, I])
+ extends BehaviorInterceptor[O, I] {
+ import BehaviorInterceptor._
+ import TransformMessagesInterceptor._
+
+ override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
+ // If they use the same pf instance we can allow it, as one way to work around defining
+ // "recursive" narrowed behaviors.
+ case TransformMessagesInterceptor(`matcher`) => true
+ case TransformMessagesInterceptor(otherMatcher) =>
+ // there is no safe way to allow this
+ throw new IllegalStateException(
+ "transformMessages can only be used one time in the same behavior stack. " +
+ s"One defined in ${LineNumbers(matcher)}, and another in ${LineNumbers(otherMatcher)}")
+ case _ => false
+ }
+
+ def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] = {
+ matcher.applyOrElse(msg, any2NotMatchIndicator) match {
+ case result if _notMatchIndicator == result => Behaviors.unhandled
+ case transformed => target(ctx, transformed)
+ }
+ }
+
+ override def toString: String = s"TransformMessages(${LineNumbers(matcher)})"
+}
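
The interceptors in this new file are the classes that back the Behaviors.monitor, Behaviors.logMessages and Behaviors.transformMessages factories upstream; they are not meant to be constructed directly. A minimal usage sketch, assuming those scaladsl factories are present unchanged in this port:

// Sketch only; not part of this patch. Assumes the standard Akka Typed
// scaladsl factories exist in akka.js just as upstream.
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object InterceptorSketch {
  sealed trait Command
  final case class Greet(name: String) extends Command

  val greeter: Behavior[Command] =
    Behaviors.receiveMessage { case Greet(name) =>
      println(s"Hello, $name")
      Behaviors.same
    }

  // MonitorInterceptor: each incoming message is also forwarded to the probe.
  def monitored(probe: ActorRef[Command]): Behavior[Command] =
    Behaviors.monitor(probe, greeter)

  // LogMessagesInterceptor: messages and signals are logged before processing.
  val logged: Behavior[Command] = Behaviors.logMessages(greeter)

  // TransformMessagesInterceptor: adapt a wider protocol onto greeter;
  // messages the partial function does not match become unhandled.
  val fromStrings: Behavior[String] =
    Behaviors.transformMessages[String, Command](greeter) {
      case name if name.nonEmpty => Greet(name)
    }
}
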
diff --git a/akka-js-actor/js/src/main/resources/application.conf b/akka-js-actor/js/src/main/resources/application.conf
index 16123ee..04a23e3 100644
--- a/akka-js-actor/js/src/main/resources/application.conf
+++ b/akka-js-actor/js/src/main/resources/application.conf
@@ -61,7 +61,6 @@ akka {
tail-chopping-router {
interval = 10 milliseconds
}
-
routees {
paths = []
}
@@ -75,7 +74,6 @@ akka {
backoff-rate = 0.1
messages-per-resize = 10
}
-
optimal-size-exploring-resizer {
enabled = off
lower-bound = 1
@@ -93,10 +91,24 @@ akka {
}
default-dispatcher {
+ # Must be one of the following:
+ # "Dispatcher", or the FQCN of a class inheriting
+ # MessageDispatcherConfigurator with a public constructor taking
+ # a com.typesafe.config.Config parameter and an
+ # akka.dispatch.DispatcherPrerequisites parameter.
type = "Dispatcher"
+
+ # Which kind of ExecutorService to use for this dispatcher
+ # Valid options:
+ # - "default-executor" requires a "default-executor" section
+ # - "macrotask-executor"
+ # - "event-loop-executor"
+ # - "queue-executor"
+ # - A FQCN of a class extending ExecutorServiceConfigurator
executor = "default-executor"
+
default-executor {
- fallback = "fork-join-executor"
+ fallback = "macrotask-executor"
}
fork-join-executor {
parallelism-min = 8
@@ -124,6 +136,14 @@ akka {
mailbox-requirement = ""
}
+ internal-dispatcher {
+ type = "Dispatcher"
+ executor = "macrotask-executor"
+ shutdown-timeout = 1s
+ throughput = 5
+ throughput-deadline-time = 0ms
+ }
+
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
@@ -330,8 +350,8 @@ akka {
creation-timeout = 20s
initial-input-buffer-size = 4
max-input-buffer-size = 16
+ dispatcher = "akka.actor.default-dispatcher"
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
- dispatcher = ""
subscription-timeout {
mode = cancel
timeout = 5s
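
With these reference.conf additions, a user-defined dispatcher can opt into any of the new executor kinds and is looked up the usual way. A sketch (the dispatcher name my-js-dispatcher is made up for illustration; the values mirror the defaults above):

// Sketch only; dispatcher name and values are illustrative.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object DispatcherConfigSketch {
  val config = ConfigFactory.parseString(
    """
    my-js-dispatcher {
      type = "Dispatcher"
      executor = "queue-executor"   # or "macrotask-executor" / "event-loop-executor"
      throughput = 5
      throughput-deadline-time = 0ms
      shutdown-timeout = 1s
    }
    """)

  val system = ActorSystem("sketch", config)

  // Returns (and caches) the MessageDispatcher configured above; it is an
  // ExecutionContextExecutor, so it can back Futures as well as actors.
  val ec = system.dispatchers.lookup("my-js-dispatcher")
}
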
diff --git a/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala b/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala
index f447ca6..9b7c49b 100644
--- a/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala
+++ b/akka-js-actor/js/src/main/scala/akka/actor/ReflectiveDynamicAccess.scala
@@ -17,7 +17,7 @@ class JSDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
def getRuntimeClass[A](name: String): InstantiatableClass = {
Reflect.lookupInstantiatableClass(name).getOrElse {
- throw new InstantiationError(s"JSDynamicAccess $name is not js instantiable class")
+ throw new ClassNotFoundException(s"Class [$name] does not exist or is not a JS instantiable class")
}
}
@@ -27,10 +27,11 @@ class JSDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
dyn.declaredConstructors.find(_.parameterTypes == constructorClasses).map{ ctor =>
ctor.newInstance(args.map(_._2): _*).asInstanceOf[A]
}.getOrElse{
- throw new InstantiationError("error trying to get instance for " + dyn.runtimeClass.getName + "\n" + dyn.toString)
+ throw new InstantiationError("Error trying to get instance for " + dyn.runtimeClass.getName + "\n" + dyn.toString)
}
} catch {
- case err: Exception => err.printStackTrace()
+ case err: Exception =>
+ err.printStackTrace()
throw err
}
}
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala
index 34b4039..cb302e3 100644
--- a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractBoundedNodeQueue.scala
@@ -6,5 +6,16 @@ package akka.dispatch
import java.util.ArrayDeque
-class AbstractBoundedNodeQueue[T](capacity: Int)
- extends ArrayDeque[T](capacity) {}
+class AbstractBoundedNodeQueue[T](capacity: Int) extends ArrayDeque[T](capacity) {
+ override def offerFirst(e: T): Boolean = {
+ if (size == capacity) false
+ else super.offerFirst(e)
+ }
+
+ override def offerLast(e: T): Boolean = {
+ if (size == capacity) false
+ else super.offerLast(e)
+ }
+
+ override def add(e: T): Boolean = offerLast(e)
+}
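
The overrides above turn the plain ArrayDeque into a size-capped queue: once capacity elements are queued, offers are rejected instead of growing the backing array. For example:

// Behaviour sketch for the bounded queue defined above (capacity 2).
val q = new akka.dispatch.AbstractBoundedNodeQueue[Int](2)
q.add(1)        // true
q.offerLast(2)  // true
q.offerLast(3)  // false: queue is full, element rejected
q.poll()        // returns 1 and frees a slot
q.offerFirst(3) // true again
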
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index 0cf4c61..6c62a27 100644
--- a/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -4,11 +4,16 @@
package akka.dispatch
-import akka.event.Logging.{ Debug, Error, LogEventException }
+import akka.event.Logging.{Debug, Error, LogEventException}
import akka.actor._
+import akka.annotation.InternalStableApi
import akka.dispatch.sysmsg._
-import akka.event.{ BusLogging, EventStream }
- /**import com.typesafe.config.{ ConfigFactory, Config } */
+import akka.event.{BusLogging, EventStream}
+import akka.util.unused
+
+import java.util
+import java.util.concurrent.{AbstractExecutorService, Callable, ExecutorService, Future, TimeUnit}
+/**import com.typesafe.config.{ ConfigFactory, Config } */
import akka.util.{ /** @note IMPLEMENT IN SCALA.JS Unsafe,*/ Index }
import akka.event.EventStream
import com.typesafe.config.Config
@@ -20,51 +25,57 @@ import scala.concurrent.duration.FiniteDuration
import scala.scalajs.js.timers._
import scala.util.control.NonFatal
import scala.util.Try
-import java.{ util ⇒ ju }
+import java.{ util => ju }
import java.util.concurrent.Executor
-final case class Envelope private (val message: Any, val sender: ActorRef)
+final case class Envelope private (message: Any, sender: ActorRef) {
+
+ def copy(message: Any = message, sender: ActorRef = sender) = {
+ Envelope(message, sender)
+ }
+}
object Envelope {
def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope = {
- if (message == null) throw new InvalidMessageException("Message is null")
+ if (message == null) {
+ if (sender eq Actor.noSender)
+ throw InvalidMessageException(s"Message is null.")
+ else
+ throw InvalidMessageException(s"Message sent from [$sender] is null.")
+ }
new Envelope(message, if (sender ne Actor.noSender) sender else system.deadLetters)
}
}
-final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable /** @note IMPLEMENT IN SCALA.JS extends Batchable */ {
- final def isBatchable: Boolean = runnable match {
- case _ ⇒ false
- }
+final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () => Unit) extends Runnable /** @note IMPLEMENT IN SCALA.JS extends Batchable */ {
+ final def isBatchable: Boolean = false /** @note IMPLEMENT IN SCALA.JS akka.dispatch.internal.ScalaBatchable.isBatchable(runnable) */
def run(): Unit =
try runnable.run() catch {
- case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
+ case NonFatal(e) => eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally cleanup()
}
-
-
/**
* INTERNAL API
*/
- private[akka] trait LoadMetrics { self: Executor ⇒
- def atFullThrottle(): Boolean
- }
+private[akka] trait LoadMetrics { self: Executor =>
+ def atFullThrottle(): Boolean
+}
/**
* INTERNAL API
*/
private[akka] object MessageDispatcher {
- val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
+ val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
val SCHEDULED = 1
- val RESCHEDULED = 2
+ val RESCHEDULED = 2
- // dispatcher debugging helper using println (see below)
- // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
+ // dispatcher debugging helper using println (see below)
+ // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
final val debug = false // Deliberately without type ascription to make it a compile-time constant
- lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] {
+ lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] {
override def compare(a: ActorRef, b: ActorRef): Int = a.compareTo(b)
})
def printActors(): Unit =
@@ -75,12 +86,12 @@ private[akka] object MessageDispatcher {
} {
val status = if (a.isTerminated) " (terminated)" else " (alive)"
val messages = a match {
- case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages"
- case _ ⇒ " " + a.getClass
+ case r: ActorRefWithCell => " " + r.underlying.numberOfMessages + " messages"
+ case _ => " " + a.getClass
}
val parent = a match {
- case i: InternalActorRef ⇒ ", parent: " + i.getParent
- case _ ⇒ ""
+ case i: InternalActorRef => ", parent: " + i.getParent
+ case _ => ""
}
println(" -> " + a + status + messages + parent)
}
@@ -89,249 +100,262 @@ private[akka] object MessageDispatcher {
abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends /** @note IMPLEMENT IN SCALA.JS AbstractMessageDispatcher with BatchingExecutor with */ ExecutionContextExecutor {
- import MessageDispatcher._
- /** @note IMPLEMENT IN SCALA.JS
+ import MessageDispatcher._
+ /** @note IMPLEMENT IN SCALA.JS
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
- */
- import configurator.prerequisites
-
- val mailboxes = prerequisites.mailboxes
- val eventStream = prerequisites.eventStream
-
- @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
- @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
+ */
+ import configurator.prerequisites
- /** @note IMPLEMENT IN SCALA.JS @tailrec */ private final def addInhabitants(add: Long): Long = {
- val c = inhabitants
- val r = c + add
+ val mailboxes = prerequisites.mailboxes
+ val eventStream = prerequisites.eventStream
- if (r < 0) {
- // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to
- // go below zero means that there is an imbalance and we might as well throw the exception
- val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!")
- reportFailure(e)
- throw e
- }
+ @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
+ @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
+ private final def addInhabitants(add: Long): Long = {
/** @note IMPLEMENT IN SCALA.JS
- if (Unsafe.instance.compareAndSwapLong(this, inhabitantsOffset, c, r)) r else addInhabitants(add)
- */
- _inhabitantsDoNotCallMeDirectly = r
- r
- }
+ val old = Unsafe.instance.getAndAddLong(this, inhabitantsOffset, add)
+ val ret = old + add
+ */
+ _inhabitantsDoNotCallMeDirectly += add
+ if (_inhabitantsDoNotCallMeDirectly < 0) {
+ // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to
+ // go below zero means that there is an imbalance and we might as well throw the exception
+ val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!")
+ reportFailure(e)
+ throw e
+ }
+ _inhabitantsDoNotCallMeDirectly
+ }
/** @note IMPLEMENT IN SCALA.JS
final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset)
- */
+ */
final def inhabitants: Long = _inhabitantsDoNotCallMeDirectly
/** @note IMPLEMENT IN SCALA.JS
private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset)
private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update)
- */
+ */
private final def shutdownSchedule: Int = _shutdownScheduleDoNotCallMeDirectly
private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = {
_shutdownScheduleDoNotCallMeDirectly = update
true
}
- /**
- * Creates and returns a mailbox for the given actor.
- */
- protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox
-
- /**
- * Identifier of this dispatcher, corresponds to the full key
- * of the dispatcher configuration.
- */
- def id: String
-
- /**
- * Attaches the specified actor instance to this dispatcher, which includes
- * scheduling it to run for the first time (Create() is expected to have
- * been enqueued by the ActorCell upon mailbox creation).
- */
- final def attach(actor: ActorCell): Unit = {
- register(actor)
- registerForExecution(actor.mailbox, false, true)
- }
-
- /**
- * Detaches the specified actor instance from this dispatcher
- */
- final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
+ /**
+ * Creates and returns a mailbox for the given actor.
+ */
+ protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox
+
+ /**
+ * Identifier of this dispatcher, corresponds to the full key
+ * of the dispatcher configuration.
+ */
+ def id: String
+
+ /**
+ * Attaches the specified actor instance to this dispatcher, which includes
+ * scheduling it to run for the first time (Create() is expected to have
+ * been enqueued by the ActorCell upon mailbox creation).
+ */
+ final def attach(actor: ActorCell): Unit = {
+ register(actor)
+ registerForExecution(actor.mailbox, false, true)
+ }
+
+ /**
+ * Detaches the specified actor instance from this dispatcher
+ */
+ final def detach(actor: ActorCell): Unit =
+ try unregister(actor)
+ finally ifSensibleToDoSoThenScheduleShutdown()
+
+ /** @note IMPLEMENT IN SCALA.JS
+ final protected def resubmitOnBlock: Boolean = true // We want to avoid starvation
+ */
/** THIS COMES FROM BatchingExecutor */
override def execute(runnable: Runnable): Unit = unbatchedExecute(runnable)
/** END */
- final /** @note IMPLEMENT IN SCALA.JS override */ protected def unbatchedExecute(r: Runnable): Unit = {
- val invocation = TaskInvocation(eventStream, r, taskCleanup)
- addInhabitants(+1)
- try {
- executeTask(invocation)
- } catch {
- case t: Throwable ⇒
- addInhabitants(-1)
- throw t
- }
- }
-
- override def reportFailure(t: Throwable): Unit = t match {
- /** @note IMPLEMENT IN SCALA.JS case e: LogEventException ⇒ eventStream.publish(e.event) */
- case _ ⇒ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
- }
-
- @tailrec
- private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = {
- if (inhabitants <= 0) shutdownSchedule match {
- case UNSCHEDULED ⇒
- if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
- else ifSensibleToDoSoThenScheduleShutdown()
- case SCHEDULED ⇒
- if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
- else ifSensibleToDoSoThenScheduleShutdown()
- case RESCHEDULED ⇒
- }
- }
-
- private def scheduleShutdownAction(): Unit = {
- // IllegalStateException is thrown if scheduler has been shutdown
- try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
- override def execute(runnable: Runnable): Unit = scalajs.js.timers.setTimeout(0) { runnable.run() } // @note IMPLEMENT IN SCALA.JS runnable.run()
- override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
- }) catch {
- case _: IllegalStateException ⇒ shutdown()
- }
- }
-
- private final val taskCleanup: () ⇒ Unit = () ⇒ if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown()
-
- /**
- * If you override it, you must call it. But only ever once. See "attach" for only invocation.
- *
- * INTERNAL API
- */
- protected[akka] def register(actor: ActorCell) {
- /**@note IMPLEMENT IN SCALA.JS if (debug) actors.put(this, actor.self) */
- addInhabitants(+1)
+ final /** @note IMPLEMENT IN SCALA.JS override */ protected def unbatchedExecute(r: Runnable): Unit = {
+ val invocation = TaskInvocation(eventStream, r, taskCleanup)
+ addInhabitants(+1)
+ try {
+ executeTask(invocation)
+ } catch {
+ case t: Throwable =>
+ addInhabitants(-1)
+ throw t
+ }
+ }
+
+ override def reportFailure(t: Throwable): Unit = t match {
+ /** @note IMPLEMENT IN SCALA.JS case e: LogEventException => eventStream.publish(e.event) */
+ case _ => eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
+ }
+
+ @tailrec
+ private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = {
+ if (inhabitants <= 0) shutdownSchedule match {
+ case UNSCHEDULED =>
+ if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
+ else ifSensibleToDoSoThenScheduleShutdown()
+ case SCHEDULED =>
+ if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
+ else ifSensibleToDoSoThenScheduleShutdown()
+ case RESCHEDULED =>
+ case unexpected =>
+ throw new IllegalArgumentException(s"Unexpected shutdownSchedule value: $unexpected") // will not happen, for exhaustiveness check
+ }
+ }
+
+ private def scheduleShutdownAction(): Unit = {
+ // IllegalStateException is thrown if scheduler has been shutdown
+ try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
+ override def execute(runnable: Runnable): Unit = runnable.run()
+ override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
+ }) catch {
+ case _: IllegalStateException =>
+ shutdown()
+ // Since there is no scheduler anymore, restore the state to UNSCHEDULED.
+ // When this dispatcher is used again,
+ // shutdown is only attempted if the state is UNSCHEDULED
+ // (as per ifSensibleToDoSoThenScheduleShutdown above)
+ updateShutdownSchedule(SCHEDULED, UNSCHEDULED)
+ }
+ }
+
+ private final val taskCleanup: () => Unit = () => if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown()
+
+ /**
+ * If you override it, you must call it. But only ever once. See "attach" for the only invocation.
+ *
+ * INTERNAL API
+ */
+ protected[akka] def register(actor: ActorCell) {
+ /**@note IMPLEMENT IN SCALA.JS if (debug) actors.put(this, actor.self) */
+ addInhabitants(+1)
+ }
+
+ /**
+ * If you override it, you must call it. But only ever once. See "detach" for the only invocation
+ *
+ * INTERNAL API
+ */
+ protected[akka] def unregister(actor: ActorCell) {
+ /**@note IMPLEMENT IN SCALA.JS if (debug) actors.remove(this, actor.self) */
+ addInhabitants(-1)
+ val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox)
+ mailBox.becomeClosed()
+ mailBox.cleanUp()
}
- /**
- * If you override it, you must call it. But only ever once. See "detach" for the only invocation
- *
- * INTERNAL API
- */
- protected[akka] def unregister(actor: ActorCell) {
- /**@note IMPLEMENT IN SCALA.JS if (debug) actors.remove(this, actor.self) */
- addInhabitants(-1)
- val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox)
- mailBox.becomeClosed()
- mailBox.cleanUp()
- }
-
- private val shutdownAction = new Runnable {
- @tailrec
- final def run() {
- shutdownSchedule match {
- case SCHEDULED ⇒
- try {
- if (inhabitants == 0) shutdown() //Warning, racy
- } finally {
- while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {}
- }
- case RESCHEDULED ⇒
- if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
- else run()
- case UNSCHEDULED ⇒
+ private val shutdownAction = new Runnable {
+ @tailrec
+ final def run() {
+ shutdownSchedule match {
+ case SCHEDULED =>
+ try {
+ if (inhabitants == 0) shutdown() //Warning, racy
+ } finally {
+ while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {}
+ }
+ case RESCHEDULED =>
+ if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
+ else run()
+ case UNSCHEDULED =>
+ case unexpected =>
+ throw new IllegalArgumentException(s"Unexpected shutdownSchedule value: $unexpected") // will not happen, for exhaustiveness check
}
- }
- }
-
- /**
- * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
- * defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in
- * reference.conf
- *
- * INTERNAL API
- */
- protected[akka] def shutdownTimeout: FiniteDuration
-
- /**
- * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
- */
- protected[akka] def suspend(actor: ActorCell): Unit = {
- val mbox = actor.mailbox
- if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
- mbox.suspend()
- }
-
- /*
- * After the call to this method, the dispatcher must begin any new message processing for the specified reference
- */
- protected[akka] def resume(actor: ActorCell): Unit = {
- val mbox = actor.mailbox
- if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
- registerForExecution(mbox, false, false)
- }
-
- /**
- * Will be called when the dispatcher is to queue an invocation for execution
- *
- * INTERNAL API
- */
- protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage)
-
- /**
- * Will be called when the dispatcher is to queue an invocation for execution
- *
- * INTERNAL API
- */
- protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope)
-
- /**
- * Suggest to register the provided mailbox for execution
- *
- * INTERNAL API
- */
- protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean
-
- // TODO check whether this should not actually be a property of the mailbox
- /**
- * INTERNAL API
- */
- protected[akka] def throughput: Int
-
- /**
- * INTERNAL API
- */
- protected[akka] def throughputDeadlineTime: Duration
-
- /**
- * INTERNAL API
- */
- @inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0
-
- /**
- * INTERNAL API
- */
+ }
+ }
+
+ /**
+ * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
+ * defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in
+ * reference.conf
+ *
+ * INTERNAL API
+ */
+ protected[akka] def shutdownTimeout: FiniteDuration
+
+ /**
+ * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
+ */
+ protected[akka] def suspend(actor: ActorCell): Unit = {
+ val mbox = actor.mailbox
+ if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
+ mbox.suspend()
+ }
+
+ /*
+ * After the call to this method, the dispatcher must begin any new message processing for the specified reference
+ */
+ protected[akka] def resume(actor: ActorCell): Unit = {
+ val mbox = actor.mailbox
+ if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
+ registerForExecution(mbox, false, false)
+ }
+
+ /**
+ * Will be called when the dispatcher is to queue an invocation for execution
+ *
+ * INTERNAL API
+ */
+ protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage)
+
+ /**
+ * Will be called when the dispatcher is to queue an invocation for execution
+ *
+ * INTERNAL API
+ */
+ protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope)
+
+ /**
+ * Suggest to register the provided mailbox for execution
+ *
+ * INTERNAL API
+ */
+ protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean
+
+ // TODO check whether this should not actually be a property of the mailbox
+ /**
+ * INTERNAL API
+ */
+ protected[akka] def throughput: Int
+
+ /**
+ * INTERNAL API
+ */
+ protected[akka] def throughputDeadlineTime: Duration
+
+ /**
+ * INTERNAL API
+ */
+ @inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0
+
+ /**
+ * INTERNAL API
+ */
protected[akka] def executeTask(invocation: TaskInvocation)
- /**
- * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
- * Must be idempotent
- *
- * INTERNAL API
- */
- protected[akka] def shutdown(): Unit
+ /**
+ * Called every time an actor is detached from this dispatcher and the dispatcher has no actors left attached.
+ * Must be idempotent
+ *
+ * INTERNAL API
+ */
+ @InternalStableApi
+ protected[akka] def shutdown(): Unit
}
-
/**
* An ExecutorServiceConfigurator is a class that given some prerequisites and a configuration can create instances of ExecutorService
*/
-abstract class ExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceFactoryProvider
-
+abstract class ExecutorServiceConfigurator(@unused config: Config, @unused prerequisites: DispatcherPrerequisites)
+ extends ExecutorServiceFactoryProvider
/**
* Base class to be used for hooking in new dispatchers into Dispatchers.
@@ -352,8 +376,56 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
def configureExecutor(): ExecutorServiceConfigurator = {
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
- case "event-loop-executor" ⇒ new EventLoopExecutorConfigurator(new Config, prerequisites)
+ case null | "" | "macrotask-executor" => new MacrotaskExecutorConfigurator(new Config, prerequisites)
+ case "event-loop-executor" => new EventLoopExecutorConfigurator(new Config, prerequisites)
+ case "queue-executor" => new QueueExecutorConfigurator(new Config, prerequisites)
+ case fqcn =>
+ val args = List(classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites)
+ prerequisites.dynamicAccess
+ .createInstanceFor[ExecutorServiceConfigurator](fqcn, args)
+ .recover {
+ case exception =>
+ throw new IllegalArgumentException(
+ ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
+ make sure it has an accessible constructor with a [%s,%s] signature""")
+ .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]),
+ exception)
+ }
+ .get
+ }
+
+ config.getString("executor") match {
+ case "default-executor" =>
+ new DefaultExecutorServiceConfigurator(
+ config.getConfig("default-executor"),
+ prerequisites,
+ configurator(config.getString("default-executor.fallback")))
+ case other => configurator(other)
}
- configurator("event-loop-executor")
}
}
+
+class DefaultExecutorServiceConfigurator(
+ config: Config,
+ prerequisites: DispatcherPrerequisites,
+ fallback: ExecutorServiceConfigurator)
+ extends ExecutorServiceConfigurator(config, prerequisites) {
+ val provider: ExecutorServiceFactoryProvider =
+ prerequisites.defaultExecutionContext match {
+ case Some(ec) =>
+ prerequisites.eventStream.publish(
+ Debug(
+ "DefaultExecutorServiceConfigurator",
+ this.getClass,
+ s"Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor."))
+
+ new ExecutionContextExecutorServiceDelegate(ec) with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
+ def createExecutorServiceFactory(id: String): ExecutorServiceFactory = this
+ def createExecutorService: ExecutorService = this
+ }
+ case None => fallback
+ }
+
+ def createExecutorServiceFactory(id: String): ExecutorServiceFactory =
+ provider.createExecutorServiceFactory(id)
+}
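
DefaultExecutorServiceConfigurator only takes effect when an ExecutionContext is handed to the ActorSystem at creation; otherwise the configured fallback (now macrotask-executor, per the application.conf change above) is used. A sketch, assuming the classic ActorSystem.apply overload with a defaultExecutionContext parameter is available unchanged in this port:

// Sketch only; assumes ActorSystem(name, config, classLoader, defaultExecutionContext)
// exists here as in upstream classic Akka.
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext

object DefaultExecutorSketch {
  val myEc: ExecutionContext = scala.scalajs.concurrent.JSExecutionContext.queue

  // default-executor now wraps myEc via ExecutionContextExecutorServiceDelegate
  // instead of falling back to "macrotask-executor".
  val system = ActorSystem("sketch", None, None, Some(myEc))
}
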
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala
index 9436493..a80c1ed 100644
--- a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -68,8 +68,8 @@ class Dispatcher(
/**
* INTERNAL API
*/
- protected[akka] def executeTask(invocation: TaskInvocation) {
- executorService execute invocation
+ protected[akka] def executeTask(invocation: TaskInvocation): Unit = {
+ executorService.execute(invocation)
}
/**
@@ -100,7 +100,7 @@ class Dispatcher(
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
- executorService execute mbox
+ executorService.execute(mbox)
mbox.setAsIdle()
true
} else false
@@ -114,7 +114,7 @@ object PriorityGenerator {
/**
* Creates a PriorityGenerator that uses the supplied function as priority generator
*/
- def apply(priorityFunction: Any ⇒ Int): PriorityGenerator = new PriorityGenerator {
+ def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator {
def gen(message: Any): Int = priorityFunction(message)
}
}
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala
index 74b475c..72d18bf 100644
--- a/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/Dispatchers.scala
@@ -5,20 +5,20 @@
package akka.dispatch
import java.util.concurrent.TimeUnit
-
-import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory }
-import com.typesafe.config.{ ConfigFactory, Config }
-import akka.actor.{ Scheduler, DynamicAccess, ActorSystem }
-import com.typesafe.config.Config
-import akka.actor.{ Scheduler, ActorSystem }
+import java.util.concurrent.{ConcurrentHashMap, ThreadFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueType}
+import akka.actor.{ActorSystem, DynamicAccess, Scheduler}
+import akka.actor.{ActorSystem, Scheduler}
import akka.event.Logging.Warning
-import akka.event.{ EventStream, LoggingAdapter }
+import akka.event.{EventStream, LoggingAdapter}
+
import scala.concurrent.duration.Duration
import akka.ConfigurationException
import akka.actor.Deploy
-/** @note IMPLEMENT IN SCALA.JS
+import akka.annotation.{DoNotInherit, InternalApi}
+import org.akkajs.shocon
import akka.util.Helpers.ConfigOps
-*/
+
import scala.concurrent.ExecutionContext
/**
@@ -37,14 +37,16 @@ trait DispatcherPrerequisites {
/**
* INTERNAL API
*/
+@InternalApi
private[akka] final case class DefaultDispatcherPrerequisites(
- val threadFactory: ThreadFactory,
- val eventStream: EventStream,
- val scheduler: Scheduler,
- val dynamicAccess: DynamicAccess,
- val settings: ActorSystem.Settings,
- val mailboxes: Mailboxes,
- val defaultExecutionContext: Option[ExecutionContext]) extends DispatcherPrerequisites
+ threadFactory: ThreadFactory,
+ eventStream: EventStream,
+ scheduler: Scheduler,
+ dynamicAccess: DynamicAccess,
+ settings: ActorSystem.Settings,
+ mailboxes: Mailboxes,
+ defaultExecutionContext: Option[ExecutionContext])
+ extends DispatcherPrerequisites
object Dispatchers {
/**
@@ -56,8 +58,8 @@ object Dispatchers {
/**
* INTERNAL API
*/
+ @InternalApi
private[akka] final val InternalDispatcherId = "akka.actor.internal-dispatcher"
-
}
/**
@@ -67,8 +69,12 @@ object Dispatchers {
*
* Look in `akka.actor.default-dispatcher` section of the reference.conf
* for documentation of dispatcher options.
+ *
+ * Not for user instantiation or extension
*/
-class Dispatchers(val settings: ActorSystem.Settings,
+@DoNotInherit
+class Dispatchers @InternalApi private[akka] (
+ val settings: ActorSystem.Settings,
val prerequisites: DispatcherPrerequisites,
logger: LoggingAdapter) {
@@ -77,8 +83,11 @@ class Dispatchers(val settings: ActorSystem.Settings,
/** @note IMPLEMENT IN SCALA.JS
val cachingConfig = new CachingConfig(settings.config)
*/
+
+ val cachingConfig: Config = settings.config
+
val defaultDispatcherConfig: Config =
- idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
+ idConfig(DefaultDispatcherId).withFallback(cachingConfig.getConfig(DefaultDispatcherId))
/**
* The one and only default dispatcher.
@@ -99,9 +108,7 @@ class Dispatchers(val settings: ActorSystem.Settings,
* @throws ConfigurationException if the specified dispatcher cannot be found in the configuration
*/
- def lookup(id: String): MessageDispatcher =
- //new DispatcherConfigurator(new Config, prerequisites).dispatcher()
- lookupConfigurator(id).dispatcher()
+ def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()
/**
* Checks that the configuration provides a section for the given dispatcher.
@@ -113,19 +120,20 @@ class Dispatchers(val settings: ActorSystem.Settings,
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
- case null ⇒
+ case null =>
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap.
- val newConfigurator = new DispatcherConfigurator(config(id), prerequisites)
- //if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
- //else throw new ConfigurationException(s"Dispatcher [$id] not configured")
+
+ val newConfigurator: MessageDispatcherConfigurator =
+ if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
+ else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
- case null ⇒ newConfigurator
- case existing ⇒ existing
+ case null => newConfigurator
+ case existing => existing
}
- case existing ⇒ existing
+ case existing => existing
}
}
@@ -145,18 +153,19 @@ class Dispatchers(val settings: ActorSystem.Settings,
def registerConfigurator(id: String, configurator: MessageDispatcherConfigurator): Boolean =
dispatcherConfigurators.putIfAbsent(id, configurator) == null
*/
+
/**
* INTERNAL API
*/
private[akka] def config(id: String): Config = {
- config(id, settings.config.getConfig(id))
+ config(id, cachingConfig.getConfig(id))
}
/**
* INTERNAL API
*/
private[akka] def config(id: String, appConfig: Config): Config = {
- import scala.collection.JavaConverters._
+ import akka.util.ccompat.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)
idConfig(id)
.withFallback(appConfig)
@@ -165,13 +174,8 @@ class Dispatchers(val settings: ActorSystem.Settings,
}
private def idConfig(id: String): Config = {
- com.typesafe.config.Config(
- org.akkajs.shocon.Config.Object(
- Map(
- "id" -> org.akkajs.shocon.Config.StringLiteral(id)
- )
- )
- )
+ import akka.util.ccompat.JavaConverters._
+ ConfigFactory.parseMap(Map[String, Any]("id" -> id).asJava)
}
/**
@@ -199,31 +203,32 @@ class Dispatchers(val settings: ActorSystem.Settings,
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*/
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
- /** IMPLEMENT IN SCALA.JS
- if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
+ if (!cfg.hasPath("id"))
+ throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
- case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
- case "BalancingDispatcher" ⇒
+ case "Dispatcher" => new DispatcherConfigurator(cfg, prerequisites)
+ /** IMPLEMENT IN SCALA.JS
+ case "BalancingDispatcher" =>
// FIXME remove this case in 2.4
throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
classOf[BalancingDispatcherConfigurator].getName)
- case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
- case fqn ⇒
+ case "PinnedDispatcher" => new PinnedDispatcherConfigurator(cfg, prerequisites)
+ */
+ case fqn =>
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
- prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
- case exception ⇒
- throw new ConfigurationException(
- ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
- "make sure it has constructor with [com.typesafe.config.Config] and " +
- "[akka.dispatch.DispatcherPrerequisites] parameters")
- .format(fqn, cfg.getString("id")), exception)
- }).get
+ prerequisites.dynamicAccess
+ .createInstanceFor[MessageDispatcherConfigurator](fqn, args)
+ .recover {
+ case exception =>
+ throw new ConfigurationException(
+ ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
+ "make sure it has constructor with [com.typesafe.config.Config] and " +
+ "make sure it has a constructor with [com.typesafe.config.Config] and " +
+ .format(fqn, cfg.getString("id")), exception)
+ }.get
}
- */
- // WE ONLY HAVE ONE DISPATCHER, I'M AFRAID
- new DispatcherConfigurator(cfg, prerequisites)
}
}
@@ -237,11 +242,11 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
private val instance = new Dispatcher(
this,
- /** @note IMPLEMENT IN SCALA.JS config.getString("id"), */ scala.util.Random.nextString(4),
- /** @note IMPLEMENT IN SCALA.JS config.getInt("throughput"), */ 10, // TODO: MAKE IT CONFIGURABLE
- /** @note IMPLEMENT IN SCALA.JS config.getNanosDuration("throughput-deadline-time"), */ Duration.fromNanos(1000),
+ config.getString("id"),
+ config.getInt("throughput"),
+ ConfigOps(config).getNanosDuration("throughput-deadline-time") /** @note IMPLEMENT IN SCALA.JS config.getNanosDuration("throughput-deadline-time") */,
configureExecutor(),
- /** @note IMPLEMENT IN SCALA.JS config.getMillisDuration("shutdown-timeout")) */ Duration.create(1, TimeUnit.SECONDS))
+ ConfigOps(config).getMillisDuration("shutdown-timeout") /** @note IMPLEMENT IN SCALA.JS config.getMillisDuration("shutdown-timeout") */)
/**
* Returns the same dispatcher instance for each invocation
@@ -252,7 +257,7 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
/**
* INTERNAL API
*/
-/** @note IMPLEMENT IN SCALA.J
+/** @note IMPLEMENT IN SCALA.JS
private[akka] object BalancingDispatcherConfigurator {
private val defaultRequirement =
ConfigFactory.parseString("mailbox-requirement = akka.dispatch.MultipleConsumerSemantics")
@@ -314,8 +319,8 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
extends MessageDispatcherConfigurator(config, prerequisites) {
private val threadPoolConfig: ThreadPoolConfig = configureExecutor() match {
- case e: ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig
- case other ⇒
+ case e: ThreadPoolExecutorConfigurator => e.threadPoolConfig
+ case other =>
prerequisites.eventStream.publish(
Warning("PinnedDispatcherConfigurator",
this.getClass,
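With the upstream branches above restored (and the BalancingDispatcher/PinnedDispatcher cases parked behind the IMPLEMENT IN SCALA.JS comment), lookupConfigurator again falls through to dynamicAccess.createInstanceFor for any other type value, so a MessageDispatcherConfigurator can be named by fully qualified class name; the annotation_adder.config change further down enables exactly that for akka.testkit.CallingThreadDispatcherConfigurator. A hedged sketch of a dispatcher using the FQCN form (dispatcher name invented for illustration, not part of the patch):

    import com.typesafe.config.{Config, ConfigFactory}

    object FqcnDispatcherSketch {
      // `type` carries the configurator's fully qualified class name, as in upstream Akka.
      val config: Config = ConfigFactory.parseString(
        """
          |calling-thread-dispatcher {
          |  type = "akka.testkit.CallingThreadDispatcherConfigurator"
          |}
        """.stripMargin)
    }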
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutorServiceDelegate.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutor.scala
similarity index 100%
rename from akka-js-actor/js/src/main/scala/akka/dispatch/ExecutorServiceDelegate.scala
rename to akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutor.scala
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala
index 4fb1561..f729a55 100644
--- a/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/EventLoopExecutorConfigurator.scala
@@ -1,8 +1,11 @@
package akka.dispatch
import com.typesafe.config.Config
+
import java.util.concurrent.ExecutorService
+import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
+@EnableReflectiveInstantiation
class EventLoopExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
def createExecutorServiceFactory(id: String): ExecutorServiceFactory =
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala
new file mode 100644
index 0000000..a53da60
--- /dev/null
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/ExecutionContextExecutorServiceDelegate.scala
@@ -0,0 +1,14 @@
+package akka.dispatch
+
+import java.util
+import java.util.concurrent.{AbstractExecutorService, TimeUnit}
+import scala.concurrent.ExecutionContext
+
+private[akka] class ExecutionContextExecutorServiceDelegate(ec: ExecutionContext) extends AbstractExecutorService {
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false // nothing to await: the JS event loop never terminates
+ def execute(runnable: Runnable): Unit = ec.execute(runnable) // the only operation that does real work
+ def isShutdown: Boolean = false // lifecycle flags are hard-coded: the context cannot be shut down
+ def isTerminated: Boolean = false
+ def shutdown(): Unit = () // no-op by design
+ def shutdownNow(): util.List[Runnable] = util.Collections.emptyList() // nothing queued to hand back
+}
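The delegate above adapts a plain Scala ExecutionContext to the java.util.concurrent.ExecutorService interface: only execute() does real work, while the lifecycle methods are deliberate no-ops because the JS event loop can neither be shut down nor awaited. An illustrative sketch, not part of the diff (the object name is made up, and it sits in package akka.dispatch only because the class is private[akka]):

    package akka.dispatch

    import scala.scalajs.concurrent.JSExecutionContext

    object ExecutionContextDelegateSketch {
      def demo(): Unit = {
        val svc = new ExecutionContextExecutorServiceDelegate(JSExecutionContext.queue)
        svc.execute(() => println("runs on the JS event loop")) // forwarded to the wrapped ExecutionContext
        svc.shutdown()                                          // no-op by design
        assert(!svc.isShutdown)                                 // lifecycle flags stay false
      }
    }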
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala
new file mode 100644
index 0000000..7b12b2f
--- /dev/null
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/MacrotaskExecutorConfigurator.scala
@@ -0,0 +1,17 @@
+package akka.dispatch
+
+import com.typesafe.config.Config
+import org.scalajs.macrotaskexecutor.MacrotaskExecutor
+
+import java.util.concurrent.ExecutorService
+import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
+
+@EnableReflectiveInstantiation
+class MacrotaskExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
+
+ private lazy val factory = new ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = new ExecutionContextExecutorServiceDelegate(MacrotaskExecutor)
+ }
+
+ def createExecutorServiceFactory(id: String): ExecutorServiceFactory = factory
+}
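MacrotaskExecutorConfigurator carries @EnableReflectiveInstantiation so the dispatcher machinery can create it from a class name found in configuration. In upstream Akka the executor setting of a dispatcher may hold the fully qualified class name of an ExecutorServiceConfigurator; the shorter "macrotask-executor" alias used in the application.conf changes further down is assumed to be registered elsewhere in akka.js and is not visible in this diff. A hedged sketch of a dispatcher block that selects this configurator explicitly (dispatcher name invented, not part of the patch):

    import com.typesafe.config.{Config, ConfigFactory}

    object MacrotaskDispatcherConfigSketch {
      // The executor FQCN is the class defined above; everything else is illustrative.
      val config: Config = ConfigFactory.parseString(
        """
          |my-macrotask-dispatcher {
          |  type = "Dispatcher"
          |  executor = "akka.dispatch.MacrotaskExecutorConfigurator"
          |  throughput = 5
          |}
        """.stripMargin)
    }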
diff --git a/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala b/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala
new file mode 100644
index 0000000..7942caa
--- /dev/null
+++ b/akka-js-actor/js/src/main/scala/akka/dispatch/QueueExecutorConfigurator.scala
@@ -0,0 +1,17 @@
+package akka.dispatch
+
+import com.typesafe.config.Config
+
+import java.util.concurrent.ExecutorService
+import scala.scalajs.concurrent.JSExecutionContext
+import scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
+
+@EnableReflectiveInstantiation
+class QueueExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
+
+ private lazy val factory = new ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = new ExecutionContextExecutorServiceDelegate(JSExecutionContext.queue)
+ }
+
+ def createExecutorServiceFactory(id: String): ExecutorServiceFactory = factory
+}
diff --git a/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala b/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala
new file mode 100644
index 0000000..87d5795
--- /dev/null
+++ b/akka-js-actor/js/src/main/scala/java/util/concurrent/AbstractExecutorService.scala
@@ -0,0 +1,13 @@
+package java.util.concurrent
+
+import java.util
+
+abstract class AbstractExecutorService extends ExecutorService { // minimal Scala.js stand-in for the JDK class
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = ??? // ??? throws scala.NotImplementedError if ever called
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = ???
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = ???
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = ???
+ def submit[T](task: Callable[T]): Future[T] = ???
+ def submit(task: Runnable): Future[_] = ???
+ def submit[T](task: Runnable, result: T): Future[T] = ???
+}
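This java.util.concurrent.AbstractExecutorService stand-in exists only so ExecutionContextExecutorServiceDelegate has something to extend on Scala.js; every ??? body throws scala.NotImplementedError if it is ever reached, so only execute()-style usage is safe. A small sketch, not from the diff, that makes that explicit (helper name is hypothetical):

    import java.util.concurrent.ExecutorService
    import scala.util.Try

    object StubBehaviourSketch {
      // True when submit() hits one of the ??? stubs above.
      def submitIsUnsupported(es: ExecutorService): Boolean =
        Try(es.submit(new Runnable { def run(): Unit = () }))
          .failed.toOption.exists(_.isInstanceOf[NotImplementedError])
    }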
diff --git a/akka-js-stream-testkit/js/src/test/resources/application.conf b/akka-js-stream-testkit/js/src/test/resources/application.conf
index 263879c..b5fcae1 100644
--- a/akka-js-stream-testkit/js/src/test/resources/application.conf
+++ b/akka-js-stream-testkit/js/src/test/resources/application.conf
@@ -96,7 +96,7 @@ akka {
type = "Dispatcher"
executor = "default-executor"
default-executor {
- fallback = "fork-join-executor"
+ fallback = "macrotask-executor"
}
fork-join-executor {
parallelism-min = 8
@@ -124,6 +124,14 @@ akka {
mailbox-requirement = ""
}
+ internal-dispatcher {
+ type = "Dispatcher"
+ executor = "macrotask-executor"
+ shutdown-timeout = 1s
+ throughput = 5
+ throughput-deadline-time = 0ms
+ }
+
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
@@ -336,8 +344,8 @@ akka {
creation-timeout = 20s
initial-input-buffer-size = 4
max-input-buffer-size = 16
+ dispatcher = "akka.actor.default-dispatcher"
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
- dispatcher = ""
subscription-timeout {
mode = cancel
timeout = 5s
diff --git a/akka-js-testkit/js/src/main/resources/application.conf b/akka-js-testkit/js/src/main/resources/application.conf
index 44c0257..b623079 100644
--- a/akka-js-testkit/js/src/main/resources/application.conf
+++ b/akka-js-testkit/js/src/main/resources/application.conf
@@ -95,7 +95,7 @@ akka {
type = "Dispatcher"
executor = "default-executor"
default-executor {
- fallback = "fork-join-executor"
+ fallback = "macrotask-executor"
}
fork-join-executor {
parallelism-min = 8
@@ -123,6 +123,14 @@ akka {
mailbox-requirement = ""
}
+ internal-dispatcher {
+ type = "Dispatcher"
+ executor = "macrotask-executor"
+ shutdown-timeout = 1s
+ throughput = 5
+ throughput-deadline-time = 0ms
+ }
+
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
@@ -330,8 +338,8 @@ akka {
creation-timeout = 20s
initial-input-buffer-size = 4
max-input-buffer-size = 16
+ dispatcher = "akka.actor.default-dispatcher"
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
- dispatcher = ""
subscription-timeout {
mode = cancel
timeout = 5s
diff --git a/akka-js-testkit/js/src/test/resources/application.conf b/akka-js-testkit/js/src/test/resources/application.conf
index 263879c..b5fcae1 100644
--- a/akka-js-testkit/js/src/test/resources/application.conf
+++ b/akka-js-testkit/js/src/test/resources/application.conf
@@ -96,7 +96,7 @@ akka {
type = "Dispatcher"
executor = "default-executor"
default-executor {
- fallback = "fork-join-executor"
+ fallback = "macrotask-executor"
}
fork-join-executor {
parallelism-min = 8
@@ -124,6 +124,14 @@ akka {
mailbox-requirement = ""
}
+ internal-dispatcher {
+ type = "Dispatcher"
+ executor = "macrotask-executor"
+ shutdown-timeout = 1s
+ throughput = 5
+ throughput-deadline-time = 0ms
+ }
+
default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
@@ -336,8 +344,8 @@ akka {
creation-timeout = 20s
initial-input-buffer-size = 4
max-input-buffer-size = 16
+ dispatcher = "akka.actor.default-dispatcher"
blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher"
- dispatcher = ""
subscription-timeout {
mode = cancel
timeout = 5s
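Each of the three application.conf files gets the same treatment: the default executor now falls back to the macrotask executor, an internal-dispatcher block is added (mirroring Akka 2.6's internal dispatcher, here backed by the macrotask executor), and the stream materializer's dispatcher is pointed at the default dispatcher instead of an empty string. A hedged lookup sketch, assuming the new block sits under akka.actor as in upstream Akka 2.6 and that the classic Dispatchers.lookup API is available (names invented):

    import akka.actor.ActorSystem

    object DispatcherLookupSketch {
      val system = ActorSystem("sketch")
      // Paths assume the standard akka.actor prefix used by upstream Akka 2.6.
      val internalDispatcher = system.dispatchers.lookup("akka.actor.internal-dispatcher")
      val defaultDispatcher  = system.dispatchers.lookup("akka.actor.default-dispatcher")
    }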
diff --git a/build.sbt b/build.sbt
index 1312dd4..d19179a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,9 +1,11 @@
-val akkaJsVersion = "2.2.6.14"
+val akkaJsVersion = "2.2.6.14-SNAPSHOT"
val akkaOriginalVersion = "v2.6.14"
+val scalaVersions = Seq("2.12.15", "2.13.8")
+
val commonSettings = Seq(
- scalaVersion := "2.13.2",
- crossScalaVersions := Seq("2.12.13", "2.13.5"),
+ crossScalaVersions := scalaVersions,
+ scalaVersion := scalaVersions.last,
organization := "org.akka-js",
scalacOptions ++= Seq(
"-deprecation",
@@ -227,7 +229,8 @@ lazy val akkaJsActor = crossProject(JSPlatform)
scalaJSLinkerConfig ~= (_.withCheckIR(true)),
libraryDependencies ++= {
Seq(
- "org.akka-js" %%% "shocon" % "1.0.0",
+ "org.akka-js" %%% "shocon" % "1.1.0-SNAPSHOT",
+ "org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "0.9.1"
)
},
@@ -311,7 +314,8 @@ lazy val akkaJsTestkit = crossProject(JSPlatform)
.jsSettings(useAnnotationAdderPluginSettings : _*)
.jsSettings(
libraryDependencies ++= Seq(
- "org.scalatest" %%% "scalatest" % "3.1.1" withSources ()
+ "org.scalatest" %%% "scalatest" % "3.1.1" withSources (),
+ "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0"
),
scalaJSStage in Global := FastOptStage,
publishArtifact in (Test, packageBin) := true,
@@ -449,7 +453,8 @@ lazy val akkaJsStreamTestkit = crossProject(JSPlatform)
).jsSettings(
scalaJSStage in Global := FastOptStage,
libraryDependencies ++= Seq(
- "org.scalatest" %%% "scalatest" % "3.1.1" withSources ()
+ "org.scalatest" %%% "scalatest" % "3.1.1" withSources (),
+ "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0"
),
publishArtifact in (Test, packageBin) := true,
//scalaJSOptimizerOptions ~= { _.withDisableOptimizer(true) },
@@ -595,6 +600,7 @@ lazy val akkaJsStreamTestkit = crossProject(JSPlatform)
publishArtifact in (Test, packageBin) := true,
libraryDependencies ++= Seq(
"org.scalatest" %%% "scalatest" % "3.1.1" withSources (),
+ "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "0.9.1"
),
//scalaJSOptimizerOptions ~= { _.withDisableOptimizer(true) },
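Besides the version bumps (Scala 2.12.15/2.13.8, shocon 1.1.0-SNAPSHOT), build.sbt adds two Scala.js libraries: the macrotask executor backing the new default-executor fallback and scalajs-fake-weakreferences for the testkits. A minimal sketch of the same coordinates for a downstream sbt build (assumed, not taken from this diff):

    // Coordinates copied from the dependency additions above.
    libraryDependencies ++= Seq(
      "org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0",
      "org.scala-js" %%% "scalajs-fake-weakreferences" % "1.0.0"
    )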
diff --git a/config/annotation_adder.config b/config/annotation_adder.config
index eef1410..5e66d6f 100644
--- a/config/annotation_adder.config
+++ b/config/annotation_adder.config
@@ -8,5 +8,6 @@ akka.testkit.TestEventListener scala.scalajs.reflect.annotation.EnableReflective
akka.testkit.EchoActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
akka.testkit.BlackholeActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
akka.testkit.ForwardActor scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
+akka.testkit.CallingThreadDispatcherConfigurator scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
akka.typed.DefaultLoggingFilter scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
akka.typed.DefaultLogger scala.scalajs.reflect.annotation.EnableReflectiveInstantiation
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 79d6f02..218e5cc 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,6 +1,6 @@
-val scalaJsVersion = "1.0.1"
+val scalaJsVersion = "1.8.0"
-addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.0.0")
+addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.1.0")
addSbtPlugin("org.scala-js" % "sbt-scalajs" % scalaJsVersion)
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")