From 620d57d876be3dc4c3a6e65f7818b43e04c95ebf Mon Sep 17 00:00:00 2001 From: Gabriel Ciuloaica Date: Tue, 22 Aug 2023 16:29:43 +0300 Subject: [PATCH] =?UTF-8?q?added=20API=20to=20allow=20encoding=20and=20dec?= =?UTF-8?q?oding=20to/from=20Generic=20record,=20to=20a=E2=80=A6=20(#573)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue) * fixed imports * added API to allow encoding and decoding to/from Generic record, to allow integration with Schema Registry (AWS Glue) * fixed imports * fix: fixed README --------- Co-authored-by: John A. De Goes --- .../main/scala/zio/schema/codec/AvroCodec.scala | 15 +++++++++++++-- .../scala-2/zio/schema/codec/AvroCodecSpec.scala | 14 +++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala index 6caec830c..4359d0a21 100644 --- a/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala +++ b/zio-schema-avro/shared/src/main/scala/zio/schema/codec/AvroCodec.scala @@ -25,8 +25,13 @@ import zio.{ Chunk, Unsafe, ZIO } object AvroCodec { - implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = - new BinaryCodec[A] { + trait ExtendedBinaryCodec[A] extends BinaryCodec[A] { + def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record + def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] + } + + implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): ExtendedBinaryCodec[A] = + new ExtendedBinaryCodec[A] { val avroSchema: SchemaAvro = AvroSchemaCodec.encodeToApacheAvro(schema).getOrElse(throw new Exception("Avro schema could not be generated.")) @@ -59,6 +64,12 @@ object AvroCodec { decode(chunk).map(Chunk(_)) ) } + + override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record = + encodeValue(value, schema).asInstanceOf[GenericData.Record] + + override def decodeGenericRecord(value: GenericRecord)(implicit schema: Schema[A]): Either[DecodeError, A] = + decodeValue(value, schema) } private def decodeValue[A](raw: Any, schema: Schema[A]): Either[DecodeError, A] = schema match { diff --git a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala index d0796b28b..8f1526aaf 100644 --- a/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala +++ b/zio-schema-avro/shared/src/test/scala-2/zio/schema/codec/AvroCodecSpec.scala @@ -20,6 +20,8 @@ import java.time.{ } import java.util.UUID +import org.apache.avro.generic.GenericData + import zio._ import zio.schema.codec.AvroAnnotations.avroEnum import zio.schema.{ DeriveSchema, Schema } @@ -140,7 +142,8 @@ object AvroCodecSpec extends ZIOSpecDefault { sequenceDecoderSpec, genericRecordDecoderSpec, enumDecoderSpec, - streamEncodingDecodingSpec + streamEncodingDecodingSpec, + genericRecordEncodeDecodeSpec ) private val primitiveEncoderSpec = suite("Avro Codec - Encoder primitive spec")( @@ -695,4 +698,13 @@ object AvroCodecSpec extends ZIOSpecDefault { }) + private val genericRecordEncodeDecodeSpec = suite("AvroCodec - encode/decode Generic Record")( + test("Encode/Decode") { + val codec = AvroCodec.schemaBasedBinaryCodec[Record] + val generic: GenericData.Record = codec.encodeGenericRecord(Record("John", 42)) + val result = codec.decodeGenericRecord(generic) + assertTrue(result == Right(Record("John", 42))) + } + ) + }