Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify engine internals #3010

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 34 additions & 47 deletions pkg/local_object_storage/engine/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,32 @@
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) {
var (
err error
size uint64
)
if e.metrics != nil {
defer elapsed(e.metrics.AddEstimateContainerSizeDuration)()
}

err = e.execIfNotBlocked(func() error {
size, err = e.containerSize(cnr)
return err
})
e.blockMtx.RLock()
defer e.blockMtx.RUnlock()

Check warning on line 24 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L23-L24

Added lines #L23 - L24 were not covered by tests

return size, err
}
if e.blockErr != nil {
return 0, e.blockErr
}

Check warning on line 28 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L26-L28

Added lines #L26 - L28 were not covered by tests

func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) {
var size uint64

e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
for _, sh := range e.unsortedShards() {

Check warning on line 32 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L32

Added line #L32 was not covered by tests
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(cnr)

csRes, err := sh.Shard.ContainerSize(csPrm)
if err != nil {
e.reportShardError(sh, "can't get container size", err,
zap.Stringer("container_id", cnr))
return false
continue

Check warning on line 40 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L40

Added line #L40 was not covered by tests
}

size += csRes.Size()

return false
})
}

return size, nil
}
Expand All @@ -58,40 +50,32 @@
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ListContainers() ([]cid.ID, error) {
var (
res []cid.ID
err error
)
if e.metrics != nil {
defer elapsed(e.metrics.AddListContainersDuration)()
}

err = e.execIfNotBlocked(func() error {
res, err = e.listContainers()
return err
})
e.blockMtx.RLock()
defer e.blockMtx.RUnlock()

Check warning on line 58 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L57-L58

Added lines #L57 - L58 were not covered by tests

return res, err
}
if e.blockErr != nil {
return nil, e.blockErr
}

Check warning on line 62 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L60-L62

Added lines #L60 - L62 were not covered by tests

func (e *StorageEngine) listContainers() ([]cid.ID, error) {
uniqueIDs := make(map[cid.ID]struct{})

e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
for _, sh := range e.unsortedShards() {

Check warning on line 66 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L66

Added line #L66 was not covered by tests
res, err := sh.Shard.ListContainers(shard.ListContainersPrm{})
if err != nil {
e.reportShardError(sh, "can't get list of containers", err)
return false
continue

Check warning on line 70 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L70

Added line #L70 was not covered by tests
}

for _, cnr := range res.Containers() {
if _, ok := uniqueIDs[cnr]; !ok {
uniqueIDs[cnr] = struct{}{}
}
}

return false
})
}

