From 9ae0f6212685b27c09fa1361504fb755f2f22183 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 00:16:30 +0200 Subject: [PATCH] Fixes #737 JsonCodec can encode and decode multiple elements --- .../scala/zio/schema/codec/JsonCodec.scala | 129 +++++++++++++++--- .../zio/schema/codec/JsonCodecSpec.scala | 101 +++++++++++++- 2 files changed, 205 insertions(+), 25 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 9eecca635..72c84d988 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -2,27 +2,25 @@ package zio.schema.codec import java.nio.CharBuffer import java.nio.charset.StandardCharsets - import scala.annotation.{ switch, tailrec } import scala.collection.immutable.ListMap - import zio.json.JsonCodec._ import zio.json.JsonDecoder.{ JsonError, UnsafeJson } import zio.json.ast.Json import zio.json.internal.{ Lexer, RecordingReader, RetractReader, StringMatrix, WithRecordingReader, Write } import zio.json.{ + JsonFieldDecoder, + JsonFieldEncoder, JsonCodec => ZJsonCodec, JsonDecoder => ZJsonDecoder, - JsonEncoder => ZJsonEncoder, - JsonFieldDecoder, - JsonFieldEncoder + JsonEncoder => ZJsonEncoder } import zio.prelude.NonEmptyMap import zio.schema._ import zio.schema.annotation._ import zio.schema.codec.DecodeError.ReadError -import zio.stream.ZPipeline -import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO } +import zio.stream.{ ZChannel, ZPipeline } +import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO, ZNothing } object JsonCodec { @@ -47,11 +45,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.fromChannel( ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) - ) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + ) >>> splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO .fromEither(jsonCodec.decodeJson(s)) @@ -62,14 +56,110 @@ object JsonCodec { JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None)) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = schemaBasedBinaryCodec[A](JsonCodec.Config.default) + val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = { + val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + ZPipeline.suspend { + val stringBuilder = new StringBuilder + var depth = 0 + var valueType = 'j' // j = json, s = string, b = boolean, u = null, n = number, x = null after first null, e = escape in string + + def fetchChunk(chunk: Chunk[String]): Chunk[String] = { + val chunkBuilder = ChunkBuilder.make[String]() + for { + string <- chunk + c <- string + } { + var valueEnded = false + valueType match { + case 'e' => + valueType = 's' + case 's' => + c match { + case '\\' => valueType = 'e' + case '"' => + valueType = 'j' + valueEnded = true + case _ => + } + case 'b' => + if (c == 'e') { + valueType = 'j' + valueEnded = true + } + case 'u' => + if (c == 'l') { + valueType = 'x' + } + case 'x' => + if (c == 'l') { + valueType = 'j' + valueEnded = true + } + case 'n' => + c match { + case '}' | ']' => + depth -= 1 + valueType = 'j' + valueEnded = true + case _ if !validNumChars(c) => + valueType = 'j' + valueEnded = true + case _ => + } + case 'j' => + c match { + case '{' | '[' => + depth += 1 + case '}' | ']' => + depth -= 1 + valueEnded = true + case '"' => + valueType = 's' + case 't' | 'f' => + valueType = 'b' + case 'n' => + valueType = 'u' + case x if validNumChars(x) => + valueType = 'n' + case _ => + } + } + if (depth > 0 || valueType != 'j' || valueEnded) + stringBuilder.append(c) + + if (valueEnded && depth == 0) { + val str = stringBuilder.result() + if (!str.isBlank) chunkBuilder += str + stringBuilder.clear() + } + } + chunkBuilder.result() + } + + lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] = + ZChannel.readWithCause( + in => { + val out = fetchChunk(in) + if (out.isEmpty) loop else ZChannel.write(out) *> loop + }, + err => + if (stringBuilder.isEmpty) ZChannel.refailCause(err) + else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.refailCause(err), + done => + if (stringBuilder.isEmpty) ZChannel.succeed(done) + else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.succeed(done) + ) + + ZPipeline.fromChannel(loop) + } + } + def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] = new BinaryCodec[A] { override def decode(whole: Chunk[Byte]): Either[DecodeError, A] = @@ -80,10 +170,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO.fromEither(JsonDecoder.decode(schema, s)) } @@ -92,9 +179,7 @@ object JsonCodec { JsonEncoder.encode(schema, value, cfg) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] = diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 7bdeabc08..ec86e0c43 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -1,9 +1,7 @@ package zio.schema.codec import java.time.{ ZoneId, ZoneOffset } - import scala.collection.immutable.ListMap - import zio.Console._ import zio._ import zio.json.JsonDecoder.JsonError @@ -22,8 +20,13 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.io.IOException + object JsonCodecSpec extends ZIOSpecDefault { + case class Person(name: String, age: Int) + val personSchema: Schema[Person] = DeriveSchema.gen[Person] + def spec: Spec[TestEnvironment, Any] = suite("JsonCodec Spec")( encoderSuite, @@ -436,6 +439,71 @@ object JsonCodecSpec extends ZIOSpecDefault { """{"NumberOne":{"value":"foo"}}""" ) } + ), + suite("Streams")( + test("Encodes a stream with multiple integers") { + assertEncodesMore(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers") { + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &> + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &> + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + }, + test("Decodes a stream with multiple booleans") { + assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &> + assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse")) + }, + test("Encodes a stream with multiple strings") { + assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings") { + assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &> + assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c"""")) + }, + test("Encodes a stream with multiple records") { + assertEncodesMore( + personSchema, + List( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Decodes a stream with multiple records") { + assertDecodesMore( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Encodes a stream with no records") { + assertEncodesMore( + personSchema, + List.empty[Person], + charSequenceToByteChunk("") + ) + }, + test("Decodes a stream with no records") { + assertDecodesMore( + personSchema, + Chunk.empty, + charSequenceToByteChunk("") + ) + } ) ) @@ -1307,7 +1375,7 @@ object JsonCodecSpec extends ZIOSpecDefault { ), test("decode discriminated case objects with extra fields")( assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &> - assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""")) + assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""")) ), suite("of case objects")( test("without annotation")( @@ -1505,6 +1573,23 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(stream)(equalTo(chunk)) } + private def assertEncodesMore[A]( + schema: Schema[A], + values: Seq[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default, + print: Boolean = false + ) = { + val stream = ZStream + .fromIterable(values) + .via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder) + .runCollect + .tap { chunk => + printLine(s"${new String(chunk.toArray)}").when(print).ignore + } + assertZIO(stream)(equalTo(chunk)) + } + private def assertEncodesJson[A]( schema: Schema[A], value: A, @@ -1551,6 +1636,16 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(result)(equalTo(Chunk(value))) } + private def assertDecodesMore[A]( + schema: Schema[A], + values: Chunk[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default + ) = { + val result = ZStream.fromChunk(chunk).via(JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder).runCollect + assertZIO(result)(equalTo(values)) + } + private def assertEncodesThenDecodesFallback[A, B]( schema: Schema.Fallback[A, B], value: Fallback[A, B]