Skip to content

Commit

Permalink
Experiment: guess if the parsed file is delimited or not (#184)
Browse files Browse the repository at this point in the history
Experimental implementation, should work. I'll merge it into main as-is, and finish the docs, tests, and RDF4J later.
  • Loading branch information
Ostrzyciel authored Oct 19, 2024
1 parent 1466493 commit c7f830e
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 11 deletions.
30 changes: 30 additions & 0 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/IoUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package eu.ostrzyciel.jelly.core

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

object IoUtils:
  /**
   * Guesses whether the input stream is a non-delimited Jelly file or a delimited Jelly file.
   *
   * To do this, up to the first three bytes of the stream are read. The bytes that were read are
   * then put back in front of the remaining input, so the returned stream yields exactly the same
   * bytes as the original stream would have.
   *
   * Streams shorter than three bytes do not cause an exception: an empty stream is reported as
   * non-delimited, and a one- or two-byte stream is classified using its first byte only.
   *
   * @param in the input stream; after this call it should only be consumed through the returned stream
   * @return (isDelimited, newInputStream) where isDelimited is true if the stream is a delimited Jelly file
   */
  def guessDelimiting(in: InputStream): (Boolean, InputStream) =
    // readNBytes returns fewer than 3 bytes when the stream ends early, so the length must be checked.
    val scout = in.readNBytes(3)
    val newInput = SequenceInputStream(ByteArrayInputStream(scout), in)
    // Truth table (notation: 0A = 0x0A, NN = not 0x0A, ?? = don't care):
    // NN ?? ?? -> delimited (all non-delimited start with 0A)
    // 0A NN ?? -> non-delimited
    // 0A 0A NN -> delimited (total message size = 10)
    // 0A 0A 0A -> non-delimited (stream options size = 10)
    //
    // A case like "0A 0A 0A 0A" in the delimited variant is impossible. It would mean that the whole
    // message is 10 bytes long, while stream options alone are 10 bytes long.
    val isDelimited = scout.length match
      // Nothing to inspect: treat the empty stream as non-delimited.
      case 0 => false
      // Too short for the full table; only the first row (NN ?? ??) can still be decided.
      // A leading 0A with fewer than three bytes is reported as non-delimited.
      case 1 | 2 => scout(0) != 0x0A
      case _ => scout(0) != 0x0A || (scout(1) == 0x0A && scout(2) != 0x0A)
    (isDelimited, newInput)
108 changes: 108 additions & 0 deletions core/src/test/scala/eu/ostrzyciel/jelly/core/IoUtilsSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.*
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

class IoUtilsSpec extends AnyWordSpec, Matchers:
  // Fixture for the "size >10" cases: one name-entry row with a long name.
  private val frameLarge = RdfStreamFrame(Seq(
    RdfStreamRow(RdfStreamRow.Row.Name(
      RdfNameEntry(1, "name name name name")
    ))
  ))
  // Fixture whose non-delimited serialization is asserted below to be exactly 10 bytes long.
  private val frameSize10 = RdfStreamFrame(Seq(
    RdfStreamRow(RdfStreamRow.Row.Name(
      RdfNameEntry(0, "name")
    ))
  ))
  // Fixture whose single options row is asserted below to serialize to exactly 10 bytes.
  private val frameOptionsSize10 = RdfStreamFrame(Seq(
    RdfStreamRow(RdfStreamRow.Row.Options(
      RdfStreamOptions(streamName = "name12")
    ))
  ))

  /** Serializes the frame in the delimited form produced by `writeDelimitedTo`. */
  private def toDelimitedBytes(frame: RdfStreamFrame): Array[Byte] =
    val buffer = ByteArrayOutputStream()
    frame.writeDelimitedTo(buffer)
    buffer.toByteArray

  /**
   * Runs `IoUtils.guessDelimiting` on the given bytes, then checks the verdict and that the
   * returned stream still yields the complete, unmodified input.
   */
  private def verifyGuess(payload: Array[Byte], expectDelimited: Boolean) =
    val (isDelimited, newIn) = IoUtils.guessDelimiting(new ByteArrayInputStream(payload))
    isDelimited shouldBe expectDelimited
    newIn.readAllBytes() shouldBe payload

  "IoUtils" should {
    "guessDelimiting" when {
      "input stream is a non-delimited Jelly message (size >10)" in {
        val payload = frameLarge.toByteArray
        // Sanity-check the fixture: leading 0A followed by something else.
        payload(0) shouldBe 0x0A
        payload(1) should not be 0x0A

        verifyGuess(payload, expectDelimited = false)
      }

      "input stream is a delimited Jelly message (size >10)" in {
        val payload = toDelimitedBytes(frameLarge)
        // The length prefix is not 0A; the message itself then starts with 0A.
        payload(0) should not be 0x0A
        payload(1) shouldBe 0x0A

        verifyGuess(payload, expectDelimited = true)
      }

      "input stream is a non-delimited Jelly message (size=10)" in {
        val payload = frameSize10.toByteArray
        payload.size shouldBe 10
        payload(0) shouldBe 0x0A
        payload(1) should not be 0x0A

        verifyGuess(payload, expectDelimited = false)
      }

      "input stream is a delimited Jelly message (size=10)" in {
        val payload = toDelimitedBytes(frameSize10)
        // One extra byte for the length prefix, which here equals 0A (10).
        payload.size shouldBe 11
        payload(0) shouldBe 0x0A
        payload(1) shouldBe 0x0A
        payload(2) should not be 0x0A

        verifyGuess(payload, expectDelimited = true)
      }

      "input stream is a non-delimited Jelly message (options size =10)" in {
        frameOptionsSize10.rows(0).toByteArray.size shouldBe 10
        val payload = frameOptionsSize10.toByteArray
        payload(0) shouldBe 0x0A
        payload(1) shouldBe 0x0A
        payload(2) shouldBe 0x0A

        verifyGuess(payload, expectDelimited = false)
      }

      "input stream is a delimited Jelly message (options size =10)" in {
        val payload = toDelimitedBytes(frameOptionsSize10)
        payload(0) should not be 0x0A
        payload(1) shouldBe 0x0A
        payload(2) shouldBe 0x0A
        payload(3) shouldBe 0x0A

        verifyGuess(payload, expectDelimited = true)
      }
    }
  }

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.convert.jena.riot

import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory
import eu.ostrzyciel.jelly.core.Constants.*
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.{IoUtils, JellyOptions}
import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions}
import org.apache.jena.atlas.web.ContentType
import org.apache.jena.graph.Triple
Expand All @@ -29,18 +29,28 @@ object JellyReader extends ReaderRIOT:
JellyLanguage.SYMBOL_SUPPORTED_OPTIONS, JellyOptions.defaultSupportedOptions
)
val decoder = JenaConverterFactory.anyStatementDecoder(Some(supportedOptions))
// Feeds every row of the frame to the decoder and forwards each decoded statement to the output.
inline def processFrame(f: RdfStreamFrame): Unit =
  f.rows.foreach { row =>
    decoder.ingestRow(row) match
      case Some(triple: Triple) => output.triple(triple)
      case Some(quad: Quad) => output.quad(quad)
      // The row produced no statement; nothing to emit.
      case None => ()
  }

output.start()
try {
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(in))
.takeWhile(_.isDefined)
.foreach { maybeFrame =>
val frame = maybeFrame.get
for row <- frame.rows do
decoder.ingestRow(row) match
case Some(st: Triple) => output.triple(st)
case Some(st: Quad) => output.quad(st)
case None => ()
}
IoUtils.guessDelimiting(in) match
case (false, newIn) =>
// Non-delimited Jelly file
val frame = RdfStreamFrame.parseFrom(newIn)
processFrame(frame)
case (true, newIn) =>
// Delimited Jelly file
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined)
.foreach { maybeFrame =>
val frame = maybeFrame.get
processFrame(frame)
}
}
finally {
output.finish()
Expand Down

0 comments on commit c7f830e

Please sign in to comment.