Skip to content

Commit

Permalink
Jena & RDF4J: autodetect if parsed file is delimited or not (Jelly-RD…
Browse files Browse the repository at this point in the history
…F#185)

* Jena & RDF4J: autodetect if parsed file is delimited or not

Continuation of Jelly-RDF#184. On top of support for this functionality in Jena, I've added the same for RDF4J and added a bunch of integration tests.
  • Loading branch information
Ostrzyciel authored Oct 19, 2024
1 parent 7acae0c commit 2016a47
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 26 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/IoUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

object IoUtils:
/**
* Guesses whether the input stream is a non-delimited Jelly file or a delimited Jelly file.
* Autodetects whether the input stream is a non-delimited Jelly file or a delimited Jelly file.
*
* To do this, the first three bytes in the stream are peeked.
* The peeked bytes are then put back into the stream, and the stream is returned.
* @param in the input stream
* @return (isDelimited, newInputStream) where isDelimited is true if the stream is a delimited Jelly file
*/
def guessDelimiting(in: InputStream): (Boolean, InputStream) =
def autodetectDelimiting(in: InputStream): (Boolean, InputStream) =
val scout = in.readNBytes(3)
val scoutIn = ByteArrayInputStream(scout)
val newInput = SequenceInputStream(scoutIn, in)
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/scala/eu/ostrzyciel/jelly/core/IoUtilsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(1) should not be 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe false
newIn.readAllBytes() shouldBe bytes
}
Expand All @@ -44,7 +44,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(1) shouldBe 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe true
newIn.readAllBytes() shouldBe bytes
}
Expand All @@ -56,7 +56,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(1) should not be 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe false
newIn.readAllBytes() shouldBe bytes
}
Expand All @@ -71,7 +71,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(2) should not be 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe true
newIn.readAllBytes() shouldBe bytes
}
Expand All @@ -84,7 +84,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(2) shouldBe 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe false
newIn.readAllBytes() shouldBe bytes
}
Expand All @@ -99,7 +99,7 @@ class IoUtilsSpec extends AnyWordSpec, Matchers:
bytes(3) shouldBe 0x0A

val in = new ByteArrayInputStream(bytes)
val (isDelimited, newIn) = IoUtils.guessDelimiting(in)
val (isDelimited, newIn) = IoUtils.autodetectDelimiting(in)
isDelimited shouldBe true
newIn.readAllBytes() shouldBe bytes
}
Expand Down
1 change: 1 addition & 0 deletions docs/docs/getting-started-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ You can simply add Jelly format support to an application based on RDF4J with Je
The Jelly-JVM plugin JARs provide the following features:

