Skip to content

Commit

Permalink
Jena/RDF4J stream options: enable all features by default (#222)
Browse files Browse the repository at this point in the history
Issue: #221

This PR introduces a new preset in JellyOptions (allFeatures) which enables generalized statements and RDF-star. This preset will now be used as the default in Jena RIOT and RDF4J Rio, because it's better to break a SHOULD requirement than a MUST (see the issue for details).
  • Loading branch information
Ostrzyciel authored Nov 14, 2024
1 parent ef32fff commit 0327451
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 55 deletions.
16 changes: 16 additions & 0 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/JellyOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ object JellyOptions:
def bigRdfStar: RdfStreamOptions =
bigStrict.withRdfStar(true)

/**
 * "Big" preset suitable for high-volume streams and larger machines.
 * Every protocol feature is permitted: both generalized RDF statements and RDF-star statements.
 * @return the [[bigStrict]] options with RDF-star and generalized statements switched on
 */
def bigAllFeatures: RdfStreamOptions =
  bigStrict
    .withRdfStar(true)
    .withGeneralizedStatements(true)

/**
* "Small" preset suitable for low-volume streams and smaller machines.
* Does not allow generalized RDF statements.
Expand Down Expand Up @@ -62,6 +70,14 @@ object JellyOptions:
def smallRdfStar: RdfStreamOptions =
smallStrict.withRdfStar(true)

/**
 * "Small" preset suitable for low-volume streams and smaller machines.
 * Every protocol feature is permitted: both generalized RDF statements and RDF-star statements.
 * @return the [[smallStrict]] options with RDF-star and generalized statements switched on
 */
def smallAllFeatures: RdfStreamOptions =
  smallStrict
    .withRdfStar(true)
    .withGeneralizedStatements(true)

/**
* Default maximum supported options for Jelly decoders.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileInputStre
class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
given ActorSystem = ActorSystem("test")

val presets: Seq[(RdfStreamOptions, Int, String)] = Seq(
(JellyOptions.smallGeneralized, 1, "small generalized"),
(JellyOptions.smallRdfStar, 1_000_000, "small RDF-star"),
(JellyOptions.smallStrict, 30, "small strict"),
(JellyOptions.bigGeneralized, 256, "big generalized"),
(JellyOptions.bigRdfStar, 10_000, "big RDF-star"),
(JellyOptions.bigStrict, 3, "big strict"),
val presets: Seq[(Option[RdfStreamOptions], Int, String)] = Seq(
(Some(JellyOptions.smallGeneralized), 1, "small generalized"),
(Some(JellyOptions.smallRdfStar), 1_000_000, "small RDF-star"),
(Some(JellyOptions.smallStrict), 30, "small strict"),
(Some(JellyOptions.smallAllFeatures), 13, "small all features"),
(Some(JellyOptions.bigGeneralized), 256, "big generalized"),
(Some(JellyOptions.bigRdfStar), 10_000, "big RDF-star"),
(Some(JellyOptions.bigStrict), 3, "big strict"),
(Some(JellyOptions.bigAllFeatures), 2, "big all features"),
(None, 10, "no options"),
)

val presetsUnsupported: Seq[(RdfStreamOptions, RdfStreamOptions, String)] = Seq(
Expand Down Expand Up @@ -65,7 +68,8 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
)
)

private def checkStreamOptions(bytes: Array[Byte], expectedType: String, expectedOpt: RdfStreamOptions) =
private def checkStreamOptions(bytes: Array[Byte], expectedType: String, expectedOpt: Option[RdfStreamOptions]) =
val expOpt = expectedOpt.getOrElse(JellyOptions.smallAllFeatures)
val frame = RdfStreamFrame.parseDelimitedFrom(new ByteArrayInputStream(bytes)).get
frame.rows.size should be > 0
frame.rows.head.row.isOptions should be (true)
Expand All @@ -76,11 +80,11 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
else if expectedType == "quads" then
options.physicalType should be (PhysicalStreamType.QUADS)
options.logicalType should be (LogicalStreamType.FLAT_QUADS)
options.generalizedStatements should be (expectedOpt.generalizedStatements)
options.rdfStar should be (expectedOpt.rdfStar)
options.maxNameTableSize should be (expectedOpt.maxNameTableSize)
options.maxPrefixTableSize should be (expectedOpt.maxPrefixTableSize)
options.maxDatatypeTableSize should be (expectedOpt.maxDatatypeTableSize)
options.generalizedStatements should be (expOpt.generalizedStatements)
options.rdfStar should be (expOpt.rdfStar)
options.maxNameTableSize should be (expOpt.maxNameTableSize)
options.maxPrefixTableSize should be (expOpt.maxPrefixTableSize)
options.maxDatatypeTableSize should be (expOpt.maxDatatypeTableSize)
options.version should be (Constants.protoVersion)

runTest(JenaSerDes, JenaSerDes)
Expand Down Expand Up @@ -123,7 +127,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
originalSize should be > 0L

val os = ByteArrayOutputStream()
ser.writeTriplesJelly(os, model, encOptions, 100)
ser.writeTriplesJelly(os, model, Some(encOptions), 100)
os.flush()
os.close()
val data = os.toByteArray
Expand All @@ -147,13 +151,21 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, JenaTest:
os.close()
val data = os.toByteArray
data.size should be > 0
checkStreamOptions(data, "triples", preset)

val model2 = des.readTriplesJelly(ByteArrayInputStream(data), None)
val deserializedSize = summon[Measure[TMDes]].size(model2)
// Add -1 to account for the different statement counting of RDF4J and Jena
deserializedSize should be <= originalSize
deserializedSize should be >= originalSize - 1
// In case we are leaving the default settings, RDF4J has no way of knowing if the data is
// triples or quads, so it's going to default to quads.
val mayBeQuads = ser.name == "RDF4J" && preset.isEmpty
checkStreamOptions(data, if mayBeQuads then "quads" else "triples", preset)

// Skip the round-trip check when the data was encoded by RDF4J with default settings and is being
// decoded by the streaming Jena parser, because that parser will just discard any quads completely.
// Funnily enough, this is not the case with the classic Jena parser, which forces quads in the
// default graph to be triples.
if !(mayBeQuads && des.name == "Jena (StreamRDF)") then
val model2 = des.readTriplesJelly(ByteArrayInputStream(data), None)
val deserializedSize = summon[Measure[TMDes]].size(model2)
// Add -1 to account for the different statement counting of RDF4J and Jena
deserializedSize should be <= originalSize
deserializedSize should be >= originalSize - 1
}

for (name, file) <- TestCases.quads do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eu.ostrzyciel.jelly.integration_tests.io

import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.query.Dataset
Expand All @@ -26,15 +27,16 @@ class JenaReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Model,
JenaSerDes.readTriplesJelly(is, supportedOptions)

override def writeQuadsJelly
(os: OutputStream, dataset: Dataset, opt: RdfStreamOptions, frameSize: Int): Unit =
val f = EncoderSource.fromDatasetAsQuads(dataset, ByteSizeLimiter(32_000), opt)
(os: OutputStream, dataset: Dataset, opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val f = EncoderSource.fromDatasetAsQuads
(dataset, ByteSizeLimiter(32_000), opt.getOrElse(JellyOptions.smallAllFeatures))
(using jenaIterableAdapter, jenaConverterFactory)
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)

override def writeTriplesJelly
(os: OutputStream, model: Model, opt: RdfStreamOptions, frameSize: Int): Unit =
val f = EncoderSource.fromGraph(model, ByteSizeLimiter(32_000), opt)
(os: OutputStream, model: Model, opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val f = EncoderSource.fromGraph(model, ByteSizeLimiter(32_000), opt.getOrElse(JellyOptions.smallAllFeatures))
(using jenaIterableAdapter, jenaConverterFactory)
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ object JenaSerDes extends NativeSerDes[Model, Dataset]:
m

def writeQuadsJelly
(os: OutputStream, dataset: Dataset, opt: RdfStreamOptions, frameSize: Int): Unit =
val format = new RDFFormat(JellyLanguage.JELLY, JellyFormatVariant(opt, frameSize))
(os: OutputStream, dataset: Dataset, opt: Option[RdfStreamOptions], frameSize: Int): Unit =
var variant = JellyFormatVariant(frameSize = frameSize)
if opt.isDefined then
variant = variant.copy(opt = opt.get)
val format = new RDFFormat(JellyLanguage.JELLY, variant)
RDFDataMgr.write(os, dataset, format)

def writeTriplesJelly
(os: OutputStream, model: Model, opt: RdfStreamOptions, frameSize: Int): Unit =
val format = new RDFFormat(JellyLanguage.JELLY, JellyFormatVariant(opt, frameSize))
(os: OutputStream, model: Model, opt: Option[RdfStreamOptions], frameSize: Int): Unit =
var variant = JellyFormatVariant(frameSize = frameSize)
if opt.isDefined then
variant = variant.copy(opt = opt.get)
val format = new RDFFormat(JellyLanguage.JELLY, variant)
RDFDataMgr.write(os, model, format)
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,24 @@ object JenaStreamSerDes extends NativeSerDes[Seq[Triple], Seq[Quad]]:
.parse(StreamRDFLib.sinkQuads(sink))
sink.result

override def writeTriplesJelly(os: OutputStream, model: Seq[Triple], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeTriplesJelly(os: OutputStream, model: Seq[Triple], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)
if opt.isDefined then
// Not setting the physical type, as it should be inferred from the data.
// This emulates how RIOT initializes the stream writer in practice.
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)
context.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.get)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
writerStream.start()
model.foreach(writerStream.triple)
writerStream.finish()

override def writeQuadsJelly(os: OutputStream, dataset: Seq[Quad], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeQuadsJelly(os: OutputStream, dataset: Seq[Quad], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val context = RIOT.getContext.copy()
.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, frameSize)
if opt.isDefined then
context.set(JellyLanguage.SYMBOL_STREAM_OPTIONS, opt.get)

val writerStream = StreamRDFWriter.getWriterStream(os, JellyLanguage.JELLY, context)
writerStream.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ trait NativeSerDes[TModel : Measure, TDataset : Measure]:
def readQuadsW3C(is: InputStream): TDataset
def readTriplesJelly(is: InputStream, supportedOptions: Option[RdfStreamOptions]): TModel
def readQuadsJelly(is: InputStream, supportedOptions: Option[RdfStreamOptions]): TDataset
def writeTriplesJelly(os: OutputStream, model: TModel, opt: RdfStreamOptions, frameSize: Int): Unit
def writeQuadsJelly(os: OutputStream, dataset: TDataset, opt: RdfStreamOptions, frameSize: Int): Unit
def writeTriplesJelly(os: OutputStream, model: TModel, opt: Option[RdfStreamOptions], frameSize: Int): Unit
def writeQuadsJelly(os: OutputStream, dataset: TDataset, opt: Option[RdfStreamOptions], frameSize: Int): Unit
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class Rdf4jReactiveSerDes(using Materializer) extends NativeSerDes[Seq[Statement
override def readQuadsJelly(is: InputStream, supportedOptions: Option[RdfStreamOptions]): Seq[Statement] =
read(is, supportedOptions)

override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val f = Source.fromIterator(() => model.iterator)
.via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(frameSize), opt))
.via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(frameSize), opt.getOrElse(JellyOptions.smallAllFeatures)))
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)

override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val f = Source.fromIterator(() => dataset.iterator)
.via(EncoderFlow.flatQuadStream(StreamRowCountLimiter(frameSize), opt))
.via(EncoderFlow.flatQuadStream(StreamRowCountLimiter(frameSize), opt.getOrElse(JellyOptions.smallAllFeatures)))
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ object Rdf4jSerDes extends NativeSerDes[Seq[Statement], Seq[Statement]]:
override def readQuadsJelly(is: InputStream, supportedOptions: Option[RdfStreamOptions]): Seq[Statement] =
read(is, rio.JELLY, supportedOptions)

private def write(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit =
val conf = JellyWriterSettings.configFromOptions(opt, frameSize)
private def write(os: OutputStream, model: Seq[Statement], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
val conf = if opt.isDefined then
JellyWriterSettings.configFromOptions(opt.get, frameSize)
else JellyWriterSettings.configFromOptions(frameSize)
val writer = Rio.createWriter(rio.JELLY, os)
writer.setWriterConfig(conf)
writer.startRDF()
model.foreach(writer.handleStatement)
writer.endRDF()

override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
// We set the physical type to TRIPLES, because the writer has no way of telling triples from
// quads in RDF4J. Thus, the writer will default to QUADS.
write(os, model, opt.withPhysicalType(PhysicalStreamType.TRIPLES), frameSize)
write(os, model, opt.map(_.withPhysicalType(PhysicalStreamType.TRIPLES)), frameSize)

override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit =
override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: Option[RdfStreamOptions], frameSize: Int): Unit =
// No need to set the physical type, because the writer will default to QUADS.
write(os, dataset, opt, frameSize)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.jena.riot.{RDFFormat, RDFFormatVariant}
* @param frameSize size of each RdfStreamFrame, in rows
*/
case class JellyFormatVariant(
opt: RdfStreamOptions = RdfStreamOptions.defaultInstance,
opt: RdfStreamOptions = JellyOptions.smallAllFeatures,
frameSize: Int = 256
) extends RDFFormatVariant(opt.toString)

Expand All @@ -22,6 +22,8 @@ object JellyFormat:
// One RDFFormat per JellyOptions preset; each wraps the preset in a JellyFormatVariant
// that keeps the variant's default frame size.

// "Small" presets
val JELLY_SMALL_STRICT: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.smallStrict))
val JELLY_SMALL_GENERALIZED: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.smallGeneralized))
val JELLY_SMALL_RDF_STAR: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.smallRdfStar))
val JELLY_SMALL_ALL_FEATURES: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.smallAllFeatures))

