Skip to content

Commit

Permalink
Merge pull request #367 from kube-tarian/sync_tekton
Browse files Browse the repository at this point in the history
Sync Tekton resources
  • Loading branch information
vramk23 authored Jan 13, 2024
2 parents 51cfe1b + 08cd427 commit 29ae1ac
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 4 deletions.
13 changes: 11 additions & 2 deletions capten/agent/internal/api/plugin_tekton_pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,23 @@ func (a *Agent) CreateTektonPipelines(ctx context.Context, request *captenplugin
}, nil
}

tektonAvailable, err := a.as.GetTektonProjectForID(request.GitOrgId)
if err != nil {
a.log.Infof("faile to get git project %s, %v", request.GitOrgId, err)
return &captenpluginspb.CreateTektonPipelinesResponse{
Status: captenpluginspb.StatusCode_INVALID_ARGUMENT,
StatusMessage: "request validation failed",
}, nil
}

a.log.Infof("Add Create Tekton Pipeline registry %s request received", request.PipelineName)

id := uuid.New()

TektonPipeline := model.TektonPipeline{
Id: id.String(),
PipelineName: request.PipelineName,
GitProjectId: request.GitOrgId,
GitProjectId: tektonAvailable.GitProjectId,
ContainerRegId: request.ContainerRegistryIds,
}
if err := a.as.UpsertTektonPipelines(&TektonPipeline); err != nil {
Expand All @@ -47,7 +56,7 @@ func (a *Agent) CreateTektonPipelines(ctx context.Context, request *captenplugin
}, nil
}

err := a.configureTektonPipelinesGitRepo(&TektonPipeline, model.TektonPipelineCreate)
err = a.configureTektonPipelinesGitRepo(&TektonPipeline, model.TektonPipelineCreate)
if err != nil {
TektonPipeline.Status = string(model.TektonPipelineConfigurationFailed)
TektonPipeline.WorkflowId = "NA"
Expand Down
17 changes: 17 additions & 0 deletions capten/agent/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/kube-tarian/kad/capten/agent/internal/pb/agentpb"
"github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb"
"github.com/kube-tarian/kad/capten/agent/internal/pb/captensdkpb"
"github.com/kube-tarian/kad/capten/agent/internal/tekton"
"github.com/kube-tarian/kad/capten/agent/internal/util"
dbinit "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-init"
dbmigrate "github.com/kube-tarian/kad/capten/common-pkg/cassandra/db-migrate"
Expand Down Expand Up @@ -129,6 +130,17 @@ func initializeJobScheduler(cfg *config.SericeConfig, as *captenstore.Store) (*j
}
}

if cfg.TektonSyncJobEnabled {
cs, err := job.NewTektonResourcesSync(log, cfg.TektonSyncJobInterval, as)
if err != nil {
log.Fatal("failed to init tekton resources sync job", err)
}
err = s.AddJob("tekton-resources-synch", cs)
if err != nil {
log.Fatal("failed to add tekton resources sync job", err)
}
}

