Skip to content

Commit

Permalink
feat: add default sampling rate to mlobs logger
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng committed Apr 3, 2024
1 parent c1039ae commit 1b637dd
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions api/pkg/inference-logger/logger/mlobs_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"

"golang.org/x/exp/rand"

upiv1 "github.com/caraml-dev/universal-prediction-interface/gen/go/grpc/caraml/upi/v1"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.uber.org/zap"
Expand All @@ -17,6 +19,10 @@ var (
ErrMalformedLogEntry = errors.New("malformed log entry")
)

const (
SamplingRate = 0.02
)

type MLObsSink struct {
logger *zap.SugaredLogger
producer KafkaProducer
Expand Down Expand Up @@ -160,17 +166,19 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k

func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error {
for _, rawLogEntry := range rawLogEntries {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
if rand.Float64() <= SamplingRate {
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
m.logger.Errorf("unable to produce kafka message: %v", err)
}
}
}
return nil
Expand Down

0 comments on commit 1b637dd

Please sign in to comment.