Skip to content

Commit

Permalink
Merge pull request #7 from zaidfadhil/feat/task-id
Browse files Browse the repository at this point in the history
switch Name in queue task to ID
  • Loading branch information
zaidfadhil authored Jul 2, 2024
2 parents 2a1e273 + 995c230 commit f3ff735
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ['1.18', '1.19', '1.20']
go-version: ['1.20']

services:
redis:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {

// Add a handler function to the queue
queue.AddHandler(func(ctx context.Context, task *cerra.Task) error {
fmt.Printf("Received task with Name %s and payload %v\n", task.Name, task.Payload)
fmt.Printf("Received task with ID %s and payload %v\n", task.ID, task.Payload)
return nil
})

Expand Down
6 changes: 3 additions & 3 deletions cerra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestEnqueue(t *testing.T) {
queue := cerra.NewQueue(cerra.NewInMemoryBackend(), 1)

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -36,7 +36,7 @@ func TestAddHandler(t *testing.T) {
defer queue.Close()

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -51,7 +51,7 @@ func TestAddHandler(t *testing.T) {
t.Error("handler was not called")
}

if dequeuedTask.Name != task.Name {
if dequeuedTask.ID != task.ID {
t.Error("dequeue task name != queued task name")
}

Expand Down
5 changes: 2 additions & 3 deletions inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
func TestInMemEnqueue(t *testing.T) {
backend := cerra.NewInMemoryBackend()
task := &cerra.Task{
Name: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -23,7 +22,7 @@ func TestInMemDequeue(t *testing.T) {
backend := cerra.NewInMemoryBackend()

task := &cerra.Task{
Name: "test_task",
ID: "id",
Payload: []byte("test_payload"),
}

Expand All @@ -37,7 +36,7 @@ func TestInMemDequeue(t *testing.T) {
t.Errorf("redisq dequeu error: %v", err)
}

if dequeuedTask.Name != task.Name {
if dequeuedTask.ID != task.ID {
t.Error("redisq dequeue task name != queued task name")
}

Expand Down
2 changes: 1 addition & 1 deletion rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *rabbiMQBackend) Enqueue(task *cerra.Task) error {
false,
false,
amqp.Publishing{
Headers: amqp.Table{},
Headers: amqp.Table{"task_id": task.ID},
ContentType: "text/plain",
ContentEncoding: "",
Body: encodedTask,
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestRabbitmqEnqueue(t *testing.T) {
defer queue.Close()

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -37,7 +37,7 @@ func TestRabbitmqDequeue(t *testing.T) {
queue.Start()

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -58,7 +58,7 @@ func TestRabbitmqDequeue(t *testing.T) {
t.Error("handler was not called")
}

if dequeuedTask.Name != task.Name {
if dequeuedTask.ID != task.ID {
t.Error("rabbitmq dequeue task name != queued task name")
}

Expand Down
3 changes: 1 addition & 2 deletions redisq/redisq.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (b *redisBackend) Dequeue() (*cerra.Task, error) {
}

return &cerra.Task{
Name: task.Values["name"].(string),
ID: task.Values["id"].(string),
Payload: []byte(task.Values["payload"].(string)),
}, nil
}
Expand Down Expand Up @@ -144,7 +144,6 @@ func (b *redisBackend) consumer() (err error) {
}

func (b *redisBackend) fetch() {

for {
select {
case <-b.stop:
Expand Down
6 changes: 3 additions & 3 deletions redisq/redisq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestRedisEnqueue(t *testing.T) {
defer queue.Close()

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -37,7 +37,7 @@ func TestRedisDequeue(t *testing.T) {
queue.Start()

task := &cerra.Task{
Name: "test_task",
ID: "test_task",
Payload: []byte("test_payload"),
}

Expand All @@ -58,7 +58,7 @@ func TestRedisDequeue(t *testing.T) {
t.Error("handler was not called")
}

if dequeuedTask.Name != task.Name {
if dequeuedTask.ID != task.ID {
t.Error("rabbitmq dequeue task name != queued task name")
}

Expand Down
18 changes: 12 additions & 6 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ import (
)

type Task struct {
Name string `json:"name"`
Payload []byte `json:"payload"`

ID string `json:"id"`
Payload []byte `json:"payload"`
Timeout time.Duration `json:"timeout"`
}

func NewTask(name string, payload []byte) *Task {
func NewTask(payload []byte) *Task {
return &Task{
Payload: payload,
Timeout: 60 * time.Minute,
}
}

func NewTaskWithID(id string, payload []byte) *Task {
return &Task{
Name: name,
ID: id,
Payload: payload,
Timeout: 60 * time.Minute,
}
Expand All @@ -26,7 +32,7 @@ func (t *Task) Encode() ([]byte, error) {

func (t *Task) ToMap() map[string]interface{} {
return map[string]interface{}{
"name": t.Name,
"id": t.ID,
"payload": t.Payload,
"timeout": t.Timeout,
}
Expand Down

0 comments on commit f3ff735

Please sign in to comment.