From 41892d5344f9c9f29f2df07b4b9ce625af7d0fc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gregor=20Ra=C3=BDman?= Date: Wed, 11 Sep 2024 14:12:06 +0200 Subject: [PATCH] JsonCodec can now encode/decode streams as JSON arrays. Whether the codec will treat streams as arrays is governed by `JsonCodec.Config.treatStreamsAsArrays` which defaults to `false` to keep it compatible with the previous version. (#739) --- .../scala/zio/schema/codec/JsonCodec.scala | 215 ++++++++++-------- .../zio/schema/codec/JsonCodecSpec.scala | 107 ++++++++- 2 files changed, 223 insertions(+), 99 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 27dfed462..38ea34ff6 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -26,7 +26,7 @@ import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing } object JsonCodec { - final case class Config(ignoreEmptyCollections: Boolean) + final case class Config(ignoreEmptyCollections: Boolean, treatStreamsAsArrays: Boolean = false) object Config { val default: Config = Config(ignoreEmptyCollections = false) @@ -64,112 +64,121 @@ object JsonCodec { implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = schemaBasedBinaryCodec[A](JsonCodec.Config.default) - val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = { - val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') - val ContextJson = 'j' - val ContextString = 's' - val ContextBoolean = 'b' - val ContextNull = 'u' - val ContextNullAfterFirstL = 'x' - val ContextNumber = 'n' - val ContextEscape = 'e' - - ZPipeline.suspend { - val stringBuilder = new StringBuilder - var depth = 0 - var context = ContextJson - - def fetchChunk(chunk: Chunk[String]): Chunk[String] = { - val chunkBuilder = ChunkBuilder.make[String]() - for { - string <- chunk - c <- string - } { - var valueEnded = false - context match { - case ContextEscape => - context = 's' - case ContextString => - c match { - case '\\' => context = ContextEscape - case '"' => - context = ContextJson - valueEnded = true - case _ => - } - case ContextBoolean => - if (c == 'e') { - context = ContextJson - valueEnded = true - } - case ContextNull => - if (c == 'l') { - context = ContextNullAfterFirstL - } - case ContextNullAfterFirstL => - if (c == 'l') { - context = ContextJson - valueEnded = true - } - case ContextNumber => - c match { - case '}' | ']' => - depth -= 1 + private object JsonSplitter { + val validNumChars: Set[Char] = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + val ContextJson = 'j' + val ContextString = 's' + val ContextBoolean = 'b' + val ContextNull = 'u' + val ContextNullAfterFirstL = 'x' + val ContextNumber = 'n' + val ContextEscape = 'e' + val ContextDone = 'd' + + def jsonSplitter(wrappedInArray: Boolean): ZPipeline[Any, Nothing, String, String] = + ZPipeline.suspend { + val stringBuilder = new StringBuilder + var depth = if (wrappedInArray) -1 else 0 + var context = ContextJson + + def fetchChunk(chunk: Chunk[String]): Chunk[String] = { + val chunkBuilder = ChunkBuilder.make[String]() + for { + string <- chunk + c <- string + } { + var valueEnded = false + context match { + case ContextEscape => + context = 's' + case ContextString => + c match { + case '\\' => context = ContextEscape + case '"' => + context = ContextJson + valueEnded = true + case _ => + } + case ContextBoolean => + if (c == 'e') { context = ContextJson valueEnded = true - case _ if !validNumChars(c) => + } + case ContextNull => + if (c == 'l') { + context = ContextNullAfterFirstL + } + case ContextNullAfterFirstL => + if (c == 'l') { context = ContextJson valueEnded = true - case _ => - } - case _ => - c match { - case '{' | '[' => - depth += 1 - case '}' | ']' => - depth -= 1 - valueEnded = true - case '"' => - context = ContextString - case 't' | 'f' => - context = ContextBoolean - case 'n' => - context = ContextNull - case x if validNumChars(x) => - context = ContextNumber - case _ => - } - } - if (depth > 0 || context != ContextJson || valueEnded) - stringBuilder.append(c) + } + case ContextNumber => + c match { + case '}' | ']' => + depth -= 1 + context = if (depth < 0) ContextDone else ContextJson + valueEnded = true + case _ if !validNumChars(c) => + context = ContextJson + valueEnded = true + case _ => + } + case ContextDone => // no more values, ignore everything + case _ => + c match { + case '{' | '[' => + depth += 1 + case '}' | ']' => + depth -= 1 + valueEnded = true + if (depth == -1) context = ContextDone + case '"' => + context = ContextString + case 't' | 'f' => + context = ContextBoolean + case 'n' => + context = ContextNull + case x if validNumChars(x) => + context = ContextNumber + case _ => + } + } + if (context != ContextDone && (depth > 0 || context != ContextJson || valueEnded)) + stringBuilder.append(c) - if (valueEnded && depth == 0) { - val str = stringBuilder.result() - if (!str.forall(_.isWhitespace)) chunkBuilder += str - stringBuilder.clear() + if (valueEnded && depth == 0) { + val str = stringBuilder.result() + if (!str.forall(_.isWhitespace)) { + chunkBuilder += str + } + stringBuilder.clear() + } } + chunkBuilder.result() } - chunkBuilder.result() - } - lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] = - ZChannel.readWithCause( - in => { - val out = fetchChunk(in) - if (out.isEmpty) loop else ZChannel.write(out) *> loop - }, - err => - if (stringBuilder.isEmpty) ZChannel.refailCause(err) - else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err), - done => - if (stringBuilder.isEmpty) ZChannel.succeed(done) - else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done) - ) + lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] = + ZChannel.readWithCause( + in => { + val out = fetchChunk(in) + if (out.isEmpty) loop else ZChannel.write(out) *> loop + }, + err => + if (stringBuilder.isEmpty) ZChannel.refailCause(err) + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err), + done => + if (stringBuilder.isEmpty) ZChannel.succeed(done) + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done) + ) - ZPipeline.fromChannel(loop) - } + ZPipeline.fromChannel(loop) + } } + val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = false) + val splitJsonArrayElements: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = true) + def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] = new BinaryCodec[A] { override def decode(whole: Chunk[Byte]): Either[DecodeError, A] = @@ -180,7 +189,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>> - splitOnJsonBoundary >>> + (if (cfg.treatStreamsAsArrays) splitJsonArrayElements else splitOnJsonBoundary) >>> ZPipeline.mapZIO { (s: String) => ZIO.fromEither(JsonDecoder.decode(schema, s)) } @@ -189,7 +198,17 @@ object JsonCodec { JsonEncoder.encode(schema, value, cfg) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks + if (cfg.treatStreamsAsArrays) { + val interspersed: ZPipeline[Any, Nothing, A, Byte] = ZPipeline + .mapChunks[A, Chunk[Byte]](_.map(encode)) + .intersperse(Chunk.single(','.toByte)) + .flattenChunks + val prepended: ZPipeline[Any, Nothing, A, Byte] = + interspersed >>> ZPipeline.prepend(Chunk.single('['.toByte)) + prepended >>> ZPipeline.append(Chunk.single(']'.toByte)) + } else { + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks + } } def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] = diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 487b42e7d..23d46c157 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -445,6 +445,14 @@ object JsonCodecSpec extends ZIOSpecDefault { test("Encodes a stream with multiple integers") { assertEncodesMany(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) }, + test("Encodes a stream with multiple integers to an array") { + assertEncodesMany( + Schema[Int], + 1 to 5, + charSequenceToByteChunk("[1,2,3,4,5]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with multiple integers separated by newlines") { assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) }, @@ -453,18 +461,53 @@ object JsonCodecSpec extends ZIOSpecDefault { }, test("Decodes a stream with multiple integers separated by commas and other non JSON number characters") { assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + }, + test("Decodes a stream with multiple integers encoded as an array") { + assertDecodesMany( + Schema[Int], + Chunk.fromIterable(1 to 5), + charSequenceToByteChunk("[1,2,3,4,5]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, + test("Decodes a stream with multiple integers encoded as an array with additional whitespace") { + assertDecodesMany( + Schema[Int], + Chunk.fromIterable(1 to 5), + charSequenceToByteChunk(""" + | [1, + |2,3, + |4, 5] """.stripMargin), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) } ), suite("Streams of booleans")( test("Encodes a stream with multiple booleans") { assertEncodesMany(Schema[Boolean], List(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) }, + test("Encodes a stream with multiple booleans to an array") { + assertEncodesMany( + Schema[Boolean], + List(true, true, false), + charSequenceToByteChunk("[true,true,false]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with multiple booleans separated by newlines") { assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) }, test("Decodes a stream with multiple booleans separated by spaces") { assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) }, + test("Decodes a stream with multiple booleans as an array") { + assertDecodesMany( + Schema[Boolean], + Chunk(true, true, false), + charSequenceToByteChunk("[true, true, false]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test( "Decodes a stream with multiple booleans separated by commas and other non JSON boolean characters and not separated at all" ) { @@ -479,9 +522,25 @@ object JsonCodecSpec extends ZIOSpecDefault { test("Encodes a stream with multiple strings") { assertEncodesMany(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) }, + test("Encodes a stream with multiple strings as an array") { + assertEncodesMany( + Schema[String], + List("a", "b", "c"), + charSequenceToByteChunk("[\"a\",\"b\",\"c\"]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with multiple strings separated by newlines") { assertDecodesMany(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) }, + test("Decodes a stream with multiple strings as an array") { + assertDecodesMany( + Schema[String], + Chunk("a", "b", "c"), + charSequenceToByteChunk("[\"a\", \"b\",\n\"c\"]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with multiple strings separated by spaces, commas and not separated at all") { assertDecodesMany(Schema[String], Chunk("a", "b", "c", "d"), charSequenceToByteChunk(""""a" "b","c""d"""")) } @@ -502,6 +561,20 @@ object JsonCodecSpec extends ZIOSpecDefault { ) ) }, + test("Encodes a stream with multiple records as an array") { + assertEncodesMany( + personSchema, + List( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """[{"name":"Alice","age":1},{"name":"Bob","age":2},{"name":"Charlie","age":3}]""" + ), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with multiple records separated by newlines") { assertDecodesMany( personSchema, @@ -517,7 +590,7 @@ object JsonCodecSpec extends ZIOSpecDefault { ) ) }, - test("Decodes a stream with multiple records, not separated with internalnewlines") { + test("Decodes a stream with multiple records, not separated with internal newlines") { assertDecodesMany( personSchema, Chunk( @@ -532,6 +605,22 @@ object JsonCodecSpec extends ZIOSpecDefault { ) ) }, + test("Decodes a stream with multiple records formatted as an array") { + assertDecodesMany( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """[{"name":"Alice","age":1}, {"name":"Bob","age":2}, + |{"name":"Charlie","age" + |: 3}]""".stripMargin + ), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Encodes a stream with no records") { assertEncodesMany( personSchema, @@ -539,12 +628,28 @@ object JsonCodecSpec extends ZIOSpecDefault { charSequenceToByteChunk("") ) }, + test("Encodes a stream with no records") { + assertEncodesMany( + personSchema, + List.empty[Person], + charSequenceToByteChunk("[]"), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) + }, test("Decodes a stream with no records") { assertDecodesMany( personSchema, Chunk.empty, charSequenceToByteChunk("") ) + }, + test("Decodes a stream with no records from an array") { + assertDecodesMany( + personSchema, + Chunk.empty, + charSequenceToByteChunk(" [ ] "), + JsonCodec.Config(ignoreEmptyCollections = false, treatStreamsAsArrays = true) + ) } ) )