Skip to content

Commit

Permalink
added nats sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
vijeyash1 committed May 14, 2024
1 parent 1f965d9 commit 3678e1d
Show file tree
Hide file tree
Showing 19 changed files with 197 additions and 456 deletions.
41 changes: 8 additions & 33 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (

//"github.com/go-co-op/gocron"
"github.com/go-co-op/gocron"
"github.com/nats-io/nats.go"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/pkg/nats/sdk"

"context"

"github.com/intelops/kubviz/pkg/mtlsnats"
"github.com/intelops/kubviz/pkg/opentelemetry"

"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -56,8 +56,6 @@ const (
// nats token, natsurl, clustername
var (
ClusterName string = os.Getenv("CLUSTER_NAME")
token string = os.Getenv("NATS_TOKEN")
natsurl string = os.Getenv("NATS_ADDRESS")

//for local testing provide the location of kubeconfig
cluster_conf_loc string = os.Getenv("CONFIG_LOCATION")
Expand All @@ -77,36 +75,13 @@ func main() {
clientset *kubernetes.Clientset
)

var mtlsConfig mtlsnats.MtlsConfig
var nc *nats.Conn

if mtlsConfig.IsEnabled {
tlsConfig, err := mtlsnats.GetTlsConfig()
if err != nil {
log.Println("error while getting tls config ", err)
time.Sleep(time.Minute * 30)
} else {
nc, err = nats.Connect(
natsurl,
nats.Name("K8s Metrics"),
nats.Token(token),
nats.Secure(tlsConfig),
)
if err != nil {
log.Println("error while connecting with mtls ", err)
}
}
natsCli, err := sdk.NewNATSClient()

if err != nil {
log.Fatalf("error occured while creating nats client %v", err.Error())
}

if nc == nil {
nc, err = nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token))
events.CheckErr(err)
}
js, err := nc.JetStream()
events.CheckErr(err)
err = events.CreateStream(js)
events.CheckErr(err)
natsCli.CreateStream(constants.StreamName)
if env != Production {
config, err = clientcmd.BuildConfigFromFlags("", cluster_conf_loc)
if err != nil {
Expand All @@ -131,9 +106,9 @@ func main() {
}
}()

go events.PublishMetrics(clientset, js, clusterMetricsChan)
go events.PublishMetrics(clientset, natsCli, clusterMetricsChan)
if cfg.KuberHealthyEnable {
go kuberhealthy.StartKuberHealthy(js)
go kuberhealthy.StartKuberHealthy(natsCli)
}
go server.StartServer()
collectAndPublishMetrics := func() {
Expand Down
17 changes: 9 additions & 8 deletions agent/kubviz/plugins/events/event_metrics_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"
"github.com/intelops/kubviz/pkg/nats/sdk"
"github.com/intelops/kubviz/pkg/opentelemetry"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
Expand All @@ -29,19 +30,19 @@ var ClusterName string = os.Getenv("CLUSTER_NAME")

// publishMetrics publishes stream of events
// with subject "METRICS.created"
func PublishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) {
func PublishMetrics(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient, errCh chan error) {

ctx := context.Background()
tracer := otel.Tracer("kubviz-publish-metrics")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "publishMetrics")
span.SetAttributes(attribute.String("kubviz-agent", "publish-metrics"))
defer span.End()

watchK8sEvents(clientset, js)
watchK8sEvents(clientset, natsCli)
errCh <- nil
}

func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStreamContext, imageName string) (bool, error) {
func publishK8sMetrics(id string, mtype string, mdata *v1.Event, natsCli *sdk.NATSClient, imageName string) (bool, error) {

ctx := context.Background()
tracer := otel.Tracer("kubviz-publish-k8smetrics")
Expand All @@ -57,7 +58,7 @@ func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStre
ImageName: imageName,
}
metricsJson, _ := json.Marshal(metrics)
_, err := js.Publish(constants.EventSubject, metricsJson)
err := natsCli.Publish(constants.EventSubject, metricsJson)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -164,7 +165,7 @@ func LogErr(err error) {
log.Println(err)
}
}
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
func watchK8sEvents(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient) {

ctx := context.Background()
tracer := otel.Tracer("kubviz-watch-k8sevents")
Expand All @@ -191,7 +192,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
return
}
for _, image := range images {
publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, js, image)
publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, natsCli, image)
}
},
DeleteFunc: func(obj interface{}) {
Expand All @@ -202,7 +203,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
return
}
for _, image := range images {
publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, js, image)
publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, natsCli, image)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
Expand All @@ -213,7 +214,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
return
}
for _, image := range images {
publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, js, image)
publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, natsCli, image)
}
},
},
Expand Down
15 changes: 7 additions & 8 deletions agent/kubviz/plugins/kuberhealthy/kuberhealthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (

"github.com/intelops/kubviz/agent/config"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/pkg/nats/sdk"
"github.com/intelops/kubviz/pkg/opentelemetry"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/health"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
)

