From 948dcb33278f7ee434fab22af624941444cd2466 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Fri, 8 Apr 2022 10:56:07 -0400 Subject: [PATCH] feat: add tracing (#30) * feat: add tracing * make span names and attributes more consistent This commit was moved from ipfs/go-namesys@bf4b3cffd9bdc0fb668bb6879f17070128145798 --- namesys/base.go | 6 ++++++ namesys/dns.go | 17 +++++++++++++++++ namesys/namesys.go | 21 +++++++++++++++++++++ namesys/publisher.go | 14 ++++++++++++++ namesys/republisher/repub.go | 13 ++++++++++++- namesys/resolve/resolve.go | 4 ++++ namesys/routing.go | 12 ++++++++++++ namesys/tracing.go | 13 +++++++++++++ 8 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 namesys/tracing.go diff --git a/namesys/base.go b/namesys/base.go index 27cc38f88..5bf64c0e3 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -40,12 +40,18 @@ func resolve(ctx context.Context, r resolver, name string, options opts.ResolveO } func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result { + ctx, span := StartSpan(ctx, "ResolveAsync") + defer span.End() + resCh := r.resolveOnceAsync(ctx, name, options) depth := options.Depth outCh := make(chan Result, 1) go func() { defer close(outCh) + ctx, span := StartSpan(ctx, "ResolveAsync.Worker") + defer span.End() + var subCh <-chan Result var cancelSub context.CancelFunc defer func() { diff --git a/namesys/dns.go b/namesys/dns.go index 139835617..ba1906162 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -11,6 +11,8 @@ import ( path "github.com/ipfs/go-path" opts "github.com/ipfs/interface-go-ipfs-core/options/namesys" dns "github.com/miekg/dns" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // LookupTXTFunc is a function that lookups TXT record values. @@ -30,11 +32,17 @@ func NewDNSResolver(lookup LookupTXTFunc) *DNSResolver { // Resolve implements Resolver. func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { + ctx, span := StartSpan(ctx, "DNSResolver.Resolve") + defer span.End() + return resolve(ctx, r, name, opts.ProcessOpts(options)) } // ResolveAsync implements Resolver. func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + ctx, span := StartSpan(ctx, "DNSResolver.ResolveAsync") + defer span.End() + return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } @@ -47,6 +55,9 @@ type lookupRes struct { // TXT records for a given domain name should contain a b58 // encoded multihash. func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + ctx, span := StartSpan(ctx, "DNSResolver.ResolveOnceAsync") + defer span.End() + var fqdn string out := make(chan onceResult, 1) segments := strings.SplitN(name, "/", 2) @@ -80,6 +91,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options go func() { defer close(out) + ctx, span := StartSpan(ctx, "DNSResolver.ResolveOnceAsync.Worker") + defer span.End() + var rootResErr, subResErr error for { select { @@ -131,6 +145,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } func workDomain(ctx context.Context, r *DNSResolver, name string, res chan lookupRes) { + ctx, span := StartSpan(ctx, "DNSResolver.WorkDomain", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() + defer close(res) txt, err := r.lookupTXT(ctx, name) diff --git a/namesys/namesys.go b/namesys/namesys.go index 537f0d1b0..51b5e7096 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -30,6 +30,8 @@ import ( routing "github.com/libp2p/go-libp2p-core/routing" dns "github.com/miekg/dns" madns "github.com/multiformats/go-multiaddr-dns" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // mpns (a multi-protocol NameSystem) implements generic IPFS naming. @@ -134,6 +136,9 @@ const DefaultResolverCacheTTL = time.Minute // Resolve implements Resolver. func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { + ctx, span := StartSpan(ctx, "MPNS.Resolve", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() + if strings.HasPrefix(name, "/ipfs/") { return path.ParsePath(name) } @@ -146,6 +151,9 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv } func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + ctx, span := StartSpan(ctx, "MPNS.ResolveAsync", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() + if strings.HasPrefix(name, "/ipfs/") { p, err := path.ParsePath(name) res := make(chan Result, 1) @@ -167,6 +175,9 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R // resolveOnce implements resolver. func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + ctx, span := StartSpan(ctx, "MPNS.ResolveOnceAsync") + defer span.End() + out := make(chan onceResult, 1) if !strings.HasPrefix(name, ipnsPrefix) { @@ -213,11 +224,14 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts. if len(segments) > 3 { p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) } + span.SetAttributes(attribute.Bool("CacheHit", true)) + span.RecordError(err) out <- onceResult{value: p, err: err} close(out) return out } + span.SetAttributes(attribute.Bool("CacheHit", false)) if err == nil { res = ns.ipnsResolver @@ -273,18 +287,25 @@ func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult) // Publish implements Publisher func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { + ctx, span := StartSpan(ctx, "MPNS.Publish") + defer span.End() return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordEOL)) } func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error { + ctx, span := StartSpan(ctx, "MPNS.PublishWithEOL", trace.WithAttributes(attribute.String("Value", value.String()))) + defer span.End() id, err := peer.IDFromPrivateKey(name) if err != nil { + span.RecordError(err) return err } + span.SetAttributes(attribute.String("ID", id.String())) if err := ns.ipnsPublisher.PublishWithEOL(ctx, name, value, eol); err != nil { // Invalidate the cache. Publishing may _partially_ succeed but // still return an error. ns.cacheInvalidate(string(id)) + span.RecordError(err) return err } ttl := DefaultResolverCacheTTL diff --git a/namesys/publisher.go b/namesys/publisher.go index 1ea9e2145..bf1c46d9d 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -16,6 +16,8 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" base32 "github.com/whyrusleeping/base32" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const ipnsPrefix = "/ipns/" @@ -191,6 +193,9 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k crypto.PrivKey, valu // PublishWithEOL is a temporary stand in for the ipns records implementation // see here for more details: https://github.com/ipfs/specs/tree/master/records func (p *IpnsPublisher) PublishWithEOL(ctx context.Context, k crypto.PrivKey, value path.Path, eol time.Time) error { + ctx, span := StartSpan(ctx, "IpnsPublisher.PublishWithEOL", trace.WithAttributes(attribute.String("Value", value.String()))) + defer span.End() + record, err := p.updateRecord(ctx, k, value, eol) if err != nil { return err @@ -216,6 +221,9 @@ func checkCtxTTL(ctx context.Context) (time.Duration, bool) { // keyed on the ID associated with the provided public key. The public key is // also made available to the routing system so that entries can be verified. func PutRecordToRouting(ctx context.Context, r routing.ValueStore, k crypto.PubKey, entry *pb.IpnsEntry) error { + ctx, span := StartSpan(ctx, "PutRecordToRouting") + defer span.End() + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -266,6 +274,9 @@ func waitOnErrChan(ctx context.Context, errs chan error) error { // PublishPublicKey stores the given public key in the ValueStore with the // given key. func PublishPublicKey(ctx context.Context, r routing.ValueStore, k string, pubk crypto.PubKey) error { + ctx, span := StartSpan(ctx, "PublishPublicKey", trace.WithAttributes(attribute.String("Key", k))) + defer span.End() + log.Debugf("Storing pubkey at: %s", k) pkbytes, err := crypto.MarshalPublicKey(pubk) if err != nil { @@ -279,6 +290,9 @@ func PublishPublicKey(ctx context.Context, r routing.ValueStore, k string, pubk // PublishEntry stores the given IpnsEntry in the ValueStore with the given // ipnskey. func PublishEntry(ctx context.Context, r routing.ValueStore, ipnskey string, rec *pb.IpnsEntry) error { + ctx, span := StartSpan(ctx, "PublishEntry", trace.WithAttributes(attribute.String("IPNSKey", ipnskey))) + defer span.End() + data, err := proto.Marshal(rec) if err != nil { return err diff --git a/namesys/republisher/repub.go b/namesys/republisher/repub.go index 5fefac222..a24e59dff 100644 --- a/namesys/republisher/repub.go +++ b/namesys/republisher/repub.go @@ -10,6 +10,7 @@ import ( keystore "github.com/ipfs/go-ipfs-keystore" namesys "github.com/ipfs/go-namesys" path "github.com/ipfs/go-path" + "go.opentelemetry.io/otel/attribute" proto "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" @@ -93,6 +94,8 @@ func (rp *Republisher) Run(proc goprocess.Process) { func (rp *Republisher) republishEntries(p goprocess.Process) error { ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p)) defer cancel() + ctx, span := namesys.StartSpan(ctx, "Republisher.RepublishEntries") + defer span.End() // TODO: Use rp.ipns.ListPublished(). We can't currently *do* that // because: @@ -125,8 +128,11 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error { } func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) error { + ctx, span := namesys.StartSpan(ctx, "Republisher.RepublishEntry") + defer span.End() id, err := peer.IDFromPrivateKey(priv) if err != nil { + span.RecordError(err) return err } @@ -136,14 +142,17 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro e, err := rp.getLastIPNSEntry(ctx, id) if err != nil { if err == errNoEntry { + span.SetAttributes(attribute.Bool("NoEntry", true)) return nil } + span.RecordError(err) return err } p := path.Path(e.GetValue()) prevEol, err := ipns.GetEOL(e) if err != nil { + span.RecordError(err) return err } @@ -152,7 +161,9 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro if prevEol.After(eol) { eol = prevEol } - return rp.ns.PublishWithEOL(ctx, priv, p, eol) + err = rp.ns.PublishWithEOL(ctx, priv, p, eol) + span.RecordError(err) + return err } func (rp *Republisher) getLastIPNSEntry(ctx context.Context, id peer.ID) (*pb.IpnsEntry, error) { diff --git a/namesys/resolve/resolve.go b/namesys/resolve/resolve.go index 5f1b4eed9..38096593e 100644 --- a/namesys/resolve/resolve.go +++ b/namesys/resolve/resolve.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/ipfs/go-path" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-namesys" ) @@ -18,6 +20,8 @@ var ErrNoNamesys = errors.New( // ResolveIPNS resolves /ipns paths func ResolveIPNS(ctx context.Context, nsys namesys.NameSystem, p path.Path) (path.Path, error) { + ctx, span := namesys.StartSpan(ctx, "ResolveIPNS", trace.WithAttributes(attribute.String("Path", p.String()))) + defer span.End() if strings.HasPrefix(p.String(), "/ipns/") { // TODO(cryptix): we should be able to query the local cache for the path if nsys == nil { diff --git a/namesys/routing.go b/namesys/routing.go index 8bdfe21e6..c73e23ed7 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -16,6 +16,8 @@ import ( routing "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" mh "github.com/multiformats/go-multihash" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logging.Logger("namesys") @@ -38,17 +40,24 @@ func NewIpnsResolver(route routing.ValueStore) *IpnsResolver { // Resolve implements Resolver. func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { + ctx, span := StartSpan(ctx, "IpnsResolver.Resolve", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() return resolve(ctx, r, name, opts.ProcessOpts(options)) } // ResolveAsync implements Resolver. func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + ctx, span := StartSpan(ctx, "IpnsResolver.ResolveAsync", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. Uses the IPFS routing system to // resolve SFS-like names. func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + ctx, span := StartSpan(ctx, "IpnsResolver.ResolveOnceAsync", trace.WithAttributes(attribute.String("Name", name))) + defer span.End() + out := make(chan onceResult, 1) log.Debugf("RoutingResolver resolving %s", name) cancel := func() {} @@ -86,6 +95,9 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option go func() { defer cancel() defer close(out) + ctx, span := StartSpan(ctx, "IpnsResolver.ResolveOnceAsync.Worker") + defer span.End() + for { select { case val, ok := <-vals: diff --git a/namesys/tracing.go b/namesys/tracing.go new file mode 100644 index 000000000..4ef84294a --- /dev/null +++ b/namesys/tracing.go @@ -0,0 +1,13 @@ +package namesys + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return otel.Tracer("go-namesys").Start(ctx, fmt.Sprintf("Namesys.%s", name)) +}