A streaming Sink represents an external storage to write streaming datasets to. It is modeled as the Sink trait that can process batches of data given as DataFrames.
The following sinks are currently available in Spark:

- ConsoleSink for the console format.
- FileStreamSink for the parquet format.
- ForeachSink used in the foreach operator.
- MemorySink for the memory format.
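As a quick illustration, the console format can be used as follows (a minimal sketch that assumes a streaming Dataset named in):

import org.apache.spark.sql.streaming.OutputMode

// Print every batch of the streaming Dataset `in` to the console
val consoleOut = in.writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()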
You can create your own streaming format by implementing StreamSinkProvider (a sketch follows the Sink contract below).
The Sink contract is described by the Sink trait. It defines the one and only method, addBatch, that adds data (given as a DataFrame) for a batchId.
package org.apache.spark.sql.execution.streaming
trait Sink {
def addBatch(batchId: Long, data: DataFrame): Unit
}
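A custom sink pairs a Sink implementation with a StreamSinkProvider that creates it. The following is a minimal sketch; CountingSink and CountingSinkProvider are made-up names for illustration, not part of Spark:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical sink that only reports the size of every batch it receives
class CountingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Collecting the batch is acceptable for a toy example only
    println(s"Batch $batchId has ${data.collect().length} rows")
  }
}

class CountingSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new CountingSink
}

You would then refer to it as format("fully.qualified.CountingSinkProvider") in writeStream.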
FileStreamSink is the streaming sink for the parquet format.
Caution: FIXME
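For example, to write a streaming Dataset (here in) to parquet files with a 5-second processing-time trigger: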
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
val out = in.writeStream
.format("parquet")
.option("path", "parquet-output-dir")
.option("checkpointLocation", "checkpoint-dir")
.trigger(ProcessingTime(5.seconds))
.outputMode(OutputMode.Append)
.start()
FileStreamSink supports Append output mode only.
It uses spark.sql.streaming.fileSink.log.deletion (as isDeletingExpiredLog) to control whether to delete expired log files.
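For example, to keep expired log files around (the property defaults to true):

spark.conf.set("spark.sql.streaming.fileSink.log.deletion", false)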
MemorySink is a memory-based Sink particularly useful for testing. It stores the results in memory.
It is available as the memory format and requires a query name (set by the queryName method or the queryName option).
...FIXME
Note: It was introduced in the pull request for [SPARK-14288][SQL] Memory Sink for streaming.
Use toDebugString to see the batches.
Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
You can set checkpointLocation using the option method, or it defaults to the spark.sql.streaming.checkpointLocation setting.

If spark.sql.streaming.checkpointLocation is set, the code uses the $location/$queryName directory.

Finally, when no spark.sql.streaming.checkpointLocation is set, a temporary directory memory.stream under java.io.tmpdir is used with an offsets subdirectory inside.
Note: The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir.
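The resolution can be sketched as follows. This is illustrative only and approximates the internal code; it assumes userCheckpointLocation (the checkpointLocation option, an Option[String]), queryName and spark are in scope:

import java.nio.file.Files

// Sketch of how the checkpoint location is resolved
val checkpointLocation = userCheckpointLocation
  .orElse {
    spark.conf.getOption("spark.sql.streaming.checkpointLocation")
      .map(location => s"$location/$queryName")
  }
  .getOrElse {
    // Temporary directory under java.io.tmpdir; the internal code also
    // registers it for deletion at shutdown
    Files.createTempDirectory("memory.stream").toString
  }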
val nums = (0 to 10).toDF("num")
scala> val outStream = nums.write
.format("memory")
.queryName("memStream")
.startStream()
16/04/11 19:37:05 INFO HiveSqlParser: Parsing command: memStream
outStream: org.apache.spark.sql.StreamingQuery = Continuous Query - memStream [state = ACTIVE]
It creates a MemorySink instance based on the schema of the DataFrame it operates on.
It creates a new DataFrame using MemoryPlan with the MemorySink instance created earlier and registers it as a temporary table (using the DataFrame.registerTempTable method).
Note: At this point you can query the table as if it were a regular non-streaming table, using the sql method.
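For example, with the memStream query above:

sql("SELECT * FROM memStream").show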
A new StreamingQuery is started (using StreamingQueryManager.startQuery) and returned.
Caution: FIXME Describe the else part.