diff --git a/charts/csi-wekafsplugin/values.yaml b/charts/csi-wekafsplugin/values.yaml index cba756b69..f669afe05 100644 --- a/charts/csi-wekafsplugin/values.yaml +++ b/charts/csi-wekafsplugin/values.yaml @@ -145,6 +145,7 @@ pluginConfig: snapshotVolumesWithoutQuotaEnforcement: false mutuallyExclusiveMountOptions: - "readcache,writecache,coherent,forcedirect" + - "sync,async" mountProtocol: # -- Use NFS transport for mounting Weka filesystems, off by default useNfs: false diff --git a/pkg/wekafs/nfsmounter.go b/pkg/wekafs/nfsmounter.go index 219d4510f..90b692304 100644 --- a/pkg/wekafs/nfsmounter.go +++ b/pkg/wekafs/nfsmounter.go @@ -5,19 +5,17 @@ import ( "github.com/rs/zerolog/log" "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" "k8s.io/mount-utils" - "sync" "time" ) type nfsMounter struct { - mountMap mountsMap - lock sync.Mutex - kMounter mount.Interface - debugPath string - selinuxSupport *bool - gc *innerPathVolGc - interfaceGroupName *string - clientGroupName string + kMounter mount.Interface + debugPath string + selinuxSupport *bool + gc *innerPathVolGc + interfaceGroupName *string + clientGroupName string + exclusiveMountOptions []mutuallyExclusiveMountOptionSet } func (m *nfsMounter) getGarbageCollector() *innerPathVolGc { @@ -30,7 +28,7 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter { log.Debug().Msg("SELinux support is forced") selinuxSupport = &[]bool{true}[0] } - mounter := &nfsMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: selinuxSupport} + mounter := &nfsMounter{debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions} mounter.gc = initInnerPathVolumeGc(mounter) mounter.schedulePeriodicMountGc() mounter.interfaceGroupName = &driver.config.interfaceGroupName @@ -40,28 +38,20 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter { } func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount { - m.lock.Lock() if m.kMounter == nil { m.kMounter = mount.New("") } - if _, ok := m.mountMap[fsName]; !ok { - m.mountMap[fsName] = mountsMapPerFs{} + uniqueId := getStringSha1AsB32(fsName + ":" + options.String()) + wMount := &nfsMount{ + kMounter: m.kMounter, + fsName: fsName, + debugPath: m.debugPath, + mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId, + mountOptions: options, + interfaceGroupName: m.interfaceGroupName, + clientGroupName: m.clientGroupName, } - if _, ok := m.mountMap[fsName][options.AsMapKey()]; !ok { - uniqueId := getStringSha1AsB32(fsName + ":" + options.String()) - wMount := &nfsMount{ - kMounter: m.kMounter, - fsName: fsName, - debugPath: m.debugPath, - mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId, - mountOptions: options, - interfaceGroupName: m.interfaceGroupName, - clientGroupName: m.clientGroupName, - } - m.mountMap[fsName][options.AsMapKey()] = wMount - } - m.lock.Unlock() - return m.mountMap[fsName][options.AsMapKey()] + return wMount } func (m *nfsMounter) getSelinuxStatus(ctx context.Context) bool { @@ -76,6 +66,7 @@ func (m *nfsMounter) getSelinuxStatus(ctx context.Context) bool { func (m *nfsMounter) mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) mountOptions = mountOptions.AsNfs() + mountOptions.Merge(mountOptions, m.exclusiveMountOptions) mountObj := m.NewMount(fsName, mountOptions) mountErr := mountObj.incRef(ctx, apiClient) @@ -97,68 +88,54 @@ func (m *nfsMounter) Mount(ctx context.Context, fs string, apiClient *apiclient. func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error { opts := options options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) + options = options.AsNfs() + options.Merge(options, m.exclusiveMountOptions) + mnt := m.NewMount(fsName, options) log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request") - if mnt, ok := m.mountMap[fsName][options.AsMapKey()]; ok { - err := mnt.decRef(ctx) - if err == nil { - if m.mountMap[fsName][options.AsMapKey()].getRefCount() <= 0 { - log.Ctx(ctx).Trace().Str("filesystem", fsName).Strs("mount_options", options.Strings()).Msg("This is a last use of this mount, removing from map") - delete(m.mountMap[fsName], options.String()) - } - if len(m.mountMap[fsName]) == 0 { - log.Ctx(ctx).Trace().Str("filesystem", fsName).Msg("No more mounts to filesystem, removing from map") - delete(m.mountMap, fsName) - } - } - return err - - } else { - log.Ctx(ctx).Warn().Msg("Attempted to access mount point which is not known to the system") - return nil - } + return mnt.decRef(ctx) } func (m *nfsMounter) LogActiveMounts() { - if len(m.mountMap) > 0 { - count := 0 - for fsName := range m.mountMap { - for mnt := range m.mountMap[fsName] { - mapEntry := m.mountMap[fsName][mnt] - if mapEntry.getRefCount() > 0 { - log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active") - count++ - } else { - log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active") - } - - } - } - log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map") - } + //if len(m.mountMap) > 0 { + // count := 0 + // for fsName := range m.mountMap { + // for mnt := range m.mountMap[fsName] { + // mapEntry := m.mountMap[fsName][mnt] + // if mapEntry.getRefCount() > 0 { + // log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active") + // count++ + // } else { + // log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active") + // } + // + // } + // } + // log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map") + //} } func (m *nfsMounter) gcInactiveMounts() { - if len(m.mountMap) > 0 { - for fsName := range m.mountMap { - for uniqueId, wekaMount := range m.mountMap[fsName] { - if wekaMount.getRefCount() == 0 { - if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) { - m.lock.Lock() - if wekaMount.getRefCount() == 0 { - log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()). - Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map") - delete(m.mountMap[fsName], uniqueId) - } - m.lock.Unlock() - } - } - } - if len(m.mountMap[fsName]) == 0 { - delete(m.mountMap, fsName) - } - } - } + //if len(m.mountMap) > 0 { + // for fsName := range m.mountMap { + // for uniqueId, wekaMount := range m.mountMap[fsName] { + // if wekaMount.getRefCount() == 0 { + // if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) { + // m.lock.Lock() + // if wekaMount.getRefCount() == 0 { + // log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()). + // Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map") + // delete(m.mountMap[fsName], uniqueId) + // } + // m.lock.Unlock() + // } + // } + // } + // if len(m.mountMap[fsName]) == 0 { + // delete(m.mountMap, fsName) + // } + // } + //} } func (m *nfsMounter) schedulePeriodicMountGc() { diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index 179a62caa..4190666ff 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -251,9 +251,11 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis err, unmount := volume.MountUnderlyingFS(ctx) if err != nil { - unmount() + logger.Error().Err(err).Msg("Failed to mount underlying filesystem") return NodePublishVolumeError(ctx, codes.Internal, "Failed to mount a parent filesystem, check Authentication: "+err.Error()) } + defer unmount() // unmount the parent mount since there is a bind mount anyway + fullPath := volume.GetFullPath(ctx) targetPathDir := filepath.Dir(targetPath) @@ -294,10 +296,8 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // if we run in K8s isolated environment, 2nd mount must be done using mapped volume path if err := mounter.Mount(fullPath, targetPath, "", innerMountOpts); err != nil { - var errList strings.Builder - errList.WriteString(err.Error()) - unmount() // unmount only if mount bind failed - return NodePublishVolumeError(ctx, codes.Internal, fmt.Sprintf("failed to Mount device: %s at %s: %s", fullPath, targetPath, errList.String())) + logger.Error().Err(err).Str("full_path", fullPath).Str("target_path", targetPath).Msg("Failed to perform mount") + return NodePublishVolumeError(ctx, codes.Internal, fmt.Sprintf("failed to Mount device: %s at %s: %s", fullPath, targetPath, err.Error())) } result = "SUCCESS" // Not doing unmount, NodePublish should do unmount but only when it unmounts bind successfully @@ -314,7 +314,6 @@ func NodeUnpublishVolumeError(ctx context.Context, errorCode codes.Code, errorMe func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { op := "NodeUnpublishVolume" result := "FAILURE" - volumeID := req.GetVolumeId() ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot()) defer span.End() ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) @@ -337,12 +336,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return NodeUnpublishVolumeError(ctx, codes.Unavailable, "Too many concurrent requests, please retry") } - // Check arguments - volume, err := NewVolumeFromId(ctx, req.GetVolumeId(), nil, ns) - if err != nil { - return &csi.NodeUnpublishVolumeResponse{}, err - } - if len(req.GetTargetPath()) == 0 { return NodeUnpublishVolumeError(ctx, codes.InvalidArgument, "Target path missing in request") } @@ -387,11 +380,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return NodeUnpublishVolumeError(ctx, codes.Internal, err.Error()) } - logger.Trace().Str("volume_id", volumeID).Msg("Unmounting") - err = volume.UnmountUnderlyingFS(ctx) - if err != nil { - logger.Error().Str("volume_id", volumeID).Err(err).Msg("Post-unpublish task failed") - } result = "SUCCESS" return &csi.NodeUnpublishVolumeResponse{}, nil }