Skip to content

Commit

Permalink
[3185] Unify pipeline tag on prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Oct 21, 2024
1 parent d5f0db7 commit e94b188
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
25 changes: 16 additions & 9 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ func MaxTranscodingPrice(maxPrice *big.Rat) {
func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
[]tag.Mutator{tag.Insert(census.kPipeline, normalizePipelineTag(cap)), tag.Insert(census.kModelName, modelName)},

Check warning on line 1704 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1704

Added line #L1704 was not covered by tests
census.mPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1851,9 +1851,10 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string

// AIRequestFinished records gateway AI job request metrics.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
pipelineTag := normalizePipelineTag(pipeline)
census.recordModelRequested(pipelineTag, model)
census.recordAIRequestLatencyScore(pipelineTag, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit)

Check warning on line 1857 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1854-L1857

Added lines #L1854 - L1857 were not covered by tests
}

// recordAIRequestLatencyScore records the latency score for an AI job request.
Expand Down Expand Up @@ -1891,7 +1892,7 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normalizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}

Check warning on line 1895 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1895

Added line #L1895 was not covered by tests
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
Expand All @@ -1904,9 +1905,11 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.

// AIJobProcessed records orchestrator AI job processing metrics.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
pipelineTag := normalizePipelineTag(pipeline)

census.recordModelRequested(pipelineTag, model)
census.recordAIJobLatencyScore(pipelineTag, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit)

Check warning on line 1912 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1908-L1912

Added lines #L1908 - L1912 were not covered by tests
}

// recordAIJobLatencyScore records the latency score for a processed AI job.
Expand Down Expand Up @@ -1936,7 +1939,7 @@ func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model
// AIProcessingError logs errors in orchestrator AI job processing.
func AIProcessingError(code string, Pipeline string, Model string, sender string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normalizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},

Check warning on line 1942 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1942

Added line #L1942 was not covered by tests
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
Expand Down Expand Up @@ -1999,3 +2002,7 @@ func FastVerificationFailed(ctx context.Context, uri string, errtype int) {
clog.Errorf(ctx, "Error recording metrics err=%q", err)
}
}

// normalizePipelineTag returns pipeline converted to lower case with every
// space replaced by a hyphen, e.g. "Segment Anything 2" becomes
// "segment-anything-2". Other characters are passed through unchanged.
func normalizePipelineTag(pipeline string) string {
return strings.Replace(strings.ToLower(pipeline), " ", "-", -1)

Check warning on line 2007 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L2006-L2007

Added lines #L2006 - L2007 were not covered by tests
}
14 changes: 7 additions & 7 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,30 +681,30 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
mw, err := worker.NewSegmentAnything2MultipartWriter(&buf, req)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 684 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L684

Added line #L684 was not covered by tests
}
return nil, err
}

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 692 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L692

Added line #L692 was not covered by tests
}
return nil, err
}

imageRdr, err := req.Image.Reader()
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 700 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L700

Added line #L700 was not covered by tests
}
return nil, err
}
config, _, err := image.DecodeConfig(imageRdr)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 707 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L707

Added line #L707 was not covered by tests
}
return nil, err
}
Expand All @@ -713,7 +713,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 716 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L716

Added line #L716 was not covered by tests
}
return nil, err
}
Expand All @@ -724,7 +724,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
took := time.Since(start)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 727 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L727

Added line #L727 was not covered by tests
}
return nil, err
}
Expand All @@ -748,7 +748,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}

monitor.AIRequestFinished(ctx, "segment anything 2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
monitor.AIRequestFinished(ctx, "segment-anything-2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)

Check warning on line 751 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L751

Added line #L751 was not covered by tests
}

return resp.JSON200, nil
Expand Down

0 comments on commit e94b188

Please sign in to comment.