result := make([]cid.ID, 0, len(uniqueIDs))
for cnr := range uniqueIDs {
Expand All @@ -103,27 +87,30 @@

// DeleteContainer deletes container's objects that engine stores.
func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error {
return e.execIfNotBlocked(func() error {
var wg errgroup.Group
e.blockMtx.RLock()
defer e.blockMtx.RUnlock()

if e.blockErr != nil {
return e.blockErr
}

Check warning on line 95 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L90-L95

Added lines #L90 - L95 were not covered by tests

e.iterateOverUnsortedShards(func(hs hashedShard) bool {
wg.Go(func() error {
err := hs.Shard.DeleteContainer(ctx, cID)
if err != nil {
err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err)
e.log.Warn("container cleanup", zap.Error(err))
var wg errgroup.Group

Check warning on line 97 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L97

Added line #L97 was not covered by tests

return err
}
for _, sh := range e.unsortedShards() {
wg.Go(func() error {
err := sh.Shard.DeleteContainer(ctx, cID)
if err != nil {
err = fmt.Errorf("container cleanup in %s shard: %w", sh.ID(), err)
e.log.Warn("container cleanup", zap.Error(err))

Check warning on line 104 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L99-L104

Added lines #L99 - L104 were not covered by tests

return nil
})
return err
}

Check warning on line 107 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L106-L107

Added lines #L106 - L107 were not covered by tests

return false
return nil

Check warning on line 109 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L109

Added line #L109 was not covered by tests
})
}

return wg.Wait()
})
return wg.Wait()

Check warning on line 113 in pkg/local_object_storage/engine/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/container.go#L113

Added line #L113 was not covered by tests
}

func (e *StorageEngine) deleteNotFoundContainers() error {
Expand Down
25 changes: 5 additions & 20 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,38 +99,23 @@ func (e *StorageEngine) close(releasePools bool) error {
return nil
}

// executes op if execution is not blocked, otherwise returns blocking error.
//
// Can be called concurrently with setBlockExecErr.
func (e *StorageEngine) execIfNotBlocked(op func() error) error {
e.blockExec.mtx.RLock()
defer e.blockExec.mtx.RUnlock()

if e.blockExec.err != nil {
return e.blockExec.err
}

return op()
}

// sets the flag of blocking execution of all data operations according to err:
// - err != nil, then blocks the execution. If exec wasn't blocked, calls close method
// (if err == errClosed => additionally releases pools and does not allow to resume executions).
// - otherwise, resumes execution. If exec was blocked, calls open method.
//
// Can be called concurrently with exec. In this case it waits for all executions to complete.
func (e *StorageEngine) setBlockExecErr(err error) error {
e.blockExec.mtx.Lock()
defer e.blockExec.mtx.Unlock()
e.blockMtx.Lock()
defer e.blockMtx.Unlock()

prevErr := e.blockExec.err
prevErr := e.blockErr

wasClosed := errors.Is(prevErr, errClosed)
if wasClosed {
if errors.Is(prevErr, errClosed) {
return errClosed
}

e.blockExec.err = err
e.blockErr = err

if err == nil {
if prevErr != nil { // block -> ok
Expand Down
10 changes: 7 additions & 3 deletions pkg/local_object_storage/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
defer elapsed(e.metrics.AddDeleteDuration)()
}

return e.execIfNotBlocked(func() error {
return e.deleteObj(addr, true)
})
e.blockMtx.RLock()
defer e.blockMtx.RUnlock()

if e.blockErr != nil {
return e.blockErr
}

Check warning on line 23 in pkg/local_object_storage/engine/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/delete.go#L22-L23

Added lines #L22 - L23 were not covered by tests
return e.deleteObj(addr, true)
}

func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error {
Expand Down
9 changes: 3 additions & 6 deletions pkg/local_object_storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ type StorageEngine struct {
setModeCh chan setModeRequest
wg sync.WaitGroup

blockExec struct {
mtx sync.RWMutex

err error
}
blockMtx sync.RWMutex
blockErr error
}

type shardWrapper struct {
Expand Down Expand Up @@ -137,7 +134,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er
// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
// If it does, shard is set to read-only mode.
func (e *StorageEngine) reportShardError(
sh hashedShard,
sh shardWrapper,
msg string,
err error,
fields ...zap.Field) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
const defaultEvacuateBatchSize = 100

type pooledShard struct {
hashedShard
shardWrapper
pool util.WorkerPool
}

Expand Down Expand Up @@ -63,8 +63,8 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH
shards := make([]pooledShard, 0, len(e.shards))
for id := range e.shards {
shards = append(shards, pooledShard{
hashedShard: hashedShard(e.shards[id]),
pool: e.shardPools[id],
shardWrapper: e.shards[id],
pool: e.shardPools[id],
})
}
e.mtx.RUnlock()
Expand Down Expand Up @@ -125,7 +125,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0)
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
28 changes: 8 additions & 20 deletions pkg/local_object_storage/engine/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,33 @@
func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm
shPrm.SetAddress(addr)
alreadyRemoved := false
exists := false

e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
for _, sh := range e.sortedShards(addr) {
res, err := sh.Exists(shPrm)
if err != nil {
if shard.IsErrRemoved(err) {
alreadyRemoved = true

return true
return false, apistatus.ObjectAlreadyRemoved{}

Check warning on line 20 in pkg/local_object_storage/engine/exists.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/exists.go#L20

Added line #L20 was not covered by tests
}

var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return true
return false, nil

Check warning on line 25 in pkg/local_object_storage/engine/exists.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/exists.go#L25

Added line #L25 was not covered by tests
}

if shard.IsErrObjectExpired(err) {
return true
return false, nil

Check warning on line 29 in pkg/local_object_storage/engine/exists.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/exists.go#L29

Added line #L29 was not covered by tests
}

if !shard.IsErrNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return false
continue

Check warning on line 35 in pkg/local_object_storage/engine/exists.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/exists.go#L35

Added line #L35 was not covered by tests
}

if !exists {
exists = res.Exists()
if res.Exists() {
return true, nil

Check warning on line 39 in pkg/local_object_storage/engine/exists.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/exists.go#L39

Added line #L39 was not covered by tests
}

return false
})

if alreadyRemoved {
var errRemoved apistatus.ObjectAlreadyRemoved

return false, errRemoved
}

return exists, nil
return false, nil
}
Loading
Loading