log.Info("successfully initialized job scheduler")
return s, nil
}
Expand All @@ -137,5 +149,10 @@ func registerK8SWatcher(dbStore *captenstore.Store) error {
if err := crossplane.RegisterK8SWatcher(log, dbStore); err != nil {
return err
}

if err := tekton.RegisterK8SWatcher(log, dbStore); err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions capten/agent/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type SericeConfig struct {
AuthEnabled bool `envconfig:"AUTH_ENABLED" default:"false"`
CrossplaneSyncJobEnabled bool `envconfig:"CROSSPLANE_SYNC_JOB_ENABLED" default:"true"`
CrossplaneSyncJobInterval string `envconfig:"CROSSPLANE_SYNC_JOB_INTERVAL" default:"@every 1h"`
TektonSyncJobEnabled bool `envconfig:"TEKTON_SYNC_JOB_ENABLED" default:"true"`
TektonSyncJobInterval string `envconfig:"TEKTON_SYNC_JOB_INTERVAL" default:"@every 1h"`
DomainName string `envconfig:"DOMAIN_NAME" default:"example.com"`
}

Expand Down
36 changes: 36 additions & 0 deletions capten/agent/internal/job/tekton_resources_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package job

import (
"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
"github.com/kube-tarian/kad/capten/agent/internal/tekton"
)

type TektonResourcesSync struct {
dbStore *captenstore.Store
eventlistener *tekton.TektonPipelineSyncHandler
log logging.Logger
frequency string
}

func NewTektonResourcesSync(log logging.Logger, frequency string, dbStore *captenstore.Store) (*TektonResourcesSync, error) {
ccObj := tekton.NewTektonPipelineSyncHandler(log, dbStore)
return &TektonResourcesSync{
log: log,
frequency: frequency,
dbStore: dbStore,
eventlistener: ccObj,
}, nil
}

func (s *TektonResourcesSync) CronSpec() string {
return s.frequency
}

func (s *TektonResourcesSync) Run() {
s.log.Debug("started Tekton resource sync job")
if err := s.eventlistener.Sync(); err != nil {
s.log.Errorf("failed to synch eventlisteneres, %v", err)
}
s.log.Debug("Tekton resource sync job completed")
}
161 changes: 161 additions & 0 deletions capten/agent/internal/tekton/tekton_pipelines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package tekton

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"

"github.com/kube-tarian/kad/capten/common-pkg/k8s"
"github.com/kube-tarian/kad/capten/model"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

var (
pgvk = schema.GroupVersionResource{Group: "triggers.tekton.dev", Version: "v1beta1", Resource: "eventlisteners"}
)

type TektonPipelineSyncHandler struct {
log logging.Logger
dbStore *captenstore.Store
}

func NewTektonPipelineSyncHandler(log logging.Logger, dbStore *captenstore.Store) *TektonPipelineSyncHandler {
return &TektonPipelineSyncHandler{log: log, dbStore: dbStore}
}

func registerK8STektonPipelineSync(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewTektonPipelineSyncHandler(log, dbStore), dynamicClient, pgvk)
}

func getEventListenerObj(obj any) (*model.EventListener, error) {
clusterClaimByte, err := json.Marshal(obj)
if err != nil {
return nil, err
}

var clObj model.EventListener
err = json.Unmarshal(clusterClaimByte, &clObj)
if err != nil {
return nil, err
}

return &clObj, nil
}

func (h *TektonPipelineSyncHandler) OnAdd(obj interface{}) {
h.log.Info("TektonPipeline Add Callback")
newCcObj, err := getEventListenerObj(obj)
if newCcObj == nil {
h.log.Errorf("failed to read TektonPipeline object, %v", err)
return
}

if err := h.updateTektonPipelines([]model.EventListener{*newCcObj}); err != nil {
h.log.Errorf("failed to update TektonPipeline object, %v", err)
return
}
}

func (h *TektonPipelineSyncHandler) OnUpdate(oldObj, newObj interface{}) {
h.log.Info("TektonPipeline Update Callback")
prevObj, err := getEventListenerObj(oldObj)
if prevObj == nil {
h.log.Errorf("failed to read TektonPipeline old object %v", err)
return
}

newCcObj, err := getEventListenerObj(oldObj)
if newCcObj == nil {
h.log.Errorf("failed to read TektonPipeline new object %v", err)
return
}

if err := h.updateTektonPipelines([]model.EventListener{*newCcObj}); err != nil {
h.log.Errorf("failed to update TektonPipeline object, %v", err)
return
}
}

func (h *TektonPipelineSyncHandler) OnDelete(obj interface{}) {
h.log.Info("TektonPipeline Delete Callback")
}

func (h *TektonPipelineSyncHandler) Sync() error {
h.log.Debug("started to sync TektonPipeline resources")

k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
return fmt.Errorf("failed to initalize k8s client: %v", err)
}

objList, err := k8sclient.DynamicClient.ListAllNamespaceResource(context.TODO(), pgvk)
if err != nil {
return fmt.Errorf("failed to fetch pipelines resources, %v", err)
}

