From a22836a1235d4b4b557cb1fe0b6633ed139ea0fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Sowi=C5=84ski?= Date: Wed, 11 Oct 2023 10:58:07 +0200 Subject: [PATCH] Rework stream frame size limits (#41) The previous system required the user to use a byte size limit on the stream frames, which did not make sense in all cases (e.g., some grouped streaming use cases). This commit introduces the SizeLimiter trait that can be used to define the policy for splitting frames when they get too big. The policy can be disabled entirely in grouped streaming formulations, but is mandatory in flat streaming. --- .../CrossStreamingSpec.scala | 26 ++-- .../integration_tests/JenaTestStream.scala | 14 +- .../integration_tests/Rdf4jTestStream.scala | 14 +- .../jelly/integration_tests/TestStream.scala | 8 +- .../io/JenaReactiveSerDes.scala | 8 +- .../io/Rdf4jReactiveSerDes.scala | 6 +- .../ostrzyciel/jelly/stream/EncoderFlow.scala | 140 ++++++++++-------- .../jelly/stream/EncoderSource.scala | 26 ++-- .../ostrzyciel/jelly/stream/SizeLimiter.scala | 40 +++++ .../jelly/stream/EncoderFlowSpec.scala | 45 +++++- 10 files changed, 207 insertions(+), 120 deletions(-) create mode 100644 stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala index df113005..c3ba0f28 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/CrossStreamingSpec.scala @@ -56,10 +56,12 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: ("big default", JellyOptions.bigGeneralized), ) - private val streamingOptions: Seq[(String, EncoderFlow.Options)] = Seq( - ("message size: 32_000", EncoderFlow.Options(32_000)), - ("message size: 500", 
EncoderFlow.Options(500)), - ("message size: 2_000_000", EncoderFlow.Options(2_000_000)), + private val sizeLimiters: Seq[(String, SizeLimiter)] = Seq( + ("message byte size: 32_000", ByteSizeLimiter(32_000)), + ("message byte size: 500", ByteSizeLimiter(500)), + ("message byte size: 2_000_000", ByteSizeLimiter(2_000_000)), + ("stream row count: 5", StreamRowCountLimiter(5)), + ("stream row count: 200", StreamRowCountLimiter(200)), ) final case class CaseKey(streamType: String, encoder: String, jOpt: String, sOpt: String, caseName: String) @@ -93,8 +95,8 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: s"$encName encoder" when { for (decName, decFlow) <- implementations do for (jOptName, jOpt) <- jellyOptions do - for (sOptName, sOpt) <- streamingOptions do - s"streaming to a $decName decoder, $jOptName, $sOptName" should { + for (limiterName, limiter) <- sizeLimiters do + s"streaming to a $decName decoder, $jOptName, $limiterName" should { // Triples for (caseName, sourceFile) <- TripleTests.files do val sourceGraph = TripleTests.graphs(caseName) @@ -102,13 +104,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.tripleSource(is, sOpt, jOpt) + encFlow.tripleSource(is, limiter, jOpt) .wireTap(f => encSize += f.serializedSize) .toMat(decFlow.tripleSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("triples", encName, jOptName, sOptName, caseName) + val ck = CaseKey("triples", encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultGraph = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.TURTLE) @@ -129,13 +131,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.quadSource(is, sOpt, jOpt) + encFlow.quadSource(is, limiter, jOpt) .wireTap(f 
=> encSize += f.serializedSize) .toMat(decFlow.quadSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("quads", encName, jOptName, sOptName, caseName) + val ck = CaseKey("quads", encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultDataset = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.NQ) @@ -147,13 +149,13 @@ class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures: val is = new FileInputStream(sourceFile) val os = new ByteArrayOutputStream() var encSize = 0 - encFlow.graphSource(is, sOpt, jOpt) + encFlow.graphSource(is, limiter, jOpt) .wireTap(f => encSize += f.serializedSize) .toMat(decFlow.graphSink(os))(Keep.right) .run() .futureValue - val ck = CaseKey("graphs", encName, jOptName, sOptName, caseName) + val ck = CaseKey("graphs", encName, jOptName, limiterName, caseName) encodedSizes(ck) = encSize val resultDataset = RDFParser.source(new ByteArrayInputStream(os.toByteArray)) .lang(Lang.NQ) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala index 11225fe3..8341ecd3 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/JenaTestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow} +import eu.ostrzyciel.jelly.stream.* import org.apache.jena.graph.{Node, Triple} import org.apache.jena.riot.system.AsyncParser import org.apache.jena.riot.{Lang, RDFDataMgr, RDFParser} @@ -14,15 +14,15 @@ import scala.jdk.CollectionConverters.* case object JenaTestStream extends TestStream: import eu.ostrzyciel.jelly.convert.jena.* - override def tripleSource(is: InputStream, streamOpt: 
EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseTriples(is, Lang.NT, "").asScala) - .via(EncoderFlow.fromFlatTriples(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) - override def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = Source.fromIterator(() => AsyncParser.asyncParseQuads(is, Lang.NQUADS, "").asScala) - .via(EncoderFlow.fromFlatQuads(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) - override def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val ds = RDFParser.source(is) .lang(Lang.NQ) .toDatasetGraph @@ -31,7 +31,7 @@ case object JenaTestStream extends TestStream: Iterator((null, Iterable.from(ds.getDefaultGraph.find.asScala))) ).filter((_, g) => g.nonEmpty) Source.fromIterator(() => graphs) - .via(EncoderFlow.fromGraphs(streamOpt, jellyOpt)) + .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = Flow[RdfStreamFrame] diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala index 408dfd3f..0c7c8c3c 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/Rdf4jTestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import 
eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow} +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.Done import org.apache.pekko.stream.scaladsl.* import org.eclipse.rdf4j.rio.* @@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters.* case object Rdf4jTestStream extends TestStream: import eu.ostrzyciel.jelly.convert.rdf4j.* - override def tripleSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = // This buffers everything in memory... but I'm too lazy to implement my own RDFHandler for this // RDF4J at the moment only has two formats with RDF-star support – Turtle and Trig. val parser = Rio.createParser(RDFFormat.TURTLESTAR) @@ -22,17 +22,17 @@ case object Rdf4jTestStream extends TestStream: parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatTriples(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatTriples(limiter, jellyOpt)) - override def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) val collector = new StatementCollector() parser.setRDFHandler(collector) parser.parse(is) Source.fromIterator(() => collector.getStatements.asScala.iterator) - .via(EncoderFlow.fromFlatQuads(streamOpt, jellyOpt)) + .via(EncoderFlow.fromFlatQuads(limiter, jellyOpt)) - override def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions) = + override def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) = val parser = Rio.createParser(RDFFormat.NQUADS) val collector = new StatementCollector() parser.setRDFHandler(collector) @@ -40,7 +40,7 @@ case object Rdf4jTestStream extends TestStream: val graphs = 
collector.getStatements.asScala.toSeq .groupBy(_.getContext) Source.fromIterator(() => graphs.iterator) - .via(EncoderFlow.fromGraphs(streamOpt, jellyOpt)) + .via(EncoderFlow.fromGraphs(Some(limiter), jellyOpt)) override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) = val writer = Rio.createWriter(RDFFormat.TURTLESTAR, os) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala index b996d0c6..34a33bf2 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/TestStream.scala @@ -1,7 +1,7 @@ package eu.ostrzyciel.jelly.integration_tests import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions} -import eu.ostrzyciel.jelly.stream.EncoderFlow +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.stream.scaladsl.* import org.apache.pekko.{Done, NotUsed} @@ -9,13 +9,13 @@ import java.io.{InputStream, OutputStream} import scala.concurrent.{ExecutionContext, Future} trait TestStream: - def tripleSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] - def quadSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def quadSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] - def graphSource(is: InputStream, streamOpt: EncoderFlow.Options, jellyOpt: RdfStreamOptions): + def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions): Source[RdfStreamFrame, NotUsed] def tripleSink(os: OutputStream)(implicit ec: ExecutionContext): Sink[RdfStreamFrame, Future[Done]] diff --git 
a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala index 2b1d0cdd..ff851706 100644 --- a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/JenaReactiveSerDes.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io import eu.ostrzyciel.jelly.convert.jena.* import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions -import eu.ostrzyciel.jelly.stream.{EncoderFlow, EncoderSource, JellyIo} +import eu.ostrzyciel.jelly.stream.* import org.apache.jena.query.Dataset import org.apache.jena.rdf.model.Model import org.apache.pekko.stream.Materializer @@ -25,12 +25,14 @@ class JenaReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Model, def writeQuadsJelly (os: OutputStream, dataset: Dataset, opt: RdfStreamOptions, frameSize: Int): Unit = - val f = EncoderSource.fromDatasetAsQuads(dataset, EncoderFlow.Options(), opt)(jenaIterableAdapter, jenaConverterFactory) + val f = EncoderSource.fromDatasetAsQuads(dataset, ByteSizeLimiter(32_000), opt) + (jenaIterableAdapter, jenaConverterFactory) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) def writeTriplesJelly (os: OutputStream, model: Model, opt: RdfStreamOptions, frameSize: Int): Unit = - val f = EncoderSource.fromGraph(model, EncoderFlow.Options(), opt)(jenaIterableAdapter, jenaConverterFactory) + val f = EncoderSource.fromGraph(model, ByteSizeLimiter(32_000), opt) + (jenaIterableAdapter, jenaConverterFactory) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) diff --git a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala index d5fc012c..7ef5ae46 100644 --- 
a/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala +++ b/integration-tests/src/test/scala/eu/ostrzyciel/jelly/integration_tests/io/Rdf4jReactiveSerDes.scala @@ -2,7 +2,7 @@ package eu.ostrzyciel.jelly.integration_tests.io import eu.ostrzyciel.jelly.convert.rdf4j.Rdf4jConverterFactory import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions -import eu.ostrzyciel.jelly.stream.{DecoderFlow, EncoderFlow, JellyIo} +import eu.ostrzyciel.jelly.stream.* import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.* import org.eclipse.rdf4j.model.Statement @@ -32,12 +32,12 @@ class Rdf4jReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Seq[S override def writeTriplesJelly(os: OutputStream, model: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => model.iterator) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(frameSize * 10), opt)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) override def writeQuadsJelly(os: OutputStream, dataset: Seq[Statement], opt: RdfStreamOptions, frameSize: Int): Unit = val f = Source.fromIterator(() => dataset.iterator) - .via(EncoderFlow.fromFlatQuads(EncoderFlow.Options(frameSize * 10), opt)) + .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(frameSize), opt)) .runWith(JellyIo.toIoStream(os)) Await.ready(f, 10.seconds) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala index 62c13659..90c47a56 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderFlow.scala @@ -13,22 +13,6 @@ import org.apache.pekko.stream.scaladsl.{Flow, Source} * (that it adheres to the appropriate stream type). 
*/ object EncoderFlow: - object Options: - /** - * Build streaming options from the application's config. - * @param config app config - * @return stream options - */ - def apply(config: Config): Options = - Options( - config.getInt("jelly.stream.target-message-size"), - ) - - /** - * @param targetMessageSize Target message size in bytes. - * After the message gets bigger than the target, it gets sent. - */ - final case class Options(targetMessageSize: Int = 32_000) /** * A flow converting a flat stream of triple statements into a stream of [[RdfStreamFrame]]s. @@ -36,19 +20,19 @@ object EncoderFlow: * * This flow will wait for enough items to fill the whole gRPC message, which increases latency. To mitigate that, * use the [[fromGroupedTriples]] method instead. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * @param limiter frame size limiter (see [[SizeLimiter]]). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. */ - final def fromFlatTriples[TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + final def fromFlatTriples[TTriple](limiter: SizeLimiter, opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[TTriple, RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.TRIPLES) + opt.withStreamType(RdfStreamType.TRIPLES) ) - flatFlow(Flow[TTriple].mapConcat(e => encoder.addTripleStatement(e)), opt) + flatFlow(e => encoder.addTripleStatement(e), limiter) /** * A flow converting a flat stream of quad statements into a stream of [[RdfStreamFrame]]s. @@ -56,19 +40,19 @@ object EncoderFlow: * * This flow will wait for enough items to fill the whole gRPC message, which increases latency. 
To mitigate that, * use the [[fromGroupedQuads]] method instead. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. */ - final def fromFlatQuads[TQuad] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + final def fromFlatQuads[TQuad](limiter: SizeLimiter, opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[TQuad, RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.QUADS) + opt.withStreamType(RdfStreamType.QUADS) ) - flatFlow(Flow[TQuad].mapConcat(e => encoder.addQuadStatement(e)), opt) + flatFlow(e => encoder.addQuadStatement(e), limiter) /** * A flow converting a stream of iterables with triple statements into a stream of [[RdfStreamFrame]]s. @@ -76,19 +60,21 @@ object EncoderFlow: * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by groups). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. 
*/ - final def fromGroupedTriples[TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): + final def fromGroupedTriples[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.TRIPLES) + opt.withStreamType(RdfStreamType.TRIPLES) ) - groupedFlow(Flow[TTriple].mapConcat(e => encoder.addTripleStatement(e)), opt) + groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter) /** * A flow converting a stream of iterables with quad statements into a stream of [[RdfStreamFrame]]s. @@ -96,61 +82,85 @@ object EncoderFlow: * * After this flow finishes processing an iterable in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by groups). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TQuad Type of quad statements. * @return Pekko Streams flow. 
*/ - final def fromGroupedQuads[TQuad] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): + final def fromGroupedQuads[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.QUADS) + opt.withStreamType(RdfStreamType.QUADS) ) - groupedFlow(Flow[TQuad].mapConcat(e => encoder.addQuadStatement(e)), opt) + groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter) /** - * A flow converting a stream of named graphs (node as graph name + iterable of triple statements) into a stream - * of [[RdfStreamFrame]]s. + * A flow converting a stream of named or unnamed graphs (node as graph name + iterable of triple statements) + * into a stream of [[RdfStreamFrame]]s. * RDF stream type: GRAPHS. * * After this flow finishes processing a single graph in the input stream, it is guaranteed to output an * [[RdfStreamFrame]], which allows to maintain low latency. - * @param opt Streaming options. - * @param streamOpt Jelly serialization options. + * + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by graphs). + * @param opt Jelly serialization options. * @param factory Implementation of [[ConverterFactory]] (e.g., JenaConverterFactory). * @tparam TNode Type of nodes. * @tparam TTriple Type of triple statements. * @return Pekko Streams flow. 
*/ - final def fromGraphs[TNode, TTriple] - (opt: Options, streamOpt: RdfStreamOptions)(implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): + final def fromGraphs[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) + (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Flow[(TNode, Iterable[TTriple]), RdfStreamFrame, NotUsed] = val encoder = factory.encoder( - streamOpt.withStreamType(RdfStreamType.GRAPHS) + opt.withStreamType(RdfStreamType.GRAPHS) ) - Flow[(TNode, Iterable[TTriple])] - .flatMapConcat { (graphName: TNode, triples: Iterable[TTriple]) => - val it: Iterable[RdfStreamRow] = encoder.startGraph(graphName) - .concat(triples.flatMap(triple => encoder.addTripleStatement(triple))) - .concat(encoder.endGraph()) - Source.fromIterator(() => it.iterator) - .groupedWeighted(opt.targetMessageSize)(row => row.serializedSize) - .map(rows => RdfStreamFrame(rows)) - } + inline def graphAsIterable(graphName: TNode, triples: Iterable[TTriple]): Iterable[RdfStreamRow] = + encoder.startGraph(graphName) + .concat(triples.flatMap(triple => encoder.addTripleStatement(triple))) + .concat(encoder.endGraph()) + + maybeLimiter match + case Some(limiter) => + Flow[(TNode, Iterable[TTriple])] + .flatMapConcat { (graphName: TNode, triples: Iterable[TTriple]) => + Source.fromIterator(() => graphAsIterable(graphName, triples).iterator) + .via(limiter.flow) + .map(rows => RdfStreamFrame(rows)) + } + case None => + Flow[(TNode, Iterable[TTriple])] + .map { (graphName: TNode, triples: Iterable[TTriple]) => + val rows = graphAsIterable(graphName, triples).toSeq + RdfStreamFrame(rows) + } + - private def flatFlow[TIn](encoderFlow: Flow[TIn, RdfStreamRow, NotUsed], opt: Options): + private def flatFlow[TIn](transform: TIn => Iterable[RdfStreamRow], limiter: SizeLimiter): Flow[TIn, RdfStreamFrame, NotUsed] = - encoderFlow - .groupedWeighted(opt.targetMessageSize)(row => row.serializedSize) + Flow[TIn] + .mapConcat(transform) + 
.via(limiter.flow) .map(rows => RdfStreamFrame(rows)) - private def groupedFlow[TIn](encoderFlow: Flow[TIn, RdfStreamRow, NotUsed], opt: Options): + private def groupedFlow[TIn](transform: TIn => Iterable[RdfStreamRow], maybeLimiter: Option[SizeLimiter]): Flow[IterableOnce[TIn], RdfStreamFrame, NotUsed] = - Flow[IterableOnce[TIn]] - .flatMapConcat { elems => - Source.fromIterator(() => elems.iterator) - .via(flatFlow(encoderFlow, opt)) - } + maybeLimiter match + case Some(limiter) => + Flow[IterableOnce[TIn]].flatMapConcat(elems => { + Source.fromIterator(() => elems.iterator) + .via(flatFlow(transform, limiter)) + }) + case None => + Flow[IterableOnce[TIn]].map(elems => { + val rows = elems.iterator + .flatMap(transform) + .toSeq + RdfStreamFrame(rows) + }) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala index 6148f14f..5dad63f0 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/EncoderSource.scala @@ -13,46 +13,47 @@ object EncoderSource: * RDF stream type: TRIPLES. 
* * @param graph the RDF graph to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options * @param adapter graph -> triples adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TGraph type of the RDF graph * @tparam TTriple type of the RDF triple * @return Pekko Streams source of RDF stream frames */ - def fromGraph[TGraph, TTriple](graph: TGraph, opt: Options, streamOpt: RdfStreamOptions) + def fromGraph[TGraph, TTriple](graph: TGraph, limiter: SizeLimiter, opt: RdfStreamOptions) (implicit adapter: IterableAdapter[?, TTriple, ?, TGraph, ?], factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asTriples(graph)) - .via(fromFlatTriples(opt, streamOpt)) + .via(fromFlatTriples(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (quads format). * RDF stream type: QUADS. 
* * @param dataset the RDF dataset to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param limiter frame size limiter (see [[SizeLimiter]]) + * @param opt Jelly serialization options * @param adapter dataset -> quads adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TDataset type of the RDF dataset * @tparam TQuad type of the RDF quad * @return Pekko Streams source of RDF stream frames */ - def fromDatasetAsQuads[TDataset, TQuad](dataset: TDataset, opt: Options, streamOpt: RdfStreamOptions) + def fromDatasetAsQuads[TDataset, TQuad](dataset: TDataset, limiter: SizeLimiter, opt: RdfStreamOptions) (implicit adapter: IterableAdapter[?, ?, TQuad, ?, TDataset], factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asQuads(dataset)) - .via(fromFlatQuads(opt, streamOpt)) + .via(fromFlatQuads(limiter, opt)) /** * A source of RDF stream frames from an RDF dataset implementation (graphs format). * RDF stream type: GRAPHS. * * @param dataset the RDF dataset to be streamed - * @param opt streaming options - * @param streamOpt Jelly serialization options + * @param maybeLimiter frame size limiter (see [[SizeLimiter]]). + * If None, no size limit is applied (frames are only split by graphs). 
+ * @param opt Jelly serialization options * @param adapter dataset -> graphs adapter (see implementations of [[IterableAdapter]]) * @param factory implementation of [[ConverterFactory]] (e.g., JenaConverterFactory) * @tparam TDataset type of the RDF dataset @@ -60,9 +61,10 @@ object EncoderSource: * @tparam TTriple type of the RDF triple * @return */ - def fromDatasetAsGraphs[TDataset, TNode, TTriple](dataset: TDataset, opt: Options, streamOpt: RdfStreamOptions) + def fromDatasetAsGraphs[TDataset, TNode, TTriple] + (dataset: TDataset, maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions) (implicit adapter: IterableAdapter[TNode, TTriple, ?, ?, TDataset], factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]): Source[RdfStreamFrame, NotUsed] = Source(adapter.asGraphs(dataset)) - .via(fromGraphs(opt, streamOpt)) + .via(fromGraphs(maybeLimiter, opt)) diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala new file mode 100644 index 00000000..b0f23a38 --- /dev/null +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/SizeLimiter.scala @@ -0,0 +1,40 @@ +package eu.ostrzyciel.jelly.stream + +import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamRow +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.Flow + +/** + * Policy for limiting the size of stream frames in the Jelly stream producer. + * + * Note that the limiter is only a **limit**. When streaming grouped triples, + * grouped quads, or graphs, the size of the stream frame may be smaller than + * the limit, as the stream frame is always created after the group is complete. + * + * See also: [[EncoderFlow]], [[EncoderSource]] + */ +trait SizeLimiter: + /** + * Flow that limits the size of stream frames. + * + * The flow should group the incoming rows into Seqs that will be later used to + * create stream frames. 
+ * @return Apache Pekko Flow + */ + def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] + +/** + * Stream frame size limiter that closes a frame once it reaches a target byte size (frames may overshoot it). + * @param maxSize target byte size; a frame is closed once its rows' total serialized size reaches it + */ +final class ByteSizeLimiter(maxSize: Long) extends SizeLimiter: + override def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] = + Flow[RdfStreamRow].groupedWeighted(maxSize)(row => row.serializedSize) + +/** + * Stream frame size limiter that maintains a maximum number of rows in stream frames. + * @param maxRows maximum number of rows in stream frames + */ +final class StreamRowCountLimiter(maxRows: Int) extends SizeLimiter: + override def flow: Flow[RdfStreamRow, Seq[RdfStreamRow], NotUsed] = + Flow[RdfStreamRow].grouped(maxRows) diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala index 53ef683c..f7fcadfa 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/EncoderFlowSpec.scala @@ -22,7 +22,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "fromFlatTriples" should { "encode triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -35,7 +35,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode triples with max message size" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(80), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatTriples(ByteSizeLimiter(80), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -46,10 +46,23 @@ class
EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: encoded.size should be (3) } + "encode triples with max row count" in { + val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(4), JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + ) + encoded.size should be (4) + } + "encode triples (norepeat)" in { val jOptions = JellyOptions.smallGeneralized.withUseRepeat(false) val encoded: Seq[RdfStreamFrame] = Source(Triples2NoRepeat.mrl) - .via(EncoderFlow.fromFlatTriples(EncoderFlow.Options(), jOptions)) + .via(EncoderFlow.fromFlatTriples(StreamRowCountLimiter(1000), jOptions)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -65,7 +78,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode grouped triples" in { val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedTriples(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGroupedTriples(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -77,12 +90,30 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: encoded.head.rows.count(_.row.isTriple) should be (2) encoded(1).rows.count(_.row.isTriple) should be (2) } + + "encode grouped triples with max row count" in { + val encoded: Seq[RdfStreamFrame] = Source(Triples1.mrl) + .grouped(2) + .via(EncoderFlow.fromGroupedTriples(Some(StreamRowCountLimiter(4)), JellyOptions.smallGeneralized)) + .toMat(Sink.seq)(Keep.right) + .run().futureValue + + assertEncoded( + encoded.flatMap(_.rows), + Triples1.encoded(JellyOptions.smallGeneralized.withStreamType(RdfStreamType.TRIPLES)) + ) + encoded.size should be (4) + encoded.head.rows.count(_.row.isTriple) should be (0) + 
encoded(1).rows.count(_.row.isTriple) should be (1) + encoded(2).rows.count(_.row.isTriple) should be (1) + encoded(3).rows.count(_.row.isTriple) should be (2) + } } "fromFlatQuads" should { "encode quads" in { val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) - .via(EncoderFlow.fromFlatQuads(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromFlatQuads(StreamRowCountLimiter(1000), JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -98,7 +129,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "encode grouped quads" in { val encoded: Seq[RdfStreamFrame] = Source(Quads1.mrl) .grouped(2) - .via(EncoderFlow.fromGroupedQuads(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGroupedQuads(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue @@ -115,7 +146,7 @@ class EncoderFlowSpec extends AnyWordSpec, Matchers, ScalaFutures: "fromGraphs" should { "encode graphs" in { val encoded: Seq[RdfStreamFrame] = Source(Graphs1.mrl) - .via(EncoderFlow.fromGraphs(EncoderFlow.Options(), JellyOptions.smallGeneralized)) + .via(EncoderFlow.fromGraphs(None, JellyOptions.smallGeneralized)) .toMat(Sink.seq)(Keep.right) .run().futureValue