diff --git a/build.sbt b/build.sbt index 17bfcb5f2..49d2a7e75 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ addCommandAlias("prepare", "fmt") addCommandAlias( "testJVM", - "zioJsonJVM/test; zioJsonYaml/test; zioJsonGolden/test; zioJsonMacrosJVM/test; zioJsonInteropHttp4s/test; zioJsonInteropScalaz7xJVM/test; zioJsonInteropScalaz7xJS/test; zioJsonInteropRefinedJVM/test; zioJsonInteropRefinedJS/test" + "zioJsonJVM/test; zioJsonYaml/test; zioJsonMacrosJVM/test; zioJsonInteropHttp4s/test; zioJsonInteropScalaz7xJVM/test; zioJsonInteropScalaz7xJS/test; zioJsonInteropRefinedJVM/test; zioJsonInteropRefinedJS/test" ) addCommandAlias( @@ -35,7 +35,7 @@ addCommandAlias( addCommandAlias("testJS", "zioJsonJS/test") -val zioVersion = "2.0.0-RC6" +val zioVersion = "2.0.0" lazy val root = project .in(file(".")) @@ -48,7 +48,6 @@ lazy val root = project zioJsonJVM, zioJsonJS, zioJsonYaml, - zioJsonGolden, zioJsonMacrosJVM, zioJsonMacrosJS, zioJsonInteropHttp4s, @@ -227,12 +226,12 @@ lazy val zioJsonGolden = project .settings(buildInfoSettings("zio.json.golden")) .settings( libraryDependencies ++= Seq( - "dev.zio" %% "zio" % zioVersion, - "dev.zio" %% "zio-nio" % "2.0.0-RC7", - "dev.zio" %% "zio-test" % zioVersion, - "dev.zio" %% "zio-test-sbt" % zioVersion, - "dev.zio" %% "zio-test-magnolia" % zioVersion, - "org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided, + "dev.zio" %% "zio" % zioVersion, + "dev.zio" %% "zio-nio" % "2.0.0", + "dev.zio" %% "zio-test" % zioVersion, + "dev.zio" %% "zio-test-sbt" % zioVersion, + "dev.zio" %% "zio-test-magnolia" % zioVersion, + "org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided ), testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework") ) @@ -287,9 +286,9 @@ lazy val zioJsonInteropHttp4s = project "org.http4s" %% "http4s-dsl" % "0.23.7", "dev.zio" %% "zio" % zioVersion, "org.typelevel" %% "cats-effect" % "3.3.0", - "dev.zio" %% "zio-interop-cats" % "3.3.0-RC7" % "test", - "dev.zio" %% "zio-test" % zioVersion % "test", - "dev.zio" %% "zio-test-sbt" % zioVersion % "test" + "dev.zio" %% "zio-interop-cats" % "3.3.0" % "test", + "dev.zio" %% "zio-test" % zioVersion % "test", + "dev.zio" %% "zio-test-sbt" % zioVersion % "test" ), testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework") ) diff --git a/zio-json-interop-scalaz7x/shared/src/test/scala/zio/json/interop/scalaz/ScalazSpec.scala b/zio-json-interop-scalaz7x/shared/src/test/scala/zio/json/interop/scalaz/ScalazSpec.scala index d620f9b29..06e59edbd 100644 --- a/zio-json-interop-scalaz7x/shared/src/test/scala/zio/json/interop/scalaz/ScalazSpec.scala +++ b/zio-json-interop-scalaz7x/shared/src/test/scala/zio/json/interop/scalaz/ScalazSpec.scala @@ -6,8 +6,8 @@ import zio.json.interop.scalaz7x._ import zio.test.Assertion._ import zio.test._ -object ScalaSpec extends ZIOSpecDefault { - val spec: Spec[Environment, Any] = +object ScalazSpec extends ZIOSpecDefault { + def spec = suite("Scalaz")( test("scalaz.IList[A]") { assert(IList[Int]().toJson)(equalTo("[]")) && diff --git a/zio-json/jvm/src/jmh/scala/zio/json/UUIDBenchmarks.scala b/zio-json/jvm/src/jmh/scala/zio/json/UUIDBenchmarks.scala index 3cd96fd06..6b987e219 100644 --- a/zio-json/jvm/src/jmh/scala/zio/json/UUIDBenchmarks.scala +++ b/zio-json/jvm/src/jmh/scala/zio/json/UUIDBenchmarks.scala @@ -1,7 +1,7 @@ package zio.json import org.openjdk.jmh.annotations._ -import zio.Chunk +import zio.{ Chunk, Unsafe } import zio.json.uuid.UUIDParser import zio.test.Gen @@ -29,7 +29,11 @@ class UUIDBenchmarks { s5 <- section5 } yield s"$s1-$s2-$s3-$s4-$s5" - unparsedUUIDChunk = zio.Runtime.default.unsafeRun(gen.runCollectN(10000).map(Chunk.fromIterable)) + unparsedUUIDChunk = { + Unsafe.unsafeCompat { implicit u => + zio.Runtime.default.unsafe.run(gen.runCollectN(10000).map(Chunk.fromIterable)).getOrThrow() + } + } } @Benchmark diff --git a/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala b/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala index 9ff00b033..2e91c2a95 100644 --- a/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala +++ b/zio-json/jvm/src/main/scala/zio/json/JsonDecoderPlatformSpecific.scala @@ -53,118 +53,127 @@ trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] => final def decodeJsonPipeline( delimiter: JsonStreamDelimiter = JsonStreamDelimiter.Array - ): ZPipeline[Any, Throwable, Char, A] = - ZPipeline.fromPush { - for { - // format: off - runtime <- ZIO.runtime[Any] - inQueue <- Queue.unbounded[Take[Nothing, Char]] - outQueue <- Queue.unbounded[Take[Throwable, A]] - ended <- Ref.make(false) - reader <- ZIO.fromAutoCloseable { - ZIO.succeed { - def readPull: Iterator[Chunk[Char]] = - runtime.unsafeRun(inQueue.take) - .fold( - end = Iterator.empty, - error = _ => Iterator.empty, // impossible - value = v => Iterator.single(v) ++ readPull - ) - - new zio.stream.internal.ZReader(Iterator.empty ++ readPull) - } - } - jsonReader <- ZIO.fromAutoCloseable(ZIO.succeed(new WithRetractReader(reader))) - process <- ZIO.attemptBlockingInterrupt { - // Exceptions fall through and are pushed into the queue - @tailrec def loop(atBeginning: Boolean): Unit = { - val nextElem = try { - if (atBeginning && delimiter == JsonStreamDelimiter.Array) { - Lexer.char(Nil, jsonReader, '[') - - jsonReader.nextNonWhitespace() match { - case ']' => - // returning empty here instead of falling through, which would - // attempt to decode a value that we know doesn’t exist. - return () - - case _ => - jsonReader.retract() - } - } else { - delimiter match { - case JsonStreamDelimiter.Newline => - jsonReader.readChar() match { - case '\r' => - jsonReader.readChar() match { - case '\n' => () - case _ => jsonReader.retract() - } - case '\n' => () - case _ => jsonReader.retract() - } - - case JsonStreamDelimiter.Array => - jsonReader.nextNonWhitespace() match { - case ',' | ']' => () - case _ => jsonReader.retract() - } - } + ): ZPipeline[Any, Throwable, Char, A] = { + Unsafe.unsafeCompat { (u: Unsafe) => + implicit val unsafe: Unsafe = u + + ZPipeline.fromPush { + for { + // format: off + runtime <- ZIO.runtime[Any] + inQueue <- Queue.unbounded[Take[Nothing, Char]] + outQueue <- Queue.unbounded[Take[Throwable, A]] + ended <- Ref.make(false) + reader <- ZIO.fromAutoCloseable { + ZIO.succeed { + def readPull: Iterator[Chunk[Char]] = + runtime.unsafe.run(inQueue.take).getOrThrow() + .fold( + end = Iterator.empty, + error = _ => Iterator.empty, // impossible + value = v => Iterator.single(v) ++ readPull + ) + + new zio.stream.internal.ZReader(Iterator.empty ++ readPull) + } + } + jsonReader <- ZIO.fromAutoCloseable(ZIO.succeed(new WithRetractReader(reader))) + process <- ZIO.attemptBlockingInterrupt { + // Exceptions fall through and are pushed into the queue + @tailrec def loop(atBeginning: Boolean): Unit = { + val nextElem = try { + if (atBeginning && delimiter == JsonStreamDelimiter.Array) { + Lexer.char(Nil, jsonReader, '[') + + jsonReader.nextNonWhitespace() match { + case ']' => + // returning empty here instead of falling through, which would + // attempt to decode a value that we know doesn’t exist. + return () + + case _ => + jsonReader.retract() + } + } else { + delimiter match { + case JsonStreamDelimiter.Newline => + jsonReader.readChar() match { + case '\r' => + jsonReader.readChar() match { + case '\n' => () + case _ => jsonReader.retract() } + case '\n' => () + case _ => jsonReader.retract() + } - unsafeDecode(Nil, jsonReader) - } catch { - case t @ JsonDecoder.UnsafeJson(trace) => - throw new Exception(JsonError.render(trace)) - } - - runtime.unsafeRun(outQueue.offer(Take.single(nextElem))) - - loop(false) + case JsonStreamDelimiter.Array => + jsonReader.nextNonWhitespace() match { + case ',' | ']' => () + case _ => jsonReader.retract() } + } + } - loop(true) - } - .catchAll { - case t: zio.json.internal.UnexpectedEnd => - // swallow if stream ended - ZIO.unlessZIO(ended.get) { - outQueue.offer(Take.fail(t)) - } - - case t: Throwable => - outQueue.offer(Take.fail(t)) - } - .interruptible - .forkScoped - push = { (is: Option[Chunk[Char]]) => - val pollElements: IO[Throwable, Chunk[A]] = - outQueue - .takeUpTo(ZStream.DefaultChunkSize) - .flatMap { takes => - ZIO.foldLeft(takes)(Chunk[A]()) { case (acc, take) => - take.fold(ZIO.succeedNow(acc), e => ZIO.fail(e.squash), c => ZIO.succeedNow(acc ++ c)) + unsafeDecode(Nil, jsonReader) + } catch { + case t @ JsonDecoder.UnsafeJson(trace) => + throw new Exception(JsonError.render(trace)) } - } - val pullRest = - outQueue - .takeAll - .flatMap { takes => - ZIO.foldLeft(takes)(Chunk[A]()) { case (acc, take) => - take.fold(ZIO.succeedNow(acc), e => ZIO.fail(e.squash), c => ZIO.succeedNow(acc ++ c)) + Unsafe.unsafeCompat { (u: Unsafe) => + implicit val unsafe: Unsafe = u + + runtime.unsafe.run(outQueue.offer(Take.single(nextElem))).getOrThrow() } - } - is match { - case Some(c) => - inQueue.offer(Take.chunk(c)) *> pollElements + loop(false) + } - case None => - ended.set(true) *> inQueue.offer(Take.end) *> process.join *> pullRest - } + loop(true) + } + .catchAll { + case t: zio.json.internal.UnexpectedEnd => + // swallow if stream ended + ZIO.unlessZIO(ended.get) { + outQueue.offer(Take.fail(t)) + } + + case t: Throwable => + outQueue.offer(Take.fail(t)) + } + .interruptible + .forkScoped + push = { (is: Option[Chunk[Char]]) => + val pollElements: IO[Throwable, Chunk[A]] = + outQueue + .takeUpTo(ZStream.DefaultChunkSize) + .flatMap { takes => + ZIO.foldLeft(takes)(Chunk[A]()) { case (acc, take) => + take.fold(ZIO.succeedNow(acc), e => ZIO.fail(e.squash), c => ZIO.succeedNow(acc ++ c)) + } + } + + val pullRest = + outQueue + .takeAll + .flatMap { takes => + ZIO.foldLeft(takes)(Chunk[A]()) { case (acc, take) => + take.fold(ZIO.succeedNow(acc), e => ZIO.fail(e.squash), c => ZIO.succeedNow(acc ++ c)) + } + } + + is match { + case Some(c) => + inQueue.offer(Take.chunk(c)) *> pollElements + + case None => + ended.set(true) *> inQueue.offer(Take.end) *> process.join *> pullRest + } + } + } yield push + // format: on } - } yield push - // format: on } + } } diff --git a/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala b/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala index 953c3fb34..7ded7d221 100644 --- a/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala +++ b/zio-json/jvm/src/main/scala/zio/json/JsonEncoderPlatformSpecific.scala @@ -2,7 +2,7 @@ package zio.json import zio.json.internal.WriteWriter import zio.stream._ -import zio.{ Chunk, Ref, ZIO } +import zio.{ Chunk, Ref, Unsafe, ZIO } trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] => @@ -17,54 +17,58 @@ trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] => delimiter: Option[Char], endWith: Option[Char] ): ZPipeline[Any, Throwable, A, Char] = - ZPipeline.fromPush { - for { - runtime <- ZIO.runtime[Any] - chunkBuffer <- Ref.make(Chunk.fromIterable(startWith.toList)) - writer <- ZIO.fromAutoCloseable { - ZIO.succeed { - new java.io.BufferedWriter( - new java.io.Writer { - override def write(buffer: Array[Char], offset: Int, len: Int): Unit = { - val copy = new Array[Char](len) - System.arraycopy(buffer, offset, copy, 0, len) + Unsafe.unsafeCompat { (u: Unsafe) => + implicit val unsafe: Unsafe = u - val chunk = Chunk.fromArray(copy).drop(offset).take(len) - runtime.unsafeRun(chunkBuffer.update(_ ++ chunk)) - } + ZPipeline.fromPush { + for { + runtime <- ZIO.runtime[Any] + chunkBuffer <- Ref.make(Chunk.fromIterable(startWith.toList)) + writer <- ZIO.fromAutoCloseable { + ZIO.succeed { + new java.io.BufferedWriter( + new java.io.Writer { + override def write(buffer: Array[Char], offset: Int, len: Int): Unit = { + val copy = new Array[Char](len) + System.arraycopy(buffer, offset, copy, 0, len) - override def close(): Unit = () - override def flush(): Unit = () - }, - ZStream.DefaultChunkSize - ) + val chunk = Chunk.fromArray(copy).drop(offset).take(len) + runtime.unsafe.run(chunkBuffer.update(_ ++ chunk)).getOrThrow() + } + + override def close(): Unit = () + override def flush(): Unit = () + }, + ZStream.DefaultChunkSize + ) + } } - } - writeWriter <- ZIO.succeed(new WriteWriter(writer)) - push = { (is: Option[Chunk[A]]) => - val pushChars = chunkBuffer.getAndUpdate(c => if (c.isEmpty) c else Chunk()) + writeWriter <- ZIO.succeed(new WriteWriter(writer)) + push = { (is: Option[Chunk[A]]) => + val pushChars = chunkBuffer.getAndUpdate(c => if (c.isEmpty) c else Chunk()) - is match { - case None => - ZIO.attemptBlocking(writer.close()) *> pushChars.map { terminal => - endWith.fold(terminal) { last => - // Chop off terminal deliminator - (if (delimiter.isDefined) terminal.dropRight(1) else terminal) :+ last + is match { + case None => + ZIO.attemptBlocking(writer.close()) *> pushChars.map { terminal => + endWith.fold(terminal) { last => + // Chop off terminal deliminator + (if (delimiter.isDefined) terminal.dropRight(1) else terminal) :+ last + } } - } - case Some(xs) => - ZIO.attemptBlocking { - for (x <- xs) { - unsafeEncode(x, indent = None, writeWriter) + case Some(xs) => + ZIO.attemptBlocking { + for (x <- xs) { + unsafeEncode(x, indent = None, writeWriter) - for (s <- delimiter) - writeWriter.write(s) - } - } *> pushChars + for (s <- delimiter) + writeWriter.write(s) + } + } *> pushChars + } } - } - } yield push + } yield push + } } final val encodeJsonLinesPipeline: ZPipeline[Any, Throwable, A, Char] =