Skip to content

Commit

Permalink
sync resources in the kubescape namespace (#71)
Browse files Browse the repository at this point in the history
* sync resources in the kubescape namespace

Signed-off-by: Amir Malka <amirm@armosec.io>

Signed-off-by: Amir Malka <amirm@armosec.io>
  • Loading branch information
amirmalka authored Mar 7, 2024
1 parent eafc6ce commit 06ded54
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 45 deletions.
4 changes: 2 additions & 2 deletions adapters/incluster/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (a *Adapter) GetClientByKind(kind domain.Kind) adapters.Client {
if !ok {
logger.L().Error("client not found", helpers.String("kind", kind.String()))
// if client is not found, create an empty one to discard the messages from the server in callbacks if the kind is not in the list
client = NewClient(&NoOpDynamicClient{}, a.cfg.Account, a.cfg.ClusterName, config.Resource{
client = NewClient(&NoOpDynamicClient{}, a.cfg, config.Resource{
Group: kind.Group,
Version: kind.Version,
Resource: kind.Resource,
Expand Down Expand Up @@ -110,7 +110,7 @@ func (a *Adapter) Callbacks(_ context.Context) (domain.Callbacks, error) {

func (a *Adapter) Start(ctx context.Context) error {
for _, r := range a.cfg.Resources {
client := NewClient(a.k8sclient, a.cfg.Account, a.cfg.ClusterName, r)
client := NewClient(a.k8sclient, a.cfg, r)
client.RegisterCallbacks(ctx, a.callbacks)
a.clients[r.String()] = client

Expand Down
52 changes: 36 additions & 16 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/domain"
Expand All @@ -30,7 +31,10 @@ import (
"k8s.io/client-go/dynamic"
)

const envMultiplier = "EVENT_MULTIPLIER"
const (
envMultiplier = "EVENT_MULTIPLIER"
kubescapeCustomResourceGroup = "spdx.softwarecomposition.kubescape.io"
)

var fieldsToRemove = map[string][][]string{
"default": {},
Expand All @@ -50,6 +54,7 @@ type Client struct {
client dynamic.Interface
account string
cluster string
operatorNamespace string // the namespace where the kubescape operator is running
kind *domain.Kind
multiplier int
callbacks domain.Callbacks
Expand All @@ -61,14 +66,15 @@ type Client struct {

var errWatchClosed = errors.New("watch channel closed")

func NewClient(client dynamic.Interface, account, cluster string, r config.Resource) *Client {
func NewClient(client dynamic.Interface, cfg config.InCluster, r config.Resource) *Client {
res := schema.GroupVersionResource{Group: r.Group, Version: r.Version, Resource: r.Resource}
// get event multiplier from env, defaults to 0
multiplier, _ := strconv.Atoi(os.Getenv(envMultiplier))
return &Client{
account: account,
client: client,
cluster: cluster,
account: cfg.Account,
client: client,
operatorNamespace: cfg.Namespace,
cluster: cfg.ClusterName,
kind: &domain.Kind{
Group: res.Group,
Version: res.Version,
Expand All @@ -93,7 +99,7 @@ func (c *Client) Start(ctx context.Context) error {
// for our storage, we need to list all resources and get them one by one
// as list returns objects with empty spec
// and watch does not return existing objects
if c.res.Group == "spdx.softwarecomposition.kubescape.io" {
if c.res.Group == kubescapeCustomResourceGroup {
if err := backoff.RetryNotify(func() error {
var err error
watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx)
Expand All @@ -118,7 +124,7 @@ func (c *Client) Start(ctx context.Context) error {
}

// skip non-standalone resources
if hasParent(d) {
if c.isFiltered(d) {
continue
}
id := domain.KindName{
Expand Down Expand Up @@ -226,18 +232,18 @@ func multiplyEvent(event watch.Event, queue *utils.CooldownQueue, multiplier int
// change the workload-name label too - if applicable
labels := newEvent.Object.(*unstructured.Unstructured).GetLabels()
if labels != nil {
if workloadName, ok := labels["kubescape.io/workload-name"]; ok {
labels["kubescape.io/workload-name"] = fmt.Sprintf("%s-%d", workloadName, i)
if workloadName, ok := labels[helpersv1.NameMetadataKey]; ok {
labels[helpersv1.NameMetadataKey] = fmt.Sprintf("%s-%d", workloadName, i)
newEvent.Object.(*unstructured.Unstructured).SetLabels(labels)
}
}

annotations := newEvent.Object.(*unstructured.Unstructured).GetAnnotations()
if annotations != nil {
if wlid, ok := annotations["kubescape.io/wlid"]; ok {
if wlid, ok := annotations[helpersv1.WlidMetadataKey]; ok {

// workloadinterface.
annotations["kubescape.io/wlid"] = fmt.Sprintf("%s-%d", wlid, i)
annotations[helpersv1.WlidMetadataKey] = fmt.Sprintf("%s-%d", wlid, i)
newEvent.Object.(*unstructured.Unstructured).SetAnnotations(annotations)
}
}
Expand All @@ -247,6 +253,20 @@ func multiplyEvent(event watch.Event, queue *utils.CooldownQueue, multiplier int
}
}

// isFiltered returns true if workload should be filtered out.
// filters out workloads that have a parent, unless they are in the kubescape-operator namespace
func (c *Client) isFiltered(workload *unstructured.Unstructured) bool {
if workload == nil {
return false
}
// workload is not filtered if it is in the kubescape-operator namespace
if c.operatorNamespace != "" && workload.GetNamespace() == c.operatorNamespace {
return false
}
// for all other workloads, we filter out those that have a parent
return hasParent(workload)
}

// hasParent returns true if workload has a parent
// based on https://github.com/kubescape/k8s-interface/blob/2855cc94bd7666b227ad9e5db5ca25cb895e6cee/k8sinterface/k8sdynamic.go#L219
func hasParent(workload *unstructured.Unstructured) bool {
Expand Down Expand Up @@ -492,8 +512,8 @@ func (c *Client) multiplyVerifyObject(ctx context.Context, id domain.KindName, o
// change the workload-name label too - if applicable
labels := obj.GetLabels()
if labels != nil {
if workloadName, ok := labels["kubescape.io/workload-name"]; ok {
labels["kubescape.io/workload-name"] = fmt.Sprintf("%s-%d", workloadName, i)
if workloadName, ok := labels[helpersv1.NameMetadataKey]; ok {
labels[helpersv1.NameMetadataKey] = fmt.Sprintf("%s-%d", workloadName, i)
obj.SetLabels(labels)
}
}
Expand Down Expand Up @@ -523,7 +543,7 @@ func (c *Client) filterAndMarshal(d *unstructured.Unstructured) ([]byte, error)
}

func (c *Client) getObjectFromUnstructured(d *unstructured.Unstructured) ([]byte, error) {
if c.res.Group == "spdx.softwarecomposition.kubescape.io" {
if c.res.Group == kubescapeCustomResourceGroup {
obj, err := c.getResource(d.GetNamespace(), d.GetName())
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
Expand Down Expand Up @@ -620,8 +640,8 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
for _, k := range clientItemsSet.Difference(serverItemsSet).ToSlice() {
item := clientItems[k]

if hasParent(&item) {
logger.L().Debug("reconciliation: resource missing in server has parent, skipping",
if c.isFiltered(&item) {
logger.L().Debug("reconciliation: resource missing in server should be filtered, skipping",
helpers.String("resource", c.kind.String()),
helpers.String("name", item.GetName()),
helpers.String("namespace", item.GetNamespace()))
Expand Down
5 changes: 4 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ func main() {
if clusterConfig, err := config.LoadClusterConfig(); err != nil {
logger.L().Warning("failed to load cluster config", helpers.Error(err))
} else {
logger.L().Debug("cluster config loaded", helpers.String("clusterName", clusterConfig.ClusterName))
logger.L().Debug("cluster config loaded",
helpers.String("clusterName", clusterConfig.ClusterName),
helpers.String("namespace", clusterConfig.Namespace))
cfg.InCluster.ClusterName = clusterConfig.ClusterName
cfg.InCluster.Namespace = clusterConfig.Namespace
}

// load credentials (access key & account)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Backend struct {

type InCluster struct {
ServerUrl string `mapstructure:"serverUrl"`
Namespace string `mapstructure:"namespace"`
ClusterName string `mapstructure:"clusterName"`
Account string `mapstructure:"account"`
AccessKey string `mapstructure:"accessKey"`
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestLoadConfig(t *testing.T) {
Account: "11111111-2222-3333-4444-11111111",
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
Resources: []Resource{
{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
{Group: "", Version: "v1", Resource: "nodes", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
Expand Down
6 changes: 6 additions & 0 deletions configuration/client/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
"account": "11111111-2222-3333-4444-11111111",
"accessKey": "xxxxxxxx-1111-1111-1111-xxxxxxxx",
"resources": [
{
"group": "",
"version": "v1",
"resource": "pods",
"strategy": "patch"
},
{
"group": "",
"version": "v1",
Expand Down
23 changes: 16 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ toolchain go1.21.5
require (
github.com/SergJa/jsonhash v0.0.0-20210531165746-fc45f346aa74
github.com/apache/pulsar-client-go v0.12.0
github.com/armosec/armoapi-go v0.0.292
github.com/armosec/utils-k8s-go v0.0.24
github.com/armosec/armoapi-go v0.0.329
github.com/armosec/utils-k8s-go v0.0.26
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/deckarep/golang-set/v2 v2.6.0
Expand All @@ -19,6 +19,7 @@ require (
github.com/kinbiko/jsonassert v1.1.1
github.com/kubescape/backend v0.0.14
github.com/kubescape/go-logger v0.0.22
github.com/kubescape/k8s-interface v0.0.161
github.com/kubescape/messaging v0.0.22
github.com/panjf2000/ants/v2 v2.8.2
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
Expand Down Expand Up @@ -48,23 +49,26 @@ require (
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armosec/gojay v1.2.17 // indirect
github.com/armosec/utils-go v0.0.56 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.8.0 // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.7.11 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
github.com/docker/docker v25.0.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down Expand Up @@ -101,6 +105,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -110,11 +115,11 @@ require (
github.com/olvrng/ujson v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/pquerna/cachecontrol v0.2.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
Expand All @@ -137,6 +142,7 @@ require (
github.com/uptrace/opentelemetry-go-extra/otelzap v0.2.3 // indirect
github.com/uptrace/uptrace-go v1.21.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
Expand All @@ -150,10 +156,11 @@ require (
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.15.0 // indirect
Expand All @@ -165,10 +172,12 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e // indirect
sigs.k8s.io/controller-runtime v0.15.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
Loading

0 comments on commit 06ded54

Please sign in to comment.