diff --git a/core/src/main/protobuf_shared b/core/src/main/protobuf_shared index 824931de..8022d91f 160000 --- a/core/src/main/protobuf_shared +++ b/core/src/main/protobuf_shared @@ -1 +1 @@ -Subproject commit 824931de973b4179417d31152fba7d70283c3ca2 +Subproject commit 8022d91ffd314550cd7ca126876ae856efe60498 diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala index 251d0fc8..94c7c8b7 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala @@ -13,3 +13,12 @@ trait ProtoDecoder[+TOut]: def getStreamOpt: Option[RdfStreamOptions] def ingestRow(row: RdfStreamRow): Option[TOut] + + /** + * Checks if the version of the stream is supported. + * Throws an exception if not. + * @param options Options of the stream. + */ + protected final def checkVersion(options: RdfStreamOptions): Unit = + if options.version > Constants.protoVersion then + throw new RdfProtoDeserializationError(s"Unsupported proto version: ${options.version}") diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala index d4b6fbe0..5db9063a 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala @@ -136,6 +136,7 @@ sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +T throw new RdfProtoDeserializationError("Row kind is not set.") protected def handleOptions(opts: RdfStreamOptions): Unit = + checkVersion(opts) setStreamOpt(opts) protected def handleTriple(triple: RdfTriple): Option[TOut] = @@ -281,6 +282,7 @@ object ProtoDecoderImpl: inner.get.ingestRow(row) private def handleOptions(opts: RdfStreamOptions): Unit = + checkVersion(opts) if inner.isDefined then throw new RdfProtoDeserializationError("Stream options are already set." + "The type of the stream cannot be inferred.") diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala index 0710698b..54b79a83 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala @@ -238,7 +238,10 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS private def emitOptions(): Unit = emittedOptions = true extraRowsBuffer.append( - RdfStreamRow(RdfStreamRow.Row.Options(options)) + RdfStreamRow(RdfStreamRow.Row.Options( + // Override whatever the user set in the options. + options.withVersion(Constants.protoVersion) + )) ) diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala index 06152ab1..db7cbf7a 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala @@ -16,3 +16,5 @@ package object core: val jellyName = "Jelly" val jellyFileExtension = "jelly" val jellyContentType = "application/x-jelly-rdf" + val protoVersion = 1 + val protoSemanticVersion = "1.0.0" diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala index 13ac4e30..ab4e1d7a 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala @@ -344,7 +344,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: for ((testCase, streamType, streamName, expected) <- cases) do s"decode $streamName" in { - val opts = JellyOptions.smallGeneralized.withStreamType(streamType) + val opts = JellyOptions.smallGeneralized + .withStreamType(streamType) + .withVersion(Constants.protoVersion) val decoder = MockConverterFactory.anyStatementDecoder val decoded = testCase .encoded(opts) @@ -388,30 +390,42 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } val streamTypeCases = Seq( - (MockConverterFactory.triplesDecoder, "Triples", RdfStreamType.QUADS), - (MockConverterFactory.quadsDecoder, "Quads", RdfStreamType.TRIPLES), - (MockConverterFactory.graphsDecoder, "Graphs", RdfStreamType.QUADS), - (MockConverterFactory.graphsAsQuadsDecoder, "GraphsAsQuads", RdfStreamType.TRIPLES), - (MockConverterFactory.anyStatementDecoder, "AnyStatement", RdfStreamType.UNSPECIFIED), + (() => MockConverterFactory.triplesDecoder, "Triples", RdfStreamType.TRIPLES, RdfStreamType.QUADS), + (() => MockConverterFactory.quadsDecoder, "Quads", RdfStreamType.QUADS, RdfStreamType.GRAPHS), + (() => MockConverterFactory.graphsDecoder, "Graphs", RdfStreamType.GRAPHS, RdfStreamType.QUADS), + (() => MockConverterFactory.graphsAsQuadsDecoder, "GraphsAsQuads", RdfStreamType.GRAPHS, RdfStreamType.TRIPLES), + (() => MockConverterFactory.anyStatementDecoder, "AnyStatement", RdfStreamType.TRIPLES, RdfStreamType.UNSPECIFIED), ) - for (decoder, decName, streamType) <- streamTypeCases do + for (decoderFactory, decName, streamType, invalidStreamType) <- streamTypeCases do s"a ${decName}Decoder" should { "throw exception on an empty stream type" in { val data = wrapEncodedFull(Seq(JellyOptions.smallGeneralized)) val error = intercept[RdfProtoDeserializationError] { - decoder.ingestRow(data.head) + decoderFactory().ingestRow(data.head) } error.getMessage should include ("stream type is not") } "throw exception on an invalid stream type" in { val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(streamType), + JellyOptions.smallGeneralized.withStreamType(invalidStreamType), )) val error = intercept[RdfProtoDeserializationError] { - decoder.ingestRow(data.head) + decoderFactory().ingestRow(data.head) } error.getMessage should include ("stream type is not") } + + "throw exception on an unsupported proto version" in { + val data = wrapEncodedFull(Seq( + JellyOptions.smallGeneralized + .withStreamType(streamType) + .withVersion(Constants.protoVersion + 1) + )) + val error = intercept[RdfProtoDeserializationError] { + decoderFactory().ingestRow(data.head) + } + error.getMessage should include("Unsupported proto version") + } } diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTestCases.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTestCases.scala index ec7c9a67..ad7def5e 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTestCases.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoTestCases.scala @@ -11,7 +11,11 @@ object ProtoTestCases: val GRAPH_REPEAT: RdfGraph = RdfGraph(RdfGraph.Graph.Repeat(RdfRepeat())) def wrapEncoded(rows: Seq[RowValue]): Seq[RdfStreamRow.Row] = rows map { - case v: RdfStreamOptions => RdfStreamRow.Row.Options(v) + case v: RdfStreamOptions => v.version match + // If the version is not set, set it to the current version + case 0 => RdfStreamRow.Row.Options(v.withVersion(Constants.protoVersion)) + // Otherwise assume we are checking version compatibility + case _ => RdfStreamRow.Row.Options(v) case v: RdfDatatypeEntry => RdfStreamRow.Row.Datatype(v) case v: RdfPrefixEntry => RdfStreamRow.Row.Prefix(v) case v: RdfNameEntry => RdfStreamRow.Row.Name(v)