Skip to content

Commit

Permalink
[481]: Add JSON schema, improve error messages
Browse files Browse the repository at this point in the history
feat/chore: Adjust JSON schema; improve error messages
  • Loading branch information
nickdnk authored Oct 25, 2024
2 parents 85b40cf + 3995794 commit 9334a81
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 126 deletions.
16 changes: 8 additions & 8 deletions kafkajobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ func (c *config) InitDefault() ([]kgo.Opt, error) {
// check for the key and cert files
if _, err := os.Stat(c.TLS.Key); err != nil {
if os.IsNotExist(err) {
return nil, errors.Errorf("key file '%s' does not exists", c.TLS.Key)
return nil, errors.Errorf("private key file '%s' does not exist", c.TLS.Key)
}

return nil, err
}

if _, err := os.Stat(c.TLS.Cert); err != nil {
if os.IsNotExist(err) {
return nil, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert)
return nil, errors.Errorf("public certificate file '%s' does not exist", c.TLS.Cert)
}

return nil, err
Expand All @@ -63,7 +63,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) {
if c.TLS.RootCA != "" {
if _, err := os.Stat(c.TLS.RootCA); err != nil {
if os.IsNotExist(err) {
return nil, errors.Errorf("rootCA file '%s' does not exists", c.TLS.RootCA)
return nil, errors.Errorf("root CA file '%s' does not exist", c.TLS.RootCA)
}

return nil, err
Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) {
case awsMskIam:
sess, err := session.NewSession()
if err != nil {
return nil, errors.Errorf("unable to initialize aws session: %v", err)
return nil, errors.Errorf("unable to initialize AWS session: %v", err)
}

opts = append(opts, kgo.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) {

if c.GroupOpts != nil {
if c.GroupOpts.GroupID == "" {
return nil, errors.Str("no group for the group options")
return nil, errors.Str("no group ID defined for group options")
}

opts = append(opts, kgo.ConsumerGroup(c.GroupOpts.GroupID))
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) {
opts = append(opts, kgo.ConsumeTopics(c.ConsumerOpts.Topics...))
case len(c.ConsumerOpts.ConsumePartitions) > 0:
default:
return nil, errors.Str("topics should not be empty for the consumer")
return nil, errors.Str("topics and consume partitions should not be empty for the consumer")
}

if c.ConsumerOpts.ConsumerOffset != nil {
Expand Down Expand Up @@ -334,7 +334,7 @@ func (c *config) tlsConfig() (*tls.Config, error) {
}

if ok := certPool.AppendCertsFromPEM(rca); !ok {
return nil, errors.Errorf("could not append Certs from PEM")
return nil, errors.Errorf("could not append certificates from Root CA file '%s'", c.TLS.RootCA)
}

tlsDialerConfig.Certificates = []tls.Certificate{cert}
Expand All @@ -353,7 +353,7 @@ func (c *config) tlsConfig() (*tls.Config, error) {

func (c *config) enableTLS() bool {
if c.TLS != nil {
return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "")
return c.TLS.Key != "" && c.TLS.Cert != ""
}
return false
}
Loading

0 comments on commit 9334a81

Please sign in to comment.