Skip to content

Commit

Permalink
Merge pull request #762 from twmb/preferred_log
Browse files Browse the repository at this point in the history
kgo: add log for preferred replicas
  • Loading branch information
twmb authored Jul 29, 2024
2 parents b44e16e + cda897d commit 940ed68
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"slices"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -262,6 +264,60 @@ func (p *cursorOffsetPreferred) move() {

type cursorPreferreds []cursorOffsetPreferred

func (cs cursorPreferreds) String() string {
type pnext struct {
p int32
next int32
}
ts := make(map[string][]pnext)
for _, c := range cs {
t := c.from.topic
p := c.from.partition
ts[t] = append(ts[t], pnext{p, c.preferredReplica})
}
tsorted := make([]string, 0, len(ts))
for t, ps := range ts {
tsorted = append(tsorted, t)
slices.SortFunc(ps, func(l, r pnext) int {
if l.p < r.p {
return -1
}
if l.p > r.p {
return 1
}
if l.next < r.next {
return -1
}
if l.next > r.next {
return 1
}
return 0
})
}
slices.Sort(tsorted)

sb := new(strings.Builder)
for i, t := range tsorted {
ps := ts[t]
fmt.Fprintf(sb, "%s{", t)

for j, p := range ps {
if j < len(ps)-1 {
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
} else {
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
}
}

if i < len(tsorted)-1 {
fmt.Fprint(sb, "}, ")
} else {
fmt.Fprint(sb, "}")
}
}
return sb.String()
}

func (cs cursorPreferreds) eachPreferred(fn func(cursorOffsetPreferred)) {
for _, c := range cs {
fn(c)
Expand Down Expand Up @@ -832,6 +888,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// These two removals transition responsibility for finishing using the
// cursor from the request's used offsets to the new source or the
// reloading.
if len(preferreds) > 0 {
s.cl.cfg.logger.Log(LogLevelInfo, "fetch partitions returned preferred replicas",
"from_broker", s.nodeID,
"moves", preferreds.String(),
)
}
preferreds.eachPreferred(func(c cursorOffsetPreferred) {
c.move()
deleteReqUsedOffset(c.from.topic, c.from.partition)
Expand Down

0 comments on commit 940ed68

Please sign in to comment.