diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 2530128..b16db45 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: ['1.18', '1.19', '1.20'] + go-version: ['1.20'] services: redis: diff --git a/README.md b/README.md index 5950213..0ca1c02 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ func main() { // Add a handler function to the queue queue.AddHandler(func(ctx context.Context, task *cerra.Task) error { - fmt.Printf("Received task with Name %s and payload %v\n", task.Name, task.Payload) + fmt.Printf("Received task with ID %s and payload %v\n", task.ID, task.Payload) return nil }) diff --git a/cerra_test.go b/cerra_test.go index 377d864..cf89fe9 100644 --- a/cerra_test.go +++ b/cerra_test.go @@ -12,7 +12,7 @@ func TestEnqueue(t *testing.T) { queue := cerra.NewQueue(cerra.NewInMemoryBackend(), 1) task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -36,7 +36,7 @@ func TestAddHandler(t *testing.T) { defer queue.Close() task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -51,7 +51,7 @@ func TestAddHandler(t *testing.T) { t.Error("handler was not called") } - if dequeuedTask.Name != task.Name { + if dequeuedTask.ID != task.ID { t.Error("dequeue task name != queued task name") } diff --git a/inmem_test.go b/inmem_test.go index bc2d2a1..6736ddd 100644 --- a/inmem_test.go +++ b/inmem_test.go @@ -9,7 +9,6 @@ import ( func TestInMemEnqueue(t *testing.T) { backend := cerra.NewInMemoryBackend() task := &cerra.Task{ - Name: "test_task", Payload: []byte("test_payload"), } @@ -23,7 +22,7 @@ func TestInMemDequeue(t *testing.T) { backend := cerra.NewInMemoryBackend() task := &cerra.Task{ - Name: "test_task", + ID: "id", Payload: []byte("test_payload"), } @@ -37,7 +36,7 @@ func TestInMemDequeue(t *testing.T) { t.Errorf("redisq dequeu error: %v", err) } - if dequeuedTask.Name != task.Name { + if dequeuedTask.ID != task.ID { t.Error("redisq dequeue task name != queued task name") } diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index 953ecad..852bcce 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -87,7 +87,7 @@ func (b *rabbiMQBackend) Enqueue(task *cerra.Task) error { false, false, amqp.Publishing{ - Headers: amqp.Table{}, + Headers: amqp.Table{"task_id": task.ID}, ContentType: "text/plain", ContentEncoding: "", Body: encodedTask, diff --git a/rabbitmq/rabbitmq_test.go b/rabbitmq/rabbitmq_test.go index c256db4..3dc2884 100644 --- a/rabbitmq/rabbitmq_test.go +++ b/rabbitmq/rabbitmq_test.go @@ -17,7 +17,7 @@ func TestRabbitmqEnqueue(t *testing.T) { defer queue.Close() task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -37,7 +37,7 @@ func TestRabbitmqDequeue(t *testing.T) { queue.Start() task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -58,7 +58,7 @@ func TestRabbitmqDequeue(t *testing.T) { t.Error("handler was not called") } - if dequeuedTask.Name != task.Name { + if dequeuedTask.ID != task.ID { t.Error("rabbitmq dequeue task name != queued task name") } diff --git a/redisq/redisq.go b/redisq/redisq.go index 9e669d8..a7433db 100644 --- a/redisq/redisq.go +++ b/redisq/redisq.go @@ -103,7 +103,7 @@ func (b *redisBackend) Dequeue() (*cerra.Task, error) { } return &cerra.Task{ - Name: task.Values["name"].(string), + ID: task.Values["id"].(string), Payload: []byte(task.Values["payload"].(string)), }, nil } @@ -144,7 +144,6 @@ func (b *redisBackend) consumer() (err error) { } func (b *redisBackend) fetch() { - for { select { case <-b.stop: diff --git a/redisq/redisq_test.go b/redisq/redisq_test.go index 5d104b9..ca06838 100644 --- a/redisq/redisq_test.go +++ b/redisq/redisq_test.go @@ -17,7 +17,7 @@ func TestRedisEnqueue(t *testing.T) { defer queue.Close() task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -37,7 +37,7 @@ func TestRedisDequeue(t *testing.T) { queue.Start() task := &cerra.Task{ - Name: "test_task", + ID: "test_task", Payload: []byte("test_payload"), } @@ -58,7 +58,7 @@ func TestRedisDequeue(t *testing.T) { t.Error("handler was not called") } - if dequeuedTask.Name != task.Name { + if dequeuedTask.ID != task.ID { t.Error("rabbitmq dequeue task name != queued task name") } diff --git a/task.go b/task.go index 5ce8bf2..05a4c78 100644 --- a/task.go +++ b/task.go @@ -6,15 +6,21 @@ import ( ) type Task struct { - Name string `json:"name"` - Payload []byte `json:"payload"` - + ID string `json:"id"` + Payload []byte `json:"payload"` Timeout time.Duration `json:"timeout"` } -func NewTask(name string, payload []byte) *Task { +func NewTask(payload []byte) *Task { + return &Task{ + Payload: payload, + Timeout: 60 * time.Minute, + } +} + +func NewTaskWithID(id string, payload []byte) *Task { return &Task{ - Name: name, + ID: id, Payload: payload, Timeout: 60 * time.Minute, } @@ -26,7 +32,7 @@ func (t *Task) Encode() ([]byte, error) { func (t *Task) ToMap() map[string]interface{} { return map[string]interface{}{ - "name": t.Name, + "id": t.ID, "payload": t.Payload, "timeout": t.Timeout, }