Skip to content

Commit

Permalink
fix(watcher): Don't reimplement Kueue informer factory
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrianoKF committed Jul 17, 2024
1 parent 76c5f58 commit b7822d7
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 19 deletions.
5 changes: 4 additions & 1 deletion watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
kueueev "sigs.k8s.io/kueue/client-go/informers/externalversions"
kueueutil "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/cmd/util"
)

Expand Down Expand Up @@ -155,7 +156,9 @@ func handleUpdate(obj, newObj interface{}) {
func watchWorkloads(namespace string) {
log.Info("Watching Kueue Workloads in namespace ", namespace)

workloadInformer := util.NewKueueWorkloadInformer(kueueClient, 10*time.Minute)
factory := kueueev.NewSharedInformerFactoryWithOptions(kueueClient, 10*time.Minute, kueueev.WithNamespace(namespace))

workloadInformer := factory.Kueue().V1beta1().Workloads().Informer()
workloadInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: handleUpdate,
Expand Down
18 changes: 0 additions & 18 deletions watcher/pkg/util/kueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
kueueversioned "sigs.k8s.io/kueue/client-go/clientset/versioned"
)

func NewKueueWorkloadInformer(client kueueversioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.KueueV1beta1().Workloads("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.KueueV1beta1().Workloads("").Watch(context.TODO(), options)
},
},
&kueue.Workload{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
}

// Find a Kueue Workload by its UID, optionally filtered by namespace
func WorkloadByUid(kueueClient kueueversioned.Interface, uid types.UID, namespace string) (*kueue.Workload, error) {
list, err := kueueClient.KueueV1beta1().Workloads(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down

0 comments on commit b7822d7

Please sign in to comment.