Skip to content

Commit

Permalink
feat: add subscribed message metrics instrumentations (#2080)
Browse files Browse the repository at this point in the history
* feat: add subscribed message metrics instrumentations

Signed-off-by: bitliu <bitliu@tencent.com>

* update

Signed-off-by: bitliu <bitliu@tencent.com>

* update

Signed-off-by: bitliu <bitliu@tencent.com>

* update

Signed-off-by: bitliu <bitliu@tencent.com>

* update

Signed-off-by: bitliu <bitliu@tencent.com>

---------

Signed-off-by: bitliu <bitliu@tencent.com>
  • Loading branch information
Xunzhuo authored Oct 27, 2023
1 parent 727d64a commit 845e874
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 68 deletions.
6 changes: 4 additions & 2 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (r *Runner) Start(ctx context.Context) (err error) {
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
message.HandleSubscription(r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGatewayAPIRunner), Message: "provider-resources"}, r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources], errChan chan error) {
r.Logger.Info("received an update")

val := update.Value
Expand Down Expand Up @@ -93,6 +93,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
for key, val := range result.InfraIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
errChan <- err
} else {
r.InfraIR.Store(key, val)
newKeys = append(newKeys, key)
Expand All @@ -102,6 +103,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
for key, val := range result.XdsIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
errChan <- err
} else {
r.XdsIR.Store(key, val)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) {

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources.
message.HandleSubscription(r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received a notification")

if update.Delete {
if err := r.addNewSnapshot(ctx, nil); err != nil {
r.Logger.Error(err, "failed to update the config snapshot")
errChan <- err
}
} else {
// Translate to ratelimit xDS Config.
Expand Down
6 changes: 4 additions & 2 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ func (r *Runner) Start(ctx context.Context) (err error) {

func (r *Runner) subscribeToProxyInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentInfrastructureRunner), Message: "infra-ir"}, r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra], errChan chan error) {
r.Logger.Info("received an update")
val := update.Value

if update.Delete {
if err := r.mgr.DeleteProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete infra")
errChan <- err
}
} else {
// Manage the proxy infra.
if err := r.mgr.CreateOrUpdateProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new infra")
errChan <- err
}
}
},
Expand Down
22 changes: 22 additions & 0 deletions internal/message/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package message

import "github.com/envoyproxy/gateway/internal/metrics"

var (
watchableDepth = metrics.NewGauge("watchable_depth", "Current depth of watchable queue.")

watchableSubscribedDurationSeconds = metrics.NewHistogram("watchable_subscribed_duration_seconds", "How long in seconds a subscribed watchable is handled.", []float64{0.001, 0.01, 0.1, 1, 5, 10})

watchableSubscribedTotal = metrics.NewCounter("watchable_subscribed_total", "Total number of subscribed watchable.")

watchableSubscribedErrorsTotal = metrics.NewCounter("watchable_subscribed_errors_total", "Total number of subscribed watchable errors.")

runnerLabel = metrics.NewLabel("runner")

messageLabel = metrics.NewLabel("message")
)
48 changes: 45 additions & 3 deletions internal/message/watchutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,36 @@
package message

import (
"time"

"github.com/telepresenceio/watchable"

"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/metrics"
)

type Update[K comparable, V any] watchable.Update[K, V]

var logger = logging.DefaultLogger(v1alpha1.LogLevelInfo).WithName("watchable")

type Metadata struct {
Runner string
Message string
}

func (m Metadata) LabelValues() []metrics.LabelValue {
labels := make([]metrics.LabelValue, 0, 2)
if m.Runner != "" {
labels = append(labels, runnerLabel.Value(m.Runner))
}
if m.Message != "" {
labels = append(labels, messageLabel.Value(m.Message))
}

return labels
}

// HandleSubscription takes a channel returned by
// watchable.Map.Subscribe() (or .SubscribeSubset()), and calls the
// given function for each initial value in the map, and for any
Expand All @@ -20,20 +45,37 @@ type Update[K comparable, V any] watchable.Update[K, V]
// it handles the case where the watchable.Map already contains
// entries before .Subscribe is called.
func HandleSubscription[K comparable, V any](
meta Metadata,
subscription <-chan watchable.Snapshot[K, V],
handle func(Update[K, V]),
handle func(updateFunc Update[K, V], errChans chan error),
) {
//TODO: find a suitable value
errChans := make(chan error, 10)
go func() {
for err := range errChans {
logger.WithValues("runner", meta.Runner).Error(err, "observed an error")
watchableSubscribedErrorsTotal.With(meta.LabelValues()...).Increment()
}
}()

if snapshot, ok := <-subscription; ok {
for k, v := range snapshot.State {
startHandleTime := time.Now()
handle(Update[K, V]{
Key: k,
Value: v,
})
}, errChans)
watchableSubscribedTotal.With(meta.LabelValues()...).Increment()
watchableSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
for snapshot := range subscription {
watchableDepth.With(meta.LabelValues()...).Record(float64(len(subscription)))
for _, update := range snapshot.Updates {
handle(Update[K, V](update))
startHandleTime := time.Now()
handle(Update[K, V](update), errChans)
watchableSubscribedTotal.With(meta.LabelValues()...).Increment()
watchableSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
}
8 changes: 5 additions & 3 deletions internal/message/watchutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) {

var calls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Message: "demo"},
ch,
func(message.Update[string, any]) { calls++ },
func(update message.Update[string, any], errChans chan error) { calls++ },
)
assert.Equal(t, 0, calls)
}
Expand All @@ -47,8 +48,9 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
var storeCalls int
var deleteCalls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Message: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any]) {
func(update message.Update[string, any], errChans chan error) {
end()
if update.Delete {
deleteCalls++
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestXdsIRUpdates(t *testing.T) {
}()

updates := 0
message.HandleSubscription(snapshotC, func(u message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *ir.Xds], errChans chan error) {
end()
if u.Key == "test" {
updates += 1
Expand Down
Loading

0 comments on commit 845e874

Please sign in to comment.