diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala index df113005..c3ba0f28 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala @@ -56,10 +56,12 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: ("big default", JellyOptions.bigGeneralized), ) - private val streamingOptions: Seq[(String, EncoderFlow.Options)] = Seq( - ("message size: 32_000", EncoderFlow.Options(32_000)), - ("message size: 500", EncoderFlow.Options(500)), - ("message size: 2_000_000", EncoderFlow.Options(2_000_000)), + private val sizeLimiters: Seq[(String, SizeLimiter)] = Seq( + ("message byte size: 32_000", ByteSizeLimiter(32_000)), + ("message byte size: 500", ByteSizeLimiter(500)), + ("message byte size: 2_000_000", ByteSizeLimiter(2_000_000)), + ("stream row count: 5", StreamRowCountLimiter(5)), + ("stream row count: 200", StreamRowCountLimiter(200)), ) final case class CaseKey(streamType: String, encoder: String, jOpt: String, sOpt: String, caseName: String) @@ -93,8 +95,8 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: s"$encName encoder" when { for (decName, decFlow) <- implementations do for (jOptName, jOpt) <- jellyOptions do - for (sOptName, sOpt) <- streamingOptions do - s"streaming to a $decName decoder, $jOptName, $sOptName" should { + for (limiterName, limiter) <- sizeLimiters do + s"streaming to a $decName decoder, $jOptName, $limiterName" should { // Triples for (caseName, sourceFile) <- TripleTests.files do val sourceGraph = TripleTests.graphs(caseName) @@ -102,13 +104,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.tripleSource(is, sOpt, jOpt) + encFlow.tripleSource(is, limiter, jOpt) .wireTap(f => encSize += f.serializedSize) .toMat(decFlow.tripleSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("triples", encName, jOptName, sOptName, caseName) + val ck = CaseKey("triples", encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultGraph = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.TURTLE) @@ -129,13 +131,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.quadSource(is, sOpt, jOpt) + encFlow.quadSource(is, limiter, jOpt) .wireTap(f => encSize += f.serializedSize) .toMat(decFlow.quadSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("quads", encName, jOptName, sOptName, caseName) + val ck = CaseKey("quads", encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultDataset = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.NQ) @@ -147,13 +149,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.graphSource(is, sOpt, jOpt) + encFlow.graphSource(is, limiter, jOpt) .wireTap(f => encSize += f.serializedSize) .toMat(decFlow.graphSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("graphs", encName, jOptName, sOptName, caseName) + val ck = CaseKey("graphs", 
encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultDataset = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.NQ) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala index 11225fe3..8341ecd3 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow} +import eu.ostrzyciel.jelly.stream.* import org.apache.jena.graph.{Node, Triple} import org.apache.jena.riot.system.AsyncParser import org.apache.jena.riot.{Lang, RDFDataMgr, RDFParser} @@ -14,15 +14,15 @@ import scala.jdk.CollectionConverters.* case object JenaTestStream extends TestStream: import eu.ostrzyciel.jelly.convert.jena.* - override def tripleSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseTriples(is, Lang.NT, "").asScala) - .via(EncoderFlow.fromFlatTriples(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) - override def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseQuads(is, Lang.NQUADS, "").asScala) - .via(EncoderFlow.fromFlatQuads(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) - override def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val ds = RDFParser.source(is) .lang(Lang.NQ) .toDatasetGraph @@ -31,7 +31,7 @@ case object JenaTestStream extends TestStream: Iterator((null, Iterable.from(ds.getDefaultGraph.find.asScala))) ).filter((_, g) => g.nonEmpty) Source.fromIterator(() => graphs) - .via(EncoderFlow.fromGraphs(streamOpt, jellyOpt)) + .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = Flow[RdfStreamFrame] diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala index 408dfd3f..0c7c8c3c 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow} +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.Done import org.apache.pekko.stream.scaladsl.* import org.eclipse.rdf4j.rio.* @@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters.* case object Rdf4jTestStream extends TestStream: import eu.ostrzyciel.jelly.convert.rdf4j.* - override def tripleSource(is: InputStream, streamOpt: 
EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = // This buffers everything in memory... but I'm too lazy to implement my own RDFHandler for this // RDF4J at the moment only has two formats with RDF-star support – Turtle and Trig. val parser = Rio.createParser(RDFFormat.TURTLESTAR) @@ -22,17 +22,17 @@ case object Rdf4jTestStream extends TestStream: parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatTriples(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) - override def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) val collector = new StatementCollector() parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatQuads(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) - override def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) val collector = new StatementCollector() parser.setRDFHandler(collector) @@ -40,7 +40,7 @@ case object Rdf4jTestStream extends TestStream: val graphs = collector.getStatements.asScala.toSeq .groupBy(_.getContext) Source.fromIterator(() => graphs.iterator) - .via(EncoderFlow.fromGraphs(streamOpt, jellyOpt)) + .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = val writer = Rio.createWriter(RDFFormat.TURTLESTAR, os) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala index b996d0c6..34a33bf2 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import eu.ostrzyciel.jelly.stream.EncoderFlow +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.stream.scaladsl.* import org.apache.pekko.{Done, NotUsed} @@ -9,13 +9,13 @@ import java.io.{InputStream, OutputStream} import scala.concurrent.{ExecutionContext, Future} trait TestStream: - def tripleSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] - def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] - def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] def tripleSink(os: OutputStream)(implicit ec: ExecutionContext): Sink[RdfStreamFrame, Future[Done]] diff --git 
a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala index 2b1d0cdd..ff851706 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io import eu.ostrzyciel.jelly.convert.jena.* import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions -import eu.ostrzyciel.jelly.stream.{EncoderFlow, EncoderSource, JellyIo} +import eu.ostrzyciel.jelly.stream.* import org.apache.jena.query.Dataset import org.apache.jena.rdf.model.Model import org.apache.pekko.stream.Materializer @@ -25,12 +25,14 @@ class JenaReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Model, def writeQuadsJelly (os: OutputStream, dataset: Dataset, opt: RdfStreamOptions, frameSize: Int): Unit = - val f = EncoderSource.fromDatasetAsQuads(dataset, EncoderFlow.Options(), opt)(jenaIterableAdapter, jenaConverterFactory) + val f = EncoderSource.fromDatasetAsQuads(dataset, ByteSizeLimiter(32_000), opt) + (jenaIterableAdapter, jenaConverterFactory) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) def writeTriplesJelly (os: OutputStream, model: Model, opt: RdfStreamOptions, frameSize: Int): Unit = - val f = EncoderSource.fromGraph(model, EncoderFlow.Options(), opt)(jenaIterableAdapter, jenaConverterFactory) + val f = EncoderSource.fromGraph(model, ByteSizeLimiter(32_000), opt) + (jenaIterableAdapter, jenaConverterFactory) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala index d5fc012c..7ef5ae46 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io import eu.ostrzyciel.jelly.convert.rdf4j.Rdf4jConverterFactory import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions -import eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow, JellyIo} +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.* import org.eclipse.rdf4j.model.Statement @@ -32,12 +32,12 @@ class Rdf4jReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Seq[S override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => model.iterator) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(frameSize * 10), opt)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => dataset.iterator) - .via(EncoderFlow.fromFlatQuads(EncoderFlow.Options(frameSize * 10), opt)) + .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) diff --git 
a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala index 62c13659..90c47a56 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala @@ -13,22 +13,6 @@ import org.apache.pekko.stream.scaladsl.{Flow, Source} * (that it adheres to the appropriate stream type). */ object EncoderFlow: - object Options: - /** - * Build streaming options from the application's config. - * @param config app config - * @return stream options - */ - def apply(config: Config): Options = - Options( - config.getInt("jelly.stream.target-message-size"), - ) - - /** - * @param targetMessageSize Target message size in bytes. - * After the message gets bigger than the target, it gets sent. - */ - final case class Options(targetMessageSize: Int = 32_000) /** * A flow converting a flat stream of triple statements into a stream of [[RdfStreamFrame]]s. @@ -36,19 +20,19 @@ object EncoderFlow: * * This flow will wait for enough items to fill the whole gRPC message, which increases latency. To mitigate that, * use the [[fromGroupedTriples]] method instead. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * @param limiter frame size limiter (see [[SizeLimiter]]). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromFlatTriples[TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + final def fromFlatTriples[TTriple](limiter: SizeLimiter, opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[TTriple, RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.TRIPLES) + opt.withStreamType(RdfStreamType.TRIPLES) ) - flatFlow(Flow[TTriple].mapConcat(e => encoder.addTripleStatement(e)), opt) + flatFlow(e => encoder.addTripleStatement(e), limiter) /** * A flow converting a flat stream of quad statements into a stream of [[RdfStreamFrame]]s. @@ -56,19 +40,19 @@ object EncoderFlow: * * This flow will wait for enough items to fill the whole gRPC message, which increases latency. To mitigate that, * use the [[fromGroupedQuads]] method instead. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. */ - final def fromFlatQuads[TQuad] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + final def fromFlatQuads[TQuad](limiter: SizeLimiter, opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[TQuad, RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.QUADS) + opt.withStreamType(RdfStreamType.QUADS) ) - flatFlow(Flow[TQuad].mapConcat(e => encoder.addQuadStatement(e)), opt) + flatFlow(e => encoder.addQuadStatement(e), limiter) /** * A flow converting a stream of iterables with triple statements into a stream of [[RdfStreamFrame]]s. 
@@ -76,19 +60,21 @@ object EncoderFlow: * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by groups). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromGroupedTriples[TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + final def fromGroupedTriples[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.TRIPLES) + opt.withStreamType(RdfStreamType.TRIPLES) ) - groupedFlow(Flow[TTriple].mapConcat(e => encoder.addTripleStatement(e)), opt) + groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter) /** * A flow converting a stream of iterables with quad statements into a stream of [[RdfStreamFrame]]s. @@ -96,61 +82,85 @@ object EncoderFlow: * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by groups). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. */ - final def fromGroupedQuads[TQuad] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + final def fromGroupedQuads[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.QUADS) + opt.withStreamType(RdfStreamType.QUADS) ) - groupedFlow(Flow[TQuad].mapConcat(e => encoder.addQuadStatement(e)), opt) + groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter) /** - * A flow converting a stream of named graphs (node as graph name + iterable of triple statements) into a stream - * of [[RdfStreamFrame]]s. + * A flow converting a stream of named or unnamed graphs (node as graph name + iterable of triple statements) + * into a stream of [[RdfStreamFrame]]s. * RDF stream type: GRAPHS. * * After this flow finishes processing a single graph in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by graphs). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TNode Type of nodes. * @tparam TTriple Type of triple statements. 
* @return Pekko Streams flow. */ - final def fromGraphs[TNode, TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + final def fromGraphs[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Flow[(TNode, Iterable[TTriple]), RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.GRAPHS) + opt.withStreamType(RdfStreamType.GRAPHS) ) - Flow[(TNode, Iterable[TTriple])] - .flatMapConcat { (graphName: TNode, triples: Iterable[TTriple]) => - val it: Iterable[RdfStreamRow] = encoder.startGraph(graphName) - .concat(triples.flatMap(triple => encoder.addTripleStatement(triple))) - .concat(encoder.endGraph()) - Source.fromIterator(() => it.iterator) - .groupedWeighted(opt.targetMessageSize)(row => row.serializedSize) - .map(rows => RdfStreamFrame(rows)) - } + inline def graphAsIterable(graphName: TNode, triples: Iterable[TTriple]): Iterable[RdfStreamRow] = + encoder.startGraph(graphName) + .concat(triples.flatMap(triple => encoder.addTripleStatement(triple))) + .concat(encoder.endGraph()) + + maybeLimiter match + case Some(limiter) => + Flow[(TNode, Iterable[TTriple])] + .flatMapConcat { (graphName: TNode, triples: Iterable[TTriple]) => + Source.fromIterator(() => graphAsIterable(graphName, triples).iterator) + .via(limiter.flow) + .map(rows => RdfStreamFrame(rows)) + } + case None => + Flow[(TNode, Iterable[TTriple])] + .map { (graphName: TNode, triples: Iterable[TTriple]) => + val rows = graphAsIterable(graphName, triples).toSeq + RdfStreamFrame(rows) + } + - private def flatFlow[TIn](encoderFlow: Flow[TIn, RdfStreamRow, NotUsed], opt: Options): + private def flatFlow[TIn](transform: TIn => Iterable[RdfStreamRow], limiter: SizeLimiter): Flow[TIn, RdfStreamFrame, NotUsed] = - encoderFlow - .groupedWeighted(opt.targetMessageSize)(row => row.serializedSize) + Flow[TIn] + .mapConcat(transform) + .via(limiter.flow) .map(rows => RdfStreamFrame(rows)) - private def groupedFlow[TIn](encoderFlow: Flow[TIn, RdfStreamRow, NotUsed], opt: Options): + private def groupedFlow[TIn](transform: TIn => Iterable[RdfStreamRow], maybeLimiter: Option[SizeLimiter]): Flow[IterableOnce[TIn], RdfStreamFrame, NotUsed] = - Flow[IterableOnce[TIn]] - .flatMapConcat { elems => - Source.fromIterator(() => elems.iterator) - .via(flatFlow(encoderFlow, opt)) - } + maybeLimiter match + case Some(limiter) => + Flow[IterableOnce[TIn]].flatMapConcat(elems => { + Source.fromIterator(() => elems.iterator) + .via(flatFlow(transform, limiter)) + }) + case None => + Flow[IterableOnce[TIn]].map(elems => { + val rows = elems.iterator + .flatMap(transform) + .toSeq + RdfStreamFrame(rows) + }) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala index 6148f14f..5dad63f0 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala @@ -13,46 +13,47 @@ object EncoderSource: * RDF stream type: TRIPLES. 
* * @param graph the RDF graph to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options * @param adapter graph -> triples adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TGraph type of the RDF graph * @tparam TTriple type of the RDF triple * @return Pekko Streams source of RDF stream frames */ - def fromGraph[TGraph, TTriple](graph: TGraph, opt: Options, streamOpt: RdfStreamOptions) + def fromGraph[TGraph, TTriple](graph: TGraph, limiter: SizeLimiter, opt: RdfStreamOptions) (implicit adapter: IterableAdapter[?, TTriple, ?, TGraph, ?], factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asTriples(graph)) - .via(fromFlatTriples(opt, streamOpt)) + .via(fromFlatTriples(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (quads format). * RDF stream type: QUADS. * * @param dataset the RDF dataset to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options * @param adapter dataset -> quads adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TDataset type of the RDF dataset * @tparam TQuad type of the RDF quad * @return Pekko Streams source of RDF stream frames */ - def fromDatasetAsQuads[TDataset, TQuad](dataset: TDataset, opt: Options, streamOpt: RdfStreamOptions) + def fromDatasetAsQuads[TDataset, TQuad](dataset: TDataset, limiter: SizeLimiter, opt: RdfStreamOptions) (implicit adapter: IterableAdapter[?, ?, TQuad, ?, TDataset], factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asQuads(dataset)) - .via(fromFlatQuads(opt, streamOpt)) + .via(fromFlatQuads(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (graphs format). * RDF stream type: GRAPHS. * * @param dataset the RDF dataset to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by graphs). 
+ * @param opt Jelly serialization options * @param adapter dataset -> graphs adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TDataset type of the RDF dataset @@ -60,9 +61,10 @@ object EncoderSource: * @tparam TTriple type of the RDF triple * @return */ - def fromDatasetAsGraphs[TDataset, TNode, TTriple](dataset: TDataset, opt: Options, streamOpt: RdfStreamOptions) + def fromDatasetAsGraphs[TDataset, TNode, TTriple] + (dataset: TDataset, maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit adapter: IterableAdapter[TNode, TTriple, ?, ?, TDataset], factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asGraphs(dataset)) - .via(fromGraphs(opt, streamOpt)) + .via(fromGraphs(maybeLimiter, opt)) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala new file mode 100644 index 00000000..b0f23a38 --- /dev/null +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala @@ -0,0 +1,40 @@ +package eu.ostrzyciel.jelly.stream + +import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamRow +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.Flow + +/** + * Policy for limiting the size of stream frames in the Jelly stream producer. + * + * Note that the limiter is only a **limit**. When streaming grouped triples, + * grouped quads, or graphs, the size of the stream frame may be smaller than + * the limit, as the stream frame is always created after the group is complete. + * + * See also: [[EncoderFlow]], [[EncoderSource]] + */ +trait SizeLimiter: + /** + * Flow that limits the size of stream frames. + * + * The flow should group the incoming rows into Seqs that will be later used to + * create stream frames. + * @return Apache Pekko Flow + */ + def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] + +/** + * Stream frame size limiter that maintains a maximum byte size of stream frames. + * @param maxSize maximum byte size of stream frames + */ +final class ByteSizeLimiter(maxSize: Long) extends SizeLimiter: + override def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] = + Flow[RdfStreamRow].groupedWeighted(maxSize)(row => row.serializedSize) + +/** + * Stream frame size limiter that maintains a maximum number of rows in stream frames. 
+ * @param maxRows maximum number of rows in stream frames + */ +final class StreamRowCountLimiter(maxRows: Int) extends SizeLimiter: + override def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] = + Flow[RdfStreamRow].grouped(maxRows) diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala index 53ef683c..f7fcadfa 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala @@ -22,7 +22,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "fromFlatTriples" should { "encode triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -35,7 +35,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode triples with max message size" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(80), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatTriples(ByteSizeLimiter(80), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -46,10 +46,23 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: encoded.size should be (3) } + "encode triples with max row count" in { + val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(4), JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + ) + encoded.size should be (4) + } + "encode triples (norepeat)" in { val jOptions = JellyOptions.smallGeneralized.withUseRepeat(false) val encoded: Seq[RdfStreamFrame] = Source(Triples2NoRepeat.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(), jOptions)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), jOptions)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -65,7 +78,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode grouped triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedTriples(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGroupedTriples(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -77,12 +90,30 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: encoded.head.rows.count(_.row.isTriple) should be (2) encoded(1).rows.count(_.row.isTriple) should be (2) } + + "encode grouped triples with max row count" in { + val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) + .grouped(2) + .via(EncoderFlow.fromGroupedTriples(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + ) + encoded.size should be (4) + encoded.head.rows.count(_.row.isTriple) should be (0) + encoded(1).rows.count(_.row.isTriple) should be (1) + encoded(2).rows.count(_.row.isTriple) should be (1) + 
encoded(3).rows.count(_.row.isTriple) should be (2) + } } "fromFlatQuads" should { "encode quads" in { val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) - .via(EncoderFlow.fromFlatQuads(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -98,7 +129,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode grouped quads" in { val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedQuads(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGroupedQuads(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -115,7 +146,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "fromGraphs" should { "encode graphs" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) - .via(EncoderFlow.fromGraphs(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGraphs(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue
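For reference, a minimal usage sketch of the new SizeLimiter API introduced by this change, in the flat TRIPLES case. It mirrors the calls visible in the diff (EncoderFlow.fromFlatTriples, ByteSizeLimiter, JellyIo.toIoStream); the file names, the ActorSystem setup, and the JellyOptions import path are assumptions, not part of this change:

// Sketch only: encode a flat triple stream into Jelly frames, cutting frames by byte size.
// Swap ByteSizeLimiter(32_000) for StreamRowCountLimiter(250) to cut by row count instead.
import eu.ostrzyciel.jelly.convert.jena.*
import eu.ostrzyciel.jelly.core.JellyOptions          // assumed import path (core module)
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.riot.Lang
import org.apache.jena.riot.system.AsyncParser
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

import java.io.{FileInputStream, FileOutputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

given ActorSystem = ActorSystem("jelly-limiter-example")

val is = new FileInputStream("input.nt")        // hypothetical input file
val os = new FileOutputStream("output.jelly")   // hypothetical output file

val done = Source.fromIterator(() => AsyncParser.asyncParseTriples(is, Lang.NT, "").asScala)
  // Frames are emitted once their serialized rows exceed ~32 kB.
  .via(EncoderFlow.fromFlatTriples(ByteSizeLimiter(32_000), JellyOptions.smallGeneralized))
  .runWith(JellyIo.toIoStream(os))
Await.ready(done, 10.seconds)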
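For the grouped stream types, the limiter becomes an Option[SizeLimiter]: with None each group (here, each graph of a dataset) maps to exactly one RdfStreamFrame, while Some(limiter) additionally splits large groups. A sketch using EncoderSource.fromDatasetAsGraphs as changed in this diff; the input file and ActorSystem setup are placeholders:

// Sketch only: GRAPHS stream type with an optional frame size limit.
import eu.ostrzyciel.jelly.convert.jena.*
import eu.ostrzyciel.jelly.core.JellyOptions          // assumed import path (core module)
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.riot.RDFDataMgr
import org.apache.pekko.actor.ActorSystem

import java.io.FileOutputStream

given ActorSystem = ActorSystem("jelly-graphs-example")

val dataset = RDFDataMgr.loadDataset("input.trig")    // hypothetical input file
val os = new FileOutputStream("graphs.jelly")

// Pass None instead of Some(...) to emit exactly one frame per graph, with no size cap.
val frames = EncoderSource.fromDatasetAsGraphs(
  dataset, Some(ByteSizeLimiter(32_000)), JellyOptions.smallGeneralized
)(jenaIterableAdapter, jenaConverterFactory)

val done = frames.runWith(JellyIo.toIoStream(os))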
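Since SizeLimiter only requires a Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed], callers can plug in their own grouping policy. A hypothetical sketch (TimeOrCountLimiter is not part of this change) that bounds latency by cutting a frame after maxRows rows or after maxWait elapses, whichever comes first:

// Sketch only: a custom SizeLimiter built on Pekko's groupedWithin.
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamRow
import eu.ostrzyciel.jelly.stream.SizeLimiter
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow

import scala.concurrent.duration.FiniteDuration

final class TimeOrCountLimiter(maxRows: Int, maxWait: FiniteDuration) extends SizeLimiter:
  override def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] =
    // Emit the accumulated rows when either the count or the time window is reached.
    Flow[RdfStreamRow].groupedWithin(maxRows, maxWait)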