ForeachBatchSink

ForeachBatchSink is a streaming sink that is used for the DataStreamWriter.foreachBatch streaming operator (which DataStreamWriter tracks internally as the foreachBatch source).

ForeachBatchSink is created exclusively when DataStreamWriter is requested to start execution of the streaming query (with the foreachBatch source).

ForeachBatchSink uses ForeachBatchSink as its name (the text reported as the sink description in the progress of a streaming query).

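import org.apache.spark.sql.Dataset
// The rate source generates rows continuously, which makes it handy for demos
val q = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (output: Dataset[_], batchId: Long) => // <-- the batch writer of the ForeachBatchSink
    println(s"Batch ID: $batchId")
    output.show
  }
  .start // <-- creates a ForeachBatchSink
// q.stop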

scala> println(q.lastProgress.sink.description)
ForeachBatchSink

Creating ForeachBatchSink Instance

ForeachBatchSink takes the following when created:

  • Batch writer ((Dataset[T], Long) ⇒ Unit)

  • Encoder (ExpressionEncoder[T])

Adding Batch — addBatch Method

addBatch(batchId: Long, data: DataFrame): Unit
Note: addBatch is part of the Sink Contract to "add" a batch of data to the sink.

addBatch…​FIXME
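As a rough mental model only, the following Spark-free sketch shows how a sink created with a batch writer and an encoder could serve addBatch. It is not Spark's implementation: Sink, Encoder, SketchForeachBatchSink and demo are hypothetical stand-ins, rows are modeled as plain strings, and the sketch assumes that addBatch decodes the rows with the encoder and then passes them, together with the batch ID, to the batch writer.

```scala
// Simplified, Spark-free model. All names below are hypothetical stand-ins.
import scala.collection.mutable.ArrayBuffer

object ForeachBatchSinkSketch {
  // Stand-in for the Sink contract: a sink is asked to "add" a batch of data.
  trait Sink {
    def addBatch(batchId: Long, data: Seq[String]): Unit
  }

  // Stand-in for an encoder: converts an untyped row into a typed record.
  type Encoder[T] = String => T

  // Like ForeachBatchSink, takes a batch writer and an encoder when created.
  final class SketchForeachBatchSink[T](
      batchWriter: (Seq[T], Long) => Unit,
      encoder: Encoder[T]) extends Sink {

    // Assumption: decode every row, then hand the typed batch and its ID to the batch writer.
    override def addBatch(batchId: Long, data: Seq[String]): Unit =
      batchWriter(data.map(encoder), batchId)

    // Mirrors the name shown in the sink description above.
    override def toString: String = "ForeachBatchSink"
  }

  // Pushes two batches through the sink and returns what the batch writer received.
  def demo(): Seq[(Long, Seq[Int])] = {
    val seen = ArrayBuffer.empty[(Long, Seq[Int])]
    val sink = new SketchForeachBatchSink[Int](
      (batch, batchId) => seen += ((batchId, batch)),
      _.toInt)
    sink.addBatch(0L, Seq("1", "2"))
    sink.addBatch(1L, Seq("3"))
    seen.toSeq
  }
}
```

In this model the batch writer is invoked exactly once per addBatch call, so the batch IDs it sees match those passed to the sink.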