forked from golang-queue/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_test.go
55 lines (51 loc) · 1.19 KB
/
kafka_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package kafka
import (
"testing"
"github.com/golang-queue/queue"
"github.com/stretchr/testify/assert"
//"github.com/stretchr/testify/assert"
)
// func TestFetchData(t *testing.T) {
// // m := mockMessage{
// // Message: "foo",
// // }
// // w := NewWorker()
// // q, err := queue.NewQueue(
// // queue.WithWorker(w),
// // queue.WithWorkerCount(2),
// // )
// // assert.NoError(t, err)
// // q.Start()
// // time.Sleep(50 * time.Millisecond)
// // q.Shutdown()
// // // can't queue task after shutdown
// // err = q.Queue(m)
// // assert.Error(t, err)
// // assert.Equal(t, queue.ErrQueueShutdown, err)
// // q.Wait()
// fmt.Printf("start\n")
// InitConsumer(WithAddr("localhost"),
// WithPartition(1),
// WithTopic("hello"))
// fmt.Printf("end\n")
// }
func TestNewWork(t *testing.T) {
w :=
NewWorker(
WithAddr("localhost"),
WithNetwork("tcp"),
WithPartition(1),
WithTopic("hello"),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(1),
)
assert.NoError(t, err)
q.Start()
// time.Sleep(100 * time.Millisecond)
// //assert.Equal(t, 1, int(q.metric.BusyWorkers()))
// time.Sleep(600 * time.Millisecond)
// q.Shutdown()
// q.Wait()
}