FileStreamSink
FileStreamSink is a concrete streaming sink that writes out the results of a streaming query to files (of the specified FileFormat) in the root path.
```scala
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// in: a streaming Dataset, e.g. created with spark.readStream
val out = in.
  writeStream.
  format("parquet").
  option("path", "parquet-output-dir").
  option("checkpointLocation", "checkpoint-dir").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  start
```
FileStreamSink is created exclusively when DataSource is requested to create a streaming sink for a file-based data source (i.e. FileFormat).
Tip: Read up on FileFormat in The Internals of Spark SQL book.
FileStreamSink supports Append output mode only.
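Since Append is the only supported mode, starting a file-sink query with any other output mode fails at start. A hedged illustration (the exact exception message can differ across Spark versions):

```scala
// Throws an org.apache.spark.sql.AnalysisException along the lines of:
//   Data source parquet does not support Complete output mode;
in.writeStream.
  format("parquet").
  option("path", "parquet-output-dir").
  option("checkpointLocation", "checkpoint-dir").
  outputMode(OutputMode.Complete).
  start
```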
FileStreamSink uses the spark.sql.streaming.fileSink.log.deletion configuration property (as the internal isDeletingExpiredLog flag) to control whether to delete expired log files of the sink metadata log.
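The property can be set like any other SQL configuration property; a minimal sketch (the property is internal and defaults to true):

```scala
// Keep (rather than delete) expired entries of the sink metadata log.
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", false)
```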
The textual representation of FileStreamSink is FileSink[path].
FileStreamSink uses the _spark_metadata directory (under the root path) for…FIXME
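Until the FIXME is resolved, a hedged way to peek at that directory (assuming a local file system and that batch 0 has already been committed under the earlier example's output path):

```scala
import scala.io.Source

// Each committed batch adds a log file (0, 1, 2, ...) with a version
// header followed by one JSON entry per written file.
Source.fromFile("parquet-output-dir/_spark_metadata/0").getLines.foreach(println)
```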
Tip: Enable ALL logging level for the org.apache.spark.sql.execution.streaming.FileStreamSink logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL

Refer to Logging.
```scala
addBatch(batchId: Long, data: DataFrame): Unit
```
Note: addBatch is a part of the Sink Contract to "add" a batch of data to the sink.
addBatch…FIXME
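For reference, the Sink contract that addBatch implements boils down to this single method (simplified; org.apache.spark.sql.execution.streaming.Sink):

```scala
import org.apache.spark.sql.DataFrame

trait Sink {
  // Adds the data of a batch to the sink. May be called again with the
  // same batchId after a failure, so an implementation has to make sure
  // a batch is only ever added once (FileStreamSink uses its metadata
  // log for that).
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```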
FileStreamSink takes the following to be created:

- SparkSession
- Path of the root directory (where to write out the streaming query results)
- FileFormat
- Names of the partition columns (possibly empty)
- Options (Map[String, String])

FileStreamSink initializes the internal properties.
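The sink is normally created by DataSource (as described above), but a direct construction makes the arguments concrete; a hedged sketch (spark is an active SparkSession, and the path matches the earlier example):

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.FileStreamSink

val sink = new FileStreamSink(
  spark,                   // SparkSession
  "parquet-output-dir",    // path of the root directory
  new ParquetFileFormat(), // FileFormat
  Nil,                     // no partition columns
  Map.empty)               // no options
```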
```scala
basicWriteJobStatsTracker: BasicWriteJobStatsTracker
```

basicWriteJobStatsTracker simply creates a BasicWriteJobStatsTracker with the basic metrics:
- number of written files
- bytes of written output
- number of output rows
- number of dynamic partitions
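Under the covers that amounts to little more than the following (a hedged sketch against Spark's internal org.apache.spark.sql.execution.datasources API; hadoopConf is the sink's internal Hadoop Configuration property):

```scala
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// Wrap the Hadoop configuration so it can be shipped to executors, then
// create the tracker with the standard basic-write metrics
// (number of files, bytes written, output rows, dynamic partitions).
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
```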
Tip: Read up on BasicWriteJobStatsTracker in The Internals of Spark SQL book.
Note: basicWriteJobStatsTracker is used exclusively when FileStreamSink is requested to addBatch.
FileStreamSink uses the following internal properties:

| Name | Description |
|---|---|
| basePath | Base path (Hadoop HDFS's Path for the given path). Used when…FIXME |
| logPath | Metadata log path (Hadoop HDFS's Path for the base path and the _spark_metadata directory). Used when…FIXME |
| fileLog | FileStreamSinkLog (for the version 1 and the metadata log path). Used exclusively when FileStreamSink is requested to addBatch |
| hadoopConf | Hadoop Configuration. Used when…FIXME |