Skip to content

Commit

Permalink
Implement RDF-STaX in Jelly (#53)
Browse files Browse the repository at this point in the history
Issue #51, #52

Implemented:

- Physical & logical stream types in Jelly, where logical are directly
from RDF-STaX
  - Logical types are implemented as an enum
- Type numbers were picked such that the most common types (1, 2, 3, 4)
are low, making the varint in the protobuf output smaller.
- Type numbers signify taxonomical relations in RDF-STaX. 14 is a child
of 4, and 114 is a child of 14. This makes the system forward compatible
– current consumers will be able to interpret future subtypes as their
parent types.
- Support for logical types in streaming encoders/decoders
- Automatic checks for logical type compatibility and logical-physical
consistency (if a given stream is possible, can be consumed, etc.)
- Emitting RDF-based annotations (metadata) about the logical type of
the stream, using the RDF-STaX ontology. This uses the existing
facilities of decoder converters.
  • Loading branch information
Ostrzyciel authored Apr 24, 2024
1 parent b232d84 commit 08fc666
Show file tree
Hide file tree
Showing 31 changed files with 1,087 additions and 408 deletions.
2 changes: 1 addition & 1 deletion core/src/main/protobuf_shared
Submodule protobuf_shared updated 2 files
+3 −0 grpc.proto
+50 −7 rdf.proto
8 changes: 8 additions & 0 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/Constants.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package eu.ostrzyciel.jelly.core

object Constants:
val jellyName = "Jelly"
val jellyFileExtension = "jelly"
val jellyContentType = "application/x-jelly-rdf"
val protoVersion = 1
val protoSemanticVersion = "1.0.0"
24 changes: 14 additions & 10 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ConverterFactory.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package eu.ostrzyciel.jelly.core

import ProtoDecoderImpl.*
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions}

import scala.reflect.ClassTag

Expand All @@ -24,35 +24,39 @@ trait ConverterFactory[
+TDecConv <: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad],
TNode, TDatatype : ClassTag, TTriple, TQuad
]:
protected def decoderConverter: TDecConv
def decoderConverter: TDecConv

/**
* Create a new [[TriplesDecoder]].
* @return
*/
final def triplesDecoder: TriplesDecoder[TNode, TDatatype, TTriple, TQuad] =
new TriplesDecoder(decoderConverter)
final def triplesDecoder(expLogicalType: Option[LogicalStreamType]):
TriplesDecoder[TNode, TDatatype, TTriple, TQuad] =
new TriplesDecoder(decoderConverter, expLogicalType)

/**
* Create a new [[QuadsDecoder]].
* @return
*/
final def quadsDecoder: QuadsDecoder[TNode, TDatatype, TTriple, TQuad] =
new QuadsDecoder(decoderConverter)
final def quadsDecoder(expLogicalType: Option[LogicalStreamType]):
QuadsDecoder[TNode, TDatatype, TTriple, TQuad] =
new QuadsDecoder(decoderConverter, expLogicalType)

/**
* Create a new [[GraphsAsQuadsDecoder]].
* @return
*/
final def graphsAsQuadsDecoder: GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad] =
new GraphsAsQuadsDecoder(decoderConverter)
final def graphsAsQuadsDecoder(expLogicalType: Option[LogicalStreamType]):
GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad] =
new GraphsAsQuadsDecoder(decoderConverter, expLogicalType)

/**
* Create a new [[GraphsDecoder]].
* @return
*/
final def graphsDecoder: GraphsDecoder[TNode, TDatatype, TTriple, TQuad] =
new GraphsDecoder(decoderConverter)
final def graphsDecoder(expLogicalType: Option[LogicalStreamType]):
GraphsDecoder[TNode, TDatatype, TTriple, TQuad] =
new GraphsDecoder(decoderConverter, expLogicalType)

/**
* Create a new [[AnyStatementDecoder]].
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package eu.ostrzyciel.jelly
package eu.ostrzyciel.jelly.core

package object core:
private trait JellyExceptions:
sealed class RdfProtoDeserializationError(msg: String) extends Error(msg)

final class MissingPrefixEntryError(val prefixId: Int) extends RdfProtoDeserializationError(
s"Missing entry in prefix table at ID: $prefixId"
)

final class MissingNameEntryError(val nameId: Int) extends RdfProtoDeserializationError(
s"Missing entry in name table at ID: $nameId"
)

final class RdfProtoSerializationError(msg: String) extends Error(msg)

private object JellyExceptions extends JellyExceptions

// Constants
object Constants:
val jellyName = "Jelly"
val jellyFileExtension = "jelly"
val jellyContentType = "application/x-jelly-rdf"
val protoVersion = 1
val protoSemanticVersion = "1.0.0"
export JellyExceptions.*
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions

/**
* A collection of convenient streaming option presets.
* None of the presets specifies the stream type – do that with the .withStreamType method.
* None of the presets specifies the stream type – do that with the .withPhysicalType method.
*/
object JellyOptions:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.LogicalStreamType

import java.util.UUID

