From df3b5ea3688868bb5cda134bbea23aec0b761a8e Mon Sep 17 00:00:00 2001 From: oguzyildirim Date: Tue, 14 Nov 2023 00:04:32 +0300 Subject: [PATCH] chore: fix integration test --- test/integration/integration_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index c7e8ddc..59e3ce2 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -160,11 +160,15 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { consumerGroup := "cronsumer-cg" brokerAddress := "localhost:9092" - messagesLen := make(chan int) - retryTopic := "retry-topic" - _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}) + _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{ + {Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, + {Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)}, + {Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)}, + {Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)}, + {Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)}, + }) defer cleanUp() retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil) @@ -186,7 +190,6 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { MessageGroupLimit: 100, MessageGroupDuration: time.Second, BatchConsumeFn: func(messages []*kafka.Message) error { - messagesLen <- len(messages) messages[1].IsFailed = true return errors.New("err") }, @@ -203,6 +206,7 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { var expectedOffset int64 = 1 conditionFunc := func() bool { lastOffset, _ := retryConn.ReadLastOffset() + fmt.Println(lastOffset) return lastOffset == expectedOffset }