use zap in the fetch package (#6109)
## Motivation

Part of the effort to switch from the internal `log` package to `zap`.
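For readers following the migration, below is a minimal, hypothetical sketch of the call-site pattern this diff applies, assuming only the public `go.uber.org/zap` API: the chained `logger.With().Debug(..., log.Err(err))` style becomes a single `logger.Debug(..., zap.Error(err))` call with typed field constructors, `Warning` becomes `Warn`, and request context is carried as a field (`log.ZContext(ctx)` in this repository) instead of `logger.WithContext(ctx)`. The `hashStub` type and `zap.NewExample()` logger are stand-ins for illustration only.

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

// hashStub is a hypothetical fmt.Stringer standing in for types.Hash32.
type hashStub struct{}

func (hashStub) String() string { return "0x1234abcd" }

func main() {
	// Stand-in for the logger injected via WithLogger(*zap.Logger).
	logger := zap.NewExample()

	// Old style (internal log package, removed by this commit):
	//   f.logger.With().Warning("failed to decode batch response", log.Err(err))
	// New style with zap:
	err := errors.New("decode failed")
	logger.Warn("failed to decode batch response", zap.Error(err))

	// Structured fields use zap's typed constructors instead of log.*:
	logger.Debug("received response for hash", zap.Stringer("hash", hashStub{}))
}
```

The same field mapping (`log.Stringer` → `zap.Stringer`, `log.Int` → `zap.Int`, `log.Err` → `zap.Error`) accounts for most of the line changes below.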
poszu committed Jul 10, 2024
1 parent 988fd1c commit 80f3ee5
Showing 7 changed files with 193 additions and 228 deletions.
153 changes: 76 additions & 77 deletions fetch/fetch.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

@@ -215,7 +216,7 @@ func WithConfig(c Config) Option {
}

// WithLogger configures logger for the fetcher.
func WithLogger(log log.Log) Option {
func WithLogger(log *zap.Logger) Option {
return func(f *Fetch) {
f.logger = log
}
@@ -236,7 +237,7 @@ func withHost(h host) Option {
// Fetch is the main struct that contains network peers and logic to batch and dispatch hash fetch requests.
type Fetch struct {
cfg Config
logger log.Log
logger *zap.Logger
bs *datastore.BlobStore
host host
peers *peers.Peers
@@ -271,7 +272,7 @@ func NewFetch(

f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
logger: zap.NewNop(),
bs: bs,
host: host,
servers: map[string]requester{},
@@ -289,7 +290,7 @@ func NewFetch(
if host != nil {
connectedf := func(peer p2p.Peer) {
if f.peers.Add(peer) {
f.logger.With().Debug("add peer", log.Stringer("id", peer))
f.logger.Debug("adding peer", zap.Stringer("id", peer))
}
}
host.Network().Notify(&network.NotifyBundle{
@@ -300,7 +301,7 @@ func NewFetch(
},
DisconnectedF: func(_ network.Network, c network.Conn) {
if !c.Stat().Limited && !host.Connected(c.RemotePeer()) {
f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer()))
f.logger.Debug("removing peer", zap.Stringer("id", c.RemotePeer()))
f.peers.Delete(c.RemotePeer())
}
},
@@ -314,7 +315,7 @@ func NewFetch(

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
if len(f.servers) == 0 {
h := newHandler(cdb, bs, f.logger)
h := newHandler(cdb, bs, f.logger.Named("handler"))
if f.cfg.Streaming {
f.registerServer(host, atxProtocol, h.handleEpochInfoReqStream)
f.registerServer(host, hashProtocol, h.handleHashReqStream)
@@ -350,7 +351,7 @@ func (f *Fetch) registerServer(
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger.Zap()),
server.WithLog(f.logger),
server.WithDecayingTag(f.cfg.DecayingTag),
}
if f.cfg.EnableServerMetrics {
@@ -419,7 +420,7 @@ func (f *Fetch) Start() error {
return nil
case <-time.After(f.cfg.LogPeerStatsInterval):
stats := f.peers.Stats()
f.logger.With().Info("peer stats", log.Inline(&stats))
f.logger.Info("peer stats", zap.Inline(&stats))
}
}
})
@@ -523,34 +524,34 @@ func (f *Fetch) receiveResponse(data []byte, batch *batchInfo) {

var response ResponseBatch
if err := codec.Decode(data, &response); err != nil {
f.logger.With().Warning("failed to decode batch response", log.Err(err))
f.logger.Warn("failed to decode batch response", zap.Error(err))
return
}

f.logger.With().Debug("received batch response",
log.Stringer("batch_hash", response.ID),
log.Int("num_hashes", len(response.Responses)),
f.logger.Debug("received batch response",
zap.Stringer("batch_hash", response.ID),
zap.Int("num_hashes", len(response.Responses)),
)

if batch.ID != response.ID {
f.logger.With().Warning(
f.logger.Warn(
"unknown batch response received",
log.Stringer("expected", batch.ID),
log.Stringer("response", response.ID),
zap.Stringer("expected", batch.ID),
zap.Stringer("response", response.ID),
)
return
}

batchMap := batch.toMap()
// iterate all hash Responses
for _, resp := range response.Responses {
f.logger.With().Debug("received response for hash", log.Stringer("hash", resp.Hash))
f.logger.Debug("received response for hash", zap.Stringer("hash", resp.Hash))
f.mu.Lock()
req, ok := f.ongoing[resp.Hash]
f.mu.Unlock()

if !ok {
f.logger.With().Warning("response received for unknown hash", log.Stringer("hash", resp.Hash))
f.logger.Warn("response received for unknown hash", zap.Stringer("hash", resp.Hash))
continue
}

@@ -565,10 +566,10 @@ func (f *Fetch) receiveResponse(data []byte, batch *batchInfo) {
// iterate all requests that didn't return value from peer and notify
// they will be retried for MaxRetriesForRequest
for h, r := range batchMap {
f.logger.With().Debug("hash not found in response from peer",
log.String("hint", string(r.Hint)),
log.Stringer("hash", h),
log.Stringer("peer", batch.peer),
f.logger.Debug("hash not found in response from peer",
zap.String("hint", string(r.Hint)),
zap.Stringer("hash", h),
zap.Stringer("peer", batch.peer),
)
f.failAfterRetry(r.Hash)
}
@@ -580,14 +581,13 @@ func (f *Fetch) hashValidationDone(hash types.Hash32, err error) {

req, ok := f.ongoing[hash]
if !ok {
f.logger.With().Error("validation ran for unknown hash", log.Stringer("hash", hash))
f.logger.Error("validation ran for unknown hash", zap.Stringer("hash", hash))
return
}
if err != nil {
req.promise.err = err
} else {
f.logger.WithContext(req.ctx).With().Debug("hash request done",
log.Stringer("hash", hash))
f.logger.Debug("hash request done", log.ZContext(req.ctx), zap.Stringer("hash", hash))
}
close(req.promise.completed)
delete(f.ongoing, hash)
@@ -599,7 +599,7 @@ func (f *Fetch) failAfterRetry(hash types.Hash32) {

req, ok := f.ongoing[hash]
if !ok {
f.logger.With().Error("hash missing from ongoing requests", log.Stringer("hash", hash))
f.logger.Error("hash missing from ongoing requests", zap.Stringer("hash", hash))
return
}

@@ -612,9 +612,10 @@ func (f *Fetch) failAfterRetry(hash types.Hash32) {

req.retries++
if req.retries > f.cfg.MaxRetriesForRequest {
f.logger.WithContext(req.ctx).With().Debug("gave up on hash after max retries",
log.Stringer("hash", req.hash),
log.Int("retries", req.retries),
f.logger.Debug("gave up on hash after max retries",
log.ZContext(req.ctx),
zap.Stringer("hash", req.hash),
zap.Int("retries", req.retries),
)
req.promise.err = ErrExceedMaxRetries
close(req.promise.completed)
@@ -637,9 +638,7 @@ func (f *Fetch) getUnprocessed() []RequestMessage {
var requestList []RequestMessage
// only send one request per hash
for hash, req := range f.unprocessed {
f.logger.WithContext(req.ctx).
With().
Debug("processing hash request", log.Stringer("hash", hash))
f.logger.Debug("processing hash request", log.ZContext(req.ctx), zap.Stringer("hash", hash))
requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint})
// move the processed requests to pending
f.ongoing[hash] = req
@@ -662,22 +661,22 @@ func (f *Fetch) send(requests []RequestMessage) {
go func() {
if f.cfg.Streaming {
if err := f.streamBatch(peer, batch); err != nil {
f.logger.With().Debug(
f.logger.Debug(
"failed to process batch request",
log.Stringer("batch", batch.ID),
log.Stringer("peer", peer),
log.Err(err),
zap.Stringer("batch", batch.ID),
zap.Stringer("peer", peer),
zap.Error(err),
)
}
return
}
data, err := f.sendBatch(peer, batch)
if err != nil {
f.logger.With().Debug(
f.logger.Debug(
"failed to send batch request",
log.Stringer("batch", batch.ID),
log.Stringer("peer", peer),
log.Err(err),
zap.Stringer("batch", batch.ID),
zap.Stringer("peer", peer),
zap.Error(err),
)
f.handleHashError(batch, err)
} else {
@@ -706,9 +705,9 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][]*batc
close(req.promise.completed)
delete(f.ongoing, req.hash)
} else {
f.logger.With().Error("ongoing request missing",
log.Stringer("hash", msg.Hash),
log.String("hint", string(msg.Hint)),
f.logger.Error("ongoing request missing",
zap.Stringer("hash", msg.Hash),
zap.String("hint", string(msg.Hint)),
)
}
}
@@ -769,11 +768,11 @@ func (f *Fetch) streamBatch(peer p2p.Peer, batch *batchInfo) error {
if f.stopped() {
return f.shutdownCtx.Err()
}
f.logger.With().Debug("sending streamed batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
log.Any("extraProtocols", batch.extraProtocols()),
f.logger.Debug("sending streamed batched request to peer",
zap.Stringer("batch_hash", batch.ID),
zap.Int("num_requests", len(batch.Requests)),
zap.Stringer("peer", peer),
zap.Any("extraProtocols", batch.extraProtocols()),
)
// Request is synchronous, it will return errors only if size of the bytes buffer
// is large or target peer is not connected
@@ -793,10 +792,10 @@ func (f *Fetch) streamBatch(peer p2p.Peer, batch *batchInfo) error {
// iterate all requests that didn't return value from peer and notify
// they will be retried for MaxRetriesForRequest
for h, r := range batchMap {
f.logger.With().Debug("hash not found in response from peer",
log.String("hint", string(r.Hint)),
log.Stringer("hash", h),
log.Stringer("peer", batch.peer),
f.logger.Debug("hash not found in response from peer",
zap.String("hint", string(r.Hint)),
zap.Stringer("hash", h),
zap.Stringer("peer", batch.peer),
)
f.failAfterRetry(r.Hash)
}
@@ -805,11 +804,11 @@ func (f *Fetch) streamBatch(peer p2p.Peer, batch *batchInfo) error {
},
batch.extraProtocols()...)
if err != nil {
f.logger.With().Debug(
f.logger.Debug(
"failed to send batch request",
log.Stringer("batch", batch.ID),
log.Stringer("peer", peer),
log.Err(err),
zap.Stringer("batch", batch.ID),
zap.Stringer("peer", peer),
zap.Error(err),
)
f.handleHashError(batch, err)
}
@@ -828,10 +827,10 @@ func (f *Fetch) receiveStreamedBatch(
return 0, err
}
if id != batch.ID {
f.logger.With().Warning(
f.logger.Warn(
"unknown batch response received",
log.Stringer("expected", batch.ID),
log.Stringer("response", id),
zap.Stringer("expected", batch.ID),
zap.Stringer("response", id),
)
return 0, errors.New("mismatched response")
}
@@ -850,7 +849,7 @@ func (f *Fetch) receiveStreamedBatch(

nBytes += n

f.logger.With().Debug("received response for hash", log.Stringer("hash", respHash))
f.logger.Debug("received response for hash", zap.Stringer("hash", respHash))
f.mu.Lock()
req, ok := f.ongoing[respHash]
f.mu.Unlock()
@@ -870,8 +869,7 @@ func (f *Fetch) receiveStreamedBatch(

if !ok {
// we make sure to read the blob before continuing
f.logger.With().Warning("response received for unknown hash",
log.Stringer("hash", respHash))
f.logger.Warn("response received for unknown hash", zap.Stringer("hash", respHash))
continue
}

Expand All @@ -892,11 +890,11 @@ func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) ([]byte, error) {
if f.stopped() {
return nil, f.shutdownCtx.Err()
}
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
log.Any("extraProtocols", batch.extraProtocols()),
f.logger.Debug("sending batched request to peer",
zap.Stringer("batch_hash", batch.ID),
zap.Int("num_requests", len(batch.Requests)),
zap.Stringer("peer", peer),
zap.Any("extraProtocols", batch.extraProtocols()),
)
// Request is synchronous,
// it will return errors only if size of the bytes buffer is large
@@ -913,11 +911,10 @@ func (f *Fetch) handleHashError(batch *batchInfo, err error) {
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
f.logger.Warn("hash missing from ongoing requests", zap.Stringer("hash", br.Hash))
continue
}
f.logger.WithContext(req.ctx).With().
Debug("hash request failed", log.Stringer("hash", req.hash), log.Err(err))
f.logger.Debug("hash request failed", log.ZContext(req.ctx), zap.Stringer("hash", req.hash), zap.Error(err))
req.promise.err = err
peerErrors.WithLabelValues(string(req.hint)).Inc()
close(req.promise.completed)
@@ -947,7 +944,7 @@ func (f *Fetch) getHash(
defer f.mu.Unlock()

if _, ok := f.ongoing[hash]; ok {
f.logger.WithContext(ctx).With().Debug("request ongoing", log.Stringer("hash", hash))
f.logger.Debug("request ongoing", log.ZContext(ctx), zap.Stringer("hash", hash))
return f.ongoing[hash].promise, nil
}

@@ -961,14 +958,16 @@ func (f *Fetch) getHash(
completed: make(chan struct{}, 1),
},
}
f.logger.WithContext(ctx).With().Debug("hash request added to queue",
log.Stringer("hash", hash),
log.Int("queued", len(f.unprocessed)))
f.logger.Debug("hash request added to queue",
log.ZContext(ctx),
zap.Stringer("hash", hash),
zap.Int("queued", len(f.unprocessed)))
} else {
f.logger.WithContext(ctx).With().Debug("hash request already in queue",
log.Stringer("hash", hash),
log.Int("retries", f.unprocessed[hash].retries),
log.Int("queued", len(f.unprocessed)))
f.logger.Debug("hash request already in queue",
log.ZContext(ctx),
zap.Stringer("hash", hash),
zap.Int("retries", f.unprocessed[hash].retries),
zap.Int("queued", len(f.unprocessed)))
}
if len(f.unprocessed) >= f.cfg.QueueSize {
f.eg.Go(func() error {
Expand Down
Loading

0 comments on commit 80f3ee5

Please sign in to comment.