Skip to content

Commit

Permalink
Fix missing phys. stream type in Jena stream writer
Browse files Browse the repository at this point in the history
I've also added a bunch of tests to improve the coverage on the writer code in the various rarely used code paths.

Issue: #218
  • Loading branch information
Ostrzyciel committed Nov 14, 2024
1 parent 2892f77 commit 38cf623
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ object JenaStreamSerDes extends NativeSerDes[Seq[Triple], Seq[Quad]]:

override def writeTriplesJelly(os: OutputStream, model: Seq[Triple], opt: RdfStreamOptions, frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.withPhysicalType(PhysicalStreamType.TRIPLES))
// Not setting the physical type, as it should be inferred from the data.
// This emulates how RIOT initializes the stream writer in practice.
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
Expand All @@ -66,7 +68,7 @@ object JenaStreamSerDes extends NativeSerDes[Seq[Triple], Seq[Quad]]:

override def writeQuadsJelly(os: OutputStream, dataset: Seq[Quad], opt: RdfStreamOptions, frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.withPhysicalType(PhysicalStreamType.QUADS))
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,49 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO
object JellyStreamWriterFactory extends StreamRDFWriterFactory:
override def create(out: OutputStream, format: RDFFormat, context: Context): StreamRDF =
val variant = Util.getVariant(format)
JellyStreamWriter(
JellyStreamWriterAutodetectType(
variant.copy(
opt = context.get[RdfStreamOptions](JellyLanguage.SYMBOL_STREAM_OPTIONS, variant.opt),
frameSize = context.getInt(JellyLanguage.SYMBOL_FRAME_SIZE, variant.frameSize),
),
out
)

/**
* Wrapper on JellyStreamWriter that autodetects the physical stream type based on the first element
* (triple or quad) added to the stream.
*
* This is used when initializing the stream writer with the RIOT APIs, where the stream type is not known.
*
* @param opt Jelly format variant
* @param out output stream
*/
final class JellyStreamWriterAutodetectType(opt: JellyFormatVariant, out: OutputStream) extends StreamRDF:
private var inner: JellyStreamWriter = null

override def start(): Unit = ()

override def triple(triple: Triple): Unit =
if inner == null then
inner = JellyStreamWriter(opt.copy(
opt = opt.opt.withPhysicalType(PhysicalStreamType.TRIPLES),
), out)
inner.triple(triple)

override def quad(quad: Quad): Unit =
if inner == null then
inner = JellyStreamWriter(opt.copy(
opt = opt.opt.withPhysicalType(PhysicalStreamType.QUADS),
), out)
inner.quad(quad)

override def base(base: String): Unit = ()

override def prefix(prefix: String, iri: String): Unit = ()

override def finish(): Unit =
if inner != null then
inner.finish()

/**
* A stream writer that writes RDF data in Jelly format.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package eu.ostrzyciel.jelly.convert.jena.riot

import eu.ostrzyciel.jelly.convert.jena.traits.JenaTest
import org.apache.commons.io.output.NullWriter
import org.apache.jena.riot.RiotException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.io.OutputStream

/**
* Tests covering rare edge cases in the Jelly writer.
* The main tests are done in the integration-tests module.
*/
class JellyWriterSpec extends AnyWordSpec, Matchers, JenaTest:
val streamWriters = Seq(
("JellyStreamWriter", (opt, out) => JellyStreamWriter(opt, out)),
("JellyStreamWriterAutodetectType", (opt, out) => JellyStreamWriterAutodetectType(opt, out)),
)

for (writerName, writerFactory) <- streamWriters do
f"$writerName" should {
for op <- Seq("start", "base", "prefix") do
f"do nothing on $op()" in {
var mutations = 0
val out = new OutputStream {
override def write(b: Int): Unit = mutations += 1
}
val writer = writerFactory(JellyFormatVariant(), out)
op match
case "start" => writer.start()
case "base" => writer.base("http://example.com")
case "prefix" => writer.prefix("ex", "http://example.com")
mutations should be (0)
}
}

"JellyStreamWriterAutodetectType" should {
"do nothing if the stream was not started" in {
val out = new OutputStream {
override def write(b: Int): Unit = fail("Should not write anything")
}
val writer = JellyStreamWriterAutodetectType(JellyFormatVariant(), out)
writer.finish()
}
}

val classicWriters: Seq[(String, JellyFormatVariant => JellyGraphWriter | JellyDatasetWriter)] = Seq(
("JellyGraphWriter", (opt) => JellyGraphWriter(opt)),
("JellyDatasetWriter", (opt) => JellyDatasetWriter(opt)),
)

for (writerName, writerFactory) <- classicWriters do
f"$writerName" should {
"throw an exception when writing to a Java Writer" in {
val writer = writerFactory(JellyFormatVariant())
val javaWriter = NullWriter.INSTANCE
intercept[RiotException] {
writer match
case graphWriter: JellyGraphWriter => graphWriter.write(javaWriter, null, null, null, null)
case datasetWriter: JellyDatasetWriter => datasetWriter.write(javaWriter, null, null, null, null)
}.getMessage should include ("Writing binary data to a java.io.Writer is not supported")
}

".getLang return JellyLanguage.JELLY" in {
val writer = writerFactory(JellyFormatVariant())
writer match
case graphWriter: JellyGraphWriter => graphWriter.getLang should be (JellyLanguage.JELLY)
case datasetWriter: JellyDatasetWriter => datasetWriter.getLang should be (JellyLanguage.JELLY)
}
}

0 comments on commit 38cf623

Please sign in to comment.