Skip to content

Commit

Permalink
feat: add default sampling rate to mlobs logger
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng committed Apr 3, 2024
1 parent c1039ae commit 1b52605
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions api/pkg/inference-logger/logger/mlobs_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"

Check failure on line 7 in api/pkg/inference-logger/logger/mlobs_sink.go

View workflow job for this annotation

GitHub Actions / lint-api

File is not `goimports`-ed (goimports)
"golang.org/x/exp/rand"

upiv1 "github.com/caraml-dev/universal-prediction-interface/gen/go/grpc/caraml/upi/v1"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -17,6 +18,10 @@ var (
ErrMalformedLogEntry = errors.New("malformed log entry")
)

const (
SamplingRate = 0.02
)

type MLObsSink struct {
logger *zap.SugaredLogger
producer KafkaProducer
Expand Down Expand Up @@ -160,17 +165,19 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k

func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error {
for _, rawLogEntry := range rawLogEntries {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
if rand.Float64() <= SamplingRate {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
}
}
}
return nil
Expand Down

0 comments on commit 1b52605

Please sign in to comment.