ReceiverSupervisor is an abstract handler object responsible for supervising a receiver (which runs on a worker). It expects concrete implementations to provide the methods that push received data to Spark.
Note: Receiver's store methods pass calls to the respective push methods of ReceiverSupervisor.
Note: ReceiverTracker starts one ReceiverSupervisor per receiver.
ReceiverSupervisor can be started and stopped. When a supervisor is started, it calls onStart() (empty by default) and then startReceiver().
It attaches itself to the receiver it supervises (using Receiver.attachSupervisor). That is how a receiver knows about its supervisor (and can hence offer the store and management methods).
ReceiverSupervisor is a private[streaming] abstract class that assumes that concrete implementations offer the following push methods:
- pushBytes
- pushIterator
- pushArrayBuffer
The other required methods are:
- createBlockGenerator
- reportError
- onReceiverStart
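The contract listed above can be pictured with a small, self-contained model. This is a sketch under assumptions, not Spark's source: the parameter lists are reduced to the payload only, createBlockGenerator is left out, and the names SupervisorContract and InMemorySupervisor are hypothetical.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Simplified model of the contract. The parameter lists are an
// assumption of this sketch and carry the payload only.
abstract class SupervisorContract {
  def pushBytes(bytes: ByteBuffer): Unit
  def pushIterator(iterator: Iterator[Any]): Unit
  def pushArrayBuffer(buffer: ArrayBuffer[Any]): Unit
  def reportError(message: String, error: Throwable): Unit
  def onReceiverStart(): Boolean
}

// Hypothetical implementation that only counts what was pushed.
class InMemorySupervisor extends SupervisorContract {
  var records: Int = 0
  var errors: List[String] = Nil

  def pushBytes(bytes: ByteBuffer): Unit = { records += 1 }
  def pushIterator(iterator: Iterator[Any]): Unit = { records += iterator.size }
  def pushArrayBuffer(buffer: ArrayBuffer[Any]): Unit = { records += buffer.size }
  def reportError(message: String, error: Throwable): Unit = { errors = message :: errors }
  def onReceiverStart(): Boolean = true
}
```

A concrete supervisor only has to fill in these methods; the start, stop, and restart logic lives in the abstract class.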
startReceiver() calls the abstract onReceiverStart(). When it returns true (whether it does depends on the implementation, since the method is abstract; see ReceiverSupervisorImpl.onReceiverStart for the default implementation), it prints the following INFO message to the logs:
INFO Starting receiver
The receiver’s onStart() is then called and another INFO message appears in the logs:
INFO Called receiver onStart
If, however, onReceiverStart() returns false, the supervisor stops (using stop).
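The start path described above might be modelled as follows. This is a minimal sketch, assuming a stand-in receiver and an in-memory log; SketchReceiver, StartSketchSupervisor, and the message passed to stop are hypothetical, and only the two INFO lines come from the text.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a receiver; only onStart matters here.
class SketchReceiver {
  var started = false
  def onStart(): Unit = { started = true }
}

// Simplified model of the start path; not Spark's actual class.
abstract class StartSketchSupervisor(receiver: SketchReceiver) {
  val log = ListBuffer.empty[String]
  var stopped = false

  // Abstract, as in the contract: the implementation decides the outcome.
  def onReceiverStart(): Boolean

  def stop(message: String): Unit = { stopped = true }

  def startReceiver(): Unit = {
    if (onReceiverStart()) {
      log += "INFO Starting receiver"
      receiver.onStart()
      log += "INFO Called receiver onStart"
    } else {
      // The message text is an assumption made for this sketch.
      stop("onReceiverStart returned false")
    }
  }
}
```

The receiver's own onStart() runs only after the supervisor's onReceiverStart() has returned true.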
The stop method is called with a message and an optional cause of the stop (called error). It calls the stopReceiver method, which prints an INFO message and checks the state of the receiver to react appropriately.
When the receiver is in the Started state, stopReceiver calls Receiver.onStop(), prints the following INFO message, and then calls onReceiverStop(message, error).
INFO Called receiver onStop
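A rough model of the stop path is sketched below. The text names only the Started state, so the other states, the counters, and the class name StopSketchSupervisor are assumptions of this sketch; the first INFO message printed by stopReceiver is omitted because its wording is not given.

```scala
import scala.collection.mutable.ListBuffer

// States other than Started are an assumption of this sketch.
sealed trait SketchState
case object Initialized extends SketchState
case object Started extends SketchState
case object Stopped extends SketchState

class StopSketchSupervisor(var state: SketchState) {
  val log = ListBuffer.empty[String]
  var receiverOnStopCalls = 0
  var lastStop: Option[(String, Option[Throwable])] = None

  // Stand-ins for Receiver.onStop() and the onReceiverStop hook.
  private def receiverOnStop(): Unit = { receiverOnStopCalls += 1 }
  private def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
    lastStop = Some((message, error))
  }

  def stopReceiver(message: String, error: Option[Throwable]): Unit = state match {
    case Started =>
      state = Stopped
      receiverOnStop()
      log += "INFO Called receiver onStop"
      onReceiverStop(message, error)
    case _ =>
      () // nothing to stop in this sketch
  }

  def stop(message: String, error: Option[Throwable] = None): Unit =
    stopReceiver(message, error)
}
```

Checking the state first means stopping a receiver that never started (or has already stopped) does not call Receiver.onStop() again.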
A ReceiverSupervisor uses spark.streaming.receiverRestartDelay as the delay (in milliseconds) before it restarts the receiver.
Note: Receivers can request to be restarted using the restart methods.
When requested to restart a receiver, the supervisor performs the restart asynchronously on a separate thread. It first prints the following WARNING message to the logs:
WARNING Restarting receiver with delay [delay] ms: [message]
It then stops the receiver, sleeps for delay milliseconds, and starts the receiver (using startReceiver()).
You should see the following messages in the logs:
DEBUG Sleeping for [delay]
INFO Starting receiver again
INFO Receiver started again
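The asynchronous restart can be sketched with a Future. This is an illustration under assumptions rather than Spark's code: RestartSketchSupervisor, its events queue, and the reduced method signature are hypothetical, while the log lines follow the messages quoted above.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

// Simplified model: events are collected in a thread-safe queue
// instead of being written to a logger.
class RestartSketchSupervisor(defaultDelayMs: Long) {
  val events = new ConcurrentLinkedQueue[String]()

  private def record(event: String): Unit = { events.add(event); () }
  private def stopReceiver(message: String): Unit = record("stopped")
  private def startReceiver(): Unit = record("started")

  // Returns immediately; the restart itself runs on another thread.
  def restartReceiver(message: String, delayMs: Long = defaultDelayMs)
                     (implicit ec: ExecutionContext): Future[Unit] = Future {
    record(s"WARNING Restarting receiver with delay $delayMs ms: $message")
    stopReceiver(message)
    record(s"DEBUG Sleeping for $delayMs")
    Thread.sleep(delayMs)
    record("INFO Starting receiver again")
    startReceiver()
    record("INFO Receiver started again")
  }
}
```

Because the work runs in a Future, the thread that requests the restart (typically the receiver's own) is not blocked for the duration of the delay.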
Caution: FIXME What is a backend data store?
The awaitTermination method blocks the current thread until the receiver is stopped.
Note: ReceiverTracker uses awaitTermination to wait for receivers to stop (see StartAllReceivers).
When called, you should see the following INFO message in the logs:
INFO Waiting for receiver to be stopped
If a receiver has terminated successfully, you should see the following INFO message in the logs:
INFO Stopped receiver without error
Otherwise, you should see the following ERROR message in the logs:
ERROR Stopped receiver with error: [stoppingError]
stoppingError is the exception associated with the stopping of the receiver; it is rethrown after the message is logged.
Note: Internally, ReceiverSupervisor uses java.util.concurrent.CountDownLatch with count 1 to await the termination.
stopLatch is decremented when ReceiverSupervisor’s stop is called, which happens in the following cases:
- When a receiver itself calls stop(message: String) or stop(message: String, error: Throwable).
- When ReceiverSupervisor.onReceiverStart() returns false or a NonFatal (less severe) exception is thrown in ReceiverSupervisor.startReceiver.
- When ReceiverTracker.stop is called, which posts a StopAllReceivers message to ReceiverTrackerEndpoint. It in turn sends StopReceiver to the ReceiverSupervisorImpl for every ReceiverSupervisor, which calls ReceiverSupervisorImpl.stop.
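The latch-based waiting described above can be sketched as follows. It is a minimal model, assuming println in place of the logger; LatchSketchSupervisor is a hypothetical name, while CountDownLatch with count 1, stopLatch, stoppingError, and the three messages come from the text.

```scala
import java.util.concurrent.CountDownLatch

// Simplified model of stop/awaitTermination; not Spark's actual class.
class LatchSketchSupervisor {
  private val stopLatch = new CountDownLatch(1)
  @volatile private var stoppingError: Throwable = null

  def stop(message: String, error: Option[Throwable]): Unit = {
    stoppingError = error.orNull
    stopLatch.countDown() // releases every thread blocked in awaitTermination
  }

  def awaitTermination(): Unit = {
    println("INFO Waiting for receiver to be stopped")
    stopLatch.await()
    if (stoppingError != null) {
      println(s"ERROR Stopped receiver with error: $stoppingError")
      throw stoppingError
    } else {
      println("INFO Stopped receiver without error")
    }
  }
}
```

A latch with count 1 fits here because stop may be called from any of the paths listed above, and later calls to countDown() on an already-open latch have no effect.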
Caution: FIXME Prepare exercises
ReceiverSupervisorImpl is the implementation of the ReceiverSupervisor contract.
Note: A dedicated ReceiverSupervisorImpl is started for every receiver when ReceiverTracker starts. See ReceiverTrackerEndpoint.startReceiver.
It communicates with ReceiverTracker that runs on the driver (by posting messages using the ReceiverTracker RPC endpoint).
Tip
|
Enable Add the following line to
|
The push methods, i.e. pushArrayBuffer, pushIterator, and pushBytes, simply pass calls on to ReceiverSupervisorImpl.pushAndReportBlock.
ReceiverSupervisorImpl.onReceiverStart sends a blocking RegisterReceiver message to ReceiverTracker, which responds with a boolean value.
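A blocking registration of this kind could look roughly like the sketch below. It assumes the tracker is reachable through a function returning a Future; SketchRegisterReceiver with its two fields, RegisterSketchSupervisor, and the 10-second timeout are all hypothetical and stand in for the RPC machinery.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical, reduced message; the fields are an assumption.
final case class SketchRegisterReceiver(streamId: Int, host: String)

class RegisterSketchSupervisor(
    streamId: Int,
    host: String,
    trackerAsk: SketchRegisterReceiver => Future[Boolean]) {

  // Blocks the calling thread until the tracker answers.
  // The timeout value is an assumption of this sketch.
  def onReceiverStart(): Boolean =
    Await.result(trackerAsk(SketchRegisterReceiver(streamId, host)), 10.seconds)
}
```

The boolean reply is what the start path uses to decide whether to start the receiver or stop the supervisor.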
getCurrentRateLimit returns the current rate limit. It asks the BlockGenerator for the value (using getCurrentLimit).
ReceiverSupervisorImpl keeps the ReceivedBlockHandler it uses in the internal field receivedBlockHandler.
It defaults to BlockManagerBasedBlockHandler, but uses WriteAheadLogBasedBlockHandler instead when spark.streaming.receiver.writeAheadLog.enable is true.
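The choice between the two handlers can be illustrated with a tiny decision function. This is a sketch under assumptions: a plain Map stands in for the Spark configuration, a missing key is treated as false, and HandlerChoice, its two case objects, and HandlerSelectionSketch are hypothetical names.

```scala
// Hypothetical markers for the two handler kinds named in the text.
sealed trait HandlerChoice
case object BlockManagerBased extends HandlerChoice
case object WriteAheadLogBased extends HandlerChoice

object HandlerSelectionSketch {
  val WalEnableKey = "spark.streaming.receiver.writeAheadLog.enable"

  // conf stands in for the Spark configuration; a missing key
  // is treated as false in this sketch.
  def choose(conf: Map[String, String]): HandlerChoice =
    if (conf.get(WalEnableKey).exists(_.toBoolean)) WriteAheadLogBased
    else BlockManagerBased
}
```

For example, `HandlerSelectionSketch.choose(Map.empty)` yields BlockManagerBased, matching the default described above.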
It uses ReceivedBlockHandler to store blocks through storeBlock (see ReceivedBlockHandler Contract for more coverage and ReceiverSupervisorImpl.pushAndReportBlock in this document).
ReceiverSupervisorImpl.pushAndReportBlock(receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId]) stores receivedBlock using ReceivedBlockHandler.storeBlock and reports it to the driver.
Note: ReceiverSupervisorImpl.pushAndReportBlock is only used by the push methods, i.e. pushArrayBuffer, pushIterator, and pushBytes. Calling the method is actually all they do.
When it calls ReceivedBlockHandler.storeBlock, you should see the following DEBUG message in the logs:
DEBUG Pushed block [blockId] in [time] ms
It then sends AddBlock (with ReceivedBlockInfo for streamId, BlockStoreResult.numRecords, metadataOption, and the result of ReceivedBlockHandler.storeBlock) to the ReceiverTracker RPC endpoint (that runs on the driver).
When a response comes, you should see the following DEBUG message in the logs:
DEBUG Reported block [blockId]
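The store-then-report sequence can be sketched end to end as below. All types are hypothetical, reduced versions of the ones named in the text (block ids are plain strings, the tracker is a function), and the generated block id format is an assumption of this sketch; only the two DEBUG lines follow the messages quoted above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, reduced versions of the types named in the text.
final case class SketchStoreResult(blockId: String, numRecords: Option[Long])
final case class SketchBlockInfo(streamId: Int, numRecords: Option[Long],
                                 metadata: Option[Any], storeResult: SketchStoreResult)
final case class SketchAddBlock(info: SketchBlockInfo)

trait SketchBlockHandler {
  def storeBlock(blockId: String, records: Seq[Any]): SketchStoreResult
}

class PushSketchSupervisor(streamId: Int, handler: SketchBlockHandler,
                           askTracker: SketchAddBlock => Boolean) {
  val log = ListBuffer.empty[String]
  private var nextId = 0

  def pushAndReportBlock(records: Seq[Any], metadata: Option[Any],
                         blockIdOption: Option[String]): Unit = {
    // The generated id format is an assumption of this sketch.
    val blockId = blockIdOption.getOrElse { nextId += 1; s"input-$streamId-$nextId" }
    val start = System.currentTimeMillis()
    val result = handler.storeBlock(blockId, records)
    log += s"DEBUG Pushed block $blockId in ${System.currentTimeMillis() - start} ms"
    // Report to the (stand-in) tracker and wait for its reply.
    askTracker(SketchAddBlock(SketchBlockInfo(streamId, result.numRecords, metadata, result)))
    log += s"DEBUG Reported block $blockId"
  }
}
```

The block is stored before the driver is told about it, so the driver never learns of a block that the handler failed to store.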