Skip to content

Commit

Permalink
added API to allow encoding and decoding to/from Generic record, to a…
Browse files Browse the repository at this point in the history
…llow integration with Schema Registry (AWS Glue)
  • Loading branch information
devsprint committed Aug 22, 2023
1 parent 26bb289 commit 8122cbc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ import zio.{ Chunk, Unsafe, ZIO }

object AvroCodec {

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
new BinaryCodec[A] {
trait ExtendedBinaryCodec[A] extends BinaryCodec[A] {
def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record
def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A]
}

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): ExtendedBinaryCodec[A] =
new ExtendedBinaryCodec[A] {

val avroSchema: SchemaAvro =
AvroSchemaCodec.encodeToApacheAvro(schema).getOrElse(throw new Exception("Avro schema could not be generated."))
Expand Down Expand Up @@ -59,6 +64,12 @@ object AvroCodec {
decode(chunk).map(Chunk(_))
)
}

override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record =
encodeValue(value, schema).asInstanceOf[GenericData.Record]

override def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] =
decodeValue(value, schema)
}

private def decodeValue[A](raw: Any, schema: Schema[A]): Either[DecodeError, A] = schema match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package zio.schema.codec

import org.apache.avro.generic.GenericData

import java.math.BigInteger
import java.time.{
DayOfWeek,
Expand All @@ -19,7 +21,6 @@ import java.time.{
ZonedDateTime
}
import java.util.UUID

import zio._
import zio.schema.{ DeriveSchema, Schema }
import zio.stream.ZStream
Expand Down Expand Up @@ -138,7 +139,8 @@ object AvroCodecSpec extends ZIOSpecDefault {
sequenceDecoderSpec,
genericRecordDecoderSpec,
enumDecoderSpec,
streamEncodingDecodingSpec
streamEncodingDecodingSpec,
genericRecordEncodeDecodeSpec
)

private val primitiveEncoderSpec = suite("Avro Codec - Encoder primitive spec")(
Expand Down Expand Up @@ -687,4 +689,13 @@ object AvroCodecSpec extends ZIOSpecDefault {

})

private val genericRecordEncodeDecodeSpec = suite("AvroCodec - encode/decode Generic Record")(
test("Encode/Decode") {
val codec = AvroCodec.schemaBasedBinaryCodec[Record]
val generic: GenericData.Record = codec.encodeGenericRecord(Record("John", 42))
val result = codec.decodeGenericRecord(generic)
assertTrue(result == Right(Record("John", 42)))
}
)

}

0 comments on commit 8122cbc

Please sign in to comment.