From a7389e89ec96cbb16a7c61a49b5660a48484c367 Mon Sep 17 00:00:00 2001 From: oktaykcr Date: Sat, 24 Aug 2024 18:29:46 +0300 Subject: [PATCH 1/4] feat: write json and string methods for producer and consumer configs (#140) --- balancer.go | 19 +++++ balancer_test.go | 52 ++++++++++++ consumer_config.go | 55 +++++++++++++ consumer_config_test.go | 171 ++++++++++++++++++++++++++++++++++++++++ mechanism.go | 8 ++ mechanism_test.go | 34 ++++++++ producer_config.go | 27 +++++++ producer_config_test.go | 111 +++++++++++++++++++++++++- tls.go | 7 ++ tls_test.go | 30 +++++++ 10 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 mechanism_test.go diff --git a/balancer.go b/balancer.go index 9319c67..de5abc5 100644 --- a/balancer.go +++ b/balancer.go @@ -27,3 +27,22 @@ func GetBalancerReferenceHash() Balancer { func GetBalancerRoundRobin() Balancer { return &kafka.RoundRobin{} } + +func GetBalancerString(balancer Balancer) string { + switch balancer.(type) { + case *kafka.CRC32Balancer: + return "CRC32Balancer" + case *kafka.Hash: + return "Hash" + case *kafka.LeastBytes: + return "LeastBytes" + case *kafka.Murmur2Balancer: + return "Murmur2Balancer" + case *kafka.ReferenceHash: + return "ReferenceHash" + case *kafka.RoundRobin: + return "RoundRobin" + default: + return "Unknown" + } +} diff --git a/balancer_test.go b/balancer_test.go index 11da72d..331f6f6 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -64,3 +64,55 @@ func TestGetBalancerRoundRobinh(t *testing.T) { t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String()) } } + +func TestGetBalancerString(t *testing.T) { + + tests := []struct { + name string + balancer Balancer + want string + }{ + { + name: "Should_Return_CRC32Balancer", + balancer: GetBalancerCRC32(), + want: "CRC32Balancer", + }, + { + name: "Should_Return_Hash", + balancer: GetBalancerHash(), + want: "Hash", + }, + { + name: "Should_Return_LeastBytes", + balancer: GetBalancerLeastBytes(), + want: "LeastBytes", + }, + { + name: "Should_Return_Murmur2Balancer", + balancer: GetBalancerMurmur2Balancer(), + want: "Murmur2Balancer", + }, + { + name: "Should_Return_ReferenceHash", + balancer: GetBalancerReferenceHash(), + want: "ReferenceHash", + }, + { + name: "Should_Return_RoundRobin", + balancer: GetBalancerRoundRobin(), + want: "RoundRobin", + }, + { + name: "Should_Return_Unknown", + balancer: nil, + want: "Unknown", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetBalancerString(tt.balancer); got != tt.want { + t.Errorf("GetBalancerString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/consumer_config.go b/consumer_config.go index e3cf8d1..ceec7b1 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -1,6 +1,11 @@ package kafka import ( + "bytes" + "encoding/json" + "fmt" + "regexp" + "strings" "time" "github.com/segmentio/kafka-go" @@ -61,6 +66,47 @@ type ConsumerConfig struct { MetricPrefix string } +func (cfg RetryConfiguration) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, "MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron, + cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack) +} + +func (cfg *BatchConfiguration) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit) +} + +func (cfg ReaderConfig) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], "MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""), + cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset)) +} + +func (cfg *ConsumerConfig) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, "TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, "VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, + cfg.ClientID, cfg.Reader.Json(), cfg.BatchConfiguration.Json(), + cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency, + cfg.RetryEnabled, cfg.RetryConfiguration.Json(), cfg.VerifyTopicOnStartup, + cfg.Rack, cfg.SASL.Json(), cfg.TLS.Json()) +} + +func (cfg *ConsumerConfig) JsonPretty() string { + return jsonPretty(cfg.Json()) +} + +func (cfg *ConsumerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { cronsumerCfg := kcronsumer.Config{ MetricPrefix: cfg.RetryConfiguration.MetricPrefix, @@ -266,3 +312,12 @@ func (cfg *ConsumerConfig) setDefaults() { func NewBoolPtr(value bool) *bool { return &value } + +func jsonPretty(jsonString string) string { + var out bytes.Buffer + err := json.Indent(&out, []byte(jsonString), "", "\t") + if err != nil { + return jsonString + } + return out.String() +} diff --git a/consumer_config_test.go b/consumer_config_test.go index 98a8333..2fc9586 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -189,3 +189,174 @@ func TestConsumerConfig_getTopics(t *testing.T) { } }) } + +func Test_jsonPretty(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "Simple JSON", + input: `{"key1":"value1","key2":2}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}", + }, + { + name: "Nested JSON", + input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}", + }, + { + name: "Invalid JSON", + input: `{"key1": "value1", "key2": 2`, + expected: `{"key1": "value1", "key2": 2`, + }, + { + name: "Empty JSON", + input: ``, + expected: ``, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := jsonPretty(tt.input) + if got != tt.expected { + t.Errorf("jsonPretty() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestConsumerConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ConsumerConfig + expected := "{}" + // When + result := config.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", \"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", \"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getConsumerConfigExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", \"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", \"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" + // When + result := getConsumerConfigWithoutInnerObjectExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getConsumerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" + // When + result := getConsumerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_JsonPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getConsumerConfigExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getConsumerConfigWithoutInnerObjectExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getConsumerConfigExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + GroupID: "test-consumer.0", + GroupTopics: []string{"test-updated.0"}, + MaxWait: 2 * time.Second, + CommitInterval: time.Second, + }, + BatchConfiguration: &BatchConfiguration{ + MessageGroupLimit: 100, + }, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Topic: "test-exception.0", + StartTimeCron: "*/2 * * * *", + WorkDuration: time.Minute * 1, + MaxRetry: 3, + VerifyTopicOnStartup: true, + }, + VerifyTopicOnStartup: true, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{}, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{}, + VerifyTopicOnStartup: true, + } +} diff --git a/mechanism.go b/mechanism.go index 5e89bf6..ceb0fe6 100644 --- a/mechanism.go +++ b/mechanism.go @@ -1,6 +1,7 @@ package kafka import ( + "fmt" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" @@ -37,3 +38,10 @@ func (s *SASLConfig) plain() sasl.Mechanism { func (s *SASLConfig) IsEmpty() bool { return s == nil } + +func (s *SASLConfig) Json() string { + if s == nil { + return "{}" + } + return fmt.Sprintf(`{"Mechanism": %q, "Username": %q, "Password": %q}`, s.Type, s.Username, s.Password) +} diff --git a/mechanism_test.go b/mechanism_test.go new file mode 100644 index 0000000..1436fab --- /dev/null +++ b/mechanism_test.go @@ -0,0 +1,34 @@ +package kafka + +import "testing" + +func TestSASLConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *SASLConfig + + expected := "{}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + } + + expected := "{\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} diff --git a/producer_config.go b/producer_config.go index 1feeecd..1d8a908 100644 --- a/producer_config.go +++ b/producer_config.go @@ -1,6 +1,9 @@ package kafka import ( + "fmt" + "regexp" + "strings" "time" "go.opentelemetry.io/otel" @@ -29,6 +32,11 @@ type WriterConfig struct { AllowAutoTopicCreation bool } +func (cfg WriterConfig) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Balancer": %q, "Compression": %q}`, + strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String()) +} + type TransportConfig struct { MetadataTopics []string DialTimeout time.Duration @@ -46,6 +54,25 @@ type ProducerConfig struct { DistributedTracingEnabled bool } +func (cfg *ProducerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + +func (cfg *ProducerConfig) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"Writer": %s, "ClientID": %q, "DistributedTracingEnabled": %t, "SASL": %s, "TLS": %s}`, + cfg.Writer.Json(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.Json(), cfg.TLS.Json()) +} + +func (cfg *ProducerConfig) JsonPretty() string { + return jsonPretty(cfg.Json()) +} + func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { transport := &Transport{ Transport: &kafka.Transport{ diff --git a/producer_config_test.go b/producer_config_test.go index 95bf956..d8f0fbd 100644 --- a/producer_config_test.go +++ b/producer_config_test.go @@ -1,6 +1,9 @@ package kafka -import "testing" +import ( + "github.com/segmentio/kafka-go" + "testing" +) func TestProducerConfig_setDefaults(t *testing.T) { // Given @@ -17,3 +20,109 @@ func TestProducerConfig_setDefaults(t *testing.T) { t.Fatal("Propagator cannot be null") } } + +func TestProducerConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ProducerConfig + expected := "{}" + // When + result := config.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Balancer\": \"Hash\", \"Compression\": \"gzip\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getProducerConfigExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"\"], \"Balancer\": \"Unknown\", \"Compression\": \"uncompressed\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {}, \"TLS\": {}}" + // When + result := getProducerConfigWithoutInnerObjectExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_JsonPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Balancer\": \"Hash\",\n\t\t\"Compression\": \"gzip\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getProducerConfigExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Balancer\": \"Unknown\",\n\t\t\"Compression\": \"uncompressed\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getProducerConfigWithoutInnerObjectExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Balancer: \"Hash\", Compression: \"gzip\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getProducerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"\"], Balancer: \"Unknown\", Compression: \"uncompressed\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {}, TLS: {}" + // When + result := getProducerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getProducerConfigExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + Writer: WriterConfig{ + Balancer: GetBalancerHash(), + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Compression: kafka.Gzip, + }, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getProducerConfigWithoutInnerObjectExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + } +} diff --git a/tls.go b/tls.go index d6ebf6b..dbb08e7 100644 --- a/tls.go +++ b/tls.go @@ -33,3 +33,10 @@ func (c *TLSConfig) TLSConfig() (*tls.Config, error) { func (c *TLSConfig) IsEmpty() bool { return c == nil || c.RootCAPath == "" && c.IntermediateCAPath == "" } + +func (c *TLSConfig) Json() string { + if c == nil { + return "{}" + } + return fmt.Sprintf(`{"RootCAPath": %q, "IntermediateCAPath": %q}`, c.RootCAPath, c.IntermediateCAPath) +} diff --git a/tls_test.go b/tls_test.go index e3351e0..f138507 100644 --- a/tls_test.go +++ b/tls_test.go @@ -65,3 +65,33 @@ func TestTLSConfig_IsEmpty(t *testing.T) { }) } } + +func TestTLSConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *TLSConfig + + expected := "{}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + } + + expected := "{\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} From 5e312bea5ba8743c75fcf50c893ffec1833de2d2 Mon Sep 17 00:00:00 2001 From: oktaykcr Date: Sat, 24 Aug 2024 21:42:10 +0300 Subject: [PATCH 2/4] feat: write json and string methods for producer and consumer configs (#140) --- balancer_test.go | 1 - consumer_config.go | 30 ++++++++++-------- consumer_config_test.go | 69 +++++++++++++++++++++++++++++++++-------- mechanism.go | 3 +- mechanism_test.go | 4 +-- producer_config.go | 10 +++--- producer_config_test.go | 40 +++++++++++++++++------- tls.go | 2 +- tls_test.go | 4 +-- 9 files changed, 113 insertions(+), 50 deletions(-) diff --git a/balancer_test.go b/balancer_test.go index 331f6f6..39d3734 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -66,7 +66,6 @@ func TestGetBalancerRoundRobinh(t *testing.T) { } func TestGetBalancerString(t *testing.T) { - tests := []struct { name string balancer Balancer diff --git a/consumer_config.go b/consumer_config.go index ceec7b1..7a18da1 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -66,43 +66,47 @@ type ConsumerConfig struct { MetricPrefix string } -func (cfg RetryConfiguration) Json() string { - return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, "MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, +func (cfg RetryConfiguration) JSON() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, `+ + `"MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron, cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack) } -func (cfg *BatchConfiguration) Json() string { +func (cfg *BatchConfiguration) JSON() string { if cfg == nil { return "{}" } return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit) } -func (cfg ReaderConfig) Json() string { - return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], "MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`, +func (cfg ReaderConfig) JSON() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], `+ + `"MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`, strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""), cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset)) } -func (cfg *ConsumerConfig) Json() string { +func (cfg *ConsumerConfig) JSON() string { if cfg == nil { return "{}" } - return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, "TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, "VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, - cfg.ClientID, cfg.Reader.Json(), cfg.BatchConfiguration.Json(), + return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, `+ + `"TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, `+ + `"VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, + cfg.ClientID, cfg.Reader.JSON(), cfg.BatchConfiguration.JSON(), cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency, - cfg.RetryEnabled, cfg.RetryConfiguration.Json(), cfg.VerifyTopicOnStartup, - cfg.Rack, cfg.SASL.Json(), cfg.TLS.Json()) + cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.VerifyTopicOnStartup, + cfg.Rack, cfg.SASL.JSON(), cfg.TLS.JSON()) } -func (cfg *ConsumerConfig) JsonPretty() string { - return jsonPretty(cfg.Json()) +func (cfg *ConsumerConfig) JSONPretty() string { + return jsonPretty(cfg.JSON()) } func (cfg *ConsumerConfig) String() string { re := regexp.MustCompile(`"(\w+)"\s*:`) - modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString := re.ReplaceAllString(cfg.JSON(), `$1:`) modifiedString = modifiedString[1 : len(modifiedString)-1] return modifiedString } diff --git a/consumer_config_test.go b/consumer_config_test.go index 2fc9586..65c21dc 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -228,13 +228,13 @@ func Test_jsonPretty(t *testing.T) { } } -func TestConsumerConfig_Json(t *testing.T) { +func TestConsumerConfig_JSON(t *testing.T) { t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { // Given var config *ConsumerConfig expected := "{}" // When - result := config.Json() + result := config.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -242,9 +242,17 @@ func TestConsumerConfig_Json(t *testing.T) { }) t.Run("Should_Convert_To_Json", func(t *testing.T) { // Given - expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", \"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", \"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "\"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", " + + "\"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, " + + "\"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, " + + "\"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", " + + "\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, " + + "\"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", " + + "\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " + + "\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" // When - result := getConsumerConfigExample().Json() + result := getConsumerConfigExample().JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -252,9 +260,14 @@ func TestConsumerConfig_Json(t *testing.T) { }) t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { // Given - expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", \"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", \"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", " + + "\"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, " + + "\"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, " + + "\"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", " + + "\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, " + + "\"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" // When - result := getConsumerConfigWithoutInnerObjectExample().Json() + result := getConsumerConfigWithoutInnerObjectExample().JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -265,7 +278,14 @@ func TestConsumerConfig_Json(t *testing.T) { func TestConsumerConfig_String(t *testing.T) { t.Run("Should_Convert_To_String", func(t *testing.T) { // Given - expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", " + + "StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", " + + "TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, " + + "RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", " + + "StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, " + + "VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + + "TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" // When result := getConsumerConfigExample().String() // Then @@ -275,7 +295,11 @@ func TestConsumerConfig_String(t *testing.T) { }) t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { // Given - expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", " + + "GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, " + + "BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, " + + "RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", " + + "MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" // When result := getConsumerConfigWithoutInnerObjectExample().String() // Then @@ -285,12 +309,23 @@ func TestConsumerConfig_String(t *testing.T) { }) } -func TestConsumerConfig_JsonPretty(t *testing.T) { +func TestConsumerConfig_JSONPretty(t *testing.T) { t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { // Given - expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"" + + "Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"" + + "MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"" + + "BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"" + + "TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"" + + "RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"" + + "MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"" + + "VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"" + + "SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" + + "TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" // When - result := getConsumerConfigExample().JsonPretty() + result := getConsumerConfigExample().JSONPretty() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -298,9 +333,17 @@ func TestConsumerConfig_JsonPretty(t *testing.T) { }) t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { // Given - expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"" + + "GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"" + + "StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"" + + "MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"" + + "RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"" + + "Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"" + + "VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"" + + "Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" // When - result := getConsumerConfigWithoutInnerObjectExample().JsonPretty() + result := getConsumerConfigWithoutInnerObjectExample().JSONPretty() // Then if result != expected { t.Fatal("result must be equal to expected") diff --git a/mechanism.go b/mechanism.go index ceb0fe6..65cf284 100644 --- a/mechanism.go +++ b/mechanism.go @@ -2,6 +2,7 @@ package kafka import ( "fmt" + "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" @@ -39,7 +40,7 @@ func (s *SASLConfig) IsEmpty() bool { return s == nil } -func (s *SASLConfig) Json() string { +func (s *SASLConfig) JSON() string { if s == nil { return "{}" } diff --git a/mechanism_test.go b/mechanism_test.go index 1436fab..1c93ed0 100644 --- a/mechanism_test.go +++ b/mechanism_test.go @@ -9,7 +9,7 @@ func TestSASLConfig_Json(t *testing.T) { expected := "{}" // When - result := cfg.Json() + result := cfg.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -25,7 +25,7 @@ func TestSASLConfig_Json(t *testing.T) { expected := "{\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}" // When - result := cfg.Json() + result := cfg.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") diff --git a/producer_config.go b/producer_config.go index 1d8a908..8c88e96 100644 --- a/producer_config.go +++ b/producer_config.go @@ -56,21 +56,21 @@ type ProducerConfig struct { func (cfg *ProducerConfig) String() string { re := regexp.MustCompile(`"(\w+)"\s*:`) - modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString := re.ReplaceAllString(cfg.JSON(), `$1:`) modifiedString = modifiedString[1 : len(modifiedString)-1] return modifiedString } -func (cfg *ProducerConfig) Json() string { +func (cfg *ProducerConfig) JSON() string { if cfg == nil { return "{}" } return fmt.Sprintf(`{"Writer": %s, "ClientID": %q, "DistributedTracingEnabled": %t, "SASL": %s, "TLS": %s}`, - cfg.Writer.Json(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.Json(), cfg.TLS.Json()) + cfg.Writer.Json(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.JSON(), cfg.TLS.JSON()) } -func (cfg *ProducerConfig) JsonPretty() string { - return jsonPretty(cfg.Json()) +func (cfg *ProducerConfig) JSONPretty() string { + return jsonPretty(cfg.JSON()) } func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { diff --git a/producer_config_test.go b/producer_config_test.go index d8f0fbd..0cf6388 100644 --- a/producer_config_test.go +++ b/producer_config_test.go @@ -1,8 +1,9 @@ package kafka import ( - "github.com/segmentio/kafka-go" "testing" + + "github.com/segmentio/kafka-go" ) func TestProducerConfig_setDefaults(t *testing.T) { @@ -27,7 +28,7 @@ func TestProducerConfig_Json(t *testing.T) { var config *ProducerConfig expected := "{}" // When - result := config.Json() + result := config.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -35,9 +36,13 @@ func TestProducerConfig_Json(t *testing.T) { }) t.Run("Should_Convert_To_Json", func(t *testing.T) { // Given - expected := "{\"Writer\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Balancer\": \"Hash\", \"Compression\": \"gzip\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + expected := "{\"Writer\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "\"Balancer\": \"Hash\", \"Compression\": \"gzip\"}, \"ClientID\": \"test-consumer-client-id\", " + + "\"DistributedTracingEnabled\": false, " + + "\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " + + "\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" // When - result := getProducerConfigExample().Json() + result := getProducerConfigExample().JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -45,9 +50,10 @@ func TestProducerConfig_Json(t *testing.T) { }) t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { // Given - expected := "{\"Writer\": {\"Brokers\": [\"\"], \"Balancer\": \"Unknown\", \"Compression\": \"uncompressed\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {}, \"TLS\": {}}" + expected := "{\"Writer\": {\"Brokers\": [\"\"], \"Balancer\": \"Unknown\", \"Compression\": \"uncompressed\"}, " + + "\"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {}, \"TLS\": {}}" // When - result := getProducerConfigWithoutInnerObjectExample().Json() + result := getProducerConfigWithoutInnerObjectExample().JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -58,9 +64,13 @@ func TestProducerConfig_Json(t *testing.T) { func TestProducerConfig_JsonPretty(t *testing.T) { t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { // Given - expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Balancer\": \"Hash\",\n\t\t\"Compression\": \"gzip\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "Balancer\": \"Hash\",\n\t\t\"Compression\": \"gzip\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "DistributedTracingEnabled\": false,\n\t\"" + + "SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" + + "TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" // When - result := getProducerConfigExample().JsonPretty() + result := getProducerConfigExample().JSONPretty() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -68,9 +78,11 @@ func TestProducerConfig_JsonPretty(t *testing.T) { }) t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { // Given - expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Balancer\": \"Unknown\",\n\t\t\"Compression\": \"uncompressed\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Balancer\": \"Unknown\",\n\t\t\"" + + "Compression\": \"uncompressed\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "DistributedTracingEnabled\": false,\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" // When - result := getProducerConfigWithoutInnerObjectExample().JsonPretty() + result := getProducerConfigWithoutInnerObjectExample().JSONPretty() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -81,7 +93,10 @@ func TestProducerConfig_JsonPretty(t *testing.T) { func TestProducerConfig_String(t *testing.T) { t.Run("Should_Convert_To_String", func(t *testing.T) { // Given - expected := "Writer: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Balancer: \"Hash\", Compression: \"gzip\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + expected := "Writer: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "Balancer: \"Hash\", Compression: \"gzip\"}, ClientID: \"test-consumer-client-id\", " + + "DistributedTracingEnabled: false, SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + + "TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" // When result := getProducerConfigExample().String() // Then @@ -91,7 +106,8 @@ func TestProducerConfig_String(t *testing.T) { }) t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { // Given - expected := "Writer: {Brokers: [\"\"], Balancer: \"Unknown\", Compression: \"uncompressed\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {}, TLS: {}" + expected := "Writer: {Brokers: [\"\"], Balancer: \"Unknown\", Compression: \"uncompressed\"}, " + + "ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {}, TLS: {}" // When result := getProducerConfigWithoutInnerObjectExample().String() // Then diff --git a/tls.go b/tls.go index dbb08e7..8ffaa8e 100644 --- a/tls.go +++ b/tls.go @@ -34,7 +34,7 @@ func (c *TLSConfig) IsEmpty() bool { return c == nil || c.RootCAPath == "" && c.IntermediateCAPath == "" } -func (c *TLSConfig) Json() string { +func (c *TLSConfig) JSON() string { if c == nil { return "{}" } diff --git a/tls_test.go b/tls_test.go index f138507..e49db51 100644 --- a/tls_test.go +++ b/tls_test.go @@ -73,7 +73,7 @@ func TestTLSConfig_Json(t *testing.T) { expected := "{}" // When - result := cfg.Json() + result := cfg.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") @@ -88,7 +88,7 @@ func TestTLSConfig_Json(t *testing.T) { expected := "{\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}" // When - result := cfg.Json() + result := cfg.JSON() // Then if result != expected { t.Fatal("result must be equal to expected") From a55af9dc08641a6c5ea539da4c44090edfbd86e4 Mon Sep 17 00:00:00 2001 From: oktaykcr Date: Sat, 24 Aug 2024 22:02:44 +0300 Subject: [PATCH 3/4] feat: write json and string methods for producer and consumer configs (#140) --- producer_config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/producer_config.go b/producer_config.go index 8c88e96..7af1195 100644 --- a/producer_config.go +++ b/producer_config.go @@ -32,7 +32,7 @@ type WriterConfig struct { AllowAutoTopicCreation bool } -func (cfg WriterConfig) Json() string { +func (cfg WriterConfig) JSON() string { return fmt.Sprintf(`{"Brokers": ["%s"], "Balancer": %q, "Compression": %q}`, strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String()) } @@ -66,7 +66,7 @@ func (cfg *ProducerConfig) JSON() string { return "{}" } return fmt.Sprintf(`{"Writer": %s, "ClientID": %q, "DistributedTracingEnabled": %t, "SASL": %s, "TLS": %s}`, - cfg.Writer.Json(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.JSON(), cfg.TLS.JSON()) + cfg.Writer.JSON(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.JSON(), cfg.TLS.JSON()) } func (cfg *ProducerConfig) JSONPretty() string { From aa7faf9d9bf6b40258c6e1c40ac43f3acc40a59e Mon Sep 17 00:00:00 2001 From: oktaykcr Date: Sat, 24 Aug 2024 22:06:39 +0300 Subject: [PATCH 4/4] feat: write json and string methods for producer and consumer configs (#140) --- consumer_config_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/consumer_config_test.go b/consumer_config_test.go index 65c21dc..ebf9878 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -202,9 +202,11 @@ func Test_jsonPretty(t *testing.T) { expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}", }, { - name: "Nested JSON", - input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`, - expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}", + name: "Nested JSON", + input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"" + + "key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"" + + "key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}", }, { name: "Invalid JSON",