Add metric taskDurationVec
b670ab24dfda6e1c3d66893654672af222ea6123
xRaiMaNx committed Aug 22, 2024
1 parent ec7a24c commit b08c445
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions internal/cms/task_processor.go
@@ -265,6 +265,8 @@ func (c *TaskProcessorConfig) UnmarshalYAML(unmarshal func(interface{}) error) e
 const (
     labelAction          = "action"
     labelProcessingState = "processing_state"
+    labelTask            = "task"
+    labelHost            = "host"
 )
 
 // TaskProcessor is an actual CMS implementation.
@@ -312,6 +314,7 @@ type TaskProcessor struct {
     taskProcessingLoop     metrics.Timer
     tasksInProcess         metrics.GaugeVec
     gpuTasksInProcess      metrics.GaugeVec
+    taskDurationVec        metrics.TimerVec
     taskProcessingDuration metrics.Timer
 
     chunkIntegrityCheckErrors metrics.Counter
@@ -375,6 +378,7 @@ func (p *TaskProcessor) resetReservePool(tasks []*models.Task) {
 
 func (p *TaskProcessor) ResetLeaderMetrics() {
     p.reservePoolCPULimit.Set(0)
+    p.taskDurationVec.Reset()
 }
 
 func (p *TaskProcessor) initRateLimiter(tasks []*models.Task) {
@@ -457,6 +461,7 @@ func (p *TaskProcessor) ensureState(ctx context.Context) error {
 func (p *TaskProcessor) RegisterMetrics(r metrics.Registry) {
     p.taskProcessingLoop = r.Timer("task_processing_loop")
     p.tasksInProcess = r.GaugeVec("tasks_in_process", []string{labelAction, labelProcessingState})
+    p.taskDurationVec = r.TimerVec("task_duration_vec", []string{labelTask, labelHost, labelProcessingState})
     p.gpuTasksInProcess = r.GaugeVec("gpu_tasks_in_process", []string{labelAction, labelProcessingState})
     p.taskProcessingDuration = r.DurationHistogram("task_processing_duration", metrics.NewDurationBuckets(
         time.Minute,
@@ -490,6 +495,7 @@ func (p *TaskProcessor) RegisterMetrics(r metrics.Registry) {
 }
 
 func (p *TaskProcessor) resetLoopMetrics() {
+    p.taskDurationVec.Reset()
     for _, action := range walle.HostActions {
         for _, state := range models.TaskProcessingStates {
             p.tasksInProcess.With(map[string]string{
@@ -504,6 +510,29 @@ func (p *TaskProcessor) resetLoopMetrics() {
     }
 }
 
+func (p *TaskProcessor) initLoopMetrics(tasks []*models.Task) {
+    for _, t := range tasks {
+        p.tasksInProcess.With(map[string]string{
+            labelAction:          string(t.Action),
+            labelProcessingState: string(t.ProcessingState),
+        }).Add(1.0)
+        if p.isGPUTask(t) {
+            p.gpuTasksInProcess.With(map[string]string{
+                labelAction:          string(t.Action),
+                labelProcessingState: string(t.ProcessingState),
+            }).Add(1.0)
+        }
+
+        if t.Hosts != nil {
+            p.taskDurationVec.With(map[string]string{
+                labelTask:            string(t.ID),
+                labelHost:            t.Hosts[0],
+                labelProcessingState: string(t.ProcessingState),
+            }).RecordDuration(time.Since(time.Time(t.CreatedAt)))
+        }
+    }
+}
+
 func (p *TaskProcessor) Conf() *TaskProcessorConfig {
     p.confLock.Lock()
     defer p.confLock.Unlock()
@@ -648,18 +677,7 @@ func (p *TaskProcessor) updateTasks(ctx context.Context) error {
     p.logClusterState(tasks)
 
     p.resetLoopMetrics()
-    for _, t := range tasks {
-        p.tasksInProcess.With(map[string]string{
-            labelAction:          string(t.Action),
-            labelProcessingState: string(t.ProcessingState),
-        }).Add(1.0)
-        if p.isGPUTask(t) {
-            p.gpuTasksInProcess.With(map[string]string{
-                labelAction:          string(t.Action),
-                labelProcessingState: string(t.ProcessingState),
-            }).Add(1.0)
-        }
-    }
+    p.initLoopMetrics(tasks)
 
     p.taskCache, err = models.NewTaskCache(tasks)
     if err != nil {
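For context, the added task_duration_vec metric records, once per processing loop, how long each in-flight task has been running: one timer sample per task, labeled with the task ID, its first host, and the current processing state, and the whole vector is reset at the start of each loop (and in ResetLeaderMetrics) so label sets from finished tasks do not linger. Below is a minimal, self-contained sketch of that pattern; the With/RecordDuration/Reset calls mirror the TimerVec interface visible in the diff, but stubTimerVec, task, and initLoopMetrics here are simplified, hypothetical stand-ins, not the repository's actual metrics package or models.

package main

import (
    "fmt"
    "time"
)

// Labels mirroring those added in the commit.
const (
    labelTask            = "task"
    labelHost            = "host"
    labelProcessingState = "processing_state"
)

// stubTimerVec is a simplified stand-in for the TimerVec used in the diff:
// With(labels) selects a child timer and RecordDuration stores one sample.
type stubTimerVec struct {
    samples map[string]time.Duration
}

func newStubTimerVec() *stubTimerVec {
    return &stubTimerVec{samples: map[string]time.Duration{}}
}

// Reset drops all previously recorded label sets, as done each loop.
func (v *stubTimerVec) Reset() { v.samples = map[string]time.Duration{} }

func (v *stubTimerVec) With(labels map[string]string) *stubTimer {
    key := labels[labelTask] + "/" + labels[labelHost] + "/" + labels[labelProcessingState]
    return &stubTimer{vec: v, key: key}
}

type stubTimer struct {
    vec *stubTimerVec
    key string
}

func (t *stubTimer) RecordDuration(d time.Duration) { t.vec.samples[t.key] = d }

// task carries only the fields the new metric needs (hypothetical shape).
type task struct {
    ID              string
    Hosts           []string
    ProcessingState string
    CreatedAt       time.Time
}

// initLoopMetrics follows the same pattern as the added code: reset the
// vector, then record one age sample per task that has at least one host.
func initLoopMetrics(vec *stubTimerVec, tasks []*task) {
    vec.Reset()
    for _, t := range tasks {
        if len(t.Hosts) == 0 {
            continue
        }
        vec.With(map[string]string{
            labelTask:            t.ID,
            labelHost:            t.Hosts[0],
            labelProcessingState: t.ProcessingState,
        }).RecordDuration(time.Since(t.CreatedAt))
    }
}

func main() {
    vec := newStubTimerVec()
    tasks := []*task{
        {ID: "task-1", Hosts: []string{"host-a"}, ProcessingState: "queued", CreatedAt: time.Now().Add(-90 * time.Minute)},
        {ID: "task-2", Hosts: nil, ProcessingState: "acquired", CreatedAt: time.Now().Add(-5 * time.Minute)}, // skipped: no hosts
    }
    initLoopMetrics(vec, tasks)
    for key, d := range vec.samples {
        fmt.Printf("%s -> %v\n", key, d.Round(time.Minute))
    }
}

Running the sketch prints one duration per labeled task (e.g. task-1/host-a/queued -> 1h30m0s), which is the per-loop shape of data the new metric is intended to expose.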
