Skip to content

Commit

Permalink
WIP: ProtoTranscoder implementation
Browse files Browse the repository at this point in the history
Issue: #225

Maybe it works?

TODO:

- Lookup tests
- Transcoder tests in core
- Integration tests (extensive)
- Code audit – performance, security
- Scaladoc/javadoc
  • Loading branch information
Ostrzyciel committed Nov 23, 2024
1 parent 8ea2251 commit 2c3d45b
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 3 deletions.
13 changes: 13 additions & 0 deletions core/src/main/java/eu/ostrzyciel/jelly/core/EncoderLookup.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.ostrzyciel.jelly.core;

import java.util.Arrays;
import java.util.HashMap;

/**
Expand Down Expand Up @@ -158,4 +159,16 @@ public LookupEntry getOrAddEntry(String key) {
entryForReturns.getId = id;
return entryForReturns;
}

public void clear() {
map.clear();
Arrays.fill(table, 0);
tail = 0;
used = 0;
lastSetId = -1000;
if (useSerials) {
Arrays.fill(serials, 0);
serials[0] = -1;
}
}
}
81 changes: 81 additions & 0 deletions core/src/main/java/eu/ostrzyciel/jelly/core/TranscoderLookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package eu.ostrzyciel.jelly.core;

class TranscoderLookup {
private final int outputSize;
private int[] table;
private final EncoderLookup lookup;

// 0-compression:
// - for prefixes and datatypes: no worries about splicing, because zeroes are not allowed at the start of the
// stream. While splitting, we need to check for zeroes at the start of the stream and remap them.
// - IRI names: remap all 0s forcefully
private final boolean isNameLookup;
private boolean isFirstElement = true;
private int lastSetId = 0;
private int lastInputGetId = 0;
private int lastOutputGetId = 0;

TranscoderLookup(boolean isNameLookup, int outputSize) {
this.isNameLookup = isNameLookup;
this.outputSize = outputSize;
this.lookup = new EncoderLookup(outputSize, false);
}

EncoderLookup.LookupEntry addEntry(int originalId, String value) {
if (originalId == 0) {
originalId = ++lastSetId;
} else {
lastSetId = originalId;
}
EncoderLookup.LookupEntry entry = lookup.getOrAddEntry(value);
table[originalId] = entry.getId;
return entry;
}

int remap(int id) {
if (isNameLookup) {
if (id == 0) {
if (isFirstElement) {
lastInputGetId = id = 1;
} else {
id = ++lastInputGetId;
}
} else {
lastInputGetId = id;
}
isFirstElement = false;
int outputId = table[id];
lookup.onAccess(outputId);
if (outputId == lastOutputGetId + 1) {
lastOutputGetId++;
return 0;
}
lastOutputGetId = outputId;
return outputId;
}
if (id == 0) {
// No need to do onAccess here, because this is the same as the last element
if (isFirstElement) {
// We are starting a new output stream, so we need to remap the first zero.
isFirstElement = false;
return lastInputGetId;
}
return 0;
}
lastInputGetId = id;
id = table[id];
lookup.onAccess(id);
isFirstElement = false;
return id;
}

void newInputStream(int size) {
if (size > outputSize) {
throw new IllegalArgumentException("Input lookup size cannot be greater than the output lookup size");
}
if (table == null || table.length < size + 1) {
table = new int[size + 1];
}
isFirstElement = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.*

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.mutable.ArrayBuffer

object ProtoEncoder:
private val graphEnd = Seq(RdfStreamRow(RdfGraphEnd.defaultInstance))
Expand Down Expand Up @@ -149,12 +149,12 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
options.maxNameTableSize,
Math.max(Math.min(options.maxNameTableSize, 1024), 256),
)
private var emittedOptions = false
private var emittedOptions: Boolean = false

private val lastSubject: LastNodeHolder[TNode] = new LastNodeHolder()
private val lastPredicate: LastNodeHolder[TNode] = new LastNodeHolder()
private val lastObject: LastNodeHolder[TNode] = new LastNodeHolder()
private var lastGraph: TNode | LastNodeHolder.NoValue.type = LastNodeHolder.NoValue
private var lastGraph: TNode | LastNodeHolder.NoValue.type = LastNodeHolder.NoValue

private def nodeToProtoWrapped(node: TNode, lastNodeHolder: LastNodeHolder[TNode]): SpoTerm =
if node.equals(lastNodeHolder.node) then null
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.proto.v1.*

object ProtoTranscoder:
def fastMergingTranscoderUnsafe(outputOptions: RdfStreamOptions): ProtoTranscoder =
ProtoTranscoderImpl(None, outputOptions)

def fastMergingTranscoder(
supportedInputOptions: RdfStreamOptions, outputOptions: RdfStreamOptions
): ProtoTranscoder = ProtoTranscoderImpl(Some(supportedInputOptions), outputOptions)

trait ProtoTranscoder:
def ingestRow(row: RdfStreamRow): Iterable[RdfStreamRow]

def ingestFrame(frame: RdfStreamFrame): RdfStreamFrame
155 changes: 155 additions & 0 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoTranscoderImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.JellyExceptions.RdfProtoDeserializationError
import eu.ostrzyciel.jelly.core.proto.v1.*

import scala.annotation.switch
import scala.collection.mutable.ArrayBuffer

// Note: the caller is responsible for setting a physical stream type in the output!

private final class ProtoTranscoderImpl(
supportedInputOptions: Option[RdfStreamOptions],
outputOptions: RdfStreamOptions
) extends ProtoTranscoder:
override def ingestRow(row: RdfStreamRow): Iterable[RdfStreamRow] =
rowBuffer.clear()
processRow(row)
rowBuffer.toSeq

override def ingestFrame(frame: RdfStreamFrame): RdfStreamFrame =
rowBuffer.clear()
for row <- frame.rows do
processRow(row)
val newFrame = RdfStreamFrame(rowBuffer.toSeq)
newFrame

// Transcoder state
private val checkInputOptions = supportedInputOptions.isDefined
private val prefixLookup: TranscoderLookup = new TranscoderLookup(false, outputOptions.maxPrefixTableSize)
private val nameLookup: TranscoderLookup = new TranscoderLookup(true, outputOptions.maxNameTableSize)
private val datatypeLookup: TranscoderLookup = new TranscoderLookup(false, outputOptions.maxDatatypeTableSize)

// Current input stream state
private var inputOptions: RdfStreamOptions = null
private var inputUsesPrefixes = false

// Current output stream state
private val rowBuffer = new ArrayBuffer[RdfStreamRow](128)
private var changeInTerms = false
private var emittedOptions = false

private def processRow(row: RdfStreamRow): Unit =
val r = row.row
if r == null then
throw new RdfProtoDeserializationError("Row kind is not set.")
(r.streamRowValueNumber: @switch) match
case RdfStreamRow.OPTIONS_FIELD_NUMBER => handleOptions(r.options)
case RdfStreamRow.TRIPLE_FIELD_NUMBER => handleTriple(row)
case RdfStreamRow.QUAD_FIELD_NUMBER => handleQuad(row)
case RdfStreamRow.GRAPH_START_FIELD_NUMBER =>
case RdfStreamRow.GRAPH_END_FIELD_NUMBER => rowBuffer.append(row)
case RdfStreamRow.NAME_FIELD_NUMBER =>
val name = r.name
val entry = nameLookup.addEntry(name.id, name.value)
if !entry.newEntry then ()
else if entry.setId == name.id then rowBuffer.append(row)
else rowBuffer.append(RdfStreamRow(RdfNameEntry(entry.setId, name.value)))
case RdfStreamRow.PREFIX_FIELD_NUMBER =>
val prefix = r.prefix
val entry = prefixLookup.addEntry(prefix.id, prefix.value)
if !entry.newEntry then ()
else if entry.setId == prefix.id then rowBuffer.append(row)
else rowBuffer.append(RdfStreamRow(RdfPrefixEntry(entry.setId, prefix.value)))
case RdfStreamRow.DATATYPE_FIELD_NUMBER =>
val datatype = r.datatype
val entry = datatypeLookup.addEntry(datatype.id, datatype.value)
if !entry.newEntry then ()
else if entry.setId == datatype.id then rowBuffer.append(row)
else rowBuffer.append(RdfStreamRow(RdfDatatypeEntry(entry.setId, datatype.value)))
case _ =>
// This case should never happen
throw new RdfProtoDeserializationError("Row kind is not set.")

private def handleTriple(row: RdfStreamRow): Unit =
this.changeInTerms = false
val triple = row.row.triple
val s1 = handleSpoTerm(triple.subject)
val p1 = handleSpoTerm(triple.predicate)
val o1 = handleSpoTerm(triple.`object`)
if changeInTerms then
rowBuffer.append(RdfStreamRow(RdfTriple(s1, p1, o1)))
else rowBuffer.append(row)

private def handleQuad(row: RdfStreamRow): Unit =
this.changeInTerms = false
val quad = row.row.quad
val s1 = handleSpoTerm(quad.subject)
val p1 = handleSpoTerm(quad.predicate)
val o1 = handleSpoTerm(quad.`object`)
val g1 = handleGraphTerm(quad.graph)
if changeInTerms then
rowBuffer.append(RdfStreamRow(RdfQuad(s1, p1, o1, g1)))
else rowBuffer.append(row)

private def handleSpoTerm(term: SpoTerm): SpoTerm =
if term == null then null
else if term.isIri then handleIri(term.iri)
else if term.isBnode then term
else if term.isLiteral then handleLiteral(term.literal)
else if term.isTripleTerm then handleTripleTerm(term.tripleTerm)
else throw new RdfProtoDeserializationError("Unknown term type.")

private def handleGraphTerm(term: GraphTerm): GraphTerm =
if term == null then null
else if term.isIri then handleIri(term.iri)
else if term.isDefaultGraph then term
else if term.isBnode then term
else if term.isLiteral then handleLiteral(term.literal)
else throw new RdfProtoDeserializationError("Unknown term type.")

private def handleIri(iri: RdfIri): RdfIri =
val prefix = iri.prefixId
val name = iri.nameId
val prefix1 = if inputUsesPrefixes then prefixLookup.remap(prefix) else 0
val name1 = nameLookup.remap(name)
if prefix1 != prefix || name1 != name then
changeInTerms = true
RdfIri(prefix1, name1)
else iri

private def handleLiteral(literal: RdfLiteral): RdfLiteral =
if !literal.literalKind.isDatatype then literal
else
val dt = literal.literalKind.datatype
val dt1 = datatypeLookup.remap(dt)
if dt1 != dt then
changeInTerms = true
RdfLiteral(literal.lex, RdfLiteral.LiteralKind.Datatype(dt1))
else literal

private def handleTripleTerm(triple: RdfTriple): RdfTriple =
val s1 = handleSpoTerm(triple.subject)
val p1 = handleSpoTerm(triple.predicate)
val o1 = handleSpoTerm(triple.`object`)
if s1 != triple.subject || p1 != triple.predicate || o1 != triple.`object` then
changeInTerms = true
RdfTriple(s1, p1, o1)
else triple

private def handleOptions(options: RdfStreamOptions): Unit =
if checkInputOptions then
// TODO: check if this is the same physical stream type...
JellyOptions.checkCompatibility(options, supportedInputOptions.get)
this.inputUsesPrefixes = options.maxPrefixTableSize > 0
if inputUsesPrefixes then
prefixLookup.newInputStream(options.maxPrefixTableSize)
nameLookup.newInputStream(options.maxNameTableSize)
datatypeLookup.newInputStream(options.maxDatatypeTableSize)
// Update the input options
inputOptions = options
if !emittedOptions then
emittedOptions = true
rowBuffer.append(RdfStreamRow(outputOptions.copy(
version = Constants.protoVersion
)))
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.ProtoTestCases.*
import eu.ostrzyciel.jelly.core.helpers.{MockConverterFactory, Mrl}
import eu.ostrzyciel.jelly.core.proto.v1.*
import org.scalatest.Inspectors
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ProtoTranscoderSpec extends AnyWordSpec, Inspectors, Matchers:
def smallOptions(prefixTableSize: Int) = RdfStreamOptions(
maxNameTableSize = 4,
maxPrefixTableSize = prefixTableSize,
maxDatatypeTableSize = 8,
)

"ProtoTranscoder" should {
"splice two identical streams" when {
"input is Triples1" in {
// TODO: more cases
val options: RdfStreamOptions = JellyOptions.smallAllFeatures.withPhysicalType(PhysicalStreamType.TRIPLES)
val input: RdfStreamFrame = Triples1.encodedFull(options, 100).head
val transcoder = ProtoTranscoder.fastMergingTranscoderUnsafe(options)
// First frame should be returned as is
val out1 = transcoder.ingestFrame(input)
out1 shouldBe input
// What's more, the rows should be the exact same objects (except the options)
forAll(input.rows.zip(out1.rows).drop(1)) { case (in, out) =>
in eq out shouldBe true // reference equality
}

val out2 = transcoder.ingestFrame(input)
out2.rows.size shouldBe < (input.rows.size)
// No row in out2 should be an options row or a lookup entry row
forAll(out2.rows) { (row: RdfStreamRow) =>
row.row.isOptions shouldBe false
row.row.isPrefix shouldBe false
row.row.isName shouldBe false
row.row.isDatatype shouldBe false
}

// If there is a row in out2 with same content as in input, it should be the same object
var identicalRows = 0
forAll(input.rows) { (row: RdfStreamRow) =>
val sameRow = out2.rows.find(_.row == row.row)
if (sameRow.isDefined) {
sameRow.get eq row shouldBe true
identicalRows += 1
}
}
// Something should be identical
identicalRows shouldBe > (0)

// Decode the output
val decoder = MockConverterFactory.anyStatementDecoder(None)
val statements1 = out1.rows.flatMap(decoder.ingestRow)
val statements2 = out2.rows.flatMap(decoder.ingestRow)
statements1 shouldBe statements2
}
}

// TODO: same, with more repetitions

// TODO: splicing multiple different streams in various orders

// TODO: exception handling
}
Loading

0 comments on commit 2c3d45b

Please sign in to comment.