Skip to content

Commit

Permalink
Record total tasks processed/failed
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken authored Dec 17, 2021
1 parent 43cb4dd commit 82d18e3
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Package `x/metrics` is added.
- Tool `tools/metrics_exporter` binary is added.
- `ProcessedTotal` and `FailedTotal` fields were added to `QueueInfo` struct.

## [0.19.1] - 2021-12-12

Expand Down
39 changes: 23 additions & 16 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,17 @@ type QueueInfo struct {
// Number of stored completed tasks.
Completed int

// Total number of tasks being processed during the given date.
// Total number of tasks being processed within the given date (counter resets daily).
// The number includes both succeeded and failed tasks.
Processed int
// Total number of tasks failed to be processed during the given date.
// Total number of tasks failed to be processed within the given date (counter resets daily).
Failed int

// Total number of tasks processed (cumulative).
ProcessedTotal int
// Total number of tasks failed (cumulative).
FailedTotal int

// Paused indicates whether the queue is paused.
// If true, tasks in the queue will not be processed.
Paused bool
Expand All @@ -96,20 +101,22 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
return nil, err
}
return &QueueInfo{
Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Processed: stats.Processed,
Failed: stats.Failed,
ProcessedTotal: stats.ProcessedTotal,
FailedTotal: stats.FailedTotal,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
}, nil
}

Expand Down
52 changes: 35 additions & 17 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ func TestInspectorGetQueueInfo(t *testing.T) {
completed map[string][]base.Z
processed map[string]int
failed map[string]int
processedTotal map[string]int
failedTotal map[string]int
oldestPendingMessageEnqueueTime map[string]time.Time
qname string
want *QueueInfo
Expand Down Expand Up @@ -329,26 +331,38 @@ func TestInspectorGetQueueInfo(t *testing.T) {
"critical": 0,
"low": 5,
},
processedTotal: map[string]int{
"default": 11111,
"critical": 22222,
"low": 33333,
},
failedTotal: map[string]int{
"default": 111,
"critical": 222,
"low": 333,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond),
"low": now.Add(-30 * time.Second),
},
qname: "default",
want: &QueueInfo{
Queue: "default",
Latency: 15 * time.Second,
Size: 4,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Archived: 0,
Completed: 0,
Processed: 120,
Failed: 2,
Paused: false,
Timestamp: now,
Queue: "default",
Latency: 15 * time.Second,
Size: 4,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Archived: 0,
Completed: 0,
Processed: 120,
Failed: 2,
ProcessedTotal: 11111,
FailedTotal: 111,
Paused: false,
Timestamp: now,
},
},
}
Expand All @@ -363,12 +377,16 @@ func TestInspectorGetQueueInfo(t *testing.T) {
h.SeedAllCompletedQueues(t, r, tc.completed)
ctx := context.Background()
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.Set(ctx, processedKey, n, 0)
r.Set(ctx, base.ProcessedKey(qname, now), n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
r.Set(ctx, failedKey, n, 0)
r.Set(ctx, base.FailedKey(qname, now), n, 0)
}
for qname, n := range tc.processedTotal {
r.Set(ctx, base.ProcessedTotalKey(qname), n, 0)
}
for qname, n := range tc.failedTotal {
r.Set(ctx, base.FailedTotalKey(qname), n, 0)
}
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() {
Expand Down
16 changes: 16 additions & 0 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel
)

// Max value for int64.
//
// Use this value to check if a redis counter value reached maximum.
// As documeted in https://redis.io/commands/INCR, a string stored at a redis key is interpreted as a base-10 64 bit signed integer.
const MaxInt64 = 1<<63 - 1

// TaskState denotes the state of a task.
type TaskState int

Expand Down Expand Up @@ -150,6 +156,16 @@ func PausedKey(qname string) string {
return fmt.Sprintf("%spaused", QueueKeyPrefix(qname))
}

// ProcessedTotalKey returns a redis key for total processed count for the given queue.
func ProcessedTotalKey(qname string) string {
return fmt.Sprintf("%sprocessed", QueueKeyPrefix(qname))
}

// FailedTotalKey returns a redis key for total failure count for the given queue.
func FailedTotalKey(qname string) string {
return fmt.Sprintf("%sfailed", QueueKeyPrefix(qname))
}

// ProcessedKey returns a redis key for processed count for the given day for the queue.
func ProcessedKey(qname string, t time.Time) string {
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
Expand Down
34 changes: 34 additions & 0 deletions internal/base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,40 @@ func TestPausedKey(t *testing.T) {
}
}

func TestProcessedTotalKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"default", "asynq:{default}:processed"},
{"custom", "asynq:{custom}:processed"},
}

