Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JsonCodec can now encode/decode streams as JSON arrays. #739

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing }

object JsonCodec {

final case class Config(ignoreEmptyCollections: Boolean)
final case class Config(ignoreEmptyCollections: Boolean, treatStreamsAsArrays: Boolean = false)

object Config {
val default: Config = Config(ignoreEmptyCollections = false)
Expand Down Expand Up @@ -64,112 +64,121 @@ object JsonCodec {
implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
schemaBasedBinaryCodec[A](JsonCodec.Config.default)

val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = {
val validNumChars = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
val ContextJson = 'j'
val ContextString = 's'
val ContextBoolean = 'b'
val ContextNull = 'u'
val ContextNullAfterFirstL = 'x'
val ContextNumber = 'n'
val ContextEscape = 'e'

ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = 0
var context = ContextJson

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
context match {
case ContextEscape =>
context = 's'
case ContextString =>
c match {
case '\\' => context = ContextEscape
case '"' =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextBoolean =>
if (c == 'e') {
context = ContextJson
valueEnded = true
}
case ContextNull =>
if (c == 'l') {
context = ContextNullAfterFirstL
}
case ContextNullAfterFirstL =>
if (c == 'l') {
context = ContextJson
valueEnded = true
}
case ContextNumber =>
c match {
case '}' | ']' =>
depth -= 1
private object JsonSplitter {
val validNumChars: Set[Char] = Set('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'E', 'e', '-', '+', '.')
val ContextJson = 'j'
val ContextString = 's'
val ContextBoolean = 'b'
val ContextNull = 'u'
val ContextNullAfterFirstL = 'x'
val ContextNumber = 'n'
val ContextEscape = 'e'
val ContextDone = 'd'

def jsonSplitter(wrappedInArray: Boolean): ZPipeline[Any, Nothing, String, String] =
ZPipeline.suspend {
val stringBuilder = new StringBuilder
var depth = if (wrappedInArray) -1 else 0
var context = ContextJson

def fetchChunk(chunk: Chunk[String]): Chunk[String] = {
val chunkBuilder = ChunkBuilder.make[String]()
for {
string <- chunk
c <- string
} {
var valueEnded = false
context match {
case ContextEscape =>
context = 's'
case ContextString =>
c match {
case '\\' => context = ContextEscape
case '"' =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextBoolean =>
if (c == 'e') {
context = ContextJson
valueEnded = true
case _ if !validNumChars(c) =>
}
case ContextNull =>
if (c == 'l') {
context = ContextNullAfterFirstL
}
case ContextNullAfterFirstL =>
if (c == 'l') {
context = ContextJson
valueEnded = true
case _ =>
}
case _ =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
case '"' =>
context = ContextString
case 't' | 'f' =>
context = ContextBoolean
case 'n' =>
context = ContextNull
case x if validNumChars(x) =>
context = ContextNumber
case _ =>
}
}
if (depth > 0 || context != ContextJson || valueEnded)
stringBuilder.append(c)
}
case ContextNumber =>
c match {
case '}' | ']' =>
depth -= 1
context = if (depth < 0) ContextDone else ContextJson
valueEnded = true
case _ if !validNumChars(c) =>
context = ContextJson
valueEnded = true
case _ =>
}
case ContextDone => // no more values, ignore everything
case _ =>
c match {
case '{' | '[' =>
depth += 1
case '}' | ']' =>
depth -= 1
valueEnded = true
if (depth == -1) context = ContextDone
case '"' =>
context = ContextString
case 't' | 'f' =>
context = ContextBoolean
case 'n' =>
context = ContextNull
case x if validNumChars(x) =>
context = ContextNumber
case _ =>
}
}
if (context != ContextDone && (depth > 0 || context != ContextJson || valueEnded))
stringBuilder.append(c)

if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.forall(_.isWhitespace)) chunkBuilder += str
stringBuilder.clear()
if (valueEnded && depth == 0) {
val str = stringBuilder.result()
if (!str.forall(_.isWhitespace)) {
chunkBuilder += str
}
stringBuilder.clear()
}
}
chunkBuilder.result()
}
chunkBuilder.result()
}

lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
)
lazy val loop: ZChannel[Any, ZNothing, Chunk[String], Any, Nothing, Chunk[String], Any] =
ZChannel.readWithCause(
in => {
val out = fetchChunk(in)
if (out.isEmpty) loop else ZChannel.write(out) *> loop
},
err =>
if (stringBuilder.isEmpty) ZChannel.refailCause(err)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.refailCause(err),
done =>
if (stringBuilder.isEmpty) ZChannel.succeed(done)
else ZChannel.write(Chunk.single(stringBuilder.result())) *> ZChannel.succeed(done)
)

ZPipeline.fromChannel(loop)
}
ZPipeline.fromChannel(loop)
}
}

val splitOnJsonBoundary: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = false)
val splitJsonArrayElements: ZPipeline[Any, Nothing, String, String] = JsonSplitter.jsonSplitter(wrappedInArray = true)

def schemaBasedBinaryCodec[A](cfg: Config)(implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
Expand All @@ -180,7 +189,7 @@ object JsonCodec {

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
splitOnJsonBoundary >>>
(if (cfg.treatStreamsAsArrays) splitJsonArrayElements else splitOnJsonBoundary) >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s))
}
Expand All @@ -189,7 +198,17 @@ object JsonCodec {
JsonEncoder.encode(schema, value, cfg)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
if (cfg.treatStreamsAsArrays) {
val interspersed: ZPipeline[Any, Nothing, A, Byte] = ZPipeline
.mapChunks[A, Chunk[Byte]](_.map(encode))
.intersperse(Chunk.single(','.toByte))
.flattenChunks
val prepended: ZPipeline[Any, Nothing, A, Byte] =
interspersed >>> ZPipeline.prepend(Chunk.single('['.toByte))
prepended >>> ZPipeline.append(Chunk.single(']'.toByte))
} else {
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
}
}

def jsonEncoder[A](schema: Schema[A]): ZJsonEncoder[A] =
Expand Down
Loading
Loading