Skip to content

Commit

Permalink
chore: golang, k8s and controller-runtime upgrades (#3107)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Apr 20, 2024
1 parent 82302f3 commit 8ece8fd
Show file tree
Hide file tree
Showing 44 changed files with 11,962 additions and 16,342 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Golang
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
- name: Restore go build cache
uses: actions/cache@v4
with:
Expand All @@ -34,9 +34,9 @@ jobs:
- name: Install protoc
run: |
set -eux -o pipefail
PROTOC_VERSION=3.11.1
PROTOC_VERSION=3.19.4
PROTOC_ZIP=protoc-$PROTOC_VERSION-linux-x86_64.zip
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.11.1/$PROTOC_ZIP
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/$PROTOC_ZIP
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
sudo chmod +x /usr/local/bin/protoc
Expand Down Expand Up @@ -68,9 +68,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Golang
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
- name: Restore go build cache
uses: actions/cache@v4
with:
Expand All @@ -95,9 +95,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Golang
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
- name: Restore go build cache
uses: actions/cache@v4
with:
Expand Down Expand Up @@ -128,9 +128,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Golang
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
- name: Restore go build cache
uses: actions/cache@v4
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/gh-pages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5.1.0
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Setup Golang
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: '1.20'
go-version: '1.21'
- name: build
run: |
pip install mkdocs==1.3.0 mkdocs_material==8.2.9
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ jobs:
uses: actions/checkout@v4

- name: Setup Go
uses: actions/setup-go@v4.1.0
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"

- name: Build binaries
run: |
Expand Down Expand Up @@ -93,9 +93,9 @@ jobs:
else
echo "VERSION=${GITHUB_REF##*/}" >> $GITHUB_ENV
fi
- uses: actions/setup-go@v4.1.0
- uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
- uses: actions/checkout@v4
- run: go install sigs.k8s.io/bom/cmd/bom@v0.2.0
- run: go install github.com/spdx/spdx-sbom-generator/cmd/generator@v0.0.13
Expand Down
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ run:
linters:
enable:
- deadcode
- depguard
# - depguard
- dogsled
- goconst
- gocritic
Expand All @@ -23,7 +23,7 @@ linters:
- nakedret
- rowserrcheck
- staticcheck
- structcheck
# - structcheck
- typecheck
- unconvert
- unused
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ start: image
kubectl -n argo-events wait --for=condition=Ready --timeout 60s pod --all

$(GOPATH)/bin/golangci-lint:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.52.1
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.54.1

.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
go mod tidy
golangci-lint run --fix --verbose --concurrency 4 --timeout 10m
golangci-lint run --fix --verbose --concurrency 4 --timeout 5m --enable goimports

# release - targets only available on release branch
ifneq ($(findstring release,$(GIT_BRANCH)),)
Expand Down
13,061 changes: 5,432 additions & 7,629 deletions api/jsonschema/schema.json

Large diffs are not rendered by default.

12,247 changes: 5,025 additions & 7,222 deletions api/openapi-spec/swagger.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions common/string_keyed_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (sm *StringKeyedMap[T]) Store(key string, item T) {
func (sm *StringKeyedMap[T]) Load(key string) (T, bool) {
sm.lock.RLock()
defer sm.lock.RUnlock()
ok, item := sm.items[key]
return ok, item
item, ok := sm.items[key]
return item, ok
}

func (sm *StringKeyedMap[T]) Delete(key string) {
Expand Down
2 changes: 1 addition & 1 deletion common/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package common

import (
"log/slog"
"os"

"github.com/spf13/viper"
"golang.org/x/exp/slog"
)

func ViperWithLogging() *viper.Viper {
Expand Down
82 changes: 51 additions & 31 deletions controllers/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand All @@ -25,6 +26,7 @@ import (
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

const (
Expand Down Expand Up @@ -57,11 +59,17 @@ func Start(eventsOpts ArgoEventsControllerOpts) {
logger.Fatalf("required environment variable '%s' not defined", imageEnvVar)
}
opts := ctrl.Options{
MetricsBindAddress: fmt.Sprintf(":%d", eventsOpts.MetricsPort),
Metrics: metricsserver.Options{
BindAddress: fmt.Sprintf(":%d", eventsOpts.MetricsPort),
},
HealthProbeBindAddress: fmt.Sprintf(":%d", eventsOpts.HealthPort),
}
if eventsOpts.Namespaced {
opts.Namespace = eventsOpts.ManagedNamespace
opts.Cache = cache.Options{
DefaultNamespaces: map[string]cache.Config{
eventsOpts.ManagedNamespace: {},
},
}
}
if eventsOpts.LeaderElection {
opts.LeaderElection = true
Expand All @@ -70,115 +78,127 @@ func Start(eventsOpts ArgoEventsControllerOpts) {
restConfig := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(restConfig, opts)
if err != nil {
logger.Fatalw("unable to get a controller-runtime manager", zap.Error(err))
logger.Fatalw("Unable to get a controller-runtime manager", zap.Error(err))
}
kubeClient := kubernetes.NewForConfigOrDie(restConfig)

// Readyness probe
if err := mgr.AddReadyzCheck("readiness", healthz.Ping); err != nil {
logger.Fatalw("unable add a readiness check", zap.Error(err))
logger.Fatalw("Unable add a readiness check", zap.Error(err))
}

// Liveness probe
if err := mgr.AddHealthzCheck("liveness", healthz.Ping); err != nil {
logger.Fatalw("unable add a health check", zap.Error(err))
logger.Fatalw("Unable add a health check", zap.Error(err))
}

if err := eventbusv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add scheme", zap.Error(err))
logger.Fatalw("Unable to add scheme", zap.Error(err))
}

if err := eventsourcev1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add EventSource scheme", zap.Error(err))
logger.Fatalw("Unable to add EventSource scheme", zap.Error(err))
}

if err := sensorv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add Sensor scheme", zap.Error(err))
logger.Fatalw("Unable to add Sensor scheme", zap.Error(err))
}

// EventBus controller
eventBusController, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), kubeClient, mgr.GetScheme(), config, logger),
})
if err != nil {
logger.Fatalw("unable to set up EventBus controller", zap.Error(err))
logger.Fatalw("Unable to set up EventBus controller", zap.Error(err))
}

// Watch EventBus and enqueue EventBus object key
if err := eventBusController.Watch(&source.Kind{Type: &eventbusv1alpha1.EventBus{}}, &handler.EnqueueRequestForObject{},
if err := eventBusController.Watch(source.Kind(mgr.GetCache(), &eventbusv1alpha1.EventBus{}), &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch EventBus", zap.Error(err))
logger.Fatalw("Unable to watch EventBus", zap.Error(err))
}

// Watch ConfigMaps and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch ConfigMaps", zap.Error(err))
if err := eventBusController.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &eventbusv1alpha1.EventBus{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch ConfigMaps", zap.Error(err))
}

// Watch StatefulSets and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &appv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch StatefulSets", zap.Error(err))
if err := eventBusController.Watch(source.Kind(mgr.GetCache(), &appv1.StatefulSet{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &eventbusv1alpha1.EventBus{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch StatefulSets", zap.Error(err))
}

// Watch Services and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
if err := eventBusController.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &eventbusv1alpha1.EventBus{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch Services", zap.Error(err))
}

// EventSource controller
eventSourceController, err := controller.New(eventsource.ControllerName, mgr, controller.Options{
Reconciler: eventsource.NewReconciler(mgr.GetClient(), mgr.GetScheme(), imageName, logger),
})
if err != nil {
logger.Fatalw("unable to set up EventSource controller", zap.Error(err))
logger.Fatalw("Unable to set up EventSource controller", zap.Error(err))
}

// Watch EventSource and enqueue EventSource object key
if err := eventSourceController.Watch(&source.Kind{Type: &eventsourcev1alpha1.EventSource{}}, &handler.EnqueueRequestForObject{},
if err := eventSourceController.Watch(source.Kind(mgr.GetCache(), &eventsourcev1alpha1.EventSource{}), &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch EventSources", zap.Error(err))
logger.Fatalw("Unable to watch EventSources", zap.Error(err))
}

// Watch Deployments and enqueue owning EventSource key
if err := eventSourceController.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventsourcev1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Deployments", zap.Error(err))
if err := eventSourceController.Watch(source.Kind(mgr.GetCache(), &appv1.Deployment{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &eventsourcev1alpha1.EventSource{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch Deployments", zap.Error(err))
}

// Watch Services and enqueue owning EventSource key
if err := eventSourceController.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventsourcev1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
if err := eventSourceController.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &eventsourcev1alpha1.EventSource{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch Services", zap.Error(err))
}

// Sensor controller
sensorController, err := controller.New(sensor.ControllerName, mgr, controller.Options{
Reconciler: sensor.NewReconciler(mgr.GetClient(), mgr.GetScheme(), imageName, logger),
})
if err != nil {
logger.Fatalw("unable to set up Sensor controller", zap.Error(err))
logger.Fatalw("Unable to set up Sensor controller", zap.Error(err))
}

// Watch Sensor and enqueue Sensor object key
if err := sensorController.Watch(&source.Kind{Type: &sensorv1alpha1.Sensor{}}, &handler.EnqueueRequestForObject{},
if err := sensorController.Watch(source.Kind(mgr.GetCache(), &sensorv1alpha1.Sensor{}), &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch Sensors", zap.Error(err))
logger.Fatalw("Unable to watch Sensors", zap.Error(err))
}

// Watch Deployments and enqueue owning Sensor key
if err := sensorController.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &sensorv1alpha1.Sensor{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Deployments", zap.Error(err))
if err := sensorController.Watch(source.Kind(mgr.GetCache(), &appv1.Deployment{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &sensorv1alpha1.Sensor{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("Unable to watch Deployments", zap.Error(err))
}

logger.Infow("starting controller manager", "version", argoevents.GetVersion())
logger.Infow("Starting controller manager", "version", argoevents.GetVersion())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
logger.Fatalw("unable to run eventbus controller", zap.Error(err))
logger.Fatalw("Unable to start controller manager", zap.Error(err))
}
}
2 changes: 1 addition & 1 deletion controllers/eventbus/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
},
VolumeMode: &volMode,
StorageClassName: js.Persistence.StorageClassName,
Resources: corev1.ResourceRequirements{
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: volSize,
},
Expand Down
2 changes: 1 addition & 1 deletion controllers/eventbus/installer/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (i *natsInstaller) buildStatefulSetSpec(serviceName, configmapName, authSec
},
VolumeMode: &volMode,
StorageClassName: i.eventBus.Spec.NATS.Native.Persistence.StorageClassName,
Resources: corev1.ResourceRequirements{
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: volSize,
},
Expand Down
4 changes: 2 additions & 2 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestValidate(t *testing.T) {

t.Run("test js eventbus replica", func(t *testing.T) {
eb := testJetStreamEventBus.DeepCopy()
eb.Spec.JetStream.Replicas = pointer.Int32(3)
eb.Spec.JetStream.Replicas = ptr.To[int32](3)
err := ValidateEventBus(eb)
assert.NoError(t, err)
eb.Spec.JetStream.Replicas = nil
Expand Down
4 changes: 3 additions & 1 deletion eventsources/sources/resource/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

sharedInformer := informer.Informer()
sharedInformer.AddEventHandler(handlerFuncs)
if _, err := sharedInformer.AddEventHandler(handlerFuncs); err != nil {
return fmt.Errorf("failed to add event handler, %w", err)
}

doneCh := make(chan struct{})

Expand Down
Loading

0 comments on commit 8ece8fd

Please sign in to comment.