Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add subscribed message metrics instrumentations #2080

Merged
merged 5 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
message.HandleSubscription(r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGatewayAPIRunner), Resource: "provider-resources"}, r.ProviderResources.GatewayAPIResources.Subscribe(ctx),
func(update message.Update[string, *gatewayapi.Resources], errChan chan error) {
r.Logger.Info("received an update")

val := update.Value
Expand Down Expand Up @@ -93,6 +93,7 @@
for key, val := range result.InfraIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
errChan <- err

Check warning on line 96 in internal/gatewayapi/runner/runner.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/runner/runner.go#L96

Added line #L96 was not covered by tests
} else {
r.InfraIR.Store(key, val)
newKeys = append(newKeys, key)
Expand All @@ -102,6 +103,7 @@
for key, val := range result.XdsIR {
if err := val.Validate(); err != nil {
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
errChan <- err

Check warning on line 106 in internal/gatewayapi/runner/runner.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/runner/runner.go#L106

Added line #L106 was not covered by tests
} else {
r.XdsIR.Store(key, val)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) {

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources.
message.HandleSubscription(r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGlobalRateLimitRunner), Resource: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received a notification")

if update.Delete {
if err := r.addNewSnapshot(ctx, nil); err != nil {
r.Logger.Error(err, "failed to update the config snapshot")
errChan <- err
}
} else {
// Translate to ratelimit xDS Config.
Expand Down
6 changes: 4 additions & 2 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ func (r *Runner) Start(ctx context.Context) (err error) {

func (r *Runner) subscribeToProxyInfraIR(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra]) {
message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentInfrastructureRunner), Resource: "infra-ir"}, r.InfraIR.Subscribe(ctx),
func(update message.Update[string, *ir.Infra], errChan chan error) {
r.Logger.Info("received an update")
val := update.Value

if update.Delete {
if err := r.mgr.DeleteProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to delete infra")
errChan <- err
}
} else {
// Manage the proxy infra.
if err := r.mgr.CreateOrUpdateProxyInfra(ctx, val); err != nil {
r.Logger.Error(err, "failed to create new infra")
errChan <- err
}
}
},
Expand Down
22 changes: 22 additions & 0 deletions internal/message/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package message

import "github.com/envoyproxy/gateway/internal/metrics"

var (
watchableDepth = metrics.NewGauge("watchable_depth", "Current depth of watchable queue.")

watchableSubscribedDurationSeconds = metrics.NewHistogram("watchable_subscribed_duration_seconds", "How long in seconds a subscribed watchable is handled.", []float64{0.001, 0.01, 0.1, 1, 5, 10})

watchableSubscribedTotal = metrics.NewCounter("watchable_subscribed_total", "Total number of subscribed watchable.")

watchableSubscribedErrorsTotal = metrics.NewCounter("watchable_subscribed_errors_total", "Total number of subscribed watchable errors.")

runnerLabel = metrics.NewLabel("runner")

resourceLabel = metrics.NewLabel("resource")
Xunzhuo marked this conversation as resolved.
Show resolved Hide resolved
)
47 changes: 44 additions & 3 deletions internal/message/watchutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,36 @@
package message

import (
"time"

"github.com/telepresenceio/watchable"

"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/metrics"
)

type Update[K comparable, V any] watchable.Update[K, V]

var logger = logging.DefaultLogger(v1alpha1.LogLevelInfo).WithName("watchable")

// Metadata identifies a subscription for metrics and logging purposes.
// Its non-empty fields are turned into metric label values by LabelValues.
type Metadata struct {
	// Runner names the component handling the subscription; it is used as
	// the value of the "runner" metric label.
	Runner string
	// Resource names the kind of resource being watched (callers pass values
	// such as "xds-ir" or "infra-ir"); it is used as the value of the
	// "resource" metric label.
	Resource string
}

// LabelValues converts the metadata into metric label values.
//
// A field contributes a label value only when it is non-empty: Runner maps to
// the "runner" label and Resource to the "resource" label, in that order. The
// returned slice is never nil, but it is empty when both fields are unset.
func (m Metadata) LabelValues() []metrics.LabelValue {
	// At most two labels can ever be produced, so size the slice up front.
	values := make([]metrics.LabelValue, 0, 2)

	if runner := m.Runner; runner != "" {
		values = append(values, runnerLabel.Value(runner))
	}
	if resource := m.Resource; resource != "" {
		values = append(values, resourceLabel.Value(resource))
	}

	return values
}

// HandleSubscription takes a channel returned by
// watchable.Map.Subscribe() (or .SubscribeSubset()), and calls the
// given function for each initial value in the map, and for any
Expand All @@ -20,20 +45,36 @@
// it handles the case where the watchable.Map already contains
// entries before .Subscribe is called.
func HandleSubscription[K comparable, V any](
meta Metadata,
subscription <-chan watchable.Snapshot[K, V],
handle func(Update[K, V]),
handle func(updateFunc Update[K, V], errChans chan error),
) {
errChans := make(chan error, 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason for using 10 as the buffer size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some buffering so that sends on errChan do not block while errors from snapshot.Updates are being observed. As for the size, I am not sure about it; 10 was a value suggested by ChatGPT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment for it, or a TODO such as "find a suitable value for this"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

go func() {
for err := range errChans {
logger.WithValues("runner", meta.Runner).Error(err, "observed an error")
watchableSubscribedErrorsTotal.With(meta.LabelValues()...).Increment()
}

Check warning on line 57 in internal/message/watchutil.go

View check run for this annotation

Codecov / codecov/patch

internal/message/watchutil.go#L55-L57

Added lines #L55 - L57 were not covered by tests
}()

if snapshot, ok := <-subscription; ok {
for k, v := range snapshot.State {
startHandleTime := time.Now()
handle(Update[K, V]{
Key: k,
Value: v,
})
}, errChans)
watchableSubscribedTotal.With(meta.LabelValues()...).Increment()
watchableSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
for snapshot := range subscription {
watchableDepth.With(meta.LabelValues()...).Record(float64(len(subscription)))
for _, update := range snapshot.Updates {
handle(Update[K, V](update))
startHandleTime := time.Now()
handle(Update[K, V](update), errChans)
watchableSubscribedTotal.With(meta.LabelValues()...).Increment()
watchableSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds())
}
}
}
8 changes: 5 additions & 3 deletions internal/message/watchutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) {

var calls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Resource: "demo"},
ch,
func(message.Update[string, any]) { calls++ },
func(update message.Update[string, any], errChans chan error) { calls++ },
)
assert.Equal(t, 0, calls)
}
Expand All @@ -47,8 +48,9 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
var storeCalls int
var deleteCalls int
message.HandleSubscription[string, any](
message.Metadata{Runner: "demo", Resource: "demo"},
m.Subscribe(context.Background()),
func(update message.Update[string, any]) {
func(update message.Update[string, any], errChans chan error) {
end()
if update.Delete {
deleteCalls++
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestXdsIRUpdates(t *testing.T) {
}()

updates := 0
message.HandleSubscription(snapshotC, func(u message.Update[string, *ir.Xds]) {
message.HandleSubscription(message.Metadata{Runner: "demo", Resource: "demo"}, snapshotC, func(u message.Update[string, *ir.Xds], errChans chan error) {
end()
if u.Key == "test" {
updates += 1
Expand Down
Loading