From 0bc4564d900fa2d76eb8f91a55cd5b3cd7ebf84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 3 Jul 2023 18:09:50 +0200 Subject: [PATCH 01/10] rename Opt to ClientOpt --- pkg/sr/client.go | 2 +- pkg/sr/config.go | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/sr/client.go b/pkg/sr/client.go index 1c4b1147..4701cd9f 100644 --- a/pkg/sr/client.go +++ b/pkg/sr/client.go @@ -63,7 +63,7 @@ type Client struct { } // NewClient returns a new schema registry client. -func NewClient(opts ...Opt) (*Client, error) { +func NewClient(opts ...ClientOpt) (*Client, error) { cl := &Client{ urls: []string{"http://localhost:8081"}, httpcl: &http.Client{Timeout: 5 * time.Second}, diff --git a/pkg/sr/config.go b/pkg/sr/config.go index db1570c4..70f5d9a9 100644 --- a/pkg/sr/config.go +++ b/pkg/sr/config.go @@ -9,30 +9,30 @@ import ( ) type ( - // Opt is an option to configure a client. - Opt interface{ apply(*Client) } - opt struct{ fn func(*Client) } + // ClientOpt is an option to configure a client. + ClientOpt interface{ apply(*Client) } + clientOpt struct{ fn func(*Client) } ) -func (o opt) apply(cl *Client) { o.fn(cl) } +func (o clientOpt) apply(cl *Client) { o.fn(cl) } // HTTPClient sets the http client that the schema registry client uses, // overriding the default client that speaks plaintext with a timeout of 5s. -func HTTPClient(httpcl *http.Client) Opt { - return opt{func(cl *Client) { cl.httpcl = httpcl }} +func HTTPClient(httpcl *http.Client) ClientOpt { + return clientOpt{func(cl *Client) { cl.httpcl = httpcl }} } // UserAgent sets the User-Agent to use in requests, overriding the default // "franz-go". -func UserAgent(ua string) Opt { - return opt{func(cl *Client) { cl.ua = ua }} +func UserAgent(ua string) ClientOpt { + return clientOpt{func(cl *Client) { cl.ua = ua }} } // URLs sets the URLs that the client speaks to, overriding the default // http://localhost:8081. This option automatically prefixes any URL that is // missing an http:// or https:// prefix with http://. -func URLs(urls ...string) Opt { - return opt{func(cl *Client) { +func URLs(urls ...string) ClientOpt { + return clientOpt{func(cl *Client) { for i, u := range urls { if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") { continue @@ -45,8 +45,8 @@ func URLs(urls ...string) Opt { } // DialTLSConfig sets a tls.Config to use in the default http client. -func DialTLSConfig(c *tls.Config) Opt { - return opt{func(cl *Client) { +func DialTLSConfig(c *tls.Config) ClientOpt { + return clientOpt{func(cl *Client) { cl.httpcl = &http.Client{ Timeout: 5 * time.Second, Transport: &http.Transport{ @@ -71,13 +71,13 @@ func DialTLSConfig(c *tls.Config) Opt { // getting or creating schemas. This can help collapse duplicate schemas into // one, but can also be done with a configuration parameter on the schema // registry itself. -func Normalize() Opt { - return opt{func(cl *Client) { cl.normalize = true }} +func Normalize() ClientOpt { + return clientOpt{func(cl *Client) { cl.normalize = true }} } // BasicAuth sets basic authorization to use for every request. -func BasicAuth(user, pass string) Opt { - return opt{func(cl *Client) { +func BasicAuth(user, pass string) ClientOpt { + return clientOpt{func(cl *Client) { cl.basicAuth = &struct { user string pass string From ad5657f45d8d665b8b0a28804e6ef467cdcc5496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 3 Jul 2023 18:49:17 +0200 Subject: [PATCH 02/10] rename SerdeOpt to EncodingOpt, introduce different SerdeOpt --- pkg/sr/serde.go | 80 ++++++++++++++++++++++++++++++-------------- pkg/sr/serde_test.go | 3 +- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 3f95af01..75285a13 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -26,33 +26,52 @@ var ( ) type ( - // SerdeOpt is an option to configure a Serde. - SerdeOpt interface{ apply(*tserde) } - serdeOpt struct{ fn func(*tserde) } + // EncodingOpt is an option to configure the behavior of Serde.Encode and + // Serde.Decode. + EncodingOpt interface { + serdeOrEncodingOpt() + apply(*tserde) + } + encodingOpt struct{ fn func(*tserde) } + + // SerdeOpt is an option to configure Serde. + SerdeOpt interface { + serdeOrEncodingOpt() + apply(*Serde) + } + serdeOpt struct{ fn func(serde *Serde) } + + SerdeOrEncodingOpt interface { + serdeOrEncodingOpt() + } ) -func (o serdeOpt) apply(t *tserde) { o.fn(t) } +func (o serdeOpt) serdeOrEncodingOpt() { /* satisfy interface */ } +func (o serdeOpt) apply(s *Serde) { o.fn(s) } + +func (o encodingOpt) serdeOrEncodingOpt() { /* satisfy interface */ } +func (o encodingOpt) apply(t *tserde) { o.fn(t) } // EncodeFn allows Serde to encode a value. -func EncodeFn(fn func(any) ([]byte, error)) SerdeOpt { - return serdeOpt{func(t *tserde) { t.encode = fn }} +func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt { + return encodingOpt{func(t *tserde) { t.encode = fn }} } // AppendEncodeFn allows Serde to encode a value to an existing slice. This // can be more efficient than EncodeFn; this function is used if it exists. -func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) SerdeOpt { - return serdeOpt{func(t *tserde) { t.appendEncode = fn }} +func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt { + return encodingOpt{func(t *tserde) { t.appendEncode = fn }} } // DecodeFn allows Serde to decode into a value. -func DecodeFn(fn func([]byte, any) error) SerdeOpt { - return serdeOpt{func(t *tserde) { t.decode = fn }} +func DecodeFn(fn func([]byte, any) error) EncodingOpt { + return encodingOpt{func(t *tserde) { t.decode = fn }} } // GenerateFn returns a new(Value) that can be decoded into. This function can // be used to control the instantiation of a new type for DecodeNew. -func GenerateFn(fn func() any) SerdeOpt { - return serdeOpt{func(t *tserde) { t.gen = fn }} +func GenerateFn(fn func() any) EncodingOpt { + return encodingOpt{func(t *tserde) { t.gen = fn }} } // Index attaches a message index to a value. A single schema ID can be @@ -67,8 +86,8 @@ func GenerateFn(fn func() any) SerdeOpt { // For more information, see where `message-indexes` are described in: // // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format -func Index(index ...int) SerdeOpt { - return serdeOpt{func(t *tserde) { t.index = index }} +func Index(index ...int) EncodingOpt { + return encodingOpt{func(t *tserde) { t.index = index }} } type tserde struct { @@ -100,7 +119,7 @@ type Serde struct { types atomic.Value // map[reflect.Type]tserde mu sync.Mutex - defaults []SerdeOpt + defaults []EncodingOpt } var ( @@ -108,6 +127,25 @@ var ( noTypes = make(map[reflect.Type]tserde) ) +// NewSerde returns a new Serde using the supplied default options, which are +// applied to every registered type. These options are always applied first, so +// you can override them as necessary when registering. +// +// This can be useful if you always want to use the same encoding or decoding +// functions. +func NewSerde(opts ...SerdeOrEncodingOpt) *Serde { + var s Serde + for _, opt := range opts { + switch opt := opt.(type) { + case SerdeOpt: + opt.apply(&s) + case EncodingOpt: + s.defaults = append(s.defaults, opt) + } + } + return &s +} + func (s *Serde) loadIDs() map[int]tserde { ids := s.ids.Load() if ids == nil { @@ -124,20 +162,10 @@ func (s *Serde) loadTypes() map[reflect.Type]tserde { return types.(map[reflect.Type]tserde) } -// SetDefaults sets default options to apply to every registered type. These -// options are always applied first, so you can override them as necessary when -// registering. -// -// This can be useful if you always want to use the same encoding or decoding -// functions. -func (s *Serde) SetDefaults(opts ...SerdeOpt) { - s.defaults = opts -} - // Register registers a schema ID and the value it corresponds to, as well as // the encoding or decoding functions. You need to register functions depending // on whether you are only encoding, only decoding, or both. -func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { +func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { var t tserde for _, opt := range s.defaults { opt.apply(&t) diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 8667a0d2..e0ce2057 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -36,8 +36,7 @@ func TestSerde(t *testing.T) { } ) - var serde Serde - serde.SetDefaults( + serde := NewSerde( EncodeFn(json.Marshal), DecodeFn(json.Unmarshal), ) From 7544ce42fcfe202be41c19ef923a4952c645e6be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 3 Jul 2023 20:33:16 +0200 Subject: [PATCH 03/10] Add EncodingOpt to Encode and related methods, introduce ID option --- pkg/sr/serde.go | 146 ++++++++++++++++++++++++++++++++++++++----- pkg/sr/serde_test.go | 28 +++++++++ 2 files changed, 158 insertions(+), 16 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 75285a13..2df4156b 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -34,13 +34,14 @@ type ( } encodingOpt struct{ fn func(*tserde) } - // SerdeOpt is an option to configure Serde. + // SerdeOpt is an option to configure a Serde. SerdeOpt interface { serdeOrEncodingOpt() apply(*Serde) } serdeOpt struct{ fn func(serde *Serde) } + // SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt. SerdeOrEncodingOpt interface { serdeOrEncodingOpt() } @@ -74,6 +75,31 @@ func GenerateFn(fn func() any) EncodingOpt { return encodingOpt{func(t *tserde) { t.gen = fn }} } +// idOpt is a special encodingOpt that allows for specifying the ID when +// encoding a value using Serde.Encode. +type idOpt struct{ encodingOpt } + +func (o idOpt) ID() uint32 { + var t tserde + o.apply(&t) + return t.id +} + +// ID forces Serde.Encode to use the specified schema ID. +func ID(id int) EncodingOpt { + return idOpt{encodingOpt{func(opts *tserde) { opts.id = uint32(id) }}} +} + +// indexOpt is a special encodingOpt that allows for specifying the index when +// encoding a value using Serde.Encode. +type indexOpt struct{ encodingOpt } + +func (o indexOpt) Index() []int { + var t tserde + o.apply(&t) + return t.index +} + // Index attaches a message index to a value. A single schema ID can be // registered multiple times with different indices. // @@ -87,7 +113,12 @@ func GenerateFn(fn func() any) EncodingOpt { // // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format func Index(index ...int) EncodingOpt { - return encodingOpt{func(t *tserde) { t.index = index }} + return indexOpt{encodingOpt{func(t *tserde) { t.index = index }}} +} + +// Header defines the SerdeHeader used to encode and decode the message header. +func Header( /* TODO header arg */ ) SerdeOpt { + return serdeOpt{func(s *Serde) { /* TODO set header */ }} } type tserde struct { @@ -173,6 +204,10 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { for _, opt := range opts { opt.apply(&t) } + if id > 0 { + // The explicitly supplied id takes precedence over an ID EncodingOpt. + t.id = uint32(id) + } typeof := reflect.TypeOf(v) @@ -192,13 +227,13 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { }() // For IDs, we deeply clone any path that is changing. - m := tserdeMapClone(s.loadIDs(), id, t.index) + m := tserdeMapClone(s.loadIDs(), int(t.id), t.index) s.ids.Store(m) // Now we have a full path down index initialized (or, the top // level map if there is no index). We iterate down the index // tree to find the end node we are initializing. - k := id + k := int(t.id) at := m[k] for _, idx := range t.index { m = at.subindex @@ -214,7 +249,7 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { // Now, we initialize the end node. t = tserde{ - id: uint32(id), + id: t.id, exists: true, encode: t.encode, appendEncode: t.appendEncode, @@ -249,17 +284,17 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde { // Encode encodes a value according to the schema registry wire format and // returns it. If EncodeFn was not used, this returns ErrNotRegistered. -func (s *Serde) Encode(v any) ([]byte, error) { - return s.AppendEncode(nil, v) +func (s *Serde) Encode(v any, opts ...EncodingOpt) ([]byte, error) { + return s.AppendEncode(nil, v, opts...) } // AppendEncode appends an encoded value to b according to the schema registry // wire format and returns it. If EncodeFn was not used, this returns // ErrNotRegistered. -func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { - t, ok := s.loadTypes()[reflect.TypeOf(v)] - if !ok || (t.encode == nil && t.appendEncode == nil) { - return b, ErrNotRegistered +func (s *Serde) AppendEncode(b []byte, v any, opts ...EncodingOpt) ([]byte, error) { + t, err := s.encodeFind(v, opts) + if err != nil { + return nil, ErrNotRegistered } b = append(b, @@ -291,10 +326,45 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { return append(b, encoded...), nil } +func (s *Serde) encodeFind(v any, opts []EncodingOpt) (tserde, error) { + optsToApply, idopt, indexopt, err := s.filterIDAndIndexOpt(opts) + if err != nil { + return tserde{}, err + } + + var t tserde + if idopt != nil { + // Load tserde based on the supplied ID. + t = s.loadIDs()[int(idopt.ID())] + // Traverse to the right index, if Index option is supplied. + if indexopt != nil { + for _, i := range indexopt.Index() { + if len(t.subindex) <= i { + return tserde{}, ErrNotRegistered + } + t = t.subindex[i] + } + } + } else { + // Load tserde based on the registered type. + t = s.loadTypes()[reflect.TypeOf(v)] + } + + // Check if we loaded a valid tserde. + if !t.exists || (t.encode == nil && t.appendEncode == nil) { + return tserde{}, ErrNotRegistered + } + + for _, opt := range optsToApply { + opt.apply(&t) + } + return t, nil +} + // MustEncode returns the value of Encode, panicking on error. This is a // shortcut for if your encode function cannot error. -func (s *Serde) MustEncode(v any) []byte { - b, err := s.Encode(v) +func (s *Serde) MustEncode(v any, opts ...EncodingOpt) []byte { + b, err := s.Encode(v, opts...) if err != nil { panic(err) } @@ -303,8 +373,8 @@ func (s *Serde) MustEncode(v any) []byte { // MustAppendEncode returns the value of AppendEncode, panicking on error. // This is a shortcut for if your encode function cannot error. -func (s *Serde) MustAppendEncode(b []byte, v any) []byte { - b, err := s.AppendEncode(b, v) +func (s *Serde) MustAppendEncode(b []byte, v any, opts ...EncodingOpt) []byte { + b, err := s.AppendEncode(b, v, opts...) if err != nil { panic(err) } @@ -337,8 +407,10 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { var v any if t.gen != nil { v = t.gen() - } else { + } else if t.typeof != nil { v = reflect.New(t.typeof).Interface() + } else { + return nil, ErrNotRegistered } return v, t.decode(b, v) } @@ -375,6 +447,48 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { return b, t, nil } +func (s *Serde) filterIDAndIndexOpt(opts []EncodingOpt) ([]EncodingOpt, *idOpt, *indexOpt, error) { + var ( + idopt *idOpt + indexopt *indexOpt + ignoredIndices []int + ) + + // Find ID and Index options. In case multiple ID or Index options are + // supplied, only the last one is applied. + for i, opt := range opts { + switch opt := opt.(type) { + case idOpt: + idopt = &opt + case indexOpt: + indexopt = &opt + default: + continue + } + // Collect indices of all idOpt and indexOpt options. + ignoredIndices = append(ignoredIndices, i) + } + + if idopt == nil && indexopt == nil { + return opts, nil, nil, nil // no idOpt or indexOpt found + } else if idopt == nil && indexopt != nil { + return nil, nil, nil, errors.New("invalid use of encoding options: option Index is only allowed alongside option ID") + } else if len(ignoredIndices) == len(opts) { + return nil, idopt, indexopt, nil // all opts were filtered + } + + filteredOpts := make([]EncodingOpt, len(opts)-len(ignoredIndices)) + j := 0 + for i, opt := range opts { + if j < len(ignoredIndices) && i == ignoredIndices[j] { + j++ + continue + } + filteredOpts[i-j] = opt + } + return filteredOpts, idopt, indexopt, nil +} + type bReader struct{ b []byte } func (b *bReader) ReadByte() (byte, error) { diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index e0ce2057..fdefcf89 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -53,6 +53,9 @@ func TestSerde(t *testing.T) { serde.Register(3, idx4{}, Index(0, 0, 1)) serde.Register(3, idx3{}, Index(0, 0)) serde.Register(5, oneidx{}, Index(0), GenerateFn(func() any { return &oneidx{Foo: "defoo", Bar: "debar"} })) + serde.Register(0, nil, ID(100), Index(0), EncodeFn(func(v any) ([]byte, error) { + return json.MarshalIndent(v, "", " ") + })) for i, test := range []struct { enc any @@ -90,6 +93,10 @@ func TestSerde(t *testing.T) { expDec: oneidx{Foo: "defoo", Bar: "bar"}, }, } { + if _, err := serde.Encode(test.enc, ID(99)); err != ErrNotRegistered { + t.Errorf("got %v != exp ErrNotRegistered", err) + } + b, err := serde.Encode(test.enc) gotErr := err != nil if gotErr != test.expErr { @@ -112,6 +119,13 @@ func TestSerde(t *testing.T) { t.Errorf("#%d got MustAppendEncode(%v) != Encode(foo%v)", i, b2, b) } + bIndented := serde.MustEncode(test.enc, ID(100), Index(0)) + if i := bytes.IndexByte(bIndented, '{'); !bytes.Equal(bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) { + t.Errorf("#%d got Encode[ID=100](%v) != exp(%v)", i, bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) + } else if expIndented := extractIndentedJSON(b); !bytes.Equal(bIndented[i:], expIndented) { + t.Errorf("#%d got Encode[ID=100](%v) != exp(%v)", i, bIndented[i:], expIndented) + } + v, err := serde.DecodeNew(b) if err != nil { t.Errorf("#%d DecodeNew: got unexpected err %v", i, err) @@ -137,4 +151,18 @@ func TestSerde(t *testing.T) { if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 99}); err != ErrNotRegistered { t.Errorf("got %v != exp ErrNotRegistered", err) } + if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 100, 0}); err != ErrNotRegistered { + // schema is registered but type is unknown + t.Errorf("got %v != exp ErrNotRegistered", err) + } +} + +func extractIndentedJSON(in []byte) []byte { + i := bytes.IndexByte(in, '{') // skip header + var out bytes.Buffer + err := json.Indent(&out, in[i:], "", " ") + if err != nil { + panic(err) + } + return out.Bytes() } From 1ade586c0fcdcde72a6c81cd6dbe250d432e9dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 3 Jul 2023 20:34:10 +0200 Subject: [PATCH 04/10] Add EncodingOpt to Decode and related methods --- pkg/sr/serde.go | 79 +++++++++++++++++++++++++++++--------------- pkg/sr/serde_test.go | 17 ++++++++++ 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 2df4156b..edd4e3ea 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -387,8 +387,8 @@ func (s *Serde) MustAppendEncode(b []byte, v any, opts ...EncodingOpt) []byte { // Serde does not handle references in schemas; it is up to you to register the // full decode function for any top-level ID, regardless of how many other // schemas are referenced in top-level ID. -func (s *Serde) Decode(b []byte, v any) error { - b, t, err := s.decodeFind(b) +func (s *Serde) Decode(b []byte, v any, opts ...EncodingOpt) error { + b, t, err := s.decodeFind(b, opts) if err != nil { return err } @@ -399,8 +399,8 @@ func (s *Serde) Decode(b []byte, v any) error { // the input value. If DecodeFn was not used, this returns ErrNotRegistered. // GenerateFn can be used to control the instantiation of a new value, // otherwise this uses reflect.New(reflect.TypeOf(v)).Interface(). -func (s *Serde) DecodeNew(b []byte) (any, error) { - b, t, err := s.decodeFind(b) +func (s *Serde) DecodeNew(b []byte, opts ...EncodingOpt) (any, error) { + b, t, err := s.decodeFind(b, opts) if err != nil { return nil, err } @@ -415,34 +415,61 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { return v, t.decode(b, v) } -func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { - if len(b) < 5 || b[0] != 0 { - return nil, tserde{}, ErrBadHeader +func (s *Serde) decodeFind(b []byte, opts []EncodingOpt) ([]byte, tserde, error) { + optsToApply, idopt, indexopt, err := s.filterIDAndIndexOpt(opts) + if err != nil { + return nil, tserde{}, err } - id := binary.BigEndian.Uint32(b[1:5]) - b = b[5:] - t := s.loadIDs()[int(id)] - if len(t.subindex) > 0 { - r := bReader{b} - br := io.ByteReader(&r) - l, err := binary.ReadVarint(br) - if l == 0 { // length 0 is a shortcut for length 1, index 0 - t = t.subindex[0] + var t tserde + if idopt != nil { + // Load tserde based on the supplied ID. + t = s.loadIDs()[int(idopt.ID())] + // Traverse to the right index, if Index option is supplied. + if indexopt != nil { + for _, i := range indexopt.Index() { + if len(t.subindex) <= i { + return nil, tserde{}, ErrNotRegistered + } + t = t.subindex[i] + } } - for err == nil && t.subindex != nil && l > 0 { - var idx int64 - idx, err = binary.ReadVarint(br) - t = t.subindex[int(idx)] - l-- + } else { + // Load tserde based on the header + if len(b) < 5 || b[0] != 0 { + return nil, tserde{}, ErrBadHeader } - if err != nil { - return nil, t, err + id := binary.BigEndian.Uint32(b[1:5]) + b = b[5:] + + t = s.loadIDs()[int(id)] + if len(t.subindex) > 0 { + r := bReader{b} + br := io.ByteReader(&r) + l, err := binary.ReadVarint(br) + if l == 0 { // length 0 is a shortcut for length 1, index 0 + t = t.subindex[0] + } + for err == nil && t.subindex != nil && l > 0 { + var idx int64 + idx, err = binary.ReadVarint(br) + t = t.subindex[int(idx)] + l-- + } + if err != nil { + return nil, t, err + } + b = r.b } - b = r.b } - if !t.exists { - return nil, t, ErrNotRegistered + + // Check if we loaded a valid tserde. + if !t.exists || t.decode == nil { + return nil, tserde{}, ErrNotRegistered + } + + for _, opt := range optsToApply { + opt.apply(&t) } return b, t, nil } diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index fdefcf89..40c9e834 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -62,6 +62,7 @@ func TestSerde(t *testing.T) { expEnc []byte expDec any expErr bool + expMap map[string]any }{ { enc: overridden{}, @@ -70,27 +71,33 @@ func TestSerde(t *testing.T) { { enc: overrides{"foo"}, expEnc: append([]byte{0, 0, 0, 0, 127}, `{"one":"foo"}`...), + expMap: map[string]any{"one": "foo"}, }, { enc: idx1{Two: 2, Three: 3}, expEnc: append([]byte{0, 0, 0, 0, 3, 0}, `{"two":2,"three":3}`...), + expMap: map[string]any{"two": float64(2), "three": float64(3)}, }, { enc: idx2{Biz: "bizzy", Baz: "bazzy"}, expEnc: append([]byte{0, 0, 0, 0, 3, 2, 2}, `{"biz":"bizzy","baz":"bazzy"}`...), + expMap: map[string]any{"biz": "bizzy", "baz": "bazzy"}, }, { enc: idx3{Boz: 8}, expEnc: append([]byte{0, 0, 0, 0, 3, 4, 0, 0}, `{"boz":8}`...), + expMap: map[string]any{"boz": float64(8)}, }, { enc: idx4{Bingo: "bango"}, expEnc: append([]byte{0, 0, 0, 0, 3, 6, 0, 0, 2}, `{"bingo":"bango"}`...), + expMap: map[string]any{"bingo": "bango"}, }, { enc: oneidx{Bar: "bar"}, expEnc: append([]byte{0, 0, 0, 0, 5, 0}, `{"bar":"bar"}`...), expDec: oneidx{Foo: "defoo", Bar: "bar"}, + expMap: map[string]any{"bar": "bar"}, }, } { if _, err := serde.Encode(test.enc, ID(99)); err != ErrNotRegistered { @@ -139,6 +146,16 @@ func TestSerde(t *testing.T) { } if !reflect.DeepEqual(v, exp) { t.Errorf("#%d round trip: got %v != exp %v", i, v, exp) + continue + } + + gotMap, err := serde.DecodeNew(b[bytes.IndexByte(b, '{'):], ID(100), Index(0), GenerateFn(func() any { return new(map[string]any) })) + if err != nil { + t.Errorf("#%d DecodeNew: got unexpected err %v", i, err) + } + if !reflect.DeepEqual(gotMap, &test.expMap) { + t.Errorf("#%d decode new map: got %v != exp %v", i, gotMap, test.expMap) + continue } } From 1bb340b8b288afa4d8ed2e2879a5021c751aebfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sun, 27 Aug 2023 14:15:07 +0200 Subject: [PATCH 05/10] serde header option --- pkg/sr/serde.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 1d6335e7..f445f8a1 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -112,8 +112,8 @@ func Index(index ...int) EncodingOpt { } // Header defines the SerdeHeader used to encode and decode the message header. -func Header( /* TODO header arg */ ) SerdeOpt { - return serdeOpt{func(s *Serde) { /* TODO set header */ }} +func Header(header SerdeHeader) SerdeOpt { + return serdeOpt{func(s *Serde) { s.h = header }} } type tserde struct { From 8aaf14f401e1bf4541919c8fda60cf6e7598d3d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sun, 27 Aug 2023 14:45:02 +0200 Subject: [PATCH 06/10] introduce DynEncode and DynAppendEncode, simplify options --- pkg/sr/serde.go | 234 ++++++++++++++----------------------------- pkg/sr/serde_test.go | 13 +-- 2 files changed, 78 insertions(+), 169 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index f445f8a1..d1fff2d8 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -70,29 +70,9 @@ func GenerateFn(fn func() any) EncodingOpt { return encodingOpt{func(t *tserde) { t.gen = fn }} } -// idOpt is a special encodingOpt that allows for specifying the ID when -// encoding a value using Serde.Encode. -type idOpt struct{ encodingOpt } - -func (o idOpt) ID() uint32 { - var t tserde - o.apply(&t) - return t.id -} - // ID forces Serde.Encode to use the specified schema ID. func ID(id int) EncodingOpt { - return idOpt{encodingOpt{func(opts *tserde) { opts.id = uint32(id) }}} -} - -// indexOpt is a special encodingOpt that allows for specifying the index when -// encoding a value using Serde.Encode. -type indexOpt struct{ encodingOpt } - -func (o indexOpt) Index() []int { - var t tserde - o.apply(&t) - return t.index + return encodingOpt{func(opts *tserde) { opts.id = uint32(id) }} } // Index attaches a message index to a value. A single schema ID can be @@ -108,7 +88,7 @@ func (o indexOpt) Index() []int { // // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format func Index(index ...int) EncodingOpt { - return indexOpt{encodingOpt{func(t *tserde) { t.index = index }}} + return encodingOpt{func(t *tserde) { t.index = index }} } // Header defines the SerdeHeader used to encode and decode the message header. @@ -190,16 +170,6 @@ func (s *Serde) loadTypes() map[reflect.Type]tserde { return types.(map[reflect.Type]tserde) } -// SetDefaults sets default options to apply to every registered type. These -// options are always applied first, so you can override them as necessary when -// registering. -// -// This can be useful if you always want to use the same encoding or decoding -// functions. -func (s *Serde) SetDefaults(opts ...EncodingOpt) { - s.defaults = opts -} - // DecodeID decodes an ID from b, returning the ID and the remaining bytes, // or an error. func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error) { @@ -325,22 +295,51 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde { return dup } -// Encode encodes a value and prepends the header according to the configured -// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered. -func (s *Serde) Encode(v any, opts ...EncodingOpt) ([]byte, error) { - return s.AppendEncode(nil, v, opts...) +// Encode encodes a value with a prepended header according to the configured +// SerdeHeader. If EncodeFn was not registered, this returns ErrNotRegistered. +func (s *Serde) Encode(v any) ([]byte, error) { + return s.AppendEncode(nil, v) } -// AppendEncode appends an encoded value to b according to the schema registry -// wire format and returns it. If EncodeFn was not used, this returns -// ErrNotRegistered. -func (s *Serde) AppendEncode(b []byte, v any, opts ...EncodingOpt) ([]byte, error) { - t, err := s.encodeFind(v, opts) - if err != nil { +// AppendEncode encodes a value with a prepended header according to the +// configured SerdeHeader, appends it to b and returns b. If EncodeFn was not +// registered, this returns ErrNotRegistered. +func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { + // Load tserde based on the registered type. + t := s.loadTypes()[reflect.TypeOf(v)] + return s.append(b, v, t) +} + +// DynEncode encodes a value with a prepended header according to the configured +// SerdeHeader. If EncodeFn was not registered using the supplied id and index, +// this returns ErrNotRegistered. +func (s *Serde) DynEncode(v any, id int, index []int) ([]byte, error) { + return s.DynAppendEncode(nil, v, id, index) +} + +// DynAppendEncode encodes a value with a prepended header according to the +// configured SerdeHeader, appends it to b and returns b. If EncodeFn was not +// registered using the supplied id and index, this returns ErrNotRegistered. +func (s *Serde) DynAppendEncode(b []byte, v any, id int, index []int) ([]byte, error) { + // Load tserde based on the supplied ID. + t := s.loadIDs()[id] + // Traverse to the right index, if index is supplied. + for _, i := range index { + if len(t.subindex) <= i { + return nil, ErrNotRegistered + } + t = t.subindex[i] + } + return s.append(b, v, t) +} + +func (s *Serde) append(b []byte, v any, t tserde) ([]byte, error) { + // Check if we loaded a valid tserde. + if !t.exists || (t.encode == nil && t.appendEncode == nil) { return nil, ErrNotRegistered } - b, err = s.header().AppendEncode(b, int(t.id), t.index) + b, err := s.header().AppendEncode(b, int(t.id), t.index) if err != nil { return nil, err } @@ -355,55 +354,40 @@ func (s *Serde) AppendEncode(b []byte, v any, opts ...EncodingOpt) ([]byte, erro return append(b, encoded...), nil } -func (s *Serde) encodeFind(v any, opts []EncodingOpt) (tserde, error) { - optsToApply, idopt, indexopt, err := s.filterIDAndIndexOpt(opts) +// MustEncode returns the value of Encode, panicking on error. This is a +// shortcut for if your encode function cannot error. +func (s *Serde) MustEncode(v any) []byte { + b, err := s.Encode(v) if err != nil { - return tserde{}, err - } - - var t tserde - if idopt != nil { - // Load tserde based on the supplied ID. - t = s.loadIDs()[int(idopt.ID())] - // Traverse to the right index, if Index option is supplied. - if indexopt != nil { - for _, i := range indexopt.Index() { - if len(t.subindex) <= i { - return tserde{}, ErrNotRegistered - } - t = t.subindex[i] - } - } - } else { - // Load tserde based on the registered type. - t = s.loadTypes()[reflect.TypeOf(v)] - } - - // Check if we loaded a valid tserde. - if !t.exists || (t.encode == nil && t.appendEncode == nil) { - return tserde{}, ErrNotRegistered + panic(err) } + return b +} - for _, opt := range optsToApply { - opt.apply(&t) +// MustAppendEncode returns the value of AppendEncode, panicking on error. +// This is a shortcut for if your encode function cannot error. +func (s *Serde) MustAppendEncode(b []byte, v any) []byte { + b, err := s.AppendEncode(b, v) + if err != nil { + panic(err) } - return t, nil + return b } -// MustEncode returns the value of Encode, panicking on error. This is a +// MustDynEncode returns the value of DynEncode, panicking on error. This is a // shortcut for if your encode function cannot error. -func (s *Serde) MustEncode(v any, opts ...EncodingOpt) []byte { - b, err := s.Encode(v, opts...) +func (s *Serde) MustDynEncode(v any, id int, index []int) []byte { + b, err := s.DynEncode(v, id, index) if err != nil { panic(err) } return b } -// MustAppendEncode returns the value of AppendEncode, panicking on error. +// MustDynAppendEncode returns the value of DynAppendEncode, panicking on error. // This is a shortcut for if your encode function cannot error. -func (s *Serde) MustAppendEncode(b []byte, v any, opts ...EncodingOpt) []byte { - b, err := s.AppendEncode(b, v, opts...) +func (s *Serde) MustDynAppendEncode(b []byte, v any, id int, index []int) []byte { + b, err := s.DynAppendEncode(b, v, id, index) if err != nil { panic(err) } @@ -416,8 +400,8 @@ func (s *Serde) MustAppendEncode(b []byte, v any, opts ...EncodingOpt) []byte { // Serde does not handle references in schemas; it is up to you to register the // full decode function for any top-level ID, regardless of how many other // schemas are referenced in top-level ID. -func (s *Serde) Decode(b []byte, v any, opts ...EncodingOpt) error { - b, t, err := s.decodeFind(b, opts) +func (s *Serde) Decode(b []byte, v any) error { + b, t, err := s.decodeFind(b) if err != nil { return err } @@ -428,8 +412,8 @@ func (s *Serde) Decode(b []byte, v any, opts ...EncodingOpt) error { // the input value. If DecodeFn was not used, this returns ErrNotRegistered. // GenerateFn can be used to control the instantiation of a new value, // otherwise this uses reflect.New(reflect.TypeOf(v)).Interface(). -func (s *Serde) DecodeNew(b []byte, opts ...EncodingOpt) (any, error) { - b, t, err := s.decodeFind(b, opts) +func (s *Serde) DecodeNew(b []byte) (any, error) { + b, t, err := s.decodeFind(b) if err != nil { return nil, err } @@ -444,44 +428,23 @@ func (s *Serde) DecodeNew(b []byte, opts ...EncodingOpt) (any, error) { return v, t.decode(b, v) } -func (s *Serde) decodeFind(b []byte, opts []EncodingOpt) ([]byte, tserde, error) { - optsToApply, idopt, indexopt, err := s.filterIDAndIndexOpt(opts) +func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { + id, b, err := s.DecodeID(b) if err != nil { return nil, tserde{}, err } - - var t tserde - if idopt != nil { - // Load tserde based on the supplied ID. - t = s.loadIDs()[int(idopt.ID())] - // Traverse to the right index, if Index option is supplied. - if indexopt != nil { - for _, i := range indexopt.Index() { - if len(t.subindex) <= i { - return nil, tserde{}, ErrNotRegistered - } - t = t.subindex[i] - } - } - } else { - var id int - id, b, err = s.DecodeID(b) + t := s.loadIDs()[id] + if len(t.subindex) > 0 { + var index []int + index, b, err = s.DecodeIndex(b, t.subindexDepth) if err != nil { return nil, tserde{}, err } - t = s.loadIDs()[id] - if len(t.subindex) > 0 { - var index []int - index, b, err = s.DecodeIndex(b, t.subindexDepth) - if err != nil { - return nil, tserde{}, err - } - for _, idx := range index { - if t.subindex == nil { - return nil, tserde{}, ErrNotRegistered - } - t = t.subindex[idx] + for _, idx := range index { + if t.subindex == nil { + return nil, tserde{}, ErrNotRegistered } + t = t.subindex[idx] } } @@ -490,54 +453,9 @@ func (s *Serde) decodeFind(b []byte, opts []EncodingOpt) ([]byte, tserde, error) return nil, tserde{}, ErrNotRegistered } - for _, opt := range optsToApply { - opt.apply(&t) - } return b, t, nil } -func (s *Serde) filterIDAndIndexOpt(opts []EncodingOpt) ([]EncodingOpt, *idOpt, *indexOpt, error) { - var ( - idopt *idOpt - indexopt *indexOpt - ignoredIndices []int - ) - - // Find ID and Index options. In case multiple ID or Index options are - // supplied, only the last one is applied. - for i, opt := range opts { - switch opt := opt.(type) { - case idOpt: - idopt = &opt - case indexOpt: - indexopt = &opt - default: - continue - } - // Collect indices of all idOpt and indexOpt options. - ignoredIndices = append(ignoredIndices, i) - } - - if idopt == nil && indexopt == nil { - return opts, nil, nil, nil // no idOpt or indexOpt found - } else if idopt == nil && indexopt != nil { - return nil, nil, nil, errors.New("invalid use of encoding options: option Index is only allowed alongside option ID") - } else if len(ignoredIndices) == len(opts) { - return nil, idopt, indexopt, nil // all opts were filtered - } - - filteredOpts := make([]EncodingOpt, len(opts)-len(ignoredIndices)) - j := 0 - for i, opt := range opts { - if j < len(ignoredIndices) && i == ignoredIndices[j] { - j++ - continue - } - filteredOpts[i-j] = opt - } - return filteredOpts, idopt, indexopt, nil -} - // SerdeHeader encodes and decodes a message header. type SerdeHeader interface { // AppendEncode encodes a schema ID and optional index to b, returning the diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 1c91eb27..af2ca602 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -100,7 +100,7 @@ func TestSerde(t *testing.T) { expMap: map[string]any{"bar": "bar"}, }, } { - if _, err := serde.Encode(test.enc, ID(99)); err != ErrNotRegistered { + if _, err := serde.DynEncode(test.enc, 99, nil); err != ErrNotRegistered { t.Errorf("got %v != exp ErrNotRegistered", err) } @@ -126,7 +126,7 @@ func TestSerde(t *testing.T) { t.Errorf("#%d got MustAppendEncode(%v) != Encode(foo%v)", i, b2, b) } - bIndented := serde.MustEncode(test.enc, ID(100), Index(0)) + bIndented := serde.MustDynEncode(test.enc, 100, []int{0}) if i := bytes.IndexByte(bIndented, '{'); !bytes.Equal(bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) { t.Errorf("#%d got Encode[ID=100](%v) != exp(%v)", i, bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) } else if expIndented := extractIndentedJSON(b); !bytes.Equal(bIndented[i:], expIndented) { @@ -148,15 +148,6 @@ func TestSerde(t *testing.T) { t.Errorf("#%d round trip: got %v != exp %v", i, v, exp) continue } - - gotMap, err := serde.DecodeNew(b[bytes.IndexByte(b, '{'):], ID(100), Index(0), GenerateFn(func() any { return new(map[string]any) })) - if err != nil { - t.Errorf("#%d DecodeNew: got unexpected err %v", i, err) - } - if !reflect.DeepEqual(gotMap, &test.expMap) { - t.Errorf("#%d decode new map: got %v != exp %v", i, gotMap, test.expMap) - continue - } } if _, err := serde.DecodeNew([]byte{1, 0, 0, 0, 0, 0}); err != ErrBadHeader { From 8591c0935304e41f452bdd16ea10511f891b7768 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sun, 27 Aug 2023 14:48:07 +0200 Subject: [PATCH 07/10] remove option ID --- pkg/sr/serde.go | 15 +++------------ pkg/sr/serde_test.go | 2 +- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index d1fff2d8..51e926a3 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -70,11 +70,6 @@ func GenerateFn(fn func() any) EncodingOpt { return encodingOpt{func(t *tserde) { t.gen = fn }} } -// ID forces Serde.Encode to use the specified schema ID. -func ID(id int) EncodingOpt { - return encodingOpt{func(opts *tserde) { opts.id = uint32(id) }} -} - // Index attaches a message index to a value. A single schema ID can be // registered multiple times with different indices. // @@ -201,10 +196,6 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { for _, opt := range opts { opt.apply(&t) } - if id > 0 { - // The explicitly supplied id takes precedence over an ID EncodingOpt. - t.id = uint32(id) - } typeof := reflect.TypeOf(v) @@ -218,7 +209,7 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { } // For IDs, we deeply clone any path that is changing. - dupIDs := tserdeMapClone(s.loadIDs(), int(t.id), t.index) + dupIDs := tserdeMapClone(s.loadIDs(), id, t.index) // We defer the store because we modify the tserde below, and we // may delete a type key. @@ -231,7 +222,7 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { // Now we have a full path down index initialized (or, the top // level map if there is no index). We iterate down the index // tree to find the end node we are initializing. - k := int(t.id) + k := id m := dupIDs at := m[k] depth := len(t.index) @@ -261,7 +252,7 @@ func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { // Now, we initialize the end node. t = tserde{ - id: t.id, + id: uint32(id), exists: true, encode: t.encode, appendEncode: t.appendEncode, diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index af2ca602..c65a3b1f 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -53,7 +53,7 @@ func TestSerde(t *testing.T) { serde.Register(3, idx4{}, Index(0, 0, 1)) serde.Register(3, idx3{}, Index(0, 0)) serde.Register(5, oneidx{}, Index(0), GenerateFn(func() any { return &oneidx{Foo: "defoo", Bar: "debar"} })) - serde.Register(0, nil, ID(100), Index(0), EncodeFn(func(v any) ([]byte, error) { + serde.Register(100, nil, Index(0), EncodeFn(func(v any) ([]byte, error) { return json.MarshalIndent(v, "", " ") })) From 18b30d93b00417cab643d9d4a167e6a7ace45f74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sun, 27 Aug 2023 14:56:07 +0200 Subject: [PATCH 08/10] rename append to encode --- pkg/sr/serde.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 51e926a3..159f338e 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -298,7 +298,7 @@ func (s *Serde) Encode(v any) ([]byte, error) { func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { // Load tserde based on the registered type. t := s.loadTypes()[reflect.TypeOf(v)] - return s.append(b, v, t) + return s.encode(b, v, t) } // DynEncode encodes a value with a prepended header according to the configured @@ -321,10 +321,10 @@ func (s *Serde) DynAppendEncode(b []byte, v any, id int, index []int) ([]byte, e } t = t.subindex[i] } - return s.append(b, v, t) + return s.encode(b, v, t) } -func (s *Serde) append(b []byte, v any, t tserde) ([]byte, error) { +func (s *Serde) encode(b []byte, v any, t tserde) ([]byte, error) { // Check if we loaded a valid tserde. if !t.exists || (t.encode == nil && t.appendEncode == nil) { return nil, ErrNotRegistered From 172db908eae3980ad7c90a951e432e2d64a19520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sat, 24 Feb 2024 13:16:13 +0100 Subject: [PATCH 09/10] remove DynEncode method and replace it with a normal function --- pkg/sr/api.go | 4 +- pkg/sr/{config.go => clientopt.go} | 0 pkg/sr/enums.go | 2 +- pkg/sr/serde.go | 167 +++++++---------------------- pkg/sr/serde_test.go | 15 ++- pkg/sr/serdeopt.go | 72 +++++++++++++ 6 files changed, 118 insertions(+), 142 deletions(-) rename pkg/sr/{config.go => clientopt.go} (100%) create mode 100644 pkg/sr/serdeopt.go diff --git a/pkg/sr/api.go b/pkg/sr/api.go index 427eee24..14ea0036 100644 --- a/pkg/sr/api.go +++ b/pkg/sr/api.go @@ -216,7 +216,7 @@ func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error) { return subjects, err } -// SchemaVersion is a subject version pair. +// SubjectVersion is a subject version pair. type SubjectVersion struct { Subject string `json:"subject"` Version int `json:"version"` @@ -602,7 +602,7 @@ type SetCompatibility struct { OverrideRuleSet *SchemaRuleSet `json:"overrideRuleSet,omitempty"` // Override rule set used for schema registration. } -// SetCompatibilitysets the compatibility for each requested subject. The +// SetCompatibility sets the compatibility for each requested subject. The // global compatibility can be set by either using an empty subject or by // specifying no subjects. If specifying no subjects, this returns one element. func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult { diff --git a/pkg/sr/config.go b/pkg/sr/clientopt.go similarity index 100% rename from pkg/sr/config.go rename to pkg/sr/clientopt.go diff --git a/pkg/sr/enums.go b/pkg/sr/enums.go index 2c1b1ca7..f37c1038 100644 --- a/pkg/sr/enums.go +++ b/pkg/sr/enums.go @@ -197,7 +197,7 @@ func (k *SchemaRuleKind) UnmarshalText(text []byte) error { return nil } -// Mode specifies a schema rule's mode. +// SchemaRuleMode specifies a schema rule's mode. // // Migration rules can be specified for an UPGRADE, DOWNGRADE, or both // (UPDOWN). Migration rules are used during complex schema evolution. diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index f6c4fc98..8315d266 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -20,77 +20,6 @@ var ( ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte") ) -type ( - // EncodingOpt is an option to configure the behavior of Serde.Encode and - // Serde.Decode. - EncodingOpt interface { - serdeOrEncodingOpt() - apply(*tserde) - } - encodingOpt struct{ fn func(*tserde) } - - // SerdeOpt is an option to configure a Serde. - SerdeOpt interface { - serdeOrEncodingOpt() - apply(*Serde) - } - serdeOpt struct{ fn func(serde *Serde) } - - // SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt. - SerdeOrEncodingOpt interface { - serdeOrEncodingOpt() - } -) - -func (o serdeOpt) serdeOrEncodingOpt() { /* satisfy interface */ } -func (o serdeOpt) apply(s *Serde) { o.fn(s) } - -func (o encodingOpt) serdeOrEncodingOpt() { /* satisfy interface */ } -func (o encodingOpt) apply(t *tserde) { o.fn(t) } - -// EncodeFn allows Serde to encode a value. -func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt { - return encodingOpt{func(t *tserde) { t.encode = fn }} -} - -// AppendEncodeFn allows Serde to encode a value to an existing slice. This -// can be more efficient than EncodeFn; this function is used if it exists. -func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt { - return encodingOpt{func(t *tserde) { t.appendEncode = fn }} -} - -// DecodeFn allows Serde to decode into a value. -func DecodeFn(fn func([]byte, any) error) EncodingOpt { - return encodingOpt{func(t *tserde) { t.decode = fn }} -} - -// GenerateFn returns a new(Value) that can be decoded into. This function can -// be used to control the instantiation of a new type for DecodeNew. -func GenerateFn(fn func() any) EncodingOpt { - return encodingOpt{func(t *tserde) { t.gen = fn }} -} - -// Index attaches a message index to a value. A single schema ID can be -// registered multiple times with different indices. -// -// This option supports schemas that encode many different values from the same -// schema (namely, protobuf). The index into the schema to encode a -// particular message is specified with `index`. -// -// NOTE: this option must be used for protobuf schemas. -// -// For more information, see where `message-indexes` are described in: -// -// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format -func Index(index ...int) EncodingOpt { - return encodingOpt{func(t *tserde) { t.index = index }} -} - -// Header defines the SerdeHeader used to encode and decode the message header. -func Header(header SerdeHeader) SerdeOpt { - return serdeOpt{func(s *Serde) { s.h = header }} -} - type tserde struct { id uint32 exists bool @@ -286,63 +215,37 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde { return dup } -// Encode encodes a value with a prepended header according to the configured -// SerdeHeader. If EncodeFn was not registered, this returns ErrNotRegistered. +// Encode encodes a value and prepends the header according to the configured +// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered. func (s *Serde) Encode(v any) ([]byte, error) { return s.AppendEncode(nil, v) } -// AppendEncode encodes a value with a prepended header according to the +// AppendEncode encodes a value and prepends the header according to the // configured SerdeHeader, appends it to b and returns b. If EncodeFn was not // registered, this returns ErrNotRegistered. func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { // Load tserde based on the registered type. t := s.loadTypes()[reflect.TypeOf(v)] - return s.encode(b, v, t) -} - -// DynEncode encodes a value with a prepended header according to the configured -// SerdeHeader. If EncodeFn was not registered using the supplied id and index, -// this returns ErrNotRegistered. -func (s *Serde) DynEncode(v any, id int, index []int) ([]byte, error) { - return s.DynAppendEncode(nil, v, id, index) -} -// DynAppendEncode encodes a value with a prepended header according to the -// configured SerdeHeader, appends it to b and returns b. If EncodeFn was not -// registered using the supplied id and index, this returns ErrNotRegistered. -func (s *Serde) DynAppendEncode(b []byte, v any, id int, index []int) ([]byte, error) { - // Load tserde based on the supplied ID. - t := s.loadIDs()[id] - // Traverse to the right index, if index is supplied. - for _, i := range index { - if len(t.subindex) <= i { - return nil, ErrNotRegistered - } - t = t.subindex[i] - } - return s.encode(b, v, t) -} - -func (s *Serde) encode(b []byte, v any, t tserde) ([]byte, error) { // Check if we loaded a valid tserde. if !t.exists || (t.encode == nil && t.appendEncode == nil) { return nil, ErrNotRegistered } - b, err := s.header().AppendEncode(b, int(t.id), t.index) - if err != nil { - return nil, err + appendEncode := t.appendEncode + if appendEncode == nil { + // Fallback to t.encode. + appendEncode = func(b []byte, v any) ([]byte, error) { + encoded, err := t.encode(v) + if err != nil { + return nil, err + } + return append(b, encoded...), nil + } } - if t.appendEncode != nil { - return t.appendEncode(b, v) - } - encoded, err := t.encode(v) - if err != nil { - return nil, err - } - return append(b, encoded...), nil + return AppendEncode(b, v, int(t.id), t.index, s.header(), appendEncode) } // MustEncode returns the value of Encode, panicking on error. This is a @@ -365,26 +268,6 @@ func (s *Serde) MustAppendEncode(b []byte, v any) []byte { return b } -// MustDynEncode returns the value of DynEncode, panicking on error. This is a -// shortcut for if your encode function cannot error. -func (s *Serde) MustDynEncode(v any, id int, index []int) []byte { - b, err := s.DynEncode(v, id, index) - if err != nil { - panic(err) - } - return b -} - -// MustDynAppendEncode returns the value of DynAppendEncode, panicking on error. -// This is a shortcut for if your encode function cannot error. -func (s *Serde) MustDynAppendEncode(b []byte, v any, id int, index []int) []byte { - b, err := s.DynAppendEncode(b, v, id, index) - if err != nil { - panic(err) - } - return b -} - // Decode decodes b into v. If DecodeFn option was not used, this returns // ErrNotRegistered. // @@ -447,6 +330,28 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { return b, t, nil } +// Encode encodes a value and prepends the header. If the encoding function +// fails, this returns an error. +func Encode(v any, id int, index []int, h SerdeHeader, enc func(any) ([]byte, error)) ([]byte, error) { + return AppendEncode(nil, v, id, index, h, func(b []byte, val any) ([]byte, error) { + encoded, err := enc(val) + if err != nil { + return nil, err + } + return append(b, encoded...), nil + }) +} + +// AppendEncode encodes a value and prepends the header, appends it to b and +// returns b. If the encoding function fails, this returns an error. +func AppendEncode(b []byte, v any, id int, index []int, h SerdeHeader, enc func([]byte, any) ([]byte, error)) ([]byte, error) { + b, err := h.AppendEncode(b, id, index) + if err != nil { + return nil, err + } + return enc(b, v) +} + // SerdeHeader encodes and decodes a message header. type SerdeHeader interface { // AppendEncode encodes a schema ID and optional index to b, returning the diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 8615eedb..fb0a3bec 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -53,9 +53,6 @@ func TestSerde(t *testing.T) { serde.Register(3, idx4{}, Index(0, 0, 1)) serde.Register(3, idx3{}, Index(0, 0)) serde.Register(5, oneidx{}, Index(0), GenerateFn(func() any { return &oneidx{Foo: "defoo", Bar: "debar"} })) - serde.Register(100, nil, Index(0), EncodeFn(func(v any) ([]byte, error) { - return json.MarshalIndent(v, "", " ") - })) for i, test := range []struct { enc any @@ -100,10 +97,6 @@ func TestSerde(t *testing.T) { expMap: map[string]any{"bar": "bar"}, }, } { - if _, err := serde.DynEncode(test.enc, 99, nil); err != ErrNotRegistered { - t.Errorf("got %v != exp ErrNotRegistered", err) - } - b, err := serde.Encode(test.enc) gotErr := err != nil if gotErr != test.expErr { @@ -126,7 +119,13 @@ func TestSerde(t *testing.T) { t.Errorf("#%d got MustAppendEncode(%v) != Encode(foo%v)", i, b2, b) } - bIndented := serde.MustDynEncode(test.enc, 100, []int{0}) + bIndented, err := Encode(test.enc, 100, []int{0}, serde.header(), func(v any) ([]byte, error) { + return json.MarshalIndent(v, "", " ") + }) + if err != nil { + t.Errorf("#%d Encode[ID=100]: got err? %v, exp err? %v", i, gotErr, test.expErr) + continue + } if i := bytes.IndexByte(bIndented, '{'); !bytes.Equal(bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) { t.Errorf("#%d got Encode[ID=100](%v) != exp(%v)", i, bIndented[:i], []byte{0, 0, 0, 0, 100, 0}) } else if expIndented := extractIndentedJSON(b); !bytes.Equal(bIndented[i:], expIndented) { diff --git a/pkg/sr/serdeopt.go b/pkg/sr/serdeopt.go new file mode 100644 index 00000000..196a1c3a --- /dev/null +++ b/pkg/sr/serdeopt.go @@ -0,0 +1,72 @@ +package sr + +type ( + // EncodingOpt is an option to configure the behavior of Serde.Encode and + // Serde.Decode. + EncodingOpt interface { + serdeOrEncodingOpt() + apply(*tserde) + } + encodingOpt struct{ fn func(*tserde) } + + // SerdeOpt is an option to configure Serde. + SerdeOpt interface { + serdeOrEncodingOpt() + apply(*Serde) + } + serdeOpt struct{ fn func(serde *Serde) } + + // SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt. + SerdeOrEncodingOpt interface { + serdeOrEncodingOpt() + } +) + +func (o serdeOpt) serdeOrEncodingOpt() { /* satisfy interface */ } +func (o serdeOpt) apply(s *Serde) { o.fn(s) } + +func (o encodingOpt) serdeOrEncodingOpt() { /* satisfy interface */ } +func (o encodingOpt) apply(t *tserde) { o.fn(t) } + +// EncodeFn allows Serde to encode a value. +func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt { + return encodingOpt{func(t *tserde) { t.encode = fn }} +} + +// AppendEncodeFn allows Serde to encode a value to an existing slice. This +// can be more efficient than EncodeFn; this function is used if it exists. +func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt { + return encodingOpt{func(t *tserde) { t.appendEncode = fn }} +} + +// DecodeFn allows Serde to decode into a value. +func DecodeFn(fn func([]byte, any) error) EncodingOpt { + return encodingOpt{func(t *tserde) { t.decode = fn }} +} + +// GenerateFn returns a new(Value) that can be decoded into. This function can +// be used to control the instantiation of a new type for DecodeNew. +func GenerateFn(fn func() any) EncodingOpt { + return encodingOpt{func(t *tserde) { t.gen = fn }} +} + +// Index attaches a message index to a value. A single schema ID can be +// registered multiple times with different indices. +// +// This option supports schemas that encode many different values from the same +// schema (namely, protobuf). The index into the schema to encode a +// particular message is specified with `index`. +// +// NOTE: this option must be used for protobuf schemas. +// +// For more information, see where `message-indexes` are described in: +// +// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format +func Index(index ...int) EncodingOpt { + return encodingOpt{func(t *tserde) { t.index = index }} +} + +// Header defines the SerdeHeader used to encode and decode the message header. +func Header(header SerdeHeader) SerdeOpt { + return serdeOpt{func(s *Serde) { s.h = header }} +} From 2ffcf7422b1e916ab01e1f00449032306da1ad91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sat, 24 Feb 2024 13:24:57 +0100 Subject: [PATCH 10/10] remove unused field in test cases --- pkg/sr/serde_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index fb0a3bec..931768cb 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -59,7 +59,6 @@ func TestSerde(t *testing.T) { expEnc []byte expDec any expErr bool - expMap map[string]any }{ { enc: overridden{}, @@ -68,33 +67,27 @@ func TestSerde(t *testing.T) { { enc: overrides{"foo"}, expEnc: append([]byte{0, 0, 0, 0, 127}, `{"one":"foo"}`...), - expMap: map[string]any{"one": "foo"}, }, { enc: idx1{Two: 2, Three: 3}, expEnc: append([]byte{0, 0, 0, 0, 3, 0}, `{"two":2,"three":3}`...), - expMap: map[string]any{"two": float64(2), "three": float64(3)}, }, { enc: idx2{Biz: "bizzy", Baz: "bazzy"}, expEnc: append([]byte{0, 0, 0, 0, 3, 2, 2}, `{"biz":"bizzy","baz":"bazzy"}`...), - expMap: map[string]any{"biz": "bizzy", "baz": "bazzy"}, }, { enc: idx3{Boz: 8}, expEnc: append([]byte{0, 0, 0, 0, 3, 4, 0, 0}, `{"boz":8}`...), - expMap: map[string]any{"boz": float64(8)}, }, { enc: idx4{Bingo: "bango"}, expEnc: append([]byte{0, 0, 0, 0, 3, 6, 0, 0, 2}, `{"bingo":"bango"}`...), - expMap: map[string]any{"bingo": "bango"}, }, { enc: oneidx{Bar: "bar"}, expEnc: append([]byte{0, 0, 0, 0, 5, 0}, `{"bar":"bar"}`...), expDec: oneidx{Foo: "defoo", Bar: "bar"}, - expMap: map[string]any{"bar": "bar"}, }, } { b, err := serde.Encode(test.enc)