Skip to content

Commit

Permalink
Experiment: unwrap RdfStreamRow contents (Jelly-RDF#198)
Browse files Browse the repository at this point in the history
Unwrap RdfStreamRow contents

This is the same story as with Jelly-RDF#118, just a bit more straightforward.

This will save us one allocation per stream row in serialization AND deserialization, and also reduce the overhead of going through the ugly if-else chains. Now the JVM should use the tableswitch instruction which is way, way faster.

Let's see if it helps. If not, I will revert the whole thing. If yes, I will complete the docstrings.
  • Loading branch information
Ostrzyciel authored Oct 23, 2024
1 parent 4e4edd6 commit f25c778
Show file tree
Hide file tree
Showing 16 changed files with 362 additions and 89 deletions.
47 changes: 26 additions & 21 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.*

import scala.annotation.switch
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

Expand Down Expand Up @@ -117,24 +118,29 @@ sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +T
)

final override def ingestRow(row: RdfStreamRow): Option[TOut] =
row.row match
case RdfStreamRow.Row.Options(opts) =>
handleOptions(opts)
val r = row.row
if r == null then
throw new RdfProtoDeserializationError("Row kind is not set.")
(r.streamRowValueNumber : @switch) match
case RdfStreamRow.OPTIONS_FIELD_NUMBER =>
handleOptions(r.options)
None
case RdfStreamRow.Row.Name(nameRow) =>
nameDecoder.updateNames(nameRow)
case RdfStreamRow.TRIPLE_FIELD_NUMBER => handleTriple(r.triple)
case RdfStreamRow.QUAD_FIELD_NUMBER => handleQuad(r.quad)
case RdfStreamRow.GRAPH_START_FIELD_NUMBER => handleGraphStart(r.graphStart)
case RdfStreamRow.GRAPH_END_FIELD_NUMBER => handleGraphEnd()
case RdfStreamRow.NAME_FIELD_NUMBER =>
nameDecoder.updateNames(r.name)
None
case RdfStreamRow.Row.Prefix(prefixRow) =>
nameDecoder.updatePrefixes(prefixRow)
case RdfStreamRow.PREFIX_FIELD_NUMBER =>
nameDecoder.updatePrefixes(r.prefix)
None
case RdfStreamRow.Row.Datatype(dtRow) =>
case RdfStreamRow.DATATYPE_FIELD_NUMBER =>
val dtRow = r.datatype
dtLookup.update(dtRow.id, converter.makeDatatype(dtRow.value))
None
case RdfStreamRow.Row.Triple(triple) => handleTriple(triple)
case RdfStreamRow.Row.Quad(quad) => handleQuad(quad)
case RdfStreamRow.Row.GraphStart(graph) => handleGraphStart(graph)
case RdfStreamRow.Row.GraphEnd(_) => handleGraphEnd()
case RdfStreamRow.Row.Empty =>
case _ =>
// This case should never happen
throw new RdfProtoDeserializationError("Row kind is not set.")

protected def handleOptions(opts: RdfStreamOptions): Unit =
Expand Down Expand Up @@ -289,14 +295,13 @@ object ProtoDecoderImpl:
inner.flatMap(_.getStreamOpt)

override def ingestRow(row: RdfStreamRow): Option[TTriple | TQuad] =
row.row match
case RdfStreamRow.Row.Options(opts) =>
handleOptions(opts)
inner.get.ingestRow(row)
case _ =>
if inner.isEmpty then
throw new RdfProtoDeserializationError("Stream options are not set.")
inner.get.ingestRow(row)
if row.row.isOptions then
handleOptions(row.row.options)
inner.get.ingestRow(row)
else
if inner.isEmpty then
throw new RdfProtoDeserializationError("Stream options are not set.")
inner.get.ingestRow(row)

private def handleOptions(opts: RdfStreamOptions): Unit =
// Reset the logical type to UNSPECIFIED to ignore checking if it's supported by the inner decoder
Expand Down
24 changes: 7 additions & 17 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import eu.ostrzyciel.jelly.core.proto.v1.*
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object ProtoEncoder:
private val graphEnd = Seq(RdfStreamRow(RdfStreamRow.Row.GraphEnd(RdfGraphEnd.defaultInstance)))
private val defaultGraphStart = RdfStreamRow(RdfStreamRow.Row.GraphStart(
RdfGraphStart(RdfDefaultGraph.defaultInstance)
))
private val graphEnd = Seq(RdfStreamRow(RdfGraphEnd.defaultInstance))
private val defaultGraphStart = RdfStreamRow(RdfGraphStart(RdfDefaultGraph.defaultInstance))

