diff --git a/CHANGELOG.md b/CHANGELOG.md
index c17a9c46e..f334cca8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,8 @@ The following emojis are used to highlight certain changes:
 * ✨ Migrated repositories into Boxo
   * [`github.com/ipfs/kubo/peering`](https://pkg.go.dev/github.com/ipfs/kubo/peering) => [`./peering`](./peering)
     A service which establish, overwatch and maintain long lived connections.
+  * [`github.com/ipfs/kubo/core/bootstrap`](https://pkg.go.dev/github.com/ipfs/kubo/core/bootstrap) => [`./bootstrap`](./bootstrap)
+    A service that maintains connections to a number of bootstrap peers.

 ### Changed

diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go
new file mode 100644
index 000000000..347d98797
--- /dev/null
+++ b/bootstrap/bootstrap.go
@@ -0,0 +1,371 @@
+package bootstrap
+
+import (
+    "context"
+    "errors"
+    "io"
+    "math/rand"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    logging "github.com/ipfs/go-log"
+    "github.com/jbenet/goprocess"
+    goprocessctx "github.com/jbenet/goprocess/context"
+    periodicproc "github.com/jbenet/goprocess/periodic"
+    "github.com/libp2p/go-libp2p/core/host"
+    "github.com/libp2p/go-libp2p/core/network"
+    "github.com/libp2p/go-libp2p/core/peer"
+    "github.com/libp2p/go-libp2p/core/peerstore"
+    "github.com/libp2p/go-libp2p/core/routing"
+)
+
+var log = logging.Logger("bootstrap")
+
+// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
+// peers to bootstrap correctly.
+var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")
+
+// BootstrapConfig specifies parameters used in an IpfsNode's network
+// bootstrapping process.
+type BootstrapConfig struct {
+    // MinPeerThreshold governs whether to bootstrap more connections. If the
+    // node has fewer open connections than this number, it will open connections
+    // to the bootstrap nodes. From there, the routing system should be able
+    // to use the connections to the bootstrap nodes to connect to even more
+    // peers. Routing systems like the IpfsDHT do so in their own Bootstrap
+    // process, which issues random queries to find more peers.
+    MinPeerThreshold int
+
+    // Period governs the periodic interval at which the node will
+    // attempt to bootstrap. The bootstrap process is not very expensive, so
+    // this threshold can afford to be small (<=30s).
+    Period time.Duration
+
+    // ConnectionTimeout determines how long to wait for a bootstrap
+    // connection attempt before cancelling it.
+    ConnectionTimeout time.Duration
+
+    // BootstrapPeers is a function that returns a set of bootstrap peers
+    // for the bootstrap process to use. This makes it possible for clients
+    // to control the peers the process uses at any moment.
+    BootstrapPeers func() []peer.AddrInfo
+
+    // BackupBootstrapInterval governs the periodic interval at which the node will
+    // attempt to save connected nodes to use as temporary bootstrap peers.
+    BackupBootstrapInterval time.Duration
+
+    // MaxBackupBootstrapSize controls the maximum number of peers we're saving
+    // as backup bootstrap peers.
+    MaxBackupBootstrapSize int
+
+    saveBackupBootstrapPeers func(context.Context, []peer.AddrInfo)
+    loadBackupBootstrapPeers func(context.Context) []peer.AddrInfo
+}
+
+// DefaultBootstrapConfig specifies sane default parameters for bootstrapping.
+var DefaultBootstrapConfig = BootstrapConfig{
+    MinPeerThreshold:        4,
+    Period:                  30 * time.Second,
+    ConnectionTimeout:       (30 * time.Second) / 3, // Period / 3
+    BackupBootstrapInterval: 1 * time.Hour,
+    MaxBackupBootstrapSize:  20,
+}
+
+// BootstrapConfigWithPeers creates a default BootstrapConfig configured with
+// the specified peers, and optional functions to load and save backup peers.
+func BootstrapConfigWithPeers(pis []peer.AddrInfo, options ...func(*BootstrapConfig)) BootstrapConfig {
+    cfg := DefaultBootstrapConfig
+    cfg.BootstrapPeers = func() []peer.AddrInfo {
+        return pis
+    }
+    for _, opt := range options {
+        opt(&cfg)
+    }
+    return cfg
+}
+
+// WithBackupPeers configures functions to load and save backup bootstrap peers.
+func WithBackupPeers(load func(context.Context) []peer.AddrInfo, save func(context.Context, []peer.AddrInfo)) func(*BootstrapConfig) {
+    if save == nil && load != nil || save != nil && load == nil {
+        panic("both load and save backup bootstrap peers functions must be defined")
+    }
+    return func(cfg *BootstrapConfig) {
+        cfg.loadBackupBootstrapPeers = load
+        cfg.saveBackupBootstrapPeers = save
+    }
+}
+
+// BackupPeers returns the load and save backup peers functions.
+func (cfg *BootstrapConfig) BackupPeers() (func(context.Context) []peer.AddrInfo, func(context.Context, []peer.AddrInfo)) {
+    return cfg.loadBackupBootstrapPeers, cfg.saveBackupBootstrapPeers
+}
+
+// SetBackupPeers sets the load and save backup peers functions.
+func (cfg *BootstrapConfig) SetBackupPeers(load func(context.Context) []peer.AddrInfo, save func(context.Context, []peer.AddrInfo)) {
+    opt := WithBackupPeers(load, save)
+    opt(cfg)
+}
+
+// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically
+// check the number of open connections and -- if there are too few -- initiate
+// connections to well-known bootstrap peers. It also kicks off subsystem
+// bootstrapping (i.e. routing).
+func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConfig) (io.Closer, error) {
+    // make a signal to wait for one bootstrap round to complete.
+    doneWithRound := make(chan struct{})
+
+    if len(cfg.BootstrapPeers()) == 0 {
+        // We *need* to bootstrap but we have no bootstrap peers
+        // configured *at all*, so inform the user.
+        log.Warn("no bootstrap nodes configured: go-ipfs may have difficulty connecting to the network")
+    }
+
+    // the periodic bootstrap function -- the connection supervisor
+    periodic := func(worker goprocess.Process) {
+        ctx := goprocessctx.OnClosingContext(worker)
+
+        if err := bootstrapRound(ctx, host, cfg); err != nil {
+            log.Debugf("%s bootstrap error: %s", id, err)
+        }
+
+        // Exit the first call (triggered independently by `proc.Go`, not `Tick`)
+        // only after being done with the *single* Routing.Bootstrap call. Following
+        // periodic calls (`Tick`) will not block on this.
+        <-doneWithRound
+    }
+
+    // kick off the node's periodic bootstrapping
+    proc := periodicproc.Tick(cfg.Period, periodic)
+    proc.Go(periodic) // run one right now.
+
+    // kick off Routing.Bootstrap
+    if rt != nil {
+        ctx := goprocessctx.OnClosingContext(proc)
+        if err := rt.Bootstrap(ctx); err != nil {
+            proc.Close()
+            return nil, err
+        }
+    }
+
+    doneWithRound <- struct{}{}
+    close(doneWithRound) // it no longer blocks periodic
+
+    // If loadBackupBootstrapPeers is not nil then saveBackupBootstrapPeers
+    // must also not be nil.
+    if cfg.loadBackupBootstrapPeers != nil {
+        startSavePeersAsTemporaryBootstrapProc(cfg, host, proc)
+    }
+
+    return proc, nil
+}
+
+// Aside from the main bootstrap process, we also run a secondary one that saves
+// connected peers as a backup measure in case we can't connect to the official
+// bootstrap ones. These peers will serve as *temporary* bootstrap nodes.
+func startSavePeersAsTemporaryBootstrapProc(cfg BootstrapConfig, host host.Host, bootstrapProc goprocess.Process) {
+    savePeersFn := func(worker goprocess.Process) {
+        ctx := goprocessctx.OnClosingContext(worker)
+
+        if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil {
+            log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err)
+        }
+    }
+    savePeersProc := periodicproc.Tick(cfg.BackupBootstrapInterval, savePeersFn)
+
+    // When the main bootstrap process ends, also terminate the 'save connected
+    // peers' one. Coupling the two seems the easiest way to handle this backup
+    // process without additional complexity.
+    go func() {
+        <-bootstrapProc.Closing()
+        savePeersProc.Close()
+    }()
+
+    // Run the first round now (after the first bootstrap round has finished),
+    // as the BackupBootstrapInterval can be much longer than the bootstrap Period.
+    savePeersProc.Go(savePeersFn)
+}
+
+func saveConnectedPeersAsTemporaryBootstrap(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
+    // Randomize the list of connected peers; we don't prioritize anyone.
+    connectedPeers := randomizeList(host.Network().Peers())
+
+    bootstrapPeers := cfg.BootstrapPeers()
+    backupPeers := make([]peer.AddrInfo, 0, cfg.MaxBackupBootstrapSize)
+    foundPeers := make(map[peer.ID]struct{}, cfg.MaxBackupBootstrapSize+len(bootstrapPeers))
+
+    // Don't record bootstrap peers
+    for _, b := range bootstrapPeers {
+        foundPeers[b.ID] = struct{}{}
+    }
+
+    // Choose peers to save and filter out the ones that are already bootstrap nodes.
+    for _, p := range connectedPeers {
+        if _, found := foundPeers[p]; found {
+            continue
+        }
+        foundPeers[p] = struct{}{}
+
+        backupPeers = append(backupPeers, peer.AddrInfo{
+            ID:    p,
+            Addrs: host.Network().Peerstore().Addrs(p),
+        })
+
+        if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
+            break
+        }
+    }
+
+    // If we didn't reach the target number, use previously stored connected peers.
+    if len(backupPeers) < cfg.MaxBackupBootstrapSize {
+        oldSavedPeers := cfg.loadBackupBootstrapPeers(ctx)
+        log.Debugf("missing %d peers to reach backup bootstrap target of %d, trying from previous list of %d saved peers",
+            cfg.MaxBackupBootstrapSize-len(backupPeers), cfg.MaxBackupBootstrapSize, len(oldSavedPeers))
+
+        // Add some of the old saved peers. Ensure we don't duplicate them.
+        for _, p := range oldSavedPeers {
+            if _, found := foundPeers[p.ID]; found {
+                continue
+            }
+            foundPeers[p.ID] = struct{}{}
+
+            backupPeers = append(backupPeers, p)
+
+            if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
+                break
+            }
+        }
+    }
+
+    cfg.saveBackupBootstrapPeers(ctx, backupPeers)
+    log.Debugf("saved %d peers (of %d target) as bootstrap backup in the config", len(backupPeers), cfg.MaxBackupBootstrapSize)
+    return nil
+}
+
+// Connect to as many peers as needed to reach the BootstrapConfig.MinPeerThreshold.
+// Peers can be original bootstrap or temporary ones (drawn from a list of
+// persisted previously connected peers).
+func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
+    ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
+    defer cancel()
+    id := host.ID()
+
+    // get bootstrap peers from config. retrieving them here makes
+    // sure we remain observant of changes to client configuration.
+    peers := cfg.BootstrapPeers()
+    // determine how many bootstrap connections to open
+    connected := host.Network().Peers()
+    if len(connected) >= cfg.MinPeerThreshold {
+        log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
+            id, len(connected), cfg.MinPeerThreshold)
+        return nil
+    }
+    numToDial := cfg.MinPeerThreshold - len(connected) // numToDial > 0
+
+    if len(peers) > 0 {
+        numToDial -= int(peersConnect(ctx, host, peers, numToDial, true))
+        if numToDial <= 0 {
+            return nil
+        }
+    }
+
+    if cfg.loadBackupBootstrapPeers == nil {
+        log.Debugf("not enough bootstrap peers to fill the remaining target of %d connections", numToDial)
+        return ErrNotEnoughBootstrapPeers
+    }
+
+    log.Debugf("not enough bootstrap peers to fill the remaining target of %d connections, trying backup list", numToDial)
+
+    tempBootstrapPeers := cfg.loadBackupBootstrapPeers(ctx)
+    if len(tempBootstrapPeers) > 0 {
+        numToDial -= int(peersConnect(ctx, host, tempBootstrapPeers, numToDial, false))
+        if numToDial <= 0 {
+            return nil
+        }
+    }
+
+    log.Debugf("tried both original bootstrap peers and temporary ones but still missing target of %d connections", numToDial)
+
+    return ErrNotEnoughBootstrapPeers
+}
+
+// Attempt to make `needed` connections from the `availablePeers` list. Mark
+// peers as either `permanent` or temporary when adding them to the Peerstore.
+// Return the number of connections completed. We eagerly over-connect in parallel,
+// so we might connect to more than needed.
+// (We spawn as many goroutines and connection attempts as there are availablePeers,
+// but this list comes from restricted sets of original or temporary bootstrap
+// nodes, which keeps it at a sane size.)
+func peersConnect(ctx context.Context, ph host.Host, availablePeers []peer.AddrInfo, needed int, permanent bool) uint64 {
+    peers := randomizeList(availablePeers)
+
+    // Monitor the number of connections and stop if we reach the target.
+    var connected uint64
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+    go func() {
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-time.After(1 * time.Second):
+                if int(atomic.LoadUint64(&connected)) >= needed {
+                    cancel()
+                    return
+                }
+            }
+        }
+    }()
+
+    var wg sync.WaitGroup
+    for _, p := range peers {
+
+        // Connections are made asynchronously because, when made synchronously,
+        // one hanging `Connect` call makes subsequent calls more likely to
+        // fail/abort due to an expiring context.
+        // Asynchronous dialing is also simply faster.
+
+        if int(atomic.LoadUint64(&connected)) >= needed {
+            cancel()
+            break
+        }
+
+        wg.Add(1)
+        go func(p peer.AddrInfo) {
+            defer wg.Done()
+
+            // Skip addresses belonging to a peer we're already connected to.
+            // (Not a guarantee but a best-effort policy.)
+            if ph.Network().Connectedness(p.ID) == network.Connected {
+                return
+            }
+            log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)
+
+            if err := ph.Connect(ctx, p); err != nil {
+                if ctx.Err() != context.Canceled {
+                    log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
+                }
+                return
+            }
+            if permanent {
+                // We're connecting to an original bootstrap peer, mark it as
+                // a permanent address (Connect will register it as TempAddrTTL).
+                ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
+            }
+
+            log.Infof("bootstrapped with %v", p.ID)
+            atomic.AddUint64(&connected, 1)
+        }(p)
+    }
+    wg.Wait()
+
+    return connected
+}
+
+func randomizeList[T any](in []T) []T {
+    out := make([]T, len(in))
+    for i, val := range rand.Perm(len(in)) {
+        out[i] = in[val]
+    }
+    return out
+}
diff --git a/bootstrap/bootstrap_test.go b/bootstrap/bootstrap_test.go
new file mode 100644
index 000000000..d933379d4
--- /dev/null
+++ b/bootstrap/bootstrap_test.go
@@ -0,0 +1,139 @@
+package bootstrap
+
+import (
+    "context"
+    "crypto/rand"
+    "reflect"
+    "testing"
+    "time"
+
+    "github.com/libp2p/go-libp2p"
+    "github.com/libp2p/go-libp2p/core/crypto"
+    "github.com/libp2p/go-libp2p/core/peer"
+    "github.com/libp2p/go-libp2p/core/test"
+)
+
+func TestRandomizeAddressList(t *testing.T) {
+    var ps []peer.AddrInfo
+    sizeofSlice := 10
+    for i := 0; i < sizeofSlice; i++ {
+        pid, err := test.RandPeerID()
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        ps = append(ps, peer.AddrInfo{ID: pid})
+    }
+    out := randomizeList(ps)
+    if len(out) != len(ps) {
+        t.Fail()
+    }
+}
+
+func TestLoadAndSaveOptions(t *testing.T) {
+    loadFunc := func(_ context.Context) []peer.AddrInfo { return nil }
+    saveFunc := func(_ context.Context, _ []peer.AddrInfo) {}
+
+    bootCfg := BootstrapConfigWithPeers(nil, WithBackupPeers(loadFunc, saveFunc))
+    load, save := bootCfg.BackupPeers()
+    if load == nil {
+        t.Fatal("load function not assigned")
+    }
+    if reflect.ValueOf(load).Pointer() != reflect.ValueOf(loadFunc).Pointer() {
+        t.Fatal("load not assigned correct function")
+    }
+    if save == nil {
+        t.Fatal("save function not assigned")
+    }
+    if reflect.ValueOf(save).Pointer() != reflect.ValueOf(saveFunc).Pointer() {
+        t.Fatal("save not assigned correct function")
+    }
+
+    assertPanics(t, "with only load func", func() {
+        BootstrapConfigWithPeers(nil, WithBackupPeers(loadFunc, nil))
+    })
+
+    assertPanics(t, "with only save func", func() {
+        BootstrapConfigWithPeers(nil, WithBackupPeers(nil, saveFunc))
+    })
+
+    bootCfg = BootstrapConfigWithPeers(nil, WithBackupPeers(nil, nil))
+    load, save = bootCfg.BackupPeers()
+    if load != nil || save != nil {
+        t.Fatal("load and save functions should both be nil")
+    }
+}
+
+func TestSetBackupPeers(t *testing.T) {
+    loadFunc := func(_ context.Context) []peer.AddrInfo { return nil }
+    saveFunc := func(_ context.Context, _ []peer.AddrInfo) {}
+
+    bootCfg := DefaultBootstrapConfig
+    bootCfg.SetBackupPeers(loadFunc, saveFunc)
+    load, save := bootCfg.BackupPeers()
+    if load == nil {
+        t.Fatal("load function not assigned")
+    }
+    if reflect.ValueOf(load).Pointer() != reflect.ValueOf(loadFunc).Pointer() {
+        t.Fatal("load not assigned correct function")
+    }
+    if save == nil {
+        t.Fatal("save function not assigned")
+    }
+    if reflect.ValueOf(save).Pointer() != reflect.ValueOf(saveFunc).Pointer() {
+        t.Fatal("save not assigned correct function")
+    }
+
+    assertPanics(t, "with only load func", func() {
+        bootCfg.SetBackupPeers(loadFunc, nil)
+    })
+
+    assertPanics(t, "with only save func", func() {
+        bootCfg.SetBackupPeers(nil, saveFunc)
+    })
+
+    bootCfg.SetBackupPeers(nil, nil)
+    load, save = bootCfg.BackupPeers()
+    if load != nil || save != nil {
+        t.Fatal("load and save functions should both be nil")
+    }
+}
+
+func TestNoTempPeersLoadAndSave(t *testing.T) {
+    period := 500 * time.Millisecond
+    bootCfg := BootstrapConfigWithPeers(nil)
+    bootCfg.MinPeerThreshold = 2
+    bootCfg.Period = period
+
+    priv, pub, err := crypto.GenerateEd25519Key(rand.Reader)
+    if err != nil {
+        t.Fatal(err)
+    }
+    peerID, err := peer.IDFromPublicKey(pub)
+    if err != nil {
+        t.Fatal(err)
+    }
+    p2pHost, err := libp2p.New(libp2p.Identity(priv))
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    bootstrapper, err := Bootstrap(peerID, p2pHost, nil, bootCfg)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    time.Sleep(4 * period)
+    bootstrapper.Close()
+
+}
+
+func assertPanics(t *testing.T, name string, f func()) {
+    defer func() {
+        if r := recover(); r == nil {
+            t.Errorf("%s: did not panic as expected", name)
+        }
+    }()
+
+    f()
+}
diff --git a/go.sum b/go.sum
index 8beabe300..25af3632d 100644
--- a/go.sum
+++ b/go.sum
@@ -307,6 +307,7 @@ github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOan
 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
 github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
 github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
+github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
 github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
 github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
 github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
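
For reference, here is a minimal usage sketch of the migrated package (not part of the diff above). It assumes the package is consumed as `github.com/ipfs/boxo/bootstrap`, and the multiaddr shown is only an example bootstrap peer; substitute your own peers and error handling.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/ipfs/boxo/bootstrap"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

func main() {
	// Create a libp2p host with default options.
	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()

	// Parse one bootstrap peer multiaddr (example value) into an AddrInfo.
	ma, err := multiaddr.NewMultiaddr("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
	if err != nil {
		log.Fatal(err)
	}
	ai, err := peer.AddrInfoFromP2pAddr(ma)
	if err != nil {
		log.Fatal(err)
	}

	// Start from the defaults and tune thresholds as needed.
	cfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{*ai})
	cfg.MinPeerThreshold = 4
	cfg.Period = 30 * time.Second

	// Start the bootstrap supervisor; passing nil routing skips Routing.Bootstrap.
	closer, err := bootstrap.Bootstrap(h.ID(), h, nil, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	fmt.Println("bootstrapping started for", h.ID())
	time.Sleep(time.Minute) // keep the node alive while rounds run in the background
}
```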
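
The `rt routing.Routing` argument lets `Bootstrap` also kick off the routing system's own bootstrap, as the `IpfsDHT` remark in the config comments suggests. Below is a hedged sketch that assumes `github.com/libp2p/go-libp2p-kad-dht` as the routing implementation; it is not part of this change set.

```go
package main

import (
	"context"
	"log"

	"github.com/ipfs/boxo/bootstrap"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()

	// The DHT implements routing.Routing; Bootstrap calls its Bootstrap(ctx)
	// once at startup, then keeps topping up connections every Period.
	kadDHT, err := dht.New(ctx, h)
	if err != nil {
		log.Fatal(err)
	}
	defer kadDHT.Close()

	// An empty peer list is used purely for brevity; a real node would pass
	// its configured bootstrap peers here.
	cfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{})

	closer, err := bootstrap.Bootstrap(h.ID(), h, kadDHT, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	// ... run the application; bootstrapping continues until closer.Close().
}
```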
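
The load/save hooks wired through `WithBackupPeers` are deliberately left abstract by the package. A sketch of simple file-based persistence follows; it assumes `peer.AddrInfo`'s JSON (un)marshalling and uses an illustrative file name, so adapt both to your application (e.g. atomic writes, stricter validation).

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	"github.com/ipfs/boxo/bootstrap"
	"github.com/libp2p/go-libp2p/core/peer"
)

// backupPeersFile is an illustrative location for the persisted backup peers.
const backupPeersFile = "backup-bootstrap-peers.json"

func loadBackupPeers(_ context.Context) []peer.AddrInfo {
	data, err := os.ReadFile(backupPeersFile)
	if err != nil {
		return nil // no backup saved yet (or unreadable); just skip
	}
	var peers []peer.AddrInfo
	if err := json.Unmarshal(data, &peers); err != nil {
		log.Printf("could not parse backup peers: %s", err)
		return nil
	}
	return peers
}

func saveBackupPeers(_ context.Context, peers []peer.AddrInfo) {
	data, err := json.Marshal(peers)
	if err != nil {
		log.Printf("could not encode backup peers: %s", err)
		return
	}
	if err := os.WriteFile(backupPeersFile, data, 0o600); err != nil {
		log.Printf("could not write backup peers: %s", err)
	}
}

func main() {
	// Both hooks must be set together, otherwise WithBackupPeers panics.
	cfg := bootstrap.BootstrapConfigWithPeers(nil,
		bootstrap.WithBackupPeers(loadBackupPeers, saveBackupPeers))
	_ = cfg // pass cfg to bootstrap.Bootstrap as in the first example
}
```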