Skip to content

Commit

Permalink
Record Metrics for Reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <jackhammervyom@gmail.com>
  • Loading branch information
Vyom-Yadav committed Oct 27, 2024
1 parent 1bc9af5 commit eab51a3
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 6 deletions.
84 changes: 84 additions & 0 deletions internal/reminder/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package metrics

Check failure on line 1 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

package-comments: should have a package comment (revive)

Check failure on line 1 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

package-comments: should have a package comment (revive)

import (
"context"

Check failure on line 4 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
"go.opentelemetry.io/otel/metric"
)

// Default bucket boundaries in seconds for the send delay histogram
var sendDelayBuckets = []float64{
0, // immediate
10, // 10 seconds
20, // 20 seconds
40, // 40 seconds
80, // 1m 20s
160, // 2m 40s
320, // 5m 20s
640, // 10m 40s
1280, // 21m 20s
}

type Metrics struct {

Check failure on line 21 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type Metrics should have comment or be unexported (revive)

Check failure on line 21 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type Metrics should have comment or be unexported (revive)
// Time between when a reminder became eligible and when it was sent
SendDelay metric.Float64Histogram

// Current number of reminders in the batch
BatchSize metric.Int64Gauge

// Average batch size (updated on each batch)
AvgBatchSize metric.Float64Gauge

// For tracking average calculation
// TODO: consider persisting this to avoid reset on restart (maybe)
totalBatches int64
totalReminders int64
}

func NewMetrics(meter metric.Meter) (*Metrics, error) {

Check failure on line 37 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported function NewMetrics should have comment or be unexported (revive)

Check failure on line 37 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported function NewMetrics should have comment or be unexported (revive)
sendDelay, err := meter.Float64Histogram(
"reminder_send_delay",
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(sendDelayBuckets...),
)
if err != nil {
return nil, err
}

batchSize, err := meter.Int64Gauge(
"reminder_batch_size",
metric.WithDescription("Current number of reminders in the batch"),
)
if err != nil {
return nil, err
}

avgBatchSize, err := meter.Float64Gauge(
"reminder_avg_batch_size",
metric.WithDescription("Average number of reminders per batch"),
)
if err != nil {
return nil, err
}

return &Metrics{
SendDelay: sendDelay,
BatchSize: batchSize,
AvgBatchSize: avgBatchSize,
}, nil
}

func (m *Metrics) RecordBatch(ctx context.Context, size int64) {

Check failure on line 71 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported method Metrics.RecordBatch should have comment or be unexported (revive)

Check failure on line 71 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported method Metrics.RecordBatch should have comment or be unexported (revive)
// Update current batch size
m.BatchSize.Record(ctx, size)

// Update running average
m.totalBatches++
m.totalReminders += size
avgSize := float64(m.totalReminders) / float64(m.totalBatches)
m.AvgBatchSize.Record(ctx, avgSize)
}

func (m *Metrics) RecordSendDelay(ctx context.Context, delaySeconds float64) {

Check failure on line 82 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported method Metrics.RecordSendDelay should have comment or be unexported (revive)

Check failure on line 82 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported method Metrics.RecordSendDelay should have comment or be unexported (revive)
m.SendDelay.Record(ctx, delaySeconds)
}
132 changes: 132 additions & 0 deletions internal/reminder/metrics/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package metrics

import (
"context"
"errors"
"fmt"
"github.com/mindersec/minder/pkg/config/reminder"

Check failure on line 7 in internal/reminder/metrics/provider.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

Check failure on line 17 in internal/reminder/metrics/provider.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
)

const (
metricsPath = "/metrics"
readHeaderTimeout = 2 * time.Second
)

// Provider manages the metrics server and OpenTelemetry setup
type Provider struct {
server *http.Server
mp *sdkmetric.MeterProvider
metrics *Metrics
}

// NewProvider creates a new metrics provider
func NewProvider(cfg *reminder.MetricsConfig) (*Provider, error) {
if cfg == nil {
return nil, errors.New("metrics config is nil")
}

if !cfg.Enabled {
return &Provider{}, nil
}

// Create Prometheus exporter
prometheusExporter, err := prometheus.New(
prometheus.WithNamespace("reminder_service"),
)
if err != nil {
return nil, fmt.Errorf("failed to create Prometheus exporter: %w", err)
}

// Create resource with service information
res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("reminder-service"),
semconv.ServiceVersion("v0.1.0"),
)

