Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #737 #738

Merged
merged 8 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import zio.prelude.NonEmptyMap
import zio.schema._
import zio.schema.annotation._
import zio.schema.codec.DecodeError.ReadError
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, ChunkBuilder, NonEmptyChunk, ZIO }
import zio.stream.{ ZChannel, ZPipeline }
import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing }

object JsonCodec {

Expand All @@ -47,11 +47,7 @@ object JsonCodec {
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.fromChannel(
ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage))
) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
) >>> splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO
.fromEither(jsonCodec.decodeJson(s))
Expand All @@ -62,14 +58,110 @@ object JsonCodec {
JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None))

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

/**
 * Implicit [[BinaryCodec]] for any `A` that has an implicit [[Schema]] in scope.
 *
 * Delegates to the overload taking an explicit configuration, passing `JsonCodec.Config.default`.
 */
implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
schemaBasedBinaryCodec[A](JsonCodec.Config.default)

/**
 * Re-chunks a stream of text fragments into one string per top-level JSON value.
 *
 * The scanner state lives outside the per-chunk function, so a value may be split across any
 * number of incoming strings or chunks. Scanning is a lightweight character state machine and
 * does NOT validate JSON; it only tracks enough structure (nesting depth, string/escape state,
 * literal and number progress) to find where a top-level value ends.
 *
 * Behaviour visible in the emitted fragments:
 *  - characters seen at top level while no value is in progress (e.g. separators) are dropped;
 *  - the character that terminates a top-level number is kept as the last character of that
 *    number's fragment;
 *  - fragments consisting only of whitespace are never emitted;
 *  - when the upstream ends or fails, any buffered partial text is emitted first, then the
 *    completion or failure is propagated unchanged.
 */
val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = {
  val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
  ZPipeline.suspend {
    // Text of the value currently being assembled; survives across chunks.
    val stringBuilder = new StringBuilder
    // Number of currently open '{' / '[' containers.
    var depth = 0
    // Scanner state:
    //   j = between tokens (plain JSON structure)
    //   s = inside a string
    //   e = character right after a backslash inside a string
    //   b = inside `true` / `false` (ends on the next 'e')
    //   u = inside `null`, before its first 'l'
    //   x = inside `null`, after its first 'l' (ends on the next 'l')
    //   n = inside a number
    var valueType = 'j'

    // Leaves one nesting level but never drops below the top level. Without the guard a single
    // stray '}' or ']' would make `depth` negative, the `depth == 0` emit condition below would
    // stop matching, and all following input would pile up in `stringBuilder` until the stream
    // ends. With the guard the stray closer is emitted immediately as its own fragment.
    def closeContainer(): Unit =
      if (depth > 0) depth -= 1

    // Feeds every character of `chunk` through the state machine and returns the fragments
    // that were completed while doing so (possibly none).
    def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
      val chunkBuilder = ChunkBuilder.make[String]()
      for {
        string <- chunk
        c      <- string
      } {
        var valueEnded = false
        valueType match {
          case 'e' =>
            // The escaped character is consumed verbatim; back to normal string scanning.
            valueType = 's'
          case 's' =>
            c match {
              case '\\' => valueType = 'e'
              case '"' =>
                valueType = 'j'
                valueEnded = true
              case _ =>
            }
          case 'b' =>
            if (c == 'e') {
              valueType = 'j'
              valueEnded = true
            }
          case 'u' =>
            if (c == 'l') {
              valueType = 'x'
            }
          case 'x' =>
            if (c == 'l') {
              valueType = 'j'
              valueEnded = true
            }
          case 'n' =>
            c match {
              case '}' | ']' =>
                // A closer ends the number and the enclosing container at the same time.
                closeContainer()
                valueType = 'j'
                valueEnded = true
              case _ if !validNumChars(c) =>
                valueType = 'j'
                valueEnded = true
              case _ =>
            }
          case 'j' =>
            c match {
              case '{' | '[' =>
                depth += 1
              case '}' | ']' =>
                closeContainer()
                valueEnded = true
              case '"' =>
                valueType = 's'
              case 't' | 'f' =>
                valueType = 'b'
              case 'n' =>
                valueType = 'u'
              case x if validNumChars(x) =>
                valueType = 'n'
              case _ =>
            }
        }
        // Keep the character if it is inside a container, part of a value in progress, or the
        // character that just finished a value; otherwise it is top-level filler and is dropped.
        if (depth > 0 || valueType != 'j' || valueEnded)
          stringBuilder.append(c)

        // A value that ends at top level is a complete fragment.
        if (valueEnded && depth == 0) {
          val str = stringBuilder.result()
          if (!str.forall(_.isWhitespace)) chunkBuilder += str
          stringBuilder.clear()
        }
      }
      chunkBuilder.result()
    }

    lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
      ZChannel.readWithCause(
        in => {
          val out = fetchChunk(in)
          if (out.isEmpty) loop else ZChannel.write(out) *> loop
        },
        err =>
          // Flush whatever was buffered before propagating the upstream failure.
          if (stringBuilder.isEmpty) ZChannel.refailCause(err)
          else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
        done =>
          // Flush a trailing value that had no terminator (e.g. a final number).
          if (stringBuilder.isEmpty) ZChannel.succeed(done)
          else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
      )

    ZPipeline.fromChannel(loop)
  }
}

def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
Expand All @@ -80,10 +172,7 @@ object JsonCodec {

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
ZPipeline.groupAdjacentBy[String, Unit](_ => ()) >>>
ZPipeline.map[(Unit, NonEmptyChunk[String]), String] {
case (_, fragments) => fragments.mkString
} >>>
splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s))
}
Expand All @@ -92,9 +181,7 @@ object JsonCodec {
JsonEncoder.encode(schema, value, cfg)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}