private trait LogicalStreamTypeExtensions:
val staxPrefix = "https://w3id.org/stax/ontology#"

extension (logicalType: LogicalStreamType)
/**
* Converts the logical stream type to its base concrete stream type in RDF-STaX.
* For example, [[LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS]] will be converted to [[LogicalStreamType.DATASETS]].
* UNSPECIFIED values will be left as-is.
*
* @return base stream type
*/
def toBaseType: LogicalStreamType =
LogicalStreamType.fromValue(logicalType.value % 10)

/**
* Checks if the logical stream type is equal to or a subtype of the other logical stream type.
* For example, [[LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS]] is a subtype of [[LogicalStreamType.DATASETS]].
*
* @param other the other logical stream type
* @return true if the logical stream type is equal to or a subtype of the other logical stream type
*/
def isEqualOrSubtypeOf(other: LogicalStreamType): Boolean =
logicalType == other || logicalType.value.toString.endsWith(other.value.toString)

/**
* Returns the IRI of the RDF-STaX stream type individual for the logical stream type.
* If the logical stream type is not supported or is not specified, None is returned.
*
* @return the IRI of the RDF-STaX stream type individual
*/
def getRdfStaxType: Option[String] =
logicalType match
case LogicalStreamType.FLAT_TRIPLES => Some(s"${staxPrefix}flatTripleStream")
case LogicalStreamType.FLAT_QUADS => Some(s"${staxPrefix}flatQuadStream")
case LogicalStreamType.GRAPHS => Some(s"${staxPrefix}graphStream")
case LogicalStreamType.SUBJECT_GRAPHS => Some(s"${staxPrefix}subjectGraphStream")
case LogicalStreamType.DATASETS => Some(s"${staxPrefix}datasetStream")
case LogicalStreamType.NAMED_GRAPHS => Some(s"${staxPrefix}namedGraphStream")
case LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS => Some(s"${staxPrefix}timestampedNamedGraphStream")
case _ => None

/**
* Returns an RDF-STaX annotation for the logical stream type, in RDF. The annotation simply states that
* <subjectNode> has a stream type usage, and that stream type usage has this stream type.
*
* Example in Turtle for a flat triple stream:
* <subjectNode> stax:hasStreamTypeUsage [
* a stax:RdfStreamTypeUsage ;
* stax:hasStreamType stax:flatTripleStream
* ] .
*
* @param subjectNode the subject node to annotate
* @param converterFactory the converter factory to use for creating RDF nodes and triples
* @tparam TNode the type of RDF nodes
* @tparam TTriple the type of RDF triples
* @throws IllegalArgumentException if the logical stream type is not supported
* @return the RDF-STaX annotation
*/
def getRdfStaxAnnotation[TNode, TTriple](subjectNode: TNode)
(using converterFactory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Seq[TTriple] =
getRdfStaxType match
case Some(typeIri) =>
val converter = converterFactory.decoderConverter
val bNode = converter.makeBlankNode(UUID.randomUUID().toString)
Seq(
converter.makeTriple(
subjectNode,
converter.makeIriNode(s"${staxPrefix}hasStreamTypeUsage"),
bNode
),
converter.makeTriple(
bNode,
converter.makeIriNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
converter.makeIriNode(s"${staxPrefix}RdfStreamTypeUsage")
),
converter.makeTriple(
bNode,
converter.makeIriNode(s"${staxPrefix}hasStreamType"),
converter.makeIriNode(typeIri)
)
)
case None => throw new IllegalArgumentException(s"Unsupported logical stream type: $logicalType")

private object LogicalStreamTypeExtensions extends LogicalStreamTypeExtensions

export LogicalStreamTypeExtensions.*
41 changes: 40 additions & 1 deletion core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, RdfStreamRow}
import eu.ostrzyciel.jelly.core.proto.v1.*

/**
* Base extendable trait for decoders of protobuf RDF streams.
Expand All @@ -22,3 +22,42 @@ trait ProtoDecoder[+TOut]:
protected final def checkVersion(options: RdfStreamOptions): Unit =
if options.version > Constants.protoVersion then
throw new RdfProtoDeserializationError(s"Unsupported proto version: ${options.version}")

/**
* Checks if the logical and physical stream types are compatible. Additionally, if the expected logical stream type
* is provided, checks if the actual logical stream type is a subtype of the expected one.
* @param options Options of the stream.
* @param expLogicalType Expected logical stream type.
*/
protected final def checkLogicalStreamType(options: RdfStreamOptions, expLogicalType: Option[LogicalStreamType]):
Unit =
val baseLogicalType = options.logicalType.toBaseType

val conflict = baseLogicalType match
case LogicalStreamType.UNSPECIFIED => false
case LogicalStreamType.FLAT_TRIPLES => options.physicalType match
case PhysicalStreamType.QUADS => true
case PhysicalStreamType.GRAPHS => true
case _ => false
case LogicalStreamType.FLAT_QUADS => options.physicalType match
case PhysicalStreamType.TRIPLES => true
case _ => false
case LogicalStreamType.GRAPHS => options.physicalType match
case PhysicalStreamType.QUADS => true
case PhysicalStreamType.GRAPHS => true
case _ => false
case LogicalStreamType.DATASETS => options.physicalType match
case PhysicalStreamType.TRIPLES => true
case _ => false
case _ => false

