diff --git a/core/src/main/java/eu/ostrzyciel/jelly/core/EncoderLookup.java b/core/src/main/java/eu/ostrzyciel/jelly/core/EncoderLookup.java index cfebf1c..0f824ee 100644 --- a/core/src/main/java/eu/ostrzyciel/jelly/core/EncoderLookup.java +++ b/core/src/main/java/eu/ostrzyciel/jelly/core/EncoderLookup.java @@ -1,5 +1,6 @@ package eu.ostrzyciel.jelly.core; +import java.util.Arrays; import java.util.HashMap; /** @@ -158,4 +159,16 @@ public LookupEntry getOrAddEntry(String key) { entryForReturns.getId = id; return entryForReturns; } + + public void clear() { + map.clear(); + Arrays.fill(table, 0); + tail = 0; + used = 0; + lastSetId = -1000; + if (useSerials) { + Arrays.fill(serials, 0); + serials[0] = -1; + } + } } diff --git a/core/src/main/java/eu/ostrzyciel/jelly/core/TranscoderLookup.java b/core/src/main/java/eu/ostrzyciel/jelly/core/TranscoderLookup.java new file mode 100644 index 0000000..764caba --- /dev/null +++ b/core/src/main/java/eu/ostrzyciel/jelly/core/TranscoderLookup.java @@ -0,0 +1,81 @@ +package eu.ostrzyciel.jelly.core; + +class TranscoderLookup { + private final int outputSize; + private int[] table; + private final EncoderLookup lookup; + + // 0-compression: + // - for prefixes and datatypes: no worries about splicing, because zeroes are not allowed at the start of the + // stream. While splitting, we need to check for zeroes at the start of the stream and remap them. + // - IRI names: remap all 0s forcefully + private final boolean isNameLookup; + private boolean isFirstElement = true; + private int lastSetId = 0; + private int lastInputGetId = 0; + private int lastOutputGetId = 0; + + TranscoderLookup(boolean isNameLookup, int outputSize) { + this.isNameLookup = isNameLookup; + this.outputSize = outputSize; + this.lookup = new EncoderLookup(outputSize, false); + } + + EncoderLookup.LookupEntry addEntry(int originalId, String value) { + if (originalId == 0) { + originalId = ++lastSetId; + } else { + lastSetId = originalId; + } + EncoderLookup.LookupEntry entry = lookup.getOrAddEntry(value); + table[originalId] = entry.getId; + return entry; + } + + int remap(int id) { + if (isNameLookup) { + if (id == 0) { + if (isFirstElement) { + lastInputGetId = id = 1; + } else { + id = ++lastInputGetId; + } + } else { + lastInputGetId = id; + } + isFirstElement = false; + int outputId = table[id]; + lookup.onAccess(outputId); + if (outputId == lastOutputGetId + 1) { + lastOutputGetId++; + return 0; + } + lastOutputGetId = outputId; + return outputId; + } + if (id == 0) { + // No need to do onAccess here, because this is the same as the last element + if (isFirstElement) { + // We are starting a new output stream, so we need to remap the first zero. 
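+                // Returning an explicit id here, instead of the "same as the previous
+                // element" shortcut, because a 0 is not allowed at the start of a
+                // stream (see the note on 0-compression above).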
+ isFirstElement = false; + return lastInputGetId; + } + return 0; + } + lastInputGetId = id; + id = table[id]; + lookup.onAccess(id); + isFirstElement = false; + return id; + } + + void newInputStream(int size) { + if (size > outputSize) { + throw new IllegalArgumentException("Input lookup size cannot be greater than the output lookup size"); + } + if (table == null || table.length < size + 1) { + table = new int[size + 1]; + } + isFirstElement = true; + } +} diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala index 3669426..8fe7bc3 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.core import eu.ostrzyciel.jelly.core.proto.v1.* -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.ArrayBuffer object ProtoEncoder: private val graphEnd = Seq(RdfStreamRow(RdfGraphEnd.defaultInstance)) @@ -149,12 +149,12 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS options.maxNameTableSize, Math.max(Math.min(options.maxNameTableSize, 1024), 256), ) - private var emittedOptions = false + private var emittedOptions: Boolean = false private val lastSubject: LastNodeHolder[TNode] = new LastNodeHolder() private val lastPredicate: LastNodeHolder[TNode] = new LastNodeHolder() private val lastObject: LastNodeHolder[TNode] = new LastNodeHolder() - private var lastGraph: TNode | LastNodeHolder.NoValue.type = LastNodeHolder.NoValue + private var lastGraph: TNode | LastNodeHolder.NoValue.type = LastNodeHolder.NoValue private def nodeToProtoWrapped(node: TNode, lastNodeHolder: LastNodeHolder[TNode]): SpoTerm = if node.equals(lastNodeHolder.node) then null diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoder.scala new file mode 100644 index 0000000..5478591 --- /dev/null +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoder.scala @@ -0,0 +1,17 @@ +package eu.ostrzyciel.jelly.core + +import eu.ostrzyciel.jelly.core.JellyOptions +import eu.ostrzyciel.jelly.core.proto.v1.* + +object ProtoTranscoder: + def fastMergingTranscoderUnsafe(outputOptions: RdfStreamOptions): ProtoTranscoder = + ProtoTranscoderImpl(None, outputOptions) + + def fastMergingTranscoder( + supportedInputOptions: RdfStreamOptions, outputOptions: RdfStreamOptions + ): ProtoTranscoder = ProtoTranscoderImpl(Some(supportedInputOptions), outputOptions) + +trait ProtoTranscoder: + def ingestRow(row: RdfStreamRow): Iterable[RdfStreamRow] + + def ingestFrame(frame: RdfStreamFrame): RdfStreamFrame diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderImpl.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderImpl.scala new file mode 100644 index 0000000..d4a4f73 --- /dev/null +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderImpl.scala @@ -0,0 +1,155 @@ +package eu.ostrzyciel.jelly.core + +import eu.ostrzyciel.jelly.core.JellyExceptions.RdfProtoDeserializationError +import eu.ostrzyciel.jelly.core.proto.v1.* + +import scala.annotation.switch +import scala.collection.mutable.ArrayBuffer + +// Note: the caller is responsible for setting a physical stream type in the output! 
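+// A rough usage sketch (frameA is a placeholder RdfStreamFrame; the options and the
+// choice of physical stream type mirror what ProtoTranscoderSpec does):
+//   val opts = JellyOptions.smallAllFeatures.withPhysicalType(PhysicalStreamType.TRIPLES)
+//   val transcoder = ProtoTranscoder.fastMergingTranscoderUnsafe(opts)
+//   val merged: RdfStreamFrame = transcoder.ingestFrame(frameA)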
+ +private final class ProtoTranscoderImpl( + supportedInputOptions: Option[RdfStreamOptions], + outputOptions: RdfStreamOptions +) extends ProtoTranscoder: + override def ingestRow(row: RdfStreamRow): Iterable[RdfStreamRow] = + rowBuffer.clear() + processRow(row) + rowBuffer.toSeq + + override def ingestFrame(frame: RdfStreamFrame): RdfStreamFrame = + rowBuffer.clear() + for row <- frame.rows do + processRow(row) + val newFrame = RdfStreamFrame(rowBuffer.toSeq) + newFrame + + // Transcoder state + private val checkInputOptions = supportedInputOptions.isDefined + private val prefixLookup: TranscoderLookup = new TranscoderLookup(false, outputOptions.maxPrefixTableSize) + private val nameLookup: TranscoderLookup = new TranscoderLookup(true, outputOptions.maxNameTableSize) + private val datatypeLookup: TranscoderLookup = new TranscoderLookup(false, outputOptions.maxDatatypeTableSize) + + // Current input stream state + private var inputOptions: RdfStreamOptions = null + private var inputUsesPrefixes = false + + // Current output stream state + private val rowBuffer = new ArrayBuffer[RdfStreamRow](128) + private var changeInTerms = false + private var emittedOptions = false + + private def processRow(row: RdfStreamRow): Unit = + val r = row.row + if r == null then + throw new RdfProtoDeserializationError("Row kind is not set.") + (r.streamRowValueNumber: @switch) match + case RdfStreamRow.OPTIONS_FIELD_NUMBER => handleOptions(r.options) + case RdfStreamRow.TRIPLE_FIELD_NUMBER => handleTriple(row) + case RdfStreamRow.QUAD_FIELD_NUMBER => handleQuad(row) + case RdfStreamRow.GRAPH_START_FIELD_NUMBER => + case RdfStreamRow.GRAPH_END_FIELD_NUMBER => rowBuffer.append(row) + case RdfStreamRow.NAME_FIELD_NUMBER => + val name = r.name + val entry = nameLookup.addEntry(name.id, name.value) + if !entry.newEntry then () + else if entry.setId == name.id then rowBuffer.append(row) + else rowBuffer.append(RdfStreamRow(RdfNameEntry(entry.setId, name.value))) + case RdfStreamRow.PREFIX_FIELD_NUMBER => + val prefix = r.prefix + val entry = prefixLookup.addEntry(prefix.id, prefix.value) + if !entry.newEntry then () + else if entry.setId == prefix.id then rowBuffer.append(row) + else rowBuffer.append(RdfStreamRow(RdfPrefixEntry(entry.setId, prefix.value))) + case RdfStreamRow.DATATYPE_FIELD_NUMBER => + val datatype = r.datatype + val entry = datatypeLookup.addEntry(datatype.id, datatype.value) + if !entry.newEntry then () + else if entry.setId == datatype.id then rowBuffer.append(row) + else rowBuffer.append(RdfStreamRow(RdfDatatypeEntry(entry.setId, datatype.value))) + case _ => + // This case should never happen + throw new RdfProtoDeserializationError("Row kind is not set.") + + private def handleTriple(row: RdfStreamRow): Unit = + this.changeInTerms = false + val triple = row.row.triple + val s1 = handleSpoTerm(triple.subject) + val p1 = handleSpoTerm(triple.predicate) + val o1 = handleSpoTerm(triple.`object`) + if changeInTerms then + rowBuffer.append(RdfStreamRow(RdfTriple(s1, p1, o1))) + else rowBuffer.append(row) + + private def handleQuad(row: RdfStreamRow): Unit = + this.changeInTerms = false + val quad = row.row.quad + val s1 = handleSpoTerm(quad.subject) + val p1 = handleSpoTerm(quad.predicate) + val o1 = handleSpoTerm(quad.`object`) + val g1 = handleGraphTerm(quad.graph) + if changeInTerms then + rowBuffer.append(RdfStreamRow(RdfQuad(s1, p1, o1, g1))) + else rowBuffer.append(row) + + private def handleSpoTerm(term: SpoTerm): SpoTerm = + if term == null then null + else if term.isIri then 
handleIri(term.iri) + else if term.isBnode then term + else if term.isLiteral then handleLiteral(term.literal) + else if term.isTripleTerm then handleTripleTerm(term.tripleTerm) + else throw new RdfProtoDeserializationError("Unknown term type.") + + private def handleGraphTerm(term: GraphTerm): GraphTerm = + if term == null then null + else if term.isIri then handleIri(term.iri) + else if term.isDefaultGraph then term + else if term.isBnode then term + else if term.isLiteral then handleLiteral(term.literal) + else throw new RdfProtoDeserializationError("Unknown term type.") + + private def handleIri(iri: RdfIri): RdfIri = + val prefix = iri.prefixId + val name = iri.nameId + val prefix1 = if inputUsesPrefixes then prefixLookup.remap(prefix) else 0 + val name1 = nameLookup.remap(name) + if prefix1 != prefix || name1 != name then + changeInTerms = true + RdfIri(prefix1, name1) + else iri + + private def handleLiteral(literal: RdfLiteral): RdfLiteral = + if !literal.literalKind.isDatatype then literal + else + val dt = literal.literalKind.datatype + val dt1 = datatypeLookup.remap(dt) + if dt1 != dt then + changeInTerms = true + RdfLiteral(literal.lex, RdfLiteral.LiteralKind.Datatype(dt1)) + else literal + + private def handleTripleTerm(triple: RdfTriple): RdfTriple = + val s1 = handleSpoTerm(triple.subject) + val p1 = handleSpoTerm(triple.predicate) + val o1 = handleSpoTerm(triple.`object`) + if s1 != triple.subject || p1 != triple.predicate || o1 != triple.`object` then + changeInTerms = true + RdfTriple(s1, p1, o1) + else triple + + private def handleOptions(options: RdfStreamOptions): Unit = + if checkInputOptions then + // TODO: check if this is the same physical stream type... + JellyOptions.checkCompatibility(options, supportedInputOptions.get) + this.inputUsesPrefixes = options.maxPrefixTableSize > 0 + if inputUsesPrefixes then + prefixLookup.newInputStream(options.maxPrefixTableSize) + nameLookup.newInputStream(options.maxNameTableSize) + datatypeLookup.newInputStream(options.maxDatatypeTableSize) + // Update the input options + inputOptions = options + if !emittedOptions then + emittedOptions = true + rowBuffer.append(RdfStreamRow(outputOptions.copy( + version = Constants.protoVersion + ))) diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderSpec.scala new file mode 100644 index 0000000..77cc46f --- /dev/null +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderSpec.scala @@ -0,0 +1,67 @@ +package eu.ostrzyciel.jelly.core + +import eu.ostrzyciel.jelly.core.ProtoTestCases.* +import eu.ostrzyciel.jelly.core.helpers.{MockConverterFactory, Mrl} +import eu.ostrzyciel.jelly.core.proto.v1.* +import org.scalatest.Inspectors +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class ProtoTranscoderSpec extends AnyWordSpec, Inspectors, Matchers: + def smallOptions(prefixTableSize: Int) = RdfStreamOptions( + maxNameTableSize = 4, + maxPrefixTableSize = prefixTableSize, + maxDatatypeTableSize = 8, + ) + + "ProtoTranscoder" should { + "splice two identical streams" when { + "input is Triples1" in { + // TODO: more cases + val options: RdfStreamOptions = JellyOptions.smallAllFeatures.withPhysicalType(PhysicalStreamType.TRIPLES) + val input: RdfStreamFrame = Triples1.encodedFull(options, 100).head + val transcoder = ProtoTranscoder.fastMergingTranscoderUnsafe(options) + // First frame should be returned as is + val out1 = transcoder.ingestFrame(input) + 
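+        // (the options row is re-emitted from outputOptions, which in this test is the
+        // same options object the input was encoded with, so whole-frame equality
+        // should hold)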
out1 shouldBe input + // What's more, the rows should be the exact same objects (except the options) + forAll(input.rows.zip(out1.rows).drop(1)) { case (in, out) => + in eq out shouldBe true // reference equality + } + + val out2 = transcoder.ingestFrame(input) + out2.rows.size shouldBe < (input.rows.size) + // No row in out2 should be an options row or a lookup entry row + forAll(out2.rows) { (row: RdfStreamRow) => + row.row.isOptions shouldBe false + row.row.isPrefix shouldBe false + row.row.isName shouldBe false + row.row.isDatatype shouldBe false + } + + // If there is a row in out2 with same content as in input, it should be the same object + var identicalRows = 0 + forAll(input.rows) { (row: RdfStreamRow) => + val sameRow = out2.rows.find(_.row == row.row) + if (sameRow.isDefined) { + sameRow.get eq row shouldBe true + identicalRows += 1 + } + } + // Something should be identical + identicalRows shouldBe > (0) + + // Decode the output + val decoder = MockConverterFactory.anyStatementDecoder(None) + val statements1 = out1.rows.flatMap(decoder.ingestRow) + val statements2 = out2.rows.flatMap(decoder.ingestRow) + statements1 shouldBe statements2 + } + } + + // TODO: same, with more repetitions + + // TODO: splicing multiple different streams in various orders + + // TODO: exception handling + } \ No newline at end of file diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/TranscoderLookupSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/TranscoderLookupSpec.scala new file mode 100644 index 0000000..c7b50be --- /dev/null +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/TranscoderLookupSpec.scala @@ -0,0 +1,122 @@ +package eu.ostrzyciel.jelly.core + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class TranscoderLookupSpec extends AnyWordSpec, Matchers: + import EncoderLookup.LookupEntry + + "TranscoderLookup" should { + "throw an exception when trying to set input lookup size greater than the output" in { + val tl = TranscoderLookup(false, 100) + val ex = intercept[IllegalArgumentException] { + tl.newInputStream(120) + } + ex.getMessage should include ("Input lookup size cannot be greater than the output lookup size") + } + + "remap IDs" when { + "it's a prefix lookup" in { + val tl = TranscoderLookup(false, 120) + tl.newInputStream(100) + tl.addEntry(80, "s80").getId shouldBe 1 + tl.addEntry(81, "s81").getId shouldBe 2 + + tl.remap(80) shouldBe 1 + tl.remap(0) shouldBe 0 + tl.remap(0) shouldBe 0 + tl.remap(81) shouldBe 2 + tl.remap(80) shouldBe 1 + tl.remap(81) shouldBe 2 + tl.remap(0) shouldBe 0 + } + + "it's a name lookup" in { + val tl = TranscoderLookup(true, 100) + tl.newInputStream(100) + tl.addEntry(80, "s80").getId shouldBe 1 + tl.addEntry(81, "s81").getId shouldBe 2 + tl.addEntry(82, "s82").getId shouldBe 3 + tl.addEntry(83, "s83").getId shouldBe 4 + + tl.remap(80) shouldBe 0 + tl.remap(80) shouldBe 1 + tl.remap(80) shouldBe 1 + tl.remap(81) shouldBe 0 + tl.remap(82) shouldBe 0 + tl.remap(82) shouldBe 3 + tl.remap(83) shouldBe 0 + + // and with 0 in the input + tl.remap(80) shouldBe 1 + tl.remap(0) shouldBe 0 + tl.remap(0) shouldBe 0 + tl.remap(80) shouldBe 1 + } + } + + "remap IDs evicting old entries" when { + "it's a prefix lookup" in { + val tl = TranscoderLookup(false, 10) + tl.newInputStream(5) + for i <- 0 to 50 do + tl.addEntry((i % 5) + 1, s"s$i").getId shouldBe (i % 10) + 1 + tl.remap((i % 5) + 1) shouldBe (i % 10) + 1 + } + + "it's a name lookup" in { + val tl = TranscoderLookup(true, 10) + 
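+        // For a name lookup, remap() returns 0 whenever the remapped id is exactly one
+        // greater than the previously returned id (the consecutive-id shortcut), which
+        // is what the expectations below exercise while old entries are being evicted.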
tl.newInputStream(5) + for i <- 0 to 50 do + tl.addEntry((i % 5) + 1, s"s$i").getId shouldBe (i % 10) + 1 + if (i % 10) != 0 || i == 0 then + tl.remap((i % 5) + 1) shouldBe 0 + else + tl.remap((i % 5) + 1) shouldBe (i % 10) + 1 + } + } + + "decode 0-encoding in lookup entries in the input stream" when { + "it's a prefix lookup" in { + val tl = TranscoderLookup(false, 10) + tl.newInputStream(5) + tl.addEntry(0, "s1_1") + tl.addEntry(0, "s2_1") + tl.addEntry(0, "s3_1") + tl.remap(1) shouldBe 1 + + tl.addEntry(1, "s1_2") + tl.remap(1) shouldBe 4 + tl.remap(2) shouldBe 2 + tl.remap(3) shouldBe 3 + tl.remap(0) shouldBe 0 + + // Recover an entry + tl.addEntry(5, "s1_1") + tl.remap(5) shouldBe 1 + tl.remap(0) shouldBe 0 + } + + "it's a name lookup" in { + val tl = TranscoderLookup(true, 10) + tl.newInputStream(5) + tl.addEntry(0, "s1_1") + tl.addEntry(0, "s2_1") + tl.addEntry(0, "s3_1") + tl.remap(1) shouldBe 0 + + tl.addEntry(1, "s1_2") + tl.remap(1) shouldBe 4 + tl.remap(0) shouldBe 2 + tl.remap(0) shouldBe 0 + + // Recover an entry + tl.addEntry(5, "s1_1") + tl.remap(5) shouldBe 1 + tl.remap(2) shouldBe 0 + } + } + + // TODO: cases for isFirstElement + } +
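+
+  // A possible shape for the isFirstElement cases mentioned in the TODO above (a sketch
+  // only; the concrete expected ids depend on the previous input stream's state at the
+  // splice point):
+  //   val tl = TranscoderLookup(false, 10)
+  //   tl.newInputStream(5)
+  //   tl.addEntry(1, "a")
+  //   tl.remap(1) shouldBe 1
+  //   tl.newInputStream(5)   // splice point: isFirstElement is reset
+  //   tl.remap(0)            // expected to be remapped to an explicit, non-zero id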