-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record Metrics for Reminder #4831
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package metrics | ||
|
||
import ( | ||
"context" | ||
"go.opentelemetry.io/otel/metric" | ||
) | ||
|
||
// Default bucket boundaries in seconds for the send delay histogram | ||
var sendDelayBuckets = []float64{ | ||
0, // immediate | ||
10, // 10 seconds | ||
20, // 20 seconds | ||
40, // 40 seconds | ||
80, // 1m 20s | ||
160, // 2m 40s | ||
320, // 5m 20s | ||
640, // 10m 40s | ||
1280, // 21m 20s | ||
} | ||
|
||
type Metrics struct { | ||
// Time between when a reminder became eligible and when it was sent | ||
SendDelay metric.Float64Histogram | ||
|
||
// Current number of reminders in the batch | ||
BatchSize metric.Int64Gauge | ||
Comment on lines
+25
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use an |
||
|
||
// Average batch size (updated on each batch) | ||
AvgBatchSize metric.Float64Gauge | ||
|
||
// For tracking average calculation | ||
// TODO: consider persisting this to avoid reset on restart (maybe) | ||
totalBatches int64 | ||
totalReminders int64 | ||
Comment on lines
+31
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than storing these as int64s, why not use an |
||
} | ||
|
||
func NewMetrics(meter metric.Meter) (*Metrics, error) { | ||
sendDelay, err := meter.Float64Histogram( | ||
"reminder_send_delay", | ||
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"), | ||
metric.WithUnit("s"), | ||
metric.WithExplicitBucketBoundaries(sendDelayBuckets...), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
batchSize, err := meter.Int64Gauge( | ||
"reminder_batch_size", | ||
metric.WithDescription("Current number of reminders in the batch"), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
avgBatchSize, err := meter.Float64Gauge( | ||
"reminder_avg_batch_size", | ||
metric.WithDescription("Average number of reminders per batch"), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &Metrics{ | ||
SendDelay: sendDelay, | ||
BatchSize: batchSize, | ||
AvgBatchSize: avgBatchSize, | ||
}, nil | ||
} | ||
|
||
func (m *Metrics) RecordBatch(ctx context.Context, size int64) { | ||
// Update current batch size | ||
m.BatchSize.Record(ctx, size) | ||
|
||
// Update running average | ||
m.totalBatches++ | ||
m.totalReminders += size | ||
Comment on lines
+75
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should aim to get the metrics code to do this for you automatically -- in particular, if we end up with two |
||
avgSize := float64(m.totalReminders) / float64(m.totalBatches) | ||
m.AvgBatchSize.Record(ctx, avgSize) | ||
} | ||
|
||
func (m *Metrics) RecordSendDelay(ctx context.Context, delaySeconds float64) { | ||
m.SendDelay.Record(ctx, delaySeconds) | ||
} | ||
Comment on lines
+82
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this function isn't doing anything -- you're already exposing If you transform |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,132 @@ | ||||||
package metrics | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"github.com/mindersec/minder/pkg/config/reminder" | ||||||
"net/http" | ||||||
"time" | ||||||
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||||||
"github.com/rs/zerolog/log" | ||||||
"go.opentelemetry.io/otel" | ||||||
"go.opentelemetry.io/otel/exporters/prometheus" | ||||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||||||
"go.opentelemetry.io/otel/sdk/resource" | ||||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0" | ||||||
) | ||||||
|
||||||
const ( | ||||||
metricsPath = "/metrics" | ||||||
readHeaderTimeout = 2 * time.Second | ||||||
) | ||||||
|
||||||
// Provider manages the metrics server and OpenTelemetry setup | ||||||
type Provider struct { | ||||||
server *http.Server | ||||||
mp *sdkmetric.MeterProvider | ||||||
metrics *Metrics | ||||||
} | ||||||
|
||||||
// NewProvider creates a new metrics provider | ||||||
func NewProvider(cfg *reminder.MetricsConfig) (*Provider, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code feels "different" than the code in |
||||||
if cfg == nil { | ||||||
return nil, errors.New("metrics config is nil") | ||||||
} | ||||||
|
||||||
if !cfg.Enabled { | ||||||
return &Provider{}, nil | ||||||
} | ||||||
|
||||||
// Create Prometheus exporter | ||||||
prometheusExporter, err := prometheus.New( | ||||||
prometheus.WithNamespace("reminder_service"), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Skip "service" here -- it's implicit in the context, and the namespace may get stuck onto a bunch of metric names, and make them even longer and more difficult to read. 😁 |
||||||
) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to create Prometheus exporter: %w", err) | ||||||
} | ||||||
|
||||||
// Create resource with service information | ||||||
res := resource.NewWithAttributes( | ||||||
semconv.SchemaURL, | ||||||
semconv.ServiceName("reminder-service"), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto on removing "-service" noise.
Suggested change
|
||||||
semconv.ServiceVersion("v0.1.0"), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you copy over the:
if we're going to include it. 😁 (I think this was my fault.) |
||||||
) | ||||||
|
||||||
// Create meter provider | ||||||
mp := sdkmetric.NewMeterProvider( | ||||||
sdkmetric.WithReader(prometheusExporter), | ||||||
sdkmetric.WithResource(res), | ||||||
) | ||||||
|
||||||
// Set global meter provider | ||||||
otel.SetMeterProvider(mp) | ||||||
|
||||||
// Create metrics | ||||||
meter := mp.Meter("reminder-service") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
metrics, err := NewMetrics(meter) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to create metrics: %w", err) | ||||||
} | ||||||
Comment on lines
+66
to
+71
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you break the setup out into a static function, I'd probably have the calling reminder code look like:
That separates the "system setup" of the metrics exporter from the provisioning of metrics for the main reminder loop, and makes it easier in the future to test the metrics without accidentally setting up a whole prometheus exporter. |
||||||
|
||||||
// Create HTTP server | ||||||
mux := http.NewServeMux() | ||||||
mux.Handle(metricsPath, promhttp.Handler()) | ||||||
|
||||||
server := &http.Server{ | ||||||
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), | ||||||
Handler: mux, | ||||||
ReadHeaderTimeout: readHeaderTimeout, | ||||||
} | ||||||
|
||||||
return &Provider{ | ||||||
server: server, | ||||||
mp: mp, | ||||||
metrics: metrics, | ||||||
}, nil | ||||||
} | ||||||
|
||||||
// Start starts the metrics server if enabled | ||||||
func (p *Provider) Start(ctx context.Context) error { | ||||||
if p.server == nil { | ||||||
return nil // Metrics disabled | ||||||
} | ||||||
|
||||||
errCh := make(chan error) | ||||||
go func() { | ||||||
log.Info().Str("address", p.server.Addr).Msg("Starting metrics server") | ||||||
if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||||||
errCh <- fmt.Errorf("metrics server error: %w", err) | ||||||
} | ||||||
}() | ||||||
|
||||||
select { | ||||||
case err := <-errCh: | ||||||
return err | ||||||
case <-ctx.Done(): | ||||||
return p.Shutdown(ctx) | ||||||
} | ||||||
} | ||||||
|
||||||
// Shutdown gracefully shuts down the metrics server | ||||||
func (p *Provider) Shutdown(ctx context.Context) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you make this method private, then cancelling the context to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (among other things, you can guarantee that |
||||||
if p.server == nil { | ||||||
return nil | ||||||
} | ||||||
|
||||||
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) | ||||||
defer cancel() | ||||||
|
||||||
log.Info().Msg("Shutting down metrics server") | ||||||
if err := p.mp.Shutdown(shutdownCtx); err != nil { | ||||||
log.Error().Err(err).Msg("Error shutting down meter provider") | ||||||
} | ||||||
|
||||||
return p.server.Shutdown(shutdownCtx) | ||||||
} | ||||||
|
||||||
// Metrics returns the metrics instance | ||||||
func (p *Provider) Metrics() *Metrics { | ||||||
return p.metrics | ||||||
} | ||||||
Comment on lines
+130
to
+132
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My main suggestion is to remove the |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
"context" | ||
"errors" | ||
"fmt" | ||
"github.com/mindersec/minder/internal/reminder/metrics" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -44,6 +45,8 @@ | |
|
||
eventPublisher message.Publisher | ||
eventDBCloser common.DriverCloser | ||
|
||
metricsProvider *metrics.Provider | ||
} | ||
|
||
// NewReminder creates a new reminder instance | ||
|
@@ -66,6 +69,13 @@ | |
|
||
r.eventPublisher = pub | ||
r.eventDBCloser = cl | ||
|
||
metricsProvider, err := metrics.NewProvider(&config.MetricsConfig) | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating metrics provider: %w", err) | ||
} | ||
|
||
r.metricsProvider = metricsProvider | ||
return r, nil | ||
} | ||
|
||
|
@@ -78,6 +88,11 @@ | |
default: | ||
} | ||
|
||
err := r.metricsProvider.Start(ctx) | ||
if err != nil { | ||
return fmt.Errorf("error starting metrics provider: %w", err) | ||
} | ||
|
||
interval := r.cfg.RecurrenceConfig.Interval | ||
if interval <= 0 { | ||
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval) | ||
|
@@ -119,6 +134,11 @@ | |
r.stopOnce.Do(func() { | ||
close(r.stop) | ||
r.eventDBCloser() | ||
|
||
err := r.metricsProvider.Shutdown(context.Background()) | ||
if err != nil { | ||
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error shutting down metrics provider") | ||
} | ||
}) | ||
} | ||
|
||
|
@@ -143,6 +163,11 @@ | |
return fmt.Errorf("error creating reminder messages: %w", err) | ||
} | ||
|
||
remMetrics := r.metricsProvider.Metrics() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels like MetricsProvider is bigger and more complicated than I'd expect. |
||
if remMetrics != nil { | ||
remMetrics.RecordBatch(ctx, int64(len(repos))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you make batches a histogram, this can just be a |
||
} | ||
|
||
err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) | ||
if err != nil { | ||
return fmt.Errorf("error publishing messages: %w", err) | ||
|
@@ -151,14 +176,19 @@ | |
repoIds := make([]uuid.UUID, len(repos)) | ||
for _, repo := range repos { | ||
repoIds = append(repoIds, repo.ID) | ||
if remMetrics != nil { | ||
// sendDelay = Now() - ReminderLastSent - MinElapsed | ||
reminderLastSent := repo.ReminderLastSent | ||
if reminderLastSent.Valid { | ||
remMetrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds()) | ||
} else { | ||
// TODO: Should the send delay be zero if the reminder has never been sent? | ||
remMetrics.SendDelay.Record(ctx, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will produce a bunch of It's probably better to either export nothing here, or export it under some different counter metric ("new reminders"). |
||
//remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds()) | ||
} | ||
} | ||
} | ||
|
||
// TODO: Collect Metrics | ||
// Potential metrics: | ||
// - Gauge: Number of reminders in the current batch | ||
// - UpDownCounter: Average reminders sent per batch | ||
// - Histogram: reminder_last_sent time distribution | ||
|
||
err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds) | ||
if err != nil { | ||
return fmt.Errorf("reminders published but error updating last sent time: %w", err) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package reminder | ||
|
||
type MetricsConfig struct { | ||
Enabled bool `mapstructure:"enabled" default:"true"` | ||
Host string `mapstructure:"host" default:"127.0.0.1"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may want to leave the address as |
||
Port int `mapstructure:"port" default:"8080"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While powers of 2 are generally handy, I'd tend to align this more with human units, e.g.:
0, 10, 30, 60, 120, 300, 600, 900, 1800, 3600, 7200
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what frequency we'll run
reminder
at, but 2 hours seems reasonably pessimistic for now -- it's really annoying when you run into problems and your frequency distribution has always been "it's not so good".Given that our expected value for latency is 1/2 scan interval, I could even be convinced that < 1 minute is not particularly interesting (which could trim a couple buckets if you're worried about it -- I'm not!).