if conflict then
throw new RdfProtoDeserializationError(s"Logical stream type $baseLogicalType is incompatible with " +
s"physical stream type ${options.physicalType}.")

expLogicalType match
case Some(v) =>
if !options.logicalType.isEqualOrSubtypeOf(v) then
throw new RdfProtoDeserializationError(s"Expected logical stream type $v, got ${options.logicalType}. " +
s"${options.logicalType} is not a subtype of $v.")
case None =>
44 changes: 23 additions & 21 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.reflect.ClassTag
* See the base (extendable) trait: [[ProtoDecoder]].
*/
sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +TQuad, +TOut]
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad])
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType])
extends ProtoDecoder[TOut]:

private var streamOpt: Option[RdfStreamOptions] = None
Expand Down Expand Up @@ -137,6 +137,7 @@ sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +T

protected def handleOptions(opts: RdfStreamOptions): Unit =
checkVersion(opts)
checkLogicalStreamType(opts, expLogicalType)
setStreamOpt(opts)

protected def handleTriple(triple: RdfTriple): Option[TOut] =
Expand All @@ -161,11 +162,11 @@ object ProtoDecoderImpl:
* A decoder that reads TRIPLES streams and outputs a sequence of triples.
*/
final class TriplesDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad]
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TTriple](converter):
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TTriple](converter, expLogicalType):

override protected def handleOptions(opts: RdfStreamOptions): Unit =
if !opts.streamType.isTriples then
if !opts.physicalType.isTriples then
throw new RdfProtoDeserializationError("Incoming stream type is not TRIPLES.")
super.handleOptions(opts)

Expand All @@ -176,11 +177,11 @@ object ProtoDecoderImpl:
* A decoder that reads QUADS streams and outputs a sequence of quads.
*/
final class QuadsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad]
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter):
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter, expLogicalType):

override protected def handleOptions(opts: RdfStreamOptions): Unit =
if !opts.streamType.isQuads then
if !opts.physicalType.isQuads then
throw new RdfProtoDeserializationError("Incoming stream type is not QUADS.")
super.handleOptions(opts)

Expand All @@ -191,12 +192,12 @@ object ProtoDecoderImpl:
* A decoder that reads GRAPHS streams and outputs a flat sequence of quads.
*/
final class GraphsAsQuadsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad]
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter):
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter, expLogicalType):
private var currentGraph: Option[TNode] = None

override protected def handleOptions(opts: RdfStreamOptions): Unit =
if !opts.streamType.isGraphs then
if !opts.physicalType.isGraphs then
throw new RdfProtoDeserializationError("Incoming stream type is not GRAPHS.")
super.handleOptions(opts)

Expand All @@ -223,13 +224,13 @@ object ProtoDecoderImpl:
* Each graph is emitted as soon as the producer signals that it's complete.
*/
final class GraphsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad]
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, (TNode, Iterable[TTriple])](converter):
(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType])
extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, (TNode, Iterable[TTriple])](converter, expLogicalType):
private var currentGraph: Option[TNode] = None
private var buffer: ListBuffer[TTriple] = new ListBuffer[TTriple]()

override protected def handleOptions(opts: RdfStreamOptions): Unit =
if !opts.streamType.isGraphs then
if !opts.physicalType.isGraphs then
throw new RdfProtoDeserializationError("Incoming stream type is not GRAPHS.")
super.handleOptions(opts)

Expand Down Expand Up @@ -283,17 +284,18 @@ object ProtoDecoderImpl:

private def handleOptions(opts: RdfStreamOptions): Unit =
checkVersion(opts)
checkLogicalStreamType(opts, None)
if inner.isDefined then
throw new RdfProtoDeserializationError("Stream options are already set." +
"The type of the stream cannot be inferred.")
val dec = opts.streamType match
case RdfStreamType.TRIPLES =>
new TriplesDecoder[TNode, TDatatype, TTriple, TQuad](converter)
case RdfStreamType.QUADS =>
new QuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter)
case RdfStreamType.GRAPHS =>
new GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter)
case RdfStreamType.UNSPECIFIED =>
val dec = opts.physicalType match
case PhysicalStreamType.TRIPLES =>
new TriplesDecoder[TNode, TDatatype, TTriple, TQuad](converter, None)
case PhysicalStreamType.QUADS =>
new QuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter, None)
case PhysicalStreamType.GRAPHS =>
new GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter, None)
case PhysicalStreamType.UNSPECIFIED =>
throw new RdfProtoDeserializationError("Incoming stream type is not set.")
case _ =>
throw new RdfProtoDeserializationError("Incoming stream type is not recognized.")
Expand Down
Loading

0 comments on commit 08fc666

Please sign in to comment.