diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala index b4115c01..e7790e0f 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala @@ -93,6 +93,20 @@ object DecoderFlow: Flow[RdfStreamFrame, (TNode, Iterable[TTriple]), NotUsed] = flatStream(factory.graphsDecoder) + /** + * A flow converting a graphs stream of [[RdfStreamFrame]]s into a stream of iterables with tuples + * (graph name; triples). + * Each iterable in the stream corresponds to one [[RdfStreamFrame]]. + * RDF stream type: GRAPHS. + * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). + * @tparam TNode Type of RDF nodes. + * @tparam TTriple Type of triple statements. + * @return Pekko Streams flow + */ + def graphsToGrouped[TNode, TTriple](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + Flow[RdfStreamFrame, IterableOnce[(TNode, Iterable[TTriple])], NotUsed] = + groupedStream(factory.graphsDecoder) + /** * A flow converting any Jelly stream of [[RdfStreamFrame]]s into a flat stream of RDF statements (triples or quads). * The type of RDF statements is determined by the stream type. diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala index 32a715ed..3ef9491e 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/DecoderFlowSpec.scala @@ -152,6 +152,24 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: } } + "graphsToGrouped" should { + for n <- Seq(1, 2, 100) do + s"decode graphs as groups, frame size: $n" in { + val encoded = Graphs1.encodedFull( + JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS), + n, + ) + val decoded: Seq[Seq[(Node, Iterable[Triple])]] = Source(encoded) + .via(DecoderFlow.graphsToGrouped) + .map(_.iterator.toSeq) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertDecoded(decoded.flatten.flatMap(_._2), Graphs1.mrl.flatMap(_._2)) + decoded.size should be (encoded.size) + } + } + val anyCases = Seq( (Triples1, Triples1.mrl, RdfStreamType.TRIPLES, "triples"), (Quads1, Quads1.mrl, RdfStreamType.QUADS, "quads"),