func StartKuberHealthy(js nats.JetStreamContext) {
func StartKuberHealthy(natsCli *sdk.NATSClient) {
khConfig, err := config.GetKuberHealthyConfig()
if err != nil {
log.Fatalf("Error getting Kuberhealthy config: %v", err)
Expand All @@ -27,12 +28,12 @@ func StartKuberHealthy(js nats.JetStreamContext) {
defer ticker.Stop()

for range ticker.C {
if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, js); err != nil {
if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, natsCli); err != nil {
log.Printf("Error polling and publishing Kuberhealthy metrics: %v", err)
}
}
}
func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error {
func pollAndPublishKuberhealthy(url string, natsCli *sdk.NATSClient) error {
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("error making GET request to Kuberhealthy: %w", err)
Expand All @@ -49,10 +50,10 @@ func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error {
return fmt.Errorf("error unmarshaling response: %w", err)
}

return PublishKuberhealthyMetrics(js, state)
return PublishKuberhealthyMetrics(natsCli, state)
}

func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) error {
func PublishKuberhealthyMetrics(natsCli *sdk.NATSClient, state health.State) error {
ctx := context.Background()
tracer := otel.Tracer("kuberhealthy")
_, span := tracer.Start(opentelemetry.BuildContext(ctx), "PublishKuberhealthyMetrics")
Expand All @@ -63,12 +64,10 @@ func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) er
log.Printf("Error marshaling metrics of kuberhealthy %v", err)
return err
}

if _, err := js.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil {
if err := natsCli.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil {
log.Printf("Error publishing metrics for kuberhealthy %v", err)
return err
}

log.Printf("Kuberhealthy metrics have been published")
return nil
}
68 changes: 68 additions & 0 deletions pkg/nats/sdk/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package sdk

import (
"errors"
"fmt"
"log"

"github.com/nats-io/nats.go"
)

type NATSClient struct {
conn *nats.Conn
js nats.JetStreamContext
config natsConfig
}

func NewNATSClient() (*NATSClient, error) {
config, err := loadNatsConfig()
if err != nil {
return nil, errors.New("Unable to load the nats configurations , error :" + err.Error())
}
options := []nats.Option{}
if config.EnableToken {
options = append(options, nats.Token(config.NatsToken))
}
if config.MtlsConfig.IsEnabled {
tlsConfig, err := createTLSConfig(config.MtlsConfig)
if err != nil {
return nil, err
}
options = append(options, nats.Secure(tlsConfig))
}
conn, err := nats.Connect(config.NatsAddress, options...)
if err != nil {
return nil, err
}

js, err := conn.JetStream()
if err != nil {
return nil, err
}

return &NATSClient{conn: conn, js: js, config: *config}, nil
}

func (natsCli *NATSClient) CreateStream(streamName string) error {
stream, err := natsCli.js.StreamInfo(streamName)
log.Printf("Retrieved stream %s", fmt.Sprintf("%v", stream))
if err != nil {
log.Printf("Error getting stream %s", err)
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamName+".*")
_, err = natsCli.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName + ".*"},
})
if err != nil {
return err
}
}
return nil
}

func (natsCli *NATSClient) Publish(subject string, data []byte) error {
_, err := natsCli.js.Publish(subject, data)
return err
}
28 changes: 28 additions & 0 deletions pkg/nats/sdk/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package sdk

import (
"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
)

type natsConfig struct {
NatsAddress string `envconfig:"NATS_ADDRESS"`
NatsToken string `envconfig:"NATS_TOKEN"`
MtlsConfig mtlsConfig
EnableToken bool `envconfig:"ENABLE_TOKEN"`
}

type mtlsConfig struct {
CertificateFilePath string `envconfig:"CERT_FILE" default:""`
KeyFilePath string `envconfig:"KEY_FILE" default:""`
CAFilePath string `envconfig:"CA_FILE" default:""`
IsEnabled bool `envconfig:"ENABLE_MTLS_NATS" default:"false"`
}

func loadNatsConfig() (*natsConfig, error) {
natsConf := &natsConfig{}
if err := envconfig.Process("", natsConf); err != nil {
return nil, errors.WithStack(err)
}
return natsConf, nil
}
77 changes: 77 additions & 0 deletions pkg/nats/sdk/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sdk

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"os"
)

func createTLSConfig(config mtlsConfig) (*tls.Config, error) {
certPEM, keyPEM, CACertPEM, err := readMtlsCerts(config.CertificateFilePath, config.KeyFilePath, config.CAFilePath)
if err != nil {
return nil, errors.New("unable to read the mtls certificates error:" + err.Error())
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, fmt.Errorf("error loading X509 key pair from PEM: %w", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(CACertPEM)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
return tlsConfig, nil
}

func readMtlsCerts(certificateFilePath, keyFilePath, CAFilePath string) (certPEM, keyPEM, CACertPEM []byte, err error) {
certPEM, err = readMtlsFileContents(certificateFilePath)
if err != nil {
err = fmt.Errorf("error while reading cert file: %w", err)
return
}

keyPEM, err = readMtlsFileContents(keyFilePath)
if err != nil {
err = fmt.Errorf("error while reading key file: %w", err)
return
}

CACertPEM, err = readMtlsFileContents(CAFilePath)
if err != nil {
err = fmt.Errorf("error while reading CAcert file: %w", err)
return
}

return

}

func openMtlsCertFile(filepath string) (f *os.File, err error) {
f, err = os.Open(filepath)
if err != nil {
return nil, fmt.Errorf("failed to open mtls certificate file: %w", err)
}
return f, nil
}

func readMtlsFileContents(filePath string) ([]byte, error) {
file, err := openMtlsCertFile(filePath)
if err != nil {
return nil, err
}

defer file.Close()

contents, err := io.ReadAll(file)
if err != nil {
return nil, fmt.Errorf("error while reading file %s:%w", filePath, err)
}

return contents, nil
}
Loading

0 comments on commit 3678e1d

Please sign in to comment.