You use stream operators to apply transformations to the elements received (often called records) from input streams and ultimately trigger computations using output operators.
Transformations are stateless, but Spark Streaming comes with experimental support for stateful operators (e.g. mapWithState or updateStateByKey). It also offers windowed operators that can work across batches.
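As a hedged sketch of the stateful side (the clicks stream, the _checkpoints directory and the updateState function are illustrative names, not part of any Spark API), mapWithState can keep a running sum per key across batches:

import org.apache.spark.streaming.{ Seconds, State, StateSpec, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
ssc.checkpoint("_checkpoints")  // stateful operators require a checkpoint directory

val clicks = new ConstantInputDStream(ssc, sc.parallelize(Seq(("page1", 1), ("page2", 1), ("page1", 1))))

// sums the values per key and keeps the running total in State[Int]
val updateState = (key: String, value: Option[Int], state: State[Int]) => {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}
clicks.mapWithState(StateSpec.function(updateState)).print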
Note: You may use RDDs from other (non-streaming) data sources to build more advanced pipelines.
There are two main types of operators:
- transformations that transform elements in input data RDDs
- output operators that register input streams as output streams so the execution can start.
Every Discretized Stream (DStream) offers the following operators:
- (output operator) print to print 10 elements only, or the more general version print(num: Int) to print up to num elements. See print operation in this document.
- (output operator) foreachRDD
- (output operator) saveAsObjectFiles
- (output operator) saveAsTextFiles
- flatMap
- filter
- repartition
- mapPartitions
- count
- countByValue
- countByWindow
- countByValueAndWindow
- union
Note: The DStream companion object offers a Scala implicit to convert DStream[(K, V)] to PairDStreamFunctions with methods on DStreams of key-value pairs, e.g. mapWithState or updateStateByKey.
Most streaming operators come with their own custom DStream to offer the service. It very often boils down to overriding the compute method and applying the corresponding RDD operator to the RDD generated for a batch, as sketched below.
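A rough sketch of that pattern, paraphrased from what MappedDStream does (not a verbatim copy of the Spark sources; getOrCompute and ssc are private[streaming], so a class like this only compiles inside Spark's own streaming package):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Duration, Time }
import org.apache.spark.streaming.dstream.DStream

// compute fetches the parent's RDD generated for the batch time (if any)
// and applies the corresponding RDD operator -- here RDD.map.
class MyMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    mapFunc: T => U) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] =
    parent.getOrCompute(validTime).map(_.map(mapFunc))
}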
print(num: Int) operator prints the first num elements of each RDD in the input stream. print uses print(num: Int) with num being 10.

It is an output operator (that returns Unit).

For each batch, the print operator prints the following header to the standard output (regardless of the number of elements to be printed out):
-------------------------------------------
Time: [time] ms
-------------------------------------------
Internally, it calls RDD.take(num + 1) (see take action) on each RDD in the stream to print num elements. It then prints … if there are more elements in the RDD (that would otherwise exceed the num elements requested to print).
It creates a ForEachDStream stream and registers it as an output stream.
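A minimal sketch of print in spark-shell (the clicks stream is illustrative; sc is the SparkContext, as in the other examples in this section):

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val clicks = new ConstantInputDStream(ssc, sc.parallelize(0 to 99))

// prints the Time header and the first 10 elements (followed by ...) for every batch
clicks.print

// prints up to 3 elements per batch
clicks.print(3)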
foreachRDD(foreachFunc: RDD[T] => Unit): Unit
foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
foreachRDD operator applies the foreachFunc function to every RDD in the stream.
It creates a ForEachDStream stream and registers it as an output stream.
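A hedged sketch of foreachRDD using the two-argument variant (the clicks stream is illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val clicks = new ConstantInputDStream(ssc, sc.parallelize(0 to 9))

// the (RDD[T], Time) variant also gives access to the batch time
clicks.foreachRDD { (rdd: RDD[Int], time: Time) =>
  println(s">>> batch $time has ${rdd.count} element(s)")
}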
glom(): DStream[Array[T]]
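A hedged sketch of glom, which applies RDD.glom per batch so that each partition's elements are collected into a single array (the nums stream is illustrative):

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val nums = new ConstantInputDStream(ssc, sc.parallelize(0 to 9, numSlices = 2))

// DStream[Array[Int]] -- one array per partition per batch
nums.glom().map(_.mkString("[", ",", "]")).print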
reduce(reduceFunc: (T, T) => T): DStream[T]
reduce operator creates a new stream of RDDs of a single element that is the result of applying reduceFunc to the data received.
Internally, it uses map and reduceByKey operators.
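A minimal sketch of reduce (the nums stream is illustrative):

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val nums = new ConstantInputDStream(ssc, sc.parallelize(0 to 9))

// every batch yields a single-element RDD holding the sum, i.e. 45
nums.reduce(_ + _).print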
map[U](mapFunc: T => U): DStream[U]
map operator creates a new stream with the source elements being mapped over using the mapFunc function.

It creates a MappedDStream stream that, when requested to compute an RDD, uses the RDD.map operator.
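A minimal sketch of map (the nums stream is illustrative):

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val nums = new ConstantInputDStream(ssc, sc.parallelize(0 to 9))

// doubles every element of every batch
nums.map(_ * 2).print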
reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]
reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
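A hedged sketch of reduceByKey, available thanks to the implicit conversion to PairDStreamFunctions mentioned in the Note earlier (the pairs stream is illustrative):

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val pairs = new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))))

// per batch: (a,2) and (b,1)
pairs.reduceByKey(_ + _).print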
transform(transformFunc: RDD[T] => RDD[U]): DStream[U]
transform(transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
transform operator applies the transformFunc function to the RDD generated for a batch.
It creates a TransformedDStream stream.
Note: It asserts that exactly one RDD has been generated for a batch before calling transformFunc.
Note: It is not allowed to return null from transformFunc, or a SparkException is reported. See TransformedDStream.
import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val rdd = sc.parallelize(0 to 9)
import org.apache.spark.streaming.dstream.ConstantInputDStream
val clicks = new ConstantInputDStream(ssc, rdd)
import org.apache.spark.rdd.RDD
val transformFunc: RDD[Int] => RDD[Int] = { inputRDD =>
println(s">>> inputRDD: $inputRDD")
// Use SparkSQL's DataFrame to manipulate the input records
import spark.implicits._
inputRDD.toDF("num").show
inputRDD
}
clicks.transform(transformFunc).print
transformWith(other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
transformWith(other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
transformWith operator applies the transformFunc function to the two RDDs generated for a batch (one from this stream and one from the other stream).
It creates a TransformedDStream stream.
Note: It asserts that exactly two RDDs have been generated for a batch before calling transformFunc.
Note: It is not allowed to return null from transformFunc, or a SparkException is reported. See TransformedDStream.
import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
val ns = sc.parallelize(0 to 2)
import org.apache.spark.streaming.dstream.ConstantInputDStream
val nums = new ConstantInputDStream(ssc, ns)
val ws = sc.parallelize(Seq("zero", "one", "two"))
import org.apache.spark.streaming.dstream.ConstantInputDStream
val words = new ConstantInputDStream(ssc, ws)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
val transformFunc: (RDD[Int], RDD[String], Time) => RDD[(Int, String)] = { case (ns, ws, time) =>
println(s">>> ns: $ns")
println(s">>> ws: $ws")
println(s">>> batch: $time")
ns.zip(ws)
}
nums.transformWith(words, transformFunc).print