- Full support for parsing and serialization of RDF data (triples and quads) in the Jelly format.
- The parser will automatically detect if the input data [is delimited or not]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}). Both delimited and non-delimited Jelly data can be parsed.
- In Apache Jena, stream serialization is also supported.
- Recognizing the `.jelly` file extension.
- Recognizing the `application/x-jelly-rdf` media type.
Expand Down
3 changes: 2 additions & 1 deletion docs/docs/user/jena.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ The module also implements the {{ javadoc_link_pretty('core', 'IterableAdapter')
Usage notes:

- {{ javadoc_link_pretty('core', 'JellyOptions$') }} provides a few common presets for Jelly serialization options, which can be used to construct a `JellyFormatVariant`, as shown in the example above. You can also further customize the serialization options (e.g., dictionary size).
- The RIOT integration implements only the [delimited variant of Jelly]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}). It is used for writing Jelly to files on disk or sockets. Because of this, RIOT will not parse non-delimited Jelly data (e.g., a single message in a Kafka stream). For this, you should use the `jelly-stream` module or the more low-level API: [Low-level usage](low-level.md).
- The RIOT writer (serializer) integration implements only the [delimited variant of Jelly]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}). It is used for writing Jelly to files on disk or sockets. Because of this, you cannot use RIOT to write non-delimited Jelly data (e.g., a single message to a Kafka stream). For this, you should use the `jelly-stream` module or the more low-level API: [Low-level usage](low-level.md).
- However, the RIOT parser (deserializer) integration will automatically detect if the parsed Jelly data is delimited or not. If it's non-delimited, the parser will assume that there is only one `RdfStreamFrame` in the file.
- Jelly's parsers and writers are registered in the {{ javadoc_link_pretty('jena', 'riot.JellyLanguage$') }} object ([source code]({{ git_link('jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyLanguage.scala') }})). This registration should happen automatically when you include the `jelly-jena` module in your project, using Jena's [component initialization mechanism](https://jena.apache.org/documentation/notes/system-initialization.html).

## Streaming serialization with RIOT
Expand Down
1 change: 1 addition & 0 deletions docs/docs/user/low-level.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
To parse a serialized stream frame into triples/quads:

1. Call {{ javadoc_link_pretty('core', 'proto.v1.RdfStreamFrame$', 'parseFrom') }} if it's a non-delimited frame (like you would see, e.g., in a Kafka or gRPC stream), or `parseDelimitedFrom` if it's a [delimited stream]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}) (like you would see in a file or a socket).
- There is also a utility method to detect if the stream is delimited or not: {{ javadoc_link_pretty('core', 'IoUtils$', 'autodetectDelimiting') }}. In most cases you will not need to use it. It is used internally by the Jena and RDF4J integrations for user convenience.
2. Obtain a decoder that turns `RdfStreamFrame`s into triples/quads: {{ javadoc_link_pretty('jena', 'JenaConverterFactory$') }} has different methods for [different physical stream types]({{ proto_link('user-guide#stream-types') }}):
- `anyStatementDecoder` for any physical stream type, outputs `Triple` or `Quad`
- `triplesDecoder` for TRIPLES streams, outputs `Triple`
Expand Down
3 changes: 2 additions & 1 deletion docs/docs/user/rdf4j.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ The module also implements the {{ javadoc_link_pretty('core', 'IterableAdapter')
Usage notes:

- {{ javadoc_link_pretty('core', 'JellyOptions$') }} provides a few common presets for Jelly serialization options. These options are passed through {{ javadoc_link_pretty('rdf4j', 'rio.JellyWriterSettings$', 'configFromOptions') }} and used to configure the writer, as shown in the example above. You can also further customize the serialization options (e.g., dictionary size).
- The RDF4J Rio integration implements only the [delimited variant of Jelly]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}). It is used for writing Jelly to files on disk or sockets. Because of this, Rio will not parse non-delimited Jelly data (e.g., a single message in a Kafka stream). For this, you should use the `jelly-stream` module or the more low-level API: [Low-level usage](low-level.md).
- The RDF4J Rio writer (serializer) integration implements only the [delimited variant of Jelly]({{ proto_link('user-guide#delimited-vs-non-delimited-jelly') }}). It is used for writing Jelly to files on disk or sockets. Because of this, you cannot use Rio to write non-delimited Jelly data (e.g., a single message to a Kafka stream). For this, you should use the `jelly-stream` module or the more low-level API: [Low-level usage](low-level.md).
- However, the Rio parser (deserializer) integration will automatically detect if the parsed Jelly data is delimited or not. If it's non-delimited, the parser will assume that there is only one `RdfStreamFrame` in the file.
- Jelly's parsers and writers are in the {{ javadoc_link_pretty('rdf4j', 'rio') }} package ([source code]({{ git_link('rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio') }})). They are automatically registered on startup using the `RDFParserFactory` and `RDFWriterFactory` [SPIs](https://en.wikipedia.org/wiki/Service_provider_interface) provided by RDF4J.

## See also
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package eu.ostrzyciel.jelly.integration_tests.io

import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.proto.v1.*
import eu.ostrzyciel.jelly.integration_tests.TestCases
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.io.{ByteArrayInputStream, FileInputStream}
import scala.jdk.CollectionConverters.*

/**
 * Test checking if the delimited/non-delimited auto-detection works correctly.
 *
 * This test only contains non-delimited tests. For the delimited ones, see:
 * [[eu.ostrzyciel.jelly.integration_tests.io.IoSerDesSpec]].
 * More fine-grained tests for delimited/non-delimited detection can be found in the jelly-core module.
 *
 * For every triple test case and every preset, all triples of the reference model are encoded
 * into a single `RdfStreamFrame`, serialized with `toByteArray`, and then read back through
 * each tested integration. The number of deserialized statements must equal the size of the
 * reference model.
 */
