Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add custom unique key option #505

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
TimeoutOpt
DeadlineOpt
UniqueOpt
UniqueKeyOpt
ProcessAtOpt
ProcessInOpt
TaskIDOpt
Expand Down Expand Up @@ -82,6 +83,7 @@ type (
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
uniqueKeyOption string
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
Expand Down Expand Up @@ -160,10 +162,11 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// By default, the uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// UniqueKey can be used to specify a custom string for calculating uniqueness, instead of task payload.
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
Expand All @@ -172,6 +175,24 @@ func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", t
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }

// UniqueKey returns an option to define the custom uniqueness of a task.
// If uniqueKey is not empty, the uniqueness of a task is based on the following properties:
// - Task Type
// - UniqueKey
// - Queue Name
// Otherwise, task payload will be used, see Unique.
//
// UniqueKey should be used together with Unique.
func UniqueKey(uniqueKey string) Option {
return uniqueKeyOption(uniqueKey)
}

func (uniqueKey uniqueKeyOption) String() string {
return fmt.Sprintf("UniqueKey(%q)", string(uniqueKey))
}
func (uniqueKey uniqueKeyOption) Type() OptionType { return UniqueKeyOpt }
func (uniqueKey uniqueKeyOption) Value() interface{} { return string(uniqueKey) }

// ProcessAt returns an option to specify when to process the given task.
//
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
Expand Down Expand Up @@ -234,6 +255,7 @@ type option struct {
timeout time.Duration
deadline time.Time
uniqueTTL time.Duration
uniqueKey string
processAt time.Time
retention time.Duration
group string
Expand Down Expand Up @@ -278,6 +300,8 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("Unique TTL cannot be less than 1s")
}
res.uniqueTTL = ttl
case uniqueKeyOption:
res.uniqueKey = string(opt)
case processAtOption:
res.processAt = time.Time(opt)
case processInOption:
Expand Down Expand Up @@ -379,7 +403,11 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
}
var uniqueKey string
if opt.uniqueTTL > 0 {
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
if opt.uniqueKey != "" {
uniqueKey = base.CustomUniqueKey(opt.queue, task.Type(), opt.uniqueKey)
} else {
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
}
}
msg := &base.TaskMessage{
ID: opt.taskID,
Expand Down
43 changes: 43 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,3 +1191,46 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}

func TestClientEnqueueUniqueWithUniqueKeyOption(t *testing.T) {
r := setup(t)
c := NewClient(getRedisConnOpt(t))
defer c.Close()

tests := []struct {
task *Task
ttl time.Duration
}{
{
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
time.Hour,
},
}

for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.

// Enqueue the task first. It should succeed.
_, err := c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
if err != nil {
t.Fatal(err)
}

gotTTL := r.TTL(context.Background(), base.CustomUniqueKey(base.DefaultQueueName, tc.task.Type(), "custom_unique_key")).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
continue
}

// Enqueue the task again. It should fail.
_, err = c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}
6 changes: 6 additions & 0 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,12 @@ func parseOption(s string) (Option, error) {
return nil, err
}
return Unique(d), nil
case "UniqueKey":
key, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return UniqueKey(key), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ func UniqueKey(qname, tasktype string, payload []byte) string {
return QueueKeyPrefix(qname) + "unique:" + tasktype + ":" + hex.EncodeToString(checksum[:])
}

// CustomUniqueKey returns a redis key with the given type, custom key, and queue name.
func CustomUniqueKey(qname, tasktype string, customKey string) string {
checksum := md5.Sum([]byte(customKey))
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
}

// GroupKeyPrefix returns a prefix for group key.
func GroupKeyPrefix(qname string) string {
return QueueKeyPrefix(qname) + "g:"
Expand Down