From 2ae8e797d7e993a27d866cb33d7f57dbe52a4c65 Mon Sep 17 00:00:00 2001 From: Yousif <753751+yousifh@users.noreply.github.com> Date: Wed, 9 Oct 2024 02:54:18 -0400 Subject: [PATCH] Optimize enqueueing tasks performance --- internal/rdb/rdb.go | 44 +++++++++++++++++++++++++++++----------- internal/rdb/rdb_test.go | 43 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 68de3864..d00acfcf 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/google/uuid" @@ -28,6 +29,7 @@ const LeaseDuration = 30 * time.Second type RDB struct { client redis.UniversalClient clock timeutil.Clock + m sync.Map } // NewRDB returns a new instance of RDB. @@ -112,8 +114,11 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -174,8 +179,11 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time if err != nil { return errors.E(op, errors.Internal, "cannot encode task message: %v", err) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ msg.UniqueKey, @@ -529,8 +537,11 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -591,8 +602,11 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -648,8 +662,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -707,8 +724,11 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, ok := r.m.Load(msg.Queue); !ok { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.m.Store(msg.Queue, struct{}{}) } keys := []string{ msg.UniqueKey, diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3bd9eda4..12ec1015 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -160,6 +160,49 @@ func TestEnqueueTaskIdConflictError(t *testing.T) { } } +func TestEnqueueQueueCache(t *testing.T) { + r := setup(t) + defer r.Close() + t1 := h.NewTaskMessageWithQueue("sync1", nil, "q1") + t2 := h.NewTaskMessageWithQueue("sync2", nil, "q2") + t3 := h.NewTaskMessageWithQueue("sync3", nil, "q1") + + err := r.Enqueue(context.Background(), t1) + if err != nil { + t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err) + } + + // Check queue is in the AllQueues set. + if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() { + t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues) + } + + err = r.Enqueue(context.Background(), t2) + if err != nil { + t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err) + } + + if !r.client.SIsMember(context.Background(), base.AllQueues, t2.Queue).Val() { + t.Fatalf("%q is not a member of SET %q", t2.Queue, base.AllQueues) + } + + // Delete q1 from AllQueues + err = r.client.SRem(context.Background(), base.AllQueues, "q1").Err() + if err != nil { + t.Fatalf("Redis SREM = %v, want nil", err) + } + + // Enqueue another task to q1, won't update the queue because already cached in-memory + err = r.Enqueue(context.Background(), t3) + if err != nil { + t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err) + } + + if r.client.SIsMember(context.Background(), base.AllQueues, t3.Queue).Val() { + t.Fatalf("%q is a member of SET %q", t3.Queue, base.AllQueues) + } +} + func TestEnqueueUnique(t *testing.T) { r := setup(t) defer r.Close()