// "Big" presets
val JELLY_BIG_STRICT: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.bigStrict))
val JELLY_BIG_GENERALIZED: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.bigGeneralized))
val JELLY_BIG_RDF_STAR: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.bigRdfStar))
val JELLY_BIG_ALL_FEATURES: RDFFormat =
  new RDFFormat(JELLY, JellyFormatVariant(JellyOptions.bigAllFeatures))
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ object JellyLanguage:
/**
* The Jelly language constant for use in Apache Jena RIOT.
*
* This uses by default [[JellyFormat.JELLY_SMALL_STRICT]] for serialization.
* This uses by default [[JellyFormat.JELLY_SMALL_ALL_FEATURES]] for serialization, assuming pessimistically
* that the user may want to use all features of the protocol.
*
* If you are not intending to use generalized RDF or RDF-star, you may want to use
* [[JellyFormat.JELLY_SMALL_STRICT]].
*/
val JELLY: Lang = LangBuilder.create(jellyName, jellyContentType)
.addAltNames("JELLY")
Expand Down Expand Up @@ -66,18 +70,20 @@ object JellyLanguage:
RDFLanguages.register(JELLY)

// Default serialization format
RDFWriterRegistry.register(JELLY, JellyFormat.JELLY_SMALL_STRICT)
RDFWriterRegistry.register(JELLY, JellyFormat.JELLY_SMALL_ALL_FEATURES)
// Register also the streaming writer
StreamRDFWriter.register(JELLY, JellyFormat.JELLY_SMALL_STRICT)
StreamRDFWriter.register(JELLY, JellyFormat.JELLY_SMALL_ALL_FEATURES)

