Skip to content

Commit

Permalink
shard: drop more of context from local stores
Browse files Browse the repository at this point in the history
Similar to edc447f, we can't have any
meaninful context here and current context checks are broken. Fixes #1911,
the last case of

  Error in `cmd/neofs-node/config.go`: `Function `Reload->Init->init->listenEvents` should pass the context parameter`.

goes away with this.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
  • Loading branch information
roman-khimov committed Sep 23, 2024
1 parent a0447ce commit 163c154
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 39 deletions.
14 changes: 3 additions & 11 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"context"
"errors"

meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
Expand Down Expand Up @@ -324,7 +323,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
return locked, outErr
}

func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) {
func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) {
var prm InhumePrm
prm.MarkAsGarbage(addrs...)

Expand All @@ -334,17 +333,10 @@ func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Add
}
}

func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)

select {
case <-ctx.Done():
e.log.Info("interrupt processing the expired locks by context")
return true
default:
return false
}
return false
})
}

Expand Down
36 changes: 14 additions & 22 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func EventNewEpoch(e uint64) Event {
}
}

type eventHandler func(context.Context, Event)
type eventHandler func(Event)

type eventHandlers struct {
prevGroup sync.WaitGroup
Expand Down Expand Up @@ -117,16 +117,13 @@ func (gc *gc) listenEvents() {
v.cancelFunc()
v.prevGroup.Wait()

var ctx context.Context
ctx, v.cancelFunc = context.WithCancel(context.Background())

v.prevGroup.Add(len(v.handlers))

for i := range v.handlers {
h := v.handlers[i]

err := gc.workerPool.Submit(func() {
h(ctx, event)
h(event)
v.prevGroup.Done()
})
if err != nil {
Expand Down Expand Up @@ -220,13 +217,13 @@ func (s *Shard) removeGarbage() {
}
}

func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
func (s *Shard) collectExpiredObjects(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

log.Debug("started expired objects handling")

expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ != object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -238,12 +235,12 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {

log.Debug("collected expired objects", zap.Int("num", len(expired)))

s.expiredObjectsCallback(ctx, expired)
s.expiredObjectsCallback(expired)

log.Debug("finished expired objects handling")
}

func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
func (s *Shard) collectExpiredTombstones(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

Expand All @@ -258,8 +255,8 @@ func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
log.Debug("finished expired tombstones handling", zap.Int("dropped marks", dropped))
}

func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
func (s *Shard) collectExpiredLocks(e Event) {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ == object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -269,10 +266,10 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
return
}

s.expiredLocksCallback(ctx, expired)
s.expiredLocksCallback(expired)
}

func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
func (s *Shard) getExpiredObjects(epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
s.m.RLock()
defer s.m.RUnlock()

Expand All @@ -283,20 +280,15 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
var expired []oid.Address

err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
default:
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
})
if err != nil {
return nil, err
}
return expired, ctx.Err()
return expired, nil
}

// HandleExpiredLocks unlocks all objects which were locked by lockers.
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard_test

import (
"context"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -64,7 +63,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
shard.WithExpiredLocksCallback(func(aa []oid.Address) {
sh.HandleExpiredLocks(aa)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
Expand Down Expand Up @@ -205,7 +204,7 @@ func TestExpiration(t *testing.T) {
meta.WithEpochState(epochState{Value: math.MaxUint64 / 2}),
),
shard.WithExpiredObjectsCallback(
func(_ context.Context, addresses []oid.Address) {
func(addresses []oid.Address) {
var p shard.InhumePrm
p.MarkAsGarbage(addresses...)
_, err := sh.Inhume(p)
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/shard.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -34,10 +33,10 @@ type Shard struct {
type Option func(*cfg)

// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredTombstonesCallback func([]meta.TombstonedObject)

// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, []oid.Address)
type ExpiredObjectsCallback func([]oid.Address)

// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func([]oid.Address)
Expand Down

0 comments on commit 163c154

Please sign in to comment.