ForeachBatchSink is a streaming sink used for the foreachBatch output format, i.e. when the result of a streaming query is written out with DataStreamWriter.foreachBatch.
ForeachBatchSink is created exclusively when DataStreamWriter is requested to start execution of the streaming query with foreachBatch as the output format.
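ForeachBatchSink uses ForeachBatchSink as its name (its string representation, which is what the sink description in the query progress reports).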
    import org.apache.spark.sql.Dataset

    val q = spark.readStream
      .format("rate")
      .load
      .writeStream
      .foreachBatch { (output: Dataset[_], batchId: Long) => // <-- creates a ForeachBatchSink
        println(s"Batch ID: $batchId")
        output.show
      }
      .start
    // q.stop

    // lastProgress is null until the first micro-batch has completed
    scala> println(q.lastProgress.sink.description)
    ForeachBatchSink
Note: ForeachBatchSink was added in Spark 2.4.0 as part of SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame).
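ForeachBatchSink takes the following when created: the batch writer (the user-defined function of type (Dataset[T], Long) => Unit given to foreachBatch) and an ExpressionEncoder[T].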
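A Spark-free sketch can make the sink's shape easier to picture. It assumes, as a simplification, that a batch is a Seq of untyped rows (the hypothetical ToyRow) and that a plain decoder function stands in for Spark's encoder; ToySink, ToyForeachBatchSink and Tick are hypothetical names, not Spark classes. The sketch only models two things: handing each batch, converted to typed values, to a user function together with its batch ID, and reporting ForeachBatchSink as the name.

```scala
// Spark-free, simplified model of ForeachBatchSink's shape.
// All names are hypothetical: ToyRow stands in for an untyped Row,
// and `decode` stands in for Spark's encoder.

// An untyped row, keyed by column name.
final case class ToyRow(values: Map[String, Any])

// The contract every toy sink implements: receive one batch per call.
trait ToySink {
  def addBatch(batchId: Long, data: Seq[ToyRow]): Unit
}

// Built from the user's batch function and a row decoder.
final class ToyForeachBatchSink[T](
    batchWriter: (Seq[T], Long) => Unit,
    decode: ToyRow => T
) extends ToySink {

  // Convert untyped rows to typed values, then call the user's function.
  override def addBatch(batchId: Long, data: Seq[ToyRow]): Unit =
    batchWriter(data.map(decode), batchId)

  // The name reported as the sink description.
  override def toString: String = "ForeachBatchSink"
}

// Usage: record every (batchId, values) pair the sink receives.
final case class Tick(value: Long)

val received = scala.collection.mutable.ArrayBuffer.empty[(Long, Seq[Tick])]
val sink = new ToyForeachBatchSink[Tick](
  (ticks, batchId) => received += ((batchId, ticks)),
  row => Tick(row.values("value").asInstanceOf[Long])
)

sink.addBatch(0L, Seq(ToyRow(Map("value" -> 0L)), ToyRow(Map("value" -> 1L))))
sink.addBatch(1L, Seq(ToyRow(Map("value" -> 2L))))
```

The decoder is where the sketch is most simplified: in Spark, turning the untyped batch into a Dataset[T] relies on the encoder rather than a hand-written function.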
addBatch(batchId: Long, data: DataFrame): Unit
Note: addBatch is part of the Sink Contract to "add" a batch of data to the sink.
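The Sink Contract's documentation in Spark notes that addBatch may be called more than once with the same batchId after a failure, and that the data for a batch should then be added only once. The following Spark-free sketch models that requirement under stated assumptions: batches are plain Seq[Long] values, and ToyBatchSink, IdempotentToySink and runBatches are hypothetical names. It is an illustration of the contract, not Spark's implementation.

```scala
// Spark-free model of the addBatch contract (all names hypothetical).
// A batch may be delivered again with the same batchId (e.g. after a
// failure); its data must still be added only once.

trait ToyBatchSink {
  def addBatch(batchId: Long, data: Seq[Long]): Unit
}

// Remembers committed batch IDs and ignores replayed ones.
final class IdempotentToySink extends ToyBatchSink {
  private val committed = scala.collection.mutable.Set.empty[Long]
  val output = scala.collection.mutable.ArrayBuffer.empty[Long]

  override def addBatch(batchId: Long, data: Seq[Long]): Unit =
    // Set.add returns false when batchId was already committed
    if (committed.add(batchId)) output ++= data
}

// Toy "engine" loop: one addBatch call per delivered micro-batch.
def runBatches(sink: ToyBatchSink, batches: Seq[(Long, Seq[Long])]): Unit =
  batches.foreach { case (batchId, data) => sink.addBatch(batchId, data) }

val idempotentSink = new IdempotentToySink
// Batch 1 is delivered twice, simulating a replay after a failure.
runBatches(
  idempotentSink,
  Seq(0L -> Seq(1L, 2L), 1L -> Seq(3L), 1L -> Seq(3L), 2L -> Seq(4L))
)
```

Keying on batchId is what makes the replay harmless here; a user function passed to foreachBatch can apply the same idea to its own output.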
addBatch
…FIXME