Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify engine internals #3010

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions pkg/local_object_storage/engine/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,19 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) {

var size uint64

e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) {
for _, sh := range e.unsortedShards() {
var csPrm shard.ContainerSizePrm
csPrm.SetContainerID(cnr)

csRes, err := sh.Shard.ContainerSize(csPrm)
if err != nil {
e.reportShardError(sh, "can't get container size", err,
zap.Stringer("container_id", cnr))
return false
continue
}

size += csRes.Size()

return false
})
}

return size, nil
}
Expand All @@ -65,21 +63,19 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) {

uniqueIDs := make(map[cid.ID]struct{})

e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) {
for _, sh := range e.unsortedShards() {
res, err := sh.Shard.ListContainers(shard.ListContainersPrm{})
if err != nil {
e.reportShardError(sh, "can't get list of containers", err)
return false
continue
}

for _, cnr := range res.Containers() {
if _, ok := uniqueIDs[cnr]; !ok {
uniqueIDs[cnr] = struct{}{}
}
}

return false
})
}

result := make([]cid.ID, 0, len(uniqueIDs))
for cnr := range uniqueIDs {
Expand All @@ -100,21 +96,19 @@ func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error {

var wg errgroup.Group

e.iterateOverUnsortedShards(func(hs shardWrapper) bool {
for _, sh := range e.unsortedShards() {
wg.Go(func() error {
err := hs.Shard.DeleteContainer(ctx, cID)
err := sh.Shard.DeleteContainer(ctx, cID)
if err != nil {
err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err)
err = fmt.Errorf("container cleanup in %s shard: %w", sh.ID(), err)
e.log.Warn("container cleanup", zap.Error(err))

return err
}

return nil
})

return false
})
}

return wg.Wait()
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/local_object_storage/engine/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,33 @@ import (
func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
var shPrm shard.ExistsPrm
shPrm.SetAddress(addr)
alreadyRemoved := false
exists := false

e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) {
for _, sh := range e.sortedShards(addr) {
res, err := sh.Exists(shPrm)
if err != nil {
if shard.IsErrRemoved(err) {
alreadyRemoved = true

return true
return false, apistatus.ObjectAlreadyRemoved{}
}

var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
return true
return false, nil
}

if shard.IsErrObjectExpired(err) {
return true
return false, nil
}

if !shard.IsErrNotFound(err) {
e.reportShardError(sh, "could not check existence of object in shard", err)
}
return false
continue
}

if !exists {
exists = res.Exists()
if res.Exists() {
return true, nil
}

return false
})

if alreadyRemoved {
var errRemoved apistatus.ObjectAlreadyRemoved

return false, errRemoved
}

return exists, nil
return false, nil
}
90 changes: 36 additions & 54 deletions pkg/local_object_storage/engine/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,99 +54,81 @@ func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) {

func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error {
var (
ok bool
siErr *objectSDK.SplitInfoError

errNotFound apistatus.ObjectNotFound

outSI *objectSDK.SplitInfo
outError error = errNotFound

hasDegraded bool
shardWithMeta shardWrapper
splitInfo *objectSDK.SplitInfo
metaError error
)

var hasDegraded bool
var objectExpired bool

e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) {
for _, sh := range e.sortedShards(addr) {
noMeta := sh.GetMode().NoMetabase()
hasDegraded = hasDegraded || noMeta

hasMetadata, err := shardFunc(sh.Shard, noMeta)
if err != nil {
var siErr *objectSDK.SplitInfoError

if hasMetadata {
shardWithMeta = sh
metaError = err
}
switch {
case shard.IsErrNotFound(err):
return false // ignore, go to next shard
continue // ignore, go to next shard
case errors.As(err, &siErr):
if outSI == nil {
outSI = objectSDK.NewSplitInfo()
if splitInfo == nil {
splitInfo = objectSDK.NewSplitInfo()
}

util.MergeSplitInfo(siErr.SplitInfo(), outSI)
util.MergeSplitInfo(siErr.SplitInfo(), splitInfo)

// stop iterating over shards if SplitInfo structure is complete
return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero()
if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() {
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this line be just break?

Copy link
Member Author

@roman-khimov roman-khimov Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That in fact is one of the things I wanted to discuss. It looks like split info collection we attempt to do here (with iteration-persistent splitInfo) is totally useless, we can return immediately upon seeing any split info because that's what we do ultimately anyway. But maybe I'm missing something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split information may or may not have a link object and a last part (it is theoretically ok to have a middle object only, be resynced, etc)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the comment above --- having a complete split info is beneficial to the caller (even though it gets an error), so this if can't be removed even though lacking a proper info we will return whatever is available.

Regarding break/return --- both are valid, so it doesn't matter much, post-loop can be extended in which case return is better even though currently it duplicates error wrapping a bit.

}
continue
case shard.IsErrRemoved(err):
outError = err

return true // stop, return it back
return err // stop, return it back
case shard.IsErrObjectExpired(err):
// object is found but should not
// be returned
objectExpired = true
return true
return apistatus.ObjectNotFound{}
default:
e.reportShardError(sh, "could not get object from shard", err)
return false
continue
}
}

ok = true

return true
})
return nil // shardFunc is successful and it has the result
}

if outSI != nil {
return logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
if splitInfo != nil {
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}

if objectExpired {
return errNotFound
if !hasDegraded && shardWithMeta.Shard == nil {
return apistatus.ObjectNotFound{}
}

if !ok {
if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) {
return outError
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
for _, sh := range e.sortedShards(addr) {
if sh.GetMode().NoMetabase() {
// Already visited.
continue
}

// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) {
if sh.GetMode().NoMetabase() {
// Already visited.
return false
_, err := shardFunc(sh.Shard, true)
if err == nil {
if shardWithMeta.Shard != nil {
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
metaError, zap.Stringer("address", addr))
}

_, err := shardFunc(sh.Shard, true)
ok = err == nil
return ok
})
if !ok {
return outError
}
if shardWithMeta.Shard != nil {
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
metaError, zap.Stringer("address", addr))
return nil
}
}

return nil
return apistatus.ObjectNotFound{}
}

// GetBytes reads object from the StorageEngine by address into memory buffer in
Expand Down
53 changes: 20 additions & 33 deletions pkg/local_object_storage/engine/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,64 +34,51 @@ func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, err
}

