Skip to content

Commit

Permalink
Jena RIOT writer: improve buffer handling (#210)
Browse files Browse the repository at this point in the history
For some reason the Jena writer had completely different logic for handling RDF stream frame sizes than RDF4J. Instead of the simple check if the size of the row buffer is at least the target size, we had some complicated shenaninigans to make the frames exactly the same. This is great, but there was no technical reason to do that (nobody really cares) and the extra logic was unnecessary burden.

This PR unifies this behavior. The regular (non-streaming) Jena writers also got a revamp -- now they are just a wrapper over the streaming writer. I've also fixed a small bug (?) where an empty stream frame would be emitted at the end of the stream for no reason.
  • Loading branch information
Ostrzyciel authored Nov 6, 2024
1 parent a9ded13 commit 7106531
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ object JellyLanguage:
val SYMBOL_SUPPORTED_OPTIONS: util.Symbol = org.apache.jena.sparql.util.Symbol.create(SYMBOL_NS + "supportedOptions")

/**
* Symbol for the frame size to be used when writing RDF data.
* Symbol for the target stream frame size to be used when writing RDF data.
* Frame size may be slightly larger than this value, to fit the entire statement and its lookup entries in one frame.
*
* Set this in Jena's Context to an integer (not long!) value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.jena.sparql.core.{DatasetGraph, Quad}
import org.apache.jena.sparql.util.Context

import java.io.{OutputStream, Writer}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.*


Expand Down Expand Up @@ -48,15 +49,14 @@ final class JellyGraphWriter(opt: JellyFormatVariant) extends WriterGraphRIOTBas
"Please use an OutputStream.")

override def write(out: OutputStream, graph: Graph, prefixMap: PrefixMap, baseURI: String, context: Context): Unit =
val encoder = JenaConverterFactory.encoder(
opt.opt
.withPhysicalType(PhysicalStreamType.TRIPLES)
.withLogicalType(LogicalStreamType.FLAT_TRIPLES)
val variant = opt.copy(opt.opt
.withPhysicalType(PhysicalStreamType.TRIPLES)
.withLogicalType(LogicalStreamType.FLAT_TRIPLES)
)
graph.find().asScala
.flatMap(triple => encoder.addTripleStatement(triple))
.grouped(opt.frameSize)
.foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out))
val inner = JellyStreamWriter(variant, out)
for triple <- graph.find().asScala do
inner.triple(triple)
inner.finish()

override def getLang: Lang = JellyLanguage.JELLY

Expand All @@ -74,15 +74,14 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO
override def write(
out: OutputStream, dataset: DatasetGraph, prefixMap: PrefixMap, baseURI: String, context: Context
): Unit =
val encoder = JenaConverterFactory.encoder(
opt.opt
.withPhysicalType(PhysicalStreamType.QUADS)
.withLogicalType(LogicalStreamType.FLAT_QUADS)
val variant = opt.copy(opt.opt
.withPhysicalType(PhysicalStreamType.QUADS)
.withLogicalType(LogicalStreamType.FLAT_QUADS)
)
dataset.find().asScala
.flatMap(quad => encoder.addQuadStatement(quad))
.grouped(opt.frameSize)
.foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out))
val inner = JellyStreamWriter(variant, out)
for quad <- dataset.find().asScala do
inner.quad(quad)
inner.finish()

override def getLang: Lang = JellyLanguage.JELLY

Expand Down Expand Up @@ -113,18 +112,20 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend
// We don't set any options here – it is the responsibility of the caller to set
// a valid stream type here.
private val encoder = JenaConverterFactory.encoder(opt.opt)
private val buffer = collection.mutable.ArrayBuffer.empty[RdfStreamRow]
private val buffer: ArrayBuffer[RdfStreamRow] = new ArrayBuffer[RdfStreamRow]()

// No need to handle this, the encoder will emit the header automatically anyway
override def start(): Unit = ()

override def triple(triple: Triple): Unit =
buffer.addAll(encoder.addTripleStatement(triple))
writeBuffer(isFinishing = false)
buffer ++= encoder.addTripleStatement(triple)
if buffer.size >= opt.frameSize then
flushBuffer()

override def quad(quad: Quad): Unit =
buffer.addAll(encoder.addQuadStatement(quad))
writeBuffer(isFinishing = false)
buffer ++= encoder.addQuadStatement(quad)
if buffer.size >= opt.frameSize then
flushBuffer()

// Not supported
override def base(base: String): Unit = ()
Expand All @@ -133,15 +134,11 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend
override def prefix(prefix: String, iri: String): Unit = ()

// Flush the buffer
override def finish(): Unit = writeBuffer(isFinishing = true)

private def writeBuffer(isFinishing: Boolean): Unit =
if isFinishing then
buffer.grouped(opt.frameSize)
.foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out))
buffer.clear()
else if buffer.size >= opt.frameSize then
buffer.take(buffer.size - (buffer.size % opt.frameSize))
.grouped(opt.frameSize)
.foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out))
buffer.remove(0, buffer.size - (buffer.size % opt.frameSize))
override def finish(): Unit =
if buffer.nonEmpty then
flushBuffer()

private def flushBuffer(): Unit =
val frame = RdfStreamFrame(rows = buffer.toSeq)
frame.writeDelimitedTo(out)
buffer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ final class JellyWriter(out: OutputStream) extends AbstractRDFWriter:

override def endRDF(): Unit =
checkWritingStarted()
flushBuffer()
if buffer.nonEmpty then
flushBuffer()
out.flush()

override def handleComment(comment: String): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object JellyWriterSettings:

val FRAME_SIZE = new LongRioSetting(
"eu.ostrzyciel.jelly.convert.rdf4j.rio.frameSize",
"Target RDF frame size",
"Target RDF stream frame size. Frame size may be slightly larger than this value, " +
"to fit the entire statement and its lookup entries in one frame.",
256L
)

Expand Down

0 comments on commit 7106531

Please sign in to comment.