
Commit e1a3e41

Use given/using instead of implicit (#58)
It's the Scala 3 way. Changes to client code should be limited to
changing imports from ".*" to ".given"
Ostrzyciel authored Apr 24, 2024
1 parent c776fbb commit e1a3e41
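To illustrate the migration described in the commit message: client code that previously relied on the wildcard import to pick up the implicit values now imports the givens explicitly (Jena module shown, exactly as changed in this commit; the RDF4J module is analogous):

    // Before this commit: Scala 2-style implicit vals, picked up by the wildcard import
    import eu.ostrzyciel.jelly.convert.jena.*

    // After this commit: given instances, which Scala 3 imports only via `.given`
    import eu.ostrzyciel.jelly.convert.jena.given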
Showing 20 changed files with 61 additions and 61 deletions.
@@ -9,7 +9,7 @@ import scala.reflect.ClassTag
* "Main" trait to be implemented by RDF conversion modules (e.g., for Jena and RDF4J).
* Exposes factory methods for building protobuf encoders and decoders.
*
- * This should typically be implemented as an object. You should also provide a package-scoped implicit for your
+ * This should typically be implemented as an object. You should also provide a package-scoped given for your
* implementation so that users can easily make use of the connector in the stream package.
*
* @tparam TEncoder Implementation of [[ProtoEncoder]] for a given RDF library.
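A minimal sketch of the package-scoped given described above, mirroring the Jena package object updated later in this commit (the RDF4J module follows the same pattern):

    package eu.ostrzyciel.jelly.convert

    package object jena:
      // Givens that client code pulls in with `import eu.ostrzyciel.jelly.convert.jena.given`
      given jenaIterableAdapter: JenaIterableAdapter.type = JenaIterableAdapter
      given jenaConverterFactory: JenaConverterFactory.type = JenaConverterFactory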
@@ -39,8 +39,8 @@ object RdfStreamServer:
* @param system actor system
*/
final class RdfStreamServer(options: RdfStreamServer.Options, streamService: RdfStreamService)
- (implicit system: ActorSystem[_]) extends LazyLogging:
- implicit val ec: ExecutionContext = system.executionContext
+ (using system: ActorSystem[_]) extends LazyLogging:
+ given ExecutionContext = system.executionContext
private var binding: Option[ServerBinding] = _

/**
10 changes: 5 additions & 5 deletions grpc/src/test/scala/eu/ostrzyciel/jelly/grpc/GrpcSpec.scala
@@ -21,7 +21,7 @@ import scala.concurrent.duration.*
class GrpcSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll:
import ProtoTestCases.*

- implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 50.millis)
+ given PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 50.millis)
val conf = ConfigFactory.parseString(
"""
|pekko.http.server.preview.enable-http2 = on
@@ -43,8 +43,8 @@ class GrpcSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll:
val serverSystem: ActorSystem[_] = testKit.system

class TestService(storedData: Map[String, Seq[RdfStreamFrame]]) extends RdfStreamService:
- implicit val system: ActorSystem[_] = serverSystem
- implicit val ec: ExecutionContext = system.executionContext
+ given system: ActorSystem[_] = serverSystem
+ given ExecutionContext = system.executionContext
var receivedData: mutable.Map[String, Seq[RdfStreamFrame]] = mutable.Map()

override def publishRdf(in: Source[RdfStreamFrame, NotUsed]) =
@@ -94,11 +94,11 @@ class GrpcSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll:
val bound = new RdfStreamServer(
RdfStreamServer.Options.fromConfig(conf.getConfig(s"pekko.grpc.client.$confKey")),
service
- )(serverSystem).run().futureValue
+ )(using serverSystem).run().futureValue
(name, confKey, service, bound)
})

- implicit val clientSystem: ActorSystem[_] = ActorSystem(Behaviors.empty, "TestClient", conf)
+ given clientSystem: ActorSystem[_] = ActorSystem(Behaviors.empty, "TestClient", conf)

override def afterAll(): Unit =
ActorTestKit.shutdown(clientSystem)
@@ -22,9 +22,9 @@ import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

class CrossStreamingSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll:
- implicit val actorSystem: ActorSystem = ActorSystem()
- implicit val ec: ExecutionContext = actorSystem.getDispatcher
- implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 50.millis)
+ given actorSystem: ActorSystem = ActorSystem()
+ given ExecutionContext = actorSystem.getDispatcher
+ given PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 50.millis)

override def beforeAll(): Unit =
JenaSystem.init()
@@ -12,7 +12,7 @@ import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters.*

case object JenaTestStream extends TestStream:
- import eu.ostrzyciel.jelly.convert.jena.*
+ import eu.ostrzyciel.jelly.convert.jena.given

override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) =
Source.fromIterator(() => AsyncParser.asyncParseTriples(is, Lang.NT, "").asScala)
@@ -33,20 +33,20 @@ case object JenaTestStream extends TestStream:
Source.fromIterator(() => graphs)
.via(EncoderFlow.namedGraphStream(Some(limiter), jellyOpt))

- override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def tripleSink(os: OutputStream)(using ExecutionContext) =
Flow[RdfStreamFrame]
.via(DecoderFlow.decodeTriples.asFlatTripleStream())
// buffer the triples to avoid OOMs and keep some perf
.grouped(32)
.toMat(Sink.foreach(triples => RDFDataMgr.writeTriples(os, triples.iterator.asJava)))(Keep.right)

- override def quadSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def quadSink(os: OutputStream)(using ExecutionContext) =
Flow[RdfStreamFrame]
.via(DecoderFlow.decodeQuads.asFlatQuadStream())
.grouped(32)
.toMat(Sink.foreach(quads => RDFDataMgr.writeQuads(os, quads.iterator.asJava)))(Keep.right)

- override def graphSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def graphSink(os: OutputStream)(using ExecutionContext) =
Flow[RdfStreamFrame]
.via(DecoderFlow.decodeGraphs.asDatasetStreamOfQuads())
.toMat(Sink.foreach(quads => RDFDataMgr.writeQuads(os, quads.iterator.asJava)))(Keep.right)
@@ -12,7 +12,7 @@ import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters.*

case object Rdf4jTestStream extends TestStream:
- import eu.ostrzyciel.jelly.convert.rdf4j.*
+ import eu.ostrzyciel.jelly.convert.rdf4j.given

override def tripleSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions) =
// This buffers everything in memory... but I'm too lazy to implement my own RDFHandler for this
@@ -42,7 +42,7 @@ case object Rdf4jTestStream extends TestStream:
Source.fromIterator(() => graphs.iterator)
.via(EncoderFlow.namedGraphStream(Some(limiter), jellyOpt))

- override def tripleSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def tripleSink(os: OutputStream)(using ExecutionContext) =
val writer = Rio.createWriter(RDFFormat.TURTLESTAR, os)
writer.startRDF()
Flow[RdfStreamFrame]
@@ -53,7 +53,7 @@ case object Rdf4jTestStream extends TestStream:
Done
}))

- override def quadSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def quadSink(os: OutputStream)(using ExecutionContext) =
val writer = Rio.createWriter(RDFFormat.NQUADS, os)
writer.startRDF()
Flow[RdfStreamFrame]
@@ -64,7 +64,7 @@ case object Rdf4jTestStream extends TestStream:
Done
}))

- override def graphSink(os: OutputStream)(implicit ec: ExecutionContext) =
+ override def graphSink(os: OutputStream)(using ExecutionContext) =
val writer = Rio.createWriter(RDFFormat.NQUADS, os)
writer.startRDF()
Flow[RdfStreamFrame]
@@ -18,8 +18,8 @@ trait TestStream:
def graphSource(is: InputStream, limiter: SizeLimiter, jellyOpt: RdfStreamOptions):
Source[RdfStreamFrame, NotUsed]

- def tripleSink(os: OutputStream)(implicit ec: ExecutionContext): Sink[RdfStreamFrame, Future[Done]]
+ def tripleSink(os: OutputStream)(using ExecutionContext): Sink[RdfStreamFrame, Future[Done]]

- def quadSink(os: OutputStream)(implicit ec: ExecutionContext): Sink[RdfStreamFrame, Future[Done]]
+ def quadSink(os: OutputStream)(using ExecutionContext): Sink[RdfStreamFrame, Future[Done]]

- def graphSink(os: OutputStream)(implicit ec: ExecutionContext): Sink[RdfStreamFrame, Future[Done]]
+ def graphSink(os: OutputStream)(using ExecutionContext): Sink[RdfStreamFrame, Future[Done]]
@@ -15,7 +15,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileInputStre
* Tests for IO ser/des (Jena RIOT, RDF4J Rio, and semi-reactive IO over Pekko Streams).
*/
class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAll:
- implicit val as: ActorSystem = ActorSystem("test")
+ given ActorSystem = ActorSystem("test")

override def beforeAll(): Unit =
JenaSystem.init()
@@ -66,7 +66,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAl
for (name, file) <- casesTriples do
s"ser/des file $name with preset $presetName, frame size $size" in {
val model = ser.readTriplesW3C(FileInputStream(file))
- val originalSize = implicitly[Measure[TMSer]].size(model)
+ val originalSize = summon[Measure[TMSer]].size(model)
originalSize should be > 0L

val os = ByteArrayOutputStream()
@@ -77,7 +77,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAl
data.size should be > 0

val model2 = des.readTriplesJelly(ByteArrayInputStream(data))
- val deserializedSize = implicitly[Measure[TMDes]].size(model2)
+ val deserializedSize = summon[Measure[TMDes]].size(model2)
// Add -1 to account for the different statement counting of RDF4J and Jena
deserializedSize should be <= originalSize
deserializedSize should be >= originalSize - 1
@@ -86,7 +86,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAl
for (name, file) <- casesQuads do
s"ser/des file $name with preset $presetName, frame size $size" in {
val ds = ser.readQuadsW3C(FileInputStream(file))
- val originalSize = implicitly[Measure[TDSer]].size(ds)
+ val originalSize = summon[Measure[TDSer]].size(ds)
originalSize should be > 0L

val os = ByteArrayOutputStream()
@@ -97,7 +97,7 @@ class IoSerDesSpec extends AnyWordSpec, Matchers, ScalaFutures, BeforeAndAfterAl
data.size should be > 0

val ds2 = des.readQuadsJelly(ByteArrayInputStream(data))
- val deserializedSize = implicitly[Measure[TDDes]].size(ds2)
+ val deserializedSize = summon[Measure[TDDes]].size(ds2)
// Add -2 to account for the different statement counting of RDF4J and Jena
deserializedSize should be <= originalSize
deserializedSize should be >= originalSize - 2
@@ -1,6 +1,6 @@
package eu.ostrzyciel.jelly.integration_tests.io

- import eu.ostrzyciel.jelly.convert.jena.*
+ import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.query.Dataset
@@ -26,13 +26,13 @@ class JenaReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Model,
def writeQuadsJelly
(os: OutputStream, dataset: Dataset, opt: RdfStreamOptions, frameSize: Int): Unit =
val f = EncoderSource.fromDatasetAsQuads(dataset, ByteSizeLimiter(32_000), opt)
- (jenaIterableAdapter, jenaConverterFactory)
+ (using jenaIterableAdapter, jenaConverterFactory)
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)

def writeTriplesJelly
(os: OutputStream, model: Model, opt: RdfStreamOptions, frameSize: Int): Unit =
val f = EncoderSource.fromGraph(model, ByteSizeLimiter(32_000), opt)
- (jenaIterableAdapter, jenaConverterFactory)
+ (using jenaIterableAdapter, jenaConverterFactory)
.runWith(JellyIo.toIoStream(os))
Await.ready(f, 10.seconds)
@@ -9,8 +9,8 @@ import org.apache.jena.riot.{RDFDataMgr, RDFFormat, RDFLanguages}
import java.io.{InputStream, OutputStream}
import scala.jdk.CollectionConverters._

- implicit val modelMeasure: Measure[Model] = (m: Model) => m.size()
- implicit val datasetMeasure: Measure[Dataset] = (ds: Dataset) => ds.asDatasetGraph().find().asScala.size
+ given Measure[Model] = (m: Model) => m.size()
+ given Measure[Dataset] = (ds: Dataset) => ds.asDatasetGraph().find().asScala.size

object JenaSerDes extends NativeSerDes[Model, Dataset]:
val name = "Jena"
@@ -11,8 +11,8 @@ import java.io.{InputStream, OutputStream}
import scala.concurrent.Await
import scala.concurrent.duration.*

- class Rdf4jReactiveSerDes(implicit mat: Materializer) extends NativeSerDes[Seq[Statement], Seq[Statement]]:
- implicit val rdf4jConverter: Rdf4jConverterFactory.type = Rdf4jConverterFactory
+ class Rdf4jReactiveSerDes(using Materializer) extends NativeSerDes[Seq[Statement], Seq[Statement]]:
+ given Rdf4jConverterFactory.type = Rdf4jConverterFactory

override def name: String = "Reactive (RDF4J)"

@@ -10,7 +10,7 @@ import org.eclipse.rdf4j.rio.{RDFFormat, Rio}
import java.io.{InputStream, OutputStream}
import scala.jdk.CollectionConverters.*

- implicit def seqMeasure[T]: Measure[Seq[T]] = (seq: Seq[T]) => seq.size
+ given seqMeasure[T]: Measure[Seq[T]] = (seq: Seq[T]) => seq.size

object Rdf4jSerDes extends NativeSerDes[Seq[Statement], Seq[Statement]]:
val name = "RDF4J"
@@ -1,5 +1,5 @@
package eu.ostrzyciel.jelly.convert

package object jena:
- implicit val jenaIterableAdapter: JenaIterableAdapter.type = JenaIterableAdapter
- implicit val jenaConverterFactory: JenaConverterFactory.type = JenaConverterFactory
+ given jenaIterableAdapter: JenaIterableAdapter.type = JenaIterableAdapter
+ given jenaConverterFactory: JenaConverterFactory.type = JenaConverterFactory
@@ -1,5 +1,5 @@
package eu.ostrzyciel.jelly.convert

package object rdf4j:
- implicit val rdf4jIterableAdapter: Rdf4jIterableAdapter.type = Rdf4jIterableAdapter
- implicit val rdf4jConverterFactory: Rdf4jConverterFactory.type = Rdf4jConverterFactory
+ given rdf4jIterableAdapter: Rdf4jIterableAdapter.type = Rdf4jIterableAdapter
+ given rdf4jConverterFactory: Rdf4jConverterFactory.type = Rdf4jConverterFactory
@@ -158,12 +158,12 @@ object DecoderFlow:
InterpretableAs.AnyStream:

override def asGroupedStream[TNode, TTriple, TQuad]
- (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]):
+ (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]):
Flow[RdfStreamFrame, IterableOnce[TTriple | TQuad], NotUsed] =
groupedStream(factory.anyStatementDecoder)

override def asFlatStream[TTriple, TQuad]
- (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]):
+ (using factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]):
Flow[RdfStreamFrame, TTriple | TQuad, NotUsed] =
flatStream(factory.anyStatementDecoder)

@@ -271,7 +271,7 @@ object DecoderFlow:
* @return Pekko Streams flow
*/
def asGroupedStream[TNode, TTriple, TQuad]
- (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]):
+ (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, TQuad]):
Flow[RdfStreamFrame, IterableOnce[TTriple | TQuad], NotUsed]

/**
@@ -286,5 +286,5 @@ object DecoderFlow:
* @return Pekko Streams flow
*/
def asFlatStream[TTriple, TQuad]
- (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]):
+ (using factory: ConverterFactory[?, ?, ?, ?, TTriple, TQuad]):
Flow[RdfStreamFrame, TTriple | TQuad, NotUsed]
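A hedged client-side sketch of how the decoder flows above are used after this change (not part of the diff; the `DecoderFlow.decodeTriples.asFlatTripleStream()` call is taken from the test code earlier in this commit, while the element type `Triple` and the surrounding imports are assumptions for the Jena module):

    import org.apache.jena.graph.Triple
    import org.apache.pekko.NotUsed
    import org.apache.pekko.stream.scaladsl.Flow
    import eu.ostrzyciel.jelly.convert.jena.given  // supplies the ConverterFactory given
    import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
    import eu.ostrzyciel.jelly.stream.*

    // The (using factory: ConverterFactory[...]) parameter is now filled from the
    // imported given rather than from an implicit val in the caller's scope.
    val decodeFlow: Flow[RdfStreamFrame, Triple, NotUsed] =
      DecoderFlow.decodeTriples.asFlatTripleStream()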
@@ -30,7 +30,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def flatTripleStream[TTriple](limiter: SizeLimiter, opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
+ (using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
Flow[TTriple, RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.FLAT_TRIPLES))
flatFlow(e => encoder.addTripleStatement(e), limiter)
@@ -50,7 +50,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def flatQuadStream[TQuad](limiter: SizeLimiter, opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
+ (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
Flow[TQuad, RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.FLAT_QUADS))
flatFlow(e => encoder.addQuadStatement(e), limiter)
@@ -71,7 +71,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def flatTripleStreamGrouped[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
+ (using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.FLAT_TRIPLES))
groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter)
@@ -92,7 +92,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def graphStream[TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
+ (using factory: ConverterFactory[?, ?, ?, ?, TTriple, ?]):
Flow[IterableOnce[TTriple], RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.TRIPLES, LogicalStreamType.GRAPHS))
groupedFlow(e => encoder.addTripleStatement(e), maybeLimiter)
@@ -113,7 +113,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def flatQuadStreamGrouped[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
+ (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.FLAT_QUADS))
groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter)
@@ -134,7 +134,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def datasetStreamFromQuads[TQuad](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
+ (using factory: ConverterFactory[?, ?, ?, ?, ?, TQuad]):
Flow[IterableOnce[TQuad], RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.QUADS, LogicalStreamType.DATASETS))
groupedFlow(e => encoder.addQuadStatement(e), maybeLimiter)
@@ -158,7 +158,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def namedGraphStream[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]):
+ (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]):
Flow[(TNode, Iterable[TTriple]), RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.GRAPHS, LogicalStreamType.NAMED_GRAPHS))
Flow[(TNode, Iterable[TTriple])]
@@ -185,7 +185,7 @@ object EncoderFlow:
* @return Pekko Streams flow.
*/
final def datasetStream[TNode, TTriple](maybeLimiter: Option[SizeLimiter], opt: RdfStreamOptions)
- (implicit factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]):
+ (using factory: ConverterFactory[?, ?, TNode, ?, TTriple, ?]):
Flow[IterableOnce[(TNode, Iterable[TTriple])], RdfStreamFrame, NotUsed] =
val encoder = factory.encoder(makeOptions(opt, PhysicalStreamType.GRAPHS, LogicalStreamType.DATASETS))
groupedFlow[(TNode, Iterable[TTriple])](graphAsIterable(encoder), maybeLimiter)
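A corresponding hedged sketch for the encoder side (assumed client code, not part of the diff; `opts` stands for an RdfStreamOptions value configured elsewhere, and ByteSizeLimiter(32_000) mirrors the integration tests above):

    import org.apache.jena.graph.Triple
    import org.apache.pekko.NotUsed
    import org.apache.pekko.stream.scaladsl.Flow
    import eu.ostrzyciel.jelly.convert.jena.given  // ConverterFactory given for Jena
    import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamOptions}
    import eu.ostrzyciel.jelly.stream.*

    // flatTripleStream's (using factory) parameter is resolved from the imported given.
    def tripleEncoder(opts: RdfStreamOptions): Flow[Triple, RdfStreamFrame, NotUsed] =
      EncoderFlow.flatTripleStream(ByteSizeLimiter(32_000), opts)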
