diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS deleted file mode 100644 index dac7ed9c..00000000 --- a/.github/CODEOWNERS +++ /dev/null @@ -1,4 +0,0 @@ -# CODEOWNERS info: https://help.github.com/en/articles/about-code-owners -# Owners are automatically requested for review for PRs that changes code -# that they own. -* @ankitnayan @makeavish @nityanandagohain @prashant-shahi @srikanthccv diff --git a/components/components.go b/components/components.go index c2dabc1e..e301afe3 100644 --- a/components/components.go +++ b/components/components.go @@ -117,6 +117,7 @@ import ( "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter" "github.com/SigNoz/signoz-otel-collector/exporter/signozkafkaexporter" + signozhealthcheckextension "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" "github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor" @@ -304,6 +305,7 @@ func CoreComponents() ( oidcauthextension.NewFactory(), healthcheckextension.NewFactory(), pprofextension.NewFactory(), + signozhealthcheckextension.NewFactory(), zpagesextension.NewFactory(), ) errs = multierr.Append(errs, err) diff --git a/extension/healthcheckextension/Makefile b/extension/healthcheckextension/Makefile new file mode 100644 index 00000000..84677bc7 --- /dev/null +++ b/extension/healthcheckextension/Makefile @@ -0,0 +1,2 @@ +include ../../Makefile.Common + diff --git a/extension/healthcheckextension/README.md b/extension/healthcheckextension/README.md new file mode 100644 index 00000000..d9b5c563 --- /dev/null +++ b/extension/healthcheckextension/README.md @@ -0,0 +1,3 @@ +# Health Check + +This is a copy of upstream health check that always returns healthy. 
\ No newline at end of file diff --git a/extension/healthcheckextension/config.go b/extension/healthcheckextension/config.go new file mode 100644 index 00000000..cd9526d7 --- /dev/null +++ b/extension/healthcheckextension/config.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" + +import ( + "errors" + "strings" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" +) + +type ResponseBodySettings struct { + // Healthy represents the body of the response returned when the collector is healthy. + // The default value is "" + Healthy string `mapstructure:"healthy"` + + // Unhealthy represents the body of the response returned when the collector is unhealthy. + // The default value is "" + Unhealthy string `mapstructure:"unhealthy"` +} + +// Config has the configuration for the extension enabling the health check +// extension, used to report the health status of the service. +type Config struct { + confighttp.ServerConfig `mapstructure:",squash"` + + // Path represents the path the health check service will serve. + // The default path is "/". + Path string `mapstructure:"path"` + + // ResponseBody represents the body of the response returned by the health check service. + // This overrides the default response that it would return. 
+ ResponseBody *ResponseBodySettings `mapstructure:"response_body"` + + // CheckCollectorPipeline contains the list of settings of collector pipeline health check + CheckCollectorPipeline checkCollectorPipelineSettings `mapstructure:"check_collector_pipeline"` +} + +var _ component.Config = (*Config)(nil) +var ( + errNoEndpointProvided = errors.New("bad config: endpoint must be specified") + errInvalidExporterFailureThresholdProvided = errors.New("bad config: exporter_failure_threshold expects a positive number") + errInvalidPath = errors.New("bad config: path must start with /") +) + +// Validate checks if the extension configuration is valid +func (cfg *Config) Validate() error { + _, err := time.ParseDuration(cfg.CheckCollectorPipeline.Interval) + if err != nil { + return err + } + if cfg.Endpoint == "" { + return errNoEndpointProvided + } + if cfg.CheckCollectorPipeline.ExporterFailureThreshold <= 0 { + return errInvalidExporterFailureThresholdProvided + } + if !strings.HasPrefix(cfg.Path, "/") { + return errInvalidPath + } + return nil +} + +type checkCollectorPipelineSettings struct { + // Enabled indicates whether to not enable collector pipeline check. 
+ Enabled bool `mapstructure:"enabled"` + // Interval the time range to check healthy status of collector pipeline + Interval string `mapstructure:"interval"` + // ExporterFailureThreshold is the threshold of exporter failure numbers during the Interval + ExporterFailureThreshold int `mapstructure:"exporter_failure_threshold"` +} diff --git a/extension/healthcheckextension/config_test.go b/extension/healthcheckextension/config_test.go new file mode 100644 index 00000000..deb3e9d8 --- /dev/null +++ b/extension/healthcheckextension/config_test.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension/internal/metadata" +) + +func TestLoadConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + id component.ID + expected component.Config + expectedErr error + }{ + { + id: component.NewID(metadata.Type), + expected: NewFactory().CreateDefaultConfig(), + }, + { + id: component.NewIDWithName(metadata.Type, "1"), + expected: &Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "localhost:13", + TLSSetting: &configtls.ServerConfig{ + Config: configtls.Config{ + CAFile: "/path/to/ca", + CertFile: "/path/to/cert", + KeyFile: "/path/to/key", + }, + }, + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + ResponseBody: nil, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "missingendpoint"), + expectedErr: errNoEndpointProvided, + }, + { + id: component.NewIDWithName(metadata.Type, "invalidthreshold"), + expectedErr: 
errInvalidExporterFailureThresholdProvided, + }, + { + id: component.NewIDWithName(metadata.Type, "invalidpath"), + expectedErr: errInvalidPath, + }, + } + for _, tt := range tests { + t.Run(tt.id.String(), func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + if tt.expectedErr != nil { + assert.ErrorIs(t, component.ValidateConfig(cfg), tt.expectedErr) + return + } + assert.NoError(t, component.ValidateConfig(cfg)) + assert.Equal(t, tt.expected, cfg) + }) + } +} diff --git a/extension/healthcheckextension/doc.go b/extension/healthcheckextension/doc.go new file mode 100644 index 00000000..16bd9583 --- /dev/null +++ b/extension/healthcheckextension/doc.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen metadata.yaml + +// Package healthcheckextension implements an extension that enables an HTTP +// endpoint that can be used to check the overall health and status of the +// service. 
+package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" diff --git a/extension/healthcheckextension/factory.go b/extension/healthcheckextension/factory.go new file mode 100644 index 00000000..26d43bcb --- /dev/null +++ b/extension/healthcheckextension/factory.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/extension" + + "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension/internal/metadata" +) + +const defaultPort = 13133 + +// NewFactory creates a factory for HealthCheck extension. +func NewFactory() extension.Factory { + return extension.NewFactory( + metadata.Type, + createDefaultConfig, + createExtension, + metadata.ExtensionStability, + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: fmt.Sprintf("0.0.0.0:%d", defaultPort), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + } +} + +func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + config := cfg.(*Config) + + return newServer(*config, set.TelemetrySettings), nil +} + +// defaultCheckCollectorPipelineSettings returns the default settings for CheckCollectorPipeline. 
+func defaultCheckCollectorPipelineSettings() checkCollectorPipelineSettings { + return checkCollectorPipelineSettings{ + Enabled: false, + Interval: "5m", + ExporterFailureThreshold: 5, + } +} diff --git a/extension/healthcheckextension/factory_test.go b/extension/healthcheckextension/factory_test.go new file mode 100644 index 00000000..eb031ce9 --- /dev/null +++ b/extension/healthcheckextension/factory_test.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/extension/extensiontest" + + "github.com/SigNoz/signoz-otel-collector/internal/common/testutil" +) + +func TestFactory_CreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig() + assert.Equal(t, &Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: "0.0.0.0:13133", + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + }, cfg) + + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + ext, err := createExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + require.NotNil(t, ext) +} + +func TestFactory_CreateExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = testutil.GetAvailableLocalAddress(t) + + ext, err := createExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + require.NotNil(t, ext) +} diff --git a/extension/healthcheckextension/generated_package_test.go b/extension/healthcheckextension/generated_package_test.go new file mode 100644 index 00000000..1dfd9697 --- /dev/null +++ b/extension/healthcheckextension/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. 
DO NOT EDIT. + +package healthcheckextension + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/extension/healthcheckextension/healthcheckextension.go b/extension/healthcheckextension/healthcheckextension.go new file mode 100644 index 00000000..c57a19a4 --- /dev/null +++ b/extension/healthcheckextension/healthcheckextension.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" + +import ( + "context" + "errors" + "fmt" + "net/http" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" + + "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension/internal/healthcheck" +) + +type healthCheckExtension struct { + config Config + logger *zap.Logger + state *healthcheck.HealthCheck + server *http.Server + stopCh chan struct{} + settings component.TelemetrySettings +} + +var _ extension.PipelineWatcher = (*healthCheckExtension)(nil) + +func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error { + + hc.logger.Info("Starting health_check extension", zap.Any("config", hc.config)) + ln, err := hc.config.ToListener(ctx) + if err != nil { + return fmt.Errorf("failed to bind to address %s: %w", hc.config.Endpoint, err) + } + + hc.server, err = hc.config.ToServer(ctx, host, hc.settings, nil) + if err != nil { + return err + } + + // Mount HC handler + mux := http.NewServeMux() + mux.Handle(hc.config.Path, hc.baseHandler()) + hc.server.Handler = mux + hc.stopCh = make(chan struct{}) + go func() { + defer close(hc.stopCh) + + // The listener ownership goes to the server. 
+ if err = hc.server.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil { + hc.settings.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +// base handler function +func (hc *healthCheckExtension) baseHandler() http.Handler { + if hc.config.ResponseBody != nil { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(hc.config.ResponseBody.Healthy)) + }) + } + return hc.state.Handler() +} + +func (hc *healthCheckExtension) Shutdown(context.Context) error { + if hc.server == nil { + return nil + } + err := hc.server.Close() + if hc.stopCh != nil { + <-hc.stopCh + } + return err +} + +func (hc *healthCheckExtension) Ready() error { + hc.state.Set(healthcheck.Ready) + return nil +} + +func (hc *healthCheckExtension) NotReady() error { + hc.state.Set(healthcheck.Unavailable) + return nil +} + +func newServer(config Config, settings component.TelemetrySettings) *healthCheckExtension { + hc := &healthCheckExtension{ + config: config, + logger: settings.Logger, + state: healthcheck.New(), + settings: settings, + } + + hc.state.SetLogger(settings.Logger) + + return hc +} diff --git a/extension/healthcheckextension/healthcheckextension_test.go b/extension/healthcheckextension/healthcheckextension_test.go new file mode 100644 index 00000000..2a9c4fd1 --- /dev/null +++ b/extension/healthcheckextension/healthcheckextension_test.go @@ -0,0 +1,242 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckextension + +import ( + "context" + "io" + "net" + "net/http" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + + "github.com/SigNoz/signoz-otel-collector/internal/common/testutil" +) + +const ( + expectedBodyNotReady = "{\"status\":\"Server not available\",\"upSince\":" + 
expectedBodyReady = "{\"status\":\"Server available\",\"upSince\":" +) + +func ensureServerRunning(url string) func() bool { + return func() bool { + _, err := net.DialTimeout("tcp", url, 30*time.Second) + return err == nil + } +} + +type teststep struct { + step func(*healthCheckExtension) error + expectedStatusCode int + expectedBody string +} + +func TestHealthCheckExtensionUsage(t *testing.T) { + tests := []struct { + name string + config Config + teststeps []teststep + }{ + { + name: "WithoutCheckCollectorPipeline", + config: Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + ResponseBody: nil, + }, + teststeps: []teststep{ + { + step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + expectedStatusCode: http.StatusOK, + expectedBody: expectedBodyReady, + }, + }, + }, + { + name: "WithCustomizedPathWithoutCheckCollectorPipeline", + config: Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/health", + }, + teststeps: []teststep{ + { + step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + expectedStatusCode: http.StatusOK, + }, + }, + }, + { + name: "WithBothCustomResponseBodyWithoutCheckCollectorPipeline", + config: Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + ResponseBody: &ResponseBodySettings{Healthy: "ALL OK", Unhealthy: "NOT OK"}, + }, + teststeps: []teststep{ + { + step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + expectedStatusCode: http.StatusOK, + expectedBody: "ALL OK", + }, + }, + }, + { + name: "WithHealthyCustomResponseBodyWithoutCheckCollectorPipeline", + config: Config{ + ServerConfig: 
confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + ResponseBody: &ResponseBodySettings{Healthy: "ALL OK"}, + }, + teststeps: []teststep{ + { + step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + expectedStatusCode: http.StatusOK, + expectedBody: "ALL OK", + }, + }, + }, + { + name: "WithUnhealthyCustomResponseBodyWithoutCheckCollectorPipeline", + config: Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + ResponseBody: &ResponseBodySettings{Unhealthy: "NOT OK"}, + }, + teststeps: []teststep{ + { + step: func(hcExt *healthCheckExtension) error { return hcExt.Ready() }, + expectedStatusCode: http.StatusOK, + expectedBody: "", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hcExt := newServer(tt.config, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, hcExt) + + require.NoError(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(context.Background())) }) + + // Give a chance for the server goroutine to run. 
+ runtime.Gosched() + require.Eventuallyf(t, ensureServerRunning(tt.config.Endpoint), 30*time.Second, 1*time.Second, "Failed to start the testing server.") + + client := &http.Client{} + url := "http://" + tt.config.Endpoint + tt.config.Path + + for _, ts := range tt.teststeps { + if ts.step != nil { + require.NoError(t, ts.step(hcExt)) + } + + resp, err := client.Get(url) + require.NoError(t, err) + + if ts.expectedStatusCode != 0 { + require.Equal(t, ts.expectedStatusCode, resp.StatusCode) + } + if ts.expectedBody != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), ts.expectedBody) + } + require.NoError(t, resp.Body.Close(), "Must be able to close the response") + } + }) + } +} + +func TestHealthCheckExtensionPortAlreadyInUse(t *testing.T) { + endpoint := testutil.GetAvailableLocalAddress(t) + + // This needs to be ":port" because health checks also tries to connect to ":port". + // To avoid the pop-up "accept incoming network connections" health check should be changed + // to accept an address. 
+ ln, err := net.Listen("tcp", endpoint) + require.NoError(t, err) + defer ln.Close() + + config := Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: endpoint, + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + } + hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, hcExt) + + require.Error(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) +} + +func TestHealthCheckMultipleStarts(t *testing.T) { + config := Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + } + + hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, hcExt) + + require.NoError(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, hcExt.Shutdown(context.Background())) }) + + require.Error(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) +} + +func TestHealthCheckMultipleShutdowns(t *testing.T) { + config := Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + Path: "/", + } + + hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, hcExt) + + require.NoError(t, hcExt.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, hcExt.Shutdown(context.Background())) + require.NoError(t, hcExt.Shutdown(context.Background())) +} + +func TestHealthCheckShutdownWithoutStart(t *testing.T) { + config := Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + CheckCollectorPipeline: defaultCheckCollectorPipelineSettings(), + } + + hcExt := newServer(config, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, hcExt) + + require.NoError(t, 
hcExt.Shutdown(context.Background())) +} diff --git a/extension/healthcheckextension/internal/healthcheck/handler.go b/extension/healthcheckextension/internal/healthcheck/handler.go new file mode 100644 index 00000000..6d5b85ca --- /dev/null +++ b/extension/healthcheckextension/internal/healthcheck/handler.go @@ -0,0 +1,146 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +// +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package healthcheck // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension/internal/healthcheck" + +import ( + "encoding/json" + "fmt" + "net/http" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +// Status represents the state of the service. 
+type Status int + +const ( + // Unavailable indicates the service is not able to handle requests + Unavailable Status = iota + // Ready indicates the service is ready to handle requests + Ready + // Broken indicates that the healthcheck itself is broken, not serving HTTP + Broken +) + +func (s Status) String() string { + switch s { + case Unavailable: + return "unavailable" + case Ready: + return "ready" + case Broken: + return "broken" + default: + return "unknown" + } +} + +type healthCheckResponse struct { + statusCode int + StatusMsg string `json:"status"` + UpSince time.Time `json:"upSince"` + Uptime string `json:"uptime"` +} + +type state struct { + status Status + upSince time.Time +} + +// HealthCheck provides an HTTP endpoint that returns the health status of the service +type HealthCheck struct { + state atomic.Value // stores state struct + logger *zap.Logger + responses map[Status]healthCheckResponse +} + +// New creates a HealthCheck with the specified initial state. +func New() *HealthCheck { + hc := &HealthCheck{ + logger: zap.NewNop(), + responses: map[Status]healthCheckResponse{ + Unavailable: { + statusCode: http.StatusServiceUnavailable, + StatusMsg: "Server not available", + }, + Ready: { + statusCode: http.StatusOK, + StatusMsg: "Server available", + }, + }, + } + hc.state.Store(state{status: Unavailable}) + return hc +} + +// SetLogger initializes a logger. +func (hc *HealthCheck) SetLogger(logger *zap.Logger) { + hc.logger = logger +} + +// Handler creates a new HTTP handler. 
+func (hc *HealthCheck) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + template := hc.responses[Ready] + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(template.statusCode) + + _, _ = w.Write(hc.createRespBody(state{status: Ready}, template)) + }) +} + +func (hc *HealthCheck) createRespBody(state state, template healthCheckResponse) []byte { + resp := template // clone + if state.status == Ready { + resp.UpSince = state.upSince + resp.Uptime = fmt.Sprintf("%v", time.Since(state.upSince)) + } + healthCheckStatus, _ := json.Marshal(resp) + return healthCheckStatus +} + +// Set a new health check status +func (hc *HealthCheck) Set(status Status) { + oldState := hc.getState() + newState := state{status: status} + if status == Ready { + if oldState.status != Ready { + newState.upSince = time.Now() + } + } + hc.state.Store(newState) + hc.logger.Info("Health Check state change", zap.Stringer("status", status)) +} + +// Get the current status of this health check +func (hc *HealthCheck) Get() Status { + return hc.getState().status +} + +func (hc *HealthCheck) getState() state { + return hc.state.Load().(state) +} + +// Ready is a shortcut for Set(Ready) (kept for backwards compatibility) +func (hc *HealthCheck) Ready() { + hc.Set(Ready) +} diff --git a/extension/healthcheckextension/internal/metadata/generated_status.go b/extension/healthcheckextension/internal/metadata/generated_status.go new file mode 100644 index 00000000..a842ed2f --- /dev/null +++ b/extension/healthcheckextension/internal/metadata/generated_status.go @@ -0,0 +1,16 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+
+package metadata
+
+import (
+	"go.opentelemetry.io/collector/component"
+)
+
+var (
+	Type      = component.MustNewType("signoz_health_check")
+	ScopeName = "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension"
+)
+
+const (
+	ExtensionStability = component.StabilityLevelBeta
+)
diff --git a/extension/healthcheckextension/metadata.yaml b/extension/healthcheckextension/metadata.yaml
new file mode 100644
index 00000000..e9fa29a5
--- /dev/null
+++ b/extension/healthcheckextension/metadata.yaml
@@ -0,0 +1,13 @@
+type: signoz_health_check
+
+status:
+  class: extension
+  stability:
+    beta: [extension]
+  distributions: [core, contrib]
+  codeowners:
+    active: [jpkrohling]
+
+tests:
+  config:
+    endpoint: localhost:0
diff --git a/extension/healthcheckextension/testdata/config.yaml b/extension/healthcheckextension/testdata/config.yaml
new file mode 100644
index 00000000..d6b64afc
--- /dev/null
+++ b/extension/healthcheckextension/testdata/config.yaml
@@ -0,0 +1,30 @@
+signoz_health_check:
+signoz_health_check/1:
+  endpoint: "localhost:13"
+  tls:
+    ca_file: "/path/to/ca"
+    key_file: "/path/to/key"
+    cert_file: "/path/to/cert"
+  check_collector_pipeline:
+    enabled: false
+    interval: "5m"
+    exporter_failure_threshold: 5
+signoz_health_check/missingendpoint:
+  endpoint: ""
+  check_collector_pipeline:
+    enabled: false
+    interval: "5m"
+    exporter_failure_threshold: 5
+signoz_health_check/invalidthreshold:
+  endpoint: "localhost:13"
+  check_collector_pipeline:
+    enabled: false
+    interval: "5m"
+    exporter_failure_threshold: -1
+signoz_health_check/invalidpath:
+  endpoint: "localhost:13"
+  path: "invalid"
+  check_collector_pipeline:
+    enabled: false
+    interval: "5m"
+    exporter_failure_threshold: 5
diff --git a/internal/common/testutil/util.go b/internal/common/testutil/util.go
new file mode 100644
index 00000000..bc4e4983
--- /dev/null
+++ b/internal/common/testutil/util.go
@@ -0,0 +1,162 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package testutil
+
+import (
+	"net"
+	"os/exec"
+	"runtime"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/featuregate"
+)
+
+// portpair is an inclusive range of port numbers, stored numerically so that
+// range checks are not subject to lexicographic string ordering.
+type portpair struct {
+	first int
+	last  int
+}
+
+// GetAvailableLocalAddress finds an available local port on tcp network and returns an endpoint
+// describing it. The port is available for opening when this function returns
+// provided that there is no race by some other code to grab the same port
+// immediately.
+func GetAvailableLocalAddress(t testing.TB) string {
+	return GetAvailableLocalNetworkAddress(t, "tcp")
+}
+
+// GetAvailableLocalNetworkAddress finds an available local port on specified network and returns an endpoint
+// describing it. The port is available for opening when this function returns
+// provided that there is no race by some other code to grab the same port
+// immediately.
+func GetAvailableLocalNetworkAddress(t testing.TB, network string) string {
+	// Retry has been added for windows as net.Listen can return a port that is not actually available. Details can be
+	// found in https://github.com/docker/for-win/issues/3171 but to summarize Hyper-V will reserve ranges of ports
+	// which do not show up under the "netstat -ano" but can only be found by
+	// "netsh interface ipv4 show excludedportrange protocol=tcp". We'll use []exclusions to hold those ranges and
+	// retry if the port returned by findAvailableAddress falls in one of them.
+	var exclusions []portpair
+
+	portFound := false
+	if runtime.GOOS == "windows" {
+		exclusions = getExclusionsList(t)
+	}
+
+	var endpoint string
+	for !portFound {
+		endpoint = findAvailableAddress(t, network)
+		_, port, err := net.SplitHostPort(endpoint)
+		require.NoError(t, err)
+		portFound = true
+		if runtime.GOOS == "windows" {
+			// Compare numerically: as strings, "9000" would sort after "10000".
+			portNum, err := strconv.Atoi(port)
+			require.NoError(t, err)
+			for _, pair := range exclusions {
+				if portNum >= pair.first && portNum <= pair.last {
+					portFound = false
+					break
+				}
+			}
+		}
+	}
+
+	return endpoint
+}
+
+// findAvailableAddress briefly binds localhost:0 on the given network and returns the
+// address that was assigned. It returns "" for a network it does not recognize.
+func findAvailableAddress(t testing.TB, network string) string {
+	switch network {
+	// net.Listen supported network strings
+	case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
+		ln, err := net.Listen(network, "localhost:0")
+		require.NoError(t, err, "Failed to get a free local port")
+		// There is a possible race if something else takes this same port before
+		// the test uses it, however, that is unlikely in practice.
+		defer func() {
+			assert.NoError(t, ln.Close())
+		}()
+		return ln.Addr().String()
+	// net.ListenPacket supported network strings
+	case "udp", "udp4", "udp6", "unixgram":
+		ln, err := net.ListenPacket(network, "localhost:0")
+		require.NoError(t, err, "Failed to get a free local port")
+		// There is a possible race if something else takes this same port before
+		// the test uses it, however, that is unlikely in practice.
+		defer func() {
+			assert.NoError(t, ln.Close())
+		}()
+		return ln.LocalAddr().String()
+	}
+	return ""
+}
+
+// getExclusionsList returns the TCP and UDP excluded port ranges reported on Windows by:
+// netsh interface ipv4 show excludedportrange protocol=<tcp|udp>
+func getExclusionsList(t testing.TB) []portpair {
+	cmdTCP := exec.Command("netsh", "interface", "ipv4", "show", "excludedportrange", "protocol=tcp")
+	outputTCP, errTCP := cmdTCP.CombinedOutput()
+	require.NoError(t, errTCP)
+	exclusions := createExclusionsList(t, string(outputTCP))
+
+	cmdUDP := exec.Command("netsh", "interface", "ipv4", "show", "excludedportrange", "protocol=udp")
+	outputUDP, errUDP := cmdUDP.CombinedOutput()
+	require.NoError(t, errUDP)
+	exclusions = append(exclusions, createExclusionsList(t, string(outputUDP))...)
+
+	return exclusions
+}
+
+// createExclusionsList parses the text printed by "netsh ... show excludedportrange" into
+// numeric port ranges. It fails the test if the text does not have the expected layout.
+func createExclusionsList(t testing.TB, exclusionsText string) []portpair {
+	var exclusions []portpair
+
+	parts := strings.Split(exclusionsText, "--------")
+	require.Len(t, parts, 3)
+	portsText := strings.Split(parts[2], "*")
+	require.Greater(t, len(portsText), 1) // original text may have a suffix like " - Administered port exclusions."
+	lines := strings.Split(portsText[0], "\n")
+	for _, line := range lines {
+		if strings.TrimSpace(line) != "" {
+			entries := strings.Fields(strings.TrimSpace(line))
+			require.Len(t, entries, 2)
+			first, err := strconv.Atoi(entries[0])
+			require.NoError(t, err)
+			last, err := strconv.Atoi(entries[1])
+			require.NoError(t, err)
+			exclusions = append(exclusions, portpair{first: first, last: last})
+		}
+	}
+	return exclusions
+}
+
+// SetFeatureGateForTest forces the state of a feature gate for a test and returns a
+// function that restores the original value.
+// usage: defer SetFeatureGateForTest(t, gate, true)()
+func SetFeatureGateForTest(t testing.TB, gate *featuregate.Gate, enabled bool) func() {
+	originalValue := gate.IsEnabled()
+	require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled))
+	return func() {
+		require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), originalValue))
+	}
+}
+
+// GetAvailablePort returns the numeric port of an address obtained from GetAvailableLocalAddress.
+func GetAvailablePort(t testing.TB) int {
+	endpoint := GetAvailableLocalAddress(t)
+	_, port, err := net.SplitHostPort(endpoint)
+	require.NoError(t, err)
+
+	portInt, err := strconv.Atoi(port)
+	require.NoError(t, err)
+
+	return portInt
+}