From 9f1c9524abd936351a0e8a904eee124331c7cf0c Mon Sep 17 00:00:00 2001 From: udhos Date: Tue, 14 Nov 2023 19:12:03 -0300 Subject: [PATCH] Options to customize logging. --- examples/kubegroup-example/groupcache.go | 1 + kubegroup/action_test.go | 20 +++--- kubegroup/cache.go | 78 +++++++++++++++++++----- kubegroup/kube.go | 74 +++++++++++----------- 4 files changed, 112 insertions(+), 61 deletions(-) diff --git a/examples/kubegroup-example/groupcache.go b/examples/kubegroup-example/groupcache.go index cb3b337..669ca54 100644 --- a/examples/kubegroup-example/groupcache.go +++ b/examples/kubegroup-example/groupcache.go @@ -55,6 +55,7 @@ func startGroupcache(app *application) { GroupCachePort: app.groupCachePort, //PodLabelKey: "app", // default is "app" //PodLabelValue: "my-app-name", // default is current PODs label value for label key + Debug: true, } go kubegroup.UpdatePeers(options) diff --git a/kubegroup/action_test.go b/kubegroup/action_test.go index a7d5880..6b88215 100644 --- a/kubegroup/action_test.go +++ b/kubegroup/action_test.go @@ -123,6 +123,10 @@ var testTableAction = []testCaseAction{ }, } +func testOptions() Options { + return defaultOptions(Options{Debug: true}) +} + func TestAction(t *testing.T) { for _, data := range testTableAction { @@ -144,7 +148,7 @@ func TestAction(t *testing.T) { Object: &pod, } - result, ok := action(data.table, event, data.myPodName) + result, ok := action(data.table, event, data.myPodName, testOptions()) if ok != data.expectResult { t.Errorf("%s: wrong result: expected=%t got=%t", data.name, data.expectResult, ok) } @@ -181,7 +185,7 @@ func TestActionNonDeleteWithReady(t *testing.T) { Object: &pod, } - result, ok := action(table, event, "this-pod") + result, ok := action(table, event, "this-pod", testOptions()) if !ok { t.Errorf("unexpected not ok action") } @@ -219,7 +223,7 @@ func TestActionNonDeleteWithNotReady(t *testing.T) { Object: &pod, } - result, ok := action(table, event, "this-pod") + result, ok := action(table, event, "this-pod", testOptions()) if !ok { t.Errorf("unexpected not ok action") } @@ -257,7 +261,7 @@ func TestActionDeleteWithReady(t *testing.T) { Object: &pod, } - result, ok := action(table, event, "this-pod") + result, ok := action(table, event, "this-pod", testOptions()) if !ok { t.Errorf("unexpected not ok action") } @@ -296,7 +300,7 @@ func TestActionDeleteWithNotReady(t *testing.T) { Object: &pod, } - result, ok := action(table, event, "this-pod") + result, ok := action(table, event, "this-pod", testOptions()) if !ok { t.Errorf("unexpected not ok action") } @@ -330,7 +334,7 @@ func TestActionMyPodName(t *testing.T) { Object: &pod, } - _, ok := action(table, event, "this-pod") + _, ok := action(table, event, "this-pod", testOptions()) if ok { t.Errorf("unexpected ok action") } @@ -356,7 +360,7 @@ func TestActionMissingAddr(t *testing.T) { Object: &pod, } - _, ok := action(table, event, "this-pod") + _, ok := action(table, event, "this-pod", testOptions()) if ok { t.Errorf("unexpected ok action") } @@ -384,7 +388,7 @@ func TestActionMissingAddrSolvedFromTable(t *testing.T) { Object: &pod, } - result, ok := action(table, event, "this-pod") + result, ok := action(table, event, "this-pod", testOptions()) if !ok { t.Errorf("unexpected not ok action") } diff --git a/kubegroup/cache.go b/kubegroup/cache.go index 6a25241..3d4a405 100644 --- a/kubegroup/cache.go +++ b/kubegroup/cache.go @@ -63,20 +63,71 @@ type Options struct { // to "my-app-name" or leave it empty (since by default PodLabelValue takes its value // from the PodLabelKey key). PodLabelValue string + + // Cooldown sets interval between retries. If unspecified defaults to 5 seconds. + Cooldown time.Duration + + // Debug enables non-error logging. Errors are always logged. + Debug bool + + // Debugf optionally sets custom logging stream for debug messages. + Debugf func(format string, v ...any) + + // Errorf optionally sets custom logging stream for error messages. + Errorf func(format string, v ...any) + + // Fatalf optionally sets custom logging stream for fatal messages. It must terminate/abort the program. + Fatalf func(format string, v ...any) +} + +func debugf(format string, v ...any) { + log.Printf("DEBUG: "+format, v...) +} + +func errorf(format string, v ...any) { + log.Printf("ERROR: "+format, v...) +} + +func fatalf(format string, v ...any) { + log.Fatalf("FATAL: "+format, v...) +} + +func defaultOptions(options Options) Options { + if options.Cooldown == 0 { + options.Cooldown = 5 * time.Second + } + if options.Debugf == nil { + options.Debugf = func(format string, v ...any) { + if options.Debug { + debugf(format, v...) + } + } + } + if options.Errorf == nil { + options.Errorf = errorf + } + if options.Fatalf == nil { + options.Fatalf = fatalf + } + return options } // UpdatePeers continuously updates groupcache peers. // groupcachePort example: ":5000". func UpdatePeers(options Options) { - kc, errClient := newKubeClient(options.PodLabelKey, options.PodLabelValue) + const me = "UpdatePeers" + + options = defaultOptions(options) + + kc, errClient := newKubeClient(options) if errClient != nil { - log.Fatalf("updatePeers: kube client: %v", errClient) + options.Fatalf("%s: kube client: %v", me, errClient) } addresses, errList := kc.listPodsAddresses() if errList != nil { - log.Fatalf("updatePeers: list addresses: %v", errList) + options.Fatalf("%s: list addresses: %v", me, errList) } var myAddr string @@ -85,12 +136,11 @@ func UpdatePeers(options Options) { var errAddr error myAddr, errAddr = findMyAddr() if errAddr != nil { - log.Printf("updatePeers: %v", errAddr) + options.Errorf("%s: %v", me, errAddr) } if myAddr == "" { - const cooldown = 5 * time.Second - log.Printf("updatePeers: could not find my address, sleeping %v", cooldown) - time.Sleep(cooldown) + options.Errorf("%s: could not find my address, sleeping %v", me, options.Cooldown) + time.Sleep(options.Cooldown) } } @@ -104,7 +154,7 @@ func UpdatePeers(options Options) { } keys := maps.Keys(peers) - log.Printf("updatePeers: initial peers: %v", keys) + options.Debugf("%s: initial peers: %v", me, keys) options.Pool.Set(keys...) ch := make(chan podAddress) @@ -113,8 +163,8 @@ func UpdatePeers(options Options) { for n := range ch { url := buildURL(n.address, options.GroupCachePort) - log.Printf("updatePeers: peer=%s added=%t current peers: %v", - url, n.added, maps.Keys(peers)) + options.Debugf("%s: peer=%s added=%t current peers: %v", + me, url, n.added, maps.Keys(peers)) count := len(peers) if n.added { peers[url] = true @@ -125,17 +175,17 @@ func UpdatePeers(options Options) { continue } keys := maps.Keys(peers) - log.Printf("updatePeers: updating peers: %v", keys) + options.Debugf("%s: updating peers: %v", me, keys) options.Pool.Set(keys...) } - log.Printf("updatePeers: channel has been closed, nothing to do, exiting") + options.Errorf("%s: channel has been closed, nothing to do, exiting goroutine", me) } func watchPeers(kc kubeClient, ch chan<- podAddress) { errWatch := kc.watchPodsAddresses(ch) if errWatch != nil { - log.Fatalf("watchPeers: %v", errWatch) + kc.options.Fatalf("watchPeers: %v", errWatch) } - log.Printf("watchPeers: nothing to do, exiting") + kc.options.Errorf("watchPeers: nothing to do, exiting goroutine") } diff --git a/kubegroup/kube.go b/kubegroup/kube.go index 5bf6830..f469631 100644 --- a/kubegroup/kube.go +++ b/kubegroup/kube.go @@ -3,7 +3,6 @@ package kubegroup import ( "context" "errors" - "log" "os" "time" @@ -17,11 +16,10 @@ import ( ) type kubeClient struct { - clientset *kubernetes.Clientset - inCluster bool - podCache *podInfo - labelKey string - labelValue string + clientset *kubernetes.Clientset + inCluster bool + podCache *podInfo + options Options } type podInfo struct { @@ -30,25 +28,24 @@ type podInfo struct { listOptions metav1.ListOptions } -func newKubeClient(labelKey, labelValue string) (kubeClient, error) { +func newKubeClient(options Options) (kubeClient, error) { kc := kubeClient{ - labelKey: labelKey, - labelValue: labelValue, + options: options, } config, errConfig := rest.InClusterConfig() if errConfig != nil { - log.Printf("running OUT-OF-CLUSTER: %v", errConfig) + options.Errorf("running OUT-OF-CLUSTER: %v", errConfig) return kc, nil } - log.Printf("running IN-CLUSTER") + options.Debugf("running IN-CLUSTER") kc.inCluster = true clientset, errClientset := kubernetes.NewForConfig(config) if errClientset != nil { - log.Fatalf("kube clientset error: %v", errClientset) + options.Errorf("kube clientset error: %v", errClientset) return kc, errClientset } @@ -60,7 +57,7 @@ func newKubeClient(labelKey, labelValue string) (kubeClient, error) { func (k *kubeClient) getPodName() string { host, errHost := os.Hostname() if errHost != nil { - log.Printf("getPodName: hostname: %v", errHost) + k.options.Errorf("getPodName: hostname: %v", errHost) } return host } @@ -73,13 +70,13 @@ func (k *kubeClient) getPod() (*corev1.Pod, error) { namespace, errNs := findMyNamespace() if errNs != nil { - log.Printf("getPod: could not find pod='%s' namespace: %v", podName, errNs) + k.options.Errorf("getPod: could not find pod='%s' namespace: %v", podName, errNs) return nil, errNs } pod, errPod := k.clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) if errPod != nil { - log.Printf("getPod: could not find pod name='%s': %v", podName, errPod) + k.options.Errorf("getPod: could not find pod name='%s': %v", podName, errPod) } return pod, errPod @@ -107,7 +104,7 @@ func (k *kubeClient) getPodInfo() (*podInfo, error) { // get my pod pod, errPod := k.getPod() if errPod != nil { - log.Printf("getPodInfo: could not find pod: %v", errPod) + k.options.Errorf("getPodInfo: could not find pod: %v", errPod) return nil, errPod } @@ -117,17 +114,17 @@ func (k *kubeClient) getPodInfo() (*podInfo, error) { // get label to match peer PODs var labelKey string - if k.labelKey == "" { + if k.options.PodLabelKey == "" { labelKey = "app" // default label key } else { - labelKey = k.labelKey + labelKey = k.options.PodLabelKey } var labelValue string - if k.labelValue == "" { + if k.options.PodLabelValue == "" { labelValue = pod.ObjectMeta.Labels[labelKey] // default label value } else { - labelValue = k.labelValue + labelValue = k.options.PodLabelValue } // search other pods using label from my pod @@ -146,7 +143,7 @@ func (k *kubeClient) listPodsAddresses() ([]string, error) { table, errTable := k.getPodTable() if errTable != nil { - log.Printf("listPodsAddresses: pod table: %v", errTable) + k.options.Errorf("listPodsAddresses: pod table: %v", errTable) return nil, errTable } @@ -164,7 +161,7 @@ func (k *kubeClient) getPodTable() (map[string]string, error) { } addr, errAddr := findMyAddr() if errAddr != nil { - log.Printf("getPodTable: %v", errAddr) + k.options.Errorf("getPodTable: %v", errAddr) } if addr == "" { return nil, errors.New("getPodTable: out-of-cluster: missing pod address") @@ -175,13 +172,13 @@ func (k *kubeClient) getPodTable() (map[string]string, error) { podInfo, errInfo := k.getPodInfo() if errInfo != nil { - log.Printf("getPodTable: pod info: %v", errInfo) + k.options.Errorf("getPodTable: pod info: %v", errInfo) return nil, errInfo } pods, errList := k.clientset.CoreV1().Pods(podInfo.namespace).List(context.TODO(), podInfo.listOptions) if errList != nil { - log.Printf("getPodTable: list pods: %v", errList) + k.options.Errorf("getPodTable: list pods: %v", errList) return nil, errList } @@ -208,27 +205,26 @@ func (k *kubeClient) watchPodsAddresses(out chan<- podAddress) error { // some going down events don't report pod address, so we retrieve addr from a local table table, errTable := k.getPodTable() if errTable != nil { - log.Printf("watchPodsAddresses: table: %v", errTable) + k.options.Errorf("watchPodsAddresses: table: %v", errTable) return errTable } - log.Printf("watchPodsAddresses: initial table: %v", table) + k.options.Debugf("watchPodsAddresses: initial table: %v", table) podInfo, errInfo := k.getPodInfo() if errInfo != nil { - log.Printf("watchPodsAddresses: pod info: %v", errInfo) + k.options.Errorf("watchPodsAddresses: pod info: %v", errInfo) return errInfo } - const cooldown = 5 * time.Second for { errWatch := k.watchOnce(out, podInfo, table) - log.Printf("watchPodsAddresses: %v", errWatch) + k.options.Errorf("watchPodsAddresses: %v", errWatch) if errWatch != errWatchInputChannelClose { return errWatch } - log.Printf("watchPodsAddresses: retrying in %v", cooldown) - time.Sleep(cooldown) + k.options.Errorf("watchPodsAddresses: retrying in %v", k.options.Cooldown) + time.Sleep(k.options.Cooldown) } } @@ -239,13 +235,13 @@ func (k *kubeClient) watchOnce(out chan<- podAddress, info *podInfo, table map[s watcher, errWatch := k.clientset.CoreV1().Pods(info.namespace).Watch(context.TODO(), info.listOptions) if errWatch != nil { - log.Printf("watchOnce: watch: %v", errWatch) + k.options.Errorf("watchOnce: watch: %v", errWatch) return errWatch } in := watcher.ResultChan() for event := range in { - if result, ok := action(table, event, myPodName); ok { + if result, ok := action(table, event, myPodName, k.options); ok { out <- result } } @@ -253,19 +249,19 @@ func (k *kubeClient) watchOnce(out chan<- podAddress, info *podInfo, table map[s return errWatchInputChannelClose } -func action(table map[string]string, event watch.Event, myPodName string) (podAddress, bool) { +func action(table map[string]string, event watch.Event, myPodName string, options Options) (podAddress, bool) { const me = "action" var result podAddress pod, ok := event.Object.(*corev1.Pod) if !ok { - log.Printf("%s: unexpected event object: %v", me, event.Object) + options.Errorf("%s: unexpected event object: %v", me, event.Object) return result, false } if pod == nil { - log.Printf("%s: unexpected nil pod from event object: %v", me, event.Object) + options.Errorf("%s: unexpected nil pod from event object: %v", me, event.Object) return result, false } @@ -280,7 +276,7 @@ func action(table map[string]string, event watch.Event, myPodName string) (podAd ready := isPodReady(pod) if name == myPodName { - log.Printf("%s: event=%s pod=%s addr=%s ready=%t: ignoring my own pod", + options.Debugf("%s: event=%s pod=%s addr=%s ready=%t: ignoring my own pod", me, event.Type, name, addr, ready) return result, false // ignore my own pod } @@ -291,12 +287,12 @@ func action(table map[string]string, event watch.Event, myPodName string) (podAd } if addr == "" { - log.Printf("%s: event=%s pod=%s addr=%s ready=%t: ignoring, cannot add/remove unknown address", + options.Debugf("%s: event=%s pod=%s addr=%s ready=%t: ignoring, cannot add/remove unknown address", me, event.Type, name, addr, ready) return result, false // ignore empty address } - log.Printf("%s: event=%s pod=%s addr=%s ready=%t: success: sending update", + options.Debugf("%s: event=%s pod=%s addr=%s ready=%t: success: sending update", me, event.Type, name, addr, ready) if event.Type != watch.Deleted {