diff --git a/monitor/census.go b/monitor/census.go index 00abd8b5e..98f69688c 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -1701,7 +1701,7 @@ func MaxTranscodingPrice(maxPrice *big.Rat) { func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) { floatWei, _ := maxPrice.Float64() if err := stats.RecordWithTags(census.ctx, - []tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)}, + []tag.Mutator{tag.Insert(census.kPipeline, normalizePipelineTag(cap)), tag.Insert(census.kModelName, modelName)}, census.mPricePerCapability.M(floatWei)); err != nil { glog.Errorf("Error recording metrics err=%q", err) @@ -1851,9 +1851,10 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string // AIRequestFinished records gateway AI job request metrics. func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { - census.recordModelRequested(pipeline, model) - census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) - census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit) + pipelineTag := normalizePipelineTag(pipeline) + census.recordModelRequested(pipelineTag, model) + census.recordAIRequestLatencyScore(pipelineTag, model, jobInfo.LatencyScore, orchInfo) + census.recordAIRequestPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit) } // recordAIRequestLatencyScore records the latency score for a AI job request. @@ -1891,7 +1892,7 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet. orchAddr = common.BytesToAddress(addr).String() } - tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)} + tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normalizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)} capabilities := orchInfo.GetCapabilities() if capabilities != nil { tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion())) @@ -1904,9 +1905,11 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet. // AIJobProcessed records orchestrator AI job processing metrics. func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) { - census.recordModelRequested(pipeline, model) - census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore) - census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit) + pipelineTag := normalizePipelineTag(pipeline) + + census.recordModelRequested(pipelineTag, model) + census.recordAIJobLatencyScore(pipelineTag, model, jobInfo.LatencyScore) + census.recordAIJobPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit) } // recordAIJobLatencyScore records the latency score for a processed AI job. @@ -1936,7 +1939,7 @@ func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model // AIProcessingError logs errors in orchestrator AI job processing. func AIProcessingError(code string, Pipeline string, Model string, sender string) { if err := stats.RecordWithTags(census.ctx, - []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)}, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normalizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)}, census.mAIRequestError.M(1)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } @@ -1999,3 +2002,7 @@ func FastVerificationFailed(ctx context.Context, uri string, errtype int) { clog.Errorf(ctx, "Error recording metrics err=%q", err) } } + +func normalizePipelineTag(pipeline string) string { + return strings.Replace(strings.ToLower(pipeline), " ", "-", -1) +} diff --git a/server/ai_process.go b/server/ai_process.go index 673af3ec3..e7f614ae7 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -681,7 +681,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A mw, err := worker.NewSegmentAnything2MultipartWriter(&buf, req) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -689,7 +689,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -697,14 +697,14 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A imageRdr, err := req.Image.Reader() if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -713,7 +713,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -724,7 +724,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A took := time.Since(start) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo) + monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -748,7 +748,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) } - monitor.AIRequestFinished(ctx, "segment anything 2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) + monitor.AIRequestFinished(ctx, "segment-anything-2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) } return resp.JSON200, nil