Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add utility to record metrics for sidecar #78

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Args struct {
TLSCertFile string
// Absolute path to the TLS key file.
TLSKeyFile string
// HttpEndpoint is the address of the metrics sever
HttpEndpoint string
// MetricsPath is the path where metrics will be recorded
MetricsPath string
}

func (args *Args) Validate() error {
Expand Down Expand Up @@ -169,7 +173,9 @@ func (rt *Runtime) kubeConnect(kubeconfig string, kubeAPIQPS float32, kubeAPIBur
func (rt *Runtime) csiConnect(csiAddress string) error {
ctx := context.Background()

metricsManager := metrics.NewCSIMetricsManagerForSidecar("" /* driverName */)
metricsManager := metrics.NewCSIMetricsManagerWithOptions("",
metrics.WithSubsystem(SubSystem),
metrics.WithLabelNames(LabelTargetSnapshotName, LabelBaseSnapshotName))
csiConn, err := connection.Connect(
ctx,
csiAddress,
Expand Down
8 changes: 7 additions & 1 deletion pkg/internal/runtime/test_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type TestHarness struct {
MockCSIIdentityServer *driver.MockIdentityServer
MockCSISnapshotMetadataServer *driver.MockSnapshotMetadataServer
MockCSIDriverConn *grpc.ClientConn
MetricsManager metrics.CSIMetricsManager

FakeCSIDriver *driver.CSIDriver

Expand Down Expand Up @@ -120,6 +121,8 @@ func (th *TestHarness) RuntimeArgs() Args {
GRPCPort: th.rtaPortNumber,
TLSCertFile: th.tlsCertFile,
TLSKeyFile: th.tlsKeyFile,
HttpEndpoint: "localhost:8081",
MetricsPath: "/metrics",
}
}

Expand Down Expand Up @@ -205,7 +208,6 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {
mockController := gomock.NewController(t)
identityServer := driver.NewMockIdentityServer(mockController)
snapshotMetadataServer := driver.NewMockSnapshotMetadataServer(mockController)
metricsManager := metrics.NewCSIMetricsManagerForSidecar("" /* driverName */)
drv := driver.NewMockCSIDriver(&driver.MockCSIDriverServers{
Identity: identityServer,
SnapshotMetadata: snapshotMetadataServer,
Expand All @@ -215,6 +217,9 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {

// Create a client connection to it
addr := drv.Address()
metricsManager := metrics.NewCSIMetricsManagerWithOptions("",
metrics.WithSubsystem(SubSystem),
metrics.WithLabelNames(LabelTargetSnapshotName, LabelBaseSnapshotName))
csiConn, err := connection.Connect(context.Background(), addr, metricsManager)
if err != nil {
t.Fatal("Connect", err)
Expand All @@ -226,6 +231,7 @@ func (th *TestHarness) WithMockCSIDriver(t *testing.T) *TestHarness {
th.MockCSIIdentityServer = identityServer
th.MockCSISnapshotMetadataServer = snapshotMetadataServer
th.driverName = "mock-csi-driver"
th.MetricsManager = metricsManager

return th
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/internal/runtime/util_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runtime

import (
"time"

"k8s.io/klog/v2"
)

const (
LabelTargetSnapshotName = "target_snapshot"
LabelBaseSnapshotName = "base_snapshot"
SubSystem = "snapshot_metadata_controller"

// MetadataAllocatedOperationName is the operation that tracks how long the controller takes to get the allocated blocks for a snapshot.
// Specifically, the operation metric is emitted based on the following timestamps:
// - Start_time: controller notices the first time that there is a GetMetadataAllocated RPC call to fetch the allocated blocks of metadata
// - End_time: controller notices that the RPC call is finished and the allocated blocks is streamed back to the driver
MetadataAllocatedOperationName = "MetadataAllocated"

// MetadataDeltaOperationName is the operation that tracks how long the controller takes to get the changed blocks between 2 snapshots
// Specifically, the operation metric is emitted based on the following timestamps:
// - Start_time: controller notices the first time that there is a GetMetadataDelta RPC call to fetch the changed blocks between 2 snapshots
// - End_time: controller notices that the RPC call is finished and the changed blocks is streamed back to the driver
MetadataDeltaOperationName = "MetadataDelta"
)

// RecordMetricsWithLabels is a wrapper on the csi-lib-utils RecordMetrics function, that calls the
// "RecordMetrics" functions with the necessary labels added to the MetricsManager runtime.
func (rt *Runtime) RecordMetricsWithLabels(opLabel map[string]string, opName string, startTime time.Time, opErr error) {
metricsWithLabel, err := rt.MetricsManager.WithLabelValues(opLabel)
if err != nil {
klog.Error(err, "failed to add labels to metrics")
return
}

opDuration := time.Since(startTime)
metricsWithLabel.RecordMetrics(opName, opErr, opDuration)
}
2 changes: 2 additions & 0 deletions pkg/internal/server/grpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
fakesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake"
snapshotutils "github.com/kubernetes-csi/external-snapshotter/v8/pkg/utils"
Expand Down Expand Up @@ -112,6 +113,7 @@ func (th *testHarness) Runtime() *runtime.Runtime {
SnapshotClient: th.FakeSnapshotClient,
DriverName: th.DriverName,
CSIConn: th.mockCSIDriverConn,
MetricsManager: metrics.NewCSIMetricsManagerWithOptions(th.DriverName, metrics.WithSubsystem(runtime.SubSystem), metrics.WithLabelNames(runtime.LabelTargetSnapshotName, runtime.LabelBaseSnapshotName)),
}
}

Expand Down
15 changes: 13 additions & 2 deletions pkg/internal/server/grpc/get_metadata_allocated.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,32 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-csi/external-snapshot-metadata/pkg/api"
"github.com/kubernetes-csi/external-snapshot-metadata/pkg/internal/runtime"
)

func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) error {
func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) (err error) {
// Create a timeout context so that failure in either sending to the client or
// receiving from the CSI driver will ultimately abort the handler session.
// The context could also get canceled by the client.
ctx, cancelFn := context.WithTimeout(s.getMetadataAllocatedContextWithLogger(req, stream), s.config.MaxStreamDur)
defer cancelFn()

// Record metrics when the operation ends
defer func(startTime time.Time) {
opLabel := map[string]string{
runtime.LabelTargetSnapshotName: fmt.Sprintf("%s/%s", req.Namespace, req.SnapshotName),
}
s.config.Runtime.RecordMetricsWithLabels(opLabel, runtime.MetadataAllocatedOperationName, startTime, err)
}(time.Now())

if err := s.validateGetMetadataAllocatedRequest(req); err != nil {
klog.FromContext(ctx).Error(err, "validation failed")
return err
Expand All @@ -63,7 +73,8 @@ func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stre
return err
}

return s.streamGetMetadataAllocatedResponse(ctx, stream, csiStream)
err = s.streamGetMetadataAllocatedResponse(ctx, stream, csiStream)
return err
}

// getMetadataAllocatedContextWithLogger returns the stream context with an embedded
Expand Down
25 changes: 25 additions & 0 deletions pkg/internal/server/grpc/get_metadata_allocated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,31 @@ func TestGetMetadataAllocatedViaGRPCClient(t *testing.T) {
} else if errStream != nil {
assert.ErrorIs(t, errStream, io.EOF)
}

// Validate metrics are recorded correctly
metrics, _ := grpcServer.config.Runtime.MetricsManager.GetRegistry().Gather()
statusFound := 0
snapshotFound := 0

// Validate that both gauge and controller metrics is recorded
assert.GreaterOrEqual(t, 2, len(metrics))
assert.Equal(t, *metrics[0].Name, "process_start_time_seconds")
assert.Equal(t, *metrics[1].Name, "snapshot_metadata_controller_operations_seconds")

// Validate grpc_status_code and target_snapshot name
for _, metric := range metrics[1].Metric {
for _, labels := range metric.Label {
expTargetSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.SnapshotName)
if *labels.Name == "grpc_status_code" && *labels.Value == tc.expStatusCode.String() {
statusFound = 1
}
if *labels.Name == "target_snapshot" && *labels.Value == expTargetSnapshotName {
snapshotFound = 1
}
}
}
assert.Equal(t, 1, statusFound)
assert.Equal(t, 1, snapshotFound)
})
}
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/internal/server/grpc/get_metadata_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-csi/external-snapshot-metadata/pkg/api"
"github.com/kubernetes-csi/external-snapshot-metadata/pkg/internal/runtime"
)

func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) error {
func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) (err error) {
// Create a timeout context so that failure in either sending to the client or
// receiving from the CSI driver will ultimately abort the handler session.
// The context could also get canceled by the client.
ctx, cancelFn := context.WithTimeout(s.getMetadataDeltaContextWithLogger(req, stream), s.config.MaxStreamDur)
defer cancelFn()

// Record metrics when the operation ends
defer func(startTime time.Time) {
opLabel := map[string]string{
runtime.LabelTargetSnapshotName: fmt.Sprintf("%s/%s", req.Namespace, req.TargetSnapshotName),
runtime.LabelBaseSnapshotName: fmt.Sprintf("%s/%s", req.Namespace, req.BaseSnapshotName),
}
s.config.Runtime.RecordMetricsWithLabels(opLabel, runtime.MetadataAllocatedOperationName, startTime, err)
}(time.Now())

if err := s.validateGetMetadataDeltaRequest(req); err != nil {
klog.FromContext(ctx).Error(err, "validation failed")
return err
Expand All @@ -63,7 +74,8 @@ func (s *Server) GetMetadataDelta(req *api.GetMetadataDeltaRequest, stream api.S
return err
}

return s.streamGetMetadataDeltaResponse(ctx, stream, csiStream)
err = s.streamGetMetadataDeltaResponse(ctx, stream, csiStream)
return err
}

func (s *Server) getMetadataDeltaContextWithLogger(req *api.GetMetadataDeltaRequest, stream api.SnapshotMetadata_GetMetadataDeltaServer) context.Context {
Expand Down
31 changes: 31 additions & 0 deletions pkg/internal/server/grpc/get_metadata_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,37 @@ func TestGetMetadataDeltaViaGRPCClient(t *testing.T) {
} else if errStream != nil {
assert.ErrorIs(t, errStream, io.EOF)
}

// Validate metrics are recorded correctly
metrics, _ := grpcServer.config.Runtime.MetricsManager.GetRegistry().Gather()
statusFound := 0
targetSnapshotFound := 0
baseSnapshotFound := 0

// Validate that both gauge and controller metrics is recorded
assert.GreaterOrEqual(t, 2, len(metrics))
assert.Equal(t, *metrics[0].Name, "process_start_time_seconds")
assert.Equal(t, *metrics[1].Name, "snapshot_metadata_controller_operations_seconds")

// Validate grpc_status_code and target_snapshot name
for _, metric := range metrics[1].Metric {
for _, labels := range metric.Label {
expTargetSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.TargetSnapshotName)
expBaseSnapshotName := fmt.Sprintf("%s/%s", tc.req.Namespace, tc.req.BaseSnapshotName)
if *labels.Name == "grpc_status_code" && *labels.Value == tc.expStatusCode.String() {
statusFound = 1
}
if *labels.Name == "target_snapshot" && *labels.Value == expTargetSnapshotName {
targetSnapshotFound = 1
}
if *labels.Name == "base_snapshot" && *labels.Value == expBaseSnapshotName {
baseSnapshotFound = 1
}
}
}
assert.Equal(t, 1, statusFound)
assert.Equal(t, 1, targetSnapshotFound)
assert.Equal(t, 1, baseSnapshotFound)
})
}
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sidecar
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -86,16 +87,25 @@ func Run(argv []string, version string) int {

klog.Infof("CSI driver name: %q", rt.DriverName)

// TBD May need to exposed metric HTTP end point
// here because the wait for the CSI driver is open ended.

grpcServer, err := startGRPCServerAndValidateCSIDriver(s.createServerConfig(rt))
if err != nil {
klog.Error(err)
return 1
}

// TODO: Start the HTTP metrics server here.
// start listening & serving http endpoint, if set
mux := http.NewServeMux()
if *s.httpEndpoint != "" {
Nikhil-Ladha marked this conversation as resolved.
Show resolved Hide resolved
rt.MetricsManager.RegisterToServer(mux, *s.metricsPath)
rt.MetricsManager.SetDriverName(rt.DriverName)
go func() {
klog.Infof("ServeMux listening at %q", *s.httpEndpoint)
err := http.ListenAndServe(*s.httpEndpoint, mux)
if err != nil {
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", *s.httpEndpoint, *s.metricsPath, err)
}
}()
}

shutdownOnTerminationSignal(grpcServer)

Expand Down Expand Up @@ -172,7 +182,6 @@ func (s *sidecarFlagSet) parseFlagsAndHandleShowVersion(args []string) (handledS
}

func (s *sidecarFlagSet) runtimeArgsFromFlags() runtime.Args {
// TODO: set the HTTP server properties.
return runtime.Args{
CSIAddress: *s.csiAddress,
CSITimeout: *s.csiTimeout,
Expand All @@ -182,6 +191,8 @@ func (s *sidecarFlagSet) runtimeArgsFromFlags() runtime.Args {
GRPCPort: *s.grpcPort,
TLSCertFile: *s.tlsCert,
TLSKeyFile: *s.tlsKey,
HttpEndpoint: *s.httpEndpoint,
MetricsPath: *s.metricsPath,
}
}

Expand Down Expand Up @@ -222,6 +233,14 @@ func (s *sidecarFlagSet) runtimeArgsToArgv(progName string, rta runtime.Args) []
argv = append(argv, "-"+flagKubeAPIQPS, strconv.FormatFloat(float64(rta.KubeAPIQPS), 'f', -1, 32))
}

if rta.HttpEndpoint != "" {
argv = append(argv, "-"+flagHTTPEndpoint, rta.HttpEndpoint)
}

if rta.MetricsPath != defaultMetricsPath {
argv = append(argv, "-"+flagMetricsPath, rta.MetricsPath)
}

return argv
}

Expand Down
Loading