diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 173c20909..f20b9327b 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -12,6 +12,7 @@ import ( "github.com/benbjohnson/clock" "github.com/ipfs/go-cid" ipns "github.com/ipfs/go-ipns" + "github.com/ipfs/go-libipfs/routing/http/contentrouter" "github.com/ipfs/go-libipfs/routing/http/internal/drjson" "github.com/ipfs/go-libipfs/routing/http/server" "github.com/ipfs/go-libipfs/routing/http/types" @@ -39,6 +40,8 @@ type client struct { afterSignCallback func(req *types.WriteBitswapProviderRecord) } +var _ contentrouter.Client = &client{} + type httpClient interface { Do(req *http.Request) (*http.Response, error) } diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index b8dd54bd3..36707220e 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -2,13 +2,16 @@ package contentrouter import ( "context" + "reflect" "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-libipfs/routing/http/internal" + "github.com/ipfs/go-libipfs/routing/http/types" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) @@ -16,14 +19,13 @@ var logger = logging.Logger("service/contentrouting") const ttl = 24 * time.Hour -type client interface { - Provide(context.Context, []cid.Cid, time.Duration) (time.Duration, error) - FindProviders(context.Context, cid.Cid) ([]peer.AddrInfo, error) - Ready(context.Context) (bool, error) +type Client interface { + ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) + FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error) } type contentRouter struct { - client client + client Client maxProvideConcurrency int maxProvideBatchSize int } @@ -44,7 +46,7 @@ func WithMaxProvideBatchSize(max int) option { } } -func NewContentRoutingClient(c client, opts ...option) *contentRouter { +func NewContentRoutingClient(c Client, opts ...option) *contentRouter { cr := &contentRouter{ client: c, maxProvideConcurrency: 5, @@ -64,7 +66,7 @@ func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool) return nil } - _, err := c.client.Provide(ctx, []cid.Cid{key}, ttl) + _, err := c.client.ProvideBitswap(ctx, []cid.Cid{key}, ttl) return err } @@ -78,7 +80,7 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult } if len(keys) <= c.maxProvideBatchSize { - _, err := c.client.Provide(ctx, keys, ttl) + _, err := c.client.ProvideBitswap(ctx, keys, ttl) return err } @@ -88,23 +90,15 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult c.maxProvideConcurrency, keys, func(ctx context.Context, batch []cid.Cid) error { - _, err := c.client.Provide(ctx, batch, ttl) + _, err := c.client.ProvideBitswap(ctx, batch, ttl) return err }, ) } -// Ready is part of the existing `ProvideMany` interface, but can be used more generally to determine if the routing client -// has a working connection. +// Ready is part of the existing `ProvideMany` interface. func (c *contentRouter) Ready() bool { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - ready, err := c.client.Ready(ctx) - if err != nil { - logger.Warnw("error checking if delegated content router is ready", "Error", err) - return false - } - return ready + return true } func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo { @@ -118,7 +112,28 @@ func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, num ch := make(chan peer.AddrInfo, len(results)) for _, r := range results { - ch <- r + if r.GetProtocol() == types.BitswapProviderID { + result, ok := r.(*types.ReadBitswapProviderRecord) + if !ok { + logger.Errorw( + "problem casting find providers result", + "ProtocolID", types.BitswapProviderID, + "Type", reflect.TypeOf(r).String(), + ) + continue + } + + var addrs []multiaddr.Multiaddr + for _, a := range result.Addrs { + addrs = append(addrs, a.Multiaddr) + } + + ch <- peer.AddrInfo{ + ID: *result.ID, + Addrs: addrs, + } + } + } close(ch) return ch diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 93dd1efd9..c5e53bd17 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/routing/http/types" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/assert" @@ -16,13 +17,13 @@ import ( type mockClient struct{ mock.Mock } -func (m *mockClient) Provide(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { +func (m *mockClient) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { args := m.Called(ctx, keys, ttl) return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) { +func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error) { args := m.Called(ctx, key) - return args.Get(0).([]peer.AddrInfo), args.Error(1) + return args.Get(0).([]types.ProviderResponse), args.Error(1) } func (m *mockClient) Ready(ctx context.Context) (bool, error) { args := m.Called(ctx) @@ -66,14 +67,14 @@ func TestProvide(t *testing.T) { crc := NewContentRoutingClient(client) if !c.expNotProvided { - client.On("Provide", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil) + client.On("ProvideBitswap", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil) } err := crc.Provide(ctx, key, c.announce) assert.NoError(t, err) if c.expNotProvided { - client.AssertNumberOfCalls(t, "Provide", 0) + client.AssertNumberOfCalls(t, "ProvideBitswap", 0) } }) @@ -90,7 +91,7 @@ func TestProvideMany(t *testing.T) { client := &mockClient{} crc := NewContentRoutingClient(client) - client.On("Provide", ctx, cids, ttl).Return(time.Minute, nil) + client.On("ProvideBitswap", ctx, cids, ttl).Return(time.Minute, nil) err := crc.ProvideMany(ctx, mhs) require.NoError(t, err) @@ -102,9 +103,20 @@ func TestFindProvidersAsync(t *testing.T) { client := &mockClient{} crc := NewContentRoutingClient(client) - ais := []peer.AddrInfo{ - {ID: peer.ID("peer1")}, - {ID: peer.ID("peer2")}, + p1 := peer.ID("peer1") + p2 := peer.ID("peer2") + ais := []types.ProviderResponse{ + &types.ReadBitswapProviderRecord{ + Protocol: types.BitswapProviderID, + ID: &p1, + }, + &types.ReadBitswapProviderRecord{ + Protocol: types.BitswapProviderID, + ID: &p2, + }, + &types.UnknownProviderRecord{ + Protocol: "UNKNOWN", + }, } client.On("FindProviders", ctx, key).Return(ais, nil) @@ -116,5 +128,10 @@ func TestFindProvidersAsync(t *testing.T) { actualAIs = append(actualAIs, ai) } - require.Equal(t, ais, actualAIs) + expected := []peer.AddrInfo{ + {ID: p1}, + {ID: p2}, + } + + require.Equal(t, expected, actualAIs) }