def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package zio.schema.codec

import java.time.{ ZoneId, ZoneOffset }

import scala.collection.immutable.ListMap

import zio.Console._
import zio._
import zio.json.JsonDecoder.JsonError
Expand All @@ -24,6 +22,9 @@ import zio.test._

object JsonCodecSpec extends ZIOSpecDefault {

// Minimal two-field record used as a fixture by the "Streams" suite's multi-record tests.
case class Person(name: String, age: Int)
// Schema for Person, derived with DeriveSchema.gen.
val personSchema: Schema[Person] = DeriveSchema.gen[Person]

def spec: Spec[TestEnvironment, Any] =
suite("JsonCodec Spec")(
encoderSuite,
Expand Down Expand Up @@ -436,6 +437,71 @@ object JsonCodecSpec extends ZIOSpecDefault {
"""{"NumberOne":{"value":"foo"}}"""
)
}
),
suite("Streams")(
test("Encodes a stream with multiple integers") {
assertEncodesMore(Schema[Int], 1 to 5, charSequenceToByteChunk("1\n2\n3\n4\n5"))
},
test("Decodes a stream with multiple integers") {
gregor-rayman marked this conversation as resolved.
Show resolved Hide resolved
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1\n2\n3\n4\n5")) &>
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2 3 4 5")) &>
assertDecodesMore(Schema[Int], Chunk.fromIterable(1 to 5), charSequenceToByteChunk("1 2, 3;;; 4x5"))
},
test("Decodes a stream with multiple booleans") {
assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("true true false")) &>
assertDecodesMore(Schema[Boolean], Chunk(true, true, false), charSequenceToByteChunk("truetruefalse"))
},
test("Encodes a stream with multiple strings") {
assertEncodesMore(Schema[String], List("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\""))
},
test("Decodes a stream with multiple strings") {
assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk("\"a\"\n\"b\"\n\"c\"")) &>
assertDecodesMore(Schema[String], Chunk("a", "b", "c"), charSequenceToByteChunk(""""a" "b""c""""))
},
test("Encodes a stream with multiple records") {
assertEncodesMore(
personSchema,
List(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Decodes a stream with multiple records") {
assertDecodesMore(
personSchema,
Chunk(
Person("Alice", 1),
Person("Bob", 2),
Person("Charlie", 3)
),
charSequenceToByteChunk(
"""{"name":"Alice","age":1}
|{"name":"Bob","age":2}
|{"name":"Charlie","age":3}""".stripMargin
)
)
},
test("Encodes a stream with no records") {
assertEncodesMore(
personSchema,
List.empty[Person],
charSequenceToByteChunk("")
)
},
test("Decodes a stream with no records") {
assertDecodesMore(
personSchema,
Chunk.empty,
charSequenceToByteChunk("")
)
}
)
)

Expand Down Expand Up @@ -1307,7 +1373,7 @@ object JsonCodecSpec extends ZIOSpecDefault {
),
test("decode discriminated case objects with extra fields")(
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"type":"Cash","extraField":1}""")) &>
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}""""))
assertDecodes(Schema[Command], Command.Cash, charSequenceToByteChunk("""{"extraField":1,"type":"Cash"}"""))
),
suite("of case objects")(
test("without annotation")(
Expand Down Expand Up @@ -1505,6 +1571,23 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(stream)(equalTo(chunk))
}

/**
 * Asserts that streaming `values` through the schema-based codec's `streamEncoder` produces
 * exactly the bytes in `chunk`.
 *
 * @param schema schema used to build the codec
 * @param values elements fed into the stream encoder, in order
 * @param chunk  expected concatenated output bytes
 * @param cfg    codec configuration; defaults to `JsonCodec.Config.default`
 * @param print  when true, prints the encoded output to the console before asserting
 */
private def assertEncodesMore[A](
  schema: Schema[A],
  values: Seq[A],
  chunk: Chunk[Byte],
  cfg: JsonCodec.Config = JsonCodec.Config.default,
  print: Boolean = false
) = {
  val stream = ZStream
    .fromIterable(values)
    .via(JsonCodec.schemaBasedBinaryCodec(cfg)(schema).streamEncoder)
    .runCollect
    .tap { encoded =>
      // Decode explicitly as UTF-8 (the stream decoder reads its input with utfDecode) instead of
      // relying on the platform default charset; `encoded` no longer shadows the `chunk` parameter.
      printLine(new String(encoded.toArray, java.nio.charset.StandardCharsets.UTF_8)).when(print).ignore
    }
  assertZIO(stream)(equalTo(chunk))
}

private def assertEncodesJson[A](
schema: Schema[A],
value: A,
Expand Down Expand Up @@ -1551,6 +1634,16 @@ object JsonCodecSpec extends ZIOSpecDefault {
assertZIO(result)(equalTo(Chunk(value)))
}

/**
 * Asserts that running the bytes in `chunk` through the schema-based codec's `streamDecoder`
 * yields exactly `values`, in order.
 *
 * @param schema schema used to build the codec
 * @param values expected decoded elements
 * @param chunk  raw input bytes fed to the stream decoder
 * @param cfg    codec configuration; defaults to `JsonCodec.Config.default`
 */
private def assertDecodesMore[A](
  schema: Schema[A],
  values: Chunk[A],
  chunk: Chunk[Byte],
  cfg: JsonCodec.Config = JsonCodec.Config.default
) = {
  val decoder = JsonCodec.schemaBasedBinaryCodec[A](cfg)(schema).streamDecoder
  val decoded = ZStream.fromChunk(chunk).via(decoder).runCollect
  assertZIO(decoded)(equalTo(values))
}

private def assertEncodesThenDecodesFallback[A, B](
schema: Schema.Fallback[A, B],
value: Fallback[A, B]
Expand Down
Loading