Skip to content

Commit

Permalink
Fix missing physical stream type in Jena stream writer (#219)
Browse files Browse the repository at this point in the history
* Fix missing phys. stream type in Jena stream writer

I've also added a bunch of tests to improve the coverage on the writer code in the various rarely used code paths.

Issue: #218

* Automatically set the logical stream type

This is already done in the RDF4J writer and in the "classic" Jena writers, so we should do it here too. I've added a test that fails without the new addition.

* Add more checks that the stream options are saved correctly
  • Loading branch information
Ostrzyciel authored Nov 14, 2024
1 parent 2892f77 commit c586814
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io

import eu.ostrzyciel.jelly.convert.jena.traits.JenaTest
import eu.ostrzyciel.jelly.core.*
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
import eu.ostrzyciel.jelly.core.proto.v1.*
import eu.ostrzyciel.jelly.integration_tests.TestCases
import org.apache.pekko.actor.ActorSystem
import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -65,6 +65,24 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
)
)

/**
 * Verifies that serialized Jelly data starts with a stream options row carrying the expected settings.
 *
 * The first delimited frame is parsed from `bytes`; its first row must be an options row. The physical and
 * logical stream types are checked against `expectedType`, and the generalized-statements flag, the RDF-star
 * flag, the name/prefix/datatype table sizes and the protocol version are compared with the expectations.
 *
 * @param bytes serialized Jelly stream in the delimited framing
 * @param expectedType "triples" or "quads"; any other value fails the test instead of silently skipping
 *                     the stream type checks
 * @param expectedOpt options the writer was configured with
 */
private def checkStreamOptions(bytes: Array[Byte], expectedType: String, expectedOpt: RdfStreamOptions) =
  // Fail with a readable message rather than a bare NoSuchElementException when no frame can be parsed.
  val frame = RdfStreamFrame.parseDelimitedFrom(new ByteArrayInputStream(bytes))
    .getOrElse(fail("Could not parse a delimited stream frame from the serialized data"))
  frame.rows.size should be > 0
  val firstRow = frame.rows.head.row
  firstRow.isOptions should be (true)
  val options = firstRow.options
  expectedType match
    case "triples" =>
      options.physicalType should be (PhysicalStreamType.TRIPLES)
      options.logicalType should be (LogicalStreamType.FLAT_TRIPLES)
    case "quads" =>
      options.physicalType should be (PhysicalStreamType.QUADS)
      options.logicalType should be (LogicalStreamType.FLAT_QUADS)
    case other =>
      // A typo in the caller must not turn the stream type checks into a no-op.
      fail(s"Unsupported expected stream type: '$other' (expected \"triples\" or \"quads\")")
  options.generalizedStatements should be (expectedOpt.generalizedStatements)
  options.rdfStar should be (expectedOpt.rdfStar)
  options.maxNameTableSize should be (expectedOpt.maxNameTableSize)
  options.maxPrefixTableSize should be (expectedOpt.maxPrefixTableSize)
  options.maxDatatypeTableSize should be (expectedOpt.maxDatatypeTableSize)
  options.version should be (Constants.protoVersion)

runTest(JenaSerDes, JenaSerDes)
runTest(JenaSerDes, JenaStreamSerDes)
runTest(JenaSerDes, Rdf4jSerDes)
Expand Down Expand Up @@ -129,6 +147,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
os.close()
val data = os.toByteArray
data.size should be > 0
checkStreamOptions(data, "triples", preset)

val model2 = des.readTriplesJelly(ByteArrayInputStream(data), None)
val deserializedSize = summon[Measure[TMDes]].size(model2)
Expand All @@ -149,6 +168,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
os.close()
val data = os.toByteArray
data.size should be > 0
checkStreamOptions(data, "quads", preset)

