diff --git a/Makefile b/Makefile index deda736..8907a81 100644 --- a/Makefile +++ b/Makefile @@ -38,4 +38,4 @@ integration-test: ## run-act: act for running github actions on your local machine run-act: - act -j test --container-architecture linux/arm64 -v \ No newline at end of file + act -j test --container-architecture linux/arm64 \ No newline at end of file diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 45e2833..6dac326 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -71,7 +71,7 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) { consumerGroup := "topic-cg" brokerAddress := "localhost:9092" - conn, cleanUp := createTopic(t, topic) + conn, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}) defer cleanUp() messageCh := make(chan kafka.Message) @@ -89,13 +89,6 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) { consumer.Consume() - // When - produceMessages(t, conn, segmentio.Message{ - Topic: topic, - Key: []byte("1"), - Value: []byte(`foo`), - }) - // Then actual := <-messageCh if string(actual.Value) != "foo" { @@ -117,7 +110,15 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { consumerGroup := "batch-topic-cg" brokerAddress := "localhost:9092" - conn, cleanUp := createTopic(t, topic) + messages := []segmentio.Message{ + {Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, + {Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)}, + {Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)}, + {Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)}, + {Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)}, + } + + conn, cleanUp := createTopicAndWriteMessages(t, topic, messages) defer cleanUp() messagesLen := make(chan int) @@ -139,15 +140,6 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { consumer.Consume() - // When - produceMessages(t, conn, - segmentio.Message{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, - segmentio.Message{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)}, - segmentio.Message{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)}, - segmentio.Message{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)}, - segmentio.Message{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)}, - ) - // Then actual := <-messagesLen @@ -169,10 +161,10 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { retryTopic := "retry-topic" - conn, cleanUp := createTopic(t, topic) + _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}) defer cleanUp() - retryConn, cleanUpThisToo := createTopic(t, retryTopic) + retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil) defer cleanUpThisToo() consumerCfg := &kafka.ConsumerConfig{ @@ -197,9 +189,6 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { consumer.Consume() - // When - produceMessages(t, conn, segmentio.Message{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}) - // Then var expectedOffset int64 = 1 conditionFunc := func() bool { @@ -210,7 +199,7 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { assertEventually(t, conditionFunc, 45*time.Second, time.Second) } -func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) { +func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) { t.Helper() conn, err := segmentio.DialLeader(context.Background(), "tcp", "localhost:9092", topicName, 0) @@ -224,15 +213,14 @@ func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) { } } - return conn, cleanUp -} - -func produceMessages(t *testing.T, conn *segmentio.Conn, msgs ...segmentio.Message) { - t.Helper() - - if _, err := conn.WriteMessages(msgs...); err != nil { - t.Fatalf("Produce err %s", err.Error()) + if messages != nil { + _, err = conn.WriteMessages(messages...) + if err != nil { + t.Fatalf("err during write message %s", err.Error()) + } } + + return conn, cleanUp } func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration, tick time.Duration) bool {