Skip to content

Commit

Permalink
Fixes #737
Browse files Browse the repository at this point in the history
JsonCodec can encode and decode multiple elements
  • Loading branch information
gregor-rayman committed Sep 9, 2024
1 parent c741f6b commit 9ae0f62
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 25 deletions.
129 changes: 107 additions & 22 deletions zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@ package zio.schema.codec

import java.nio.CharBuffer
import java.nio.charset.StandardCharsets

import scala.annotation.{ switch, tailrec }
import scala.collection.immutable.ListMap

import zio.json.JsonCodec._
import zio.json.JsonDecoder.{ JsonError, UnsafeJson }
import zio.json.ast.Json
import zio.json.internal.{ Lexer, RecordingReader, RetractReader, StringMatrix, WithRecordingReader, Write }
import zio.json.{
JsonFieldDecoder,
JsonFieldEncoder,
JsonCodec => ZJsonCodec,
JsonDecoder => ZJsonDecoder,
JsonEncoder => ZJsonEncoder,
JsonFieldDecoder,
JsonFieldEncoder
JsonEncoder => ZJsonEncoder
}
import zio.prelude.NonEmptyMap
import zio.schema._
import zio.schema.annotation._
import zio.schema.codec.DecodeError.ReadError
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO }
import zio.stream.{ ZChannel, ZPipeline }
import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO, ZNothing }

object JsonCodec {

Expand All @@ -47,11 +45,7 @@ object JsonCodec {
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.fromChannel(
ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage))
) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
) >>> splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO
.fromEither(jsonCodec.decodeJson(s))
Expand All @@ -62,14 +56,110 @@ object JsonCodec {
JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None))

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
schemaBasedBinaryCodec[A](JsonCodec.Config.default)

val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = {
val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = 0
var valueType = 'j' // j = json, s = string, b = boolean, u = null, n = number, x = null after first null, e = escape in string

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
valueType match {
case 'e' =>
valueType = 's'
case 's' =>
c match {
case '\\' => valueType = 'e'
case '"' =>
valueType = 'j'
valueEnded = true
case _ =>
}
case 'b' =>
if (c == 'e') {
valueType = 'j'
valueEnded = true
}
case 'u' =>
if (c == 'l') {
valueType = 'x'
}
case 'x' =>
if (c == 'l') {
valueType = 'j'
valueEnded = true
}
case 'n' =>
c match {
case '}' | ']' =>
depth -= 1
valueType = 'j'
valueEnded = true
case _ if !validNumChars(c) =>
valueType = 'j'
valueEnded = true
case _ =>
}
case 'j' =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
case '"' =>
valueType = 's'
case 't' | 'f' =>
valueType = 'b'
case 'n' =>
valueType = 'u'
case x if validNumChars(x) =>
valueType = 'n'
case _ =>
}
}
if (depth > 0 || valueType != 'j' || valueEnded)
stringBuilder.append(c)

if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.isBlank) chunkBuilder += str
stringBuilder.clear()
}
}
chunkBuilder.result()
}

lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result)) *> ZChannel.succeed(done)
)

ZPipeline.fromChannel(loop)
}
}

def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
Expand All @@ -80,10 +170,7 @@ object JsonCodec {

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s))
}
Expand All @@ -92,9 +179,7 @@ object JsonCodec {
JsonEncoder.encode(schema, value, cfg)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package zio.schema.codec

import java.time.{ ZoneId, ZoneOffset }

import scala.collection.immutable.ListMap

import zio.Console._
import zio._
import zio.json.JsonDecoder.JsonError
Expand All @@ -22,8 +20,13 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.io.IOException

object JsonCodecSpec extends ZIOSpecDefault {

case class Person(name: String, age: Int)
val personSchema: Schema[Person] = DeriveSchema.gen[Person]

def spec: Spec[TestEnvironment, Any] =
suite("JsonCodec Spec")(
encoderSuite,
Expand Down Expand Up @@ -436,6 +439,71 @@ object JsonCodecSpec extends ZIOSpecDefault {
"""{"NumberOne":{"value":"foo"}}"""
)
}
),
suite("Streams")(
test("Encodes a stream with multiple integers") {
assertEncodesMore(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5"))
},
test("Decodes a stream with multiple integers") {
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &>
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &>
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5"))
},
test("Decodes a stream with multiple booleans") {
assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &>
assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse"))
},
test("Encodes a stream with multiple strings") {
assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\""))
},
test("Decodes a stream with multiple strings") {
assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &>
assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c""""))
},
test("Encodes a stream with multiple records") {
assertEncodesMore(
personSchema,
List(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Decodes a stream with multiple records") {
assertDecodesMore(
personSchema,
Chunk(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Encodes a stream with no records") {
assertEncodesMore(
personSchema,
List.empty[Person],
charSequenceToByteChunk("")
)
},
test("Decodes a stream with no records") {
assertDecodesMore(
personSchema,
Chunk.empty,
charSequenceToByteChunk("")
)
}
)
)

Expand Down Expand Up @@ -1307,7 +1375,7 @@ object JsonCodecSpec extends ZIOSpecDefault {
),
test("decode discriminated case objects with extra fields")(
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &>
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""""))
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""))
),
suite("of case objects")(
test("without annotation")(
Expand Down Expand Up @@ -1505,6 +1573,23 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(stream)(equalTo(chunk))
}

private def assertEncodesMore[A](
schema: Schema[A],
values: Seq[A],
chunk: Chunk[Byte],
cfg: JsonCodec.Config = JsonCodec.Config.default,
print: Boolean = false
) = {
val stream = ZStream
.fromIterable(values)
.via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder)
.runCollect
.tap { chunk =>
printLine(s"${new String(chunk.toArray)}").when(print).ignore
}
assertZIO(stream)(equalTo(chunk))
}

private def assertEncodesJson[A](
schema: Schema[A],
value: A,
Expand Down Expand Up @@ -1551,6 +1636,16 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(result)(equalTo(Chunk(value)))
}

private def assertDecodesMore[A](
schema: Schema[A],
values: Chunk[A],
chunk: Chunk[Byte],
cfg: JsonCodec.Config = JsonCodec.Config.default
) = {
val result = ZStream.fromChunk(chunk).via(JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder).runCollect
assertZIO(result)(equalTo(values))
}

private def assertEncodesThenDecodesFallback[A, B](
schema: Schema.Fallback[A, B],
value: Fallback[A, B]
Expand Down

0 comments on commit 9ae0f62

Please sign in to comment.