diff --git a/kafka/resource_kafka_topic.go b/kafka/resource_kafka_topic.go
index 6e01b3f8..c56b170d 100644
--- a/kafka/resource_kafka_topic.go
+++ b/kafka/resource_kafka_topic.go
@@ -26,7 +26,7 @@ func kafkaTopicResource() *schema.Resource {
 		Importer: &schema.ResourceImporter{
 			State: schema.ImportStatePassthrough,
 		},
-		CustomizeDiff: customPartitionDiff,
+		CustomizeDiff: customDiff,
 		Schema: map[string]*schema.Schema{
 			"name": {
 				Type: schema.TypeString,
@@ -200,7 +200,7 @@ func topicRead(d *schema.ResourceData, meta interface{}) error {
 	return nil
 }
 
-func customPartitionDiff(diff *schema.ResourceDiff, v interface{}) error {
+func customDiff(diff *schema.ResourceDiff, v interface{}) error {
 	log.Printf("[INFO] Checking the diff!")
 	if diff.HasChange("partitions") {
 		log.Printf("[INFO] Partitions have changed!")
@@ -212,7 +212,19 @@ func customPartitionDiff(diff *schema.ResourceDiff, v interface{}) error {
 			log.Printf("Partitions decreased from %d to %d. Forcing new resource", oi, ni)
 			diff.ForceNew("partitions")
 		}
-
+	}
+	if diff.HasChange("config") {
+		log.Printf("[INFO] Config has changed!")
+		o, n := diff.GetChange("config")
+		om := o.(map[string]interface{})
+		nm := n.(map[string]interface{})
+		log.Printf("Config changed from %+v to %+v", om, nm)
+		if val, changed := nm["cleanup.policy"]; changed {
+			if val == "compact" {
+				log.Printf("Cleanup policy changed to `compact`. Forcing new resource")
+				diff.ForceNew("config")
+			}
+		}
 	}
 	return nil
 }
diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index c925bb57..2870ffc5 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -74,6 +74,56 @@ func TestAccTopicUpdatePartitions(t *testing.T) {
 	})
 }
 
+func TestAccTopicDecreasePartitions(t *testing.T) {
+	u, err := uuid.GenerateUUID()
+	if err != nil {
+		t.Fatal(err)
+	}
+	topicName := fmt.Sprintf("syslog-%s", u)
+
+	r.Test(t, r.TestCase{
+		Providers: accProvider(),
+		PreCheck:  func() { testAccPreCheck(t) },
+		Steps: []r.TestStep{
+			{
+				Config: fmt.Sprintf(testResourceTopic_initialConfig, topicName),
+				Check:  testResourceTopic_initialCheck,
+			},
+			{
+				Config: fmt.Sprintf(testResourceTopic_updatePartitions, topicName),
+				Check:  testResourceTopic_updatePartitionsCheck,
+			},
+			{
+				Config: fmt.Sprintf(testResourceTopic_initialConfig, topicName),
+				Check:  testResourceTopic_decreasePartitionsCheck,
+			},
+		},
+	})
+}
+
+func TestAccTopicUpdateCleanupPolicy(t *testing.T) {
+	u, err := uuid.GenerateUUID()
+	if err != nil {
+		t.Fatal(err)
+	}
+	topicName := fmt.Sprintf("syslog-%s", u)
+
+	r.Test(t, r.TestCase{
+		Providers: accProvider(),
+		PreCheck:  func() { testAccPreCheck(t) },
+		Steps: []r.TestStep{
+			{
+				Config: fmt.Sprintf(testResourceTopic_initialConfig, topicName),
+				Check:  testResourceTopic_initialCheck,
+			},
+			{
+				Config: fmt.Sprintf(testResourceTopic_updateCleanupPolicy, topicName),
+				Check:  testResourceTopic_updateCleanupPolicyCheck,
+			},
+		},
+	})
+}
+
 func testResourceTopic_noConfigCheck(s *terraform.State) error {
 	resourceState := s.Modules[0].Resources["kafka_topic.test"]
 	if resourceState == nil {
@@ -187,7 +237,7 @@ func testResourceTopic_updatePartitionsCheck(s *terraform.State) error {
 		return err
 	}
 	if topic.Partitions != 2 {
-		return fmt.Errorf("partitions did not get increated got: %d", topic.Partitions)
+		return fmt.Errorf("partitions did not get increased got: %d", topic.Partitions)
 	}
 
 	if v, ok := topic.Config["segment.ms"]; ok && *v != "33333" {
@@ -196,6 +246,41 @@ func testResourceTopic_updatePartitionsCheck(s *terraform.State) error {
 	return nil
 }
+func testResourceTopic_decreasePartitionsCheck(s *terraform.State) error {
+	resourceState := s.Modules[0].Resources["kafka_topic.test"]
+	instanceState := resourceState.Primary
+	client := testProvider.Meta().(*LazyClient)
+	name := instanceState.ID
+	topic, err := client.ReadTopic(name)
+	if err != nil {
+		return err
+	}
+	if topic.Partitions != 1 {
+		return fmt.Errorf("partitions did not get decreased got: %d", topic.Partitions)
+	}
+
+	if v, ok := topic.Config["segment.ms"]; ok && *v != "22222" {
+		return fmt.Errorf("segment.ms != %v", topic)
+	}
+	return nil
+}
+
+func testResourceTopic_updateCleanupPolicyCheck(s *terraform.State) error {
+	resourceState := s.Modules[0].Resources["kafka_topic.test"]
+	instanceState := resourceState.Primary
+	client := testProvider.Meta().(*LazyClient)
+	name := instanceState.ID
+	topic, err := client.ReadTopic(name)
+	if err != nil {
+		return err
+	}
+
+	if v, ok := topic.Config["cleanup.policy"]; ok && *v != "compact" {
+		return fmt.Errorf("cleanup.policy != %v", topic)
+	}
+	return nil
+}
+
 const testResourceTopic_noConfig = `
 provider "kafka" {
   bootstrap_servers = ["localhost:9092"]
 }
@@ -258,3 +343,21 @@ resource "kafka_topic" "test" {
   }
 }
 `
+
+const testResourceTopic_updateCleanupPolicy = `
+provider "kafka" {
+  bootstrap_servers = ["localhost:9092"]
+}
+
+resource "kafka_topic" "test" {
+  name = "%s"
+  replication_factor = 1
+  partitions = 2
+
+  config = {
+    "retention.ms" = "11111"
+    "segment.ms" = "33333"
+    "cleanup.policy" = "compact"
+  }
+}
+`