val ds2 = des.readQuadsJelly(ByteArrayInputStream(data), None)
val deserializedSize = summon[Measure[TDDes]].size(ds2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ object JenaStreamSerDes extends NativeSerDes[Seq[Triple], Seq[Quad]]:

override def writeTriplesJelly(os: OutputStream, model: Seq[Triple], opt: RdfStreamOptions, frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.withPhysicalType(PhysicalStreamType.TRIPLES))
// Not setting the physical type, as it should be inferred from the data.
// This emulates how RIOT initializes the stream writer in practice.
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
Expand All @@ -66,7 +68,7 @@ object JenaStreamSerDes extends NativeSerDes[Seq[Triple], Seq[Quad]]:

override def writeQuadsJelly(os: OutputStream, dataset: Seq[Quad], opt: RdfStreamOptions, frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.withPhysicalType(PhysicalStreamType.QUADS))
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,51 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO
/**
 * RIOT stream writer factory for the Jelly format.
 *
 * The created writer is a [[JellyStreamWriterAutodetectType]], because the physical stream type
 * (triples or quads) is not known at the time RIOT asks for the writer.
 */
object JellyStreamWriterFactory extends StreamRDFWriterFactory:
  /**
   * Creates a Jelly stream writer for the given output stream.
   *
   * @param out output stream to write to
   * @param format RDF format from which the base Jelly format variant is obtained
   * @param context RIOT context; the stream options and frame size are looked up in it, with the
   *                variant's own values passed as the defaults
   * @return stream writer that picks the physical stream type from the first statement it receives
   */
  override def create(out: OutputStream, format: RDFFormat, context: Context): StreamRDF =
    val variant = Util.getVariant(format)
    JellyStreamWriterAutodetectType(
      variant.copy(
        opt = context.get[RdfStreamOptions](JellyLanguage.SYMBOL_STREAM_OPTIONS, variant.opt),
        frameSize = context.getInt(JellyLanguage.SYMBOL_FRAME_SIZE, variant.frameSize),
      ),
      out
    )

/**
 * Stream writer that postpones creating the actual [[JellyStreamWriter]] until the first statement arrives.
 *
 * The first triple or quad passed to this writer decides the physical and logical stream type of the
 * wrapped writer. This covers initialization through the RIOT APIs, where the stream type is not known
 * when the writer is created.
 *
 * @param opt Jelly format variant
 * @param out output stream
 */
final class JellyStreamWriterAutodetectType(opt: JellyFormatVariant, out: OutputStream) extends StreamRDF:
  // Stays null until the first triple or quad has been seen.
  private var inner: JellyStreamWriter = null

  /**
   * Returns the wrapped writer, creating it with the given stream options on first use.
   * The options are passed by name, so they are only built when the writer does not exist yet.
   */
  private def ensureInner(streamOptions: => RdfStreamOptions): JellyStreamWriter =
    if inner == null then
      inner = JellyStreamWriter(opt.copy(opt = streamOptions), out)
    inner

  override def start(): Unit = ()

  override def triple(triple: Triple): Unit =
    ensureInner(
      opt.opt
        .withPhysicalType(PhysicalStreamType.TRIPLES)
        .withLogicalType(LogicalStreamType.FLAT_TRIPLES)
    ).triple(triple)

  override def quad(quad: Quad): Unit =
    ensureInner(
      opt.opt
        .withPhysicalType(PhysicalStreamType.QUADS)
        .withLogicalType(LogicalStreamType.FLAT_QUADS)
    ).quad(quad)

  override def base(base: String): Unit = ()

  override def prefix(prefix: String, iri: String): Unit = ()

  // Nothing to finish if no statement was ever written.
  override def finish(): Unit =
    Option(inner).foreach(_.finish())

/**
* A stream writer that writes RDF data in Jelly format.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package eu.ostrzyciel.jelly.convert.jena.riot

import eu.ostrzyciel.jelly.convert.jena.traits.JenaTest
import org.apache.commons.io.output.NullWriter
import org.apache.jena.riot.RiotException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.io.OutputStream

/**
 * Edge-case tests for the Jelly writers: the no-op stream callbacks, finishing a stream that never
 * received a statement, and the unsupported java.io.Writer code path of the classic writers.
 * The main tests are done in the integration-tests module.
 */
class JellyWriterSpec extends AnyWordSpec, Matchers, JenaTest:
  // The lambdas are kept in this exact shape: their parameter types are not annotated here.
  val streamWriters = Seq(
    ("JellyStreamWriter", (opt, out) => JellyStreamWriter(opt, out)),
    ("JellyStreamWriterAutodetectType", (opt, out) => JellyStreamWriterAutodetectType(opt, out)),
  )

  streamWriters.foreach { (writerName, writerFactory) =>
    f"$writerName" should {
      Seq("start", "base", "prefix").foreach { op =>
        f"do nothing on $op()" in {
          // Counts every single-byte write that reaches the output stream.
          var bytesWritten = 0
          val countingOut = new OutputStream {
            override def write(b: Int): Unit = bytesWritten += 1
          }
          val writer = writerFactory(JellyFormatVariant(), countingOut)
          op match
            case "start" => writer.start()
            case "base" => writer.base("http://example.com")
            case "prefix" => writer.prefix("ex", "http://example.com")
          bytesWritten shouldBe 0
        }
      }
    }
  }

  "JellyStreamWriterAutodetectType" should {
    "do nothing if the stream was not started" in {
      // Any write attempt fails the test immediately.
      val failingOut = new OutputStream {
        override def write(b: Int): Unit = fail("Should not write anything")
      }
      JellyStreamWriterAutodetectType(JellyFormatVariant(), failingOut).finish()
    }
  }

  val classicWriters: Seq[(String, JellyFormatVariant => JellyGraphWriter | JellyDatasetWriter)] = Seq(
    ("JellyGraphWriter", variant => JellyGraphWriter(variant)),
    ("JellyDatasetWriter", variant => JellyDatasetWriter(variant)),
  )

  classicWriters.foreach { (writerName, writerFactory) =>
    f"$writerName" should {
      "throw an exception when writing to a Java Writer" in {
        val writer = writerFactory(JellyFormatVariant())
        val javaWriter = NullWriter.INSTANCE
        val thrown = intercept[RiotException] {
          writer match
            case graphWriter: JellyGraphWriter => graphWriter.write(javaWriter, null, null, null, null)
            case datasetWriter: JellyDatasetWriter => datasetWriter.write(javaWriter, null, null, null, null)
        }
        thrown.getMessage should include ("Writing binary data to a java.io.Writer is not supported")
      }

      ".getLang return JellyLanguage.JELLY" in {
        val lang = writerFactory(JellyFormatVariant()) match
          case graphWriter: JellyGraphWriter => graphWriter.getLang
          case datasetWriter: JellyDatasetWriter => datasetWriter.getLang
        lang should be (JellyLanguage.JELLY)
      }
    }
  }

0 comments on commit c586814

Please sign in to comment.