diff --git a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyIo.scala b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyIo.scala index c6b6c859..642a8a1d 100644 --- a/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyIo.scala +++ b/stream/src/main/scala/eu/ostrzyciel/jelly/stream/JellyIo.scala @@ -4,7 +4,7 @@ import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame import org.apache.pekko.{Done, NotUsed} import org.apache.pekko.stream.scaladsl.* -import java.io.OutputStream +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} import scala.concurrent.Future /** @@ -29,6 +29,31 @@ object JellyIo: def fromBytes: Flow[Array[Byte], RdfStreamFrame, NotUsed] = Flow[Array[Byte]].map(RdfStreamFrame.parseFrom) + /** + * Convert a stream of Jelly frames into a stream of DELIMITED bytes. + * + * You can safely use this method to write to a file or socket. + * @return Pekko Flow + */ + def toBytesDelimited: Flow[RdfStreamFrame, Array[Byte], NotUsed] = + Flow[RdfStreamFrame].map(f => { + val os = ByteArrayOutputStream() + f.writeDelimitedTo(os) + os.toByteArray + }) + + /** + * Convert a stream of DELIMITED bytes into a stream of Jelly frames. + * + * You can safely use this method to read from a file or socket. + * @return Pekko Flow + */ + def fromBytesDelimited: Flow[Array[Byte], RdfStreamFrame, NotUsed] = + Flow[Array[Byte]].mapConcat(b => { + val is = ByteArrayInputStream(b) + RdfStreamFrame.parseDelimitedFrom(is) + }) + /** * Write a stream of Jelly frames to an output stream. The frames will be delimited. * diff --git a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala index 9af6f55a..86b19381 100644 --- a/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala +++ b/stream/src/test/scala/eu/ostrzyciel/jelly/stream/JellyIoSpec.scala @@ -50,6 +50,19 @@ class JellyIoSpec extends AnyWordSpec, Matchers, ScalaFutures: } } + "toBytesDelimited and fromBytesDelimited" should { + for (name, testCase) <- cases do + s"work for $name" in { + val decoded = Source.fromIterator(() => testCase.iterator) + .via(JellyIo.toBytesDelimited) + .via(JellyIo.fromBytesDelimited) + .runWith(Sink.seq) + .futureValue + + decoded shouldEqual testCase + } + } + "toIoStream and fromIoStream" should { for (name, testCase) <- cases do s"work for $name" in {