Skip to content

Commit

Permalink
chore: fix integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzyildirim committed Nov 13, 2023
1 parent c88e139 commit df3b5ea
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,15 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
consumerGroup := "cronsumer-cg"
brokerAddress := "localhost:9092"

messagesLen := make(chan int)

retryTopic := "retry-topic"

_, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}})
_, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{
{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)},
{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)},
{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)},
{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)},
{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)},
})
defer cleanUp()

retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil)
Expand All @@ -186,7 +190,6 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
MessageGroupLimit: 100,
MessageGroupDuration: time.Second,
BatchConsumeFn: func(messages []*kafka.Message) error {
messagesLen <- len(messages)
messages[1].IsFailed = true
return errors.New("err")
},
Expand All @@ -203,6 +206,7 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
var expectedOffset int64 = 1
conditionFunc := func() bool {
lastOffset, _ := retryConn.ReadLastOffset()
fmt.Println(lastOffset)
return lastOffset == expectedOffset
}

Expand Down

0 comments on commit df3b5ea

Please sign in to comment.