Skip to content

Commit

Permalink
Update ZIO from 2.0.0-RC2 to 2.0.0-RC3 (#594)
Browse files Browse the repository at this point in the history
* Update ZIO from 2.0.0-RC2 to 2.0.0-RC3

This updates ZIO to 2.0.0-RC3, and replaces `ZManaged` with `Scope`.
For reference, see: <zio/zio#6420>

* Don't leak `Scope` in `ZStream`'s `R`

`s/ZStream\[Scope/ZStream[Any/g`

* Wrap `.use` -> `.flatMap` updates in `ZIO.scoped`

This ensures that open resources are closed when the functions return.
  • Loading branch information
earldouglas authored Mar 21, 2022
1 parent 6f06792 commit ddf34c1
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 33 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ addCommandAlias(

addCommandAlias("testJS", "zioJsonJS/test")

val zioVersion = "2.0.0-RC2"
val zioVersion = "2.0.0-RC3"

lazy val root = project
.in(file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ trait JsonPackagePlatformSpecific {
readJsonLinesAs(Paths.get(path))

def readJsonLinesAs[A: JsonDecoder](url: URL): ZStream[Any, Throwable, A] = {
val managed = ZManaged
val scoped = ZIO
.fromAutoCloseable(ZIO.attempt(url.openStream()))
.refineToOrDie[IOException]

ZStream
.fromInputStreamManaged(managed)
.fromInputStreamScoped(scoped)
.via(
ZPipeline.utf8Decode >>>
stringToChars >>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] =>
final def decodeJsonStreamInput[R](
stream: ZStream[R, Throwable, Byte],
charset: Charset = StandardCharsets.UTF_8
): ZIO[R, Throwable, A] =
stream.toInputStream
.flatMap(is => ZManaged.fromAutoCloseable(UIO(new java.io.InputStreamReader(is, charset))))
.use(readAll)
): ZIO[R with Scope, Throwable, A] =
ZIO.scoped[R] {
stream.toInputStream
.flatMap(is =>
ZIO
.fromAutoCloseable(ZIO.succeed(new java.io.InputStreamReader(is, charset)))
.flatMap(readAll)
)
}

/**
* Attempts to decode a stream of characters into a single value of type `A`, but may fail with
Expand All @@ -43,21 +48,21 @@ trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] =>
*
* @see also [[decodeJsonStreamInput]]
*/
final def decodeJsonStream[R](stream: ZStream[R, Throwable, Char]): ZIO[R, Throwable, A] =
stream.toReader.use(readAll)
final def decodeJsonStream[R](stream: ZStream[R, Throwable, Char]): ZIO[R with Scope, Throwable, A] =
ZIO.scoped[R](stream.toReader.flatMap(readAll))

final def decodeJsonPipeline(
delimiter: JsonStreamDelimiter = JsonStreamDelimiter.Array
): ZPipeline[Any, Throwable, Char, A] =
ZPipeline.fromPush {
for {
// format: off
runtime <- ZManaged.runtime[Any]
inQueue <- Queue.unbounded[Take[Nothing, Char]].toManaged
outQueue <- Queue.unbounded[Take[Throwable, A]].toManaged
ended <- Ref.makeManaged(false)
reader <- ZManaged.fromAutoCloseable {
UIO {
runtime <- ZIO.runtime[Any]
inQueue <- Queue.unbounded[Take[Nothing, Char]]
outQueue <- Queue.unbounded[Take[Throwable, A]]
ended <- Ref.make(false)
reader <- ZIO.fromAutoCloseable {
ZIO.succeed {
def readPull: Iterator[Chunk[Char]] =
runtime.unsafeRun(inQueue.take)
.fold(
Expand All @@ -69,7 +74,7 @@ trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] =>
new zio.stream.internal.ZReader(Iterator.empty ++ readPull)
}
}
jsonReader <- ZManaged.fromAutoCloseable(UIO(new WithRetractReader(reader)))
jsonReader <- ZIO.fromAutoCloseable(ZIO.succeed(new WithRetractReader(reader)))
process <- ZIO.attemptBlockingInterrupt {
// Exceptions fall through and are pushed into the queue
@tailrec def loop(atBeginning: Boolean): Unit = {
Expand Down Expand Up @@ -131,7 +136,7 @@ trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] =>
outQueue.offer(Take.fail(t))
}
.interruptible
.forkManaged
.forkScoped
push = { (is: Option[Chunk[Char]]) =>
val pollElements: IO[Throwable, Chunk[A]] =
outQueue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.json

import zio.json.internal.WriteWriter
import zio.stream._
import zio.{ Chunk, Ref, ZIO, ZManaged }
import zio.{ Chunk, Ref, ZIO }

trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] =>

Expand All @@ -19,9 +19,9 @@ trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] =>
): ZPipeline[Any, Throwable, A, Char] =
ZPipeline.fromPush {
for {
runtime <- ZIO.runtime[Any].toManaged
chunkBuffer <- Ref.makeManaged(Chunk.fromIterable(startWith.toList))
writer <- ZManaged.fromAutoCloseable {
runtime <- ZIO.runtime[Any]
chunkBuffer <- Ref.make(Chunk.fromIterable(startWith.toList))
writer <- ZIO.fromAutoCloseable {
ZIO.succeed {
new java.io.BufferedWriter(
new java.io.Writer {
Expand All @@ -40,7 +40,7 @@ trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] =>
)
}
}
writeWriter <- ZManaged.succeed(new WriteWriter(writer))
writeWriter <- ZIO.succeed(new WriteWriter(writer))
push = { (is: Option[Chunk[A]]) =>
val pushChars = chunkBuffer.getAndUpdate(c => if (c.isEmpty) c else Chunk())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import zio.test._
import java.nio.charset.StandardCharsets
import java.nio.file.Paths

object DecoderPlatformSpecificSpec extends DefaultRunnableSpec {
object DecoderPlatformSpecificSpec extends ZIOSpecDefault {

def spec: Spec[TestEnvironment, TestFailure[Any], TestSuccess] =
def spec: Spec[TestEnvironment with Scope, TestFailure[Any], TestSuccess] =
suite("Decoder")(
test("excessively nested structures") {
// JVM specific: getResourceAsString not yet supported
Expand Down Expand Up @@ -91,12 +91,14 @@ object DecoderPlatformSpecificSpec extends DefaultRunnableSpec {
// impl is covered by the tests

getResourceAsStringM("che-2.geo.json").flatMap { str =>
ZManaged.fromAutoCloseable(Task(getResourceAsReader("che-2.geo.json"))).use { reader =>
for {
circe <- ZIO.fromEither(circe.parser.decode[GeoJSON](str))
got <- ZIO.attemptBlocking(JsonDecoder[GeoJSON].unsafeDecode(Nil, reader))
} yield {
assert(got)(equalTo(circe))
ZIO.scoped[TestEnvironment] {
ZIO.fromAutoCloseable(ZIO.attempt(getResourceAsReader("che-2.geo.json"))).flatMap { reader =>
for {
circe <- ZIO.fromEither(circe.parser.decode[GeoJSON](str))
got <- ZIO.attemptBlocking(JsonDecoder[GeoJSON].unsafeDecode(Nil, reader))
} yield {
assert(got)(equalTo(circe))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import testzio.json.data.geojson.generated._
import testzio.json.data.googlemaps._
import testzio.json.data.twitter._
import zio.Chunk
import zio.Scope
import zio.json.ast.Json
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test.{ DefaultRunnableSpec, TestEnvironment, assert, _ }
import zio.test.{ ZIOSpecDefault, TestEnvironment, assert, _ }

import java.io.IOException
import java.nio.file.Files

object EncoderPlatformSpecificSpec extends DefaultRunnableSpec {
object EncoderPlatformSpecificSpec extends ZIOSpecDefault {
import testzio.json.DecoderSpec.logEvent._

def spec: Spec[TestEnvironment, TestFailure[Any], TestSuccess] =
def spec: Spec[TestEnvironment with Scope, TestFailure[Any], TestSuccess] =
suite("Encoder")(
suite("roundtrip")(
testRoundTrip[DistanceMatrix]("google_maps_api_response"),
Expand Down

0 comments on commit ddf34c1

Please sign in to comment.