Spark Structured Streaming (aka Spark Streams) is an Apache Spark module for stream processing that offers a high-level declarative streaming API built on top of Spark SQL.
The semantics of the Structured Streaming model are as follows (see the article Structured Streaming In Apache Spark):
At any time, the output of a continuous application is equivalent to executing a batch job on a prefix of the data.
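The prefix guarantee can be illustrated without Spark at all. The following is a minimal sketch in plain Scala, assuming a word-count style computation; the object and method names (PrefixSemantics, batchCounts, runningCounts) are made up for this illustration and are not part of any Spark API.

```scala
// Plain Scala, no Spark required. All names are hypothetical.
object PrefixSemantics {
  // The "batch job": count occurrences over a finite collection
  def batchCounts(data: Seq[String]): Map[String, Int] =
    data.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // The "continuous application": fold records into a running state one at a time.
  // Element n of the result is the state after the first n records.
  def runningCounts(stream: Seq[String]): Seq[Map[String, Int]] =
    stream.scanLeft(Map.empty[String, Int]) { (state, record) =>
      state.updated(record, state.getOrElse(record, 0) + 1)
    }

  def main(args: Array[String]): Unit = {
    val stream = Vector("a", "b", "a", "c", "b", "a")
    val running = runningCounts(stream)
    // After n records, the incremental result equals the batch result on the first n records
    (0 to stream.size).foreach { n =>
      assert(running(n) == batchCounts(stream.take(n)))
    }
    println(running.last)
  }
}
```

The running state never has to be recomputed from scratch, yet at every point it matches what a batch job over the data seen so far would produce.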
Note
As of Spark 2.2.0, Structured Streaming has been marked stable and ready for production use. With that, the older streaming module, Spark Streaming, is considered obsolete and should not be used for developing new streaming applications with Apache Spark.
Structured Streaming attempts to unify streaming, interactive, and batch queries over structured datasets for developing end-to-end stream processing applications (dubbed continuous applications). It uses Spark SQL's Datasets API with additional support for the following features:
- Micro-Batch and Continuous Stream Processing Engines
In Structured Streaming, Spark developers describe custom streaming computations in the same way as with Spark SQL. Internally, Structured Streaming applies the user-defined structured query to the continuously and indefinitely arriving data to analyze real-time streaming data.
With Structured Streaming, Spark 2 aims at simplifying streaming analytics with little to no need to reason about effective data streaming (trying to hide the unnecessary complexity in your streaming analytics architectures).
Structured Streaming introduces the concept of streaming datasets, which are infinite datasets with primitives like input streaming data sources and output streaming data sinks.
Tip
A Dataset reports whether it is streaming through its isStreaming method.
val batchQuery = spark.
  read. // <-- batch non-streaming query
  csv("sales")
assert(batchQuery.isStreaming == false)
val streamingQuery = spark.
  readStream. // <-- streaming query
  format("rate").
  load
assert(streamingQuery.isStreaming)
Read up on Spark SQL, Datasets and logical plans in The Internals of Spark SQL book.
Structured Streaming models a stream of data as an infinite (and hence continuous) table that may change with every streaming batch.
You can specify the output mode of a streaming dataset, which determines what part of the infinite result table gets written to a streaming sink when new data is available.
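As a minimal sketch of choosing an output mode, assuming Spark SQL is on the classpath (e.g. in spark-shell): the rate source, the parity column and the parityCounts query name are illustrative choices only, not anything prescribed by the text above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode

// In spark-shell, getOrCreate returns the session that already exists
val session = SparkSession.builder.master("local[*]").getOrCreate()

// The rate source generates rows with timestamp and value columns
val rates = session.readStream.format("rate").load()

// A streaming aggregation: count rows by the parity of value
val counts = rates.groupBy((col("value") % 2).as("parity")).count()

// Complete: the entire result table is written on every trigger
// Update:   only the rows that changed since the last trigger are written
// Append:   only new rows are written (rejected for an aggregation without a watermark)
val query = counts.writeStream
  .format("console")
  .outputMode(OutputMode.Update())
  .queryName("parityCounts") // hypothetical name
  .start()

query.awaitTermination(10000) // let the query run for about 10 seconds
query.stop()
```

Switching OutputMode.Update() to OutputMode.Complete() should make every batch print both parity rows, whether or not their counts changed.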
Streaming Datasets use streaming query plans (as opposed to regular batch Datasets that are based on batch query plans).
Note
From this perspective, batch queries can be considered streaming Datasets executed only once (which is why some streaming sources, e.g. KafkaSource, can easily work in batch mode).
val batchQuery = spark.read.format("rate").load
assert(batchQuery.isStreaming == false)
val streamingQuery = spark.readStream.format("rate").load
assert(streamingQuery.isStreaming)
// The following example executes a streaming query over CSV files
// CSV format requires a schema before you can start the query
// You could build your schema manually
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, false) ::
  StructField("name", StringType, true) ::
  StructField("city", StringType, true) :: Nil)
// ...or using the Schema DSL
val schema = new StructType().
  add($"id".long.copy(nullable = false)).
  add($"name".string).
  add($"city".string)
// ...but is error-prone and time-consuming, isn't it?
// Use the business object that describes the dataset
case class Person(id: Long, name: String, city: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema
val people = spark.
  readStream.
  schema(schema).
  csv("in/*.csv").
  as[Person]
// people has this additional capability of being streaming
assert(people.isStreaming)
// ...but it is still a Dataset.
// (Almost) any Dataset operation is available
val population = people.
  groupBy('city).
  agg(count('city) as "population")
// Start the streaming query
// Write the result using console format, i.e. print to the console
// A streaming aggregation without a watermark cannot use Append output mode,
// so use Complete (or Update)
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val populationStream = population.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(30.seconds)).
  outputMode(OutputMode.Complete).
  queryName("Population").
  start
assert(populationStream.isActive)
scala> populationStream.explain(extended = true)
== Parsed Logical Plan ==
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Relation[id#110L,name#111,city#112] csv
== Analyzed Logical Plan ==
city: string, population: bigint
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Relation[id#110L,name#111,city#112] csv
== Optimized Logical Plan ==
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Project [city#112]
   +- Relation[id#110L,name#111,city#112] csv
== Physical Plan ==
*HashAggregate(keys=[city#112], functions=[count(city#112)], output=[city#112, population#19L])
+- Exchange hashpartitioning(city#112, 200)
   +- *HashAggregate(keys=[city#112], functions=[partial_count(city#112)], output=[city#112, count#118L])
      +- *FileScan csv [city#112] Batched: false, Format: CSV, InputPaths: file:/Users/jacek/dev/oss/spark/in/1.csv, file:/Users/jacek/dev/oss/spark/in/2.csv, file:/Users/j..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<city:string>
// Let's query for all active streams
scala> spark.streams.active.foreach(println)
Streaming Query - Population [state = ACTIVE]
// You may eventually want to stop the streaming query
populationStream.stop
assert(populationStream.isActive == false)
Structured Streaming is defined by the following data abstractions in the org.apache.spark.sql.streaming package:
Structured Streaming follows the micro-batch model and periodically fetches data from the data source (and uses the DataFrame data abstraction to represent the data fetched for a given batch).
With Datasets as Spark SQL’s view of structured data, Structured Streaming checks input sources for new data every trigger (time) and executes the (continuous) queries.
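How often the input sources are checked is controlled by the trigger of a streaming query. The following is a minimal sketch, assuming Spark SQL is on the classpath (e.g. in spark-shell); the rate source and the query names periodic and once are illustrative only. Note that newer Spark releases deprecate Trigger.Once in favour of other triggers, so treat that part as version-dependent.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// In spark-shell, getOrCreate returns the session that already exists
val session = SparkSession.builder.master("local[*]").getOrCreate()

// The rate source is used only because it needs no external setup
val rates = session.readStream.format("rate").load()

// Check the source for new data every 10 seconds
val periodic = rates.writeStream
  .format("console")
  .queryName("periodic") // hypothetical name
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Run a single micro-batch over whatever is available and then stop
val once = rates.writeStream
  .format("console")
  .queryName("once") // hypothetical name
  .trigger(Trigger.Once())
  .start()

once.awaitTermination(30000) // wait up to 30 seconds for the one-time query
periodic.stop()
```

When no trigger is specified, a micro-batch query starts the next batch as soon as the previous one has finished.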
Tip
Structured Streaming was introduced in SPARK-8360 Structured Streaming (aka Streaming DataFrames).
Tip
Read the official programming guide of Spark about Structured Streaming.
Note
The feature has also been called Streaming Spark SQL Query, Streaming DataFrames, Continuous DataFrame or Continuous Query. There have been lots of names before the Spark project settled on Structured Streaming.
Note
The example is "borrowed" from the official documentation of Spark. Changes and errors are only mine.
Tip
Run nc -lk 9999 before running the example.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load
  .as[String]
val words = lines.flatMap(_.split("\\W+"))
scala> words.printSchema
root
 |-- value: string (nullable = true)
val counter = words.groupBy("value").count
// nc -lk 9999 is supposed to be up at this point
import org.apache.spark.sql.streaming.OutputMode.Complete
val query = counter.writeStream
  .outputMode(Complete)
  .format("console")
  .start
query.stop
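One detail of the tokenization above is worth a closer look: split("\\W+") yields an empty token for an empty line or a line that starts with a non-word character, and that token would be counted as a word. The following plain-Scala sketch (no Spark required) shows the behaviour; the SplitWords object and its words helper are hypothetical names used only for this illustration.

```scala
// Plain Scala, no Spark required. SplitWords and words are hypothetical names.
object SplitWords {
  // Same tokenization as in the example, with empty tokens dropped
  def words(line: String): Seq[String] =
    line.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    println("hello, world".split("\\W+").toList) // List(hello, world)
    println(", hello".split("\\W+").toList)      // List(, hello)
    println(words(", hello"))
  }
}
```

In the streaming query, the same effect could be had by appending a filter, i.e. lines.flatMap(_.split("\\W+")).filter(_.nonEmpty).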
Below you can find a complete example of a streaming query that reads a DataFrame from csv-logs files (in csv format, with a given schema) and writes it to a console sink every 5 seconds.
Tip
Copy and paste it to Spark Shell in :paste mode to run it.
// Explicit schema with nullables false
import org.apache.spark.sql.types._
val schemaExp = StructType(
  StructField("name", StringType, false) ::
  StructField("city", StringType, true) ::
  StructField("country", StringType, true) ::
  StructField("age", IntegerType, true) ::
  StructField("alive", BooleanType, false) :: Nil
)
// Implicit inferred schema
val schemaImp = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("csv-logs")
  .schema
val in = spark.readStream
  .schema(schemaImp)
  .format("csv")
  .option("header", true)
  .option("maxFilesPerTrigger", 1)
  .load("csv-logs")
scala> in.printSchema
root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- alive: boolean (nullable = true)
println("Is the query streaming? " + in.isStreaming)
println("Are there any streaming queries? " + spark.streams.active.nonEmpty)
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val out = in.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime("5 seconds")).
  queryName("consoleStream").
  outputMode(OutputMode.Append).
  start
16/07/13 12:32:11 TRACE FileStreamSource: Listed 3 file(s) in 4.274022 ms
16/07/13 12:32:11 TRACE FileStreamSource: Files are:
file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
file:///Users/jacek/dev/oss/spark/csv-logs/people-2.csv
file:///Users/jacek/dev/oss/spark/csv-logs/people-3.csv
16/07/13 12:32:11 DEBUG FileStreamSource: New file: file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
16/07/13 12:32:11 TRACE FileStreamSource: Number of new files = 3
16/07/13 12:32:11 TRACE FileStreamSource: Number of files selected for batch = 1
16/07/13 12:32:11 TRACE FileStreamSource: Number of seen files = 1
16/07/13 12:32:11 INFO FileStreamSource: Max batch id increased to 0 with 1 new files
16/07/13 12:32:11 INFO FileStreamSource: Processing 1 files from 0:0
16/07/13 12:32:11 TRACE FileStreamSource: Files are:
file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+-------+---+-----+
| name|    city|country|age|alive|
+-----+--------+-------+---+-----+
|Jacek|Warszawa| Polska| 42| true|
+-----+--------+-------+---+-----+
spark.streams
  .active
  .foreach(println)
// Streaming Query - consoleStream [state = ACTIVE]
scala> spark.streams.active(0).explain
== Physical Plan ==
*Scan csv [name#130,city#131,country#132,age#133,alive#134] Format: CSV, InputPaths: file:/Users/jacek/dev/oss/spark/csv-logs/people-3.csv, PushedFilters: [], ReadSchema: struct<name:string,city:string,country:string,age:int,alive:boolean>
- (article) Structured Streaming In Apache Spark
- (video) The Future of Real Time in Spark from Spark Summit East 2016 in which Reynold Xin presents the concept of Streaming DataFrames to the public
- (video) Structuring Spark: DataFrames, Datasets, and Streaming
- (video) A Deep Dive Into Structured Streaming by Tathagata "TD" Das from Spark Summit 2016
- (video) Arbitrary Stateful Aggregations in Structured Streaming in Apache Spark by Burak Yavuz