// Create meter provider
mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(prometheusExporter),
sdkmetric.WithResource(res),
)

// Set global meter provider
otel.SetMeterProvider(mp)

// Create metrics
meter := mp.Meter("reminder-service")
metrics, err := NewMetrics(meter)
if err != nil {
return nil, fmt.Errorf("failed to create metrics: %w", err)
}

// Create HTTP server
mux := http.NewServeMux()
mux.Handle(metricsPath, promhttp.Handler())

server := &http.Server{
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
Handler: mux,
ReadHeaderTimeout: readHeaderTimeout,
}

return &Provider{
server: server,
mp: mp,
metrics: metrics,
}, nil
}

// Start starts the metrics server if enabled
func (p *Provider) Start(ctx context.Context) error {
if p.server == nil {
return nil // Metrics disabled
}

errCh := make(chan error)
go func() {
log.Info().Str("address", p.server.Addr).Msg("Starting metrics server")
if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("metrics server error: %w", err)
}
}()

select {
case err := <-errCh:
return err
case <-ctx.Done():
return p.Shutdown(ctx)
}
}

// Shutdown gracefully shuts down the metrics server
func (p *Provider) Shutdown(ctx context.Context) error {
if p.server == nil {
return nil
}

shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

log.Info().Msg("Shutting down metrics server")
if err := p.mp.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("Error shutting down meter provider")
}

return p.server.Shutdown(shutdownCtx)
}

// Metrics returns the metrics instance
func (p *Provider) Metrics() *Metrics {
return p.metrics
}
42 changes: 36 additions & 6 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"github.com/mindersec/minder/internal/reminder/metrics"
"sync"
"time"

Expand Down Expand Up @@ -44,6 +45,8 @@ type reminder struct {

eventPublisher message.Publisher
eventDBCloser common.DriverCloser

metricsProvider *metrics.Provider
}

// NewReminder creates a new reminder instance
Expand All @@ -66,6 +69,13 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con

r.eventPublisher = pub
r.eventDBCloser = cl

metricsProvider, err := metrics.NewProvider(&config.MetricsConfig)
if err != nil {
return nil, fmt.Errorf("error creating metrics provider: %w", err)
}

r.metricsProvider = metricsProvider
return r, nil
}

Expand All @@ -78,6 +88,11 @@ func (r *reminder) Start(ctx context.Context) error {
default:
}

err := r.metricsProvider.Start(ctx)
if err != nil {
return fmt.Errorf("error starting metrics provider: %w", err)
}

interval := r.cfg.RecurrenceConfig.Interval
if interval <= 0 {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
Expand Down Expand Up @@ -119,6 +134,11 @@ func (r *reminder) Stop() {
r.stopOnce.Do(func() {
close(r.stop)
r.eventDBCloser()

err := r.metricsProvider.Shutdown(context.Background())
if err != nil {
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error shutting down metrics provider")
}
})
}

Expand All @@ -143,6 +163,11 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

remMetrics := r.metricsProvider.Metrics()
if remMetrics != nil {
remMetrics.RecordBatch(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
Expand All @@ -151,14 +176,19 @@ func (r *reminder) sendReminders(ctx context.Context) error {
repoIds := make([]uuid.UUID, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
if remMetrics != nil {
// sendDelay = Now() - ReminderLastSent - MinElapsed
reminderLastSent := repo.ReminderLastSent
if reminderLastSent.Valid {
remMetrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds())

Check failure on line 183 in internal/reminder/reminder.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

Check failure on line 183 in internal/reminder/reminder.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
} else {
// TODO: Should the send delay be zero if the reminder has never been sent?
remMetrics.SendDelay.Record(ctx, 0)
//remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds())
}
}
}

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution

err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
return fmt.Errorf("reminders published but error updating last sent time: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig MetricsConfig `mapstructure:"metrics"`
}

// Validate validates the configuration
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/reminder/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package reminder

type MetricsConfig struct {

Check failure on line 3 in pkg/config/reminder/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type MetricsConfig should have comment or be unexported (revive)

Check failure on line 3 in pkg/config/reminder/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type MetricsConfig should have comment or be unexported (revive)
Enabled bool `mapstructure:"enabled" default:"true"`
Host string `mapstructure:"host" default:"127.0.0.1"`
Port int `mapstructure:"port" default:"8080"`
}

0 comments on commit eab51a3

Please sign in to comment.