// Register the writers
val allFormats = List(
JELLY_SMALL_STRICT,
JELLY_SMALL_GENERALIZED,
JELLY_SMALL_RDF_STAR,
JELLY_SMALL_ALL_FEATURES,
JELLY_BIG_STRICT,
JELLY_BIG_GENERALIZED,
JELLY_BIG_RDF_STAR
JELLY_BIG_RDF_STAR,
JELLY_BIG_ALL_FEATURES,
)

for format <- allFormats do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import org.eclipse.rdf4j.rio.WriterConfig
import org.eclipse.rdf4j.rio.helpers.*

object JellyWriterSettings:
/**
 * Builds a writer configuration in which only the frame size is set explicitly.
 * No stream options are written into the configuration by this overload.
 * @param frameSize value stored under the FRAME_SIZE setting
 * @return a freshly created WriterConfig carrying the FRAME_SIZE setting
 */
def configFromOptions(frameSize: Long): WriterConfig =
  val config = new WriterConfig()
  config.set(FRAME_SIZE, frameSize)
  config

def configFromOptions(opt: RdfStreamOptions, frameSize: Long = 256L): WriterConfig =
val c = new WriterConfig()
c.set(FRAME_SIZE, frameSize)
Expand Down Expand Up @@ -33,19 +38,21 @@ object JellyWriterSettings:
val PHYSICAL_TYPE = new ClassRioSetting[PhysicalStreamType](
"eu.ostrzyciel.jelly.convert.rdf4j.rio.physicalType",
"Physical stream type",
PhysicalStreamType.TRIPLES
PhysicalStreamType.QUADS
)

val ALLOW_GENERALIZED_STATEMENTS = new BooleanRioSetting(
"eu.ostrzyciel.jelly.convert.rdf4j.rio.allowGeneralizedStatements",
"Allow generalized statements",
false
"Allow generalized statements. Enabled by default, because we cannot know this in advance. " +
"If your data does not contain generalized statements, it is recommended that you set this to false.",
true
)

val ALLOW_RDF_STAR = new BooleanRioSetting(
"eu.ostrzyciel.jelly.convert.rdf4j.rio.allowRdfStar",
"Allow RDF-star statements",
false
"Allow RDF-star statements. Enabled by default, because we cannot know this in advance. " +
"If your data does not contain RDF-star statements, it is recommended that you set this to false.",
true
)

val MAX_NAME_TABLE_SIZE = new LongRioSetting(
Expand Down

0 comments on commit 0327451

Please sign in to comment.