Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e: add CEL to accesslog test #3730

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions test/config/gatewayclass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ spec:
type: Text
text: |
[%START_TIME%] %METADATA(ROUTE:envoy-gateway:resources)% "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%"
matches:
- "'x-envoy-logged' in request.headers"
sinks:
- type: File
file:
Expand Down
290 changes: 95 additions & 195 deletions test/e2e/tests/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,11 @@ package tests

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
httputils "sigs.k8s.io/gateway-api/conformance/utils/http"
"sigs.k8s.io/gateway-api/conformance/utils/kubernetes"
"sigs.k8s.io/gateway-api/conformance/utils/suite"
Expand All @@ -37,7 +29,14 @@ var FileAccessLogTest = suite.ConformanceTest{
Description: "Make sure file access log is working",
Manifests: []string{"testdata/accesslog-file.yaml"},
Test: func(t *testing.T, suite *suite.ConformanceTestSuite) {
t.Run("Stdout", func(t *testing.T) {
labels := map[string]string{
"job": "fluentbit",
"k8s_namespace_name": "envoy-gateway-system",
"k8s_container_name": "envoy",
}
match := "test-annotation-value"

t.Run("Positive", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-file", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
Expand All @@ -46,6 +45,9 @@ var FileAccessLogTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/file",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -55,70 +57,29 @@ var FileAccessLogTest = suite.ConformanceTest{
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

labels := map[string]string{
"job": "fluentbit",
"k8s_namespace_name": "envoy-gateway-system",
"k8s_container_name": "envoy",
}
// let's wait for the log to be sent to stdout
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

if count > 0 {
return true, nil
}
return false, nil
}); err != nil {
t.Errorf("failed to wait log flush to loki: %v", err)
}

if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

// it will take some time for fluent-bit to collect the log and send to loki
// let's wait for a while
if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 15*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

delta := count - preCount
if delta == 1 {
return true, nil
}
runLogTest(t, suite, gwAddr, expectedResponse, labels, match, 1)
})

t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}
t.Run("Negative", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-file", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN)

return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/file",
// envoy will not log this request without the header x-envoy-logged
},
Response: httputils.Response{
StatusCode: 200,
},
Namespace: ns,
}
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

runLogTest(t, suite, gwAddr, expectedResponse, labels, match, 0)
})
},
}
Expand All @@ -128,7 +89,12 @@ var OpenTelemetryTest = suite.ConformanceTest{
Description: "Make sure OpenTelemetry access log is working",
Manifests: []string{"testdata/accesslog-otel.yaml"},
Test: func(t *testing.T, suite *suite.ConformanceTestSuite) {
t.Run("OTel", func(t *testing.T) {
labels := map[string]string{
"k8s_namespace_name": "envoy-gateway-system",
"exporter": "OTLP",
}

t.Run("Positive", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-otel", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
Expand All @@ -137,6 +103,9 @@ var OpenTelemetryTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/otel",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -146,47 +115,29 @@ var OpenTelemetryTest = suite.ConformanceTest{
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

labels := map[string]string{
"k8s_namespace_name": "envoy-gateway-system",
"exporter": "OTLP",
}
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 10*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

delta := count - preCount
if delta == 1 {
return true, nil
}
runLogTest(t, suite, gwAddr, expectedResponse, labels, "", 1)
})

t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}
t.Run("Negative", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-otel", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN)

return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/otel",
// envoy will not log this request without the header x-envoy-logged
},
Response: httputils.Response{
StatusCode: 200,
},
Namespace: ns,
}
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

runLogTest(t, suite, gwAddr, expectedResponse, labels, "", 0)
})
},
}
Expand All @@ -205,6 +156,9 @@ var ALSTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/als",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -225,96 +179,42 @@ var ALSTest = suite.ConformanceTest{
},
}

// ALSLogCount returns the summed value of every counter sample found in the
// "log_count" metric family.
//
// The metrics endpoint is resolved through RetrieveURL for the "envoy-als"
// object in the "monitoring" namespace (port 19001, path "/metrics"), and the
// family is fetched through RetrieveMetric with a one-second argument
// (presumably a timeout; confirm against RetrieveMetric).
// Any failure in either step aborts the test via t.Fatalf.
func ALSLogCount(t *testing.T, suite *suite.ConformanceTestSuite) int {
	alsNN := types.NamespacedName{
		Namespace: "monitoring",
		Name:      "envoy-als",
	}

	metricsURL, err := RetrieveURL(suite.Client, alsNN, 19001, "/metrics")
	if err != nil {
		t.Fatalf("failed to get metric url: %v", err)
	}

	family, err := RetrieveMetric(metricsURL, "log_count", time.Second)
	if err != nil {
		t.Fatalf("failed to get metric: %v", err)
	}

	var sum int
	for _, sample := range family.Metric {
		// Skip samples that carry no counter value.
		if sample.Counter == nil || sample.Counter.Value == nil {
			continue
		}
		sum += int(*sample.Counter.Value)
	}

	return sum
}

// QueryLogCountFromLoki queries log count from loki
// TODO: move to utils package if needed
func QueryLogCountFromLoki(t *testing.T, c client.Client, nn types.NamespacedName, keyValues map[string]string, match string) (int, error) {
svc := corev1.Service{}
if err := c.Get(context.Background(), types.NamespacedName{
Namespace: "monitoring",
Name: "loki",
}, &svc); err != nil {
return -1, err
}
lokiHost := ""
for _, ing := range svc.Status.LoadBalancer.Ingress {
if ing.IP != "" {
lokiHost = ing.IP
break
}
}

qParams := make([]string, 0, len(keyValues))
for k, v := range keyValues {
qParams = append(qParams, fmt.Sprintf("%s=\"%s\"", k, v))
}

q := "{" + strings.Join(qParams, ",") + "}"
if match != "" {
q = q + "|~\"" + match + "\""
}
params := url.Values{}
params.Add("query", q)
params.Add("start", fmt.Sprintf("%d", time.Now().Add(-10*time.Minute).Unix())) // query logs from last 10 minutes
lokiQueryURL := fmt.Sprintf("http://%s:3100/loki/api/v1/query_range?%s", lokiHost, params.Encode())
res, err := http.DefaultClient.Get(lokiQueryURL)
if err != nil {
return -1, err
}
t.Logf("get response from loki, query=%s, status=%s", q, res.Status)
func runLogTest(t *testing.T, suite *suite.ConformanceTestSuite, gwAddr string,
expectedResponse httputils.ExpectedResponse, expectedLabels map[string]string, expectedMatch string, expectedDelta int,
) {
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, expectedLabels, expectedMatch)
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

b, err := io.ReadAll(res.Body)
if err != nil {
return -1, err
}
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

lokiResponse := &LokiQueryResponse{}
if err := json.Unmarshal(b, lokiResponse); err != nil {
return -1, err
}
// it will take some time for fluent-bit to collect the log and send to loki
// let's wait for a while
if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 15*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, expectedLabels, expectedMatch)
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

if len(lokiResponse.Data.Result) == 0 {
return 0, nil
}
delta := count - preCount
if delta == expectedDelta {
return true, nil
}

total := 0
for _, res := range lokiResponse.Data.Result {
total += len(res.Values)
}
t.Logf("get response from loki, query=%s, total=%d", q, total)
return total, nil
}
t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}

type LokiQueryResponse struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric interface{}
Values []interface{} `json:"values"`
}
return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
}
}
Loading
Loading