Skip to content

Commit

Permalink
feat: add tracing (ipfs#30)
Browse files Browse the repository at this point in the history
* feat: add tracing

* make span names and attributes more consistent

This commit was moved from ipfs/go-namesys@bf4b3cf
  • Loading branch information
guseggert authored Apr 8, 2022
1 parent af325c0 commit 948dcb3
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 1 deletion.
6 changes: 6 additions & 0 deletions namesys/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ func resolve(ctx context.Context, r resolver, name string, options opts.ResolveO
}

func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result {
ctx, span := StartSpan(ctx, "ResolveAsync")
defer span.End()

resCh := r.resolveOnceAsync(ctx, name, options)
depth := options.Depth
outCh := make(chan Result, 1)

go func() {
defer close(outCh)
ctx, span := StartSpan(ctx, "ResolveAsync.Worker")
defer span.End()

var subCh <-chan Result
var cancelSub context.CancelFunc
defer func() {
Expand Down
17 changes: 17 additions & 0 deletions namesys/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
path "github.com/ipfs/go-path"
opts "github.com/ipfs/interface-go-ipfs-core/options/namesys"
dns "github.com/miekg/dns"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// LookupTXTFunc is a function that lookups TXT record values.
Expand All @@ -30,11 +32,17 @@ func NewDNSResolver(lookup LookupTXTFunc) *DNSResolver {

// Resolve implements Resolver.
func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) {
ctx, span := StartSpan(ctx, "DNSResolver.Resolve")
defer span.End()

return resolve(ctx, r, name, opts.ProcessOpts(options))
}

// ResolveAsync implements Resolver.
func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
ctx, span := StartSpan(ctx, "DNSResolver.ResolveAsync")
defer span.End()

return resolveAsync(ctx, r, name, opts.ProcessOpts(options))
}

Expand All @@ -47,6 +55,9 @@ type lookupRes struct {
// TXT records for a given domain name should contain a b58
// encoded multihash.
func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
ctx, span := StartSpan(ctx, "DNSResolver.ResolveOnceAsync")
defer span.End()

var fqdn string
out := make(chan onceResult, 1)
segments := strings.SplitN(name, "/", 2)
Expand Down Expand Up @@ -80,6 +91,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options

go func() {
defer close(out)
ctx, span := StartSpan(ctx, "DNSResolver.ResolveOnceAsync.Worker")
defer span.End()

var rootResErr, subResErr error
for {
select {
Expand Down Expand Up @@ -131,6 +145,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}

func workDomain(ctx context.Context, r *DNSResolver, name string, res chan lookupRes) {
ctx, span := StartSpan(ctx, "DNSResolver.WorkDomain", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()

defer close(res)

txt, err := r.lookupTXT(ctx, name)
Expand Down
21 changes: 21 additions & 0 deletions namesys/namesys.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
routing "github.com/libp2p/go-libp2p-core/routing"
dns "github.com/miekg/dns"
madns "github.com/multiformats/go-multiaddr-dns"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// mpns (a multi-protocol NameSystem) implements generic IPFS naming.
Expand Down Expand Up @@ -134,6 +136,9 @@ const DefaultResolverCacheTTL = time.Minute

// Resolve implements Resolver.
func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) {
ctx, span := StartSpan(ctx, "MPNS.Resolve", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()

if strings.HasPrefix(name, "/ipfs/") {
return path.ParsePath(name)
}
Expand All @@ -146,6 +151,9 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv
}

func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
ctx, span := StartSpan(ctx, "MPNS.ResolveAsync", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()

if strings.HasPrefix(name, "/ipfs/") {
p, err := path.ParsePath(name)
res := make(chan Result, 1)
Expand All @@ -167,6 +175,9 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R

// resolveOnce implements resolver.
func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
ctx, span := StartSpan(ctx, "MPNS.ResolveOnceAsync")
defer span.End()

out := make(chan onceResult, 1)

if !strings.HasPrefix(name, ipnsPrefix) {
Expand Down Expand Up @@ -213,11 +224,14 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.
if len(segments) > 3 {
p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
}
span.SetAttributes(attribute.Bool("CacheHit", true))
span.RecordError(err)

out <- onceResult{value: p, err: err}
close(out)
return out
}
span.SetAttributes(attribute.Bool("CacheHit", false))

if err == nil {
res = ns.ipnsResolver
Expand Down Expand Up @@ -273,18 +287,25 @@ func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult)

// Publish implements Publisher
func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
ctx, span := StartSpan(ctx, "MPNS.Publish")
defer span.End()
return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordEOL))
}

func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error {
ctx, span := StartSpan(ctx, "MPNS.PublishWithEOL", trace.WithAttributes(attribute.String("Value", value.String())))
defer span.End()
id, err := peer.IDFromPrivateKey(name)
if err != nil {
span.RecordError(err)
return err
}
span.SetAttributes(attribute.String("ID", id.String()))
if err := ns.ipnsPublisher.PublishWithEOL(ctx, name, value, eol); err != nil {
// Invalidate the cache. Publishing may _partially_ succeed but
// still return an error.
ns.cacheInvalidate(string(id))
span.RecordError(err)
return err
}
ttl := DefaultResolverCacheTTL
Expand Down
14 changes: 14 additions & 0 deletions namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
base32 "github.com/whyrusleeping/base32"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const ipnsPrefix = "/ipns/"
Expand Down Expand Up @@ -191,6 +193,9 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k crypto.PrivKey, valu
// PublishWithEOL is a temporary stand in for the ipns records implementation
// see here for more details: https://github.com/ipfs/specs/tree/master/records
func (p *IpnsPublisher) PublishWithEOL(ctx context.Context, k crypto.PrivKey, value path.Path, eol time.Time) error {
ctx, span := StartSpan(ctx, "IpnsPublisher.PublishWithEOL", trace.WithAttributes(attribute.String("Value", value.String())))
defer span.End()

record, err := p.updateRecord(ctx, k, value, eol)
if err != nil {
return err
Expand All @@ -216,6 +221,9 @@ func checkCtxTTL(ctx context.Context) (time.Duration, bool) {
// keyed on the ID associated with the provided public key. The public key is
// also made available to the routing system so that entries can be verified.
func PutRecordToRouting(ctx context.Context, r routing.ValueStore, k crypto.PubKey, entry *pb.IpnsEntry) error {
ctx, span := StartSpan(ctx, "PutRecordToRouting")
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -266,6 +274,9 @@ func waitOnErrChan(ctx context.Context, errs chan error) error {
// PublishPublicKey stores the given public key in the ValueStore with the
// given key.
func PublishPublicKey(ctx context.Context, r routing.ValueStore, k string, pubk crypto.PubKey) error {
ctx, span := StartSpan(ctx, "PublishPublicKey", trace.WithAttributes(attribute.String("Key", k)))
defer span.End()

log.Debugf("Storing pubkey at: %s", k)
pkbytes, err := crypto.MarshalPublicKey(pubk)
if err != nil {
Expand All @@ -279,6 +290,9 @@ func PublishPublicKey(ctx context.Context, r routing.ValueStore, k string, pubk
// PublishEntry stores the given IpnsEntry in the ValueStore with the given
// ipnskey.
func PublishEntry(ctx context.Context, r routing.ValueStore, ipnskey string, rec *pb.IpnsEntry) error {
ctx, span := StartSpan(ctx, "PublishEntry", trace.WithAttributes(attribute.String("IPNSKey", ipnskey)))
defer span.End()

data, err := proto.Marshal(rec)
if err != nil {
return err
Expand Down
13 changes: 12 additions & 1 deletion namesys/republisher/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
keystore "github.com/ipfs/go-ipfs-keystore"
namesys "github.com/ipfs/go-namesys"
path "github.com/ipfs/go-path"
"go.opentelemetry.io/otel/attribute"

proto "github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -93,6 +94,8 @@ func (rp *Republisher) Run(proc goprocess.Process) {
func (rp *Republisher) republishEntries(p goprocess.Process) error {
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
defer cancel()
ctx, span := namesys.StartSpan(ctx, "Republisher.RepublishEntries")
defer span.End()

// TODO: Use rp.ipns.ListPublished(). We can't currently *do* that
// because:
Expand Down Expand Up @@ -125,8 +128,11 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error {
}

func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) error {
ctx, span := namesys.StartSpan(ctx, "Republisher.RepublishEntry")
defer span.End()
id, err := peer.IDFromPrivateKey(priv)
if err != nil {
span.RecordError(err)
return err
}

Expand All @@ -136,14 +142,17 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro
e, err := rp.getLastIPNSEntry(ctx, id)
if err != nil {
if err == errNoEntry {
span.SetAttributes(attribute.Bool("NoEntry", true))
return nil
}
span.RecordError(err)
return err
}

p := path.Path(e.GetValue())
prevEol, err := ipns.GetEOL(e)
if err != nil {
span.RecordError(err)
return err
}

Expand All @@ -152,7 +161,9 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro
if prevEol.After(eol) {
eol = prevEol
}
return rp.ns.PublishWithEOL(ctx, priv, p, eol)
err = rp.ns.PublishWithEOL(ctx, priv, p, eol)
span.RecordError(err)
return err
}

func (rp *Republisher) getLastIPNSEntry(ctx context.Context, id peer.ID) (*pb.IpnsEntry, error) {
Expand Down
4 changes: 4 additions & 0 deletions namesys/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"

"github.com/ipfs/go-path"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-namesys"
)
Expand All @@ -18,6 +20,8 @@ var ErrNoNamesys = errors.New(

// ResolveIPNS resolves /ipns paths
func ResolveIPNS(ctx context.Context, nsys namesys.NameSystem, p path.Path) (path.Path, error) {
ctx, span := namesys.StartSpan(ctx, "ResolveIPNS", trace.WithAttributes(attribute.String("Path", p.String())))
defer span.End()
if strings.HasPrefix(p.String(), "/ipns/") {
// TODO(cryptix): we should be able to query the local cache for the path
if nsys == nil {
Expand Down
12 changes: 12 additions & 0 deletions namesys/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
mh "github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("namesys")
Expand All @@ -38,17 +40,24 @@ func NewIpnsResolver(route routing.ValueStore) *IpnsResolver {

// Resolve implements Resolver.
func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) {
ctx, span := StartSpan(ctx, "IpnsResolver.Resolve", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()
return resolve(ctx, r, name, opts.ProcessOpts(options))
}

// ResolveAsync implements Resolver.
func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
ctx, span := StartSpan(ctx, "IpnsResolver.ResolveAsync", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()
return resolveAsync(ctx, r, name, opts.ProcessOpts(options))
}

// resolveOnce implements resolver. Uses the IPFS routing system to
// resolve SFS-like names.
func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
ctx, span := StartSpan(ctx, "IpnsResolver.ResolveOnceAsync", trace.WithAttributes(attribute.String("Name", name)))
defer span.End()

out := make(chan onceResult, 1)
log.Debugf("RoutingResolver resolving %s", name)
cancel := func() {}
Expand Down Expand Up @@ -86,6 +95,9 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
go func() {
defer cancel()
defer close(out)
ctx, span := StartSpan(ctx, "IpnsResolver.ResolveOnceAsync.Worker")
defer span.End()

for {
select {
case val, ok := <-vals:
Expand Down
13 changes: 13 additions & 0 deletions namesys/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package namesys

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-namesys").Start(ctx, fmt.Sprintf("Namesys.%s", name))
}

0 comments on commit 948dcb3

Please sign in to comment.