diff --git a/.github/workflows/climon-docker-image.yml b/.github/workflows/climon-docker-image.yml new file mode 100644 index 00000000..a464252e --- /dev/null +++ b/.github/workflows/climon-docker-image.yml @@ -0,0 +1,35 @@ +name: Climon Docker Image CI + +on: + push: + paths-ignore: + - 'charts/**' + - '**.md' + branches: [ main ] + pull_request: + paths-ignore: + - 'helm/**' + - '**.md' + branches: [ main ] + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + +jobs: + + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Build the Docker image + run: docker build . --file dockerfiles/climon/Dockerfile --tag ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/climon:latest + - name: Docker push + run: | + docker login ${{ env.REGISTRY }} -u jebinjeb -p ${{ secrets.GITHUB_TOKEN }} + docker push ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/climon:latest + if: github.event_name == 'push' diff --git a/.github/workflows/config-worker-docker-image.yml b/.github/workflows/config-worker-docker-image.yml index 500d891a..40e7a1c8 100644 --- a/.github/workflows/config-worker-docker-image.yml +++ b/.github/workflows/config-worker-docker-image.yml @@ -1,4 +1,4 @@ -name: Client Docker Image CI +name: Config worker Docker Image CI on: push: diff --git a/.github/workflows/deployment-worker-docker-image.yml b/.github/workflows/deployment-worker-docker-image.yml index 354d292f..83acb421 100644 --- a/.github/workflows/deployment-worker-docker-image.yml +++ b/.github/workflows/deployment-worker-docker-image.yml @@ -1,4 +1,4 @@ -name: Client Docker Image CI +name: Deployment worker Docker Image CI on: push: diff --git a/.github/workflows/server-docker-image.yml b/.github/workflows/server-docker-image.yml index 12a19c77..939e16de 100644 --- a/.github/workflows/server-docker-image.yml +++ b/.github/workflows/server-docker-image.yml @@ -1,4 +1,4 @@ -name: Client Docker Image CI +name: Server Docker Image CI on: push: diff --git a/Makefile b/Makefile index 630ea6c3..ec760953 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,7 @@ SERVER_APP_NAME := server AGENT_APP_NAME := agent DEPLOYMENT_WORKER_APP_NAME := deployment-worker CONFIG_WORKER_APP_NAME := config-worker +CLIMON_APP_NAME := climon BUILD := 0.1.1 gen-protoc: @@ -37,4 +38,7 @@ docker-build-deployment: docker-build-config: docker build -f dockerfiles/config-worker/Dockerfile -t ${PREFIX}-${CONFIG_WORKER_APP_NAME}:${BUILD} . -docker-build: docker-build-kad docker-build-server +docker-build-climon: + docker build -f dockerfiles/climon/Dockerfile -t ${PREFIX}-${CLIMON_APP_NAME}:${BUILD} . + +docker-build: docker-build-kad docker-build-server docker-build-climon diff --git a/charts/kad/Chart.yaml b/charts/kad/Chart.yaml index d0955f35..ef9eaf17 100644 --- a/charts/kad/Chart.yaml +++ b/charts/kad/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.12 +version: 0.1.13 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/kad/templates/climon-deployment.yaml b/charts/kad/templates/climon-deployment.yaml new file mode 100644 index 00000000..e082908a --- /dev/null +++ b/charts/kad/templates/climon-deployment.yaml @@ -0,0 +1,89 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "kad.fullname" . }}-climon + labels: + {{- include "kad.labels" . | nindent 4 }} + app.kubernetes.io/component: climon +spec: + {{- if not .Values.autoscaling.enabled }} + replicas: {{ .Values.replicaCount }} + {{- end }} + selector: + matchLabels: + {{- include "kad.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: climon + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "kad.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: climon + spec: + serviceAccountName: {{ include "kad.serviceAccountName" . }} + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }}-climon + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.climon.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: http + containerPort: {{ .Values.service.port }} + protocol: TCP + livenessProbe: + httpGet: + path: /status + port: http + readinessProbe: + httpGet: + path: /status + port: http + env: + - name: TEMPORAL_SERVICE_URL + value: "{{ .Values.temporal.temporalServiceURL }}.{{ .Release.Namespace }}.svc.cluster.local:7233" + - name: PORT + value: "{{ .Values.service.port }}" + - name: CASSANDRA_SERVICE_URL + value: "{{ .Values.cassandra.serviceURL }}" + - name: CASSANDRA_KEYSPACE_NAME + value: "{{ .Values.cassandra.keyspaceName }}" + - name: CASSANDRA_TABLE_NAME + value: "{{ .Values.cassandra.name }}" + - name: CASSANDRA_USERNAME + value: "{{ .Values.cassandra.username }}" + - name: CASSANDRA_PASSWORD + value: "{{ .Values.cassandra.password }}" + # valueFrom: + # secretKeyRef: + # name: "{{ .Values.cassandra.secretName }}" + # key: "password" + resources: + {{- toYaml .Values.resources | nindent 12 }} + volumeMounts: + - mountPath: /tmp + name: tmp-volume + volumes: + - name: tmp-volume + emptyDir: {} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} \ No newline at end of file diff --git a/charts/kad/templates/cluster-role-binding.yaml b/charts/kad/templates/cluster-role-binding.yaml index 33549b3b..bc9fec1e 100644 --- a/charts/kad/templates/cluster-role-binding.yaml +++ b/charts/kad/templates/cluster-role-binding.yaml @@ -9,5 +9,5 @@ subjects: namespace: {{ .Release.Namespace }} roleRef: kind: ClusterRole - name: {{ include "kad.fullname" . }} + name: cluster-admin apiGroup: rbac.authorization.k8s.io diff --git a/charts/kad/templates/cluster-role.yaml b/charts/kad/templates/cluster-role.yaml index b840cd6d..4d7160ba 100644 --- a/charts/kad/templates/cluster-role.yaml +++ b/charts/kad/templates/cluster-role.yaml @@ -1,19 +1,19 @@ -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: {{ include "kad.fullname" . }} -rules: -- apiGroups: ["*"] - # - # at the HTTP level, the name of the resource for accessing Secret - # objects is "secrets" - resources: ["*"] - verbs: - - get - - list - - watch - - create - - update - - patch - - delete +# apiVersion: rbac.authorization.k8s.io/v1 +# kind: ClusterRole +# metadata: +# name: {{ include "kad.fullname" . }} +# rules: +# - apiGroups: ["*"] +# # +# # at the HTTP level, the name of the resource for accessing Secret +# # objects is "secrets" +# resources: ["*"] +# verbs: +# - get +# - list +# - watch +# - create +# - update +# - patch +# - delete diff --git a/charts/kad/values.yaml b/charts/kad/values.yaml index 8ccdbfbf..f131628f 100644 --- a/charts/kad/values.yaml +++ b/charts/kad/values.yaml @@ -13,6 +13,8 @@ deployment_worker: repository: ghcr.io/kube-tarian/kad/deployment-worker config_worker: repository: ghcr.io/kube-tarian/kad/config-worker +climon: + repository: ghcr.io/kube-tarian/kad/climon imagePullSecrets: [] nameOverride: "" @@ -92,6 +94,6 @@ argocd: cassandra: username: user secretName: "temporal-default-store" - serviceURL: "temporal-cassandra.default.svc.cluster" + serviceURL: "temporal-cassandra" keyspaceName: "capten" tableName: "tools" \ No newline at end of file diff --git a/charts/server/Chart.yaml b/charts/server/Chart.yaml index 6c8199f4..d91cdd4f 100644 --- a/charts/server/Chart.yaml +++ b/charts/server/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.8 +version: 0.1.9 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/dockerfiles/climon/Dockerfile b/dockerfiles/climon/Dockerfile new file mode 100644 index 00000000..1d140479 --- /dev/null +++ b/dockerfiles/climon/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.19.4 AS builder +WORKDIR / +COPY ./integrator ./ +RUN rm -rf vendor + +RUN go mod download +RUN CGO_ENABLED=0 go build -o ./build/climon climon/main.go + +FROM scratch +COPY --from=builder ./build/climon climon + +USER 65532:65532 +ENTRYPOINT ["./climon"] diff --git a/dockerfiles/deployment-worker/Dockerfile b/dockerfiles/deployment-worker/Dockerfile index 92a26478..52d4ff44 100644 --- a/dockerfiles/deployment-worker/Dockerfile +++ b/dockerfiles/deployment-worker/Dockerfile @@ -4,10 +4,9 @@ COPY ./integrator ./ RUN rm -rf vendor RUN go mod download -RUN go build -o ./build/deployment-worker deployment-worker/main.go +RUN CGO_ENABLED=0 go build -o ./build/deployment-worker deployment-worker/main.go -FROM alpine:3.16 -RUN apk add --no-cache libc6-compat +FROM scratch COPY --from=builder ./build/deployment-worker deployment-worker USER 65532:65532 diff --git a/integrator/Makefile b/integrator/Makefile index 9dd646d8..3e7dbf6e 100644 --- a/integrator/Makefile +++ b/integrator/Makefile @@ -32,6 +32,7 @@ build: go mod download CGO_ENABLED=0 go build -o build/deployment_worker deployment-worker/main.go CGO_ENABLED=0 go build -o build/config_worker config-worker/main.go + CGO_ENABLED=0 go build -o build/climon climon/main.go CGO_ENABLED=0 go build -o build/agent agent/cmd/agent/main.go clean: diff --git a/integrator/agent/pkg/server/agent.go b/integrator/agent/pkg/server/agent.go index 56c26f8f..de7d1224 100644 --- a/integrator/agent/pkg/server/agent.go +++ b/integrator/agent/pkg/server/agent.go @@ -48,7 +48,7 @@ func (a *Agent) SubmitJob(ctx context.Context, request *agentpb.JobRequest) (*ag func (a *Agent) getWorker(operatoin string) (workers.Worker, error) { switch operatoin { case "climon": - return workers.NewClimon(a.client), nil + return workers.NewClimon(a.client, a.log), nil case "deployment": return workers.NewDeployment(a.client, a.log), nil case "config": diff --git a/integrator/agent/pkg/workers/climon.go b/integrator/agent/pkg/workers/climon.go index 89aee15b..497b6852 100644 --- a/integrator/agent/pkg/workers/climon.go +++ b/integrator/agent/pkg/workers/climon.go @@ -3,27 +3,33 @@ package workers import ( "context" "encoding/json" + "fmt" "log" + "time" + "github.com/kube-tarian/kad/integrator/agent/pkg/model" "github.com/kube-tarian/kad/integrator/agent/pkg/temporalclient" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" "go.temporal.io/sdk/client" ) -type climon struct { +type Climon struct { client *temporalclient.Client + log logging.Logger } -func NewClimon(client *temporalclient.Client) *climon { - return &climon{ +func NewClimon(client *temporalclient.Client, log logging.Logger) *Climon { + return &Climon{ client: client, + log: log, } } -func (c *climon) GetWorkflowName() string { +func (c *Climon) GetWorkflowName() string { return DeployWorkflowName } -func (c *climon) SendEvent(ctx context.Context, deployPayload json.RawMessage) (client.WorkflowRun, error) { +func (c *Climon) SendEvent(ctx context.Context, deployPayload json.RawMessage) (client.WorkflowRun, error) { options := client.StartWorkflowOptions{ ID: "helm-deploy-workflow", TaskQueue: ClimonHelmTaskQueue, @@ -48,5 +54,48 @@ func (c *climon) SendEvent(ctx context.Context, deployPayload json.RawMessage) ( } //printResults(deployInfo, we.GetID(), we.GetRunID()) + c.log.Infof("Started workflow, ID: %v, WorkflowName: %v RunID: %v", we.GetID(), DeploymentWorkerWorkflowName, we.GetRunID()) + + // Wait for 5mins till workflow finishes + // Timeout with 5mins + var result model.ResponsePayload + err = we.Get(ctx, &result) + if err != nil { + c.log.Errorf("Result for workflow ID: %v, workflowName: %v, runID: %v", we.GetID(), DeploymentWorkerWorkflowName, we.GetRunID()) + c.log.Errorf("Workflow result failed, %v", err) + return we, err + } + c.log.Infof("workflow finished success, %+v", result.ToString()) + return we, nil } + +func (d *Climon) getWorkflowStatusByLatestWorkflow(run client.WorkflowRun) error { + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-ticker.C: + err := d.getWorkflowInformation(run) + if err != nil { + d.log.Errorf("get state of workflow failed: %v, retrying .....", err) + continue + } + return nil + case <-time.After(5 * time.Minute): + d.log.Errorf("Timed out waiting for state of workflow") + return fmt.Errorf("timedout waiting for the workflow to finish") + } + } +} + +func (d *Climon) getWorkflowInformation(run client.WorkflowRun) error { + latestRun := d.client.TemporalClient.GetWorkflow(context.Background(), run.GetID(), "") + + var result model.ResponsePayload + if err := latestRun.Get(context.Background(), &result); err != nil { + d.log.Errorf("Unable to decode query result", err) + return err + } + d.log.Debugf("Result info: %+v", result) + return nil +} diff --git a/integrator/agent/pkg/workers/types.go b/integrator/agent/pkg/workers/types.go index 59c7c5d9..81dcd8f8 100644 --- a/integrator/agent/pkg/workers/types.go +++ b/integrator/agent/pkg/workers/types.go @@ -9,7 +9,7 @@ import ( const ( ClimonHelmTaskQueue = "CLIMON_HELM_TASK_QUEUE" - DeployWorkflowName = "DeployApp" + DeployWorkflowName = "Workflow" ) type Worker interface { diff --git a/integrator/climon/Dockerfile b/integrator/climon/Dockerfile new file mode 100755 index 00000000..fb719333 --- /dev/null +++ b/integrator/climon/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.19 AS builder +WORKDIR / +COPY ./ ./ + +RUN go mod download +RUN CGO_ENABLED=0 go build -o ./build/climon climon/main.go + +FROM scratch +COPY --from=builder ./build/climon climon + +USER 65532:65532 +ENTRYPOINT ["./climon"] diff --git a/integrator/climon/api/climon.gen.go b/integrator/climon/api/climon.gen.go new file mode 100644 index 00000000..e7fe5d51 --- /dev/null +++ b/integrator/climon/api/climon.gen.go @@ -0,0 +1,273 @@ +// Package api provides primitives to interact with the openapi HTTP API. +// +// Code generated by github.com/deepmap/oapi-codegen version v1.12.2 DO NOT EDIT. +package api + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "fmt" + "net/http" + "net/url" + "path" + "strings" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/go-chi/chi/v5" +) + +// ServerInterface represents all server handlers. +type ServerInterface interface { + // List of APIs provided by the service + // (GET /api-docs) + GetApiDocs(w http.ResponseWriter, r *http.Request) + // Kubernetes readiness and liveness probe endpoint + // (GET /status) + GetStatus(w http.ResponseWriter, r *http.Request) +} + +// ServerInterfaceWrapper converts contexts to parameters. +type ServerInterfaceWrapper struct { + Handler ServerInterface + HandlerMiddlewares []MiddlewareFunc + ErrorHandlerFunc func(w http.ResponseWriter, r *http.Request, err error) +} + +type MiddlewareFunc func(http.Handler) http.Handler + +// GetApiDocs operation middleware +func (siw *ServerInterfaceWrapper) GetApiDocs(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.GetApiDocs(w, r) + }) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r.WithContext(ctx)) +} + +// GetStatus operation middleware +func (siw *ServerInterfaceWrapper) GetStatus(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.GetStatus(w, r) + }) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r.WithContext(ctx)) +} + +type UnescapedCookieParamError struct { + ParamName string + Err error +} + +func (e *UnescapedCookieParamError) Error() string { + return fmt.Sprintf("error unescaping cookie parameter '%s'", e.ParamName) +} + +func (e *UnescapedCookieParamError) Unwrap() error { + return e.Err +} + +type UnmarshallingParamError struct { + ParamName string + Err error +} + +func (e *UnmarshallingParamError) Error() string { + return fmt.Sprintf("Error unmarshalling parameter %s as JSON: %s", e.ParamName, e.Err.Error()) +} + +func (e *UnmarshallingParamError) Unwrap() error { + return e.Err +} + +type RequiredParamError struct { + ParamName string +} + +func (e *RequiredParamError) Error() string { + return fmt.Sprintf("Query argument %s is required, but not found", e.ParamName) +} + +type RequiredHeaderError struct { + ParamName string + Err error +} + +func (e *RequiredHeaderError) Error() string { + return fmt.Sprintf("Header parameter %s is required, but not found", e.ParamName) +} + +func (e *RequiredHeaderError) Unwrap() error { + return e.Err +} + +type InvalidParamFormatError struct { + ParamName string + Err error +} + +func (e *InvalidParamFormatError) Error() string { + return fmt.Sprintf("Invalid format for parameter %s: %s", e.ParamName, e.Err.Error()) +} + +func (e *InvalidParamFormatError) Unwrap() error { + return e.Err +} + +type TooManyValuesForParamError struct { + ParamName string + Count int +} + +func (e *TooManyValuesForParamError) Error() string { + return fmt.Sprintf("Expected one value for %s, got %d", e.ParamName, e.Count) +} + +// Handler creates http.Handler with routing matching OpenAPI spec. +func Handler(si ServerInterface) http.Handler { + return HandlerWithOptions(si, ChiServerOptions{}) +} + +type ChiServerOptions struct { + BaseURL string + BaseRouter chi.Router + Middlewares []MiddlewareFunc + ErrorHandlerFunc func(w http.ResponseWriter, r *http.Request, err error) +} + +// HandlerFromMux creates http.Handler with routing matching OpenAPI spec based on the provided mux. +func HandlerFromMux(si ServerInterface, r chi.Router) http.Handler { + return HandlerWithOptions(si, ChiServerOptions{ + BaseRouter: r, + }) +} + +func HandlerFromMuxWithBaseURL(si ServerInterface, r chi.Router, baseURL string) http.Handler { + return HandlerWithOptions(si, ChiServerOptions{ + BaseURL: baseURL, + BaseRouter: r, + }) +} + +// HandlerWithOptions creates http.Handler with additional options +func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handler { + r := options.BaseRouter + + if r == nil { + r = chi.NewRouter() + } + if options.ErrorHandlerFunc == nil { + options.ErrorHandlerFunc = func(w http.ResponseWriter, r *http.Request, err error) { + http.Error(w, err.Error(), http.StatusBadRequest) + } + } + wrapper := ServerInterfaceWrapper{ + Handler: si, + HandlerMiddlewares: options.Middlewares, + ErrorHandlerFunc: options.ErrorHandlerFunc, + } + + r.Group(func(r chi.Router) { + r.Get(options.BaseURL+"/api-docs", wrapper.GetApiDocs) + }) + r.Group(func(r chi.Router) { + r.Get(options.BaseURL+"/status", wrapper.GetStatus) + }) + + return r +} + +// Base64 encoded, gzipped, json marshaled Swagger object +var swaggerSpec = []string{ + + "H4sIAAAAAAAC/4yRwWrDMAyGX8XonKXpdvOt0DFKBx3rYIfSg2MrrVhiC8sJlJB3H05hjA5GT0bw6/P3", + "oxFs6Dh49ElAj1MB5JsAegSHYiNxouBBwxq5DZcOfVKfIX5hVDtGr96f9x9q9bZRwmipIWvmfAGJUot3", + "7O1v9gaMcv1xWVZlBVMBgdEbJtDwVFblEgpgk87ZFhaG6cEFOw8nTPkJjHGmbRxoeMG0YlrnSAERhYMX", + "nOOPVfW35m4L01SA9F1n4gU0vJIkFZrsKopjGMihU/VFpTMqwTiQxVzXnAT0AbivW7JwzJCFJJP6f9X2", + "18Q9ZtJbiyJN36ofzI3rtq8xekwoKqJx5FFEGe9USwPOA8dQo0LvOJBPv70jDSZhFs9IjPkOoA8j9LEF", + "DQuYjtN3AAAA//+ogzv3LAIAAA==", +} + +// GetSwagger returns the content of the embedded swagger specification file +// or error if failed to decode +func decodeSpec() ([]byte, error) { + zipped, err := base64.StdEncoding.DecodeString(strings.Join(swaggerSpec, "")) + if err != nil { + return nil, fmt.Errorf("error base64 decoding spec: %s", err) + } + zr, err := gzip.NewReader(bytes.NewReader(zipped)) + if err != nil { + return nil, fmt.Errorf("error decompressing spec: %s", err) + } + var buf bytes.Buffer + _, err = buf.ReadFrom(zr) + if err != nil { + return nil, fmt.Errorf("error decompressing spec: %s", err) + } + + return buf.Bytes(), nil +} + +var rawSpec = decodeSpecCached() + +// a naive cached of a decoded swagger spec +func decodeSpecCached() func() ([]byte, error) { + data, err := decodeSpec() + return func() ([]byte, error) { + return data, err + } +} + +// Constructs a synthetic filesystem for resolving external references when loading openapi specifications. +func PathToRawSpec(pathToFile string) map[string]func() ([]byte, error) { + var res = make(map[string]func() ([]byte, error)) + if len(pathToFile) > 0 { + res[pathToFile] = rawSpec + } + + return res +} + +// GetSwagger returns the Swagger specification corresponding to the generated code +// in this file. The external references of Swagger specification are resolved. +// The logic of resolving external references is tightly connected to "import-mapping" feature. +// Externally referenced files must be embedded in the corresponding golang packages. +// Urls can be supported but this task was out of the scope. +func GetSwagger() (swagger *openapi3.T, err error) { + var resolvePath = PathToRawSpec("") + + loader := openapi3.NewLoader() + loader.IsExternalRefsAllowed = true + loader.ReadFromURIFunc = func(loader *openapi3.Loader, url *url.URL) ([]byte, error) { + var pathToFile = url.String() + pathToFile = path.Clean(pathToFile) + getSpec, ok := resolvePath[pathToFile] + if !ok { + err1 := fmt.Errorf("path not found: %s", pathToFile) + return nil, err1 + } + return getSpec() + } + var specData []byte + specData, err = rawSpec() + if err != nil { + return + } + swagger, err = loader.LoadFromData(specData) + if err != nil { + return + } + return +} diff --git a/integrator/climon/cfg.yaml b/integrator/climon/cfg.yaml new file mode 100644 index 00000000..59703c29 --- /dev/null +++ b/integrator/climon/cfg.yaml @@ -0,0 +1,6 @@ +package: api +generate: + chi-server: true + models: true + embedded-spec: true +output: climon/api/climon.gen.go diff --git a/integrator/deployment-worker/env.sh b/integrator/climon/env.sh similarity index 100% rename from integrator/deployment-worker/env.sh rename to integrator/climon/env.sh diff --git a/integrator/climon/integration_tests/main_test.go b/integrator/climon/integration_tests/main_test.go new file mode 100644 index 00000000..90eb1093 --- /dev/null +++ b/integrator/climon/integration_tests/main_test.go @@ -0,0 +1,167 @@ +package integrationtests + +import ( + "context" + "encoding/json" + "os" + "testing" + "time" + + "github.com/kube-tarian/kad/integrator/climon/pkg/application" + "github.com/kube-tarian/kad/integrator/climon/pkg/workflows" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + "github.com/kube-tarian/kad/integrator/model" + "go.temporal.io/sdk/client" +) + +var logger = logging.NewLogger() + +func TestMain(m *testing.M) { + m.Run() +} + +func TestIntegrationArgocdDeploymentEvent(t *testing.T) { + testData := setup() + + stop := startMain() + + data := &model.Request{ + RepoName: "argocd-example", + RepoURL: "https://gitlab.privatecloud.sk/vladoportos/argo-cd-example.git", + ChartName: "hello-world", + Namespace: "default", + ReleaseName: "hello-world", + Timeout: 5, + } + dataJSON, err := json.Marshal(data) + if err != nil { + t.Errorf("Data marshalling failed, %v", err) + } + + sendDeploymentEvent(t, "argocd", dataJSON, "install") + logger.Info("Sleeping now for 5 seconds") + time.Sleep(5 * time.Second) + + logger.Info("Starting teardown") + tearDown(testData) + stop <- true +} + +func TestIntegrationArgocdDeleteEvent(t *testing.T) { + testData := setup() + + stop := startMain() + + data := &model.Request{ + RepoName: "argocd-example", + RepoURL: "https://gitlab.privatecloud.sk/vladoportos/argo-cd-example.git", + ChartName: "hello-world", + Namespace: "default", + ReleaseName: "hello-world", + Timeout: 5, + } + dataJSON, err := json.Marshal(data) + if err != nil { + t.Errorf("Data marshalling failed, %v", err) + } + + sendDeploymentEvent(t, "argocd", dataJSON, "delete") + logger.Info("Sleeping now for 5 seconds") + time.Sleep(5 * time.Second) + + logger.Info("Starting teardown") + tearDown(testData) + stop <- true +} + +func TestIntegrationHelmDeploymentEvent(t *testing.T) { + testData := setup() + + stop := startMain() + + data := &model.Request{ + RepoName: "argo", + RepoURL: "https://argoproj.github.io/argo-helm", + ChartName: "argo-cd", + Namespace: "default", + ReleaseName: "argocd", + Timeout: 5, + } + dataJSON, err := json.Marshal(data) + if err != nil { + t.Errorf("Data marshalling failed, %v", err) + } + + sendDeploymentEvent(t, "helm", dataJSON, "install") + logger.Info("Sleeping now for 5 seconds") + time.Sleep(5 * time.Second) + + logger.Info("Starting teardown") + tearDown(testData) + stop <- true +} + +func TestIntegrationHelmDeleteEvent(t *testing.T) { + testData := setup() + + stop := startMain() + + data := &model.Request{ + RepoName: "argo", + RepoURL: "https://argoproj.github.io/argo-helm", + ChartName: "argo-cd", + Namespace: "default", + ReleaseName: "argocd", + Timeout: 5, + } + dataJSON, err := json.Marshal(data) + if err != nil { + t.Errorf("Data marshalling failed, %v", err) + } + + sendDeploymentEvent(t, "helm", dataJSON, "delete") + logger.Info("Sleeping now for 5 seconds") + time.Sleep(5 * time.Second) + + logger.Info("Starting teardown") + tearDown(testData) + stop <- true +} + +func sendDeploymentEvent(t *testing.T, pluginName string, dataJSON json.RawMessage, action string) { + // The client is a heavyweight object that should be created once per process. + temporalAddress := os.Getenv("TEMPORAL_SERVICE_URL") + if len(temporalAddress) == 0 { + temporalAddress = "127.0.0.1:7233" + } + c, err := client.Dial(client.Options{ + HostPort: temporalAddress, + Logger: logger, + }) + if err != nil { + t.Errorf("Unable to create client, %v", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + ID: "deployment_worker_workflow", + TaskQueue: application.WorkflowTaskQueueName, + } + + p := []model.RequestPayload{{PluginName: pluginName, Action: action, Data: dataJSON}} + payload, _ := json.Marshal(p) + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflows.Workflow, payload) + if err != nil { + t.Errorf("Unable to execute workflow, %v", err) + } + + logger.Infof("Started workflow, WorkflowID: %v RunID: %v", we.GetID(), we.GetRunID()) + + // Synchronously wait for the workflow completion. + var result model.ResponsePayload + err = we.Get(context.Background(), &result) + if err != nil { + t.Errorf("Unable get workflow result, %v", err) + } + logger.Infof("Workflow result: %+v", result.ToString()) +} diff --git a/integrator/climon/integration_tests/setup_test.go b/integrator/climon/integration_tests/setup_test.go new file mode 100644 index 00000000..3d4bce5d --- /dev/null +++ b/integrator/climon/integration_tests/setup_test.go @@ -0,0 +1,101 @@ +package integrationtests + +import ( + "bytes" + "fmt" + "net/http" + "os" + "time" + + "github.com/kelseyhightower/envconfig" + "github.com/kube-tarian/kad/integrator/climon/pkg/application" + "github.com/kube-tarian/kad/integrator/climon/pkg/db/cassandra" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" +) + +type TestContextData struct { +} + +func setupENV() { +} + +func setup() *TestContextData { + setupENV() + cfg := &application.Configuration{} + if err := envconfig.Process("", cfg); err != nil { + logger.Fatalf("Could not parse env Config: %v", err) + } + + return &TestContextData{} +} + +func tearDown(t *TestContextData) { +} + +func startMain() chan bool { + stopCh := make(chan bool) + + // Start agent and client + go startApplication(stopCh) + time.Sleep(2 * time.Second) + + // Wait till Agent and Client healthy + isApplicationHealthy := false + for { + select { + // wait till 1min, after that exit 1 + case <-time.After(1 * time.Minute): + logger.Fatalf("Deployment worker application not healthy") + case <-time.After(2 * time.Second): + // Check Agent health + isApplicationHealthy = getHealth(http.MethodGet, "http://localhost:9080", "status", "agent") + } + if isApplicationHealthy { + break + } + } + return stopCh +} + +func getHealth(method, url, path, serviceName string) bool { + resp, err := callHTTPRequest(method, url, path, nil) + if err != nil { + logger.Errorf("%v health check call failed: %v", serviceName, err) + return false + } + + return checkResponse(resp, http.StatusOK) +} + +func checkResponse(resp *http.Response, statusCode int) bool { + return resp.StatusCode == statusCode +} + +func startApplication(stop chan bool) { + os.Setenv("PORT", "9080") + log := logging.NewLogger() + db, err := cassandra.Create(logger) + if err != nil { + logger.Fatalf("failed to create db connection", err) + } + + app := application.New(log, db) + go app.Start() + + <-stop +} + +func callHTTPRequest(method, url, path string, body []byte) (*http.Response, error) { + finalURL := fmt.Sprintf("%s/%s", url, path) + var req *http.Request + if body != nil { + req, _ = http.NewRequest(method, finalURL, bytes.NewBuffer(body)) + } else { + req, _ = http.NewRequest(method, finalURL, nil) + } + client := http.Client{ + Timeout: 5 * time.Second, + } + + return client.Do(req) +} diff --git a/integrator/climon/main.go b/integrator/climon/main.go new file mode 100644 index 00000000..588b3853 --- /dev/null +++ b/integrator/climon/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "os" + "os/signal" + "syscall" + + "github.com/kube-tarian/kad/integrator/climon/pkg/application" + "github.com/kube-tarian/kad/integrator/climon/pkg/db/cassandra" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" +) + +func main() { + logger := logging.NewLogger() + logger.Infof("Started deployment worker\n") + + db, err := cassandra.Create(logger) + if err != nil { + logger.Fatalf("failed to create db connection", err) + } + + app := application.New(logger, db) + go app.Start() + + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + <-signals + + app.Close() + db.Close() + logger.Infof("Exiting deployment worker\n") +} diff --git a/integrator/climon/openapi.yaml b/integrator/climon/openapi.yaml new file mode 100644 index 00000000..9ca32e95 --- /dev/null +++ b/integrator/climon/openapi.yaml @@ -0,0 +1,28 @@ +openapi: "3.0.1" + +info: + title: Deployment Worker Open REST API Specification + description: Deployment Worker Open REST API specification + version: 1.0.0 + +servers: + - url: / + +paths: + /status: + get: + tags: + - private + summary: Kubernetes readiness and liveness probe endpoint + responses: + '200': + description: successful operation + + /api-docs: + get: + tags: + - public + summary: List of APIs provided by the service + responses: + '200': + description: OK diff --git a/integrator/climon/pkg/activities/activity.go b/integrator/climon/pkg/activities/activity.go new file mode 100644 index 00000000..fe38a9b2 --- /dev/null +++ b/integrator/climon/pkg/activities/activity.go @@ -0,0 +1,120 @@ +package activities + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/pkg/errors" + + "github.com/kube-tarian/kad/integrator/climon/pkg/db/cassandra" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + "github.com/kube-tarian/kad/integrator/common-pkg/plugins" + workerframework "github.com/kube-tarian/kad/integrator/common-pkg/worker-framework" + "github.com/kube-tarian/kad/integrator/model" +) + +type Activities struct { +} + +func (a *Activities) DeploymentActivity(ctx context.Context, req model.RequestPayload) (model.ResponsePayload, error) { + logger := logging.NewLogger() + logger.Infof("Activity, name: %+v", req.ToString()) + // e := activity.GetInfo(ctx) + // logger.Infof("activity info: %+v", e) + + plugin, err := plugins.GetPlugin(req.PluginName, logger) + if err != nil { + logger.Errorf("Get plugin failed: %v", err) + return model.ResponsePayload{ + Status: "Failed", + Message: json.RawMessage(fmt.Sprintf("{\"error\": \"%v\"}", strings.ReplaceAll(err.Error(), "\"", "\\\""))), + }, err + } + deployerPlugin, ok := plugin.(workerframework.DeploymentWorker) + if !ok { + return model.ResponsePayload{ + Status: "Failed", + Message: json.RawMessage(fmt.Sprintf("{\"error\": \"%v\"}", strings.ReplaceAll(err.Error(), "\"", "\\\""))), + }, fmt.Errorf("plugin not supports deployment activities") + } + + msg, err := deployerPlugin.DeployActivities(req) + if err != nil { + logger.Errorf("Deploy activities failed %s: %v", req.Action, err) + return model.ResponsePayload{ + Status: "Failed", + Message: json.RawMessage(fmt.Sprintf("{\"error\": \"%v\"}", strings.ReplaceAll(err.Error(), "\"", "\\\""))), + }, err + } + + if req.Action == "install" || req.Action == "update" { + if err := InsertToDb(logger, req.Data); err != nil { + logger.Errorf("insert db failed, %v", err) + return model.ResponsePayload{ + Status: "Failed", + Message: json.RawMessage(fmt.Sprintf("database update failed %v", err)), + }, err + } + } else if req.Action == "delete" { + if err := DeleteDbEntry(logger, req.Data); err != nil { + logger.Errorf("delete plugin failed, %v", err) + return model.ResponsePayload{ + Status: "Failed", + Message: json.RawMessage(fmt.Sprintf("database update failed %v", err)), + }, err + } + } + + return model.ResponsePayload{ + Status: "Success", + Message: msg, + }, nil +} + +func InsertToDb(logger logging.Logger, reqData json.RawMessage) error { + data := &model.Request{} + if err := json.Unmarshal(reqData, data); err != nil { + return errors.Wrap(err, "failed to store data in database") + } + + dbConf, err := cassandra.GetDbConfig() + if err != nil { + return errors.Wrap(err, "failed to store data in database") + } + + db, err := cassandra.NewCassandraStore(logger, dbConf.DbAddresses, dbConf.DbAdminUsername, dbConf.DbAdminPassword) + if err != nil { + return errors.Wrap(err, "failed to store data in database") + } + + if err := db.InsertToolsDb(data); err != nil { + return errors.Wrap(err, "failed to store data in database") + } + + return nil +} + +func DeleteDbEntry(logger logging.Logger, reqData json.RawMessage) error { + data := &model.Request{} + if err := json.Unmarshal(reqData, data); err != nil { + return errors.Wrap(err, "failed to delete data in database") + } + + dbConf, err := cassandra.GetDbConfig() + if err != nil { + return errors.Wrap(err, "failed to delete data in database") + } + + db, err := cassandra.NewCassandraStore(logger, dbConf.DbAddresses, dbConf.DbAdminUsername, dbConf.DbAdminPassword) + if err != nil { + return errors.Wrap(err, "failed to delete data in database") + } + + if err := db.DeleteToolsDbEntry(data); err != nil { + return errors.Wrap(err, "failed to delete data in database") + } + + return nil +} diff --git a/integrator/climon/pkg/activities/activity_test.go b/integrator/climon/pkg/activities/activity_test.go new file mode 100644 index 00000000..06dc7afe --- /dev/null +++ b/integrator/climon/pkg/activities/activity_test.go @@ -0,0 +1,23 @@ +package activities + +import ( + "testing" + + "github.com/kube-tarian/kad/integrator/model" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func Test_Activity(t *testing.T) { + a := &Activities{} + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + env.RegisterActivity(a) + + _, err := env.ExecuteActivity(a.DeploymentActivity, model.RequestPayload{Action: "World"}) + require.Error(t, err) + + // var res model.ResponsePayload + // require.NoError(t, val.Get(&res)) + // require.Equal(t, "Success", res.Status) +} diff --git a/integrator/climon/pkg/application/application.go b/integrator/climon/pkg/application/application.go new file mode 100755 index 00000000..d934c48f --- /dev/null +++ b/integrator/climon/pkg/application/application.go @@ -0,0 +1,97 @@ +package application + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/go-chi/chi/v5" + + "github.com/kelseyhightower/envconfig" + "github.com/kube-tarian/kad/integrator/climon/pkg/activities" + "github.com/kube-tarian/kad/integrator/climon/pkg/db/cassandra" + "github.com/kube-tarian/kad/integrator/climon/pkg/handler" + "github.com/kube-tarian/kad/integrator/climon/pkg/workflows" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + workerframework "github.com/kube-tarian/kad/integrator/common-pkg/worker-framework" +) + +const ( + WorkflowTaskQueueName = "CLIMON_HELM_TASK_QUEUE" + HelmPluginName = "helm" +) + +type Configuration struct { + Port int `envconfig:"PORT" default:"9080"` +} + +type Application struct { + conf *Configuration + apiServer *handler.APIHandler + httpServer *http.Server + worker *workerframework.Worker + logger logging.Logger + Db cassandra.Store +} + +func New(logger logging.Logger, db cassandra.Store) *Application { + cfg := &Configuration{} + if err := envconfig.Process("", cfg); err != nil { + logger.Fatalf("Could not parse env Config: %v\n", err) + } + + worker, err := workerframework.NewWorker(WorkflowTaskQueueName, workflows.Workflow, &activities.Activities{}, logger) + if err != nil { + logger.Fatalf("Worker initialization failed, Reason: %v\n", err) + } + + apiServer, err := handler.NewAPIHandler(worker) + if err != nil { + logger.Fatalf("API Handler initialisation failed: %v\n", err) + } + + mux := chi.NewMux() + apiServer.BindRequest(mux) + + httpServer := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", cfg.Port), + Handler: mux, + } + + return &Application{ + conf: cfg, + apiServer: apiServer, + httpServer: httpServer, + worker: worker, + logger: logger, + Db: db, + } +} + +func (app *Application) Start() { + app.logger.Infof("Starting worker\n") + go func() { + err := app.worker.Run() + if err != nil { + app.logger.Errorf("Worker stopped listening on temporal, exiting. Readon: %v\n", err) + app.Close() + } + }() + + app.logger.Infof("Starting server at %v", app.httpServer.Addr) + var err error + if err = app.httpServer.ListenAndServe(); err != nil && errors.Is(err, http.ErrServerClosed) { + app.logger.Fatalf("Unexpected server close: %v", err) + } + app.logger.Fatalf("Server closed") +} + +func (app *Application) Close() { + app.logger.Infof("Closing the service gracefully") + app.worker.Close() + + if err := app.httpServer.Shutdown(context.Background()); err != nil { + app.logger.Errorf("Could not close the service gracefully: %v", err) + } +} diff --git a/integrator/deployment-worker/pkg/db/cassandra/README.md b/integrator/climon/pkg/db/cassandra/README.md similarity index 100% rename from integrator/deployment-worker/pkg/db/cassandra/README.md rename to integrator/climon/pkg/db/cassandra/README.md diff --git a/integrator/deployment-worker/pkg/db/cassandra/cassandra.yaml b/integrator/climon/pkg/db/cassandra/cassandra.yaml similarity index 100% rename from integrator/deployment-worker/pkg/db/cassandra/cassandra.yaml rename to integrator/climon/pkg/db/cassandra/cassandra.yaml diff --git a/integrator/deployment-worker/pkg/db/cassandra/cassandra_store.go b/integrator/climon/pkg/db/cassandra/cassandra_store.go similarity index 99% rename from integrator/deployment-worker/pkg/db/cassandra/cassandra_store.go rename to integrator/climon/pkg/db/cassandra/cassandra_store.go index fab7b99f..d3e0bc99 100644 --- a/integrator/deployment-worker/pkg/db/cassandra/cassandra_store.go +++ b/integrator/climon/pkg/db/cassandra/cassandra_store.go @@ -4,11 +4,12 @@ package cassandra import ( "errors" "fmt" - "github.com/kube-tarian/kad/integrator/model" "strings" "sync" "time" + "github.com/kube-tarian/kad/integrator/model" + "github.com/gocql/gocql" "github.com/kube-tarian/kad/integrator/common-pkg/logging" ) diff --git a/integrator/deployment-worker/pkg/db/cassandra/db_config.go b/integrator/climon/pkg/db/cassandra/db_config.go similarity index 56% rename from integrator/deployment-worker/pkg/db/cassandra/db_config.go rename to integrator/climon/pkg/db/cassandra/db_config.go index 0d6d1658..802fa70d 100644 --- a/integrator/deployment-worker/pkg/db/cassandra/db_config.go +++ b/integrator/climon/pkg/db/cassandra/db_config.go @@ -4,13 +4,13 @@ package cassandra import "github.com/kube-tarian/kad/integrator/model" type DBConfig struct { - DbAddresses []string `envconfig:"DB_ADDRESSES" required:"true"` - DbAdminUsername string `envconfig:"DB_ADMIN_USERNAME" required:"true"` - DbServiceUsername string `envconfig:"DB_SERVICE_USERNAME" required:"false"` - DbName string `envconfig:"CASSANDRA_DB_NAME" required:"true"` - DbReplicationFactor string `envconfig:"DB_REPLICATION_FACTOR" required:"true"` - DbAdminPassword string `envconfig:"DB_ADMIN_PASSWD" required:"true"` - DbServicePassword string `envconfig:"DB_SERVICE_PASSWD" required:"false"` + DbAddresses []string `envconfig:"CASSANDRA_SERVICE_URL" required:"true"` + DbAdminUsername string `envconfig:"CASSANDRA_USERNAME" required:"true"` + DbServiceUsername string `envconfig:"DB_SERVICE_USERNAME" default:"user"` + DbName string `envconfig:"CASSANDRA_KEYSPACE_NAME" required:"true"` + DbReplicationFactor string `envconfig:"DB_REPLICATION_FACTOR" default:"1"` + DbAdminPassword string `envconfig:"CASSANDRA_PASSWORD" required:"true"` + DbServicePassword string `envconfig:"DB_SERVICE_PASSWD" default:"password"` } type Store interface { diff --git a/integrator/deployment-worker/pkg/db/cassandra/db_configurator.go b/integrator/climon/pkg/db/cassandra/db_configurator.go similarity index 100% rename from integrator/deployment-worker/pkg/db/cassandra/db_configurator.go rename to integrator/climon/pkg/db/cassandra/db_configurator.go diff --git a/integrator/deployment-worker/pkg/db/cassandra/db_creation.go b/integrator/climon/pkg/db/cassandra/db_creation.go similarity index 100% rename from integrator/deployment-worker/pkg/db/cassandra/db_creation.go rename to integrator/climon/pkg/db/cassandra/db_creation.go diff --git a/integrator/deployment-worker/pkg/db/db.go b/integrator/climon/pkg/db/db.go similarity index 100% rename from integrator/deployment-worker/pkg/db/db.go rename to integrator/climon/pkg/db/db.go diff --git a/integrator/climon/pkg/handler/api_handler.go b/integrator/climon/pkg/handler/api_handler.go new file mode 100755 index 00000000..8bc60d11 --- /dev/null +++ b/integrator/climon/pkg/handler/api_handler.go @@ -0,0 +1,45 @@ +package handler + +import ( + "encoding/json" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/kube-tarian/kad/integrator/climon/api" + workerframework "github.com/kube-tarian/kad/integrator/common-pkg/worker-framework" +) + +type APIHandler struct { + worker *workerframework.Worker +} + +const ( + appJSONContentType = "application/json" + contentType = "Content-Type" +) + +func NewAPIHandler(worker *workerframework.Worker) (*APIHandler, error) { + return &APIHandler{ + worker: worker, + }, nil +} + +func (ah *APIHandler) BindRequest(mux *chi.Mux) { + mux.Route("/", func(r chi.Router) { + api.HandlerFromMux(ah, r) + }) +} + +func (ah *APIHandler) GetApiDocs(w http.ResponseWriter, r *http.Request) { + swagger, err := api.GetSwagger() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + w.Header().Set(contentType, appJSONContentType) + _ = json.NewEncoder(w).Encode(swagger) +} + +func (ah *APIHandler) GetStatus(w http.ResponseWriter, r *http.Request) { + w.Header().Set(contentType, appJSONContentType) + w.WriteHeader(http.StatusOK) +} diff --git a/integrator/climon/pkg/workflows/workflow.go b/integrator/climon/pkg/workflows/workflow.go new file mode 100644 index 00000000..d175b1e3 --- /dev/null +++ b/integrator/climon/pkg/workflows/workflow.go @@ -0,0 +1,43 @@ +package workflows + +import ( + "encoding/json" + "time" + + "github.com/kube-tarian/kad/integrator/climon/pkg/activities" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + "github.com/kube-tarian/kad/integrator/model" + "go.temporal.io/sdk/workflow" +) + +// Workflow is a deployment workflow definition. +func Workflow(ctx workflow.Context, payload json.RawMessage) (model.ResponsePayload, error) { + var result model.ResponsePayload + logger := logging.NewLogger() + + logger.Infof("Deployment workflow started, req: %+v", string(payload)) + req := []model.RequestPayload{} + err := json.Unmarshal(payload, &req) + if err != nil { + logger.Errorf("Deployer worker payload unmarshall failed, Error: %v", err) + return result, err + } + + ao := workflow.ActivityOptions{ + ScheduleToCloseTimeout: 600 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + execution := workflow.GetInfo(ctx).WorkflowExecution + logger.Infof("execution: %+v\n", execution) + + var a *activities.Activities + err = workflow.ExecuteActivity(ctx, a.DeploymentActivity, req[0]).Get(ctx, &result) + if err != nil { + logger.Errorf("Activity failed, Error: %v", err) + return result, err + } + + logger.Infof("Deployment workflow completed., result: %s", (&result).ToString()) + return result, nil +} diff --git a/integrator/climon/pkg/workflows/workflow_test.go b/integrator/climon/pkg/workflows/workflow_test.go new file mode 100644 index 00000000..6ca8e5b9 --- /dev/null +++ b/integrator/climon/pkg/workflows/workflow_test.go @@ -0,0 +1,41 @@ +package workflows + +import ( + "testing" + + "github.com/kube-tarian/kad/integrator/climon/pkg/activities" + "github.com/kube-tarian/kad/integrator/model" + "go.temporal.io/sdk/worker" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} + +func (s *UnitTestSuite) Test_Workflow() { + env := s.NewTestWorkflowEnvironment() + env.SetWorkerOptions(worker.Options{ + EnableSessionWorker: true, // Important for a worker to participate in the session + }) + var a *activities.Activities + + env.OnActivity(a.DeploymentActivity, mock.Anything, model.RequestPayload{Action: "file1"}).Return(model.ResponsePayload{Status: "file2"}, nil) + + env.RegisterActivity(a) + + env.ExecuteWorkflow(Workflow, model.RequestPayload{Action: "file1"}) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + + env.AssertExpectations(s.T()) +} diff --git a/integrator/common-pkg/db-create/cassandra/cassandra,yaml b/integrator/common-pkg/db-create/cassandra/cassandra,yaml deleted file mode 100644 index 3e3148b4..00000000 --- a/integrator/common-pkg/db-create/cassandra/cassandra,yaml +++ /dev/null @@ -1,2 +0,0 @@ -# enable password authentication! -authenticator: PasswordAuthenticator \ No newline at end of file diff --git a/integrator/common-pkg/db-create/cassandra/cassandra_store.go b/integrator/common-pkg/db-create/cassandra/cassandra_store.go index f3a3c0ba..608cae61 100644 --- a/integrator/common-pkg/db-create/cassandra/cassandra_store.go +++ b/integrator/common-pkg/db-create/cassandra/cassandra_store.go @@ -4,6 +4,7 @@ package cassandra import ( "errors" "fmt" + "strings" "time" "github.com/gocql/gocql" @@ -55,6 +56,41 @@ func (c *CassandraStore) Connect(dbAddrs []string, dbAdminUsername string, dbAdm func (c *CassandraStore) Close() { c.session.Close() } +func (c *CassandraStore) CreateDbUser(serviceUsername string, servicePassword string) (err error) { + // Create database user for service usage + err = c.session.Query(fmt.Sprintf(createUser, serviceUsername, servicePassword)).Exec() + if err != nil { + if strings.Contains(err.Error(), "already exists") { + return c.updateDbUser(serviceUsername, servicePassword) + } else { + c.logg.Error("Unable to create service user", err) + return + } + } + return +} + +func (c *CassandraStore) GrantPermission(serviceUsername string, dbName string) (err error) { + err = c.session.Query(fmt.Sprintf(grantSchemaChangeLockSelectPermission, serviceUsername)).Exec() + if err != nil { + c.logg.Error("Unable to grant select permission to service user on schema_change.lock table", err) + return + } + + err = c.session.Query(fmt.Sprintf(grantSchemaChangeLockModifyPermission, serviceUsername)).Exec() + if err != nil { + c.logg.Error("Unable to grant modify permission to service user on schema_change.lock table", err) + return + } + + err = c.session.Query(fmt.Sprintf(grantPermission, dbName, serviceUsername)).Exec() + if err != nil { + c.logg.Error("Unable to grant permission to service user", err) + return + } + + return +} func (c *CassandraStore) CreateDb(dbName string, replicationFactor string) (err error) { // Create keyspace only if it does not already exist @@ -91,7 +127,7 @@ func (c *CassandraStore) CreateLockSchemaDb(replicationFactor string) (err error return } -func configureClusterConfig(addrs []string, adminUsername string, adminPassword string) (cluster *gocql.ClusterConfig, err error) { +var configureClusterConfig = func(addrs []string, adminUsername string, adminPassword string) (cluster *gocql.ClusterConfig, err error) { if len(addrs) == 0 { err = errors.New("you must specify a Cassandra address to connect to") return @@ -112,7 +148,7 @@ func configureClusterConfig(addrs []string, adminUsername string, adminPassword return } -func createDbSession(cluster *gocql.ClusterConfig) (session *gocql.Session, err error) { +var createDbSession = func(cluster *gocql.ClusterConfig) (session *gocql.Session, err error) { session, err = cluster.CreateSession() if err != nil { return nil, err diff --git a/integrator/common-pkg/plugins/fetcher/cluster_store.go b/integrator/common-pkg/plugins/fetcher/cluster_store.go new file mode 100644 index 00000000..7b096e29 --- /dev/null +++ b/integrator/common-pkg/plugins/fetcher/cluster_store.go @@ -0,0 +1,46 @@ +package fetcher + +import ( + "github.com/kelseyhightower/envconfig" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + "github.com/kube-tarian/kad/integrator/common-pkg/plugins/utils" +) + +const ( + FetchClusterQuery = `select name, kubeconfig from clusters where name = ?;` +) + +type ClusterStoreConfiguration struct { + TableName string `envconfig:"CASSANDRA_CLUSTER_TABLE_NAME" default:"clusters"` +} + +func FetchClusterDetails(log logging.Logger, clusterName string) (*ClusterDetails, error) { + cfg := &ClusterStoreConfiguration{} + err := envconfig.Process("", cfg) + if err != nil { + log.Errorf("Cassandra configuration detail missing, %v", err) + return nil, err + } + + // Fetch the plugin details from Cassandra + store, err := utils.NewStore(log) + if err != nil { + log.Errorf("Store initialization failed, %v", err) + return nil, err + } + defer store.Close() + + pd := &ClusterDetails{} + // name, kubeconfig + query := store.GetSession().Query(FetchClusterQuery, clusterName) + err = query.Scan( + &pd.Name, + &pd.Kubeconfig, + ) + + if err != nil { + log.Errorf("Fetch plugin details failed, %v", err) + return nil, err + } + return pd, nil +} diff --git a/integrator/common-pkg/plugins/fetcher/fetcher.go b/integrator/common-pkg/plugins/fetcher/fetcher.go index 937b111e..8bda02d1 100644 --- a/integrator/common-pkg/plugins/fetcher/fetcher.go +++ b/integrator/common-pkg/plugins/fetcher/fetcher.go @@ -33,14 +33,7 @@ func NewCredentialFetcher(log logging.Logger) (*CredentialFetcher, error) { func (c *CredentialFetcher) FetchPluginDetails(req *PluginRequest) (*PluginResponse, error) { // Fetch the plugin details from Cassandra - store, err := NewStore(c.log) - if err != nil { - c.log.Errorf("Store initialization failed, %v", err) - return nil, err - } - defer store.Close() - - pluginDetails, err := store.FetchPluginDetails(req.PluginName) + pluginDetails, err := FetchPluginDetails(c.log, req.PluginName) if err != nil { c.log.Errorf("Failed to fetch plugin details from store, %v", err) return nil, err diff --git a/integrator/common-pkg/plugins/fetcher/model.go b/integrator/common-pkg/plugins/fetcher/model.go index 4213c275..263433a0 100644 --- a/integrator/common-pkg/plugins/fetcher/model.go +++ b/integrator/common-pkg/plugins/fetcher/model.go @@ -20,3 +20,8 @@ type PluginDetails struct { ReleaseName string Version string } + +type ClusterDetails struct { + Name string + Kubeconfig string +} diff --git a/integrator/common-pkg/plugins/fetcher/plugin_store.go b/integrator/common-pkg/plugins/fetcher/plugin_store.go new file mode 100644 index 00000000..981c18f9 --- /dev/null +++ b/integrator/common-pkg/plugins/fetcher/plugin_store.go @@ -0,0 +1,50 @@ +package fetcher + +import ( + "github.com/kelseyhightower/envconfig" + "github.com/kube-tarian/kad/integrator/common-pkg/logging" + "github.com/kube-tarian/kad/integrator/common-pkg/plugins/utils" +) + +const ( + FetchPluginQuery = `select name, repo_name, repo_url, chart_name, namespace, release_name, version from tools where name = ?;` +) + +type PluginConfiguration struct { + TableName string `envconfig:"CASSANDRA_TABLE_NAME" default:"tools"` +} + +func FetchPluginDetails(log logging.Logger, pluginName string) (*PluginDetails, error) { + cfg := &PluginConfiguration{} + err := envconfig.Process("", cfg) + if err != nil { + log.Errorf("Cassandra configuration detail missing, %v", err) + return nil, err + } + + // Fetch the plugin details from Cassandra + store, err := utils.NewStore(log) + if err != nil { + log.Errorf("Store initialization failed, %v", err) + return nil, err + } + defer store.Close() + + pd := &PluginDetails{} + // name, repo_name, repo_url, chart_name, namespace, release_name, version + err = store.GetSession().Query(FetchPluginQuery, pluginName).Scan( + &pd.Name, + &pd.RepoName, + &pd.RepoURL, + &pd.ChartName, + &pd.Namespace, + &pd.ReleaseName, + &pd.Version, + ) + + if err != nil { + log.Errorf("Fetch plugin details failed, %v", err) + return nil, err + } + return pd, nil +} diff --git a/integrator/common-pkg/plugins/helm/create.go b/integrator/common-pkg/plugins/helm/create.go index 09f7ef0c..58d4ca22 100644 --- a/integrator/common-pkg/plugins/helm/create.go +++ b/integrator/common-pkg/plugins/helm/create.go @@ -7,6 +7,7 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/kube-tarian/kad/integrator/common-pkg/plugins/fetcher" helmclient "github.com/kube-tarian/kad/integrator/common-pkg/plugins/helm/go-helm-client" "github.com/kube-tarian/kad/integrator/model" "gopkg.in/yaml.v2" @@ -63,9 +64,6 @@ func (h *HelmCLient) Create(payload model.RequestPayload) (json.RawMessage, erro } func (h *HelmCLient) getHelmClient(req *model.Request) (helmclient.Client, error) { - // Change this to the namespace you wish the client to operate in. - // helmClient, err := helmclient.New(opt) - opt := &helmclient.Options{ Namespace: req.Namespace, RepositoryCache: "/tmp/.helmcache", @@ -75,10 +73,28 @@ func (h *HelmCLient) getHelmClient(req *model.Request) (helmclient.Client, error DebugLog: h.logger.Debugf, } + // If kubeconfig is empty (default) or inbuilt then use in-built(local) cluster + if req.ClusterName == "" || req.ClusterName == "inbuilt" { + return helmclient.New(opt) + } + + // External cluster + return h.getHelmClientForExternalCluster(req, opt) +} + +func (h *HelmCLient) getHelmClientForExternalCluster(req *model.Request, opt *helmclient.Options) (helmclient.Client, error) { + // Fetch external cluster kubeconfig from cassandra + clusterDetails, err := fetcher.FetchClusterDetails(h.logger, req.ClusterName) + if err != nil { + h.logger.Errorf("Failed to fetch the cluster details from cluster store, %v", err) + } + + // Unmarshall kubeconfig in yaml format if failed try with json format + // If not both yaml and json return error var yamlKubeConfig interface{} var jsonKubeConfig []byte - // err := yaml.Unmarshal([]byte(in_built_cluster), &yamlKubeConfig) - err := yaml.Unmarshal([]byte(req.KubeConfig), &yamlKubeConfig) + + err = yaml.Unmarshal([]byte(clusterDetails.Kubeconfig), &yamlKubeConfig) if err == nil { jsonKubeConfig, err = jsoniter.Marshal(yamlKubeConfig) if err != nil { @@ -86,18 +102,18 @@ func (h *HelmCLient) getHelmClient(req *model.Request) (helmclient.Client, error return nil, err } } else { - err1 := json.Unmarshal([]byte(req.KubeConfig), yamlKubeConfig) + err1 := json.Unmarshal([]byte(clusterDetails.Kubeconfig), yamlKubeConfig) if err1 != nil { h.logger.Errorf("kubeconfig not understanable format not in yaml or json. unmarshal failed, error: %v", err) return nil, err } - jsonKubeConfig = []byte(req.KubeConfig) + jsonKubeConfig = []byte(clusterDetails.Kubeconfig) } return helmclient.NewClientFromKubeConf( &helmclient.KubeConfClientOptions{ Options: opt, - KubeContext: "cluster-1", + KubeContext: req.ClusterName, KubeConfig: jsonKubeConfig, }, ) diff --git a/integrator/common-pkg/plugins/helm/go-helm-client/client.go b/integrator/common-pkg/plugins/helm/go-helm-client/client.go index 60015ad9..b98aaca7 100644 --- a/integrator/common-pkg/plugins/helm/go-helm-client/client.go +++ b/integrator/common-pkg/plugins/helm/go-helm-client/client.go @@ -19,7 +19,7 @@ import ( "helm.sh/helm/v3/pkg/cli" "helm.sh/helm/v3/pkg/downloader" "helm.sh/helm/v3/pkg/getter" - //"helm.sh/helm/v3/pkg/registry" + "helm.sh/helm/v3/pkg/registry" "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/repo" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -56,29 +56,13 @@ func NewClientFromKubeConf(options *KubeConfClientOptions, restClientOpts ...RES return nil, fmt.Errorf("kubeconfig missing") } - //clientGetter := NewRESTClientGetter(options.Namespace, options.KubeConfig, nil, restClientOpts...) - if os.Getenv("KUBECONFIG") != "" { - settings.KubeConfig = os.Getenv("KUBECONFIG") - } else { - settings.KubeToken = GetToken() - settings.KubeCaFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - settings.KubeAPIServer = "https://kubernetes.default.svc.cluster.local" - } - - //if options.KubeContext != "" { - // settings.KubeContext = options.KubeContext - //} - - return newClient(options.Options, settings.RESTClientGetter(), settings) -} + clientGetter := NewRESTClientGetter(options.Namespace, options.KubeConfig, nil, restClientOpts...) -func GetToken() string { - token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") - if err != nil { - fmt.Println("failed to read the tokenfile") + if options.KubeContext != "" { + settings.KubeContext = options.KubeContext } - return string(token) + return newClient(options.Options, clientGetter, settings) } // NewClientFromRestConf returns a new Helm client constructed with the provided REST config options. @@ -120,14 +104,14 @@ func newClient(options *Options, clientGetter genericclioptions.RESTClientGetter return nil, err } - //registryClient, err := registry.NewClient( - // registry.ClientOptCredentialsFile(settings.RegistryConfig), - // registry.ClientOptDebug(settings.Debug), - //) + registryClient, err := registry.NewClient( + registry.ClientOptDebug(settings.Debug), + registry.ClientOptCredentialsFile(settings.RegistryConfig), + ) if err != nil { return nil, err } - //actionConfig.RegistryClient = settings.RESTClientGetter() + actionConfig.RegistryClient = registryClient return &HelmClient{ Settings: settings, @@ -365,6 +349,7 @@ func (c *HelmClient) upgrade(ctx context.Context, spec *ChartSpec, opts *Generic client := action.NewUpgrade(c.ActionConfig) mergeUpgradeOptions(spec, client) client.Install = true + client.ChartPathOptions.InsecureSkipTLSverify = true if client.Version == "" { client.Version = ">0.0.0-0" diff --git a/integrator/common-pkg/plugins/fetcher/cassandra_store.go b/integrator/common-pkg/plugins/utils/cassandra_store.go similarity index 69% rename from integrator/common-pkg/plugins/fetcher/cassandra_store.go rename to integrator/common-pkg/plugins/utils/cassandra_store.go index 7f5cfaf6..a5182956 100644 --- a/integrator/common-pkg/plugins/fetcher/cassandra_store.go +++ b/integrator/common-pkg/plugins/utils/cassandra_store.go @@ -1,4 +1,4 @@ -package fetcher +package utils import ( "github.com/gocql/gocql" @@ -6,10 +6,6 @@ import ( "github.com/kube-tarian/kad/integrator/common-pkg/logging" ) -const ( - FetchPluginQuery = `select name, repo_name, repo_url, chart_name, namespace, release_name, version from tools where name = ?;` -) - type Configuration struct { ServiceURL []string `envconfig:"CASSANDRA_SERVICE_URL" required:"true"` Username string `envconfig:"CASSANDRA_USERNAME" required:"true"` @@ -58,27 +54,10 @@ func NewStore(log logging.Logger) (*Store, error) { }, nil } -func (s *Store) Close() { - s.session.Close() +func (s *Store) GetSession() *gocql.Session { + return s.session } -func (s *Store) FetchPluginDetails(pluginName string) (*PluginDetails, error) { - pd := &PluginDetails{} - // name, repo_name, repo_url, chart_name, namespace, release_name, version - query := s.session.Query(FetchPluginQuery, pluginName) - err := query.Scan( - &pd.Name, - &pd.RepoName, - &pd.RepoURL, - &pd.ChartName, - &pd.Namespace, - &pd.ReleaseName, - &pd.Version, - ) - - if err != nil { - s.log.Errorf("Fetch plugin details failed, %v", err) - return nil, err - } - return pd, nil +func (s *Store) Close() { + s.session.Close() } diff --git a/integrator/common-pkg/plugins/fetcher/cassandra_store_test.go b/integrator/common-pkg/plugins/utils/cassandra_store_test.go similarity index 62% rename from integrator/common-pkg/plugins/fetcher/cassandra_store_test.go rename to integrator/common-pkg/plugins/utils/cassandra_store_test.go index f3658b24..57d4b851 100644 --- a/integrator/common-pkg/plugins/fetcher/cassandra_store_test.go +++ b/integrator/common-pkg/plugins/utils/cassandra_store_test.go @@ -1,4 +1,4 @@ -package fetcher +package utils import ( "os" @@ -22,13 +22,14 @@ func TestFetchArgoCDPluginDetails(t *testing.T) { if err != nil { return } + assert.NotNilf(t, store, "store session should get initialized") - pd, err := store.FetchPluginDetails("argocd") - assert.Nilf(t, err, "argocd plugin details should be able fetch") - assert.NotNilf(t, pd, "argocd plugin details failed to fetch") - if err != nil { - return - } + // pd, err := store.FetchPluginDetails("argocd") + // assert.Nilf(t, err, "argocd plugin details should be able fetch") + // assert.NotNilf(t, pd, "argocd plugin details failed to fetch") + // if err != nil { + // return + // } - t.Logf("argocd plugin details: %+v", pd) + // t.Logf("argocd plugin details: %+v", pd) } diff --git a/integrator/deployment-worker/main.go b/integrator/deployment-worker/main.go index 99ac5cdf..68c66ac7 100644 --- a/integrator/deployment-worker/main.go +++ b/integrator/deployment-worker/main.go @@ -7,19 +7,12 @@ import ( "github.com/kube-tarian/kad/integrator/common-pkg/logging" "github.com/kube-tarian/kad/integrator/deployment-worker/pkg/application" - "github.com/kube-tarian/kad/integrator/deployment-worker/pkg/db/cassandra" ) func main() { logger := logging.NewLogger() logger.Infof("Started deployment worker\n") - - db, err := cassandra.Create(logger) - if err != nil { - logger.Fatalf("failed to create db connection", err) - } - - app := application.New(logger, db) + app := application.New(logger) go app.Start() signals := make(chan os.Signal, 1) @@ -27,6 +20,5 @@ func main() { <-signals app.Close() - db.Close() logger.Infof("Exiting deployment worker\n") } diff --git a/integrator/deployment-worker/pkg/activities/activity.go b/integrator/deployment-worker/pkg/activities/activity.go index dc3260c2..5e440805 100644 --- a/integrator/deployment-worker/pkg/activities/activity.go +++ b/integrator/deployment-worker/pkg/activities/activity.go @@ -6,12 +6,9 @@ import ( "fmt" "strings" - "github.com/pkg/errors" - "github.com/kube-tarian/kad/integrator/common-pkg/logging" "github.com/kube-tarian/kad/integrator/common-pkg/plugins" workerframework "github.com/kube-tarian/kad/integrator/common-pkg/worker-framework" - "github.com/kube-tarian/kad/integrator/deployment-worker/pkg/db/cassandra" "github.com/kube-tarian/kad/integrator/model" ) @@ -39,7 +36,6 @@ func (a *Activities) DeploymentActivity(ctx context.Context, req model.RequestPa Message: json.RawMessage(fmt.Sprintf("{\"error\": \"%v\"}", strings.ReplaceAll(err.Error(), "\"", "\\\""))), }, fmt.Errorf("plugin not supports deployment activities") } - msg, err := deployerPlugin.DeployActivities(req) if err != nil { logger.Errorf("Deploy activities failed %s: %v", req.Action, err) @@ -49,73 +45,8 @@ func (a *Activities) DeploymentActivity(ctx context.Context, req model.RequestPa }, err } - if req.Action == "install" || req.Action == "update" { - if err := InsertToDb(logger, req.Data); err != nil { - logger.Errorf("insert db failed", err) - return model.ResponsePayload{ - Status: "Failed", - Message: json.RawMessage(fmt.Sprintf("database update failed %v", err)), - }, err - } - } else if req.Action == "delete" { - if err := DeleteDbEntry(logger, req.Data); err != nil { - return model.ResponsePayload{ - Status: "Failed", - Message: json.RawMessage(fmt.Sprintf("database update failed %v", err)), - }, err - } - } - return model.ResponsePayload{ Status: "Success", Message: msg, }, nil } - -func InsertToDb(logger logging.Logger, reqData json.RawMessage) error { - var data model.Request - fmt.Println("requestData", string(reqData)) - if err := json.Unmarshal(reqData, &data); err != nil { - return errors.Wrap(err, "failed to store data in database") - } - - dbConf, err := cassandra.GetDbConfig() - if err != nil { - return errors.Wrap(err, "failed to store data in database") - } - - db, err := cassandra.NewCassandraStore(logger, dbConf.DbAddresses, dbConf.DbAdminUsername, dbConf.DbAdminPassword) - if err != nil { - return errors.Wrap(err, "failed to store data in database") - } - - if err := db.InsertToolsDb(&data); err != nil { - return errors.Wrap(err, "failed to store data in database") - } - - return nil -} - -func DeleteDbEntry(logger logging.Logger, reqData json.RawMessage) error { - var data model.Request - fmt.Println("requestData", string(reqData)) - if err := json.Unmarshal(reqData, &data); err != nil { - return errors.Wrap(err, "failed to delete data in database") - } - - dbConf, err := cassandra.GetDbConfig() - if err != nil { - return errors.Wrap(err, "failed to delete data in database") - } - - db, err := cassandra.NewCassandraStore(logger, dbConf.DbAddresses, dbConf.DbAdminUsername, dbConf.DbAdminPassword) - if err != nil { - return errors.Wrap(err, "failed to delete data in database") - } - - if err := db.DeleteToolsDbEntry(&data); err != nil { - return errors.Wrap(err, "failed to delete data in database") - } - - return nil -} diff --git a/integrator/deployment-worker/pkg/application/application.go b/integrator/deployment-worker/pkg/application/application.go index 6bb3062b..93aa0b15 100755 --- a/integrator/deployment-worker/pkg/application/application.go +++ b/integrator/deployment-worker/pkg/application/application.go @@ -4,10 +4,9 @@ import ( "context" "errors" "fmt" - "github.com/go-chi/chi/v5" - "github.com/kube-tarian/kad/integrator/deployment-worker/pkg/db/cassandra" "net/http" + "github.com/go-chi/chi/v5" "github.com/kelseyhightower/envconfig" "github.com/kube-tarian/kad/integrator/common-pkg/logging" workerframework "github.com/kube-tarian/kad/integrator/common-pkg/worker-framework" @@ -31,10 +30,9 @@ type Application struct { httpServer *http.Server worker *workerframework.Worker logger logging.Logger - Db cassandra.Store } -func New(logger logging.Logger, db cassandra.Store) *Application { +func New(logger logging.Logger) *Application { cfg := &Configuration{} if err := envconfig.Process("", cfg); err != nil { logger.Fatalf("Could not parse env Config: %v\n", err) @@ -64,7 +62,6 @@ func New(logger logging.Logger, db cassandra.Store) *Application { httpServer: httpServer, worker: worker, logger: logger, - Db: db, } } diff --git a/integrator/model/payload.go b/integrator/model/payload.go index dfb88c3e..f35d47da 100644 --- a/integrator/model/payload.go +++ b/integrator/model/payload.go @@ -34,5 +34,5 @@ type Request struct { Timeout int `json:"timeout" default:"5"` Version string `json:"version"` - KubeConfig string `json:"kube_config" required:"true"` + ClusterName string `json:"cluster_name" required:"false"` } diff --git a/integrator/tests/argocd-helm-plugin-delete.json b/integrator/tests/argocd-helm-plugin-delete.json index 38dadee7..b6ed2b10 100644 --- a/integrator/tests/argocd-helm-plugin-delete.json +++ b/integrator/tests/argocd-helm-plugin-delete.json @@ -1,5 +1,5 @@ { - "operation": "deployment", + "operation": "climon", "payload": { "plugin_name": "helm", "action": "delete", @@ -10,8 +10,7 @@ "namespace": "default", "release_name": "argocd", "timeout": 5, - "version": "", - "kube_config": "apiVersion: v1\nclusters:\n- cluster:\n certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMvakNDQWVhZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJek1ERXhNVEl6TWpFME1Wb1hEVE16TURFd09ESXpNakUwTVZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTEE1CkpJZ3ZrKytldGsxbytBeEs4bzFyeE55MkE5eGg1L1F2Z1NxTFBRcU9odEh0TTFHUEhOdG5nN0RWMHlHbTJxR1kKVFVGWEE4b25hbjlvVUs3TTcyS0ZDUDh3Q3dzRFpWeGRBZTBPQWplNkh5OGhqYS9GcHVZMFF6c1VJOXAzNzdnNQpQeHYwc0sydVNNQXZQSXNtcXg1VFVyTmdVYWVMUmlPNGlDdXBwMDkvdXp6TGZYcFM1cWtwbWpHRjEzdjlvcHZSCnVUUjVidndJSGRvTHJYbVZoOURXUlU5bXZwalM5NHhTV3V5RDRTOTVDOFRYNStSVnhHQ0ozNlRSelhtTXJ0dGsKeUEvTzNpVFIxa0hac2dtcE9VeHhLaHRjd2tTRTdaVW9Uc2xScmlYb2piMkErRzhWTEhPb2Zsb3Vxb1RZRHNHTgphMC9EYkVRblp1RVdUNEd0czRNQ0F3RUFBYU5aTUZjd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZCZmxMZ3pKMTlNSWNVcUVRWloxWjlJY2dZa09NQlVHQTFVZEVRUU8KTUF5Q0NtdDFZbVZ5Ym1WMFpYTXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUJBSTZkcWtXZ2pGcHZuNS9ieTJHeQpvRmtJQjJKR2d4V1IreG1MNmFhRHFiOTRFdGMvYnFKeXU2eVNZM01SQnV2c05BWUlGalp2M0YxT1R4ay9VaWVNCm03YVRCSEt1WUh3QlNhK25UUkdWRGlGaVVOWloxSGpUTEFleGJpRzNDUHVyWDhDRlhQTXJuSG5GdTBwYXlEOUwKUXUwbkxhRVR3MG1Ub1QreWZVYU4rL3gzNUd3L3ZvanhuT0dSQWtjb2xIK3NUcmFscnZLN1plUTJxTzhGNjBPdwpPOU1qdGRTWC91M3Z3RFRPNVMyWWxOanBJZEtCbmJyK2lzMkhtR3IxYVQxb01rbDRTeWxodDlCNUlFbC92UnlqCk9waWlaalhyV0UzWGtPVFZiV3h3Ty9BN3d3a0ZoR1prWE5kSjFGaENIWTFPRFMvTjI2dWJvOHdFM2dQay9IT1gKUWtnPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==\n server: https://192.168.73.3:8443\n name: kind-dev-cluster\ncontexts:\n- context:\n cluster: kind-dev-cluster\n user: kind-dev-cluster\n name: kind-dev-cluster\ncurrent-context: kind-dev-cluster\nkind: Config\npreferences: {}\nusers:\n- name: kind-dev-cluster\n user:\n client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lJV1Z6K2hZbG56clF3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TXpBeE1URXlNekl4TkRGYUZ3MHlOREF4TVRFeU16SXhOREphTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQTBwWUNxNjg5bmk0NEc0V1UKbUt1eVBxK3YzeHNTNjRmNFZsT3VyZzN2SFlVWlJBY003djhlV253R1ByWGZvclgrZHhSQlBaR0NzRGIrU2NKOQpTOXZ2d2l5Z3pURUtXUXZubzY1a3FWdkhNT09veGcvdURyK1QwTW5zQ1dzVitJTmNCMFVmejFMbXRHOFl1VEZ0ClRiZEM4U3gxMEtLVEY4cDdUanhNUm9zdXhaU29JS3dQd2ZZYzVvbkw3VzQzSW9UTlROVi9zd2RLWElDVkE4aVAKL1ZTdk9jTkFGamlBUnBwa0pqV0Rsd2xCK20yemhib2FCK2ZPS1B3YUZDRHBuaktGMzk3UHFtenE0dlRjdUtRKwo4TDZkM1ljV1dLNWltN2dRSUIyTkJJaVQxYk1yZUxKK0NHdkJwNDJQUWlyNlFkLzJXQ1FzNXJFYnR2b0hoNEROClIrSFhIUUlEQVFBQm8xWXdWREFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RBWURWUjBUQVFIL0JBSXdBREFmQmdOVkhTTUVHREFXZ0JRWDVTNE15ZGZUQ0hGS2hFR1dkV2ZTSElHSgpEakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBZVBUT09FSVJPLzFKRWtyYWFCR04va0RDOGZMTmkwSENlUlhSCkVLVUI0cDR6T3p3YTBLdE9waHA2TlRBbkc2akwrYUVNZ05qRFppeTdTRnZqUG9oYTNqaHBXNkh6amMzUzlEcGgKZnYwUnhQVk9CY1J1ZkRDcnJPemw3V3paVnJWcmVGekxMSnhiQ3VOOFlWeEVHOTlXSExsaVRIc2xRelpjZFMyUgpyNEZnei9DSkMzQVJKNStLV2daMzlzUzBBRi9ST3UwMXVEb1pBU0R0Tmp3TnIvMmRTV25vQXFZdGo0STRUbHhpCjVCWWkzNUlFTXA0am1haTRaNFNDc2tIOVBWeTdLMUdnU0o0eHd2VFV6RTZBQWh1VEMzL2RuUHd0TDhQQkRyT3gKUlJpWFJUbGtmbW9HTkFHejNhTlE5MUM3S1ZjWjc5RjZXYW5GQUthREVROHFETi9oSGc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==\n client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb2dJQkFBS0NBUUVBMHBZQ3E2ODluaTQ0RzRXVW1LdXlQcSt2M3hzUzY0ZjRWbE91cmczdkhZVVpSQWNNCjd2OGVXbndHUHJYZm9yWCtkeFJCUFpHQ3NEYitTY0o5Uzl2dndpeWd6VEVLV1F2bm82NWtxVnZITU9Pb3hnL3UKRHIrVDBNbnNDV3NWK0lOY0IwVWZ6MUxtdEc4WXVURnRUYmRDOFN4MTBLS1RGOHA3VGp4TVJvc3V4WlNvSUt3UAp3ZlljNW9uTDdXNDNJb1ROVE5WL3N3ZEtYSUNWQThpUC9WU3ZPY05BRmppQVJwcGtKaldEbHdsQittMnpoYm9hCkIrZk9LUHdhRkNEcG5qS0YzOTdQcW16cTR2VGN1S1ErOEw2ZDNZY1dXSzVpbTdnUUlCMk5CSWlUMWJNcmVMSisKQ0d2QnA0MlBRaXI2UWQvMldDUXM1ckVidHZvSGg0RE5SK0hYSFFJREFRQUJBb0lCQUc2Uzdod1FEQjYremg5RgphTjB4YW9xWDNaVWN0amFPVXN1aGJSdGZuYXEyZEtuUHVlN1VicSs4WjlzTnpMdTNMRUtDbEM4cjlKOXFnT05pCkNFQ0kzNy9waHhXM0ptUFRhSEg5NUVVNU44Sm9CL3JYNm53OEEvV2gwUnF3Ni94dG5Ta0VGc3ZhRCtHMlpCajUKNXhiam4zYmJqWkZiakRqMXpRRXJrREdLYTZpNmpRanlmT2ZhV3RPWFJreDVHUURHMDkrSGNtYlQyYXlOOFdSNQpvSjZnNEU5UVVQTUduR3B2M2U2YzM5SHVIMU9OelhpQUp3M0JEekxhNmhhY0pWY3g0RGRUUFAzaFNsdHltZVJLCjVKWVBub0F5UXFoQ3dCT0wyTUtZNmp1b2JGbDl1aFc3d2t6ZTBCTXJzd3YrZDViTDNpU2Z5MkI5WXcrWk9FeVQKVldFcVR0RUNnWUVBMHZhZXY0SWNBQWhkZWZnbnB3L0NwSFcybkhGYmFTRHRzaWF1b2ZGVjdYR3ZBYVV0ZzVQZwpJV3ZOdkZVQWhrMkdUdVBHdlp6aFZiRElsNVRjSWU5OXFmb0JoUFBlY3pLcnZ2NHluRkx1SEpsaldCWmJJaEZOCjZxM3VlNTQ5eXdQc3pqV0QzSnd3dFlQZjZQcm1QSzlneElyY3dTV05NR3ZvVFVlQmM3Qm1KZ2NDZ1lFQS80ckUKR09uanJMTHpRd1dkNDRjanVtVHlwNFJtVlhYMXdTR0ZJdXhhTGJXQmlpWC90cHpMWmVIdWdBN2xvK01WUlZjLwprbmp6WTZQbUM0djg4bU9FSnFzZ3gwd3dFNHowOUFSaVpLQWFyMlFYMGJPUjhwNmV5TU90RFoyOVViOUh3RXhHCkt3dGtsTXg5NXhkd0NrYk1ockh5bFdIcGVzWkM2cTVlZjU0WGNMc0NnWUJhMllnTjB2czU3R0JOQ1ZnU010QlEKd0x5dWJJYkFKRVVZeGwzSU1jVWVaeW5Gbkp1WUlWT1JNUHE5a3lHUnRNc1ZLRFJMTGNkQWZzd3pzeENGc0x3KwpPZ0x6Zlk0YnNBT1VVYVg3K2g2K3hET3JHSjJRYzBGSndqT0VtdVhqaXNJdEg1QzByYkt3U0tWaGtNTWIrUzdFCkZVVHlETGpiMUd5Szh6TkZYZjd2ZXdLQmdBZWVlWTVNbXU4eFByT0czVmhGVlRsZmZTU2xlKytjWHNGdFlHelUKSXpRdHJ6a1JQUGlTNERXZmNONzhrcmc2TXc0b05jc0dOQ3VLWFhlR3F2b0hJWStObHFLYWtPeGtUWUZoQ0JYNworQSsycWtja1ZYdW9ZdytWVmZtTDlITVZndXdtMmdpNmhEc3poYVY0TzJ6ekEzSVlxQ1R3RUdnS3RVQU9CdDlECk5XdTFBb0dBVFQvOWhpSzVnY2hodGoybzZLTTlEL3FjNEgyMDdSWE42VytzUFE1Ui9XZ2huWFA0QXVKbzZIaGgKOXp0R2tUMWVoMC9OSmphN29HWDlFRi93RFJPVHBhYWNOZlRxOVVrL0svY1VaQThQdDBBQy8wR3p0emp0YTNjOApKNVl0aHdnTHlXcFlFaVJwUTJQcUVMQzgxdm5qdnh4R2dNVlNQQnBCeTB0MDFIamVJMnM9Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==\n" + "version": "" } } } diff --git a/integrator/tests/argocd-helm-plugin.json b/integrator/tests/argocd-helm-plugin.json index 4bc2716d..0facc4e6 100644 --- a/integrator/tests/argocd-helm-plugin.json +++ b/integrator/tests/argocd-helm-plugin.json @@ -1,5 +1,5 @@ { - "operation": "deployment", + "operation": "climon", "payload": { "plugin_name": "helm", "action": "install", @@ -10,8 +10,7 @@ "namespace": "default", "release_name": "argocd", "timeout": 5, - "version": "", - "kube_config": "apiVersion: v1\nclusters:\n- cluster:\n certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMvakNDQWVhZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJek1ERXhNVEl6TWpFME1Wb1hEVE16TURFd09ESXpNakUwTVZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTEE1CkpJZ3ZrKytldGsxbytBeEs4bzFyeE55MkE5eGg1L1F2Z1NxTFBRcU9odEh0TTFHUEhOdG5nN0RWMHlHbTJxR1kKVFVGWEE4b25hbjlvVUs3TTcyS0ZDUDh3Q3dzRFpWeGRBZTBPQWplNkh5OGhqYS9GcHVZMFF6c1VJOXAzNzdnNQpQeHYwc0sydVNNQXZQSXNtcXg1VFVyTmdVYWVMUmlPNGlDdXBwMDkvdXp6TGZYcFM1cWtwbWpHRjEzdjlvcHZSCnVUUjVidndJSGRvTHJYbVZoOURXUlU5bXZwalM5NHhTV3V5RDRTOTVDOFRYNStSVnhHQ0ozNlRSelhtTXJ0dGsKeUEvTzNpVFIxa0hac2dtcE9VeHhLaHRjd2tTRTdaVW9Uc2xScmlYb2piMkErRzhWTEhPb2Zsb3Vxb1RZRHNHTgphMC9EYkVRblp1RVdUNEd0czRNQ0F3RUFBYU5aTUZjd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZCZmxMZ3pKMTlNSWNVcUVRWloxWjlJY2dZa09NQlVHQTFVZEVRUU8KTUF5Q0NtdDFZbVZ5Ym1WMFpYTXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUJBSTZkcWtXZ2pGcHZuNS9ieTJHeQpvRmtJQjJKR2d4V1IreG1MNmFhRHFiOTRFdGMvYnFKeXU2eVNZM01SQnV2c05BWUlGalp2M0YxT1R4ay9VaWVNCm03YVRCSEt1WUh3QlNhK25UUkdWRGlGaVVOWloxSGpUTEFleGJpRzNDUHVyWDhDRlhQTXJuSG5GdTBwYXlEOUwKUXUwbkxhRVR3MG1Ub1QreWZVYU4rL3gzNUd3L3ZvanhuT0dSQWtjb2xIK3NUcmFscnZLN1plUTJxTzhGNjBPdwpPOU1qdGRTWC91M3Z3RFRPNVMyWWxOanBJZEtCbmJyK2lzMkhtR3IxYVQxb01rbDRTeWxodDlCNUlFbC92UnlqCk9waWlaalhyV0UzWGtPVFZiV3h3Ty9BN3d3a0ZoR1prWE5kSjFGaENIWTFPRFMvTjI2dWJvOHdFM2dQay9IT1gKUWtnPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==\n server: https://192.168.73.3:8443\n name: kind-dev-cluster\ncontexts:\n- context:\n cluster: kind-dev-cluster\n user: kind-dev-cluster\n name: kind-dev-cluster\ncurrent-context: kind-dev-cluster\nkind: Config\npreferences: {}\nusers:\n- name: kind-dev-cluster\n user:\n client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lJV1Z6K2hZbG56clF3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TXpBeE1URXlNekl4TkRGYUZ3MHlOREF4TVRFeU16SXhOREphTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQTBwWUNxNjg5bmk0NEc0V1UKbUt1eVBxK3YzeHNTNjRmNFZsT3VyZzN2SFlVWlJBY003djhlV253R1ByWGZvclgrZHhSQlBaR0NzRGIrU2NKOQpTOXZ2d2l5Z3pURUtXUXZubzY1a3FWdkhNT09veGcvdURyK1QwTW5zQ1dzVitJTmNCMFVmejFMbXRHOFl1VEZ0ClRiZEM4U3gxMEtLVEY4cDdUanhNUm9zdXhaU29JS3dQd2ZZYzVvbkw3VzQzSW9UTlROVi9zd2RLWElDVkE4aVAKL1ZTdk9jTkFGamlBUnBwa0pqV0Rsd2xCK20yemhib2FCK2ZPS1B3YUZDRHBuaktGMzk3UHFtenE0dlRjdUtRKwo4TDZkM1ljV1dLNWltN2dRSUIyTkJJaVQxYk1yZUxKK0NHdkJwNDJQUWlyNlFkLzJXQ1FzNXJFYnR2b0hoNEROClIrSFhIUUlEQVFBQm8xWXdWREFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RBWURWUjBUQVFIL0JBSXdBREFmQmdOVkhTTUVHREFXZ0JRWDVTNE15ZGZUQ0hGS2hFR1dkV2ZTSElHSgpEakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBZVBUT09FSVJPLzFKRWtyYWFCR04va0RDOGZMTmkwSENlUlhSCkVLVUI0cDR6T3p3YTBLdE9waHA2TlRBbkc2akwrYUVNZ05qRFppeTdTRnZqUG9oYTNqaHBXNkh6amMzUzlEcGgKZnYwUnhQVk9CY1J1ZkRDcnJPemw3V3paVnJWcmVGekxMSnhiQ3VOOFlWeEVHOTlXSExsaVRIc2xRelpjZFMyUgpyNEZnei9DSkMzQVJKNStLV2daMzlzUzBBRi9ST3UwMXVEb1pBU0R0Tmp3TnIvMmRTV25vQXFZdGo0STRUbHhpCjVCWWkzNUlFTXA0am1haTRaNFNDc2tIOVBWeTdLMUdnU0o0eHd2VFV6RTZBQWh1VEMzL2RuUHd0TDhQQkRyT3gKUlJpWFJUbGtmbW9HTkFHejNhTlE5MUM3S1ZjWjc5RjZXYW5GQUthREVROHFETi9oSGc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==\n client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb2dJQkFBS0NBUUVBMHBZQ3E2ODluaTQ0RzRXVW1LdXlQcSt2M3hzUzY0ZjRWbE91cmczdkhZVVpSQWNNCjd2OGVXbndHUHJYZm9yWCtkeFJCUFpHQ3NEYitTY0o5Uzl2dndpeWd6VEVLV1F2bm82NWtxVnZITU9Pb3hnL3UKRHIrVDBNbnNDV3NWK0lOY0IwVWZ6MUxtdEc4WXVURnRUYmRDOFN4MTBLS1RGOHA3VGp4TVJvc3V4WlNvSUt3UAp3ZlljNW9uTDdXNDNJb1ROVE5WL3N3ZEtYSUNWQThpUC9WU3ZPY05BRmppQVJwcGtKaldEbHdsQittMnpoYm9hCkIrZk9LUHdhRkNEcG5qS0YzOTdQcW16cTR2VGN1S1ErOEw2ZDNZY1dXSzVpbTdnUUlCMk5CSWlUMWJNcmVMSisKQ0d2QnA0MlBRaXI2UWQvMldDUXM1ckVidHZvSGg0RE5SK0hYSFFJREFRQUJBb0lCQUc2Uzdod1FEQjYremg5RgphTjB4YW9xWDNaVWN0amFPVXN1aGJSdGZuYXEyZEtuUHVlN1VicSs4WjlzTnpMdTNMRUtDbEM4cjlKOXFnT05pCkNFQ0kzNy9waHhXM0ptUFRhSEg5NUVVNU44Sm9CL3JYNm53OEEvV2gwUnF3Ni94dG5Ta0VGc3ZhRCtHMlpCajUKNXhiam4zYmJqWkZiakRqMXpRRXJrREdLYTZpNmpRanlmT2ZhV3RPWFJreDVHUURHMDkrSGNtYlQyYXlOOFdSNQpvSjZnNEU5UVVQTUduR3B2M2U2YzM5SHVIMU9OelhpQUp3M0JEekxhNmhhY0pWY3g0RGRUUFAzaFNsdHltZVJLCjVKWVBub0F5UXFoQ3dCT0wyTUtZNmp1b2JGbDl1aFc3d2t6ZTBCTXJzd3YrZDViTDNpU2Z5MkI5WXcrWk9FeVQKVldFcVR0RUNnWUVBMHZhZXY0SWNBQWhkZWZnbnB3L0NwSFcybkhGYmFTRHRzaWF1b2ZGVjdYR3ZBYVV0ZzVQZwpJV3ZOdkZVQWhrMkdUdVBHdlp6aFZiRElsNVRjSWU5OXFmb0JoUFBlY3pLcnZ2NHluRkx1SEpsaldCWmJJaEZOCjZxM3VlNTQ5eXdQc3pqV0QzSnd3dFlQZjZQcm1QSzlneElyY3dTV05NR3ZvVFVlQmM3Qm1KZ2NDZ1lFQS80ckUKR09uanJMTHpRd1dkNDRjanVtVHlwNFJtVlhYMXdTR0ZJdXhhTGJXQmlpWC90cHpMWmVIdWdBN2xvK01WUlZjLwprbmp6WTZQbUM0djg4bU9FSnFzZ3gwd3dFNHowOUFSaVpLQWFyMlFYMGJPUjhwNmV5TU90RFoyOVViOUh3RXhHCkt3dGtsTXg5NXhkd0NrYk1ockh5bFdIcGVzWkM2cTVlZjU0WGNMc0NnWUJhMllnTjB2czU3R0JOQ1ZnU010QlEKd0x5dWJJYkFKRVVZeGwzSU1jVWVaeW5Gbkp1WUlWT1JNUHE5a3lHUnRNc1ZLRFJMTGNkQWZzd3pzeENGc0x3KwpPZ0x6Zlk0YnNBT1VVYVg3K2g2K3hET3JHSjJRYzBGSndqT0VtdVhqaXNJdEg1QzByYkt3U0tWaGtNTWIrUzdFCkZVVHlETGpiMUd5Szh6TkZYZjd2ZXdLQmdBZWVlWTVNbXU4eFByT0czVmhGVlRsZmZTU2xlKytjWHNGdFlHelUKSXpRdHJ6a1JQUGlTNERXZmNONzhrcmc2TXc0b05jc0dOQ3VLWFhlR3F2b0hJWStObHFLYWtPeGtUWUZoQ0JYNworQSsycWtja1ZYdW9ZdytWVmZtTDlITVZndXdtMmdpNmhEc3poYVY0TzJ6ekEzSVlxQ1R3RUdnS3RVQU9CdDlECk5XdTFBb0dBVFQvOWhpSzVnY2hodGoybzZLTTlEL3FjNEgyMDdSWE42VytzUFE1Ui9XZ2huWFA0QXVKbzZIaGgKOXp0R2tUMWVoMC9OSmphN29HWDlFRi93RFJPVHBhYWNOZlRxOVVrL0svY1VaQThQdDBBQy8wR3p0emp0YTNjOApKNVl0aHdnTHlXcFlFaVJwUTJQcUVMQzgxdm5qdnh4R2dNVlNQQnBCeTB0MDFIamVJMnM9Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==\n" + "version": "" } } } diff --git a/integrator/migrations/cassandra/001_add_tab.down.cql b/integrator/tests/migrations/cassandra/001_add_tab.down.cql similarity index 100% rename from integrator/migrations/cassandra/001_add_tab.down.cql rename to integrator/tests/migrations/cassandra/001_add_tab.down.cql diff --git a/integrator/migrations/cassandra/001_add_tab.up.cql b/integrator/tests/migrations/cassandra/001_add_tab.up.cql similarity index 100% rename from integrator/migrations/cassandra/001_add_tab.up.cql rename to integrator/tests/migrations/cassandra/001_add_tab.up.cql diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 9705ff74..ec9d9a22 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "github.com/kube-tarian/kad/server/pkg/db" "os" "os/signal" "syscall" @@ -12,6 +11,7 @@ import ( middleware "github.com/deepmap/oapi-codegen/pkg/gin-middleware" "github.com/kube-tarian/kad/server/api" "github.com/kube-tarian/kad/server/pkg/config" + "github.com/kube-tarian/kad/server/pkg/db" "github.com/kube-tarian/kad/server/pkg/handler" "github.com/kube-tarian/kad/server/pkg/logging" ) diff --git a/server/pkg/client/agent.go b/server/pkg/client/agent.go index 0983d715..27690f01 100644 --- a/server/pkg/client/agent.go +++ b/server/pkg/client/agent.go @@ -2,20 +2,24 @@ package client import ( "context" + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" + "github.com/kube-tarian/kad/server/pkg/db" - "github.com/sirupsen/logrus" "github.com/kelseyhightower/envconfig" "github.com/kube-tarian/kad/agent/pkg/logging" "github.com/kube-tarian/kad/server/pkg/pb/agentpb" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) type Configuration struct { - AgentAddress string `envconfig:"AGENT_ADDRESS" default:"localhost"` - AgentPort int `envconfig:"AGENT_PORT" default:"9091"` + AgentAddress string `envconfig:"AGENT_ADDRESS" default:"localhost:9091"` + IsSSLEnabled bool `envconfig:"IS_SSL_ENABLED" default:"false"` } type Agent struct { @@ -27,17 +31,30 @@ type Agent struct { // NewAgent returns agent object creates grpc connection for given address func NewAgent(log logging.Logger) (*Agent, error) { - cfg, err := fetchConfiguration() + cfg, err := fetchConfiguration(log) if err != nil { return nil, err } - conn, err := grpc.Dial(fmt.Sprintf("%s:%d", cfg.AgentAddress, cfg.AgentPort), grpc.WithTransportCredentials(insecure.NewCredentials())) + var conn *grpc.ClientConn + if cfg.IsSSLEnabled { + // TODO: loadTLSCredential to be implemented when mtls is introduced + return nil, fmt.Errorf("SSL is not supported currently to agent") + + // tlsCredentials, lErr := loadTLSCredentials() + // if lErr != nil { + // log.Errorf("cannot load TLS credentials: ", lErr) + // return nil, lErr + // } + // conn, err = grpc.Dial(cfg.AgentAddress, grpc.WithTransportCredentials(tlsCredentials)) + } else { + conn, err = grpc.Dial(cfg.AgentAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + } if err != nil { log.Errorf("failed to connect: %v", err) return nil, err } - log.Infof("gRPC connection started to %s:%d", cfg.AgentAddress, cfg.AgentPort) + log.Infof("gRPC connection started to %s", cfg.AgentAddress) agentClient := agentpb.NewAgentClient(conn) return &Agent{ @@ -60,22 +77,53 @@ func (a *Agent) Close() { a.log.Info("gRPC connection closed") } -func fetchConfiguration() (*Configuration, error) { +func fetchConfiguration(log logging.Logger) (*Configuration, error) { cfg := &Configuration{} err := envconfig.Process("", cfg) + if err != nil { + log.Errorf("env configuration fetch failed, %v", err) + return nil, err + } + session, err := db.New() if err != nil { - logrus.Error("failed to get db session", err) + log.Error("failed to get db session", err) return nil, err } //todo make customerID dynamic : Ganesh endpoint, err := session.GetEndpoint("1") if err != nil { - logrus.Error("failed to get db session", err) + log.Error("failed to get db session", err) return nil, err } cfg.AgentAddress = endpoint return cfg, err } + +func loadTLSCredentials() (credentials.TransportCredentials, error) { + // Load certificate of the CA who signed server's certificate + certificate, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key") + if err != nil { + panic("Load client certification failed: " + err.Error()) + } + + pemServerCA, err := ioutil.ReadFile("certs/dev.optimizor.app.crt") + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(pemServerCA) { + return nil, fmt.Errorf("failed to add server CA's certificate") + } + + // Create the credentials and return it + config := &tls.Config{ + Certificates: []tls.Certificate{certificate}, + RootCAs: certPool, + } + + return credentials.NewTLS(config), nil +} diff --git a/server/pkg/handler/handle_agent.go b/server/pkg/handler/handle_agent.go index 7d738f41..eb449e02 100644 --- a/server/pkg/handler/handle_agent.go +++ b/server/pkg/handler/handle_agent.go @@ -1,10 +1,10 @@ package handler import ( - "github.com/kube-tarian/kad/server/pkg/db" - "github.com/sirupsen/logrus" "net/http" + "github.com/kube-tarian/kad/server/pkg/db" + "github.com/gin-gonic/gin" "github.com/kube-tarian/kad/server/api" "github.com/kube-tarian/kad/server/pkg/model" @@ -22,19 +22,19 @@ func (s *APIHanlder) PostRegisterAgent(c *gin.Context) { //TODO Save in DB and internal cache session, err := db.New() if err != nil { - c.IndentedJSON(http.StatusInternalServerError, &model.DeployResponse{ + c.IndentedJSON(http.StatusInternalServerError, &api.Response{ Status: "FAILED", Message: "failed to get db session"}) - logrus.Error("failed to get db session", err) + s.log.Error("failed to get db session", err) return } err = session.RegisterEndpoint(req.CustomerId, req.Endpoint) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, &model.DeployResponse{ + c.IndentedJSON(http.StatusInternalServerError, &api.Response{ Status: "FAILED", Message: "failed to store data"}) - logrus.Error("failed to get db session", err) + s.log.Error("failed to get db session", err) return } diff --git a/server/pkg/handler/handle_climon.go b/server/pkg/handler/handle_climon.go index 07b5cc54..11412db9 100644 --- a/server/pkg/handler/handle_climon.go +++ b/server/pkg/handler/handle_climon.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/kube-tarian/kad/server/api" + "github.com/kube-tarian/kad/server/pkg/client" "github.com/kube-tarian/kad/server/pkg/pb/agentpb" "google.golang.org/protobuf/types/known/anypb" ) @@ -31,7 +32,15 @@ func (s *APIHanlder) PostClimon(c *gin.Context) { return } - response, err := s.client.SubmitJob( + agentClient, err := client.NewAgent(s.log) + if err != nil { + s.log.Errorf("failed to connect agent internal error", err) + s.sendResponse(c, "agent connection failed", err) + return + } + defer agentClient.Close() + + response, err := agentClient.SubmitJob( ctx, &agentpb.JobRequest{ Operation: req.Operation, diff --git a/server/pkg/handler/handle_config.go b/server/pkg/handler/handle_config.go index 81931881..d750b63b 100644 --- a/server/pkg/handler/handle_config.go +++ b/server/pkg/handler/handle_config.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/kube-tarian/kad/server/api" + "github.com/kube-tarian/kad/server/pkg/client" "github.com/kube-tarian/kad/server/pkg/pb/agentpb" "google.golang.org/protobuf/types/known/anypb" ) @@ -33,7 +34,15 @@ func (s *APIHanlder) PostConfig(c *gin.Context) { // TODO: currently climon payload is submitted to temporal via agent. // This flow has to modified as per the understanding. - response, err := s.client.SubmitJob( + agentClient, err := client.NewAgent(s.log) + if err != nil { + s.log.Errorf("failed to connect agent internal error", err) + s.sendResponse(c, "agent connection failed", err) + return + } + defer agentClient.Close() + + response, err := agentClient.SubmitJob( ctx, &agentpb.JobRequest{ Operation: req.Operation, diff --git a/server/pkg/handler/handle_deploy.go b/server/pkg/handler/handle_deploy.go index 2294e654..7fdf5ba8 100644 --- a/server/pkg/handler/handle_deploy.go +++ b/server/pkg/handler/handle_deploy.go @@ -9,6 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/kube-tarian/kad/server/api" + "github.com/kube-tarian/kad/server/pkg/client" "github.com/kube-tarian/kad/server/pkg/pb/agentpb" "google.golang.org/protobuf/types/known/anypb" ) @@ -32,12 +33,15 @@ func (s *APIHanlder) PostDeploy(c *gin.Context) { return } - if err := s.ConnectClient(); err != nil { + agentClient, err := client.NewAgent(s.log) + if err != nil { + s.log.Errorf("failed to connect agent internal error", err) s.sendResponse(c, "agent connection failed", err) return } + defer agentClient.Close() - response, err := s.client.SubmitJob( + response, err := agentClient.SubmitJob( ctx, &agentpb.JobRequest{ Operation: req.Operation, diff --git a/server/pkg/handler/handler.go b/server/pkg/handler/handler.go index cf1b72a0..6e8b4785 100644 --- a/server/pkg/handler/handler.go +++ b/server/pkg/handler/handler.go @@ -2,44 +2,23 @@ package handler import ( "net/http" - "sync" "github.com/gin-gonic/gin" "github.com/kube-tarian/kad/server/api" - "github.com/kube-tarian/kad/server/pkg/client" "github.com/kube-tarian/kad/server/pkg/logging" ) type APIHanlder struct { - log logging.Logger - client *client.Agent + log logging.Logger } -var ( - apiOnce sync.Once -) - func NewAPIHandler(log logging.Logger) (*APIHanlder, error) { return &APIHanlder{ - log: log, - client: nil, + log: log, }, nil } -func (s *APIHanlder) ConnectClient() error { - var err error - apiOnce.Do(func() { - s.client, err = client.NewAgent(s.log) - if err != nil { - s.log.Errorf("failed to connect agent internal error", err) - } - }) - - return err -} - func (s *APIHanlder) Close(c *gin.Context) { - s.client.Close() } func (ah *APIHanlder) GetApiDocs(c *gin.Context) {