diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 9eecca635..27dfed462 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -21,8 +21,8 @@ import zio.prelude.NonEmptyMap import zio.schema._ import zio.schema.annotation._ import zio.schema.codec.DecodeError.ReadError -import zio.stream.ZPipeline -import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO } +import zio.stream.{ ZChannel, ZPipeline } +import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing } object JsonCodec { @@ -47,11 +47,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.fromChannel( ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) - ) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + ) >>> splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO .fromEither(jsonCodec.decodeJson(s)) @@ -62,14 +58,118 @@ object JsonCodec { JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None)) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = schemaBasedBinaryCodec[A](JsonCodec.Config.default) + val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = { + val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + val ContextJson = 'j' + val ContextString = 's' + val ContextBoolean = 'b' + val ContextNull = 'u' + val ContextNullAfterFirstL = 'x' + val ContextNumber = 'n' + val ContextEscape = 'e' + + ZPipeline.suspend { + val stringBuilder = new StringBuilder + var depth = 0 + var context = ContextJson + + def fetchChunk(chunk: Chunk[String]): Chunk[String] = { + val chunkBuilder = ChunkBuilder.make[String]() + for { + string <- chunk + c <- string + } { + var valueEnded = false + context match { + case ContextEscape => + context = 's' + case ContextString => + c match { + case '\\' => context = ContextEscape + case '"' => + context = ContextJson + valueEnded = true + case _ => + } + case ContextBoolean => + if (c == 'e') { + context = ContextJson + valueEnded = true + } + case ContextNull => + if (c == 'l') { + context = ContextNullAfterFirstL + } + case ContextNullAfterFirstL => + if (c == 'l') { + context = ContextJson + valueEnded = true + } + case ContextNumber => + c match { + case '}' | ']' => + depth -= 1 + context = ContextJson + valueEnded = true + case _ if !validNumChars(c) => + context = ContextJson + valueEnded = true + case _ => + } + case _ => + c match { + case '{' | '[' => + depth += 1 + case '}' | ']' => + depth -= 1 + valueEnded = true + case '"' => + context = ContextString + case 't' | 'f' => + context = ContextBoolean + case 'n' => + context = ContextNull + case x if validNumChars(x) => + context = ContextNumber + case _ => + } + } + if (depth > 0 || context != ContextJson || valueEnded) + stringBuilder.append(c) + + if (valueEnded && depth == 0) { + val str = stringBuilder.result() + if (!str.forall(_.isWhitespace)) chunkBuilder += str + stringBuilder.clear() + } + } + chunkBuilder.result() + } + + lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] = + ZChannel.readWithCause( + in => { + val out = fetchChunk(in) + if (out.isEmpty) loop else ZChannel.write(out) *> loop + }, + err => + if (stringBuilder.isEmpty) ZChannel.refailCause(err) + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err), + done => + if (stringBuilder.isEmpty) ZChannel.succeed(done) + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done) + ) + + ZPipeline.fromChannel(loop) + } + } + def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] = new BinaryCodec[A] { override def decode(whole: Chunk[Byte]): Either[DecodeError, A] = @@ -80,10 +180,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO.fromEither(JsonDecoder.decode(schema, s)) } @@ -92,9 +189,7 @@ object JsonCodec { JsonEncoder.encode(schema, value, cfg) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] = diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 7bdeabc08..487b42e7d 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -24,6 +24,9 @@ import zio.test._ object JsonCodecSpec extends ZIOSpecDefault { + case class Person(name: String, age: Int) + val personSchema: Schema[Person] = DeriveSchema.gen[Person] + def spec: Spec[TestEnvironment, Any] = suite("JsonCodec Spec")( encoderSuite, @@ -436,6 +439,114 @@ object JsonCodecSpec extends ZIOSpecDefault { """{"NumberOne":{"value":"foo"}}""" ) } + ), + suite("Streams")( + suite("Streams of integers")( + test("Encodes a stream with multiple integers") { + assertEncodesMany(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers separated by newlines") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers separated by spaces") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) + }, + test("Decodes a stream with multiple integers separated by commas and other non JSON number characters") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + } + ), + suite("Streams of booleans")( + test("Encodes a stream with multiple booleans") { + assertEncodesMany(Schema[Boolean], List(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) + }, + test("Decodes a stream with multiple booleans separated by newlines") { + assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) + }, + test("Decodes a stream with multiple booleans separated by spaces") { + assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) + }, + test( + "Decodes a stream with multiple booleans separated by commas and other non JSON boolean characters and not separated at all" + ) { + assertDecodesMany( + Schema[Boolean], + Chunk(true, true, false, false), + charSequenceToByteChunk("true true, falsefalse") + ) + } + ), + suite("Streams of strings")( + test("Encodes a stream with multiple strings") { + assertEncodesMany(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings separated by newlines") { + assertDecodesMany(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings separated by spaces, commas and not separated at all") { + assertDecodesMany(Schema[String], Chunk("a", "b", "c", "d"), charSequenceToByteChunk(""""a" "b","c""d"""")) + } + ), + suite("Stream of records")( + test("Encodes a stream with multiple records") { + assertEncodesMany( + personSchema, + List( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Decodes a stream with multiple records separated by newlines") { + assertDecodesMany( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Decodes a stream with multiple records, not separated with internalnewlines") { + assertDecodesMany( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1}{"name":"Bob", + |"age" + |:2}{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Encodes a stream with no records") { + assertEncodesMany( + personSchema, + List.empty[Person], + charSequenceToByteChunk("") + ) + }, + test("Decodes a stream with no records") { + assertDecodesMany( + personSchema, + Chunk.empty, + charSequenceToByteChunk("") + ) + } + ) ) ) @@ -1307,7 +1418,7 @@ object JsonCodecSpec extends ZIOSpecDefault { ), test("decode discriminated case objects with extra fields")( assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &> - assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""")) + assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""")) ), suite("of case objects")( test("without annotation")( @@ -1505,6 +1616,23 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(stream)(equalTo(chunk)) } + private def assertEncodesMany[A]( + schema: Schema[A], + values: Seq[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default, + print: Boolean = false + ) = { + val stream = ZStream + .fromIterable(values) + .via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder) + .runCollect + .tap { chunk => + printLine(s"${new String(chunk.toArray)}").when(print).ignore + } + assertZIO(stream)(equalTo(chunk)) + } + private def assertEncodesJson[A]( schema: Schema[A], value: A, @@ -1551,6 +1679,16 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(result)(equalTo(Chunk(value))) } + private def assertDecodesMany[A]( + schema: Schema[A], + values: Chunk[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default + ) = { + val result = ZStream.fromChunk(chunk).via(JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder).runCollect + assertZIO(result)(equalTo(values)) + } + private def assertEncodesThenDecodesFallback[A, B]( schema: Schema.Fallback[A, B], value: Fallback[A, B]