diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 603b0864..29157ebb 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -5,7 +5,9 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "slices" "sort" + "strings" "sync" "time" @@ -262,6 +264,60 @@ func (p *cursorOffsetPreferred) move() { type cursorPreferreds []cursorOffsetPreferred +func (cs cursorPreferreds) String() string { + type pnext struct { + p int32 + next int32 + } + ts := make(map[string][]pnext) + for _, c := range cs { + t := c.from.topic + p := c.from.partition + ts[t] = append(ts[t], pnext{p, c.preferredReplica}) + } + tsorted := make([]string, 0, len(ts)) + for t, ps := range ts { + tsorted = append(tsorted, t) + slices.SortFunc(ps, func(l, r pnext) int { + if l.p < r.p { + return -1 + } + if l.p > r.p { + return 1 + } + if l.next < r.next { + return -1 + } + if l.next > r.next { + return 1 + } + return 0 + }) + } + slices.Sort(tsorted) + + sb := new(strings.Builder) + for i, t := range tsorted { + ps := ts[t] + fmt.Fprintf(sb, "%s{", t) + + for j, p := range ps { + if j < len(ps)-1 { + fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next) + } else { + fmt.Fprintf(sb, "%d=>%d", p.p, p.next) + } + } + + if i < len(tsorted)-1 { + fmt.Fprint(sb, "}, ") + } else { + fmt.Fprint(sb, "}") + } + } + return sb.String() +} + func (cs cursorPreferreds) eachPreferred(fn func(cursorOffsetPreferred)) { for _, c := range cs { fn(c) @@ -832,6 +888,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // These two removals transition responsibility for finishing using the // cursor from the request's used offsets to the new source or the // reloading. + if len(preferreds) > 0 { + s.cl.cfg.logger.Log(LogLevelInfo, "fetch partitions returned preferred replicas", + "from_broker", s.nodeID, + "moves", preferreds.String(), + ) + } preferreds.eachPreferred(func(c cursorOffsetPreferred) { c.move() deleteReqUsedOffset(c.from.topic, c.from.partition)