Skip to content

Commit

Permalink
Options to customize logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
udhos committed Nov 14, 2023
1 parent 7233297 commit 9f1c952
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 61 deletions.
1 change: 1 addition & 0 deletions examples/kubegroup-example/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func startGroupcache(app *application) {
GroupCachePort: app.groupCachePort,
//PodLabelKey: "app", // default is "app"
//PodLabelValue: "my-app-name", // default is current PODs label value for label key
Debug: true,
}

go kubegroup.UpdatePeers(options)
Expand Down
20 changes: 12 additions & 8 deletions kubegroup/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ var testTableAction = []testCaseAction{
},
}

func testOptions() Options {
return defaultOptions(Options{Debug: true})
}

func TestAction(t *testing.T) {

for _, data := range testTableAction {
Expand All @@ -144,7 +148,7 @@ func TestAction(t *testing.T) {
Object: &pod,
}

result, ok := action(data.table, event, data.myPodName)
result, ok := action(data.table, event, data.myPodName, testOptions())
if ok != data.expectResult {
t.Errorf("%s: wrong result: expected=%t got=%t", data.name, data.expectResult, ok)
}
Expand Down Expand Up @@ -181,7 +185,7 @@ func TestActionNonDeleteWithReady(t *testing.T) {
Object: &pod,
}

result, ok := action(table, event, "this-pod")
result, ok := action(table, event, "this-pod", testOptions())
if !ok {
t.Errorf("unexpected not ok action")
}
Expand Down Expand Up @@ -219,7 +223,7 @@ func TestActionNonDeleteWithNotReady(t *testing.T) {
Object: &pod,
}

result, ok := action(table, event, "this-pod")
result, ok := action(table, event, "this-pod", testOptions())
if !ok {
t.Errorf("unexpected not ok action")
}
Expand Down Expand Up @@ -257,7 +261,7 @@ func TestActionDeleteWithReady(t *testing.T) {
Object: &pod,
}

result, ok := action(table, event, "this-pod")
result, ok := action(table, event, "this-pod", testOptions())
if !ok {
t.Errorf("unexpected not ok action")
}
Expand Down Expand Up @@ -296,7 +300,7 @@ func TestActionDeleteWithNotReady(t *testing.T) {
Object: &pod,
}

result, ok := action(table, event, "this-pod")
result, ok := action(table, event, "this-pod", testOptions())
if !ok {
t.Errorf("unexpected not ok action")
}
Expand Down Expand Up @@ -330,7 +334,7 @@ func TestActionMyPodName(t *testing.T) {
Object: &pod,
}

_, ok := action(table, event, "this-pod")
_, ok := action(table, event, "this-pod", testOptions())
if ok {
t.Errorf("unexpected ok action")
}
Expand All @@ -356,7 +360,7 @@ func TestActionMissingAddr(t *testing.T) {
Object: &pod,
}

_, ok := action(table, event, "this-pod")
_, ok := action(table, event, "this-pod", testOptions())
if ok {
t.Errorf("unexpected ok action")
}
Expand Down Expand Up @@ -384,7 +388,7 @@ func TestActionMissingAddrSolvedFromTable(t *testing.T) {
Object: &pod,
}

result, ok := action(table, event, "this-pod")
result, ok := action(table, event, "this-pod", testOptions())
if !ok {
t.Errorf("unexpected not ok action")
}
Expand Down
78 changes: 64 additions & 14 deletions kubegroup/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,71 @@ type Options struct {
// to "my-app-name" or leave it empty (since by default PodLabelValue takes its value
// from the PodLabelKey key).
PodLabelValue string

// Cooldown sets interval between retries. If unspecified defaults to 5 seconds.
Cooldown time.Duration

// Debug enables non-error logging. Errors are always logged.
Debug bool

// Debugf optionally sets custom logging stream for debug messages.
Debugf func(format string, v ...any)

// Errorf optionally sets custom logging stream for error messages.
Errorf func(format string, v ...any)

// Fatalf optionally sets custom logging stream for fatal messages. It must terminate/abort the program.
Fatalf func(format string, v ...any)
}

func debugf(format string, v ...any) {
log.Printf("DEBUG: "+format, v...)
}

func errorf(format string, v ...any) {
log.Printf("ERROR: "+format, v...)
}

func fatalf(format string, v ...any) {
log.Fatalf("FATAL: "+format, v...)
}

func defaultOptions(options Options) Options {
if options.Cooldown == 0 {
options.Cooldown = 5 * time.Second
}
if options.Debugf == nil {
options.Debugf = func(format string, v ...any) {
if options.Debug {
debugf(format, v...)
}
}
}
if options.Errorf == nil {
options.Errorf = errorf
}
if options.Fatalf == nil {
options.Fatalf = fatalf
}
return options
}

// UpdatePeers continuously updates groupcache peers.
// groupcachePort example: ":5000".
func UpdatePeers(options Options) {

kc, errClient := newKubeClient(options.PodLabelKey, options.PodLabelValue)
const me = "UpdatePeers"

options = defaultOptions(options)

kc, errClient := newKubeClient(options)
if errClient != nil {
log.Fatalf("updatePeers: kube client: %v", errClient)
options.Fatalf("%s: kube client: %v", me, errClient)
}

addresses, errList := kc.listPodsAddresses()
if errList != nil {
log.Fatalf("updatePeers: list addresses: %v", errList)
options.Fatalf("%s: list addresses: %v", me, errList)
}

var myAddr string
Expand All @@ -85,12 +136,11 @@ func UpdatePeers(options Options) {
var errAddr error
myAddr, errAddr = findMyAddr()
if errAddr != nil {
log.Printf("updatePeers: %v", errAddr)
options.Errorf("%s: %v", me, errAddr)
}
if myAddr == "" {
const cooldown = 5 * time.Second
log.Printf("updatePeers: could not find my address, sleeping %v", cooldown)
time.Sleep(cooldown)
options.Errorf("%s: could not find my address, sleeping %v", me, options.Cooldown)
time.Sleep(options.Cooldown)
}
}

Expand All @@ -104,7 +154,7 @@ func UpdatePeers(options Options) {
}

keys := maps.Keys(peers)
log.Printf("updatePeers: initial peers: %v", keys)
options.Debugf("%s: initial peers: %v", me, keys)
options.Pool.Set(keys...)

ch := make(chan podAddress)
Expand All @@ -113,8 +163,8 @@ func UpdatePeers(options Options) {

for n := range ch {
url := buildURL(n.address, options.GroupCachePort)
log.Printf("updatePeers: peer=%s added=%t current peers: %v",
url, n.added, maps.Keys(peers))
options.Debugf("%s: peer=%s added=%t current peers: %v",
me, url, n.added, maps.Keys(peers))
count := len(peers)
if n.added {
peers[url] = true
Expand All @@ -125,17 +175,17 @@ func UpdatePeers(options Options) {
continue
}
keys := maps.Keys(peers)
log.Printf("updatePeers: updating peers: %v", keys)
options.Debugf("%s: updating peers: %v", me, keys)
options.Pool.Set(keys...)
}

log.Printf("updatePeers: channel has been closed, nothing to do, exiting")
options.Errorf("%s: channel has been closed, nothing to do, exiting goroutine", me)
}

func watchPeers(kc kubeClient, ch chan<- podAddress) {
errWatch := kc.watchPodsAddresses(ch)
if errWatch != nil {
log.Fatalf("watchPeers: %v", errWatch)
kc.options.Fatalf("watchPeers: %v", errWatch)
}
log.Printf("watchPeers: nothing to do, exiting")
kc.options.Errorf("watchPeers: nothing to do, exiting goroutine")
}
Loading

0 comments on commit 9f1c952

Please sign in to comment.