diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index c7e8ddc..59e3ce2 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -160,11 +160,15 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { consumerGroup := "cronsumer-cg" brokerAddress := "localhost:9092" - messagesLen := make(chan int) - retryTopic := "retry-topic" - _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}) + _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{ + {Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, + {Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)}, + {Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)}, + {Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)}, + {Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)}, + }) defer cleanUp() retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil) @@ -186,7 +190,6 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { MessageGroupLimit: 100, MessageGroupDuration: time.Second, BatchConsumeFn: func(messages []*kafka.Message) error { - messagesLen <- len(messages) messages[1].IsFailed = true return errors.New("err") }, @@ -203,6 +206,7 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) { var expectedOffset int64 = 1 conditionFunc := func() bool { lastOffset, _ := retryConn.ReadLastOffset() + fmt.Println(lastOffset) return lastOffset == expectedOffset }