Skip to content

Commit

Permalink
Add DecoderFlow.graphsToGrouped (#48)
Browse files Browse the repository at this point in the history
This introduces the graphsToGrouped method, which works in the same way
as the other *ToGrouped methods in DecoderFlow.
  • Loading branch information
Ostrzyciel authored Oct 17, 2023
1 parent 8afe462 commit 980697d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
14 changes: 14 additions & 0 deletions stream/src/main/scala/eu/ostrzyciel/jelly/stream/DecoderFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ object DecoderFlow:
Flow[RdfStreamFrame, (TNode, Iterable[TTriple]), NotUsed] =
flatStream(factory.graphsDecoder)

/**
 * Grouped decoder for GRAPHS-type Jelly streams.
 *
 * Turns a stream of [[RdfStreamFrame]]s into a stream in which every element is a collection of
 * (graph name; triples) tuples. One element is emitted per incoming [[RdfStreamFrame]], mirroring
 * the other *ToGrouped flows in this object.
 *
 * The emitted elements are typed as [[IterableOnce]], so callers that need to traverse a group
 * more than once should materialize it first (e.g., with `.iterator.toSeq`).
 *
 * RDF stream type: GRAPHS.
 *
 * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) that supplies
 *                the graphs decoder.
 * @tparam TNode Type of RDF nodes (used for graph names).
 * @tparam TTriple Type of triple statements.
 * @return Pekko Streams flow
 */
def graphsToGrouped[TNode, TTriple](implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]):
Flow[RdfStreamFrame, IterableOnce[(TNode, Iterable[TTriple])], NotUsed] = {
  // Obtain the decoder from the factory, then hand it to the shared grouped-stream builder.
  val graphsDecoder = factory.graphsDecoder
  groupedStream(graphsDecoder)
}

/**
* A flow converting any Jelly stream of [[RdfStreamFrame]]s into a flat stream of RDF statements (triples or quads).
* The type of RDF statements is determined by the stream type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,24 @@ class DecoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures:
}
}

// graphsToGrouped: a GRAPHS stream is decoded into one group of (graph name; triples) per frame.
"graphsToGrouped" should {
  // Exercise several frame sizes so that the data is split into frames in different ways.
  Seq(1, 2, 100).foreach { n =>
    s"decode graphs as groups, frame size: $n" in {
      val options = JellyOptions.smallGeneralized.withStreamType(RdfStreamType.GRAPHS)
      val encoded = Graphs1.encodedFull(options, n)

      // Each emitted group is an IterableOnce, so it is materialized into a Seq before collecting.
      val decoded: Seq[Seq[(Node, Iterable[Triple])]] = Source(encoded)
        .via(DecoderFlow.graphsToGrouped)
        .map(_.iterator.toSeq)
        .toMat(Sink.seq)(Keep.right)
        .run()
        .futureValue

      // Only the triples (second tuple element) are compared against the reference data.
      val decodedTriples = decoded.flatten.flatMap(_._2)
      val expectedTriples = Graphs1.mrl.flatMap(_._2)
      assertDecoded(decodedTriples, expectedTriples)

      // Grouped decoding must yield exactly one output element per input frame.
      decoded.size should be (encoded.size)
    }
  }
}

val anyCases = Seq(
(Triples1, Triples1.mrl, RdfStreamType.TRIPLES, "triples"),
(Quads1, Quads1.mrl, RdfStreamType.QUADS, "quads"),
Expand Down

0 comments on commit 980697d

Please sign in to comment.