diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala index bddf5203..6a55bbdf 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala @@ -41,7 +41,8 @@ object JellyLanguage: val SYMBOL_SUPPORTED_OPTIONS: util.Symbol = org.apache.jena.sparql.util.Symbol.create(SYMBOL_NS + "supportedOptions") /** - * Symbol for the frame size to be used when writing RDF data. + * Symbol for the target stream frame size to be used when writing RDF data. + * Frame size may be slightly larger than this value, to fit the entire statement and its lookup entries in one frame. * * Set this in Jena's Context to an integer (not long!) value. */ diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala index b70e7571..ff0f301a 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala @@ -10,6 +10,7 @@ import org.apache.jena.sparql.core.{DatasetGraph, Quad} import org.apache.jena.sparql.util.Context import java.io.{OutputStream, Writer} +import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.* @@ -48,15 +49,14 @@ final class JellyGraphWriter(opt: JellyFormatVariant) extends WriterGraphRIOTBas "Please use an OutputStream.") override def write(out: OutputStream, graph: Graph, prefixMap: PrefixMap, baseURI: String, context: Context): Unit = - val encoder = JenaConverterFactory.encoder( - opt.opt - .withPhysicalType(PhysicalStreamType.TRIPLES) - .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + val variant = opt.copy(opt.opt + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) ) - graph.find().asScala - .flatMap(triple => encoder.addTripleStatement(triple)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) + val inner = JellyStreamWriter(variant, out) + for triple <- graph.find().asScala do + inner.triple(triple) + inner.finish() override def getLang: Lang = JellyLanguage.JELLY @@ -74,15 +74,14 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO override def write( out: OutputStream, dataset: DatasetGraph, prefixMap: PrefixMap, baseURI: String, context: Context ): Unit = - val encoder = JenaConverterFactory.encoder( - opt.opt - .withPhysicalType(PhysicalStreamType.QUADS) - .withLogicalType(LogicalStreamType.FLAT_QUADS) + val variant = opt.copy(opt.opt + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS) ) - dataset.find().asScala - .flatMap(quad => encoder.addQuadStatement(quad)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) + val inner = JellyStreamWriter(variant, out) + for quad <- dataset.find().asScala do + inner.quad(quad) + inner.finish() override def getLang: Lang = JellyLanguage.JELLY @@ -113,18 +112,20 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend // We don't set any options here – it is the responsibility of the caller to set // a valid stream type here. private val encoder = JenaConverterFactory.encoder(opt.opt) - private val buffer = collection.mutable.ArrayBuffer.empty[RdfStreamRow] + private val buffer: ArrayBuffer[RdfStreamRow] = new ArrayBuffer[RdfStreamRow]() // No need to handle this, the encoder will emit the header automatically anyway override def start(): Unit = () override def triple(triple: Triple): Unit = - buffer.addAll(encoder.addTripleStatement(triple)) - writeBuffer(isFinishing = false) + buffer ++= encoder.addTripleStatement(triple) + if buffer.size >= opt.frameSize then + flushBuffer() override def quad(quad: Quad): Unit = - buffer.addAll(encoder.addQuadStatement(quad)) - writeBuffer(isFinishing = false) + buffer ++= encoder.addQuadStatement(quad) + if buffer.size >= opt.frameSize then + flushBuffer() // Not supported override def base(base: String): Unit = () @@ -133,15 +134,11 @@ final class JellyStreamWriter(opt: JellyFormatVariant, out: OutputStream) extend override def prefix(prefix: String, iri: String): Unit = () // Flush the buffer - override def finish(): Unit = writeBuffer(isFinishing = true) - - private def writeBuffer(isFinishing: Boolean): Unit = - if isFinishing then - buffer.grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) - buffer.clear() - else if buffer.size >= opt.frameSize then - buffer.take(buffer.size - (buffer.size % opt.frameSize)) - .grouped(opt.frameSize) - .foreach(rows => RdfStreamFrame(rows = rows.toSeq).writeDelimitedTo(out)) - buffer.remove(0, buffer.size - (buffer.size % opt.frameSize)) + override def finish(): Unit = + if buffer.nonEmpty then + flushBuffer() + + private def flushBuffer(): Unit = + val frame = RdfStreamFrame(rows = buffer.toSeq) + frame.writeDelimitedTo(out) + buffer.clear() diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala index b5b0f817..718d2d6f 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala @@ -77,7 +77,8 @@ final class JellyWriter(out: OutputStream) extends AbstractRDFWriter: override def endRDF(): Unit = checkWritingStarted() - flushBuffer() + if buffer.nonEmpty then + flushBuffer() out.flush() override def handleComment(comment: String): Unit = diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala index 414e5f7e..85d866ef 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala @@ -19,7 +19,8 @@ object JellyWriterSettings: val FRAME_SIZE = new LongRioSetting( "eu.ostrzyciel.jelly.convert.rdf4j.rio.frameSize", - "Target RDF frame size", + "Target RDF stream frame size. Frame size may be slightly larger than this value, " + + "to fit the entire statement and its lookup entries in one frame.", 256L )