Caution: FIXME
RPC Environment (aka RpcEnv) is an environment for RpcEndpoints to process messages. An RPC Environment manages the entire lifecycle of RpcEndpoints:
- registers (sets up) endpoints (by name or URI)
- routes incoming messages to them
- stops them
An RPC Environment is defined by a name, host, and port. It can also be controlled by a security manager.
The only implementation of RPC Environment is the Netty-based implementation. Read the section RpcEnvFactory.
RpcEndpoints define how to handle messages (what functions to execute given a message). RpcEndpoints register (by a name or URI) with RpcEnv to receive messages from RpcEndpointRefs.
RpcEndpointRefs can be looked up by name or URI (because different RpcEnvs may have different naming schemes).
The org.apache.spark.rpc package contains the machinery for RPC communication in Spark.
Spark comes with RpcEnvFactory (a private[spark] trait), which is the factory contract to create an RPC Environment.
An RpcEnvFactory implementation has a single method, create(config: RpcEnvConfig): RpcEnv, that returns an RpcEnv for a given RpcEnvConfig.
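The contract itself is tiny. Below is a minimal sketch of the trait as described above, together with a skeleton of a custom factory (the name MyRpcEnvFactory is hypothetical, not part of Spark):

```scala
package org.apache.spark.rpc

// The factory contract: a single create method that turns an
// RpcEnvConfig into a running RpcEnv.
private[spark] trait RpcEnvFactory {
  def create(config: RpcEnvConfig): RpcEnv
}

// Hypothetical custom factory. The no-argument constructor matters:
// RpcEnv.create instantiates the configured factory via reflection.
private[spark] class MyRpcEnvFactory extends RpcEnvFactory {
  override def create(config: RpcEnvConfig): RpcEnv =
    ??? // build and return a custom RpcEnv here
}
```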
There are two RpcEnvFactory implementations in Spark:
- netty using org.apache.spark.rpc.netty.NettyRpcEnvFactory. This is the default factory for RpcEnv as of Spark 1.6.0-SNAPSHOT.
- akka using org.apache.spark.rpc.akka.AkkaRpcEnvFactory
You can choose an RPC implementation via the spark.rpc setting (default: netty). The value can be one of the two short names of the known RpcEnvFactories - netty or akka - or the fully-qualified class name of an RpcEnvFactory, including the Netty-based and Akka-based implementations above as well as your own custom factory.
$ ./bin/spark-shell --conf spark.rpc=netty
$ ./bin/spark-shell --conf spark.rpc=org.apache.spark.rpc.akka.AkkaRpcEnvFactory
RpcEndpoint defines how to handle messages (what functions to execute given a message). RpcEndpoints live inside RpcEnv after being registered by a name.
An RpcEndpoint can be registered to one and only one RpcEnv.
The lifecycle of an RpcEndpoint is onStart, receive, and onStop, in that sequence.
receive can be called concurrently.
Tip: If you want receive to be thread-safe, use ThreadSafeRpcEndpoint.
The onError method is called for any exception thrown.
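To make the lifecycle concrete, here is a minimal sketch of an endpoint (the class name EchoEndpoint and the message handling are hypothetical; note that RpcEndpoint is a private[spark] trait, so such code lives inside Spark's own packages):

```scala
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint that logs its lifecycle and echoes one-way messages.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Called before the endpoint starts accepting messages.
  override def onStart(): Unit = println("EchoEndpoint started")

  // Handles one-way messages sent with RpcEndpointRef.send.
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"Received: $msg")
  }

  // Called when the endpoint is about to be stopped.
  override def onStop(): Unit = println("EchoEndpoint stopped")
}
```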
ThreadSafeRpcEndpoint is a marker RpcEndpoint that does nothing by itself but tells…
Caution: FIXME What is marker?
Note: ThreadSafeRpcEndpoint is a private[spark] trait.
An RpcEndpointRef is a reference to an RpcEndpoint in an RpcEnv.
It is a serializable entity, so you can send it over the network or save it for later use (it can, however, only be deserialized by the owning RpcEnv).
An RpcEndpointRef has an address (a Spark URL) and a name.
You can send asynchronous one-way messages to the corresponding RpcEndpoint using the send method.
You can send a semi-synchronous message, i.e. "subscribe" to be notified when a response arrives, using the ask method. You can also block the current calling thread for a response using the askWithRetry method (see the sketch below), which uses the following settings:
- spark.rpc.numRetries (default: 3) - the number of times to retry connection attempts.
- spark.rpc.retry.wait (default: 3s) - the time to wait on each retry.
It also uses lookup timeouts.
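As a rough illustration of the three messaging styles (the helper function and the messages are hypothetical; send, ask, and askWithRetry are the actual RpcEndpointRef methods):

```scala
import scala.concurrent.Future
import org.apache.spark.rpc.RpcEndpointRef

// endpointRef is assumed to have been obtained from an RpcEnv,
// e.g. as the result of RpcEnv.setupEndpoint or an endpoint lookup.
def demo(endpointRef: RpcEndpointRef): Unit = {
  // Fire-and-forget: no response is expected.
  endpointRef.send("ping")

  // Semi-synchronous: returns a Future that completes when the reply arrives.
  val reply: Future[String] = endpointRef.ask[String]("ping")

  // Blocking: waits for the response, retrying per spark.rpc.numRetries
  // and spark.rpc.retry.wait.
  val answer: String = endpointRef.askWithRetry[String]("ping")
}
```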
RpcAddress is the logical address for an RPC Environment, with hostname and port.
RpcAddress is encoded as a Spark URL, i.e. spark://host:port.
RpcEndpointAddress is the logical address for an endpoint registered to an RPC Environment, with RpcAddress and name.
It is in the format of spark://[name]@[rpcAddress.host]:[rpcAddress.port].
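For example (hypothetical host, port, and endpoint name, just to show the shape of the two URL forms):

```scala
// RpcAddress of an RPC Environment, e.g. a driver's RpcEnv
val rpcEnvUrl = "spark://192.168.1.1:46787"

// RpcEndpointAddress of an endpoint named "CoarseGrainedScheduler"
// registered in that RPC Environment
val endpointUrl = "spark://CoarseGrainedScheduler@192.168.1.1:46787"
```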
When a remote endpoint is resolved, a local RPC environment connects to the remote one. It is called endpoint lookup. To configure the time needed for the endpoint lookup you can use the following settings.
It is a prioritized list of lookup timeout properties (the higher on the list, the more important):
- spark.rpc.lookupTimeout
- spark.network.timeout
Their value can be a number alone (seconds) or any number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
Ask operation is when an RPC client expects a response to a message. It is a blocking operation.
You can control the time to wait for a response using the following settings (in that order):
- spark.rpc.askTimeout
- spark.network.timeout
Their value can be a number alone (seconds) or any number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
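Both timeouts can be set like any other Spark property, e.g. programmatically on SparkConf or via --conf on the command line (the values below are only examples):

```scala
import org.apache.spark.SparkConf

// Hypothetical values; both settings accept a plain number (seconds)
// or a number with a time suffix such as 100ms or 50s.
val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "60s")     // timeout for ask operations
  .set("spark.rpc.lookupTimeout", "30s")  // timeout for endpoint lookup
```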
When RpcEnv catches uncaught exceptions, it uses RpcCallContext.sendFailure to send the exceptions back to the sender, or logs them if there is no sender or the exception is a NotSerializableException.
If any error is thrown from one of the RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.
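A minimal sketch of how an endpoint typically uses RpcCallContext when answering ask-style messages (the endpoint name and message shape are hypothetical; reply and sendFailure are the actual RpcCallContext methods):

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical request-reply endpoint that divides two integers.
class DivisionEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Handles messages sent with ask/askWithRetry (a reply is expected).
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case (a: Int, b: Int) =>
      try {
        context.reply(a / b)       // send the result back to the sender
      } catch {
        case e: ArithmeticException =>
          context.sendFailure(e)   // propagate the failure to the sender
      }
  }
}
```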
When an RPC Environment is initialized as part of the initialization of the driver or executors (using RpcEnv.create), clientMode is false for the driver and true for executors.
RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver)
Refer to Client Mode in Netty-based RpcEnv for the implementation-specific details.
RpcEnvConfig is a placeholder for an instance of SparkConf, the name of the RPC Environment, host and port, a security manager, and clientMode.
You can create an RPC Environment using the helper method RpcEnv.create.
It assumes that the RpcEnvFactory available under the spark.rpc setting has a no-argument constructor so that it can be instantiated via reflection.
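A rough sketch of creating an RPC Environment and registering an endpoint in it (the name, host, port, and the EchoEndpoint from the earlier sketch are hypothetical; note that SecurityManager and RpcEnv are private[spark], so this code would live inside Spark's own packages):

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf()
val securityManager = new SecurityManager(conf)

// Creates an RPC Environment named "demo" listening on the given host and port.
val rpcEnv = RpcEnv.create("demo", "localhost", 9000, conf, securityManager)

// Registers a (hypothetical) endpoint under the name "echo" and returns its reference.
val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))

echoRef.send("hello")
```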
- spark.rpc (default: netty since Spark 1.6.0-SNAPSHOT) - the RPC implementation to use. See RpcEnvFactory.
- spark.rpc.lookupTimeout (default: 120s) - the default timeout to use for RPC remote endpoint lookup. Refer to Endpoint Lookup Timeout.
- spark.network.timeout (default: 120s) - the default network timeout to use for RPC remote endpoint lookup. It is used as a fallback value for spark.rpc.askTimeout.
- spark.rpc.numRetries (default: 3) - the number of attempts to send a message and receive a response from a remote endpoint.
- spark.rpc.retry.wait (default: 3s) - the time to wait on each retry.
- spark.rpc.askTimeout (default: 120s) - the default timeout to use for RPC ask operations. Refer to Ask Operation Timeout.
The Worker class calls startRpcEnvAndEndpoint with the following configuration options:
- host
- port
- webUiPort
- cores
- memory
- masters
- workDir
It starts sparkWorker[N] where N is the identifier of a worker.