refactor(CSI-250): do not maintain redundant active mounts from node server after publishing volume #320

Merged 3 commits on Sep 12, 2024
Changes from all commits
1 change: 1 addition & 0 deletions charts/csi-wekafsplugin/values.yaml
@@ -145,6 +145,7 @@ pluginConfig:
snapshotVolumesWithoutQuotaEnforcement: false
mutuallyExclusiveMountOptions:
- "readcache,writecache,coherent,forcedirect"
- "sync,async"
mountProtocol:
# -- Use NFS transport for mounting Weka filesystems, off by default
useNfs: false
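
For context on the chart change above: each entry in mutuallyExclusiveMountOptions names a set of mount options that must not be passed to a single mount together, and the new "sync,async" entry extends that to the sync/async pair. Below is a minimal, self-contained sketch of how such sets could be resolved; the last-one-wins policy and the helper name resolveExclusive are assumptions for illustration, not the driver's actual Merge implementation.

// Illustrative sketch only: resolution policy (last option wins) is an assumption.
package main

import (
	"fmt"
	"strings"
)

// resolveExclusive keeps at most one option from each mutually exclusive set,
// preferring the option that appears last in the requested list.
func resolveExclusive(requested []string, exclusiveSets []string) []string {
	drop := map[string]bool{}
	for _, set := range exclusiveSets {
		members := strings.Split(set, ",")
		last := ""
		for _, opt := range requested {
			for _, m := range members {
				if opt == m {
					last = opt
				}
			}
		}
		for _, m := range members {
			if last != "" && m != last {
				drop[m] = true
			}
		}
	}
	var out []string
	for _, opt := range requested {
		if !drop[opt] {
			out = append(out, opt)
		}
	}
	return out
}

func main() {
	sets := []string{"readcache,writecache,coherent,forcedirect", "sync,async"}
	fmt.Println(resolveExclusive([]string{"sync", "ro", "async"}, sets)) // prints [ro async]
}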
141 changes: 59 additions & 82 deletions pkg/wekafs/nfsmounter.go
@@ -5,19 +5,17 @@ import (
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"k8s.io/mount-utils"
"sync"
"time"
)

type nfsMounter struct {
mountMap mountsMap
lock sync.Mutex
kMounter mount.Interface
debugPath string
selinuxSupport *bool
gc *innerPathVolGc
interfaceGroupName *string
clientGroupName string
kMounter mount.Interface
debugPath string
selinuxSupport *bool
gc *innerPathVolGc
interfaceGroupName *string
clientGroupName string
exclusiveMountOptions []mutuallyExclusiveMountOptionSet
}

func (m *nfsMounter) getGarbageCollector() *innerPathVolGc {
@@ -30,7 +28,7 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
log.Debug().Msg("SELinux support is forced")
selinuxSupport = &[]bool{true}[0]
}
mounter := &nfsMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: selinuxSupport}
mounter := &nfsMounter{debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions}
mounter.gc = initInnerPathVolumeGc(mounter)
mounter.schedulePeriodicMountGc()
mounter.interfaceGroupName = &driver.config.interfaceGroupName
@@ -40,28 +38,20 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
}

func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount {
m.lock.Lock()
if m.kMounter == nil {
m.kMounter = mount.New("")
}
if _, ok := m.mountMap[fsName]; !ok {
m.mountMap[fsName] = mountsMapPerFs{}
uniqueId := getStringSha1AsB32(fsName + ":" + options.String())
wMount := &nfsMount{
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountOptions: options,
interfaceGroupName: m.interfaceGroupName,
clientGroupName: m.clientGroupName,
}
if _, ok := m.mountMap[fsName][options.AsMapKey()]; !ok {
uniqueId := getStringSha1AsB32(fsName + ":" + options.String())
wMount := &nfsMount{
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountOptions: options,
interfaceGroupName: m.interfaceGroupName,
clientGroupName: m.clientGroupName,
}
m.mountMap[fsName][options.AsMapKey()] = wMount
}
m.lock.Unlock()
return m.mountMap[fsName][options.AsMapKey()]
return wMount
}

func (m *nfsMounter) getSelinuxStatus(ctx context.Context) bool {
@@ -76,6 +66,7 @@ func (m *nfsMounter) getSelinuxStatus(ctx context.Context) bool {
func (m *nfsMounter) mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) {
mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs)
mountOptions = mountOptions.AsNfs()
mountOptions.Merge(mountOptions, m.exclusiveMountOptions)
mountObj := m.NewMount(fsName, mountOptions)
mountErr := mountObj.incRef(ctx, apiClient)

@@ -97,68 +88,54 @@ func (m *nfsMounter) Mount(ctx context.Context, fs string, apiClient *apiclient.
func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error {
opts := options
options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs)
options = options.AsNfs()
options.Merge(options, m.exclusiveMountOptions)
mnt := m.NewMount(fsName, options)

log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request")
if mnt, ok := m.mountMap[fsName][options.AsMapKey()]; ok {
err := mnt.decRef(ctx)
if err == nil {
if m.mountMap[fsName][options.AsMapKey()].getRefCount() <= 0 {
log.Ctx(ctx).Trace().Str("filesystem", fsName).Strs("mount_options", options.Strings()).Msg("This is a last use of this mount, removing from map")
delete(m.mountMap[fsName], options.String())
}
if len(m.mountMap[fsName]) == 0 {
log.Ctx(ctx).Trace().Str("filesystem", fsName).Msg("No more mounts to filesystem, removing from map")
delete(m.mountMap, fsName)
}
}
return err

} else {
log.Ctx(ctx).Warn().Msg("Attempted to access mount point which is not known to the system")
return nil
}
return mnt.decRef(ctx)
}

func (m *nfsMounter) LogActiveMounts() {
if len(m.mountMap) > 0 {
count := 0
for fsName := range m.mountMap {
for mnt := range m.mountMap[fsName] {
mapEntry := m.mountMap[fsName][mnt]
if mapEntry.getRefCount() > 0 {
log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active")
count++
} else {
log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active")
}

}
}
log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map")
}
//if len(m.mountMap) > 0 {
// count := 0
// for fsName := range m.mountMap {
// for mnt := range m.mountMap[fsName] {
// mapEntry := m.mountMap[fsName][mnt]
// if mapEntry.getRefCount() > 0 {
// log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active")
// count++
// } else {
// log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active")
// }
//
// }
// }
// log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map")
//}
}

func (m *nfsMounter) gcInactiveMounts() {
if len(m.mountMap) > 0 {
for fsName := range m.mountMap {
for uniqueId, wekaMount := range m.mountMap[fsName] {
if wekaMount.getRefCount() == 0 {
if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) {
m.lock.Lock()
if wekaMount.getRefCount() == 0 {
log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()).
Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map")
delete(m.mountMap[fsName], uniqueId)
}
m.lock.Unlock()
}
}
}
if len(m.mountMap[fsName]) == 0 {
delete(m.mountMap, fsName)
}
}
}
//if len(m.mountMap) > 0 {
// for fsName := range m.mountMap {
// for uniqueId, wekaMount := range m.mountMap[fsName] {
// if wekaMount.getRefCount() == 0 {
// if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) {
// m.lock.Lock()
// if wekaMount.getRefCount() == 0 {
// log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()).
// Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map")
// delete(m.mountMap[fsName], uniqueId)
// }
// m.lock.Unlock()
// }
// }
// }
// if len(m.mountMap[fsName]) == 0 {
// delete(m.mountMap, fsName)
// }
// }
//}
}

func (m *nfsMounter) schedulePeriodicMountGc() {
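
Summary of the nfsmounter.go change above: the mounter no longer keeps a mountMap guarded by a sync.Mutex. NewMount always builds a fresh nfsMount whose mount point is derived deterministically from the filesystem name and mount options, unmountWithOptions simply constructs the equivalent mount object and calls decRef on it, and the LogActiveMounts/gcInactiveMounts bodies are commented out because there is no longer an in-memory map to report on or collect. Below is a standalone sketch of that deterministic mount-point derivation; the helper bodies are assumptions that only mirror the names used in the diff (getStringSha1AsB32, getAsciiPart), not the repository's implementations.

// Sketch only: helper implementations are assumptions, names mirror the diff.
package main

import (
	"crypto/sha1"
	"encoding/base32"
	"fmt"
	"strings"
)

// getStringSha1AsB32 hashes the input and encodes it as unpadded base32.
func getStringSha1AsB32(s string) string {
	sum := sha1.Sum([]byte(s))
	return strings.ToLower(strings.TrimRight(base32.StdEncoding.EncodeToString(sum[:]), "="))
}

// getAsciiPart keeps only printable ASCII characters, capped at maxLen bytes.
func getAsciiPart(s string, maxLen int) string {
	var b strings.Builder
	for _, r := range s {
		if r > 32 && r < 127 && b.Len() < maxLen {
			b.WriteRune(r)
		}
	}
	return b.String()
}

func main() {
	fsName, options := "default", "rw,sync"
	uniqueId := getStringSha1AsB32(fsName + ":" + options)
	mountPoint := "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId
	// The same filesystem/options pair always maps to the same path,
	// so no shared in-memory map is needed to find an existing mount.
	fmt.Println(mountPoint)
}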
22 changes: 5 additions & 17 deletions pkg/wekafs/nodeserver.go
@@ -251,9 +251,11 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

err, unmount := volume.MountUnderlyingFS(ctx)
if err != nil {
unmount()
logger.Error().Err(err).Msg("Failed to mount underlying filesystem")
return NodePublishVolumeError(ctx, codes.Internal, "Failed to mount a parent filesystem, check Authentication: "+err.Error())
}
defer unmount() // unmount the parent mount since there is a bind mount anyway

fullPath := volume.GetFullPath(ctx)

targetPathDir := filepath.Dir(targetPath)
@@ -294,10 +296,8 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

// if we run in K8s isolated environment, 2nd mount must be done using mapped volume path
if err := mounter.Mount(fullPath, targetPath, "", innerMountOpts); err != nil {
var errList strings.Builder
errList.WriteString(err.Error())
unmount() // unmount only if mount bind failed
return NodePublishVolumeError(ctx, codes.Internal, fmt.Sprintf("failed to Mount device: %s at %s: %s", fullPath, targetPath, errList.String()))
logger.Error().Err(err).Str("full_path", fullPath).Str("target_path", targetPath).Msg("Failed to perform mount")
return NodePublishVolumeError(ctx, codes.Internal, fmt.Sprintf("failed to Mount device: %s at %s: %s", fullPath, targetPath, err.Error()))
}
result = "SUCCESS"
// Not doing unmount, NodePublish should do unmount but only when it unmounts bind successfully
@@ -314,7 +314,6 @@ func NodeUnpublishVolumeError(ctx context.Context, errorCode codes.Code, errorMe
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
op := "NodeUnpublishVolume"
result := "FAILURE"
volumeID := req.GetVolumeId()
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
@@ -337,12 +336,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return NodeUnpublishVolumeError(ctx, codes.Unavailable, "Too many concurrent requests, please retry")
}

// Check arguments
volume, err := NewVolumeFromId(ctx, req.GetVolumeId(), nil, ns)
if err != nil {
return &csi.NodeUnpublishVolumeResponse{}, err
}

if len(req.GetTargetPath()) == 0 {
return NodeUnpublishVolumeError(ctx, codes.InvalidArgument, "Target path missing in request")
}
@@ -387,11 +380,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return NodeUnpublishVolumeError(ctx, codes.Internal, err.Error())
}

logger.Trace().Str("volume_id", volumeID).Msg("Unmounting")
err = volume.UnmountUnderlyingFS(ctx)
if err != nil {
logger.Error().Str("volume_id", volumeID).Err(err).Msg("Post-unpublish task failed")
}
result = "SUCCESS"
return &csi.NodeUnpublishVolumeResponse{}, nil
}
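
Summary of the nodeserver.go change above: NodePublishVolume now defers the unmount of the underlying (parent) filesystem as soon as it is mounted, because the bind mount into the target path keeps the data reachable on its own, and NodeUnpublishVolume no longer resolves the volume or calls UnmountUnderlyingFS; it only removes the bind mount and leaves the parent mount to garbage collection. Below is a rough sketch of that ordering using k8s.io/mount-utils with a fake mounter so it runs unprivileged; the paths and the parentUnmount callback are placeholders, not the driver's API.

// Rough sketch only: paths and parentUnmount are placeholders, not the driver's API.
package main

import (
	"fmt"

	"k8s.io/mount-utils"
)

// publish bind-mounts the already-mounted filesystem path into the pod's
// target path; the parent mount can be released once the bind mount exists.
func publish(kMounter mount.Interface, fullPath, targetPath string, parentUnmount func()) error {
	defer parentUnmount() // the bind mount keeps the data reachable, so the parent mount is redundant
	return kMounter.Mount(fullPath, targetPath, "", []string{"bind"})
}

// unpublish removes only the bind mount; the underlying filesystem mount is
// left to the mounter's own garbage collection.
func unpublish(kMounter mount.Interface, targetPath string) error {
	return kMounter.Unmount(targetPath)
}

func main() {
	kMounter := mount.NewFakeMounter(nil) // fake mounter so the sketch runs without root
	fullPath := "/run/weka-fs-mounts/example-fs-abc123/csi-volumes/vol1"              // placeholder
	targetPath := "/var/lib/kubelet/pods/pod-uid/volumes/kubernetes.io~csi/pv/mount"  // placeholder
	if err := publish(kMounter, fullPath, targetPath, func() { fmt.Println("parent unmounted") }); err != nil {
		fmt.Println("publish failed:", err)
	}
	if err := unpublish(kMounter, targetPath); err != nil {
		fmt.Println("unpublish failed:", err)
	}
}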