From 9ae0f6212685b27c09fa1361504fb755f2f22183 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 00:16:30 +0200 Subject: [PATCH 1/8] Fixes #737 JsonCodec can encode and decode multiple elements --- .../scala/zio/schema/codec/JsonCodec.scala | 129 +++++++++++++++--- .../zio/schema/codec/JsonCodecSpec.scala | 101 +++++++++++++- 2 files changed, 205 insertions(+), 25 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 9eecca635..72c84d988 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -2,27 +2,25 @@ package zio.schema.codec import java.nio.CharBuffer import java.nio.charset.StandardCharsets - import scala.annotation.{ switch, tailrec } import scala.collection.immutable.ListMap - import zio.json.JsonCodec._ import zio.json.JsonDecoder.{ JsonError, UnsafeJson } import zio.json.ast.Json import zio.json.internal.{ Lexer, RecordingReader, RetractReader, StringMatrix, WithRecordingReader, Write } import zio.json.{ + JsonFieldDecoder, + JsonFieldEncoder, JsonCodec => ZJsonCodec, JsonDecoder => ZJsonDecoder, - JsonEncoder => ZJsonEncoder, - JsonFieldDecoder, - JsonFieldEncoder + JsonEncoder => ZJsonEncoder } import zio.prelude.NonEmptyMap import zio.schema._ import zio.schema.annotation._ import zio.schema.codec.DecodeError.ReadError -import zio.stream.ZPipeline -import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO } +import zio.stream.{ ZChannel, ZPipeline } +import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO, ZNothing } object JsonCodec { @@ -47,11 +45,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.fromChannel( ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) - ) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + ) >>> splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO .fromEither(jsonCodec.decodeJson(s)) @@ -62,14 +56,110 @@ object JsonCodec { JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None)) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = schemaBasedBinaryCodec[A](JsonCodec.Config.default) + val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = { + val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + ZPipeline.suspend { + val stringBuilder = new StringBuilder + var depth = 0 + var valueType = 'j' // j = json, s = string, b = boolean, u = null, n = number, x = null after first null, e = escape in string + + def fetchChunk(chunk: Chunk[String]): Chunk[String] = { + val chunkBuilder = ChunkBuilder.make[String]() + for { + string <- chunk + c <- string + } { + var valueEnded = false + valueType match { + case 'e' => + valueType = 's' + case 's' => + c match { + case '\\' => valueType = 'e' + case '"' => + valueType = 'j' + valueEnded = true + case _ => + } + case 'b' => + if (c == 'e') { + valueType = 'j' + valueEnded = true + } + case 'u' => + if (c == 'l') { + valueType = 'x' + } + case 'x' => + if (c == 'l') { + valueType = 'j' + valueEnded = true + } + case 'n' => + c match { + case '}' | ']' => + depth -= 1 + valueType = 'j' + valueEnded = true + case _ if !validNumChars(c) => + valueType = 'j' + valueEnded = true + case _ => + } + case 'j' => + c match { + case '{' | '[' => + depth += 1 + case '}' | ']' => + depth -= 1 + valueEnded = true + case '"' => + valueType = 's' + case 't' | 'f' => + valueType = 'b' + case 'n' => + valueType = 'u' + case x if validNumChars(x) => + valueType = 'n' + case _ => + } + } + if (depth > 0 || valueType != 'j' || valueEnded) + stringBuilder.append(c) + + if (valueEnded && depth == 0) { + val str = stringBuilder.result() + if (!str.isBlank) chunkBuilder += str + stringBuilder.clear() + } + } + chunkBuilder.result() + } + + lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] = + ZChannel.readWithCause( + in => { + val out = fetchChunk(in) + if (out.isEmpty) loop else ZChannel.write(out) *> loop + }, + err => + if (stringBuilder.isEmpty) ZChannel.refailCause(err) + else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.refailCause(err), + done => + if (stringBuilder.isEmpty) ZChannel.succeed(done) + else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.succeed(done) + ) + + ZPipeline.fromChannel(loop) + } + } + def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] = new BinaryCodec[A] { override def decode(whole: Chunk[Byte]): Either[DecodeError, A] = @@ -80,10 +170,7 @@ object JsonCodec { override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>> - ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>> - ZPipeline.map[(Unit, NonEmptyChunk[String]), String] { - case (_, fragments) => fragments.mkString - } >>> + splitOnJsonBoundary >>> ZPipeline.mapZIO { (s: String) => ZIO.fromEither(JsonDecoder.decode(schema, s)) } @@ -92,9 +179,7 @@ object JsonCodec { JsonEncoder.encode(schema, value, cfg) override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = - ZPipeline.mapChunks( - _.flatMap(encode) - ) + ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks } def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] = diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 7bdeabc08..ec86e0c43 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -1,9 +1,7 @@ package zio.schema.codec import java.time.{ ZoneId, ZoneOffset } - import scala.collection.immutable.ListMap - import zio.Console._ import zio._ import zio.json.JsonDecoder.JsonError @@ -22,8 +20,13 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.io.IOException + object JsonCodecSpec extends ZIOSpecDefault { + case class Person(name: String, age: Int) + val personSchema: Schema[Person] = DeriveSchema.gen[Person] + def spec: Spec[TestEnvironment, Any] = suite("JsonCodec Spec")( encoderSuite, @@ -436,6 +439,71 @@ object JsonCodecSpec extends ZIOSpecDefault { """{"NumberOne":{"value":"foo"}}""" ) } + ), + suite("Streams")( + test("Encodes a stream with multiple integers") { + assertEncodesMore(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers") { + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &> + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &> + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + }, + test("Decodes a stream with multiple booleans") { + assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &> + assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse")) + }, + test("Encodes a stream with multiple strings") { + assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings") { + assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &> + assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c"""")) + }, + test("Encodes a stream with multiple records") { + assertEncodesMore( + personSchema, + List( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Decodes a stream with multiple records") { + assertDecodesMore( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Encodes a stream with no records") { + assertEncodesMore( + personSchema, + List.empty[Person], + charSequenceToByteChunk("") + ) + }, + test("Decodes a stream with no records") { + assertDecodesMore( + personSchema, + Chunk.empty, + charSequenceToByteChunk("") + ) + } ) ) @@ -1307,7 +1375,7 @@ object JsonCodecSpec extends ZIOSpecDefault { ), test("decode discriminated case objects with extra fields")( assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &> - assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""")) + assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""")) ), suite("of case objects")( test("without annotation")( @@ -1505,6 +1573,23 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(stream)(equalTo(chunk)) } + private def assertEncodesMore[A]( + schema: Schema[A], + values: Seq[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default, + print: Boolean = false + ) = { + val stream = ZStream + .fromIterable(values) + .via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder) + .runCollect + .tap { chunk => + printLine(s"${new String(chunk.toArray)}").when(print).ignore + } + assertZIO(stream)(equalTo(chunk)) + } + private def assertEncodesJson[A]( schema: Schema[A], value: A, @@ -1551,6 +1636,16 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(result)(equalTo(Chunk(value))) } + private def assertDecodesMore[A]( + schema: Schema[A], + values: Chunk[A], + chunk: Chunk[Byte], + cfg: JsonCodec.Config = JsonCodec.Config.default + ) = { + val result = ZStream.fromChunk(chunk).via(JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder).runCollect + assertZIO(result)(equalTo(values)) + } + private def assertEncodesThenDecodesFallback[A, B]( schema: Schema.Fallback[A, B], value: Fallback[A, B] From e8519258beef30d6d3d6d3b5510bc9fe141476cb Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 06:19:32 +0200 Subject: [PATCH 2/8] scalafmt --- .../src/main/scala/zio/schema/codec/JsonCodec.scala | 10 ++++++---- .../test/scala/zio/schema/codec/JsonCodecSpec.scala | 8 ++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 72c84d988..b53716229 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -2,25 +2,27 @@ package zio.schema.codec import java.nio.CharBuffer import java.nio.charset.StandardCharsets + import scala.annotation.{ switch, tailrec } import scala.collection.immutable.ListMap + import zio.json.JsonCodec._ import zio.json.JsonDecoder.{ JsonError, UnsafeJson } import zio.json.ast.Json import zio.json.internal.{ Lexer, RecordingReader, RetractReader, StringMatrix, WithRecordingReader, Write } import zio.json.{ - JsonFieldDecoder, - JsonFieldEncoder, JsonCodec => ZJsonCodec, JsonDecoder => ZJsonDecoder, - JsonEncoder => ZJsonEncoder + JsonEncoder => ZJsonEncoder, + JsonFieldDecoder, + JsonFieldEncoder } import zio.prelude.NonEmptyMap import zio.schema._ import zio.schema.annotation._ import zio.schema.codec.DecodeError.ReadError import zio.stream.{ ZChannel, ZPipeline } -import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO, ZNothing } +import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing } object JsonCodec { diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index ec86e0c43..15a54e0a9 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -446,19 +446,19 @@ object JsonCodecSpec extends ZIOSpecDefault { }, test("Decodes a stream with multiple integers") { assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &> - assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &> - assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &> + assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) }, test("Decodes a stream with multiple booleans") { assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &> - assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse")) + assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse")) }, test("Encodes a stream with multiple strings") { assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) }, test("Decodes a stream with multiple strings") { assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &> - assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c"""")) + assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c"""")) }, test("Encodes a stream with multiple records") { assertEncodesMore( From 4d1f83d4f66fe5045e4906de16999a273ca22ba8 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 06:36:44 +0200 Subject: [PATCH 3/8] Scala3 missing parentheses added --- .../shared/src/main/scala/zio/schema/codec/JsonCodec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index b53716229..7107211b9 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -152,10 +152,10 @@ object JsonCodec { }, err => if (stringBuilder.isEmpty) ZChannel.refailCause(err) - else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.refailCause(err), + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err), done => if (stringBuilder.isEmpty) ZChannel.succeed(done) - else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.succeed(done) + else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done) ) ZPipeline.fromChannel(loop) From 5bd1a37537582f7cd386930fd73dd506c21c18aa Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 07:30:25 +0200 Subject: [PATCH 4/8] Java8 compatibility --- .../shared/src/main/scala/zio/schema/codec/JsonCodec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 7107211b9..0a885faa7 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -137,7 +137,7 @@ object JsonCodec { if (valueEnded && depth == 0) { val str = stringBuilder.result() - if (!str.isBlank) chunkBuilder += str + if (!str.forall(_.isWhitespace)) chunkBuilder += str stringBuilder.clear() } } From 6ef0d43821e302c5a5d72c2e489df4612ce4cd63 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 07:36:19 +0200 Subject: [PATCH 5/8] removed unused import --- .../shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 15a54e0a9..a324e6ec7 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -20,8 +20,6 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import java.io.IOException - object JsonCodecSpec extends ZIOSpecDefault { case class Person(name: String, age: Int) From b14c5c32da1a1c122895a36daf798fa3cfb8ff37 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 08:02:18 +0200 Subject: [PATCH 6/8] imports formatted for scalafix --- .../shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index a324e6ec7..28dd627e0 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -1,7 +1,9 @@ package zio.schema.codec import java.time.{ ZoneId, ZoneOffset } + import scala.collection.immutable.ListMap + import zio.Console._ import zio._ import zio.json.JsonDecoder.JsonError From 052d94fcdd01ec46c653a8b41c61a6503511f36c Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 08:22:48 +0200 Subject: [PATCH 7/8] imports formatted for scalafix --- .../zio/schema/codec/JsonCodecSpec.scala | 169 +++++++++++------- 1 file changed, 106 insertions(+), 63 deletions(-) diff --git a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala index 28dd627e0..487b42e7d 100644 --- a/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala +++ b/zio-schema-json/shared/src/test/scala/zio/schema/codec/JsonCodecSpec.scala @@ -441,69 +441,112 @@ object JsonCodecSpec extends ZIOSpecDefault { } ), suite("Streams")( - test("Encodes a stream with multiple integers") { - assertEncodesMore(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) - }, - test("Decodes a stream with multiple integers") { - assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &> - assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &> - assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) - }, - test("Decodes a stream with multiple booleans") { - assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &> - assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse")) - }, - test("Encodes a stream with multiple strings") { - assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) - }, - test("Decodes a stream with multiple strings") { - assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &> - assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c"""")) - }, - test("Encodes a stream with multiple records") { - assertEncodesMore( - personSchema, - List( - Person("Alice", 1), - Person("Bob", 2), - Person("Charlie", 3) - ), - charSequenceToByteChunk( - """{"name":"Alice","age":1} - |{"name":"Bob","age":2} - |{"name":"Charlie","age":3}""".stripMargin + suite("Streams of integers")( + test("Encodes a stream with multiple integers") { + assertEncodesMany(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers separated by newlines") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) + }, + test("Decodes a stream with multiple integers separated by spaces") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) + }, + test("Decodes a stream with multiple integers separated by commas and other non JSON number characters") { + assertDecodesMany(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5")) + } + ), + suite("Streams of booleans")( + test("Encodes a stream with multiple booleans") { + assertEncodesMany(Schema[Boolean], List(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) + }, + test("Decodes a stream with multiple booleans separated by newlines") { + assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true\ntrue\nfalse")) + }, + test("Decodes a stream with multiple booleans separated by spaces") { + assertDecodesMany(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) + }, + test( + "Decodes a stream with multiple booleans separated by commas and other non JSON boolean characters and not separated at all" + ) { + assertDecodesMany( + Schema[Boolean], + Chunk(true, true, false, false), + charSequenceToByteChunk("true true, falsefalse") ) - ) - }, - test("Decodes a stream with multiple records") { - assertDecodesMore( - personSchema, - Chunk( - Person("Alice", 1), - Person("Bob", 2), - Person("Charlie", 3) - ), - charSequenceToByteChunk( - """{"name":"Alice","age":1} - |{"name":"Bob","age":2} - |{"name":"Charlie","age":3}""".stripMargin + } + ), + suite("Streams of strings")( + test("Encodes a stream with multiple strings") { + assertEncodesMany(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings separated by newlines") { + assertDecodesMany(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) + }, + test("Decodes a stream with multiple strings separated by spaces, commas and not separated at all") { + assertDecodesMany(Schema[String], Chunk("a", "b", "c", "d"), charSequenceToByteChunk(""""a" "b","c""d"""")) + } + ), + suite("Stream of records")( + test("Encodes a stream with multiple records") { + assertEncodesMany( + personSchema, + List( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) ) - ) - }, - test("Encodes a stream with no records") { - assertEncodesMore( - personSchema, - List.empty[Person], - charSequenceToByteChunk("") - ) - }, - test("Decodes a stream with no records") { - assertDecodesMore( - personSchema, - Chunk.empty, - charSequenceToByteChunk("") - ) - } + }, + test("Decodes a stream with multiple records separated by newlines") { + assertDecodesMany( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1} + |{"name":"Bob","age":2} + |{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Decodes a stream with multiple records, not separated with internalnewlines") { + assertDecodesMany( + personSchema, + Chunk( + Person("Alice", 1), + Person("Bob", 2), + Person("Charlie", 3) + ), + charSequenceToByteChunk( + """{"name":"Alice","age":1}{"name":"Bob", + |"age" + |:2}{"name":"Charlie","age":3}""".stripMargin + ) + ) + }, + test("Encodes a stream with no records") { + assertEncodesMany( + personSchema, + List.empty[Person], + charSequenceToByteChunk("") + ) + }, + test("Decodes a stream with no records") { + assertDecodesMany( + personSchema, + Chunk.empty, + charSequenceToByteChunk("") + ) + } + ) ) ) @@ -1573,7 +1616,7 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(stream)(equalTo(chunk)) } - private def assertEncodesMore[A]( + private def assertEncodesMany[A]( schema: Schema[A], values: Seq[A], chunk: Chunk[Byte], @@ -1636,7 +1679,7 @@ object JsonCodecSpec extends ZIOSpecDefault { assertZIO(result)(equalTo(Chunk(value))) } - private def assertDecodesMore[A]( + private def assertDecodesMany[A]( schema: Schema[A], values: Chunk[A], chunk: Chunk[Byte], From 52d42a0c0b91cb4b4917ae9f563b0f4e841ac8a5 Mon Sep 17 00:00:00 2001 From: Gregor Rayman Date: Tue, 10 Sep 2024 09:09:11 +0200 Subject: [PATCH 8/8] small refactoring - named constants in JsonCodec.splitOnJsonBoundary --- .../scala/zio/schema/codec/JsonCodec.scala | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala index 0a885faa7..27dfed462 100644 --- a/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala +++ b/zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala @@ -65,11 +65,19 @@ object JsonCodec { schemaBasedBinaryCodec[A](JsonCodec.Config.default) val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = { - val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.') + val ContextJson = 'j' + val ContextString = 's' + val ContextBoolean = 'b' + val ContextNull = 'u' + val ContextNullAfterFirstL = 'x' + val ContextNumber = 'n' + val ContextEscape = 'e' + ZPipeline.suspend { val stringBuilder = new StringBuilder var depth = 0 - var valueType = 'j' // j = json, s = string, b = boolean, u = null, n = number, x = null after first null, e = escape in string + var context = ContextJson def fetchChunk(chunk: Chunk[String]): Chunk[String] = { val chunkBuilder = ChunkBuilder.make[String]() @@ -78,43 +86,43 @@ object JsonCodec { c <- string } { var valueEnded = false - valueType match { - case 'e' => - valueType = 's' - case 's' => + context match { + case ContextEscape => + context = 's' + case ContextString => c match { - case '\\' => valueType = 'e' + case '\\' => context = ContextEscape case '"' => - valueType = 'j' + context = ContextJson valueEnded = true case _ => } - case 'b' => + case ContextBoolean => if (c == 'e') { - valueType = 'j' + context = ContextJson valueEnded = true } - case 'u' => + case ContextNull => if (c == 'l') { - valueType = 'x' + context = ContextNullAfterFirstL } - case 'x' => + case ContextNullAfterFirstL => if (c == 'l') { - valueType = 'j' + context = ContextJson valueEnded = true } - case 'n' => + case ContextNumber => c match { case '}' | ']' => depth -= 1 - valueType = 'j' + context = ContextJson valueEnded = true case _ if !validNumChars(c) => - valueType = 'j' + context = ContextJson valueEnded = true case _ => } - case 'j' => + case _ => c match { case '{' | '[' => depth += 1 @@ -122,17 +130,17 @@ object JsonCodec { depth -= 1 valueEnded = true case '"' => - valueType = 's' + context = ContextString case 't' | 'f' => - valueType = 'b' + context = ContextBoolean case 'n' => - valueType = 'u' + context = ContextNull case x if validNumChars(x) => - valueType = 'n' + context = ContextNumber case _ => } } - if (depth > 0 || valueType != 'j' || valueEnded) + if (depth > 0 || context != ContextJson || valueEnded) stringBuilder.append(c) if (valueEnded && depth == 0) {