Skip to content

Commit

Permalink
feat: add default sampling rate to mlobs logger
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng committed Apr 4, 2024
1 parent 1381a0b commit 252d6ae
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions api/pkg/inference-logger/logger/mlobs_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"

"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -19,6 +20,10 @@ var (
ErrMalformedLogEntry = errors.New("malformed log entry")
)

const (
SamplingRate = 0.02
)

type MLObsSink struct {
logger *zap.SugaredLogger
producer KafkaProducer
Expand Down Expand Up @@ -163,17 +168,19 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k

func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error {
for _, rawLogEntry := range rawLogEntries {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
if rand.Float64() <= SamplingRate {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
}
}
}
return nil
Expand Down

0 comments on commit 252d6ae

Please sign in to comment.