Skip to content

Commit

Permalink
feat: add subscribed message metrics instrumentations
Browse files Browse the repository at this point in the history
Signed-off-by: bitliu <bitliu@tencent.com>
  • Loading branch information
Xunzhuo committed Oct 26, 2023
1 parent 6b8794e commit 05a49b2
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 71 deletions.
6 changes: 4 additions & 2 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (r *Runner) Start(ctx context.Context) (err error) {
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
message.HandleSubscription(r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGatewayAPIRunner), Resource: "gatewayapi-resources"}, r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources], errChan chan error) {
r.Logger.Info("received an update")

val := update.Value
Expand Down Expand Up @@ -93,6 +93,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
for key, val := range result.InfraIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
errChan <- err

Check warning on line 96 in internal/gatewayapi/runner/runner.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/runner/runner.go#L96

Added line #L96 was not covered by tests
} else {
r.InfraIR.Store(key, val)
newKeys = append(newKeys, key)
Expand All @@ -102,6 +103,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
for key, val := range result.XdsIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
errChan <- err

Check warning on line 106 in internal/gatewayapi/runner/runner.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/runner/runner.go#L106

Added line #L106 was not covered by tests
} else {
r.XdsIR.Store(key, val)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) {

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources.
message.HandleSubscription(r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGlobalRateLimitRunner), Resource: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received a notification")

if update.Delete {
if err := r.addNewSnapshot(ctx, nil); err != nil {
r.Logger.Error(err, "failed to update the config snapshot")
errChan <- err
}
} else {
// Translate to ratelimit xDS Config.
Expand Down
6 changes: 4 additions & 2 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ func (r *Runner) Start(ctx context.Context) (err error) {

func (r *Runner) subscribeToProxyInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentInfrastructureRunner), Resource: "infra-ir"}, r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra], errChan chan error) {
r.Logger.Info("received an update")
val := update.Value

if update.Delete {
if err := r.mgr.DeleteProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete infra")
errChan <- err
}
} else {
// Manage the proxy infra.
if err := r.mgr.CreateOrUpdateProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new infra")
errChan <- err
}
}
},
Expand Down
22 changes: 22 additions & 0 deletions internal/message/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package message

import "github.com/envoyproxy/gateway/internal/metrics"

var (
messageDepth = metrics.NewGauge("message_depth", "Current depth of message queue.")

messageSubscribedDurationSeconds = metrics.NewHistogram("message_subscribed_duration_seconds", "How long in seconds a subscribed message is handled.", []float64{0.001, 0.01, 0.1, 1, 5, 10})

messageSubscribedTotal = metrics.NewCounter("message_subscribed_total", "Total number of subscribed message.")

messageSubscribedErrorsTotal = metrics.NewCounter("message_subscribed_errors_total", "Total number of subscribed message errors.")

runnerLabel = metrics.NewLabel("runner")

resourceLabel = metrics.NewLabel("resource")
)
46 changes: 43 additions & 3 deletions internal/message/watchutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,35 @@
package message

import (
"time"

Check failure on line 10 in internal/message/watchutil.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed with -local github.com/envoyproxy/gateway/ (goimports)
"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/metrics"
"github.com/telepresenceio/watchable"
)

type Update[K comparable, V any] watchable.Update[K, V]

var logger = logging.DefaultLogger(v1alpha1.LogLevelInfo).WithName("watchable")

type Metadata struct {
Runner string
Resource string
}

func (m Metadata) LabelValues() []metrics.LabelValue {
labels := []metrics.LabelValue{}
if m.Runner != "" {
labels = append(labels, runnerLabel.Value(m.Runner))
}
if m.Resource != "" {
labels = append(labels, resourceLabel.Value(m.Resource))
}

return labels
}

// HandleSubscription takes a channel returned by
// watchable.Map.Subscribe() (or .SubscribeSubset()), and calls the
// given function for each initial value in the map, and for any
Expand All @@ -20,20 +44,36 @@ type Update[K comparable, V any] watchable.Update[K, V]
// it handles the case where the watchable.Map already contains
// entries before .Subscribe is called.
func HandleSubscription[K comparable, V any](
meta Metadata,
subscription <-chan watchable.Snapshot[K, V],
handle func(Update[K, V]),
handle func(updateFunc Update[K, V], errChans chan error),
) {
errChans := make(chan error, 10)
go func() {
for err := range errChans {
logger.WithValues("runner", meta.Runner).Error(err, "observed an error")
messageSubscribedErrorsTotal.With(meta.LabelValues()...).Increment()
}

Check warning on line 56 in internal/message/watchutil.go

View check run for this annotation

Codecov / codecov/patch

internal/message/watchutil.go#L54-L56

Added lines #L54 - L56 were not covered by tests
}()

if snapshot, ok := <-subscription; ok {
for k, v := range snapshot.State {
startHandleTime := time.Now()
handle(Update[K, V]{
Key: k,
Value: v,
})
}, errChans)
messageSubscribedTotal.With(meta.LabelValues()...).Increment()
messageSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
for snapshot := range subscription {
messageDepth.With(meta.LabelValues()...).Record(float64(len(subscription)))
for _, update := range snapshot.Updates {
handle(Update[K, V](update))
startHandleTime := time.Now()
handle(Update[K, V](update), errChans)
messageSubscribedTotal.With(meta.LabelValues()...).Increment()
messageSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
}
8 changes: 5 additions & 3 deletions internal/message/watchutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) {

var calls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Resource: "demo"},
ch,
func(message.Update[string, any]) { calls++ },
func(update message.Update[string, any], errChans chan error) { calls++ },
)
assert.Equal(t, 0, calls)
}
Expand All @@ -47,8 +48,9 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
var storeCalls int
var deleteCalls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Resource: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any]) {
func(update message.Update[string, any], errChans chan error) {
end()
if update.Delete {
deleteCalls++
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestXdsIRUpdates(t *testing.T) {
}()

updates := 0
message.HandleSubscription(snapshotC, func(u message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: "demo", Resource: "demo"}, snapshotC, func(u message.Update[string, *ir.Xds], errChans chan error) {
end()
if u.Key == "test" {
updates += 1
Expand Down
Loading

0 comments on commit 05a49b2

Please sign in to comment.