class NonDelimitedDesSpec extends AnyWordSpec, Matchers:

  /** Serialization presets under test, each forced to the TRIPLES physical stream type. */
  val presets: Seq[(RdfStreamOptions, String)] = Seq(
    (JellyOptions.smallGeneralized, "small generalized"),
    (JellyOptions.bigGeneralized, "big generalized"),
  ).map(
    (opt, name) => (opt.copy(physicalType = PhysicalStreamType.TRIPLES), name)
  )

  // NOTE(review): this member is not read anywhere in this class -- the tests below call
  // runTest directly for each integration. Kept so that the class's members stay unchanged.
  val methods = Seq(
    (JenaSerDes, "Jena RIOT"),
    (Rdf4jSerDes, "RDF4J Rio"),
  )

  for (caseName, file) <- TestCases.triples do
    // Close the file handle as soon as the reference model has been loaded,
    // instead of leaving one open stream behind per test case.
    val model = scala.util.Using.resource(new FileInputStream(file)) { in =>
      JenaSerDes.readTriplesW3C(in)
    }
    val originalSize = model.size()

    for (options, presetName) <- presets do
      val encoder = JenaConverterFactory.encoder(options)
      // All rows go into one frame, which is written without a length prefix (non-delimited).
      val rows = model.getGraph.find().asScala.flatMap(encoder.addTripleStatement).toSeq
      val bytes = RdfStreamFrame(rows).toByteArray

      /**
       * Registers one test that parses `bytes` with the given integration and compares the
       * number of deserialized statements to the size of the reference model.
       * @param method the integration under test
       * @param methodName name of the integration, used as the test subject
       */
      def runTest[TMDes: Measure](method: NativeSerDes[TMDes, ?], methodName: String) =
        methodName should {
          s"deserialize non-delimited triples from $presetName ($caseName)" in {
            val deserialized = method.readTriplesJelly(new ByteArrayInputStream(bytes), None)
            summon[Measure[TMDes]].size(deserialized) shouldEqual originalSize
          }
        }

      runTest(JenaSerDes, "Jena RIOT")
      runTest(Rdf4jSerDes, "RDF4J Rio")
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ object JellyReader extends ReaderRIOT:
override def read(reader: Reader, baseURI: String, ct: ContentType, output: StreamRDF, context: Context): Unit =
throw new RiotException("RDF Jelly: Reading binary data from a java.io.Reader is not supported. " +
"Please use an InputStream.")



/**
* Reads Jelly RDF data from an InputStream.
* Automatically detects whether the input is a single frame (non-delimited) or a stream of frames (delimited).
*/
override def read(in: InputStream, baseURI: String, ct: ContentType, output: StreamRDF, context: Context): Unit =
// Get the supported options specified by the user in the context -- or the default if not available
val supportedOptions = context.get[RdfStreamOptions](
Expand All @@ -38,19 +41,18 @@ object JellyReader extends ReaderRIOT:

output.start()
try {
IoUtils.guessDelimiting(in) match
IoUtils.autodetectDelimiting(in) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
val frame = RdfStreamFrame.parseFrom(newIn)
processFrame(frame)
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined)
.foreach { maybeFrame =>
val frame = maybeFrame.get
processFrame(frame)
}
.foreach { maybeFrame => processFrame(maybeFrame.get) }
}
finally {
output.finish()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eu.ostrzyciel.jelly.convert.rdf4j.rio

import eu.ostrzyciel.jelly.convert.rdf4j.Rdf4jConverterFactory
import eu.ostrzyciel.jelly.core.IoUtils
import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions}
import org.eclipse.rdf4j.rio.{RDFFormat, RioSetting}
import org.eclipse.rdf4j.rio.helpers.AbstractRDFParser
Expand All @@ -23,6 +24,10 @@ final class JellyParser extends AbstractRDFParser:
s.add(MAX_DATATYPE_TABLE_SIZE)
s

/**
* Read Jelly RDF data from an InputStream.
* Automatically detects whether the input is a single frame (non-delimited) or a stream of frames (delimited).
*/
override def parse(in: InputStream, baseURI: String): Unit =
if (in == null) throw new IllegalArgumentException("Input stream must not be null")

Expand All @@ -35,18 +40,26 @@ final class JellyParser extends AbstractRDFParser:
maxDatatypeTableSize = config.get(MAX_DATATYPE_TABLE_SIZE).toInt,
version = config.get(PROTO_VERSION).toInt,
)))
inline def processFrame(f: RdfStreamFrame): Unit =
for row <- f.rows do
decoder.ingestRow(row) match
case Some(st) => rdfHandler.handleStatement(st)
case None => ()

rdfHandler.startRDF()
try {
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(in))
.takeWhile(_.isDefined)
.foreach { maybeFrame =>
val frame = maybeFrame.get
for row <- frame.rows do
decoder.ingestRow(row) match
case Some(st) => rdfHandler.handleStatement(st)
case None => ()
}
IoUtils.autodetectDelimiting(in) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
val frame = RdfStreamFrame.parseFrom(newIn)
processFrame(frame)
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined)
.foreach { maybeFrame => processFrame(maybeFrame.get) }
}
finally {
rdfHandler.endRDF()
Expand Down

0 comments on commit 2016a47

Please sign in to comment.