From 08fc66613792b8157320cacde8509bb1f0be73a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Sowi=C5=84ski?= Date: Wed, 24 Apr 2024 12:06:06 +0200 Subject: [PATCH] Implement RDF-STaX in Jelly (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #51, #52 Implemented: - Physical & logical stream types in Jelly, where the logical types are taken directly from RDF-STaX - Logical types are implemented as an enum - Type numbers were picked such that the most common types (1, 2, 3, 4) are low, making the varint in the protobuf output smaller. - Type numbers signify taxonomical relations in RDF-STaX. 14 is a child of 4, and 114 is a child of 14. This makes the system forward compatible – current consumers will be able to interpret future subtypes as their parent types. - Support for logical types in streaming encoders/decoders - Automatic checks for logical type compatibility and logical-physical consistency (if a given stream is possible, can be consumed, etc.) - Emitting RDF-based annotations (metadata) about the logical type of the stream, using the RDF-STaX ontology. This uses the existing facilities of decoder converters. 
--- core/src/main/protobuf_shared | 2 +- .../eu/ostrzyciel/jelly/core/Constants.scala | 8 + .../jelly/core/ConverterFactory.scala | 24 +- .../{package.scala => JellyExceptions.scala} | 16 +- .../ostrzyciel/jelly/core/JellyOptions.scala | 2 +- .../core/LogicalStreamTypeExtensions.scala | 92 +++++ .../ostrzyciel/jelly/core/ProtoDecoder.scala | 41 +- .../jelly/core/ProtoDecoderImpl.scala | 44 ++- .../LogicalStreamTypeExtensionsSpec.scala | 89 +++++ .../jelly/core/ProtoDecoderSpec.scala | 234 ++++++++--- .../jelly/core/ProtoEncoderSpec.scala | 16 +- .../core/helpers/MockConverterFactory.scala | 4 +- .../eu/ostrzyciel/jelly/grpc/GrpcSpec.scala | 8 +- .../CrossStreamingSpec.scala | 2 +- .../integration_tests/JenaTestStream.scala | 12 +- .../integration_tests/Rdf4jTestStream.scala | 12 +- .../io/Rdf4jReactiveSerDes.scala | 6 +- .../integration_tests/io/Rdf4jSerDes.scala | 6 +- .../convert/jena/JenaConverterFactory.scala | 2 +- .../jelly/convert/jena/riot/JellyWriter.scala | 10 +- .../convert/rdf4j/Rdf4jConverterFactory.scala | 4 +- .../jelly/convert/rdf4j/rio/JellyWriter.scala | 22 +- .../rdf4j/rio/JellyWriterSettings.scala | 12 +- .../ostrzyciel/jelly/stream/DecoderFlow.scala | 374 ++++++++++++------ .../ostrzyciel/jelly/stream/EncoderFlow.scala | 136 +++++-- .../jelly/stream/EncoderSource.scala | 13 +- .../stream/JellyOptionsFromTypesafe.scala | 12 +- .../jelly/stream/DecoderFlowSpec.scala | 126 ++++-- .../jelly/stream/EncoderFlowSpec.scala | 146 +++++-- .../ostrzyciel/jelly/stream/JellyIoSpec.scala | 12 +- .../stream/JellyOptionsFromTypesafeSpec.scala | 8 +- 31 files changed, 1087 insertions(+), 408 deletions(-) create mode 100644 core/src/main/scala/eu/ostrzyciel/jelly/core/Constants.scala rename core/src/main/scala/eu/ostrzyciel/jelly/core/{package.scala => JellyExceptions.scala} (62%) create mode 100644 core/src/main/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensions.scala create mode 100644 
core/src/test/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensionsSpec.scala diff --git a/core/src/main/protobuf_shared b/core/src/main/protobuf_shared index fa815843..9c68006f 160000 --- a/core/src/main/protobuf_shared +++ b/core/src/main/protobuf_shared @@ -1 +1 @@ -Subproject commit fa815843efb4822898f56713e202d2ae06554f1c +Subproject commit 9c68006fe21d77e6842a73de90d22e878708abbf diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/Constants.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/Constants.scala new file mode 100644 index 00000000..6cbbe58d --- /dev/null +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/Constants.scala @@ -0,0 +1,8 @@ +package eu.ostrzyciel.jelly.core + +object Constants: + val jellyName = "Jelly" + val jellyFileExtension = "jelly" + val jellyContentType = "application/x-jelly-rdf" + val protoVersion = 1 + val protoSemanticVersion = "1.0.0" diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ConverterFactory.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ConverterFactory.scala index 1b8489d4..d411e948 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ConverterFactory.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ConverterFactory.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.core import ProtoDecoderImpl.* -import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions +import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions} import scala.reflect.ClassTag @@ -24,35 +24,39 @@ trait ConverterFactory[ +TDecConv <: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], TNode, TDatatype : ClassTag, TTriple, TQuad ]: - protected def decoderConverter: TDecConv + def decoderConverter: TDecConv /** * Create a new [[TriplesDecoder]]. 
* @return */ - final def triplesDecoder: TriplesDecoder[TNode, TDatatype, TTriple, TQuad] = - new TriplesDecoder(decoderConverter) + final def triplesDecoder(expLogicalType: Option[LogicalStreamType]): + TriplesDecoder[TNode, TDatatype, TTriple, TQuad] = + new TriplesDecoder(decoderConverter, expLogicalType) /** * Create a new [[QuadsDecoder]]. * @return */ - final def quadsDecoder: QuadsDecoder[TNode, TDatatype, TTriple, TQuad] = - new QuadsDecoder(decoderConverter) + final def quadsDecoder(expLogicalType: Option[LogicalStreamType]): + QuadsDecoder[TNode, TDatatype, TTriple, TQuad] = + new QuadsDecoder(decoderConverter, expLogicalType) /** * Create a new [[GraphsAsQuadsDecoder]]. * @return */ - final def graphsAsQuadsDecoder: GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad] = - new GraphsAsQuadsDecoder(decoderConverter) + final def graphsAsQuadsDecoder(expLogicalType: Option[LogicalStreamType]): + GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad] = + new GraphsAsQuadsDecoder(decoderConverter, expLogicalType) /** * Create a new [[GraphsDecoder]]. * @return */ - final def graphsDecoder: GraphsDecoder[TNode, TDatatype, TTriple, TQuad] = - new GraphsDecoder(decoderConverter) + final def graphsDecoder(expLogicalType: Option[LogicalStreamType]): + GraphsDecoder[TNode, TDatatype, TTriple, TQuad] = + new GraphsDecoder(decoderConverter, expLogicalType) /** * Create a new [[AnyStatementDecoder]]. 
diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyExceptions.scala similarity index 62% rename from core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala rename to core/src/main/scala/eu/ostrzyciel/jelly/core/JellyExceptions.scala index db7cbf7a..2a5f6512 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/package.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyExceptions.scala @@ -1,20 +1,18 @@ -package eu.ostrzyciel.jelly +package eu.ostrzyciel.jelly.core -package object core: +private trait JellyExceptions: sealed class RdfProtoDeserializationError(msg: String) extends Error(msg) + final class MissingPrefixEntryError(val prefixId: Int) extends RdfProtoDeserializationError( s"Missing entry in prefix table at ID: $prefixId" ) + final class MissingNameEntryError(val nameId: Int) extends RdfProtoDeserializationError( s"Missing entry in name table at ID: $nameId" ) final class RdfProtoSerializationError(msg: String) extends Error(msg) + +private object JellyExceptions extends JellyExceptions - // Constants - object Constants: - val jellyName = "Jelly" - val jellyFileExtension = "jelly" - val jellyContentType = "application/x-jelly-rdf" - val protoVersion = 1 - val protoSemanticVersion = "1.0.0" +export JellyExceptions.* diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyOptions.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyOptions.scala index 4606c7f4..fcafee74 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyOptions.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/JellyOptions.scala @@ -4,7 +4,7 @@ import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions /** * A collection of convenient streaming option presets. - * None of the presets specifies the stream type – do that with the .withStreamType method. + * None of the presets specifies the stream type – do that with the .withPhysicalType method. 
*/ object JellyOptions: diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensions.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensions.scala new file mode 100644 index 00000000..30815205 --- /dev/null +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensions.scala @@ -0,0 +1,92 @@ +package eu.ostrzyciel.jelly.core + +import eu.ostrzyciel.jelly.core.proto.v1.LogicalStreamType + +import java.util.UUID + +private trait LogicalStreamTypeExtensions: + val staxPrefix = "https://w3id.org/stax/ontology#" + + extension (logicalType: LogicalStreamType) + /** + * Converts the logical stream type to its base concrete stream type in RDF-STaX. + * For example, [[LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS]] will be converted to [[LogicalStreamType.DATASETS]]. + * UNSPECIFIED values will be left as-is. + * + * @return base stream type + */ + def toBaseType: LogicalStreamType = + LogicalStreamType.fromValue(logicalType.value % 10) + + /** + * Checks if the logical stream type is equal to or a subtype of the other logical stream type. + * For example, [[LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS]] is a subtype of [[LogicalStreamType.DATASETS]]. + * + * @param other the other logical stream type + * @return true if the logical stream type is equal to or a subtype of the other logical stream type + */ + def isEqualOrSubtypeOf(other: LogicalStreamType): Boolean = + logicalType == other || logicalType.value.toString.endsWith(other.value.toString) + + /** + * Returns the IRI of the RDF-STaX stream type individual for the logical stream type. + * If the logical stream type is not supported or is not specified, None is returned. 
+ * + * @return the IRI of the RDF-STaX stream type individual + */ + def getRdfStaxType: Option[String] = + logicalType match + case LogicalStreamType.FLAT_TRIPLES => Some(s"${staxPrefix}flatTripleStream") + case LogicalStreamType.FLAT_QUADS => Some(s"${staxPrefix}flatQuadStream") + case LogicalStreamType.GRAPHS => Some(s"${staxPrefix}graphStream") + case LogicalStreamType.SUBJECT_GRAPHS => Some(s"${staxPrefix}subjectGraphStream") + case LogicalStreamType.DATASETS => Some(s"${staxPrefix}datasetStream") + case LogicalStreamType.NAMED_GRAPHS => Some(s"${staxPrefix}namedGraphStream") + case LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS => Some(s"${staxPrefix}timestampedNamedGraphStream") + case _ => None + + /** + * Returns an RDF-STaX annotation for the logical stream type, in RDF. The annotation simply states that + * the subject node has a stream type usage, and that stream type usage has this stream type. + * + * Example in Turtle for a flat triple stream: + * ex:subjectNode stax:hasStreamTypeUsage [ + * a stax:RdfStreamTypeUsage ; + * stax:hasStreamType stax:flatTripleStream + * ] . 
+ * + * @param subjectNode the subject node to annotate + * @param converterFactory the converter factory to use for creating RDF nodes and triples + * @tparam TNode the type of RDF nodes + * @tparam TTriple the type of RDF triples + * @throws IllegalArgumentException if the logical stream type is not supported + * @return the RDF-STaX annotation + */ + def getRdfStaxAnnotation[TNode, TTriple](subjectNode: TNode) + (using converterFactory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Seq[TTriple] = + getRdfStaxType match + case Some(typeIri) => + val converter = converterFactory.decoderConverter + val bNode = converter.makeBlankNode(UUID.randomUUID().toString) + Seq( + converter.makeTriple( + subjectNode, + converter.makeIriNode(s"${staxPrefix}hasStreamTypeUsage"), + bNode + ), + converter.makeTriple( + bNode, + converter.makeIriNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + converter.makeIriNode(s"${staxPrefix}RdfStreamTypeUsage") + ), + converter.makeTriple( + bNode, + converter.makeIriNode(s"${staxPrefix}hasStreamType"), + converter.makeIriNode(typeIri) + ) + ) + case None => throw new IllegalArgumentException(s"Unsupported logical stream type: $logicalType") + +private object LogicalStreamTypeExtensions extends LogicalStreamTypeExtensions + +export LogicalStreamTypeExtensions.* diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala index 94c7c8b7..9cf4194b 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoder.scala @@ -1,6 +1,6 @@ package eu.ostrzyciel.jelly.core -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, RdfStreamRow} +import eu.ostrzyciel.jelly.core.proto.v1.* /** * Base extendable trait for decoders of protobuf RDF streams. 
@@ -22,3 +22,42 @@ trait ProtoDecoder[+TOut]: protected final def checkVersion(options: RdfStreamOptions): Unit = if options.version > Constants.protoVersion then throw new RdfProtoDeserializationError(s"Unsupported proto version: ${options.version}") + + /** + * Checks if the logical and physical stream types are compatible. Additionally, if the expected logical stream type + * is provided, checks if the actual logical stream type is a subtype of the expected one. + * @param options Options of the stream. + * @param expLogicalType Expected logical stream type. + */ + protected final def checkLogicalStreamType(options: RdfStreamOptions, expLogicalType: Option[LogicalStreamType]): + Unit = + val baseLogicalType = options.logicalType.toBaseType + + val conflict = baseLogicalType match + case LogicalStreamType.UNSPECIFIED => false + case LogicalStreamType.FLAT_TRIPLES => options.physicalType match + case PhysicalStreamType.QUADS => true + case PhysicalStreamType.GRAPHS => true + case _ => false + case LogicalStreamType.FLAT_QUADS => options.physicalType match + case PhysicalStreamType.TRIPLES => true + case _ => false + case LogicalStreamType.GRAPHS => options.physicalType match + case PhysicalStreamType.QUADS => true + case PhysicalStreamType.GRAPHS => true + case _ => false + case LogicalStreamType.DATASETS => options.physicalType match + case PhysicalStreamType.TRIPLES => true + case _ => false + case _ => false + + if conflict then + throw new RdfProtoDeserializationError(s"Logical stream type $baseLogicalType is incompatible with " + + s"physical stream type ${options.physicalType}.") + + expLogicalType match + case Some(v) => + if !options.logicalType.isEqualOrSubtypeOf(v) then + throw new RdfProtoDeserializationError(s"Expected logical stream type $v, got ${options.logicalType}. 
" + + s"${options.logicalType} is not a subtype of $v.") + case None => diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala index 5db9063a..1aee4090 100644 --- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala +++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoDecoderImpl.scala @@ -12,7 +12,7 @@ import scala.reflect.ClassTag * See the base (extendable) trait: [[ProtoDecoder]]. */ sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +TQuad, +TOut] -(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad]) +(converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType]) extends ProtoDecoder[TOut]: private var streamOpt: Option[RdfStreamOptions] = None @@ -137,6 +137,7 @@ sealed abstract class ProtoDecoderImpl[TNode, TDatatype : ClassTag, +TTriple, +T protected def handleOptions(opts: RdfStreamOptions): Unit = checkVersion(opts) + checkLogicalStreamType(opts, expLogicalType) setStreamOpt(opts) protected def handleTriple(triple: RdfTriple): Option[TOut] = @@ -161,11 +162,11 @@ object ProtoDecoderImpl: * A decoder that reads TRIPLES streams and outputs a sequence of triples. 
*/ final class TriplesDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad] - (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad]) - extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TTriple](converter): + (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType]) + extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TTriple](converter, expLogicalType): override protected def handleOptions(opts: RdfStreamOptions): Unit = - if !opts.streamType.isTriples then + if !opts.physicalType.isTriples then throw new RdfProtoDeserializationError("Incoming stream type is not TRIPLES.") super.handleOptions(opts) @@ -176,11 +177,11 @@ object ProtoDecoderImpl: * A decoder that reads QUADS streams and outputs a sequence of quads. */ final class QuadsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad] - (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad]) - extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter): + (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType]) + extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter, expLogicalType): override protected def handleOptions(opts: RdfStreamOptions): Unit = - if !opts.streamType.isQuads then + if !opts.physicalType.isQuads then throw new RdfProtoDeserializationError("Incoming stream type is not QUADS.") super.handleOptions(opts) @@ -191,12 +192,12 @@ object ProtoDecoderImpl: * A decoder that reads GRAPHS streams and outputs a flat sequence of quads. 
*/ final class GraphsAsQuadsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad] - (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad]) - extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter): + (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType]) + extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, TQuad](converter, expLogicalType): private var currentGraph: Option[TNode] = None override protected def handleOptions(opts: RdfStreamOptions): Unit = - if !opts.streamType.isGraphs then + if !opts.physicalType.isGraphs then throw new RdfProtoDeserializationError("Incoming stream type is not GRAPHS.") super.handleOptions(opts) @@ -223,13 +224,13 @@ object ProtoDecoderImpl: * Each graph is emitted as soon as the producer signals that it's complete. */ final class GraphsDecoder[TNode, TDatatype : ClassTag, TTriple, TQuad] - (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad]) - extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, (TNode, Iterable[TTriple])](converter): + (converter: ProtoDecoderConverter[TNode, TDatatype, TTriple, TQuad], expLogicalType: Option[LogicalStreamType]) + extends ProtoDecoderImpl[TNode, TDatatype, TTriple, TQuad, (TNode, Iterable[TTriple])](converter, expLogicalType): private var currentGraph: Option[TNode] = None private var buffer: ListBuffer[TTriple] = new ListBuffer[TTriple]() override protected def handleOptions(opts: RdfStreamOptions): Unit = - if !opts.streamType.isGraphs then + if !opts.physicalType.isGraphs then throw new RdfProtoDeserializationError("Incoming stream type is not GRAPHS.") super.handleOptions(opts) @@ -283,17 +284,18 @@ object ProtoDecoderImpl: private def handleOptions(opts: RdfStreamOptions): Unit = checkVersion(opts) + checkLogicalStreamType(opts, None) if inner.isDefined then throw new RdfProtoDeserializationError("Stream options are already set." 
+ "The type of the stream cannot be inferred.") - val dec = opts.streamType match - case RdfStreamType.TRIPLES => - new TriplesDecoder[TNode, TDatatype, TTriple, TQuad](converter) - case RdfStreamType.QUADS => - new QuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter) - case RdfStreamType.GRAPHS => - new GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter) - case RdfStreamType.UNSPECIFIED => + val dec = opts.physicalType match + case PhysicalStreamType.TRIPLES => + new TriplesDecoder[TNode, TDatatype, TTriple, TQuad](converter, None) + case PhysicalStreamType.QUADS => + new QuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter, None) + case PhysicalStreamType.GRAPHS => + new GraphsAsQuadsDecoder[TNode, TDatatype, TTriple, TQuad](converter, None) + case PhysicalStreamType.UNSPECIFIED => throw new RdfProtoDeserializationError("Incoming stream type is not set.") case _ => throw new RdfProtoDeserializationError("Incoming stream type is not recognized.") diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensionsSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensionsSpec.scala new file mode 100644 index 00000000..89f02527 --- /dev/null +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/LogicalStreamTypeExtensionsSpec.scala @@ -0,0 +1,89 @@ +package eu.ostrzyciel.jelly.core + +import eu.ostrzyciel.jelly.core.helpers.Assertions.* +import eu.ostrzyciel.jelly.core.helpers.MockConverterFactory +import eu.ostrzyciel.jelly.core.helpers.Mrl.* +import eu.ostrzyciel.jelly.core.proto.v1.* +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class LogicalStreamTypeExtensionsSpec extends AnyWordSpec, Matchers: + private val validStreamTypes = LogicalStreamType.values.filter(_.value > 0) + + given MockConverterFactory.type = MockConverterFactory + + "toBaseType" should { + for streamType <- validStreamTypes do + s"return base type for $streamType" in { + val baseValue 
= streamType.toBaseType.value + baseValue should be > 0 + baseValue should be < 10 + streamType.value.toString should endWith (baseValue.toString) + } + } + + "isEqualOrSubtypeOf" should { + for streamType <- validStreamTypes do + s"return true for $streamType and itself" in { + streamType.isEqualOrSubtypeOf(streamType) shouldBe true + } + + s"return true for $streamType and its base type" in { + streamType.isEqualOrSubtypeOf(streamType.toBaseType) shouldBe true + } + + if streamType.toBaseType != streamType then + s"return false for ${streamType.toBaseType} and $streamType" in { + streamType.toBaseType.isEqualOrSubtypeOf(streamType) shouldBe false + } + + s"return false for $streamType and an undefined type" in { + streamType.isEqualOrSubtypeOf(LogicalStreamType.UNSPECIFIED) shouldBe false + } + + s"return false for an undefined type and $streamType" in { + LogicalStreamType.UNSPECIFIED.isEqualOrSubtypeOf(streamType) shouldBe false + } + } + + "getRdfStaxType" should { + for streamType <- validStreamTypes do + s"return RDF STaX type for $streamType" in { + val t = streamType.getRdfStaxType + t.isDefined should be (true) + t.get should startWith ("https://w3id.org/stax/ontology#") + } + + "not return RDF STaX type for UNSPECIFIED" in { + LogicalStreamType.UNSPECIFIED.getRdfStaxType should be (None) + } + } + + "getRdfStaxAnnotation" should { + val subjectNodes = Seq( + Iri("https://example.org/stream"), + BlankNode("stream"), + null, + ) + + for + streamType <- validStreamTypes + subjectNode <- subjectNodes + do + s"return RDF STaX annotation for $streamType and $subjectNode" in { + val a = streamType.getRdfStaxAnnotation(subjectNode) + a.size should be (3) + a.head.s should be (subjectNode) + a.head.p should be (Iri("https://w3id.org/stax/ontology#hasStreamTypeUsage")) + a(2).o should be (Iri(streamType.getRdfStaxType.get)) + } + + for subjectNode <- subjectNodes do + s"throw exception for RDF STaX annotation for UNSPECIFIED and $subjectNode" in { + val error = 
intercept[IllegalArgumentException] { + LogicalStreamType.UNSPECIFIED.getRdfStaxAnnotation(subjectNode) should be (empty) + } + error.getMessage should include ("Unsupported logical stream type") + error.getMessage should include ("UNSPECIFIED") + } + } diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala index ab4e1d7a..8037313f 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoDecoderSpec.scala @@ -11,21 +11,141 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: import ProtoDecoderImpl.* import ProtoTestCases.* + "checkLogicalStreamType" should { + val decoderFactories = Seq( + ("TriplesDecoder", (MockConverterFactory.triplesDecoder, PhysicalStreamType.TRIPLES)), + ("QuadsDecoder", (MockConverterFactory.quadsDecoder, PhysicalStreamType.QUADS)), + ("GraphsAsQuadsDecoder", (MockConverterFactory.graphsAsQuadsDecoder, PhysicalStreamType.GRAPHS)), + ("GraphsDecoder", (MockConverterFactory.graphsDecoder, PhysicalStreamType.GRAPHS)), + ).toMap + val logicalStreamTypeSets = Seq( + ( + Seq(LogicalStreamType.FLAT_TRIPLES), + Seq("TriplesDecoder") + ), + ( + Seq(LogicalStreamType.FLAT_QUADS), + Seq("QuadsDecoder", "GraphsAsQuadsDecoder") + ), + ( + Seq( + LogicalStreamType.GRAPHS, + LogicalStreamType.SUBJECT_GRAPHS, + ), + Seq("TriplesDecoder") + ), + ( + Seq( + LogicalStreamType.DATASETS, + LogicalStreamType.NAMED_GRAPHS, + LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS, + ), + Seq("QuadsDecoder", "GraphsDecoder", "GraphsAsQuadsDecoder") + ), + ( + Seq( + LogicalStreamType.NAMED_GRAPHS, + LogicalStreamType.TIMESTAMPED_NAMED_GRAPHS, + ), + Seq("GraphsDecoder") + ) + ) + + for + (logicalStreamTypeSet, decoders) <- logicalStreamTypeSets + decoderName <- decoders + do + val lst = logicalStreamTypeSet.head + val (decoderF, pst) = decoderFactories(decoderName) + + f"throw exception when 
expecting logical type $lst on a stream with no logical type, with $decoderName" in { + val decoder = decoderF(Some(lst)) + val data = wrapEncodedFull(Seq( + JellyOptions.smallGeneralized + .withPhysicalType(pst) + .withLogicalType(LogicalStreamType.UNSPECIFIED) + )) + val error = intercept[RdfProtoDeserializationError] { + decoder.ingestRow(data.head) + } + error.getMessage should include("Expected logical stream type") + } + + for lstOfStream <- logicalStreamTypeSet do + f"accept stream with logical type $lstOfStream when expecting $lst, with $decoderName" in { + val decoder = decoderF(Some(lst)) + val data = wrapEncodedFull(Seq( + JellyOptions.smallGeneralized + .withPhysicalType(pst) + .withLogicalType(lstOfStream) + )) + decoder.ingestRow(data.head) + decoder.getStreamOpt.get.logicalType should be (lstOfStream) + } + + for + (pst, decs) <- decoderFactories.groupBy(_._2._2) + (decoderName, (decoderF, _)) <- decs + (lstSet, _) <- logicalStreamTypeSets.take(4).filterNot(x => x._2.exists(y => decs.exists(z => z._1 == y))) + lstOfStream <- lstSet + do + f"throw exception that a stream with logical type $lstOfStream is incompatible with $pst, with $decoderName" in { + val decoder = decoderF(None) + val data = wrapEncodedFull(Seq( + JellyOptions.smallGeneralized + .withPhysicalType(pst) + .withLogicalType(lstOfStream) + )) + val error = intercept[RdfProtoDeserializationError] { + decoder.ingestRow(data.head) + } + error.getMessage should include("is incompatible with physical stream type") + } + } + // Test body "a TriplesDecoder" should { "decode triple statements" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(Some(LogicalStreamType.FLAT_TRIPLES)) + val decoded = Triples1 + .encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) + .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) + assertDecoded(decoded, 
Triples1.mrl) + } + + "decode triple statements with unset expected logical stream type" in { + val decoder = MockConverterFactory.triplesDecoder(None) val decoded = Triples1 - .encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + .encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) assertDecoded(decoded, Triples1.mrl) } + "throw exception on unset logical stream type" in { + val decoder = MockConverterFactory.triplesDecoder(Some(LogicalStreamType.FLAT_TRIPLES)) + val data = wrapEncodedFull(Seq( + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.UNSPECIFIED) + )) + val error = intercept[RdfProtoDeserializationError] { + decoder.ingestRow(data.head) + } + error.getMessage should include ("Expected logical stream type") + } + "decode triple statements (norepeat)" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(Some(LogicalStreamType.FLAT_TRIPLES)) val decoded = Triples2NoRepeat .encoded(JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.TRIPLES) + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) .withUseRepeat(false) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) @@ -33,9 +153,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on a quad in a TRIPLES stream" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), RdfQuad( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -54,11 +174,11 @@ class ProtoDecoderSpec extends 
AnyWordSpec, Matchers: // The code is the same in quads, triples, or graphs decoders, so this is fine. // Code coverage checks out. "ignore duplicate stream options" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.TRIPLES) + .withPhysicalType(PhysicalStreamType.TRIPLES) .withUseRepeat(false), )) @@ -69,9 +189,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on RdfRepeat without preceding value" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), RdfTriple(TERM_REPEAT, TERM_REPEAT, TERM_REPEAT), )) decoder.ingestRow(data.head) @@ -82,9 +202,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on RdfRepeat in a quoted triple" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -101,7 +221,7 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on unset row kind" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val error = intercept[RdfProtoDeserializationError] { decoder.ingestRow(RdfStreamRow(RdfStreamRow.Row.Empty)) } @@ -109,9 +229,9 @@ class ProtoDecoderSpec 
extends AnyWordSpec, Matchers: } "throw exception on unset term kind" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -126,9 +246,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on unset literal kind" in { - val decoder = MockConverterFactory.triplesDecoder + val decoder = MockConverterFactory.triplesDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -145,21 +265,21 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "a QuadsDecoder" should { "decode quad statements" in { - val decoder = MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val decoded = Quads1 .encoded( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) assertDecoded(decoded, Quads1.mrl) } "decode quad statements (norepeat)" in { - val decoder = MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val decoded = Quads2NoRepeat .encoded( JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.QUADS) + .withPhysicalType(PhysicalStreamType.QUADS) .withUseRepeat(false) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) @@ -167,19 +287,19 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "decode quad statements (repeated default graph)" in { - val decoder = 
MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val decoded = Quads3RepeatDefault .encoded( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) assertDecoded(decoded, Quads3RepeatDefault.mrl) } "throw exception on a triple in a QUADS stream" in { - val decoder = MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -194,9 +314,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on a graph start in a QUADS stream" in { - val decoder = MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS), RdfGraphStart(RdfGraph(RdfGraph.Graph.DefaultGraph(RdfDefaultGraph()))), )) decoder.ingestRow(data.head) @@ -207,9 +327,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on a graph end in a QUADS stream" in { - val decoder = MockConverterFactory.quadsDecoder + val decoder = MockConverterFactory.quadsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS), RdfGraphEnd(), )) decoder.ingestRow(data.head) @@ -222,10 +342,10 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "a GraphsDecoder" should { "decode graphs" in { - val decoder = MockConverterFactory.graphsDecoder + val decoder = 
MockConverterFactory.graphsDecoder(None) val decoded = Graphs1 .encoded( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) @@ -242,9 +362,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on a quad in a GRAPHS stream" in { - val decoder = MockConverterFactory.graphsDecoder + val decoder = MockConverterFactory.graphsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), RdfQuad( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -260,9 +380,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on a graph end before a graph start" in { - val decoder = MockConverterFactory.graphsDecoder + val decoder = MockConverterFactory.graphsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -280,9 +400,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: // The following cases are for the [[ProtoDecoder]] base class – but tested on the child. 
"throw exception on graph term repeat in graph name" in { - val decoder = MockConverterFactory.graphsDecoder + val decoder = MockConverterFactory.graphsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), RdfGraphStart(RdfGraph(RdfGraph.Graph.Repeat(RdfRepeat()))), )) decoder.ingestRow(data.head) @@ -293,9 +413,9 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } "throw exception on unset graph term type" in { - val decoder = MockConverterFactory.graphsDecoder + val decoder = MockConverterFactory.graphsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), RdfGraphStart(), )) decoder.ingestRow(data.head) @@ -308,19 +428,19 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "a GraphsAsQuadsDecoder" should { "decode graphs as quads" in { - val decoder = MockConverterFactory.graphsAsQuadsDecoder + val decoder = MockConverterFactory.graphsAsQuadsDecoder(None) val decoded = Graphs1 .encoded( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS) ) .flatMap(row => decoder.ingestRow(RdfStreamRow(row))) assertDecoded(decoded, Graphs1.mrlQuads) } "throw exception on a triple before a graph start" in { - val decoder = MockConverterFactory.graphsAsQuadsDecoder + val decoder = MockConverterFactory.graphsAsQuadsDecoder(None) val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), RdfTriple( RdfTerm(RdfTerm.Term.Bnode("1")), RdfTerm(RdfTerm.Term.Bnode("2")), @@ -337,15 +457,15 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "an AnyStatementDecoder" should { val cases = Seq( 
- (Triples1, RdfStreamType.TRIPLES, "triples", Triples1.mrl), - (Quads1, RdfStreamType.QUADS, "quads", Quads1.mrl), - (Graphs1, RdfStreamType.GRAPHS, "graphs", Graphs1.mrlQuads), + (Triples1, PhysicalStreamType.TRIPLES, "triples", Triples1.mrl), + (Quads1, PhysicalStreamType.QUADS, "quads", Quads1.mrl), + (Graphs1, PhysicalStreamType.GRAPHS, "graphs", Graphs1.mrlQuads), ) for ((testCase, streamType, streamName, expected) <- cases) do s"decode $streamName" in { val opts = JellyOptions.smallGeneralized - .withStreamType(streamType) + .withPhysicalType(streamType) .withVersion(Constants.protoVersion) val decoder = MockConverterFactory.anyStatementDecoder val decoded = testCase @@ -378,8 +498,8 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "should throw when encountering stream options twice" in { val decoder = MockConverterFactory.anyStatementDecoder val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), )) decoder.ingestRow(data.head) val error = intercept[RdfProtoDeserializationError] { @@ -390,11 +510,11 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: } val streamTypeCases = Seq( - (() => MockConverterFactory.triplesDecoder, "Triples", RdfStreamType.TRIPLES, RdfStreamType.QUADS), - (() => MockConverterFactory.quadsDecoder, "Quads", RdfStreamType.QUADS, RdfStreamType.GRAPHS), - (() => MockConverterFactory.graphsDecoder, "Graphs", RdfStreamType.GRAPHS, RdfStreamType.QUADS), - (() => MockConverterFactory.graphsAsQuadsDecoder, "GraphsAsQuads", RdfStreamType.GRAPHS, RdfStreamType.TRIPLES), - (() => MockConverterFactory.anyStatementDecoder, "AnyStatement", RdfStreamType.TRIPLES, RdfStreamType.UNSPECIFIED), + (() => MockConverterFactory.triplesDecoder(None), "Triples", 
PhysicalStreamType.TRIPLES, PhysicalStreamType.QUADS), + (() => MockConverterFactory.quadsDecoder(None), "Quads", PhysicalStreamType.QUADS, PhysicalStreamType.GRAPHS), + (() => MockConverterFactory.graphsDecoder(None), "Graphs", PhysicalStreamType.GRAPHS, PhysicalStreamType.QUADS), + (() => MockConverterFactory.graphsAsQuadsDecoder(None), "GraphsAsQuads", PhysicalStreamType.GRAPHS, PhysicalStreamType.TRIPLES), + (() => MockConverterFactory.anyStatementDecoder, "AnyStatement", PhysicalStreamType.TRIPLES, PhysicalStreamType.UNSPECIFIED), ) for (decoderFactory, decName, streamType, invalidStreamType) <- streamTypeCases do @@ -409,7 +529,7 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "throw exception on an invalid stream type" in { val data = wrapEncodedFull(Seq( - JellyOptions.smallGeneralized.withStreamType(invalidStreamType), + JellyOptions.smallGeneralized.withPhysicalType(invalidStreamType), )) val error = intercept[RdfProtoDeserializationError] { decoderFactory().ingestRow(data.head) @@ -420,7 +540,7 @@ class ProtoDecoderSpec extends AnyWordSpec, Matchers: "throw exception on an unsupported proto version" in { val data = wrapEncodedFull(Seq( JellyOptions.smallGeneralized - .withStreamType(streamType) + .withPhysicalType(streamType) .withVersion(Constants.protoVersion + 1) )) val error = intercept[RdfProtoDeserializationError] { diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoEncoderSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoEncoderSpec.scala index 30c09bfa..dd2af4cd 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoEncoderSpec.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/ProtoEncoderSpec.scala @@ -14,7 +14,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "a ProtoEncoder" should { "encode triple statements" in { val encoder = MockProtoEncoder( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES) + 
JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES) ) val encoded = Triples1.mrl.flatMap(triple => encoder.addTripleStatement(triple).toSeq) assertEncoded(encoded, Triples1.encoded(encoder.options)) @@ -23,7 +23,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "encode triple statements (norepeat)" in { val encoder = MockProtoEncoder( JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.TRIPLES) + .withPhysicalType(PhysicalStreamType.TRIPLES) .withUseRepeat(false) ) val encoded = Triples2NoRepeat.mrl.flatMap(triple => encoder.addTripleStatement(triple).toSeq) @@ -32,7 +32,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "encode quad statements" in { val encoder = MockProtoEncoder( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS) ) val encoded = Quads1.mrl.flatMap(quad => encoder.addQuadStatement(quad).toSeq) assertEncoded(encoded, Quads1.encoded(encoder.options)) @@ -41,7 +41,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "encode quad statements (norepeat)" in { val encoder = MockProtoEncoder( JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.QUADS) + .withPhysicalType(PhysicalStreamType.QUADS) .withUseRepeat(false) ) val encoded = Quads2NoRepeat.mrl.flatMap(quad => encoder.addQuadStatement(quad).toSeq) @@ -50,7 +50,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "encode quad statements (repeated default graph)" in { val encoder = MockProtoEncoder( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS) ) val encoded = Quads3RepeatDefault.mrl.flatMap(quad => encoder.addQuadStatement(quad).toSeq) assertEncoded(encoded, Quads3RepeatDefault.encoded(encoder.options)) @@ -58,7 +58,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "encode graphs" in { val encoder = MockProtoEncoder( - 
JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS) ) val encoded = Graphs1.mrl.flatMap((graphName, triples) => Seq( encoder.startGraph(graphName).toSeq, @@ -70,7 +70,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "not allow to end a graph before starting one" in { val encoder = MockProtoEncoder( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS) ) val error = intercept[RdfProtoSerializationError] { encoder.endGraph() @@ -80,7 +80,7 @@ class ProtoEncoderSpec extends AnyWordSpec, Matchers: "not allow to use quoted triples as the graph name" in { val encoder = MockProtoEncoder( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS) + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS) ) val error = intercept[RdfProtoSerializationError] { encoder.startGraph(TripleNode( diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/helpers/MockConverterFactory.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/helpers/MockConverterFactory.scala index 6ae0dca1..8d6cf59b 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/helpers/MockConverterFactory.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/helpers/MockConverterFactory.scala @@ -7,6 +7,6 @@ import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions object MockConverterFactory extends ConverterFactory [MockProtoEncoder, MockProtoDecoderConverter, Node, Datatype, Triple, Quad]: - override protected def decoderConverter = new MockProtoDecoderConverter() + override final def decoderConverter = new MockProtoDecoderConverter() - override def encoder(options: RdfStreamOptions) = new MockProtoEncoder(options) + override final def encoder(options: RdfStreamOptions) = new MockProtoEncoder(options) diff --git a/grpc/src/test/scala/eu/ostrzyciel/jelly/grpc/GrpcSpec.scala 
b/grpc/src/test/scala/eu/ostrzyciel/jelly/grpc/GrpcSpec.scala index 21296306..8eee2320 100644 --- a/grpc/src/test/scala/eu/ostrzyciel/jelly/grpc/GrpcSpec.scala +++ b/grpc/src/test/scala/eu/ostrzyciel/jelly/grpc/GrpcSpec.scala @@ -62,26 +62,26 @@ class GrpcSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll: "triples" -> Triples1.encodedFull( JellyOptions.smallGeneralized .withStreamName("triples") - .withStreamType(RdfStreamType.TRIPLES), + .withPhysicalType(PhysicalStreamType.TRIPLES), 1 ), "triples_norepeat" -> Triples2NoRepeat.encodedFull( JellyOptions.smallGeneralized .withStreamName("triples_norepeat") - .withStreamType(RdfStreamType.TRIPLES) + .withPhysicalType(PhysicalStreamType.TRIPLES) .withUseRepeat(false), 2 ), "quads" -> Quads1.encodedFull( JellyOptions.smallGeneralized .withStreamName("quads") - .withStreamType(RdfStreamType.QUADS), + .withPhysicalType(PhysicalStreamType.QUADS), 3 ), "graphs" -> Graphs1.encodedFull( JellyOptions.smallGeneralized .withStreamName("graphs") - .withStreamType(RdfStreamType.GRAPHS), + .withPhysicalType(PhysicalStreamType.GRAPHS), 1 ), ) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala index eb9cf817..0bee41e8 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala @@ -69,7 +69,7 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndA ("stream row count: 200", StreamRowCountLimiter(200)), ) - final case class CaseKey(streamType: String, encoder: String, jOpt: String, sOpt: String, caseName: String) + final case class CaseKey(physicalType: String, encoder: String, jOpt: String, sOpt: String, caseName: String) private val encodedSizes: mutable.Map[CaseKey, Long] = 
mutable.Map() diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala index 8341ecd3..c915a0f4 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala @@ -16,11 +16,11 @@ case object JenaTestStream extends TestStream: override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseTriples(is, Lang.NT, "").asScala) - .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) + .via(EncoderFlow.flatTripleStream(limiter, jellyOpt)) override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseQuads(is, Lang.NQUADS, "").asScala) - .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) + .via(EncoderFlow.flatQuadStream(limiter, jellyOpt)) override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val ds = RDFParser.source(is) @@ -31,22 +31,22 @@ case object JenaTestStream extends TestStream: Iterator((null, Iterable.from(ds.getDefaultGraph.find.asScala))) ).filter((_, g) => g.nonEmpty) Source.fromIterator(() => graphs) - .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) + .via(EncoderFlow.namedGraphStream(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = Flow[RdfStreamFrame] - .via(DecoderFlow.triplesToFlat) + .via(DecoderFlow.decodeTriples.asFlatTripleStream()) // buffer the triples to avoid OOMs and keep some perf .grouped(32) .toMat(Sink.foreach(triples => RDFDataMgr.writeTriples(os, triples.iterator.asJava)))(Keep.right) override def quadSink(os: OutputStream)(implicit ec: ExecutionContext) = Flow[RdfStreamFrame] - .via(DecoderFlow.quadsToFlat) + 
.via(DecoderFlow.decodeQuads.asFlatQuadStream()) .grouped(32) .toMat(Sink.foreach(quads => RDFDataMgr.writeQuads(os, quads.iterator.asJava)))(Keep.right) override def graphSink(os: OutputStream)(implicit ec: ExecutionContext) = Flow[RdfStreamFrame] - .via(DecoderFlow.graphsAsQuadsToGrouped) + .via(DecoderFlow.decodeGraphs.asDatasetStreamOfQuads()) .toMat(Sink.foreach(quads => RDFDataMgr.writeQuads(os, quads.iterator.asJava)))(Keep.right) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala index 0c7c8c3c..36c37760 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala @@ -22,7 +22,7 @@ case object Rdf4jTestStream extends TestStream: parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) + .via(EncoderFlow.flatTripleStream(limiter, jellyOpt)) override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) @@ -30,7 +30,7 @@ case object Rdf4jTestStream extends TestStream: parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) + .via(EncoderFlow.flatQuadStream(limiter, jellyOpt)) override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) @@ -40,13 +40,13 @@ case object Rdf4jTestStream extends TestStream: val graphs = collector.getStatements.asScala.toSeq .groupBy(_.getContext) Source.fromIterator(() => graphs.iterator) - .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) + 
.via(EncoderFlow.namedGraphStream(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = val writer = Rio.createWriter(RDFFormat.TURTLESTAR, os) writer.startRDF() Flow[RdfStreamFrame] - .via(DecoderFlow.triplesToFlat) + .via(DecoderFlow.decodeTriples.asFlatTripleStream()) .toMat(Sink.foreach(st => writer.handleStatement(st)))(Keep.right) .mapMaterializedValue(f => f.map(_ => { writer.endRDF() @@ -57,7 +57,7 @@ case object Rdf4jTestStream extends TestStream: val writer = Rio.createWriter(RDFFormat.NQUADS, os) writer.startRDF() Flow[RdfStreamFrame] - .via(DecoderFlow.quadsToFlat) + .via(DecoderFlow.decodeQuads.asFlatQuadStream()) .toMat(Sink.foreach(st => writer.handleStatement(st)))(Keep.right) .mapMaterializedValue(f => f.map(_ => { writer.endRDF() @@ -68,7 +68,7 @@ case object Rdf4jTestStream extends TestStream: val writer = Rio.createWriter(RDFFormat.NQUADS, os) writer.startRDF() Flow[RdfStreamFrame] - .via(DecoderFlow.graphsAsQuadsToFlat) + .via(DecoderFlow.decodeGraphs.asFlatQuadStream()) .toMat(Sink.foreach(st => writer.handleStatement(st)))(Keep.right) .mapMaterializedValue(f => f.map(_ => { writer.endRDF() diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala index 7ef5ae46..e453c4b6 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala @@ -22,7 +22,7 @@ class Rdf4jReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Seq[S private def read(is: InputStream): Seq[Statement] = val f = JellyIo.fromIoStream(is) - .via(DecoderFlow.anyToFlat) + .via(DecoderFlow.decodeAny.asFlatStream) .runWith(Sink.seq) Await.result(f, 10.seconds) @@ -32,12 +32,12 @@ class Rdf4jReactiveSerDes(implicit mat: 
Materializer) extends NativeSerDes[Seq[S override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => model.iterator) - .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(frameSize), opt)) + .via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => dataset.iterator) - .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(frameSize), opt)) + .via(EncoderFlow.flatQuadStream(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jSerDes.scala index 5c0716b9..eaaab0ac 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jSerDes.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io import eu.ostrzyciel.jelly.convert.rdf4j.rio import eu.ostrzyciel.jelly.convert.rdf4j.rio.JellyWriterSettings -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, RdfStreamType} +import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, PhysicalStreamType} import org.eclipse.rdf4j.model.Statement import org.eclipse.rdf4j.rio.helpers.StatementCollector import org.eclipse.rdf4j.rio.{RDFFormat, Rio} @@ -39,10 +39,10 @@ object Rdf4jSerDes extends NativeSerDes[Seq[Statement], Seq[Statement]]: writer.endRDF() override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = - write(os, model, opt.withStreamType(RdfStreamType.TRIPLES), 
frameSize) + write(os, model, opt.withPhysicalType(PhysicalStreamType.TRIPLES), frameSize) override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = - write(os, dataset, opt.withStreamType(RdfStreamType.QUADS), frameSize) + write(os, dataset, opt.withPhysicalType(PhysicalStreamType.QUADS), frameSize) diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/JenaConverterFactory.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/JenaConverterFactory.scala index 00ea5914..beb09b82 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/JenaConverterFactory.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/JenaConverterFactory.scala @@ -8,6 +8,6 @@ import org.apache.jena.sparql.core.Quad object JenaConverterFactory extends ConverterFactory[JenaProtoEncoder, JenaDecoderConverter, Node, RDFDatatype, Triple, Quad]: - override protected def decoderConverter: JenaDecoderConverter = new JenaDecoderConverter() + override final def decoderConverter: JenaDecoderConverter = new JenaDecoderConverter() override final def encoder(options: RdfStreamOptions): JenaProtoEncoder = JenaProtoEncoder(options) diff --git a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala index 0b75bbe4..bb687c78 100644 --- a/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala +++ b/jena/src/main/scala/eu/ostrzyciel/jelly/convert/jena/riot/JellyWriter.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.convert.jena.riot import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory import eu.ostrzyciel.jelly.core.Constants.* -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamType} +import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, PhysicalStreamType, RdfStreamFrame} import org.apache.jena.graph.Graph import org.apache.jena.riot.adapters.RDFWriterRIOT 
import org.apache.jena.riot.* @@ -33,7 +33,9 @@ final class JellyGraphWriter(opt: JellyFormatVariant) extends WriterGraphRIOTBas override def write(out: OutputStream, graph: Graph, prefixMap: PrefixMap, baseURI: String, context: Context): Unit = val encoder = JenaConverterFactory.encoder( - opt.opt.withStreamType(RdfStreamType.TRIPLES) + opt.opt + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) ) graph.find().asScala .flatMap(triple => encoder.addTripleStatement(triple)) @@ -53,7 +55,9 @@ final class JellyDatasetWriter(opt: JellyFormatVariant) extends WriterDatasetRIO out: OutputStream, dataset: DatasetGraph, prefixMap: PrefixMap, baseURI: String, context: Context ): Unit = val encoder = JenaConverterFactory.encoder( - opt.opt.withStreamType(RdfStreamType.QUADS) + opt.opt + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS) ) dataset.find().asScala .flatMap(quad => encoder.addQuadStatement(quad)) diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/Rdf4jConverterFactory.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/Rdf4jConverterFactory.scala index acc64682..48f4ac3e 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/Rdf4jConverterFactory.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/Rdf4jConverterFactory.scala @@ -6,6 +6,6 @@ import org.eclipse.rdf4j.model.{Statement, Value} object Rdf4jConverterFactory extends ConverterFactory[Rdf4jProtoEncoder, Rdf4jDecoderConverter, Value, Rdf4jDatatype, Statement, Statement]: - override protected def decoderConverter: Rdf4jDecoderConverter = new Rdf4jDecoderConverter() + override final def decoderConverter: Rdf4jDecoderConverter = new Rdf4jDecoderConverter() - override def encoder(options: RdfStreamOptions): Rdf4jProtoEncoder = Rdf4jProtoEncoder(options) + override final def encoder(options: RdfStreamOptions): Rdf4jProtoEncoder = Rdf4jProtoEncoder(options) diff --git 
a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala index e8312d00..ceec8d4e 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriter.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.convert.rdf4j.rio import eu.ostrzyciel.jelly.convert.rdf4j.Rdf4jProtoEncoder -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions, RdfStreamRow} +import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamFrame, RdfStreamOptions, RdfStreamRow} import org.eclipse.rdf4j.model.Statement import org.eclipse.rdf4j.rio.RioSetting import org.eclipse.rdf4j.rio.helpers.AbstractRDFWriter @@ -25,7 +25,7 @@ final class JellyWriter(out: OutputStream) extends AbstractRDFWriter: override def getSupportedSettings = val s = new util.HashSet[RioSetting[_]](super.getSupportedSettings) s.add(STREAM_NAME) - s.add(STREAM_TYPE) + s.add(PHYSICAL_TYPE) s.add(ALLOW_GENERALIZED_STATEMENTS) s.add(USE_REPEAT) s.add(ALLOW_RDF_STAR) @@ -37,28 +37,36 @@ final class JellyWriter(out: OutputStream) extends AbstractRDFWriter: override def startRDF(): Unit = super.startRDF() + val c = getWriterConfig + val pType = c.get(PHYSICAL_TYPE) + val lType = if pType.isTriples then + LogicalStreamType.FLAT_TRIPLES + else if pType.isQuads then + LogicalStreamType.FLAT_QUADS + else + throw new IllegalStateException(s"Unsupported stream type: ${options.physicalType}") + options = RdfStreamOptions( streamName = c.get(STREAM_NAME), - streamType = c.get(STREAM_TYPE), + physicalType = c.get(PHYSICAL_TYPE), generalizedStatements = c.get(ALLOW_GENERALIZED_STATEMENTS).booleanValue(), useRepeat = c.get(USE_REPEAT).booleanValue(), rdfStar = c.get(ALLOW_RDF_STAR).booleanValue(), maxNameTableSize = c.get(MAX_NAME_TABLE_SIZE).toInt, maxPrefixTableSize = c.get(MAX_PREFIX_TABLE_SIZE).toInt, 
maxDatatypeTableSize = c.get(MAX_DATATYPE_TABLE_SIZE).toInt, + logicalType = lType, ) frameSize = c.get(FRAME_SIZE).toLong encoder = Rdf4jProtoEncoder(options) override def consumeStatement(st: Statement): Unit = checkWritingStarted() - val rows = if options.streamType.isTriples then + val rows = if options.physicalType.isTriples then encoder.addTripleStatement(st) - else if options.streamType.isQuads then - encoder.addQuadStatement(st) else - throw new IllegalStateException(s"Unsupported stream type: ${options.streamType}") + encoder.addQuadStatement(st) buffer ++= rows if buffer.size >= frameSize then diff --git a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala index 25bc0139..f10051a2 100644 --- a/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala +++ b/rdf4j/src/main/scala/eu/ostrzyciel/jelly/convert/rdf4j/rio/JellyWriterSettings.scala @@ -1,6 +1,6 @@ package eu.ostrzyciel.jelly.convert.rdf4j.rio -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, RdfStreamType} +import eu.ostrzyciel.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamOptions} import org.eclipse.rdf4j.rio.WriterConfig import org.eclipse.rdf4j.rio.helpers.* @@ -9,7 +9,7 @@ object JellyWriterSettings: val c = new WriterConfig() c.set(FRAME_SIZE, frameSize) c.set(STREAM_NAME, opt.streamName) - c.set(STREAM_TYPE, opt.streamType) + c.set(PHYSICAL_TYPE, opt.physicalType) c.set(ALLOW_GENERALIZED_STATEMENTS, opt.generalizedStatements) c.set(USE_REPEAT, opt.useRepeat) c.set(MAX_NAME_TABLE_SIZE, opt.maxNameTableSize.toLong) @@ -29,10 +29,10 @@ object JellyWriterSettings: "" ) - val STREAM_TYPE = new ClassRioSetting[RdfStreamType]( - "eu.ostrzyciel.jelly.convert.rdf4j.rio.streamType", - "Stream type", - RdfStreamType.TRIPLES + val PHYSICAL_TYPE = new ClassRioSetting[PhysicalStreamType]( + "eu.ostrzyciel.jelly.convert.rdf4j.rio.physicalType", + 
"Physical stream type", + PhysicalStreamType.TRIPLES ) val ALLOW_GENERALIZED_STATEMENTS = new BooleanRioSetting( diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala index e7790e0f..0e94c519 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala @@ -1,147 +1,66 @@ package eu.ostrzyciel.jelly.stream -import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame import eu.ostrzyciel.jelly.core.{ConverterFactory, ProtoDecoder} +import eu.ostrzyciel.jelly.core.proto.v1.* import org.apache.pekko.NotUsed -import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.stream.scaladsl.* -/** - * Methods for creating Pekko Streams flows for decoding protobuf into RDF statements. - */ -object DecoderFlow: - - /** - * A flow converting a stream of [[RdfStreamFrame]]s into a flat stream of RDF triple statements. - * RDF stream type: TRIPLES. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TTriple Type of triple statements. - * @return Pekko Streams flow - */ - def triplesToFlat[TTriple](implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): - Flow[RdfStreamFrame, TTriple, NotUsed] = - flatStream(factory.triplesDecoder) - - /** - * A flow converting a stream of [[RdfStreamFrame]]s into a stream of iterables with RDF triple statements. - * Each iterable in the stream corresponds to one [[RdfStreamFrame]]. - * RDF stream type: TRIPLES. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TTriple Type of triple statements. 
- * @return Pekko Streams flow - */ - def triplesToGrouped[TTriple](implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): - Flow[RdfStreamFrame, IterableOnce[TTriple], NotUsed] = - groupedStream(factory.triplesDecoder) +import scala.concurrent.Future - /** - * A flow converting a stream of [[RdfStreamFrame]]s into a flat stream of RDF quad statements. - * RDF stream type: QUADS. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TQuad Type of quad statements. - * @return Pekko Streams flow - */ - def quadsToFlat[TQuad](implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): - Flow[RdfStreamFrame, TQuad, NotUsed] = - flatStream(factory.quadsDecoder) - - /** - * A flow converting a stream of [[RdfStreamFrame]]s into a stream of iterables with RDF quad statements. - * Each iterable in the stream corresponds to one [[RdfStreamFrame]]. - * RDF stream type: QUADS. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TQuad Type of quad statements. - * @return Pekko Streams flow - */ - def quadsToGrouped[TQuad](implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): - Flow[RdfStreamFrame, IterableOnce[TQuad], NotUsed] = - groupedStream(factory.quadsDecoder) +object DecoderFlow: - /** - * A flow converting a graph stream of [[RdfStreamFrame]]s into a flat stream of RDF quad statements. - * RDF stream type: GRAPHS. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TQuad Type of quad statements. - * @return Pekko Streams flow - */ - def graphsAsQuadsToFlat[TQuad](implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): - Flow[RdfStreamFrame, TQuad, NotUsed] = - flatStream(factory.graphsAsQuadsDecoder) + // *** Public API *** /** - * A flow converting a graphs stream of [[RdfStreamFrame]]s into a stream of iterables with RDF quad statements. - * Each iterable in the stream corresponds to one [[RdfStreamFrame]]. 
- * If you need each element in the stream to correspond to one graph, use [[graphsToFlat]] instead. - * RDF stream type: GRAPHS. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TQuad Type of quad statements. - * @return Pekko Streams flow + * Decode the incoming [[RdfStreamFrame]]s as a Jelly stream of physical type TRIPLES. + * If the stream is not a TRIPLES stream, the decoding will fail. + * + * @return intermediate builder object for further configuration */ - def graphsAsQuadsToGrouped[TQuad](implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): - Flow[RdfStreamFrame, IterableOnce[TQuad], NotUsed] = - groupedStream(factory.graphsAsQuadsDecoder) + def decodeTriples: DecoderIngestFlowOps.TriplesIngestFlowOps.type = DecoderIngestFlowOps.TriplesIngestFlowOps /** - * A flow converting a graphs stream of [[RdfStreamFrame]]s into a stream of tuples (graph name; triples). - * Each element in the stream corresponds to exactly one RDF graph. - * RDF stream type: GRAPHS. - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TNode Type of RDF nodes. - * @tparam TTriple Type of triple statements. - * @return Pekko Streams flow + * Decode the incoming [[RdfStreamFrame]]s as a Jelly stream of physical type QUADS. + * If the stream is not a QUADS stream, the decoding will fail. + * + * @return intermediate builder object for further configuration */ - def graphsToFlat[TNode, TTriple](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): - Flow[RdfStreamFrame, (TNode, Iterable[TTriple]), NotUsed] = - flatStream(factory.graphsDecoder) + def decodeQuads: DecoderIngestFlowOps.QuadsIngestFlowOps.type = DecoderIngestFlowOps.QuadsIngestFlowOps /** - * A flow converting a graphs stream of [[RdfStreamFrame]]s into a stream of iterables with tuples - * (graph name; triples). - * Each iterable in the stream corresponds to one [[RdfStreamFrame]]. - * RDF stream type: GRAPHS. 
- * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TNode Type of RDF nodes. - * @tparam TTriple Type of triple statements. - * @return Pekko Streams flow + * Decode the incoming [[RdfStreamFrame]]s as a Jelly stream of physical type GRAPHS. + * If the stream is not a GRAPHS stream, the decoding will fail. + * + * @return intermediate builder object for further configuration */ - def graphsToGrouped[TNode, TTriple](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): - Flow[RdfStreamFrame, IterableOnce[(TNode, Iterable[TTriple])], NotUsed] = - groupedStream(factory.graphsDecoder) + def decodeGraphs: DecoderIngestFlowOps.GraphsIngestFlowOps.type = DecoderIngestFlowOps.GraphsIngestFlowOps /** - * A flow converting any Jelly stream of [[RdfStreamFrame]]s into a flat stream of RDF statements (triples or quads). - * The type of RDF statements is determined by the stream type. + * Decode the incoming [[RdfStreamFrame]]s as a Jelly stream of any physical type. + * The type of RDF statements is determined by the stream type specified in the stream options header. * The stream must have a set stream type (UNSPECIFIED is not allowed) and the stream type must not change * during the stream. - * RDF stream type: TRIPLES, QUADS, GRAPHS. * - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TNode Type of RDF nodes. - * @tparam TTriple Type of triple statements. - * @tparam TQuad Type of quad statements. 
- * @return Pekko Streams flow + * @return intermediate builder object for further configuration */ - def anyToFlat[TNode, TTriple, TQuad](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]): - Flow[RdfStreamFrame, TTriple | TQuad, NotUsed] = - flatStream(factory.anyStatementDecoder) - + def decodeAny: DecoderIngestFlowOps.AnyIngestFlowOps.type = DecoderIngestFlowOps.AnyIngestFlowOps /** - * A flow converting any Jelly stream of [[RdfStreamFrame]]s into a stream of iterables with RDF statements - * (triples or quads). Each iterable in the stream corresponds to one [[RdfStreamFrame]]. - * The type of RDF statements is determined by the stream type. - * The stream must have a set stream type (UNSPECIFIED is not allowed) and the stream type must not change - * during the stream. - * RDF stream type: TRIPLES, QUADS, GRAPHS. - * - * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). - * @tparam TNode Type of RDF nodes. - * @tparam TTriple Type of triple statements. - * @tparam TQuad Type of quad statements. - * @return Pekko Streams flow + * Snoop the incoming stream for stream options and extract them to the materialized value. + * + * @return the materialized value is a future that will return the stream options when first encountered, or + * when the stream completes. 
*/ - def anyToGrouped[TNode, TTriple, TQuad](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]): - Flow[RdfStreamFrame, IterableOnce[TTriple | TQuad], NotUsed] = - groupedStream(factory.anyStatementDecoder) + def snoopStreamOptions: Flow[RdfStreamFrame, RdfStreamFrame, Future[Option[RdfStreamOptions]]] = + Flow[RdfStreamFrame].alsoToMat( + Flow[RdfStreamFrame] + .mapConcat(frame => { + frame.rows.filter(_.row.isOptions).map(_.row.options.get) + }) + .toMat(Sink.headOption)(Keep.right) + )(Keep.right) + + // *** Private API *** private def flatStream[TOut](decoder: ProtoDecoder[TOut]): Flow[RdfStreamFrame, TOut, NotUsed] = Flow[RdfStreamFrame] @@ -153,4 +72,219 @@ object DecoderFlow: Flow[RdfStreamFrame] .map(frame => { frame.rows.flatMap(decoder.ingestRow) - }) \ No newline at end of file + }) + + private sealed trait DecoderIngestFlowOps: + protected final inline def s(strict: Boolean, logicalType: LogicalStreamType): Option[LogicalStreamType] = + if strict then Some(logicalType) else None + + /** + * Flow operations for decoding Jelly streams of physical type TRIPLES. + */ + private object DecoderIngestFlowOps: + case object TriplesIngestFlowOps extends + DecoderIngestFlowOps, + InterpretableAs.FlatTripleStream, + InterpretableAs.GraphStream: + + override def asFlatTripleStream[TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + Flow[RdfStreamFrame, TTriple, NotUsed] = + flatStream(factory.triplesDecoder(s(strict, LogicalStreamType.FLAT_TRIPLES))) + + override def asGraphStream[TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + Flow[RdfStreamFrame, IterableOnce[TTriple], NotUsed] = + groupedStream(factory.triplesDecoder(s(strict, LogicalStreamType.GRAPHS))) + + end TriplesIngestFlowOps + + /** + * Flow operations for decoding Jelly streams of physical type QUADS. 
+ */ + case object QuadsIngestFlowOps extends + DecoderIngestFlowOps, + InterpretableAs.FlatQuadStream, + InterpretableAs.DatasetStreamOfQuads: + + override def asFlatQuadStream[TQuad](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, TQuad, NotUsed] = + flatStream(factory.quadsDecoder(s(strict, LogicalStreamType.FLAT_QUADS))) + + override def asDatasetStreamOfQuads[TQuad](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, IterableOnce[TQuad], NotUsed] = + groupedStream(factory.quadsDecoder(s(strict, LogicalStreamType.DATASETS))) + + end QuadsIngestFlowOps + + /** + * Flow operations for decoding Jelly streams of physical type GRAPHS. + */ + case object GraphsIngestFlowOps extends + DecoderIngestFlowOps, + InterpretableAs.FlatQuadStream, + InterpretableAs.DatasetStreamOfQuads, + InterpretableAs.DatasetStream: + + override def asFlatQuadStream[TQuad](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, TQuad, NotUsed] = + flatStream(factory.graphsAsQuadsDecoder(s(strict, LogicalStreamType.FLAT_QUADS))) + + override def asDatasetStreamOfQuads[TQuad](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, IterableOnce[TQuad], NotUsed] = + groupedStream(factory.graphsAsQuadsDecoder(s(strict, LogicalStreamType.DATASETS))) + + override def asDatasetStream[TNode, TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + Flow[RdfStreamFrame, IterableOnce[(TNode, Iterable[TTriple])], NotUsed] = + groupedStream(factory.graphsDecoder(s(strict, LogicalStreamType.DATASETS))) + + override def asNamedGraphStream[TNode, TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + Flow[RdfStreamFrame, (TNode, Iterable[TTriple]), NotUsed] = + flatStream(factory.graphsDecoder(s(strict, 
LogicalStreamType.NAMED_GRAPHS))) + + end GraphsIngestFlowOps + + /** + * Flow operations for decoding Jelly streams of any physical type. + */ + case object AnyIngestFlowOps extends + DecoderIngestFlowOps, + InterpretableAs.AnyStream: + + override def asGroupedStream[TNode, TTriple, TQuad] + (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]): + Flow[RdfStreamFrame, IterableOnce[TTriple | TQuad], NotUsed] = + groupedStream(factory.anyStatementDecoder) + + override def asFlatStream[TTriple, TQuad] + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]): + Flow[RdfStreamFrame, TTriple | TQuad, NotUsed] = + flatStream(factory.anyStatementDecoder) + + + private object InterpretableAs: + trait FlatTripleStream: + /** + * Interpret the incoming stream as a flat RDF triple stream from RDF-STaX. + * + * @param strict If true, the incoming stream must have its logical type set to FLAT_TRIPLES or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TTriple Type of triple statements. + * @return Pekko Streams flow + */ + def asFlatTripleStream[TTriple](strict: Boolean = false)(using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + Flow[RdfStreamFrame, TTriple, NotUsed] + + trait FlatQuadStream: + /** + * Interpret the incoming stream as a flat RDF quad stream from RDF-STaX. + * + * @param strict If true, the incoming stream must have its logical type set to FLAT_QUADS or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TQuad Type of quad statements. + * @return Pekko Streams flow + */ + def asFlatQuadStream[TQuad](strict: Boolean = false)(using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, TQuad, NotUsed] + + trait GraphStream: + /** + * Interpret the incoming stream as an RDF graph stream from RDF-STaX. 
+ * Each iterable (graph) in the output stream corresponds to one incoming [[RdfStreamFrame]]. + * + * @param strict If true, the incoming stream must have its logical type set to GRAPHS or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TTriple Type of triple statements. + * @return Pekko Streams flow + */ + def asGraphStream[TTriple](strict: Boolean = false)(using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + Flow[RdfStreamFrame, IterableOnce[TTriple], NotUsed] + + trait DatasetStreamOfQuads: + /** + * Interpret the incoming stream as an RDF dataset stream from RDF-STaX. + * Each iterable (dataset) in the output stream corresponds to one incoming [[RdfStreamFrame]]. + * The dataset is represented as a sequence of quads. + * + * @param strict If true, the incoming stream must have its logical type set to DATASETS or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TQuad Type of quad statements. + * @return Pekko Streams flow + */ + def asDatasetStreamOfQuads[TQuad](strict: Boolean = false)(using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[RdfStreamFrame, IterableOnce[TQuad], NotUsed] + + trait DatasetStream: + /** + * Interpret the incoming stream as an RDF dataset stream from RDF-STaX. + * Each iterable (dataset) in the output stream corresponds to one incoming [[RdfStreamFrame]]. + * The dataset is represented as a sequence of triples grouped by the graph node. + * + * @param strict If true, the incoming stream must have its logical type set to DATASETS or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TNode Type of graph node. + * @tparam TTriple Type of triple statements. 
+ * @return Pekko Streams flow + */ + def asDatasetStream[TNode, TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + Flow[RdfStreamFrame, IterableOnce[(TNode, Iterable[TTriple])], NotUsed] + + /** + * Interpret the incoming stream as an RDF dataset stream from RDF-STaX and then flatten it. + * The borders between stream frames are ignored and the triples are grouped by the graph node. + * The dataset is represented as a sequence of triples grouped by the graph node. + * + * @param strict If true, the incoming stream must have its logical type set to NAMED_GRAPHS or its subtype, + * otherwise the decoding will fail. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TNode Type of graph node. + * @tparam TTriple Type of triple statements. + * @return Pekko Streams flow + */ + def asNamedGraphStream[TNode, TTriple](strict: Boolean = false) + (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + Flow[RdfStreamFrame, (TNode, Iterable[TTriple]), NotUsed] + + trait AnyStream: + /** + * Interpret the incoming stream as any grouped RDF stream from RDF-STaX. + * The type of RDF statements is determined by the stream type specified in the stream options header. + * The stream must have a set stream type (UNSPECIFIED is not allowed) and the stream type must not change + * during the stream. + * + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TNode Type of graph node. + * @tparam TTriple Type of triple statements. + * @tparam TQuad Type of quad statements. + * @return Pekko Streams flow + */ + def asGroupedStream[TNode, TTriple, TQuad] + (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]): + Flow[RdfStreamFrame, IterableOnce[TTriple | TQuad], NotUsed] + + /** + * Interpret the incoming stream as any flat RDF stream from RDF-STaX.
+ * The type of RDF statements is determined by the stream type specified in the stream options header. + * The stream must have a set stream type (UNSPECIFIED is not allowed) and the stream type must not change + * during the stream. + * + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TTriple Type of triple statements. + * @tparam TQuad Type of quad statements. + * @return Pekko Streams flow + */ + def asFlatStream[TTriple, TQuad] + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]): + Flow[RdfStreamFrame, TTriple | TQuad, NotUsed] diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala index 68b116ed..764635bc 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala @@ -7,55 +7,58 @@ import org.apache.pekko.stream.scaladsl.{Flow, Source} /** * Factory of encoder flows for Jelly streams. - * When using these methods, you don't have to set the streamType property of [[RdfStreamOptions]]. - * It will be set automatically. These methods will also ensure that the produced stream is more-or-less valid - * (that it adheres to the appropriate stream type). + * When using these methods, you don't have to set the physicalType and logicalType properties of [[RdfStreamOptions]]. + * They will be set automatically. You can set the logical stream type manually, though. + * + * These methods will also ensure that the produced stream is more-or-less valid + * (that it adheres to the appropriate physical and logical stream type). */ object EncoderFlow: /** * A flow converting a flat stream of triple statements into a stream of [[RdfStreamFrame]]s. - * RDF stream type: TRIPLES. + * Physical stream type: TRIPLES. + * Logical stream type (RDF-STaX): flat RDF triple stream (FLAT_TRIPLES). 
* * This flow will wait for enough items to fill the whole gRPC message, which increases latency. To mitigate that, - * use the [[fromGroupedTriples]] method instead. + * use the [[flatTripleStreamGrouped]] method instead. + * * @param limiter frame size limiter (see [[SizeLimiter]]). * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromFlatTriples[TTriple](limiter: SizeLimiter, opt: RdfStreamOptions) + final def flatTripleStream[TTriple](limiter: SizeLimiter, opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[TTriple, RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.TRIPLES) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.FLAT_TRIPLES)) flatFlow(e => encoder.addTripleStatement(e), limiter) /** * A flow converting a flat stream of quad statements into a stream of [[RdfStreamFrame]]s. - * RDF stream type: QUADS. + * Physical stream type: QUADS. + * Logical stream type (RDF-STaX): flat RDF quad stream (FLAT_QUADS). * * This flow will wait for enough items to fill the whole gRPC message, which increases latency. To mitigate that, - * use the [[fromGroupedQuads]] method instead. + * use the [[flatQuadStreamGrouped]] method instead. + * * @param limiter frame size limiter (see [[SizeLimiter]]) * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. 
*/ - final def fromFlatQuads[TQuad](limiter: SizeLimiter, opt: RdfStreamOptions) + final def flatQuadStream[TQuad](limiter: SizeLimiter, opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[TQuad, RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.QUADS) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.FLAT_QUADS)) flatFlow(e => encoder.addQuadStatement(e), limiter) /** * A flow converting a stream of iterables with triple statements into a stream of [[RdfStreamFrame]]s. - * RDF stream type: TRIPLES. + * Physical stream type: TRIPLES. + * Logical stream type (RDF-STaX): flat RDF triple stream (FLAT_TRIPLES). * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. @@ -67,17 +70,37 @@ object EncoderFlow: * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromGroupedTriples[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + final def flatTripleStreamGrouped[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.TRIPLES) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.FLAT_TRIPLES)) + groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter) + + /** + * A flow converting a stream of graphs (iterables with triple statements) into a stream of [[RdfStreamFrame]]s. + * Physical stream type: TRIPLES. + * Logical stream type (RDF-STaX): RDF graph stream (GRAPHS). 
+ * + * Each graph (iterable of triples) in the input stream is guaranteed to correspond to exactly one + * [[RdfStreamFrame]] in the output stream IF no frame size limiter is applied. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by graphs). + * @param opt Jelly serialization options. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TTriple Type of triple statements. + * @return Pekko Streams flow. + */ + final def graphStream[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] = + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.GRAPHS)) groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter) /** * A flow converting a stream of iterables with quad statements into a stream of [[RdfStreamFrame]]s. - * RDF stream type: QUADS. + * Physical stream type: QUADS. + * Logical stream type (RDF-STaX): flat RDF quad stream (FLAT_QUADS). * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. @@ -89,22 +112,42 @@ object EncoderFlow: * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. 
*/ - final def fromGroupedQuads[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + final def flatQuadStreamGrouped[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.QUADS) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.FLAT_QUADS)) + groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter) + + /** + * A flow converting a stream of datasets (iterables with quad statements) into a stream of [[RdfStreamFrame]]s. + * Physical stream type: QUADS. + * Logical stream type (RDF-STaX): RDF dataset stream (DATASETS). + * + * Each dataset (iterable of quads) in the input stream is guaranteed to correspond to exactly one + * [[RdfStreamFrame]] in the output stream IF no frame size limiter is applied. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by datasets). + * @param opt Jelly serialization options. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TQuad Type of quad statements. + * @return Pekko Streams flow. + */ + final def datasetStreamFromQuads[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] = + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.DATASETS)) groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter) /** * A flow converting a stream of named or unnamed graphs (node as graph name + iterable of triple statements) * into a stream of [[RdfStreamFrame]]s. Each element in the output stream may contain one graph or a part of * a graph (if the frame size limiter is used). 
Two different graphs will never occur in the same frame. - * RDF stream type: GRAPHS. + * Physical stream type: GRAPHS. + * Logical stream type (RDF-STaX): RDF named graph stream (NAMED_GRAPHS). * - * After this flow finishes processing a single graph in the input stream, it is guaranteed to output an - * [[RdfStreamFrame]], which allows to maintain low latency. + * Each graph in the input stream is guaranteed to correspond to exactly one [[RdfStreamFrame]] in the output + * stream IF no frame size limiter is applied. * * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). * If None, no size limit is applied (frames are only split by graphs). @@ -114,24 +157,25 @@ object EncoderFlow: * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromGraphs[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + final def namedGraphStream[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Flow[(TNode, Iterable[TTriple]), RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.GRAPHS) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.GRAPHS, LogicalStreamType.NAMED_GRAPHS)) Flow[(TNode, Iterable[TTriple])] // Make each graph into a 1-element "group" .map(Seq(_)) .via(groupedFlow[(TNode, Iterable[TTriple])](graphAsIterable(encoder), maybeLimiter)) /** - * A flow converting a stream of iterables with named or unnamed graphs (node as graph name + iterable of triple - * statements) into a stream of [[RdfStreamFrame]]s. Each element in the output stream may contain multiple graphs, - * a single graph, or a part of a graph (if the frame size limiter is used). + * A flow converting a stream of datasets (iterables with named or unnamed graphs: node as graph name + + * iterable of triple statements) into a stream of [[RdfStreamFrame]]s. 
Each element in the output stream may + * contain multiple graphs, a single graph, or a part of a graph (if the frame size limiter is used). + * Physical stream type: GRAPHS. + * Logical stream type (RDF-STaX): RDF dataset stream (DATASETS). + * + * Each dataset in the input stream is guaranteed to correspond to exactly one [[RdfStreamFrame]] in the output + * stream IF no frame size limiter is applied. * - * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an - * [[RdfStreamFrame]], which allows to maintain low latency. * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). * If None, no size limit is applied (frames are only split by groups). * @param opt Jelly serialization options. @@ -140,14 +184,24 @@ object EncoderFlow: * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromGroupedGraphs[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + final def datasetStream[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Flow[IterableOnce[(TNode, Iterable[TTriple])], RdfStreamFrame, NotUsed] = - val encoder = factory.encoder( - opt.withStreamType(RdfStreamType.GRAPHS) - ) + val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.GRAPHS, LogicalStreamType.DATASETS)) groupedFlow[(TNode, Iterable[TTriple])](graphAsIterable(encoder), maybeLimiter) + + // PRIVATE API + + /** + * Make Jelly options while preserving the user-set logical stream type. 
+ */ + private def makeOptions(opt: RdfStreamOptions, pst: PhysicalStreamType, lst: LogicalStreamType): RdfStreamOptions = + opt.copy( + physicalType = pst, + logicalType = if opt.logicalType.isUnspecified then lst else opt.logicalType + ) + private def graphAsIterable[TEncoder <: ProtoEncoder[TNode, TTriple, ?, ?], TNode, TTriple](encoder: TEncoder): ((TNode, Iterable[TTriple])) => Iterable[RdfStreamRow] = (graphName: TNode, triples: Iterable[TTriple]) => diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala index 5dad63f0..e3bae462 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala @@ -10,7 +10,8 @@ object EncoderSource: /** * A source of RDF stream frames from an RDF graph implementation. - * RDF stream type: TRIPLES. + * Physical stream type: TRIPLES. + * Logical stream type (RDF-STaX): flat RDF triple stream (FLAT_TRIPLES). * * @param graph the RDF graph to be streamed * @param limiter frame size limiter (see [[SizeLimiter]]) @@ -25,11 +26,12 @@ object EncoderSource: (implicit adapter: IterableAdapter[?, TTriple, ?, TGraph, ?], factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asTriples(graph)) - .via(fromFlatTriples(limiter, opt)) + .via(flatTripleStream(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (quads format). - * RDF stream type: QUADS. + * Physical stream type: QUADS. + * Logical stream type (RDF-STaX): flat RDF quad stream (FLAT_QUADS). 
* * @param dataset the RDF dataset to be streamed * @param limiter frame size limiter (see [[SizeLimiter]]) @@ -44,11 +46,12 @@ object EncoderSource: (implicit adapter: IterableAdapter[?, ?, TQuad, ?, TDataset], factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asQuads(dataset)) - .via(fromFlatQuads(limiter, opt)) + .via(flatQuadStream(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (graphs format). * RDF stream type: GRAPHS. + * Logical stream type (RDF-STaX): RDF named graph stream (NAMED_GRAPHS). * * @param dataset the RDF dataset to be streamed * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). @@ -67,4 +70,4 @@ object EncoderSource: factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asGraphs(dataset)) - .via(fromGraphs(maybeLimiter, opt)) + .via(namedGraphStream(maybeLimiter, opt)) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafe.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafe.scala index aa2303a6..13f26942 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafe.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafe.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.stream import com.typesafe.config.{Config, ConfigFactory} -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, RdfStreamType} +import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamOptions, PhysicalStreamType} /** * Convenience methods for building Jelly's options ([[RdfStreamOptions]]) from [[com.typesafe.Config]]. 
@@ -35,12 +35,12 @@ object JellyOptionsFromTypesafe: def fromTypesafeConfig(config: Config): RdfStreamOptions = val merged = config.withFallback(defaultConfig) RdfStreamOptions( - streamType = ( + physicalType = ( merged.getString("stream-type") match - case "UNSPECIFIED" => RdfStreamType.UNSPECIFIED - case "TRIPLES" => RdfStreamType.TRIPLES - case "QUADS" => RdfStreamType.QUADS - case "GRAPHS" => RdfStreamType.GRAPHS + case "UNSPECIFIED" => PhysicalStreamType.UNSPECIFIED + case "TRIPLES" => PhysicalStreamType.TRIPLES + case "QUADS" => PhysicalStreamType.QUADS + case "GRAPHS" => PhysicalStreamType.GRAPHS case _ => throw IllegalArgumentException() ), generalizedStatements = merged.getBoolean("generalized-statements"), diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala index 3ef9491e..3262d187 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala @@ -1,5 +1,6 @@ package eu.ostrzyciel.jelly.stream +import eu.ostrzyciel.jelly.core.* import eu.ostrzyciel.jelly.core.helpers.Assertions.* import eu.ostrzyciel.jelly.core.helpers.MockConverterFactory import eu.ostrzyciel.jelly.core.proto.v1.* @@ -16,30 +17,33 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: implicit val converterFactory: MockConverterFactory.type = MockConverterFactory implicit val actorSystem: ActorSystem = ActorSystem() - "triplesToFlat" should { + "decodeTriples.asFlatTripleStream" should { for n <- Seq(1, 2, 100) do s"decode triples, frame size: $n" in { val encoded = Triples1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES), n, ) val decoded: Seq[Triple] = Source(encoded) - .via(DecoderFlow.triplesToFlat) + 
.via(DecoderFlow.decodeTriples.asFlatTripleStream(true)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertDecoded(decoded, Triples1.mrl) } - "decode triples (norepeat)" in { + "decode triples (norepeat), with options snooping" in { val encoded = Triples2NoRepeat.encodedFull( JellyOptions.smallGeneralized - .withStreamType(RdfStreamType.TRIPLES) + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) .withUseRepeat(false), 100, ) val decoded: Seq[Triple] = Source(encoded) - .via(DecoderFlow.triplesToFlat) + .via(DecoderFlow.decodeTriples.asFlatTripleStream(true)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -47,15 +51,45 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "triplesToGrouped" should { + "snoopStreamOptions with decodeTriples.asFlatTripleStream" should { + "decode triples (norepeat), with options snooping" in { + val encoded = Triples2NoRepeat.encodedFull( + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + .withUseRepeat(false), + 100, + ) + val (optionsF, decodedF) = Source(encoded) + .viaMat(DecoderFlow.snoopStreamOptions)(Keep.right) + .via(DecoderFlow.decodeTriples.asFlatTripleStream(true)) + .toMat(Sink.seq)(Keep.both) + .run() + + assertDecoded(decodedF.futureValue, Triples2NoRepeat.mrl) + val options = optionsF.futureValue + options.isDefined should be (true) + options.get.useRepeat should be (false) + options.get.logicalType should be (LogicalStreamType.FLAT_TRIPLES) + options.get.physicalType should be (PhysicalStreamType.TRIPLES) + + // Basic tests on logical stream type extensions + options.get.logicalType.getRdfStaxType.isDefined should be (true) + options.get.logicalType.getRdfStaxAnnotation(null)(using converterFactory).size should be (3) + } + } + + "decodeTriples.asGraphStream" should { for n <- Seq(1, 2, 100) do - s"decode triples as groups, frame size: $n" in { + s"decode 
triples as graphs, frame size: $n" in { val encoded = Triples1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.GRAPHS), n, ) val decoded: Seq[Seq[Triple]] = Source(encoded) - .via(DecoderFlow.triplesToGrouped) + .via(DecoderFlow.decodeTriples.asGraphStream(true)) .map(_.iterator.toSeq) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -65,15 +99,17 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "quadsToFlat" should { + "decodeQuads.asFlatQuadStream" should { for n <- Seq(1, 2, 100) do s"decode quads, frame size: $n" in { val encoded = Quads1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS), n, ) val decoded: Seq[Quad] = Source(encoded) - .via(DecoderFlow.quadsToFlat) + .via(DecoderFlow.decodeQuads.asFlatQuadStream(true)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -81,15 +117,17 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "quadsToGrouped" should { + "decodeQuads.asDatasetStreamOfQuads" should { for n <- Seq(1, 2, 100) do s"decode quads as groups, frame size: $n" in { val encoded = Quads1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.DATASETS), n, ) val decoded: Seq[Seq[Quad]] = Source(encoded) - .via(DecoderFlow.quadsToGrouped) + .via(DecoderFlow.decodeQuads.asDatasetStreamOfQuads(true)) .map(_.iterator.toSeq) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -99,15 +137,17 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "graphsAsQuadsToFlat" should { + "decodeGraphs.asFlatQuadStream" should { for n <- Seq(1, 2, 
100) do s"decode graphs as quads, frame size: $n" in { val encoded = Graphs1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.FLAT_QUADS), n, ) val decoded: Seq[Quad] = Source(encoded) - .via(DecoderFlow.graphsAsQuadsToFlat) + .via(DecoderFlow.decodeGraphs.asFlatQuadStream(true)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -115,15 +155,17 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "graphsAsQuadsToGrouped" should { + "decodeGraphs.asDatasetStreamOfQuads" should { for n <- Seq(1, 2, 100) do - s"decode graphs as quads (grouped), frame size: $n" in { + s"decode graphs as datasets, frame size: $n" in { val encoded = Graphs1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.DATASETS), n, ) val decoded: Seq[Seq[Quad]] = Source(encoded) - .via(DecoderFlow.graphsAsQuadsToGrouped) + .via(DecoderFlow.decodeGraphs.asDatasetStreamOfQuads(true)) .map(_.iterator.toSeq) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -133,15 +175,17 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "graphsToFlat" should { + "decodeGraphs.asNamedGraphStream" should { for n <- Seq(1, 2, 100) do - s"decode graphs, frame size: $n" in { + s"decode named graphs, frame size: $n" in { val encoded = Graphs1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.NAMED_GRAPHS), n, ) val decoded: Seq[(Node, Iterable[Triple])] = Source(encoded) - .via(DecoderFlow.graphsToFlat) + .via(DecoderFlow.decodeGraphs.asNamedGraphStream(true)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -152,15 +196,17 @@ class 
DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "graphsToGrouped" should { + "decodeGraphs.asDatasetStream" should { for n <- Seq(1, 2, 100) do - s"decode graphs as groups, frame size: $n" in { + s"decode graphs as datasets, frame size: $n" in { val encoded = Graphs1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.DATASETS), n, ) val decoded: Seq[Seq[(Node, Iterable[Triple])]] = Source(encoded) - .via(DecoderFlow.graphsToGrouped) + .via(DecoderFlow.decodeGraphs.asDatasetStream(true)) .map(_.iterator.toSeq) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -171,21 +217,21 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } val anyCases = Seq( - (Triples1, Triples1.mrl, RdfStreamType.TRIPLES, "triples"), - (Quads1, Quads1.mrl, RdfStreamType.QUADS, "quads"), - (Graphs1, Graphs1.mrlQuads, RdfStreamType.GRAPHS, "graphs"), + (Triples1, Triples1.mrl, PhysicalStreamType.TRIPLES, "triples"), + (Quads1, Quads1.mrl, PhysicalStreamType.QUADS, "quads"), + (Graphs1, Graphs1.mrlQuads, PhysicalStreamType.GRAPHS, "graphs"), ) - "anyToFlat" should { + "decodeAny.asFlatStream" should { for (testCase, mrl, streamType, name) <- anyCases do for n <- Seq(1, 2, 100) do s"decode $name stream to flat, frame size: $n" in { val encoded = testCase.encodedFull( - JellyOptions.smallGeneralized.withStreamType(streamType), + JellyOptions.smallGeneralized.withPhysicalType(streamType), n, ) val decoded = Source(encoded) - .via(DecoderFlow.anyToFlat) + .via(DecoderFlow.decodeAny.asFlatStream) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -193,16 +239,16 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "anyToGrouped" should { + "decodeAny.asGroupedStream" should { for (testCase, mrl, streamType, name) <- anyCases do for n <- Seq(1, 2, 100) do s"decode $name stream to grouped, frame 
size: $n" in { val encoded = testCase.encodedFull( - JellyOptions.smallGeneralized.withStreamType(streamType), + JellyOptions.smallGeneralized.withPhysicalType(streamType), n, ) val decoded = Source(encoded) - .via(DecoderFlow.anyToGrouped) + .via(DecoderFlow.decodeAny.asGroupedStream) .map(_.iterator.toSeq) .toMat(Sink.seq)(Keep.right) .run().futureValue diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala index aab7ac92..dd9e654e 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala @@ -19,42 +19,51 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: implicit val converterFactory: MockConverterFactory.type = MockConverterFactory implicit val actorSystem: ActorSystem = ActorSystem() - "fromFlatTriples" should { + "flatTripleStream" should { "encode triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (1) } "encode triples with max message size" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(ByteSizeLimiter(80), JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatTripleStream(ByteSizeLimiter(80), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - 
Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (3) } "encode triples with max row count" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(4), JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(4), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (4) } @@ -62,29 +71,35 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode triples (norepeat)" in { val jOptions = JellyOptions.smallGeneralized.withUseRepeat(false) val encoded: Seq[RdfStreamFrame] = Source(Triples2NoRepeat.mrl) - .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), jOptions)) + .via(EncoderFlow.flatTripleStream(StreamRowCountLimiter(1000), jOptions)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Triples2NoRepeat.encoded(jOptions.withStreamType(RdfStreamType.TRIPLES)) + Triples2NoRepeat.encoded(jOptions + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (1) } } - "fromGroupedTriples" should { + "flatTripleStreamGrouped" should { "encode grouped triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedTriples(None, JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatTripleStreamGrouped(None, JellyOptions.smallGeneralized)) 
.toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (2) encoded.head.rows.count(_.row.isTriple) should be (2) @@ -94,13 +109,16 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode grouped triples with max row count" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedTriples(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatTripleStreamGrouped(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.FLAT_TRIPLES) + ) ) encoded.size should be (4) encoded.head.rows.count(_.row.isTriple) should be (0) @@ -110,32 +128,59 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "fromFlatQuads" should { + "graphStream" should { + "encode graphs" in { + val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) + .grouped(2) + .via(EncoderFlow.graphStream(None, JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Triples1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.TRIPLES) + .withLogicalType(LogicalStreamType.GRAPHS) + ) + ) + encoded.size should be(2) + encoded.head.rows.count(_.row.isTriple) should be(2) + encoded(1).rows.count(_.row.isTriple) should be(2) + } + } + + "flatQuadStream" should { "encode quads" in { val encoded: 
Seq[RdfStreamFrame] = Source(Quads1.mrl) - .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatQuadStream(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Quads1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS)) + Quads1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS) + ) ) encoded.size should be (1) } } - "fromGroupedQuads" should { + "flatQuadStreamGrouped" should { "encode grouped quads" in { val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedQuads(None, JellyOptions.smallGeneralized)) + .via(EncoderFlow.flatQuadStreamGrouped(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Quads1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS)) + Quads1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.FLAT_QUADS) + ) ) encoded.size should be (2) encoded.head.rows.count(_.row.isQuad) should be (2) @@ -143,60 +188,93 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } - "fromGraphs" should { - "encode graphs" in { + "datasetStreamFromQuads" should { + "encode datasets" in { + val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) + .grouped(2) + .via(EncoderFlow.datasetStreamFromQuads(None, JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Quads1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.QUADS) + .withLogicalType(LogicalStreamType.DATASETS) + ) + ) + encoded.size should be(2) + encoded.head.rows.count(_.row.isQuad) should be(2) + 
encoded(1).rows.count(_.row.isQuad) should be(2) + } + } + + "namedGraphStream" should { + "encode named graphs" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) - .via(EncoderFlow.fromGraphs(None, JellyOptions.smallGeneralized)) + .via(EncoderFlow.namedGraphStream(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Graphs1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS)) + Graphs1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.NAMED_GRAPHS) + ) ) encoded.size should be (2) } - "encode graphs with max row count" in { + "encode named graphs with max row count" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) - .via(EncoderFlow.fromGraphs(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) + .via(EncoderFlow.namedGraphStream(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Graphs1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS)) + Graphs1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.NAMED_GRAPHS) + ) ) // 1 additional split due to split by graph encoded.size should be (5) } } - "fromGroupedGraphs" should { - "encode grouped graphs" in { + "datasetStream" should { + "encode datasets" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedGraphs(None, JellyOptions.smallGeneralized)) + .via(EncoderFlow.datasetStream(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Graphs1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS)) + Graphs1.encoded(JellyOptions.smallGeneralized + 
.withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.DATASETS) + ) ) encoded.size should be (1) } - "encode grouped graphs with max row count" in { + "encode datasets with max row count" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedGraphs(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) + .via(EncoderFlow.datasetStream(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue assertEncoded( encoded.flatMap(_.rows), - Graphs1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS)) + Graphs1.encoded(JellyOptions.smallGeneralized + .withPhysicalType(PhysicalStreamType.GRAPHS) + .withLogicalType(LogicalStreamType.DATASETS) + ) ) encoded.size should be (4) } diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala index a622f451..1472be0f 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala @@ -1,6 +1,6 @@ package eu.ostrzyciel.jelly.stream -import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamType} +import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, PhysicalStreamType} import eu.ostrzyciel.jelly.core.{JellyOptions, ProtoTestCases} import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.* @@ -16,23 +16,23 @@ class JellyIoSpec extends AnyWordSpec, Matchers, ScalaFutures: val cases = Seq( ("triples, frame size 1", Triples1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), 1, )), ("triples, frame size 20", Triples1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + 
JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), 20, )), ("triples (norepeat), frame size 5", Triples2NoRepeat.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.TRIPLES), 5 )), ("quads, frame size 6", Quads1.encodedFull( - JellyOptions.smallGeneralized.withStreamType(RdfStreamType.QUADS), + JellyOptions.smallGeneralized.withPhysicalType(PhysicalStreamType.QUADS), 6, )), ("graphs, frame size 3", Graphs1.encodedFull( - JellyOptions.bigGeneralized.withStreamType(RdfStreamType.GRAPHS), + JellyOptions.bigGeneralized.withPhysicalType(PhysicalStreamType.GRAPHS), 3, )) ) diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafeSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafeSpec.scala index 688a179b..0472d79f 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafeSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyOptionsFromTypesafeSpec.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.stream import com.typesafe.config.ConfigFactory -import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamType +import eu.ostrzyciel.jelly.core.proto.v1.PhysicalStreamType import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -9,7 +9,7 @@ class JellyOptionsFromTypesafeSpec extends AnyWordSpec, Matchers: "JellyOptionsFromTypesafe" should { "produce defaults for empty input" in { val opt = JellyOptionsFromTypesafe.fromTypesafeConfig(ConfigFactory.empty()) - opt.streamType should be (RdfStreamType.UNSPECIFIED) + opt.physicalType should be (PhysicalStreamType.UNSPECIFIED) opt.generalizedStatements should be (false) opt.useRepeat should be (true) opt.maxNameTableSize should be (128) @@ -28,7 +28,7 @@ class JellyOptionsFromTypesafeSpec extends AnyWordSpec, Matchers: |jelly.dt-table-size = 8 |""".stripMargin) val opt = 
JellyOptionsFromTypesafe.fromTypesafeConfig(conf.getConfig("jelly")) - opt.streamType should be (RdfStreamType.GRAPHS) + opt.physicalType should be (PhysicalStreamType.GRAPHS) opt.generalizedStatements should be (true) opt.useRepeat should be (false) opt.rdfStar should be (true) @@ -44,7 +44,7 @@ class JellyOptionsFromTypesafeSpec extends AnyWordSpec, Matchers: |jelly.prefix-table-size = 64 |""".stripMargin) val opt = JellyOptionsFromTypesafe.fromTypesafeConfig(conf.getConfig("jelly")) - opt.streamType should be (RdfStreamType.QUADS) + opt.physicalType should be (PhysicalStreamType.QUADS) opt.generalizedStatements should be (false) opt.useRepeat should be (true) opt.rdfStar should be (false)