Skip to content

Commit

Permalink
blockservice & exchange & bitswap: add non variadic NotifyNewBlock
Browse files Browse the repository at this point in the history
Variadicts in go are just syntactic sugar around passing a slice, that
means all go memory reachability rules apply, this force the compiler to
heap allocate the variadic slice for virtual call, because the
implementation is allowed to leak the slice (and go's interprocedural
optimisations do not cover virtuals).

Passing a block without variadic will pass the itab either on the stack
or decomposed through registers. Skipping having to allocate a slice.
  • Loading branch information
Jorropo authored and gammazero committed Sep 20, 2024
1 parent 171b0b7 commit 4f7528b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 7 deletions.
7 changes: 7 additions & 0 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
return bs
}

func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlock(ctx, blk),
bs.Server.NotifyNewBlock(ctx, blk),
)
}

func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
Expand Down
10 changes: 10 additions & 0 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return session.GetBlocks(ctx, keys)
}

// NotifyNewBlock announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
Expand Down
11 changes: 11 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,17 @@ func (bs *Server) Stat() (Stat, error) {
return s, nil
}

// NotifyNewBlock announces the existence of block to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
Expand Down
16 changes: 9 additions & 7 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
if err := s.exchange.NotifyNewBlock(ctx, o); err != nil {
logger.Errorf("NotifyNewBlock: %s", err.Error())
}
}

Expand Down Expand Up @@ -282,7 +282,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
err = ex.NotifyNewBlock(ctx, blk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -364,7 +364,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
select {
Expand All @@ -386,13 +385,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet

if ex != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
err = ex.NotifyNewBlock(ctx, b)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc
}

select {
Expand Down Expand Up @@ -425,6 +422,11 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

type notifier interface {
NotifyNewBlock(context.Context, blocks.Block) error
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
createSession sync.Once
Expand Down
5 changes: 5 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ type notifyCountingExchange struct {
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error {
n.notifyCount++
return n.Interface.NotifyNewBlock(ctx, blocks)
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
Expand Down
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type Interface interface { // type Exchanger interface
Fetcher

// NotifyNewBlock tells the exchange that a new block is available and can be served.
NotifyNewBlock(ctx context.Context, blocks blocks.Block) error
// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error

Expand Down
6 changes: 6 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block
return blk, err
}

// NotifyNewBlock tells the exchange that a new block is available and can be served.
func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error {
// as an offline exchange we have nothing to do
return nil
}

// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// as an offline exchange we have nothing to do
Expand Down

0 comments on commit 4f7528b

Please sign in to comment.