var (
head *objectSDK.Object
siErr *objectSDK.SplitInfoError

errNotFound apistatus.ObjectNotFound

outSI *objectSDK.SplitInfo
outError error = errNotFound
shPrm shard.HeadPrm
splitInfo *objectSDK.SplitInfo
)

var shPrm shard.HeadPrm
shPrm.SetAddress(addr)
shPrm.SetRaw(raw)

e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) {
for _, sh := range e.sortedShards(addr) {
res, err := sh.Head(shPrm)
if err != nil {
var siErr *objectSDK.SplitInfoError

switch {
case shard.IsErrNotFound(err):
return false // ignore, go to next shard
continue // ignore, go to next shard
case errors.As(err, &siErr):
if outSI == nil {
outSI = objectSDK.NewSplitInfo()
if splitInfo == nil {
splitInfo = objectSDK.NewSplitInfo()
}

util.MergeSplitInfo(siErr.SplitInfo(), outSI)
util.MergeSplitInfo(siErr.SplitInfo(), splitInfo)

// stop iterating over shards if SplitInfo structure is complete
return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero()
if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() {
return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

}
continue
case shard.IsErrRemoved(err):
outError = err

return true // stop, return it back
return nil, err // stop, return it back
case shard.IsErrObjectExpired(err):
var notFoundErr apistatus.ObjectNotFound

// object is found but should not
// be returned
outError = notFoundErr

return true
return nil, apistatus.ObjectNotFound{}
default:
e.reportShardError(sh, "could not head object from shard", err)
return false
continue
}
}

head = res.Object()

return true
})

if outSI != nil {
return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
return res.Object(), nil
}

if head == nil {
return nil, outError
if splitInfo != nil {
return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}

return head, nil
return nil, apistatus.ObjectNotFound{}
}
Loading