From 710653146f1bcea0c959f36616f6a378c83831ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Sowi=C5=84ski?= Date: Wed, 6 Nov 2024 11:49:25 +0100 Subject: [PATCH] Jena RIOT writer: improve buffer handling (#210) For some reason the Jena writer had completely different logic for handling RDF stream frame sizes than RDF4J. Instead of the simple check if the size of the row buffer is at least the target size, we had some complicated shenaninigans to make the frames exactly the same. This is great, but there was no technical reason to do that (nobody really cares) and the extra logic was unnecessary burden. This PR unifies this behavior. The regular (non-streaming) Jena writers also got a revamp -- now they are just a wrapper over the streaming writer. I've also fixed a small bug (?) where an empty stream frame would be emitted at the end of the stream for no reason. --- .../convert/jena/riot/JellyLanguage.scala | 3 +- .../jelly/convert/jena/riot/JellyWriter.scala | 63 +++++++++---------- .../jelly/convert/rdf4j/rio/JellyWriter.scala | 3 +- .../rdf4j/rio/JellyWriterSettings.scala | 3 +- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala index bddf5203..6a55bbdf 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala @@ -41,7 +41,8 @@ object JellyLanguage: val SYMBOL_SUPPORTED_OPTIONS: util.Symbol = org.apache.jena.sparql.util.Symbol.create(SYMBOL_NS + "supportedOptions") /** - * Symbol for the frame size to be used when writing RDF data. + * Symbol for the target stream frame size to be used when writing RDF data. + * Frame size may be slightly larger than this value, to fit the entire statement and its lookup entries in one frame. * * Set this in Jena's Context to an integer (not long!) value. */ diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala index b70e7571..ff0f301a 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala @@ -10,6 +10,7 @@ import org.apache.jena.sparql.core.{DatasetGraph, Quad} import org.apache.jena.sparql.util.Context import java.io.{OutputStream, Writer} +import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.* @@ -48,15 +49,14 @@ final class JellyGraphWriter(opt: JellyFormatVariant) extends WriterGraphRIOTBas "Please use an OutputStream.") override def write(out: OutputStream, graph: Graph, prefixMap: PrefixMap, baseURI: String, context: Context): Unit = - val encoder = JenaConverterFactory.encoder( - opt.opt - .withPhysicalType(PhysicalStreamType.TRIPLES) - .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + val variant = opt.copy(opt.opt + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) ) - graph.find().asScala - .flatMap(triple => encoder.addTripleStatement(triple)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) + val inner = JellyStreamWriter(variant, out) + for triple <- graph.find().asScala do + inner.triple(triple) + inner.finish() override def getLang: Lang = JellyLanguage.JELLY @@ -74,15 +74,14 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO override def write( out: OutputStream, dataset: DatasetGraph, prefixMap: PrefixMap, baseURI: String, context: Context ): Unit = - val encoder = JenaConverterFactory.encoder( - opt.opt - .withPhysicalType(PhysicalStreamType.QUADS) - .withLogicalType(LogicalStreamType.FLAT_QUADS) + val variant = opt.copy(opt.opt + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS) ) - dataset.find().asScala - .flatMap(quad => encoder.addQuadStatement(quad)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) + val inner = JellyStreamWriter(variant, out) + for quad <- dataset.find().asScala do + inner.quad(quad) + inner.finish() override def getLang: Lang = JellyLanguage.JELLY @@ -113,18 +112,20 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend // We don't set any options here – it is the responsibility of the caller to set // a valid stream type here. private val encoder = JenaConverterFactory.encoder(opt.opt) - private val buffer = collection.mutable.ArrayBuffer.empty[RdfStreamRow] + private val buffer: ArrayBuffer[RdfStreamRow] = new ArrayBuffer[RdfStreamRow]() // No need to handle this, the encoder will emit the header automatically anyway override def start(): Unit = () override def triple(triple: Triple): Unit = - buffer.addAll(encoder.addTripleStatement(triple)) - writeBuffer(isFinishing = false) + buffer ++= encoder.addTripleStatement(triple) + if buffer.size >= opt.frameSize then + flushBuffer() override def quad(quad: Quad): Unit = - buffer.addAll(encoder.addQuadStatement(quad)) - writeBuffer(isFinishing = false) + buffer ++= encoder.addQuadStatement(quad) + if buffer.size >= opt.frameSize then + flushBuffer() // Not supported override def base(base: String): Unit = () @@ -133,15 +134,11 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend override def prefix(prefix: String, iri: String): Unit = () // Flush the buffer - override def finish(): Unit = writeBuffer(isFinishing = true) - - private def writeBuffer(isFinishing: Boolean): Unit = - if isFinishing then - buffer.grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) - buffer.clear() - else if buffer.size >= opt.frameSize then - buffer.take(buffer.size - (buffer.size % opt.frameSize)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) - buffer.remove(0, buffer.size - (buffer.size % opt.frameSize)) + override def finish(): Unit = + if buffer.nonEmpty then + flushBuffer() + + private def flushBuffer(): Unit = + val frame = RdfStreamFrame(rows = buffer.toSeq) + frame.writeDelimitedTo(out) + buffer.clear() diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala index b5b0f817..718d2d6f 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala @@ -77,7 +77,8 @@ final class JellyWriter(out: OutputStream) extends AbstractRDFWriter: override def endRDF(): Unit = checkWritingStarted() - flushBuffer() + if buffer.nonEmpty then + flushBuffer() out.flush() override def handleComment(comment: String): Unit = diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala index 414e5f7e..85d866ef 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala @@ -19,7 +19,8 @@ object JellyWriterSettings: val FRAME_SIZE = new LongRioSetting( "eu.ostrzyciel.jelly.convert.rdf4j.rio.frameSize", - "Target RDF frame size", + "Target RDF stream frame size. Frame size may be slightly larger than this value, " + + "to fit the entire statement and its lookup entries in one frame.", 256L )