Skip to content

Commit

Permalink
JsonCodec can now encode/decode streams as JSON arrays. Whether the codec will treat streams as arrays is governed by `JsonCodec.Config.treatStreamsAsArrays`, which defaults to `false` to keep it compatible with the previous version. (#739)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregor-rayman authored Sep 11, 2024
1 parent 6b9753c commit 41892d5
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 99 deletions.
215 changes: 117 additions & 98 deletions zio-schema-json/shared/src/main/scala/zio/schema/codec/JsonCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing }

object JsonCodec {

final case class Config(ignoreEmptyCollections: Boolean)
final case class Config(ignoreEmptyCollections: Boolean, treatStreamsAsArrays: Boolean = false)

object Config {
val default: Config = Config(ignoreEmptyCollections = false)
Expand Down Expand Up @@ -64,112 +64,121 @@ object JsonCodec {
/**
 * Implicit [[BinaryCodec]] for any `A` that has an implicit [[Schema]] in scope.
 *
 * Forwards to the `Config`-taking overload of `schemaBasedBinaryCodec`, passing
 * `JsonCodec.Config.default` as the configuration.
 *
 * @tparam A     the type to encode/decode
 * @param schema the schema for `A`, resolved implicitly
 * @return the codec built by the `Config`-taking overload for the default configuration
 */
implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
schemaBasedBinaryCodec[A](JsonCodec.Config.default)

val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = {
val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
val ContextJson = 'j'
val ContextString = 's'
val ContextBoolean = 'b'
val ContextNull = 'u'
val ContextNullAfterFirstL = 'x'
val ContextNumber = 'n'
val ContextEscape = 'e'

ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = 0
var context = ContextJson

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
context match {
case ContextEscape =>
context = 's'
case ContextString =>
c match {
case '\\' => context = ContextEscape
case '"' =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextBoolean =>
if (c == 'e') {
context = ContextJson
valueEnded = true
}
case ContextNull =>
if (c == 'l') {
context = ContextNullAfterFirstL
}
case ContextNullAfterFirstL =>
if (c == 'l') {
context = ContextJson
valueEnded = true
}
case ContextNumber =>
c match {
case '}' | ']' =>
depth -= 1
private object JsonSplitter {
val validNumChars: Set[Char] = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
val ContextJson = 'j'
val ContextString = 's'
val ContextBoolean = 'b'
val ContextNull = 'u'
val ContextNullAfterFirstL = 'x'
val ContextNumber = 'n'
val ContextEscape = 'e'
val ContextDone = 'd'

def jsonSplitter(wrappedInArray: Boolean): ZPipeline[Any, Nothing, String, String] =
ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = if (wrappedInArray) -1 else 0
var context = ContextJson

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
context match {
case ContextEscape =>
context = 's'
case ContextString =>
c match {
case '\\' => context = ContextEscape
case '"' =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextBoolean =>
if (c == 'e') {
context = ContextJson
valueEnded = true
case _ if !validNumChars(c) =>
}
case ContextNull =>
if (c == 'l') {
context = ContextNullAfterFirstL
}
case ContextNullAfterFirstL =>
if (c == 'l') {
context = ContextJson
valueEnded = true
case _ =>
}
case _ =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
case '"' =>
context = ContextString
case 't' | 'f' =>
context = ContextBoolean
case 'n' =>
context = ContextNull
case x if validNumChars(x) =>
context = ContextNumber
case _ =>
}
}
if (depth > 0 || context != ContextJson || valueEnded)
stringBuilder.append(c)
}
case ContextNumber =>
c match {
case '}' | ']' =>
depth -= 1
context = if (depth < 0) ContextDone else ContextJson
valueEnded = true
case _ if !validNumChars(c) =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextDone => // no more values, ignore everything
case _ =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
if (depth == -1) context = ContextDone
case '"' =>
context = ContextString
case 't' | 'f' =>
context = ContextBoolean
case 'n' =>
context = ContextNull
case x if validNumChars(x) =>
context = ContextNumber
case _ =>
}
}
if (context != ContextDone && (depth > 0 || context != ContextJson || valueEnded))
stringBuilder.append(c)

if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.forall(_.isWhitespace)) chunkBuilder += str
stringBuilder.clear()
if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.forall(_.isWhitespace)) {
chunkBuilder += str
}
stringBuilder.clear()
}
}
chunkBuilder.result()
}
chunkBuilder.result()
}

lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
)
lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
)

ZPipeline.fromChannel(loop)
}
ZPipeline.fromChannel(loop)
}
}

// String pipeline built by `JsonSplitter.jsonSplitter(wrappedInArray = false)`: depth tracking starts at 0,
// so each value that ends at depth 0 is emitted as its own string. The stream decoder uses this
// splitter when `cfg.treatStreamsAsArrays` is false.
val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = false)
// String pipeline built by `JsonSplitter.jsonSplitter(wrappedInArray = true)`: depth tracking starts at -1,
// so the first opening bracket only brings the depth to 0 and the values inside it are emitted one by one.
// Once the depth drops back to -1 the splitter switches to `ContextDone` and ignores all further input.
// The stream decoder uses this splitter when `cfg.treatStreamsAsArrays` is true.
val splitJsonArrayElements: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = true)

def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
Expand All @@ -180,7 +189,7 @@ object JsonCodec {

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
splitOnJsonBoundary >>>
(if (cfg.treatStreamsAsArrays) splitJsonArrayElements else splitOnJsonBoundary) >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s))
}
Expand All @@ -189,7 +198,17 @@ object JsonCodec {
JsonEncoder.encode(schema, value, cfg)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
if (cfg.treatStreamsAsArrays) {
val interspersed: ZPipeline[Any, Nothing, A, Byte] = ZPipeline
.mapChunks[A, Chunk[Byte]](_.map(encode))
.intersperse(Chunk.single(','.toByte))
.flattenChunks
val prepended: ZPipeline[Any, Nothing, A, Byte] =
interspersed >>> ZPipeline.prepend(Chunk.single('['.toByte))
prepended >>> ZPipeline.append(Chunk.single(']'.toByte))
} else {
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}
}

def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] =
Expand Down
Loading

0 comments on commit 41892d5

Please sign in to comment.