for _, tc := range tests {
got := ProcessedTotalKey(tc.qname)
if got != tc.want {
t.Errorf("ProcessedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}

func TestFailedTotalKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"default", "asynq:{default}:failed"},
{"custom", "asynq:{custom}:failed"},
}

for _, tc := range tests {
got := FailedTotalKey(tc.qname)
if got != tc.want {
t.Errorf("FailedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}

func TestProcessedKey(t *testing.T) {
tests := []struct {
qname string
Expand Down
62 changes: 36 additions & 26 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,18 @@ type Stats struct {
Retry int
Archived int
Completed int
// Total number of tasks processed during the current date.

// Number of tasks processed within the current date.
// The number includes both succeeded and failed tasks.
Processed int
// Total number of tasks failed during the current date.
// Number of tasks failed within the current date.
Failed int

// Total number of tasks processed (both succeeded and failed) from this queue.
ProcessedTotal int
// Total number of tasks failed.
FailedTotal int

// Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration
// Time this stats was taken.
Expand All @@ -65,15 +72,17 @@ type DailyStats struct {
Time time.Time
}

// KEYS[1] -> asynq:<qname>:pending
// KEYS[2] -> asynq:<qname>:active
// KEYS[3] -> asynq:<qname>:scheduled
// KEYS[4] -> asynq:<qname>:retry
// KEYS[5] -> asynq:<qname>:archived
// KEYS[6] -> asynq:<qname>:completed
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:paused
// KEYS[1] -> asynq:<qname>:pending
// KEYS[2] -> asynq:<qname>:active
// KEYS[3] -> asynq:<qname>:scheduled
// KEYS[4] -> asynq:<qname>:retry
// KEYS[5] -> asynq:<qname>:archived
// KEYS[6] -> asynq:<qname>:completed
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
//
// ARGV[1] -> task key prefix
var currentStatsCmd = redis.NewScript(`
Expand All @@ -91,22 +100,17 @@ table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5]))
table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6]))
local pcount = 0
local p = redis.call("GET", KEYS[7])
if p then
pcount = tonumber(p)
end
table.insert(res, KEYS[7])
table.insert(res, pcount)
local fcount = 0
local f = redis.call("GET", KEYS[8])
if f then
fcount = tonumber(f)
for i=7,10 do
local count = 0
local n = redis.call("GET", KEYS[i])
if n then
count = tonumber(n)
end
table.insert(res, KEYS[i])
table.insert(res, count)
end
table.insert(res, KEYS[8])
table.insert(res, fcount)
table.insert(res, KEYS[9])
table.insert(res, redis.call("EXISTS", KEYS[9]))
table.insert(res, KEYS[11])
table.insert(res, redis.call("EXISTS", KEYS[11]))
table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then
local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
Expand Down Expand Up @@ -136,6 +140,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.CompletedKey(qname),
base.ProcessedKey(qname, now),
base.FailedKey(qname, now),
base.ProcessedTotalKey(qname),
base.FailedTotalKey(qname),
base.PausedKey(qname),
}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
Expand Down Expand Up @@ -176,6 +182,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
stats.Processed = val
case base.FailedKey(qname, now):
stats.Failed = val
case base.ProcessedTotalKey(qname):
stats.ProcessedTotal = val
case base.FailedTotalKey(qname):
stats.FailedTotal = val
case base.PausedKey(qname):
if val == 0 {
stats.Paused = false
Expand Down
Loading

0 comments on commit 82d18e3

Please sign in to comment.