/**
* Stateful encoder of a protobuf RDF stream.
Expand All @@ -30,9 +28,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
*/
final def addTripleStatement(triple: TTriple): Iterable[RdfStreamRow] =
handleHeader()
val mainRow = RdfStreamRow(RdfStreamRow.Row.Triple(
tripleToProto(triple)
))
val mainRow = RdfStreamRow(tripleToProto(triple))
extraRowsBuffer.append(mainRow).toSeq

/**
Expand All @@ -42,9 +38,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
*/
final def addQuadStatement(quad: TQuad): Iterable[RdfStreamRow] =
handleHeader()
val mainRow = RdfStreamRow(RdfStreamRow.Row.Quad(
quadToProto(quad)
))
val mainRow = RdfStreamRow(quadToProto(quad))
extraRowsBuffer.append(mainRow).toSeq

/**
Expand All @@ -59,9 +53,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
else
handleHeader()
val graphNode = graphNodeToProto(graph)
val mainRow = RdfStreamRow(RdfStreamRow.Row.GraphStart(
RdfGraphStart(graphNode)
))
val mainRow = RdfStreamRow(RdfGraphStart(graphNode))
extraRowsBuffer.append(mainRow).toSeq

/**
Expand Down Expand Up @@ -203,11 +195,9 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS

private def emitOptions(): Unit =
emittedOptions = true
extraRowsBuffer.append(
RdfStreamRow(RdfStreamRow.Row.Options(
extraRowsBuffer.append(RdfStreamRow(
// Override whatever the user set in the options.
options.withVersion(Constants.protoVersion)
))
)
))


Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package eu.ostrzyciel.jelly.core.proto.v1
* optimizations to make this as fast as possible.
*/
@SerialVersionUID(0L)
final case class RdfGraphStart(graph: GraphTerm = null) extends scalapb.GeneratedMessage {
final case class RdfGraphStart(graph: GraphTerm = null) extends scalapb.GeneratedMessage, RdfStreamRowValue {
@transient
private[this] var __serializedSizeMemoized: _root_.scala.Int = 0

Expand Down Expand Up @@ -60,6 +60,12 @@ final case class RdfGraphStart(graph: GraphTerm = null) extends scalapb.Generate
def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this)

def companion: eu.ostrzyciel.jelly.core.proto.v1.RdfGraphStart.type = eu.ostrzyciel.jelly.core.proto.v1.RdfGraphStart

override def streamRowValueNumber: Int = 4

override def isGraphStart: Boolean = true

override def graphStart: RdfGraphStart = this
}

object RdfGraphStart extends scalapb.GeneratedMessageCompanion[eu.ostrzyciel.jelly.core.proto.v1.RdfGraphStart] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package eu.ostrzyciel.jelly.core.proto.v1
* optimizations to make this as fast as possible.
*/
@SerialVersionUID(0L)
final case class RdfQuad(subject: SpoTerm = null, predicate: SpoTerm = null, `object`: SpoTerm = null, graph: GraphTerm = null) extends scalapb.GeneratedMessage {
final case class RdfQuad(subject: SpoTerm = null, predicate: SpoTerm = null, `object`: SpoTerm = null, graph: GraphTerm = null)
extends scalapb.GeneratedMessage, RdfStreamRowValue {
@transient private[this] var __serializedSizeMemoized: _root_.scala.Int = 0

private[this] def __computeSerializedSize(): _root_.scala.Int = {
Expand Down Expand Up @@ -120,6 +121,12 @@ final case class RdfQuad(subject: SpoTerm = null, predicate: SpoTerm = null, `ob
def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this)

def companion: eu.ostrzyciel.jelly.core.proto.v1.RdfQuad.type = eu.ostrzyciel.jelly.core.proto.v1.RdfQuad

override def streamRowValueNumber: Int = 3

override def isQuad: Boolean = true

override def quad: RdfQuad = this
}

object RdfQuad extends scalapb.GeneratedMessageCompanion[eu.ostrzyciel.jelly.core.proto.v1.RdfQuad] {
Expand Down Expand Up @@ -243,7 +250,7 @@ object RdfQuad extends scalapb.GeneratedMessageCompanion[eu.ostrzyciel.jelly.cor

def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber)

lazy val defaultInstance = eu.ostrzyciel.jelly.core.proto.v1.RdfQuad(subject = null, predicate = null, `object` = null, graph = null)
val defaultInstance: RdfQuad = eu.ostrzyciel.jelly.core.proto.v1.RdfQuad(subject = null, predicate = null, `object` = null, graph = null)

def of(subject: SpoTerm, predicate: SpoTerm, `object`: SpoTerm, graph: GraphTerm): _root_.eu.ostrzyciel.jelly.core.proto.v1.RdfQuad = _root_.eu.ostrzyciel.jelly.core.proto.v1.RdfQuad(subject, predicate, `object`, graph)
}
Loading

0 comments on commit f25c778

Please sign in to comment.