Skip to content

Commit

Permalink
chore: change write message strategy for integration test problem
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Sep 17, 2023
1 parent 9df3a21 commit 667d162
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ integration-test:

## run-act: act for running github actions on your local machine
run-act:
act -j test --container-architecture linux/arm64 -v
act -j test --container-architecture linux/arm64
52 changes: 20 additions & 32 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {
consumerGroup := "topic-cg"
brokerAddress := "localhost:9092"

conn, cleanUp := createTopic(t, topic)
conn, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}})
defer cleanUp()

messageCh := make(chan kafka.Message)
Expand All @@ -89,13 +89,6 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {

consumer.Consume()

// When
produceMessages(t, conn, segmentio.Message{
Topic: topic,
Key: []byte("1"),
Value: []byte(`foo`),
})

// Then
actual := <-messageCh
if string(actual.Value) != "foo" {
Expand All @@ -117,7 +110,15 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
consumerGroup := "batch-topic-cg"
brokerAddress := "localhost:9092"

conn, cleanUp := createTopic(t, topic)
messages := []segmentio.Message{
{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)},
{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)},
{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)},
{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)},
{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)},
}

conn, cleanUp := createTopicAndWriteMessages(t, topic, messages)
defer cleanUp()

messagesLen := make(chan int)
Expand All @@ -139,15 +140,6 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {

consumer.Consume()

// When
produceMessages(t, conn,
segmentio.Message{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)},
)

// Then
actual := <-messagesLen

Expand All @@ -169,10 +161,10 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) {

retryTopic := "retry-topic"

conn, cleanUp := createTopic(t, topic)
_, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}})
defer cleanUp()

retryConn, cleanUpThisToo := createTopic(t, retryTopic)
retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil)
defer cleanUpThisToo()

consumerCfg := &kafka.ConsumerConfig{
Expand All @@ -197,9 +189,6 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) {

consumer.Consume()

// When
produceMessages(t, conn, segmentio.Message{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)})

// Then
var expectedOffset int64 = 1
conditionFunc := func() bool {
Expand All @@ -210,7 +199,7 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) {
assertEventually(t, conditionFunc, 45*time.Second, time.Second)
}

func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) {
func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) {
t.Helper()

conn, err := segmentio.DialLeader(context.Background(), "tcp", "localhost:9092", topicName, 0)
Expand All @@ -224,15 +213,14 @@ func createTopic(t *testing.T, topicName string) (*segmentio.Conn, func()) {
}
}

return conn, cleanUp
}

func produceMessages(t *testing.T, conn *segmentio.Conn, msgs ...segmentio.Message) {
t.Helper()

if _, err := conn.WriteMessages(msgs...); err != nil {
t.Fatalf("Produce err %s", err.Error())
if messages != nil {
_, err = conn.WriteMessages(messages...)
if err != nil {
t.Fatalf("err during write message %s", err.Error())
}
}

return conn, cleanUp
}

func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration, tick time.Duration) bool {
Expand Down

0 comments on commit 667d162

Please sign in to comment.