pipelines, err := json.Marshal(objList)
if err != nil {
return fmt.Errorf("failed to marshall the data, %v", err)
}

var pipelineObj model.EventListeners
err = json.Unmarshal(pipelines, &pipelineObj)
if err != nil {
return fmt.Errorf("failed to un-marshall the data, %s", err)
}

if err = h.updateTektonPipelines(pipelineObj.Items); err != nil {
return fmt.Errorf("failed to update TektonPipeline in DB, %v", err)
}
h.log.Debug("TektonPipeline resources synched")
return nil
}

func (h *TektonPipelineSyncHandler) updateTektonPipelines(k8spipelines []model.EventListener) error {
dbpipelines, err := h.dbStore.GetTektonPipeliness()
if err != nil {
return fmt.Errorf("failed to get TektonPipeline pipelines, %v", err)
}

dbpipelineMap := make(map[string]*model.TektonPipeline)
for _, dbpipeline := range dbpipelines {
dbpipelineMap[dbpipeline.PipelineName] = dbpipeline
}

for _, k8spipeline := range k8spipelines {
h.log.Infof("processing TektonPipeline %s", k8spipeline.Name)
for _, pipelineStatus := range k8spipeline.Status.Conditions {
if pipelineStatus.Type != "Ready" {
continue
}

dbpipeline, ok := dbpipelineMap[k8spipeline.Name]
if !ok {
h.log.Infof("TektonPipeline name %s is not found in the db, skipping the update", k8spipeline.Name)
continue
}

status := model.TektonPipelineNotReady
if strings.EqualFold(string(pipelineStatus.Status), "true") {
status = model.TektonPipelineReady
}

dbpipeline.Status = string(status)

v, _ := json.Marshal(dbpipeline)
fmt.Println("TektonPipeline ===>" + string(v))

if err := h.dbStore.UpsertTektonPipelines(dbpipeline); err != nil {
h.log.Errorf("failed to update TektonPipeline %s details in db, %v", k8spipeline.Name, err)
continue
}
h.log.Infof("updated the TektonPipeline eventlistener %s", k8spipeline.Name)
}
}
return nil
}
22 changes: 22 additions & 0 deletions capten/agent/internal/tekton/watchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tekton

import (
"fmt"

"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
"github.com/kube-tarian/kad/capten/common-pkg/k8s"
)

func RegisterK8SWatcher(log logging.Logger, dbStore *captenstore.Store) error {
k8sclient, err := k8s.NewK8SClient(log)
if err != nil {
return fmt.Errorf("failed to initalize k8s client: %v", err)
}

err = registerK8STektonPipelineSync(log, dbStore, k8sclient.DynamicClientInterface)
if err != nil {
return fmt.Errorf("failed to RegisterK8STektonPipelineSync: %v", err)
}
return nil
}
4 changes: 2 additions & 2 deletions capten/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gocql/gocql v1.3.1
github.com/gogo/status v1.1.1
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
github.com/hashicorp/go-multierror v1.1.1
github.com/intelops/go-common v1.0.15
Expand All @@ -20,7 +21,6 @@ require (
go.temporal.io/sdk v1.19.0
go.uber.org/atomic v1.9.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v2 v2.4.0
helm.sh/helm/v3 v3.10.3
k8s.io/api v0.25.2
Expand All @@ -33,8 +33,8 @@ require (

require (
github.com/cloudflare/circl v1.3.3 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

require (
Expand Down
21 changes: 21 additions & 0 deletions capten/model/tekton_types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package model

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type TektonProjectStatus string

const (
Expand Down Expand Up @@ -55,3 +59,20 @@ type TektonPipeline struct {
WorkflowId string `json:"workflow_id,omitempty"`
WorkflowStatus string `json:"workflow_status,omitempty"`
}

type EventListenerStatus struct {
ConditionedStatus `json:",inline"`
}

type EventListener struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Status EventListenerStatus `json:"status,omitempty"`
}

type EventListeners struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []EventListener `json:"items"`
}

0 comments on commit 29ae1ac

Please sign in to comment.