diff --git a/Dockerfile b/Dockerfile index 1391ff267..6549db795 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,6 @@ FROM golang:1.22-alpine as go-builder # https://stackoverflow.com/questions/36279253/go-compiled-binary-wont-run-in-an-alpine-docker-container-on-ubuntu-host -RUN apk add --no-cache libc6-compat gcc -RUN apk add musl-dev +RUN apk add --no-cache libc6-compat gcc musl-dev COPY go.mod /src/go.mod COPY go.sum /src/go.sum WORKDIR /src @@ -26,13 +25,14 @@ RUN CGO_ENABLED=0 GOOS="linux" GOARCH="amd64" go build -a -ldflags '-X main.vers FROM alpine:3.18 LABEL maintainers="WekaIO, LTD" LABEL description="Weka CSI Driver" -# Add util-linux to get a new version of losetup. -RUN apk add util-linux libselinux libselinux-utils util-linux pciutils usbutils coreutils binutils findutils grep bash -# Update CA certificates -RUN apk add ca-certificates -RUN update-ca-certificates ADD https://github.com/tigrawap/locar/releases/download/0.4.0/locar_linux_amd64 /locar RUN chmod +x /locar +RUN apk add --no-cache util-linux libselinux libselinux-utils util-linux \ + pciutils usbutils coreutils binutils findutils \ + grep bash nfs-utils rpcbind ca-certificates +# Update CA certificates +RUN update-ca-certificates COPY --from=go-builder /bin/wekafsplugin /wekafsplugin ARG binary=/bin/wekafsplugin +EXPOSE 2049 111/tcp 111/udp ENTRYPOINT ["/wekafsplugin"] diff --git a/charts/csi-wekafsplugin/templates/NOTES.txt b/charts/csi-wekafsplugin/templates/NOTES.txt index 7bf60d2f3..039e6e00e 100644 --- a/charts/csi-wekafsplugin/templates/NOTES.txt +++ b/charts/csi-wekafsplugin/templates/NOTES.txt @@ -25,3 +25,21 @@ https://github.com/weka/csi-wekafs/tree/master/examples | NEW FEATURES RELY ON API CONNECTIVITY TO WEKA CLUSTER AND WILL NOT BE SUPPORTED ON API-UNBOUND VOLUMES. | | PLEASE MAKE SURE TO MIGRATE ALL EXISTING VOLUMES TO API-BASED SCHEME PRIOR TO NEXT VERSION UPGRADE. | ------------------------------------------------------------------------------------------------------------ + +{{- if or .Values.pluginConfig.mountProtocol.useNfs .Values.pluginConfig.mountProtocol.allowNfsFailback }} +-------------------------------------------------- WARNING ------------------------------------------------- +{{- if .Values.pluginConfig.mountProtocol.useNfs }} +| WARNING: NFS PROTOCOL IS ENFORCED AND WILL ALWAYS BE USED FOR MOUNTING WEKA FILESYSTEMS! | +| NFS TRANSPORT DOES NOT PROVIDE MAXIMUM PERFORMANCE AND IS NOT RECOMMENDED FOR PRODUCTION USE. | +{{- else }} +| WARNING: NFS MOUNT PROTOCOL FAILBACK IS ENABLED, AND NFS MOUNTS WILL BE USED IF WEKA IS NOT INSTALLED. | +| NFS TRANSPORT DOES NOT PROVIDE MAXIMUM PERFORMANCE AND IS NOT RECOMMENDED FOR PRODUCTION USE. | +| HOWEVER, IN CERTAIN CASES WHEN WEKA CLIENT INSTALLATION IS NOT POSSIBLE, NFS MOUNTS WILL BE USED. | +| IF WEKA CLIENT IS INSTALLED ON NODES AFTER CSI PLUGIN INSTALLATION, RESTART IS REQUIRED FOR THE | +| CORRESPONDENT CSI PLUGIN COMPONENTS RUNNING ON THE NODE TO SWITCH BACK TO WEKAFS PROTOCOL MOUNTING. | +{{- end }} +| MAKE SURE THAT AT LEAST ONE INTERFACE GROUP IS CONFIGURED ON WEKA CLUSTER, OTHERWISE PROVISION WILL FAIL | +| REFER TO THE DOCUMENTATION ABOVE FOR MORE INFORMATION ON NFS INTERFACE GROUP CONFIGURATION. | +| REFER TO WEKA CUSTOMER SUCCESS TEAM FOR RECOMMENDED CONFIGURATION AND BEST PRACTICES | +------------------------------------------------------------------------------------------------------------ +{{- end }} \ No newline at end of file diff --git a/charts/csi-wekafsplugin/templates/controllerserver-security-context-constraint.yaml b/charts/csi-wekafsplugin/templates/controllerserver-security-context-constraint.yaml index c9bcb02d8..8a463f6cb 100644 --- a/charts/csi-wekafsplugin/templates/controllerserver-security-context-constraint.yaml +++ b/charts/csi-wekafsplugin/templates/controllerserver-security-context-constraint.yaml @@ -6,14 +6,14 @@ metadata: allowPrivilegedContainer: true allowHostDirVolumePlugin: true -{{- if .Values.hostNetwork }} +{{- if or .Values.hostNetwork .Values.pluginConfig.mountProtocol.allowNfsFailback .Values.pluginConfig.mountProtocol.useNfs }} allowHostNetwork: true {{- end }} allowedVolumeTypes: - hostPath - secret readOnlyRootFilesystem: false - +allowHostPorts: true runAsUser: type: RunAsAny seLinuxContext: diff --git a/charts/csi-wekafsplugin/templates/controllerserver-statefulset.yaml b/charts/csi-wekafsplugin/templates/controllerserver-statefulset.yaml index b15ee8ab5..fbbdcf93a 100755 --- a/charts/csi-wekafsplugin/templates/controllerserver-statefulset.yaml +++ b/charts/csi-wekafsplugin/templates/controllerserver-statefulset.yaml @@ -30,8 +30,8 @@ spec: nodeSelector: {{ toYaml .Values.nodeSelector | nindent 8}} {{- end }} serviceAccountName: {{ .Release.Name }}-controller - {{- if .Values.hostNetwork }} - hostNetwork: {{ .Values.hostNetwork }} + {{- if or .Values.hostNetwork .Values.pluginConfig.mountProtocol.useNfs .Values.pluginConfig.mountProtocol.allowNfsFailback}} + hostNetwork: true {{- end }} containers: - name: csi-attacher @@ -215,6 +215,15 @@ spec: - "--concurrency.createSnapshot={{ .Values.controller.concurrency.createSnapshot | default "1" }}" - "--concurrency.deleteSnapshot={{ .Values.controller.concurrency.deleteSnapshot | default "1" }}" {{- end }} + {{- if .Values.pluginConfig.mountProtocol.useNfs | default false }} + - "--usenfs" + {{- end }} + {{- if .Values.pluginConfig.mountProtocol.allowNfsFailback | default false }} + - "--allownfsfailback" + {{- end }} + {{- if .Values.pluginConfig.mountProtocol.interfaceGroupName }} + - "--interfacegroupname={{ .Values.pluginConfig.mountProtocol.interfaceGroupName }}" + {{- end }} ports: - containerPort: 9898 name: healthz @@ -249,6 +258,10 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName + - name: KUBE_NODE_IP_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.hostIP volumeMounts: - mountPath: /csi name: socket-dir diff --git a/charts/csi-wekafsplugin/templates/nodeserver-daemonset.yaml b/charts/csi-wekafsplugin/templates/nodeserver-daemonset.yaml index e438de56b..240d6f944 100644 --- a/charts/csi-wekafsplugin/templates/nodeserver-daemonset.yaml +++ b/charts/csi-wekafsplugin/templates/nodeserver-daemonset.yaml @@ -39,8 +39,8 @@ spec: {{- if .Values.priorityClassName }} priorityClassName: {{ .Values.priorityClassName }} {{- end }} - {{- if .Values.hostNetwork }} - hostNetwork: {{ .Values.hostNetwork }} + {{- if or .Values.hostNetwork .Values.pluginConfig.mountProtocol.useNfs .Values.pluginConfig.mountProtocol.allowNfsFailback}} + hostNetwork: true {{- end }} initContainers: - name: init @@ -106,6 +106,15 @@ spec: - "--concurrency.nodePublishVolume={{ .Values.node.concurrency.nodePublishVolume | default "1" }}" - "--concurrency.nodeUnpublishVolume={{ .Values.node.concurrency.nodeUnpublishVolume | default "1" }}" {{- end }} + {{- if .Values.pluginConfig.mountProtocol.useNfs | default false }} + - "--usenfs" + {{- end }} + {{- if .Values.pluginConfig.mountProtocol.allowNfsFailback | default false }} + - "--allownfsfailback" + {{- end }} + {{- if .Values.pluginConfig.mountProtocol.interfaceGroupName }} + - "--interfacegroupname={{ .Values.pluginConfig.mountProtocol.interfaceGroupName }}" + {{- end }} ports: - containerPort: 9899 name: healthz @@ -136,6 +145,10 @@ spec: value: {{ required "Provide CSI Driver Dynamic Volume Creation Path" .Values.dynamicProvisionPath }} - name: X_CSI_MODE value: node + - name: KUBE_NODE_IP_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.hostIP volumeMounts: - mountPath: /csi name: socket-dir diff --git a/charts/csi-wekafsplugin/templates/nodeserver-security-context-constraint.yaml b/charts/csi-wekafsplugin/templates/nodeserver-security-context-constraint.yaml index 1334e7c9d..153a3c4db 100644 --- a/charts/csi-wekafsplugin/templates/nodeserver-security-context-constraint.yaml +++ b/charts/csi-wekafsplugin/templates/nodeserver-security-context-constraint.yaml @@ -6,7 +6,7 @@ metadata: allowPrivilegedContainer: true allowHostDirVolumePlugin: true -{{- if .Values.hostNetwork }} +{{- if or .Values.hostNetwork .Values.pluginConfig.mountProtocol.allowNfsFailback .Values.pluginConfig.mountProtocol.useNfs }} allowHostNetwork: true {{- end }} allowedVolumeTypes: diff --git a/charts/csi-wekafsplugin/values.yaml b/charts/csi-wekafsplugin/values.yaml index d4d30d3c5..1aa3befd2 100644 --- a/charts/csi-wekafsplugin/values.yaml +++ b/charts/csi-wekafsplugin/values.yaml @@ -107,7 +107,7 @@ metrics: # -- Tracing URL (For Jaeger tracing engine / OpenTelemetry), optional # @ignore tracingUrl: "" -# -- Set to true to use host networking +# -- Set to true to use host networking. Will be always set to true when using NFS mount protocol hostNetwork: false pluginConfig: # -- CSI Driver support for fsGroupPolicy, may be either "File" or "None". Default is "File" @@ -140,3 +140,12 @@ pluginConfig: snapshotVolumesWithoutQuotaEnforcement: false mutuallyExclusiveMountOptions: - "readcache,writecache,coherent,forcedirect" + mountProtocol: + # -- Use NFS transport for mounting Weka filesystems, off by default + useNfs: false + # -- Allow Failback to NFS transport if Weka client fails to mount filesystem using native protocol + allowNfsFailback: false + # -- Specify name of NFS interface group to use for mounting Weka filesystems. If not set, first NFS interface group will be used + interfaceGroupName: "" + + diff --git a/cmd/wekafsplugin/main.go b/cmd/wekafsplugin/main.go index 54c8a9271..df393ee2e 100644 --- a/cmd/wekafsplugin/main.go +++ b/cmd/wekafsplugin/main.go @@ -91,6 +91,9 @@ var ( maxConcurrentNodeUnpublishVolumeReqs = flag.Int64("concurrency.nodeUnpublishVolume", 1, "Maximum concurrent NodeUnpublishVolume requests") grpcRequestTimeoutSeconds = flag.Int("grpcrequesttimeoutseconds", 30, "Time out requests waiting in queue after X seconds") allowProtocolContainers = flag.Bool("allowprotocolcontainers", false, "Allow protocol containers to be used for mounting filesystems") + allowNfsFailback = flag.Bool("allownfsfailback", false, "Allow NFS failback") + useNfs = flag.Bool("usenfs", false, "Use NFS for mounting volumes") + interfaceGroupName = flag.String("interfacegroupname", "", "Name of the NFS interface group to use for mounting volumes") // Set by the build process version = "" ) @@ -217,6 +220,9 @@ func handle() { *maxConcurrentNodeUnpublishVolumeReqs, *grpcRequestTimeoutSeconds, *allowProtocolContainers, + *allowNfsFailback, + *useNfs, + *interfaceGroupName, ) driver, err := wekafs.NewWekaFsDriver( *driverName, *nodeID, *endpoint, *maxVolumesPerNode, version, *debugPath, csiMode, *selinuxSupport, config) diff --git a/go.mod b/go.mod index 6b6766c91..ae6928226 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/prometheus/client_golang v1.19.1 github.com/rs/zerolog v1.33.0 github.com/showa-93/go-mask v0.6.2 + github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/jaeger v1.17.0 go.opentelemetry.io/otel/sdk v1.28.0 @@ -30,13 +31,16 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/moby/sys/mountinfo v0.7.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -45,6 +49,7 @@ require ( golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect ) diff --git a/go.sum b/go.sum index e7a564d21..1fe56df78 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/container-storage-interface/spec v1.10.0 h1:YkzWPV39x+ZMTa6Ax2czJLLwpryrQ+dPesB34mrRMXA= github.com/container-storage-interface/spec v1.10.0/go.mod h1:DtUvaQszPml1YJfIK7c00mlv6/g4wNMLanLgiUbKFRI= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -24,6 +25,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/csi-lib-utils v0.18.1 h1:vpg1kbQ6lFVCz7mY71zcqVE7W0GAQXXBoFfHvbW3gdw= github.com/kubernetes-csi/csi-lib-utils v0.18.1/go.mod h1:PIcn27zmbY0KBue4JDdZVfDF56tjcS3jKroZPi+pMoY= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -49,13 +54,15 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/showa-93/go-mask v0.6.2 h1:sJEUQRpbxUoMTfBKey5K9hCg+eSx5KIAZFT7pa1LXbM= github.com/showa-93/go-mask v0.6.2/go.mod h1:aswIj007gm0EPAzOGES9ACy1jDm3QT08/LPSClMp410= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= @@ -89,6 +96,9 @@ google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= diff --git a/pkg/wekafs/apiclient/apiclient.go b/pkg/wekafs/apiclient/apiclient.go index d7c0cbd3d..5aff66973 100644 --- a/pkg/wekafs/apiclient/apiclient.go +++ b/pkg/wekafs/apiclient/apiclient.go @@ -64,6 +64,7 @@ type ApiClient struct { CompatibilityMap *WekaCompatibilityMap clientHash uint32 hostname string + NfsInterfaceGroups map[string]*InterfaceGroup } type ApiEndPoint struct { @@ -119,6 +120,7 @@ func NewApiClient(ctx context.Context, credentials Credentials, allowInsecureHtt CompatibilityMap: &WekaCompatibilityMap{}, hostname: hostname, actualApiEndpoints: make(map[string]*ApiEndPoint), + NfsInterfaceGroups: make(map[string]*InterfaceGroup), } a.resetDefaultEndpoints(ctx) @@ -745,21 +747,21 @@ type ApiResponse struct { // ApiObject generic interface of API object of any type (FileSystem, Quota, etc.) type ApiObject interface { - GetType() string - GetBasePath(a *ApiClient) string - GetApiUrl(a *ApiClient) string - EQ(other ApiObject) bool - getImmutableFields() []string - String() string + GetType() string // returns the type of the object + GetBasePath(a *ApiClient) string // returns the base path of objects of this type (plural) + GetApiUrl(a *ApiClient) string // returns the full URL of the object consisting of base path and object UID + EQ(other ApiObject) bool // a way to compare objects and check if they are the same + getImmutableFields() []string // provides a list of fields that are used for comparison in EQ() + String() string // returns a string representation of the object } // ApiObjectRequest interface that describes a request for an ApiObject CRUD operation type ApiObjectRequest interface { - getRequiredFields() []string - hasRequiredFields() bool - getRelatedObject() ApiObject - getApiUrl(a *ApiClient) string - String() string + getRequiredFields() []string // returns a list of fields that are mandatory for the object for creation + hasRequiredFields() bool // checks if all mandatory fields are filled in + getRelatedObject() ApiObject // returns the type of object that is being requested + getApiUrl(a *ApiClient) string // returns the full URL of the object consisting of base path and object UID + String() string // returns a string representation of the object request } type Credentials struct { diff --git a/pkg/wekafs/apiclient/interfacegroup.go b/pkg/wekafs/apiclient/interfacegroup.go new file mode 100644 index 000000000..c4c181abe --- /dev/null +++ b/pkg/wekafs/apiclient/interfacegroup.go @@ -0,0 +1,195 @@ +package apiclient + +import ( + "context" + "errors" + "fmt" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "k8s.io/helm/pkg/urlutil" + "os" + "sort" +) + +type InterfaceGroupType string + +const ( + InterfaceGroupTypeNFS InterfaceGroupType = "NFS" + InterfaceGroupTypeSMB InterfaceGroupType = "SMB" +) + +type InterfaceGroup struct { + SubnetMask string `json:"subnet_mask"` + Name string `json:"name"` + Uid uuid.UUID `json:"uid"` + Ips []string `json:"ips"` + AllowManageGids bool `json:"allow_manage_gids"` + Type InterfaceGroupType `json:"type"` + Gateway string `json:"gateway"` + Status string `json:"status"` +} + +func (i *InterfaceGroup) String() string { + return fmt.Sprintln("InterfaceGroup ", i.Name, "Uid:", i.Uid.String(), "type:", i.Type, "status:", i.Status) +} + +func (i *InterfaceGroup) getImmutableFields() []string { + return []string{"Name", "Gateway", "SubnetMask", "Type"} +} + +func (i *InterfaceGroup) GetType() string { + return "interfaceGroup" +} + +func (i *InterfaceGroup) GetBasePath(client *ApiClient) string { + return "interfaceGroups" +} + +func (i *InterfaceGroup) GetApiUrl(client *ApiClient) string { + url, err := urlutil.URLJoin(i.GetBasePath(client), i.Uid.String()) + if err == nil { + return url + } + return "" +} + +func (i *InterfaceGroup) EQ(other ApiObject) bool { + return ObjectsAreEqual(i, other) +} + +func (i *InterfaceGroup) getInterfaceGroupType() InterfaceGroupType { + return i.Type +} + +func (i *InterfaceGroup) isNfs() bool { + return i.getInterfaceGroupType() == InterfaceGroupTypeNFS +} + +func (i *InterfaceGroup) isSmb() bool { + return i.getInterfaceGroupType() == InterfaceGroupTypeSMB +} + +// GetIpAddress returns a single IP address based on hostname, so for same server, always same IP address will be returned +// This is useful for NFS mount, where we need to have same IP address for same server +// TODO: this could be further optimized in future to avoid a situation where multiple servers are not evenly distributed +// and some IPs are getting more load than others. Could be done, for example, by random selection of IP address etc. +func (i *InterfaceGroup) GetIpAddress(ctx context.Context) (string, error) { + logger := log.Ctx(ctx) + if i == nil { + return "", errors.New("interface group is nil") + } + if len(i.Ips) == 0 { + return "", errors.New("no IP addresses found for interface group") + } + hostname, err := os.Hostname() + if err != nil { + return "", err + } + if hostname == "" { + hostname = "localhost" + } + idx := hashString(hostname, len(i.Ips)) + logger.Debug().Int("index", idx).Str("hostname", hostname).Int("ips", len(i.Ips)).Msg("Selected IP address based on hostname") + return i.Ips[idx], nil +} + +func (a *ApiClient) GetInterfaceGroups(ctx context.Context, interfaceGroups *[]InterfaceGroup) error { + ig := &InterfaceGroup{} + + err := a.Get(ctx, ig.GetBasePath(a), nil, interfaceGroups) + if err != nil { + return err + } + return nil +} + +func (a *ApiClient) GetInterfaceGroupsByType(ctx context.Context, groupType InterfaceGroupType, interfaceGroups *[]InterfaceGroup) error { + res := &[]InterfaceGroup{} + err := a.GetInterfaceGroups(ctx, res) + if err != nil { + return nil + } + for _, ig := range *res { + if ig.getInterfaceGroupType() == groupType { + *interfaceGroups = append(*interfaceGroups, ig) + } + } + return nil +} + +func (a *ApiClient) GetInterfaceGroupByUid(ctx context.Context, uid uuid.UUID, interfaceGroup *InterfaceGroup) error { + ig := &InterfaceGroup{ + Uid: uid, + } + err := a.Get(ctx, ig.GetApiUrl(a), nil, interfaceGroup) + if err != nil { + return err + } + return nil +} + +func (a *ApiClient) fetchNfsInterfaceGroup(ctx context.Context, name *string, useDefault bool) error { + igs := &[]InterfaceGroup{} + err := a.GetInterfaceGroupsByType(ctx, InterfaceGroupTypeNFS, igs) + if err != nil { + return errors.Join(errors.New("failed to fetch nfs interface groups"), err) + } + if len(*igs) == 0 { + return errors.New("no nfs interface groups found") + } + if name != nil { + for _, ig := range *igs { + if ig.Name == *name { + a.NfsInterfaceGroups[*name] = &ig + } + } + } else if useDefault { + a.NfsInterfaceGroups["default"] = &(*igs)[0] + } + + ig := &InterfaceGroup{} + if name != nil { + ig = a.NfsInterfaceGroups[*name] + } else { + ig = a.NfsInterfaceGroups["default"] + } + if ig == nil { + return errors.New("no nfs interface group found") + } + + if len(ig.Ips) == 0 { + return errors.New("no IP addresses found for nfs interface group") + } + // Make sure the IPs are always sorted + sort.Strings(ig.Ips) + return nil +} + +func (a *ApiClient) GetNfsInterfaceGroup(ctx context.Context, name *string) *InterfaceGroup { + igName := "default" + if name != nil { + igName = *name + } + _, ok := a.NfsInterfaceGroups[igName] + if !ok { + err := a.fetchNfsInterfaceGroup(ctx, name, true) + if err != nil { + return nil + } + } + return a.NfsInterfaceGroups[igName] +} + +// GetNfsMountIp returns the IP address of the NFS interface group to be used for NFS mount +// TODO: need to do it much more sophisticated way to distribute load +func (a *ApiClient) GetNfsMountIp(ctx context.Context, interfaceGroupName *string) (string, error) { + ig := a.GetNfsInterfaceGroup(ctx, interfaceGroupName) + if ig == nil { + return "", errors.New("no NFS interface group found") + } + if ig.Ips == nil || len(ig.Ips) == 0 { + return "", errors.New("no IP addresses found for NFS interface group") + } + + return ig.GetIpAddress(ctx) +} diff --git a/pkg/wekafs/apiclient/nfs.go b/pkg/wekafs/apiclient/nfs.go new file mode 100644 index 000000000..ec2f9612f --- /dev/null +++ b/pkg/wekafs/apiclient/nfs.go @@ -0,0 +1,721 @@ +package apiclient + +import ( + "context" + "encoding/json" + "errors" + "fmt" + qs "github.com/google/go-querystring/query" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "golang.org/x/exp/slices" + "k8s.io/helm/pkg/urlutil" + "strconv" +) + +type NfsPermissionType string +type NfsPermissionSquashMode string +type NfsClientGroupRuleType string +type NfsVersionString string +type NfsAuthType string + +const ( + NfsPermissionTypeReadWrite NfsPermissionType = "RW" + NfsPermissionTypeReadOnly NfsPermissionType = "RO" + NfsPermissionSquashModeNone NfsPermissionSquashMode = "none" + NfsPermissionSquashModeRoot NfsPermissionSquashMode = "root" + NfsPermissionSquashModeAll NfsPermissionSquashMode = "all" + NfsClientGroupRuleTypeDNS NfsClientGroupRuleType = "DNS" + NfsClientGroupRuleTypeIP NfsClientGroupRuleType = "IP" + NfsVersionV3 NfsVersionString = "V3" + NfsVersionV4 NfsVersionString = "V4" + NfsAuthTypeNone NfsAuthType = "NONE" + NfsAuthTypeSys NfsAuthType = "SYS" + NfsAuthTypeKerberos5 NfsAuthType = "KRB5" + NfsClientGroupName = "WekaCSIPluginClients" +) + +type NfsPermission struct { + GroupId string `json:"group_id,omitempty" url:"-"` + PrivilegedPort bool `json:"privileged_port,omitempty" url:"-"` + SupportedVersions []NfsVersionString `json:"supported_versions,omitempty" url:"-"` + Id string `json:"id,omitempty" url:"-"` + ObsDirect bool `json:"obs_direct,omitempty" url:"-"` + AnonUid string `json:"anon_uid,omitempty" url:"-"` + ManageGids bool `json:"manage_gids,omitempty" url:"-"` + CustomOptions string `json:"custom_options,omitempty" url:"-"` + Filesystem string `json:"filesystem" url:"-"` + Uid uuid.UUID `json:"uid,omitempty" url:"-"` + Group string `json:"group" url:"-"` + NfsClientGroupId string `json:"NfsClientGroup_id,omitempty" url:"-"` + PermissionType NfsPermissionType `json:"permission_type,omitempty" url:"-"` + MountOptions string `json:"mount_options,omitempty" url:"-"` + Path string `json:"path,omitempty" url:"-"` + SquashMode NfsPermissionSquashMode `json:"squash_mode,omitempty" url:"-"` + RootSquashing bool `json:"root_squashing,omitempty" url:"-"` + AnonGid string `json:"anon_gid,omitempty" url:"-"` + EnableAuthTypes []NfsAuthType `json:"enable_auth_types,omitempty" url:"-"` +} + +func (n *NfsPermission) GetType() string { + return "nfsPermission" +} + +func (n *NfsPermission) GetBasePath(a *ApiClient) string { + return "nfs/permissions" +} + +func (n *NfsPermission) GetApiUrl(a *ApiClient) string { + url, err := urlutil.URLJoin(n.GetBasePath(a), n.Uid.String()) + if err != nil { + return url + } + return "" +} + +func (n *NfsPermission) EQ(other ApiObject) bool { + return ObjectsAreEqual(n, other) +} + +func (n *NfsPermission) getImmutableFields() []string { + return []string{"Group", "Filesystem", "SupportedVersions", "PermissionType", "Path", "SquashMode"} +} + +func (n *NfsPermission) String() string { + return fmt.Sprintln("NfsPermission Uid:", n.Uid.String(), "NfsClientGroup:", n.Group, "path:", n.Path) +} + +func (n *NfsPermission) IsEligibleForCsi() bool { + return n.RootSquashing == false && slices.Contains(n.SupportedVersions, "V4") && + n.PermissionType == NfsPermissionTypeReadWrite && + n.SquashMode == NfsPermissionSquashModeNone +} + +func (a *ApiClient) GetNfsPermissions(ctx context.Context, fsUid uuid.UUID, permissions *[]NfsPermission) error { + n := &NfsPermission{} + + err := a.Get(ctx, n.GetBasePath(a), nil, permissions) + if err != nil { + return err + } + return nil +} + +func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error { + op := "FindNfsPermissionsByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + ret := &[]NfsPermission{} + q, _ := qs.Values(query) + err := a.Get(ctx, query.GetBasePath(a), q, ret) + if err != nil { + return err + } + for _, r := range *ret { + if r.EQ(query) { + *resultSet = append(*resultSet, r) + } + } + return nil +} + +// GetNfsPermissionByFilter expected to return exactly one result of FindNfsPermissionsByFilter (error) +func (a *ApiClient) GetNfsPermissionByFilter(ctx context.Context, query *NfsPermission) (*NfsPermission, error) { + rs := &[]NfsPermission{} + err := a.FindNfsPermissionsByFilter(ctx, query, rs) + if err != nil { + return &NfsPermission{}, err + } + if *rs == nil || len(*rs) == 0 { + return &NfsPermission{}, ObjectNotFoundError + } + if len(*rs) > 1 { + return &NfsPermission{}, MultipleObjectsFoundError + } + result := &(*rs)[0] + return result, nil +} + +func (a *ApiClient) GetNfsPermissionsByFilesystemName(ctx context.Context, fsName string, permissions *[]NfsPermission) error { + query := &NfsPermission{Path: fsName} + return a.FindNfsPermissionsByFilter(ctx, query, permissions) +} + +func (a *ApiClient) GetNfsPermissionByUid(ctx context.Context, uid uuid.UUID) (*NfsPermission, error) { + query := &NfsPermission{Uid: uid} + return a.GetNfsPermissionByFilter(ctx, query) +} + +type NfsPermissionCreateRequest struct { + Filesystem string `json:"filesystem"` + Group string `json:"group"` + Path string `json:"path"` + PermissionType NfsPermissionType `json:"permission_type"` + SquashMode NfsPermissionSquashMode `json:"squash_mode"` + AnonUid int `json:"anon_uid"` + AnonGid int `json:"anon_gid"` + ObsDirect *bool `json:"obs_direct,omitempty"` + SupportedVersions *[]string `json:"supported_versions,omitempty"` + Priority int `json:"priority"` + EnableAuthTypes []NfsAuthType `json:"enable_auth_types"` +} + +func (qc *NfsPermissionCreateRequest) getApiUrl(a *ApiClient) string { + return qc.getRelatedObject().GetApiUrl(a) +} +func (qc *NfsPermissionCreateRequest) getRelatedObject() ApiObject { + return &NfsPermission{ + GroupId: qc.Group, + } +} + +func (qc *NfsPermissionCreateRequest) getRequiredFields() []string { + return []string{"Filesystem", "Group", "Path", "PermissionType", "SquashMode", "SupportedVersions"} +} +func (qc *NfsPermissionCreateRequest) hasRequiredFields() bool { + return ObjectRequestHasRequiredFields(qc) +} + +func (qc *NfsPermissionCreateRequest) String() string { + return fmt.Sprintln("NfsPermissionCreateRequest(FS:", qc.Filesystem) +} + +func (a *ApiClient) CreateNfsPermission(ctx context.Context, r *NfsPermissionCreateRequest, p *NfsPermission) error { + op := "CreateNfsPermission" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + if !r.hasRequiredFields() { + return RequestMissingParams + } + payload, err := json.Marshal(r) + if err != nil { + return err + } + + err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, p) + return err +} + +func EnsureNfsPermission(ctx context.Context, fsName string, group string, apiClient *ApiClient) error { + perm := &NfsPermission{ + SupportedVersions: []NfsVersionString{NfsVersionV4}, + AnonUid: strconv.Itoa(65534), + AnonGid: strconv.Itoa(65534), + Filesystem: fsName, + Group: group, + PermissionType: NfsPermissionTypeReadWrite, + Path: "/", + SquashMode: NfsPermissionSquashModeNone, + } + _, err := apiClient.GetNfsPermissionByFilter(ctx, perm) + if err != nil { + if err == ObjectNotFoundError { + req := &NfsPermissionCreateRequest{ + Filesystem: fsName, + Group: group, + Path: "/", + PermissionType: NfsPermissionTypeReadWrite, + SquashMode: NfsPermissionSquashModeNone, + AnonGid: 65534, + AnonUid: 65534, + SupportedVersions: &[]string{string(NfsVersionV4)}, + } + return apiClient.CreateNfsPermission(ctx, req, perm) + } + } + return err +} + +type NfsClientGroup struct { + Uid uuid.UUID `json:"uid,omitempty" url:"-"` + Rules []NfsClientGroupRule `json:"rules,omitempty" url:"-"` + Id string `json:"id,omitempty" url:"-"` + Name string `json:"name,omitempty" url:"name,omitempty"` +} + +func (g *NfsClientGroup) GetType() string { + return "clientGroup" +} + +func (g *NfsClientGroup) GetBasePath(a *ApiClient) string { + return "nfs/clientGroups" +} + +func (g *NfsClientGroup) GetApiUrl(a *ApiClient) string { + url, err := urlutil.URLJoin(g.GetBasePath(a), g.Uid.String()) + if err == nil { + return url + } + return "" +} + +func (g *NfsClientGroup) EQ(other ApiObject) bool { + return ObjectsAreEqual(g, other) +} + +func (g *NfsClientGroup) getImmutableFields() []string { + return []string{"Name"} +} + +func (g *NfsClientGroup) String() string { + return fmt.Sprintln("NfsClientGroup name:", g.Name) +} + +func (a *ApiClient) GetNfsClientGroups(ctx context.Context, clientGroups *[]NfsClientGroup) error { + cg := &NfsClientGroup{} + + err := a.Get(ctx, cg.GetBasePath(a), nil, clientGroups) + if err != nil { + return err + } + return nil +} + +func (a *ApiClient) FindNfsClientGroupsByFilter(ctx context.Context, query *NfsClientGroup, resultSet *[]NfsClientGroup) error { + op := "FindNfsClientGroupsByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + logger.Trace().Str("client_group_query", query.String()).Msg("Finding client groups by filter") + ret := &[]NfsClientGroup{} + q, _ := qs.Values(query) + err := a.Get(ctx, query.GetBasePath(a), q, ret) + if err != nil { + return err + } + for _, r := range *ret { + if r.EQ(query) { + *resultSet = append(*resultSet, r) + } + } + return nil +} + +// GetNfsClientGroupByFilter expected to return exactly one result of FindNfsClientGroupsByFilter (error) +func (a *ApiClient) GetNfsClientGroupByFilter(ctx context.Context, query *NfsClientGroup) (*NfsClientGroup, error) { + op := "GetNfsClientGroupByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + rs := &[]NfsClientGroup{} + err := a.FindNfsClientGroupsByFilter(ctx, query, rs) + logger.Trace().Str("client_group", query.String()).Msg("Getting client group by filter") + if err != nil { + return &NfsClientGroup{}, err + } + if *rs == nil || len(*rs) == 0 { + return &NfsClientGroup{}, ObjectNotFoundError + } + if len(*rs) > 1 { + return &NfsClientGroup{}, MultipleObjectsFoundError + } + result := &(*rs)[0] + return result, nil +} + +func (a *ApiClient) GetNfsClientGroupByName(ctx context.Context, name string) (*NfsClientGroup, error) { + query := &NfsClientGroup{Name: name} + return a.GetNfsClientGroupByFilter(ctx, query) +} + +func (a *ApiClient) GetNfsClientGroupByUid(ctx context.Context, uid uuid.UUID, cg *NfsClientGroup) error { + ret := &NfsClientGroup{ + Uid: uid, + } + err := a.Get(ctx, ret.GetApiUrl(a), nil, cg) + if err != nil { + switch t := err.(type) { + case *ApiNotFoundError: + return ObjectNotFoundError + case *ApiBadRequestError: + for _, c := range t.ApiResponse.ErrorCodes { + if c == "ClientGroupDoesNotExistException" { + return ObjectNotFoundError + } + } + default: + return err + } + } + return nil + +} + +func (a *ApiClient) CreateNfsClientGroup(ctx context.Context, r *NfsClientGroupCreateRequest, fs *NfsClientGroup) error { + op := "CreateNfsClientGroup" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + if !r.hasRequiredFields() { + return RequestMissingParams + } + payload, err := json.Marshal(r) + if err != nil { + return err + } + + err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, fs) + return err +} + +func (a *ApiClient) EnsureCsiPluginNfsClientGroup(ctx context.Context) (*NfsClientGroup, error) { + op := "EnsureCsiPluginNfsClientGroup" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + var ret *NfsClientGroup + logger.Trace().Str("client_group_name", NfsClientGroupName).Msg("Getting client group by name") + ret, err := a.GetNfsClientGroupByName(ctx, NfsClientGroupName) + if err != nil { + if err != ObjectNotFoundError { + logger.Error().Err(err).Msg("Failed to get client group by name") + return ret, err + } else { + logger.Trace().Str("client_group_name", NfsClientGroupName).Msg("Existing client group not found, creating client group") + err = a.CreateNfsClientGroup(ctx, NewNfsClientGroupCreateRequest(NfsClientGroupName), ret) + } + } + return ret, nil +} + +type NfsClientGroupCreateRequest struct { + Name string `json:"name"` +} + +func (fsc *NfsClientGroupCreateRequest) getApiUrl(a *ApiClient) string { + return fsc.getRelatedObject().GetBasePath(a) +} + +func (fsc *NfsClientGroupCreateRequest) getRequiredFields() []string { + return []string{"Name"} +} + +func (fsc *NfsClientGroupCreateRequest) hasRequiredFields() bool { + return ObjectRequestHasRequiredFields(fsc) +} +func (fsc *NfsClientGroupCreateRequest) getRelatedObject() ApiObject { + return &NfsClientGroup{} +} + +func (fsc *NfsClientGroupCreateRequest) String() string { + return fmt.Sprintln("NfsClientGroupCreateRequest(name:", fsc.Name) +} + +func NewNfsClientGroupCreateRequest(name string) *NfsClientGroupCreateRequest { + return &NfsClientGroupCreateRequest{ + Name: name, + } +} + +type NfsClientGroupDeleteRequest struct { + Uid uuid.UUID `json:"-"` +} + +func (cgd *NfsClientGroupDeleteRequest) getApiUrl(a *ApiClient) string { + return cgd.getRelatedObject().GetApiUrl(a) +} + +func (cgd *NfsClientGroupDeleteRequest) getRelatedObject() ApiObject { + return &NfsClientGroup{Uid: cgd.Uid} +} + +func (cgd *NfsClientGroupDeleteRequest) getRequiredFields() []string { + return []string{"Uid"} +} + +func (cgd *NfsClientGroupDeleteRequest) hasRequiredFields() bool { + return ObjectRequestHasRequiredFields(cgd) +} + +func (cgd *NfsClientGroupDeleteRequest) String() string { + return fmt.Sprintln("NfsClientGroupDeleteRequest(uid:", cgd.Uid) +} + +func (a *ApiClient) DeleteNfsClientGroup(ctx context.Context, r *NfsClientGroupDeleteRequest) error { + op := "DeleteNfsClientGroup" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + if !r.hasRequiredFields() { + return RequestMissingParams + } + apiResponse := &ApiResponse{} + err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse) + if err != nil { + switch t := err.(type) { + case *ApiNotFoundError: + return ObjectNotFoundError + case *ApiBadRequestError: + for _, c := range t.ApiResponse.ErrorCodes { + if c == "FilesystemDoesNotExistException" { + return ObjectNotFoundError + } + } + } + } + return nil +} + +type NfsClientGroupRule struct { + NfsClientGroupUid uuid.UUID `json:"-" url:"-"` + Type NfsClientGroupRuleType `json:"type,omitempty" url:"-"` + Uid uuid.UUID `json:"uid,omitempty" url:"-"` + Rule string `json:"rule,omitempty" url:"-"` + Id string `json:"id,omitempty" url:"-"` +} + +func (r *NfsClientGroupRule) GetType() string { + return "rules" +} + +func (r *NfsClientGroupRule) GetBasePath(a *ApiClient) string { + ncgUrl := (&NfsClientGroup{Uid: r.Uid}).GetApiUrl(a) + url, err := urlutil.URLJoin(ncgUrl, r.GetType()) + if err != nil { + return "" + } + return url +} + +func (r *NfsClientGroupRule) GetApiUrl(a *ApiClient) string { + url, err := urlutil.URLJoin(r.GetBasePath(a), r.Uid.String()) + if err != nil { + return url + } + return "" +} + +func (r *NfsClientGroupRule) EQ(other ApiObject) bool { + return ObjectsAreEqual(r, other) +} + +func (r *NfsClientGroupRule) getImmutableFields() []string { + return []string{"Rule"} +} + +func (r *NfsClientGroupRule) String() string { + return fmt.Sprintln("NfsClientGroupRule Uid:", r.Uid.String(), "clientGroupUid:", r.NfsClientGroupUid.String(), + "type:", r.Type, "rule", r.Rule) +} + +func (r *NfsClientGroupRule) IsIPRule() bool { + return r.Type == NfsClientGroupRuleTypeIP +} + +func (r *NfsClientGroupRule) IsDNSRule() bool { + return r.Type == NfsClientGroupRuleTypeDNS +} + +func (r *NfsClientGroupRule) GetNetwork() *Network { + if !r.IsIPRule() { + return nil + } + n, err := parseNetworkString(r.Rule) + if err != nil { + return nil + } + return n +} + +func (r *NfsClientGroupRule) IsEligibleForIP(ip string) bool { + network := r.GetNetwork() + if network == nil { + return false + } + return network.ContainsIPAddress(ip) +} + +func (a *ApiClient) GetNfsClientGroupRules(ctx context.Context, rules *[]NfsClientGroupRule) error { + op := "GetNfsClientGroupRules" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + cg, err := a.EnsureCsiPluginNfsClientGroup(ctx) + if err != nil { + return err + } + copiedRules := make([]NfsClientGroupRule, len(cg.Rules)) + copy(copiedRules, cg.Rules) + *rules = copiedRules + return nil +} + +func (a *ApiClient) FindNfsClientGroupRulesByFilter(ctx context.Context, query *NfsClientGroupRule, resultSet *[]NfsClientGroupRule) error { + op := "FindNfsClientGroupRulesByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + + // this is different that in other functions since we don't have /rules entry point for GET + // so we need to get the client group first + logger.Trace().Str("client_group_uid", query.NfsClientGroupUid.String()).Msg("Getting client group") + cg := &NfsClientGroup{} + err := a.GetNfsClientGroupByUid(ctx, query.NfsClientGroupUid, cg) + if cg == nil || err != nil { + return err + } + ret := cg.Rules + + for _, r := range ret { + if r.EQ(query) { + *resultSet = append(*resultSet, r) + } + } + return nil +} + +func (a *ApiClient) GetNfsClientGroupRuleByFilter(ctx context.Context, rule *NfsClientGroupRule) (*NfsClientGroupRule, error) { + op := "GetNfsClientGroupRuleByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + rs := &[]NfsClientGroupRule{} + err := a.FindNfsClientGroupRulesByFilter(ctx, rule, rs) + if err != nil { + return &NfsClientGroupRule{}, err + } + if *rs == nil || len(*rs) == 0 { + return &NfsClientGroupRule{}, ObjectNotFoundError + } + if len(*rs) > 1 { + return &NfsClientGroupRule{}, MultipleObjectsFoundError + } + result := &(*rs)[0] + return result, nil +} + +type NfsClientGroupRuleCreateRequest struct { + NfsClientGroupUid uuid.UUID `json:"-"` + Type NfsClientGroupRuleType `json:"-"` + Hostname string `json:"dns,omitempty"` + Ip string `json:"ip,omitempty"` +} + +func (r *NfsClientGroupRuleCreateRequest) getType() string { + return "rules" +} + +func (r *NfsClientGroupRuleCreateRequest) getApiUrl(a *ApiClient) string { + ret, err := urlutil.URLJoin(r.getRelatedObject().GetApiUrl(a), r.getType()) + if err != nil { + return "" + } + return ret +} + +func (r *NfsClientGroupRuleCreateRequest) getRequiredFields() []string { + return []string{"Type"} +} + +func (r *NfsClientGroupRuleCreateRequest) hasRequiredFields() bool { + return ObjectRequestHasRequiredFields(r) +} + +func (r *NfsClientGroupRuleCreateRequest) getRelatedObject() ApiObject { + return &NfsClientGroup{Uid: r.NfsClientGroupUid} +} + +func (r *NfsClientGroupRuleCreateRequest) String() string { + return fmt.Sprintln("NfsClientGroupRuleCreateRequest(NfsClientGroupUid:", r.NfsClientGroupUid, "Type:", r.Type) +} + +func (r *NfsClientGroupRuleCreateRequest) AsRule() string { + if r.Type == NfsClientGroupRuleTypeDNS { + return r.Hostname + } + return r.Ip +} + +func NewNfsClientGroupRuleCreateRequest(cgUid uuid.UUID, ruleType NfsClientGroupRuleType, rule string) *NfsClientGroupRuleCreateRequest { + + ret := &NfsClientGroupRuleCreateRequest{ + NfsClientGroupUid: cgUid, + Type: ruleType, + } + if ruleType == NfsClientGroupRuleTypeDNS { + ret.Hostname = rule + } else if ruleType == NfsClientGroupRuleTypeIP { + net, err := parseNetworkString(rule) + if err != nil { + return nil + } + ret.Ip = net.AsNfsRule() + } + return ret +} + +func (a *ApiClient) CreateNfsClientGroupRule(ctx context.Context, r *NfsClientGroupRuleCreateRequest, rule *NfsClientGroupRule) error { + op := "CreateNfsClientGroupRule" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + logger.Trace().Str("client_group_rule", r.String()).Msg("Creating client group rule") + + if !r.hasRequiredFields() { + return RequestMissingParams + } + + payload, err := json.Marshal(r) + if err != nil { + return err + } + + err = a.Post(ctx, r.getApiUrl(a), &payload, nil, rule) + return err +} + +func (a *ApiClient) EnsureNfsClientGroupRuleForIp(ctx context.Context, cg *NfsClientGroup, ip string) error { + if cg == nil { + return errors.New("NfsClientGroup is nil") + } + r, err := parseNetworkString(ip) + if err != nil { + return err + } + + q := &NfsClientGroupRule{Type: NfsClientGroupRuleTypeIP, Rule: r.AsNfsRule(), NfsClientGroupUid: cg.Uid} + + rule, err := a.GetNfsClientGroupRuleByFilter(ctx, q) + if err != nil { + if err == ObjectNotFoundError { + req := NewNfsClientGroupRuleCreateRequest(cg.Uid, q.Type, q.Rule) + return a.CreateNfsClientGroupRule(ctx, req, rule) + } + } + return err +} + +func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName string) error { + op := "EnsureNfsPermissions" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx) + logger.Debug().Str("ip", ip).Str("filesystem", fsName).Msg("Ensuring NFS permissions") + // Ensure client group + logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group") + cg, err := a.EnsureCsiPluginNfsClientGroup(ctx) + if err != nil { + logger.Error().Err(err).Msg("Failed to ensure NFS client group") + return err + } + + // Ensure client group rule + logger.Trace().Str("ip_address", ip).Msg("Ensuring NFS Client Group Rule for IP") + err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, ip) + if err != nil { + return err + } + // Ensure NFS permission + logger.Trace().Str("filesystem", fsName).Str("client_group", cg.Name).Msg("Ensuring NFS Export for client group") + err = EnsureNfsPermission(ctx, fsName, cg.Name, a) + return err +} diff --git a/pkg/wekafs/apiclient/nfs_test.go b/pkg/wekafs/apiclient/nfs_test.go new file mode 100644 index 000000000..8dd41e2ea --- /dev/null +++ b/pkg/wekafs/apiclient/nfs_test.go @@ -0,0 +1,314 @@ +package apiclient + +import ( + "context" + "flag" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/rand" + "testing" +) + +var creds Credentials +var endpoint string +var fsName string + +var client *ApiClient + +func TestMain(m *testing.M) { + flag.StringVar(&endpoint, "api-endpoint", "vm49-1723969301909816-0.lan:14000", "API endpoint for tests") + flag.StringVar(&creds.Username, "api-username", "admin", "API username for tests") + flag.StringVar(&creds.Password, "api-password", "AAbb1234", "API password for tests") + flag.StringVar(&creds.Organization, "api-org", "Root", "API org for tests") + flag.StringVar(&creds.HttpScheme, "api-scheme", "https", "API scheme for tests") + flag.StringVar(&fsName, "fs-name", "default", "Filesystem name for tests") + flag.Parse() + m.Run() +} + +func GetApiClientForTest(t *testing.T) *ApiClient { + creds.Endpoints = []string{endpoint} + if client == nil { + apiClient, err := NewApiClient(context.Background(), creds, true, "test") + if err != nil { + t.Fatalf("Failed to create API client: %v", err) + } + if apiClient == nil { + t.Fatalf("Failed to create API client") + } + if err := apiClient.Login(context.Background()); err != nil { + t.Fatalf("Failed to login: %v", err) + } + client = apiClient + } + return client +} + +// +//func TestGetNfsPermissions(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// +// var permissions []NfsPermission +// +// req := &NfsPermissionCreateRequest{ +// Filesystem: fsName, +// Group: "group1", +// } +// p := &NfsPermission{} +// err := apiClient.CreateNfsPermission(context.Background(), &NfsPermissionCreateRequest{}, p) +// assert.NoError(t, err) +// assert.NotZero(t, p.Uid) +// +// err := apiClient.GetNfsPermissions(context.Background(), &permissions) +// assert.NoError(t, err) +// assert.NotEmpty(t, permissions) +//} +// +//func TestFindNfsPermissionsByFilter(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// query := &NfsPermission{Filesystem: "fs1"} +// var resultSet []NfsPermission +// err := apiClient.FindNfsPermissionsByFilter(context.Background(), query, &resultSet) +// assert.NoError(t, err) +// assert.NotEmpty(t, resultSet) +//} +// +//func TestGetNfsPermissionByFilter(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// +// query := &NfsPermission{Filesystem: "fs1"} +// result, err := apiClient.GetNfsPermissionByFilter(context.Background(), query) +// assert.NoError(t, err) +// assert.NotNil(t, result) +//} +// +//func TestGetNfsPermissionsByFilesystemName(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// +// +// var permissions []NfsPermission +// err := apiClient.GetNfsPermissionsByFilesystemName(context.Background(), "fs1", &permissions) +// assert.NoError(t, err) +// assert.NotEmpty(t, permissions) +//} +// +//func TestGetNfsPermissionByUid(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// w.WriteHeader(http.StatusOK) +// w.Write([]byte(`{"filesystem": "fs1", "group": "group1"}`)) +// })) +// defer server.Close() +// +// +// uid := uuid.New() +// result, err := apiClient.GetNfsPermissionByUid(context.Background(), uid) +// assert.NoError(t, err) +// assert.NotNil(t, result) +//} +// +//func TestCreateNfsPermission(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// +// req := &NfsPermissionCreateRequest{ +// Filesystem: "fs1", +// Group: "group1", +// SquashMode: NfsPermissionSquashModeNone, +// AnonUid: 1000, +// AnonGid: 1000, +// EnableAuthTypes: []NfsAuthType{NfsAuthTypeSys}, +// } +// var perm NfsPermission +// err := apiClient.CreateNfsPermission(context.Background(), req, &perm) +// assert.NoError(t, err) +// assert.NotNil(t, perm) +//} +// +//func TestEnsureNfsPermission(t *testing.T) { +// apiClient := GetApiClientForTest(t) +// server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// w.WriteHeader(http.StatusOK) +// w.Write([]byte(`{"filesystem": "fs1", "group": "group1"}`)) +// })) +// defer server.Close() +// +// +// err := EnsureNfsPermission(context.Background(), "fs1", "group1", apiClient) +// assert.NoError(t, err) +//} + +func TestNfsClientGroup(t *testing.T) { + apiClient := GetApiClientForTest(t) + + var clientGroups []NfsClientGroup + var cg = &NfsClientGroup{ + Uid: uuid.New(), + } + // Test GetApiUrl + assert.NotEmpty(t, cg.GetApiUrl(apiClient)) + assert.Contains(t, cg.GetApiUrl(apiClient), cg.Uid.String()) + + // Test EQ + cg1 := &NfsClientGroup{ + Name: "test", + } + + cg2 := &NfsClientGroup{ + Name: "test", + } + assert.True(t, cg1.EQ(cg2)) + + // Test GetBasePath + assert.NotEmpty(t, cg.GetBasePath(apiClient)) + + // Test Create + cgName := rand.String(10) + err := apiClient.CreateNfsClientGroup(context.Background(), &NfsClientGroupCreateRequest{Name: cgName}, cg) + assert.NotEmpty(t, cg.Uid) + assert.NoError(t, err) + assert.Equal(t, cgName, cg.Name) + assert.Empty(t, cg.Rules) + + // Test GetGroups + err = apiClient.GetNfsClientGroups(context.Background(), &clientGroups) + assert.NoError(t, err) + assert.NotEmpty(t, clientGroups) + + // Test GetGroupByUid + uid := cg.Uid + err = apiClient.GetNfsClientGroupByUid(context.Background(), uid, cg) + assert.NoError(t, err) + assert.Equal(t, cgName, cg.Name) + assert.NotEmpty(t, cg.Uid) + + // Test GetGroupByName + name := cg.Name + cg, err = apiClient.GetNfsClientGroupByName(context.Background(), name) + assert.NoError(t, err) + assert.Equal(t, cgName, cg.Name) + assert.NotEmpty(t, cg.Uid) + + // Test Delete + r := &NfsClientGroupDeleteRequest{Uid: cg.Uid} + err = apiClient.DeleteNfsClientGroup(context.Background(), r) + assert.NoError(t, err) + err = apiClient.GetNfsClientGroups(context.Background(), &clientGroups) + assert.NoError(t, err) + for _, r := range clientGroups { + if r.Uid == cg.Uid { + t.Errorf("Failed to delete group") + } + } +} + +func TestEnsureCsiPluginNfsClientGroup(t *testing.T) { + apiClient := GetApiClientForTest(t) + result, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, result) +} + +func TestNfsClientGroupRules(t *testing.T) { + apiClient := GetApiClientForTest(t) + cg, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, cg) + + // Test Create + r := &NfsClientGroupRule{} + //r2 := &NfsClientGroupRule{} + + req1 := NewNfsClientGroupRuleCreateRequest(cg.Uid, NfsClientGroupRuleTypeIP, "192.168.1.1") + req2 := NewNfsClientGroupRuleCreateRequest(cg.Uid, NfsClientGroupRuleTypeIP, "192.168.2.0/24") + req3 := NewNfsClientGroupRuleCreateRequest(cg.Uid, NfsClientGroupRuleTypeIP, "192.168.3.0/255.255.255.255") + req4 := NewNfsClientGroupRuleCreateRequest(cg.Uid, NfsClientGroupRuleTypeDNS, "test-hostname") +outerLoop: + for _, req := range []*NfsClientGroupRuleCreateRequest{req1, req2, req3, req4} { + for _, rule := range cg.Rules { + if rule.Type == req.Type && rule.Rule == req.AsRule() { + continue outerLoop + } + } + assert.NotNil(t, req) + //req2 := &NfsClientGroupRuleCreateRequest{Type: NfsClientGroupRuleTypeDNS, Hostname: "test-hostname", NfsClientGroupUid: cg.Uid} + + err = apiClient.CreateNfsClientGroupRule(context.Background(), req, r) + assert.NoError(t, err) + } + rules := &[]NfsClientGroupRule{} + err = apiClient.GetNfsClientGroupRules(context.Background(), rules) + assert.NoError(t, err) + assert.NotEmpty(t, rules) + for _, rule := range *rules { + assert.NotEmpty(t, rule.Uid) + assert.NotEmpty(t, rule.Type) + assert.NotEmpty(t, rule.Rule) + assert.NotEmpty(t, rule.Id) + } +} + +func TestNfsEnsureNfsPermissions(t *testing.T) { + apiClient := GetApiClientForTest(t) + + // Test EnsureNfsPermission + err := apiClient.EnsureNfsPermissions(context.Background(), "172.16.5.147", "default") + assert.NoError(t, err) +} + +func TestInterfaceGroup(t *testing.T) { + apiClient := GetApiClientForTest(t) + + var igs []InterfaceGroup + var ig = &InterfaceGroup{ + Uid: uuid.New(), + } + // Test GetApiUrl + assert.NotEmpty(t, ig.GetApiUrl(apiClient)) + assert.Contains(t, ig.GetApiUrl(apiClient), ig.Uid.String()) + + // Test EQ + ig1 := &InterfaceGroup{ + Name: "test", + } + + ig2 := &InterfaceGroup{ + Name: "test", + } + assert.True(t, ig1.EQ(ig2)) + + // Test GetBasePath + assert.NotEmpty(t, ig.GetBasePath(apiClient)) + + // Test Create + // Test GetGroups + err := apiClient.GetInterfaceGroups(context.Background(), &igs) + assert.NoError(t, err) + assert.NotEmpty(t, igs) + assert.NotEmpty(t, igs[0].Ips) + // + //// Test GetGroupByUid + //uid := ig.Uid + //err = apiClient.GetInterfaceGroupByUid(context.Background(), uid, ig) + //assert.NoError(t, err) + //assert.Equal(t, igName, ig.Name) + //assert.NotEmpty(t, ig.Uid) + // + //// Test GetGroupByName + //name := ig.Name + //ig, err = apiClient.GetInterfaceGroupByName(context.Background(), name) + //assert.NoError(t, err) + //assert.Equal(t, igName, ig.Name) + //assert.NotEmpty(t, ig.Uid) + // + //// Test Delete + //r := &InterfaceGroupDeleteRequest{Uid: ig.Uid} + //err = apiClient.DeleteInterfaceGroup(context.Background(), r) + //assert.NoError(t, err) + //err = apiClient.GetInterfaceGroups(context.Background(), &igs) + //assert.NoError(t, err) + //for _, r := range igs { + // if r.Uid == ig.Uid { + // t.Errorf("Failed to delete group") + // } + //} +} diff --git a/pkg/wekafs/apiclient/utils.go b/pkg/wekafs/apiclient/utils.go index 498c49e6e..a96cedf0d 100644 --- a/pkg/wekafs/apiclient/utils.go +++ b/pkg/wekafs/apiclient/utils.go @@ -1,8 +1,13 @@ package apiclient import ( + "fmt" "github.com/rs/zerolog/log" + "hash/fnv" + "net" + "os" "reflect" + "strings" ) // ObjectsAreEqual returns true if both ApiObject have same immutable fields (other fields and nil fields are disregarded) @@ -39,3 +44,79 @@ func ObjectRequestHasRequiredFields(o ApiObjectRequest) bool { } return true } + +// hashString is a simple hash function that takes a string and returns a hash value in the range [0, n) +func hashString(s string, n int) int { + if n == 0 { + return 0 + } + + // Create a new FNV-1a hash + h := fnv.New32a() + + // Write the string to the hash + _, _ = h.Write([]byte(s)) + + // Get the hash sum as a uint32 + hashValue := h.Sum32() + + // Return the hash value in the range of [0, n) + return int(hashValue % uint32(n)) +} + +type Network struct { + IP net.IP + Subnet *net.IP +} + +func (n *Network) AsNfsRule() string { + return fmt.Sprintf("%s/%s", n.IP.String(), n.Subnet.String()) +} + +func parseNetworkString(s string) (*Network, error) { + var ip, subnet net.IP + if strings.Contains(s, "/") { + parts := strings.Split(s, "/") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid CIDR notation: %s", s) + } + ip = net.ParseIP(parts[0]) + subnet = net.ParseIP(parts[1]) + if subnet == nil { + _, ipNet, err := net.ParseCIDR(s) + if err != nil { + return nil, fmt.Errorf("invalid CIDR notation: %s", s) + } + subnet = net.IP(ipNet.Mask) + } + } else { + ip = net.ParseIP(s) + subnet = net.ParseIP("255.255.255.255") + } + + return &Network{ + IP: ip, + Subnet: &subnet, + }, nil +} + +func (n *Network) ContainsIPAddress(ipStr string) bool { + ip := net.ParseIP(ipStr) + if ip == nil { + return false + } + + _, ipNet, err := net.ParseCIDR(fmt.Sprintf("%s/%s", n.IP.String(), n.Subnet.String())) + if err != nil { + return false + } + return ipNet.Contains(ip) +} + +func GetNodeIpAddress() string { + ret := os.Getenv("KUBE_NODE_IP_ADDRESS") + if ret != "" { + return ret + } + return "127.0.0.1" +} diff --git a/pkg/wekafs/apiclient/utils_test.go b/pkg/wekafs/apiclient/utils_test.go new file mode 100644 index 000000000..ede39534c --- /dev/null +++ b/pkg/wekafs/apiclient/utils_test.go @@ -0,0 +1,29 @@ +package apiclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHashString(t *testing.T) { + testCases := []struct { + input string + n int + expected int + }{ + {"test", 10, 5}, + {"example", 10, 9}, + {"hash", 10, 1}, + {"string", 10, 8}, + {"", 10, 1}, + {"osi415-zbjgk-worker-0-t6g55", 10, 5}, + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + result := hashString(tc.input, tc.n) + assert.Equal(t, tc.expected, result) + }) + } +} diff --git a/pkg/wekafs/controllerserver.go b/pkg/wekafs/controllerserver.go index 3b71ef0a4..dd14a9094 100644 --- a/pkg/wekafs/controllerserver.go +++ b/pkg/wekafs/controllerserver.go @@ -44,7 +44,7 @@ type ControllerServer struct { csi.UnimplementedControllerServer caps []*csi.ControllerServiceCapability nodeID string - mounter *wekaMounter + mounter AnyMounter api *ApiStore config *DriverConfig semaphores map[string]*semaphore.Weighted @@ -67,7 +67,7 @@ func (cs *ControllerServer) getConfig() *DriverConfig { return cs.config } -func (cs *ControllerServer) getMounter() *wekaMounter { +func (cs *ControllerServer) getMounter() AnyMounter { return cs.mounter } @@ -105,7 +105,7 @@ func (cs *ControllerServer) ControllerModifyVolume(context.Context, *csi.Control panic("implement me") } -func NewControllerServer(nodeID string, api *ApiStore, mounter *wekaMounter, config *DriverConfig) *ControllerServer { +func NewControllerServer(nodeID string, api *ApiStore, mounter AnyMounter, config *DriverConfig) *ControllerServer { exposedCapabilities := []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, diff --git a/pkg/wekafs/driverconfig.go b/pkg/wekafs/driverconfig.go index 3f3b0611d..d88391830 100644 --- a/pkg/wekafs/driverconfig.go +++ b/pkg/wekafs/driverconfig.go @@ -33,15 +33,36 @@ type DriverConfig struct { maxConcurrencyPerOp map[string]int64 grpcRequestTimeout time.Duration allowProtocolContainers bool + allowNfsFailback bool + useNfs bool + interfaceGroupName *string } func (dc *DriverConfig) Log() { + igName := "<>" + if dc.interfaceGroupName != nil { + igName = *dc.interfaceGroupName + } log.Info().Str("dynamic_vol_path", dc.DynamicVolPath). Str("volume_prefix", dc.VolumePrefix).Str("snapshot_prefix", dc.SnapshotPrefix).Str("seed_snapshot_prefix", dc.SnapshotPrefix). Bool("allow_auto_fs_creation", dc.allowAutoFsCreation).Bool("allow_auto_fs_expansion", dc.allowAutoFsExpansion). Bool("advertise_snapshot_support", dc.advertiseSnapshotSupport).Bool("advertise_volume_clone_support", dc.advertiseVolumeCloneSupport). Bool("allow_insecure_https", dc.allowInsecureHttps).Bool("always_allow_snapshot_volumes", dc.alwaysAllowSnapshotVolumes). - Interface("mutually_exclusive_mount_options", dc.mutuallyExclusiveOptions).Msg("Starting driver with the following configuration") + Interface("mutually_exclusive_mount_options", dc.mutuallyExclusiveOptions). + Int64("max_create_volume_reqs", dc.maxConcurrencyPerOp["CreateVolume"]). + Int64("max_delete_volume_reqs", dc.maxConcurrencyPerOp["DeleteVolume"]). + Int64("max_expand_volume_reqs", dc.maxConcurrencyPerOp["ExpandVolume"]). + Int64("max_create_snapshot_reqs", dc.maxConcurrencyPerOp["CreateSnapshot"]). + Int64("max_delete_snapshot_reqs", dc.maxConcurrencyPerOp["DeleteSnapshot"]). + Int64("max_node_publish_volume_reqs", dc.maxConcurrencyPerOp["NodePublishVolume"]). + Int64("max_node_unpublish_volume_reqs", dc.maxConcurrencyPerOp["NodeUnpublishVolume"]). + Int("grpc_request_timeout_seconds", int(dc.grpcRequestTimeout.Seconds())). + Bool("allow_protocol_containers", dc.allowProtocolContainers). + Bool("allow_nfs_failback", dc.allowNfsFailback). + Bool("use_nfs", dc.useNfs). + Str("interface_group_name", igName). + Msg("Starting driver with the following configuration") + } func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotPrefix, debugPath string, allowAutoFsCreation, allowAutoFsExpansion, allowSnapshotsOfLegacyVolumes bool, @@ -50,6 +71,8 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP maxCreateVolumeReqs, maxDeleteVolumeReqs, maxExpandVolumeReqs, maxCreateSnapshotReqs, maxDeleteSnapshotReqs, maxNodePublishVolumeReqs, maxNodeUnpublishVolumeReqs int64, grpcRequestTimeoutSeconds int, allowProtocolContainers bool, + allowNfsFailback, useNfs bool, + interfaceGroupName string, ) *DriverConfig { var MutuallyExclusiveMountOptions []mutuallyExclusiveMountOptionSet @@ -72,6 +95,11 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP concurrency["NodePublishVolume"] = maxNodePublishVolumeReqs concurrency["NodeUnpublishVolume"] = maxNodeUnpublishVolumeReqs + igName := &[]string{interfaceGroupName}[0] + if interfaceGroupName == "" { + igName = nil + } + return &DriverConfig{ DynamicVolPath: dynamicVolPath, VolumePrefix: VolumePrefix, @@ -89,6 +117,9 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP maxConcurrencyPerOp: concurrency, grpcRequestTimeout: grpcRequestTimeout, allowProtocolContainers: allowProtocolContainers, + allowNfsFailback: allowNfsFailback, + useNfs: useNfs, + interfaceGroupName: igName, } } diff --git a/pkg/wekafs/gc.go b/pkg/wekafs/gc.go index 304758f02..604799207 100644 --- a/pkg/wekafs/gc.go +++ b/pkg/wekafs/gc.go @@ -21,10 +21,10 @@ type innerPathVolGc struct { isRunning map[string]bool isDeferred map[string]bool sync.Mutex - mounter *wekaMounter + mounter AnyMounter } -func initInnerPathVolumeGc(mounter *wekaMounter) *innerPathVolGc { +func initInnerPathVolumeGc(mounter AnyMounter) *innerPathVolGc { gc := innerPathVolGc{mounter: mounter} gc.isRunning = make(map[string]bool) gc.isDeferred = make(map[string]bool) diff --git a/pkg/wekafs/identityserver.go b/pkg/wekafs/identityserver.go index 898347c80..132cf746d 100644 --- a/pkg/wekafs/identityserver.go +++ b/pkg/wekafs/identityserver.go @@ -79,10 +79,15 @@ func (ids *identityServer) getConfig() *DriverConfig { } func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { + logger := log.Ctx(ctx) isReady := ids.getConfig().isInDevMode() || isWekaInstalled() if !isReady { - logger := log.Ctx(ctx) - logger.Error().Msg("Weka driver not running on host, not ready to perform operations") + if ids.getConfig().useNfs || ids.getConfig().allowNfsFailback { + isReady = true + } + } + if !isReady { + logger.Error().Msg("Weka driver not running on host and NFS transport is not configured, not ready to perform operations") } return &csi.ProbeResponse{ Ready: &wrapperspb.BoolValue{ diff --git a/pkg/wekafs/interfaces.go b/pkg/wekafs/interfaces.go index 75af3f50f..13cc44648 100644 --- a/pkg/wekafs/interfaces.go +++ b/pkg/wekafs/interfaces.go @@ -1,10 +1,45 @@ package wekafs +import ( + "context" + "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" + "time" +) + type AnyServer interface { - getMounter() *wekaMounter + getMounter() AnyMounter getApiStore() *ApiStore getConfig() *DriverConfig - isInDevMode() bool // TODO: Rename to isInDevMode + isInDevMode() bool getDefaultMountOptions() MountOptions getNodeId() string } + +type AnyMounter interface { + NewMount(fsName string, options MountOptions) AnyMount + mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) + Mount(ctx context.Context, fs string, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) + unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error + LogActiveMounts() + gcInactiveMounts() + schedulePeriodicMountGc() + getGarbageCollector() *innerPathVolGc +} + +type mountsMapPerFs map[string]AnyMount +type mountsMap map[string]mountsMapPerFs + +type UnmountFunc func() + +type AnyMount interface { + isInDevMode() bool + isMounted() bool + incRef(ctx context.Context, apiClient *apiclient.ApiClient) error + decRef(ctx context.Context) error + getRefCount() int + doUnmount(ctx context.Context) error + doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error + getMountPoint() string + getMountOptions() MountOptions + getLastUsed() time.Time +} diff --git a/pkg/wekafs/mountoptions.go b/pkg/wekafs/mountoptions.go index 8636d3f4c..9e7f3568f 100644 --- a/pkg/wekafs/mountoptions.go +++ b/pkg/wekafs/mountoptions.go @@ -8,12 +8,15 @@ import ( ) const ( - selinuxContext = "wekafs_csi_volume" + selinuxContextWekaFs = "wekafs_csi_volume_t" + selinuxContextNfs = "nfs_t" MountOptionSyncOnClose = "sync_on_close" MountOptionReadOnly = "ro" MountOptionWriteCache = "writecache" MountOptionCoherent = "coherent" MountOptionReadCache = "readcache" + MountProtocolWekafs = "wekafs" + MountProtocolNfs = "nfs" ) type mountOption struct { @@ -155,15 +158,57 @@ func (opts MountOptions) AsMapKey() string { return ret.String() } -func (opts MountOptions) setSelinux(selinuxSupport bool) { +func (opts MountOptions) setSelinux(selinuxSupport bool, mountProtocol string) { if selinuxSupport { - o := newMountOptionFromString(fmt.Sprintf("fscontext=\"system_u:object_r:%s_t:s0\"", selinuxContext)) + var o mountOption + if mountProtocol == MountProtocolWekafs { + o = newMountOptionFromString(fmt.Sprintf("fscontext=\"system_u:object_r:%s:s0\"", selinuxContextWekaFs)) + } else if mountProtocol == MountProtocolNfs { + o = newMountOptionFromString(fmt.Sprintf("context=\"system_u:object_r:%s:s0\"", selinuxContextNfs)) + } opts.customOptions[o.option] = o } else { - delete(opts.customOptions, "fscontext") + if mountProtocol == MountProtocolWekafs { + delete(opts.customOptions, "fscontext") + } + if mountProtocol == MountProtocolNfs { + delete(opts.customOptions, "context") + } } } +func (opts MountOptions) AsNfs() MountOptions { + ret := NewMountOptionsFromString("hard,rdirplus") + for _, o := range opts.getOpts() { + switch o.option { + case "writecache": + ret.AddOption("async") + case "coherent": + ret.AddOption("sync") + case "forcedirect": + ret.AddOption("sync") + case "readcache": + ret.AddOption("noac") + case "dentry_max_age_positive": + ret.AddOption(fmt.Sprintf("acdirmax=%s", o.value)) + ret.AddOption(fmt.Sprintf("acregmax=%s", o.value)) + case "inode_bits": + continue + case "verbose": + continue + case "quiet": + continue + case "obs_direct": + continue + case "sync_on_close": + ret.AddOption("sync") + default: + continue + } + } + return ret +} + func NewMountOptionsFromString(optsString string) MountOptions { if optsString == "" { return NewMountOptions([]string{}) diff --git a/pkg/wekafs/nfsmount.go b/pkg/wekafs/nfsmount.go new file mode 100644 index 000000000..170a2ba3e --- /dev/null +++ b/pkg/wekafs/nfsmount.go @@ -0,0 +1,175 @@ +package wekafs + +import ( + "context" + "errors" + "fmt" + "github.com/rs/zerolog/log" + "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" + "k8s.io/mount-utils" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +type nfsMount struct { + fsName string + mountPoint string + refCount int + lock sync.Mutex + kMounter mount.Interface + debugPath string + mountOptions MountOptions + lastUsed time.Time + mountIpAddress string + interfaceGroupName *string +} + +func (m *nfsMount) getMountPoint() string { + return m.mountPoint +} + +func (m *nfsMount) getRefCount() int { + return m.refCount +} + +func (m *nfsMount) getMountOptions() MountOptions { + return m.mountOptions +} + +func (m *nfsMount) getLastUsed() time.Time { + return m.lastUsed +} + +func (m *nfsMount) isInDevMode() bool { + return m.debugPath != "" +} + +func (m *nfsMount) isMounted() bool { + return PathExists(m.mountPoint) && PathIsWekaMount(context.Background(), m.mountPoint) +} + +func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error { + logger := log.Ctx(ctx) + m.lock.Lock() + defer m.lock.Unlock() + if m.refCount < 0 { + logger.Error().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("During incRef negative refcount encountered") + m.refCount = 0 // to make sure that we don't have negative refcount later + } + if m.refCount == 0 { + if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil { + return err + } + } else if !m.isMounted() { + logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("Mount not exists although should!") + if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil { + return err + } + + } + m.refCount++ + logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.mountOptions.Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased") + return nil +} + +func (m *nfsMount) decRef(ctx context.Context) error { + logger := log.Ctx(ctx) + m.lock.Lock() + defer m.lock.Unlock() + m.refCount-- + m.lastUsed = time.Now() + logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.mountOptions.Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased") + if m.refCount < 0 { + logger.Error().Int("refcount", m.refCount).Msg("During decRef negative refcount encountered") + m.refCount = 0 // to make sure that we don't have negative refcount later + } + if m.refCount == 0 { + if err := m.doUnmount(ctx); err != nil { + return err + } + } + return nil +} + +func (m *nfsMount) doUnmount(ctx context.Context) error { + logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger() + logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Msg("Performing umount via k8s native mounter") + err := m.kMounter.Unmount(m.mountPoint) + if err != nil { + logger.Error().Err(err).Msg("Failed to unmount") + } else { + logger.Trace().Msg("Unmounted successfully") + } + return err +} + +func (m *nfsMount) ensureMountIpAddress(ctx context.Context, apiClient *apiclient.ApiClient) error { + if m.mountIpAddress == "" { + ip, err := apiClient.GetNfsMountIp(ctx, m.interfaceGroupName) + if err != nil { + return err + } + m.mountIpAddress = ip + } + return nil +} + +func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error { + logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger() + var mountOptionsSensitive []string + if err := os.MkdirAll(m.mountPoint, DefaultVolumePermissions); err != nil { + return err + } + if !m.isInDevMode() { + if apiClient == nil { + // this flow is relevant only for legacy volumes, will not work with SCMC + logger.Trace().Msg("No API client for mount, cannot proceed") + return errors.New("no API client for mount, cannot do NFS mount") + } + + nodeIP := apiclient.GetNodeIpAddress() + if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName) != nil { + logger.Error().Msg("Failed to ensure NFS permissions") + return errors.New("failed to ensure NFS permissions") + } + + if err := m.ensureMountIpAddress(ctx, apiClient); err != nil { + logger.Error().Err(err).Msg("Failed to get mount IP address") + return err + } + + mountTarget := m.mountIpAddress + ":/" + m.fsName + logger.Trace(). + Strs("mount_options", m.mountOptions.Strings()). + Str("mount_target", mountTarget). + Msg("Performing mount") + err := m.kMounter.MountSensitive(mountTarget, m.mountPoint, "nfs", mountOptions.Strings(), mountOptionsSensitive) + if err != nil { + if os.IsNotExist(err) { + logger.Error().Err(err).Msg("Mount target not found") + } else if os.IsPermission(err) { + logger.Error().Err(err).Msg("Mount failed due to permissions issue") + return err + } else if strings.Contains(err.Error(), "invalid argument") { + logger.Error().Err(err).Msg("Mount failed due to invalid argument") + return err + } else { + logger.Error().Err(err).Msg("Mount failed due to unknown issue") + } + return err + } + logger.Trace().Msg("Mounted successfully") + return nil + } else { + fakePath := filepath.Join(m.debugPath, m.fsName) + if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil { + Die(fmt.Sprintf("Failed to create directory %s, while running in debug mode", fakePath)) + } + logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Str("debug_path", m.debugPath).Msg("Performing mount") + + return m.kMounter.Mount(fakePath, m.mountPoint, "", []string{"bind"}) + } +} diff --git a/pkg/wekafs/nfsmounter.go b/pkg/wekafs/nfsmounter.go new file mode 100644 index 000000000..d969c1989 --- /dev/null +++ b/pkg/wekafs/nfsmounter.go @@ -0,0 +1,170 @@ +package wekafs + +import ( + "context" + "github.com/rs/zerolog/log" + "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" + "k8s.io/mount-utils" + "sync" + "time" +) + +type nfsMounter struct { + mountMap mountsMap + lock sync.Mutex + kMounter mount.Interface + debugPath string + selinuxSupport *bool + gc *innerPathVolGc + interfaceGroupName *string +} + +func (m *nfsMounter) getGarbageCollector() *innerPathVolGc { + return m.gc +} + +func newNfsMounter(driver *WekaFsDriver) *nfsMounter { + var selinuxSupport *bool + if driver.selinuxSupport { + log.Debug().Msg("SELinux support is forced") + selinuxSupport = &[]bool{true}[0] + } + mounter := &nfsMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: selinuxSupport} + mounter.gc = initInnerPathVolumeGc(mounter) + mounter.schedulePeriodicMountGc() + mounter.interfaceGroupName = driver.config.interfaceGroupName + + return mounter +} + +func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount { + m.lock.Lock() + if m.kMounter == nil { + m.kMounter = mount.New("") + } + if _, ok := m.mountMap[fsName]; !ok { + m.mountMap[fsName] = mountsMapPerFs{} + } + if _, ok := m.mountMap[fsName][options.String()]; !ok { + uniqueId := getStringSha1AsB32(fsName + ":" + options.String()) + wMount := &nfsMount{ + kMounter: m.kMounter, + fsName: fsName, + debugPath: m.debugPath, + mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId, + mountOptions: options, + interfaceGroupName: m.interfaceGroupName, + } + m.mountMap[fsName][options.String()] = wMount + } + m.lock.Unlock() + return m.mountMap[fsName][options.String()] +} + +func (m *nfsMounter) getSelinuxStatus(ctx context.Context) bool { + if m.selinuxSupport != nil && *m.selinuxSupport { + return true + } + selinuxSupport := getSelinuxStatus(ctx) + m.selinuxSupport = &selinuxSupport + return *m.selinuxSupport +} + +func (m *nfsMounter) mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { + mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) + mountOptions = mountOptions.AsNfs() + mountObj := m.NewMount(fsName, mountOptions) + mountErr := mountObj.incRef(ctx, apiClient) + + if mountErr != nil { + log.Ctx(ctx).Error().Err(mountErr).Msg("Failed mounting") + return "", mountErr, func() {} + } + return mountObj.getMountPoint(), nil, func() { + if mountErr == nil { + _ = mountObj.decRef(ctx) + } + } +} + +func (m *nfsMounter) Mount(ctx context.Context, fs string, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { + return m.mountWithOptions(ctx, fs, getDefaultMountOptions(), apiClient) +} + +func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error { + opts := options + options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) + + log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request") + if mnt, ok := m.mountMap[fsName][options.String()]; ok { + err := mnt.decRef(ctx) + if err == nil { + if m.mountMap[fsName][options.String()].getRefCount() <= 0 { + log.Ctx(ctx).Trace().Str("filesystem", fsName).Strs("mount_options", options.Strings()).Msg("This is a last use of this mount, removing from map") + delete(m.mountMap[fsName], options.String()) + } + if len(m.mountMap[fsName]) == 0 { + log.Ctx(ctx).Trace().Str("filesystem", fsName).Msg("No more mounts to filesystem, removing from map") + delete(m.mountMap, fsName) + } + } + return err + + } else { + log.Ctx(ctx).Warn().Msg("Attempted to access mount point which is not known to the system") + return nil + } +} + +func (m *nfsMounter) LogActiveMounts() { + if len(m.mountMap) > 0 { + count := 0 + for fsName := range m.mountMap { + for mnt := range m.mountMap[fsName] { + mapEntry := m.mountMap[fsName][mnt] + if mapEntry.getRefCount() > 0 { + log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active") + count++ + } else { + log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active") + } + + } + } + log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map") + } +} + +func (m *nfsMounter) gcInactiveMounts() { + if len(m.mountMap) > 0 { + for fsName := range m.mountMap { + for uniqueId, wekaMount := range m.mountMap[fsName] { + if wekaMount.getRefCount() == 0 { + if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) { + m.lock.Lock() + if wekaMount.getRefCount() == 0 { + log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()). + Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map") + delete(m.mountMap[fsName], uniqueId) + } + m.lock.Unlock() + } + } + } + if len(m.mountMap[fsName]) == 0 { + delete(m.mountMap, fsName) + } + } + } +} + +func (m *nfsMounter) schedulePeriodicMountGc() { + go func() { + log.Debug().Msg("Initializing periodic mount GC") + for true { + m.LogActiveMounts() + m.gcInactiveMounts() + time.Sleep(10 * time.Minute) + } + }() +} diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index 630a1ade0..179a62caa 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -48,7 +48,7 @@ type NodeServer struct { caps []*csi.NodeServiceCapability nodeID string maxVolumesPerNode int64 - mounter *wekaMounter + mounter AnyMounter api *ApiStore config *DriverConfig semaphores map[string]*semaphore.Weighted @@ -75,7 +75,7 @@ func (ns *NodeServer) getApiStore() *ApiStore { return ns.api } -func (ns *NodeServer) getMounter() *wekaMounter { +func (ns *NodeServer) getMounter() AnyMounter { return ns.mounter } @@ -89,7 +89,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, request *csi.NodeG panic("implement me") } -func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter *wekaMounter, config *DriverConfig) *NodeServer { +func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter AnyMounter, config *DriverConfig) *NodeServer { //goland:noinspection GoBoolExpressions return &NodeServer{ caps: getNodeServiceCapabilities( @@ -358,6 +358,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu result = "SUCCESS" return &csi.NodeUnpublishVolumeResponse{}, nil } else { + logger.Error().Err(err).Msg("Failed to check target path") return NodeUnpublishVolumeError(ctx, codes.Internal, "unexpected situation, please contact support") } diff --git a/pkg/wekafs/utilities.go b/pkg/wekafs/utilities.go index 7dd36f3ca..e8a0ea8dd 100644 --- a/pkg/wekafs/utilities.go +++ b/pkg/wekafs/utilities.go @@ -294,6 +294,10 @@ func PathIsWekaMount(ctx context.Context, path string) bool { if len(fields) >= 3 && fields[2] == "wekafs" && fields[1] == path { return true } + // TODO: better protect against false positives + if len(fields) >= 3 && strings.HasPrefix(fields[2], "nfs") && fields[1] == path { + return true + } } return false @@ -509,3 +513,29 @@ func isWekaInstalled() bool { } return false } + +func getSelinuxStatus(ctx context.Context) bool { + logger := log.Ctx(ctx) + // check if we have /etc/selinux/config + // if it exists, we can check if selinux is enforced or not + selinuxConf := "/etc/selinux/config" + file, err := os.Open(selinuxConf) + if err != nil { + return false + } + defer func() { _ = file.Close() }() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "SELINUX=enforcing") { + // no need to repeat each time, just set the selinuxSupport to true + return true + } + } + + if err := scanner.Err(); err != nil { + logger.Error().Err(err).Str("filename", selinuxConf).Msg("Failed to read SELinux config file") + } + return false +} diff --git a/pkg/wekafs/volume.go b/pkg/wekafs/volume.go index 44135f543..fe1794d25 100644 --- a/pkg/wekafs/volume.go +++ b/pkg/wekafs/volume.go @@ -88,9 +88,14 @@ func (v *Volume) initMountOptions(ctx context.Context) { func (v *Volume) pruneUnsupportedMountOptions(ctx context.Context) { logger := log.Ctx(ctx) - if v.mountOptions.hasOption(MountOptionSyncOnClose) && (v.apiClient == nil || !v.apiClient.SupportsSyncOnCloseMountOption()) { - logger.Debug().Str("mount_option", MountOptionSyncOnClose).Msg("Mount option not supported by current Weka cluster version and is dropped.") - v.mountOptions = v.mountOptions.RemoveOption(MountOptionSyncOnClose) + if v.mountOptions.hasOption(MountOptionSyncOnClose) { + if v.apiClient != nil && !v.apiClient.SupportsSyncOnCloseMountOption() { + logger.Debug().Str("mount_option", MountOptionSyncOnClose).Msg("Mount option not supported by current Weka cluster version and is dropped.") + v.mountOptions = v.mountOptions.RemoveOption(MountOptionSyncOnClose) + } else if v.apiClient == nil { + logger.Debug().Str("mount_option", MountOptionSyncOnClose).Msg("Cannot determine current Weka cluster version, dropping mount option.") + v.mountOptions = v.mountOptions.RemoveOption(MountOptionSyncOnClose) + } } if v.mountOptions.hasOption(MountOptionReadOnly) { logger.Error().Str("mount_option", MountOptionReadOnly).Msg("Mount option is not supported via custom mount options, use readOnly volume attachments instead") @@ -628,7 +633,7 @@ func (v *Volume) updateCapacityXattr(ctx context.Context, enforceCapacity *bool, func (v *Volume) Trash(ctx context.Context) error { if v.requiresGc() { - v.server.getMounter().gc.triggerGcVolume(ctx, v) + v.server.getMounter().getGarbageCollector().triggerGcVolume(ctx, v) return nil } return v.Delete(ctx) diff --git a/pkg/wekafs/wekafs.go b/pkg/wekafs/wekafs.go index 38b624574..6f1d00123 100644 --- a/pkg/wekafs/wekafs.go +++ b/pkg/wekafs/wekafs.go @@ -264,8 +264,9 @@ func NewWekaFsDriver( } func (driver *WekaFsDriver) Run() { + mounter := driver.NewMounter() + // Create GRPC servers - mounter := newWekaMounter(driver) // identity server runs always log.Info().Msg("Loading IdentityServer") @@ -322,3 +323,21 @@ func GetCsiPluginMode(mode *string) CsiPluginMode { return "" } } + +func (driver *WekaFsDriver) NewMounter() AnyMounter { + log.Info().Msg("Configuring Mounter") + if driver.config.useNfs { + log.Warn().Msg("Enforcing NFS transport due to configuration") + return newNfsMounter(driver) + } + if driver.config.allowNfsFailback && !isWekaInstalled() { + if driver.config.isInDevMode() { + log.Info().Msg("Not Enforcing NFS transport due to dev mode") + } else { + log.Warn().Msg("Weka Driver not found. Failing back to NFS transport") + return newNfsMounter(driver) + } + } + log.Info().Msg("Enforcing WekaFS transport") + return newWekafsMounter(driver) +} diff --git a/pkg/wekafs/mount.go b/pkg/wekafs/wekafsmount.go similarity index 83% rename from pkg/wekafs/mount.go rename to pkg/wekafs/wekafsmount.go index cf02f2d55..82b570ccd 100644 --- a/pkg/wekafs/mount.go +++ b/pkg/wekafs/wekafsmount.go @@ -13,7 +13,7 @@ import ( "time" ) -type wekaMount struct { +type wekafsMount struct { fsName string mountPoint string refCount int @@ -25,15 +25,30 @@ type wekaMount struct { allowProtocolContainers bool } -func (m *wekaMount) isInDevMode() bool { +func (m *wekafsMount) getMountPoint() string { + return m.mountPoint +} + +func (m *wekafsMount) getRefCount() int { + return m.refCount +} + +func (m *wekafsMount) getMountOptions() MountOptions { + return m.mountOptions +} +func (m *wekafsMount) getLastUsed() time.Time { + return m.lastUsed +} + +func (m *wekafsMount) isInDevMode() bool { return m.debugPath != "" } -func (m *wekaMount) isMounted() bool { +func (m *wekafsMount) isMounted() bool { return PathExists(m.mountPoint) && PathIsWekaMount(context.Background(), m.mountPoint) } -func (m *wekaMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error { +func (m *wekafsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error { logger := log.Ctx(ctx) m.lock.Lock() defer m.lock.Unlock() @@ -57,7 +72,7 @@ func (m *wekaMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) return nil } -func (m *wekaMount) decRef(ctx context.Context) error { +func (m *wekafsMount) decRef(ctx context.Context) error { logger := log.Ctx(ctx) m.lock.Lock() defer m.lock.Unlock() @@ -76,7 +91,7 @@ func (m *wekaMount) decRef(ctx context.Context) error { return nil } -func (m *wekaMount) doUnmount(ctx context.Context) error { +func (m *wekafsMount) doUnmount(ctx context.Context) error { logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger() logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Msg("Performing umount via k8s native mounter") err := m.kMounter.Unmount(m.mountPoint) @@ -88,7 +103,7 @@ func (m *wekaMount) doUnmount(ctx context.Context) error { return err } -func (m *wekaMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error { +func (m *wekafsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error { logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger() mountToken := "" var mountOptionsSensitive []string @@ -119,10 +134,10 @@ func (m *wekaMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, // if needed, add containerName to the mount string if apiClient != nil && len(containerPaths) > 1 { + localContainerName = apiClient.Credentials.LocalContainerName if apiClient.SupportsMultipleClusters() { - localContainerName = apiClient.Credentials.LocalContainerName if localContainerName != "" { - logger.Info().Str("local_container_name", localContainerName).Msg("Local container name set by secret") + logger.Info().Str("local_container_name", localContainerName).Msg("Local container name set by secrets") } else { container, err := apiClient.GetLocalContainer(ctx, m.allowProtocolContainers) if err != nil || container == nil { @@ -140,14 +155,13 @@ func (m *wekaMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, option: "container_name", value: localContainerName, } + break } } } else { - err = errors.New("mount failed, local container name not specified and could not be determined automatically, refer to documentation on handling multiple clusters clients with Kubernetes") - logger.Error().Err(err).Msg("Failed to mount") - return err + logger.Error().Err(errors.New("Could not determine container name, refer to documentation on handling multiple clusters clients with Kubernetes")).Msg("Failed to mount") } } } diff --git a/pkg/wekafs/mounter.go b/pkg/wekafs/wekafsmounter.go similarity index 56% rename from pkg/wekafs/mounter.go rename to pkg/wekafs/wekafsmounter.go index 3d76ad700..615d9d8f6 100644 --- a/pkg/wekafs/mounter.go +++ b/pkg/wekafs/wekafsmounter.go @@ -1,13 +1,10 @@ package wekafs import ( - "bufio" "context" "github.com/rs/zerolog/log" "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" "k8s.io/mount-utils" - "os" - "strings" "sync" "time" ) @@ -16,28 +13,34 @@ const ( inactiveMountGcPeriod = time.Minute * 10 ) -type mountsMapPerFs map[string]*wekaMount -type mountsMap map[string]mountsMapPerFs - -type wekaMounter struct { +type wekafsMounter struct { mountMap mountsMap lock sync.Mutex kMounter mount.Interface debugPath string - selinuxSupport bool + selinuxSupport *bool gc *innerPathVolGc allowProtocolContainers bool } -func newWekaMounter(driver *WekaFsDriver) *wekaMounter { - mounter := &wekaMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: driver.selinuxSupport, allowProtocolContainers: driver.config.allowProtocolContainers} +func (m *wekafsMounter) getGarbageCollector() *innerPathVolGc { + return m.gc +} + +func newWekafsMounter(driver *WekaFsDriver) *wekafsMounter { + var selinuxSupport *bool + if driver.selinuxSupport { + log.Debug().Msg("SELinux support is forced") + selinuxSupport = &[]bool{true}[0] + } + mounter := &wekafsMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: selinuxSupport} mounter.gc = initInnerPathVolumeGc(mounter) mounter.schedulePeriodicMountGc() return mounter } -func (m *wekaMounter) NewMount(fsName string, options MountOptions) *wekaMount { +func (m *wekafsMounter) NewMount(fsName string, options MountOptions) AnyMount { m.lock.Lock() if m.kMounter == nil { m.kMounter = mount.New("") @@ -47,7 +50,7 @@ func (m *wekaMounter) NewMount(fsName string, options MountOptions) *wekaMount { } if _, ok := m.mountMap[fsName][options.AsMapKey()]; !ok { uniqueId := getStringSha1AsB32(fsName + ":" + options.String()) - wMount := &wekaMount{ + wMount := &wekafsMount{ kMounter: m.kMounter, fsName: fsName, debugPath: m.debugPath, @@ -61,41 +64,17 @@ func (m *wekaMounter) NewMount(fsName string, options MountOptions) *wekaMount { return m.mountMap[fsName][options.AsMapKey()] } -type UnmountFunc func() - -func (m *wekaMounter) getSelinuxStatus(ctx context.Context) bool { - logger := log.Ctx(ctx) - if m.selinuxSupport { - logger.Trace().Msg("SELinux support is forced") +func (m *wekafsMounter) getSelinuxStatus(ctx context.Context) bool { + if m.selinuxSupport != nil && *m.selinuxSupport { return true } - // check if we have /etc/selinux/config - // if it exists, we can check if selinux is enforced or not - selinuxConf := "/etc/selinux/config" - file, err := os.Open(selinuxConf) - if err != nil { - return false - } - defer func() { _ = file.Close() }() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() - if strings.Contains(line, "SELINUX=enforcing") { - // no need to repeat each time, just set the selinuxSupport to true - m.selinuxSupport = true - return true - } - } - - if err := scanner.Err(); err != nil { - logger.Error().Err(err).Str("filename", selinuxConf).Msg("Failed to read SELinux config file") - } - return false + selinuxSupport := getSelinuxStatus(ctx) + m.selinuxSupport = &selinuxSupport + return *m.selinuxSupport } -func (m *wekaMounter) mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { - mountOptions.setSelinux(m.getSelinuxStatus(ctx)) +func (m *wekafsMounter) mountWithOptions(ctx context.Context, fsName string, mountOptions MountOptions, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { + mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolWekafs) mountObj := m.NewMount(fsName, mountOptions) mountErr := mountObj.incRef(ctx, apiClient) @@ -103,26 +82,26 @@ func (m *wekaMounter) mountWithOptions(ctx context.Context, fsName string, mount log.Ctx(ctx).Error().Err(mountErr).Msg("Failed mounting") return "", mountErr, func() {} } - return mountObj.mountPoint, nil, func() { + return mountObj.getMountPoint(), nil, func() { if mountErr == nil { _ = mountObj.decRef(ctx) } } } -func (m *wekaMounter) Mount(ctx context.Context, fs string, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { +func (m *wekafsMounter) Mount(ctx context.Context, fs string, apiClient *apiclient.ApiClient) (string, error, UnmountFunc) { return m.mountWithOptions(ctx, fs, getDefaultMountOptions(), apiClient) } -func (m *wekaMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error { +func (m *wekafsMounter) unmountWithOptions(ctx context.Context, fsName string, options MountOptions) error { opts := options - options.setSelinux(m.getSelinuxStatus(ctx)) + options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolWekafs) log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request") if mnt, ok := m.mountMap[fsName][options.AsMapKey()]; ok { err := mnt.decRef(ctx) if err == nil { - if m.mountMap[fsName][options.AsMapKey()].refCount <= 0 { + if m.mountMap[fsName][options.AsMapKey()].getRefCount() <= 0 { log.Ctx(ctx).Trace().Str("filesystem", fsName).Strs("mount_options", options.Strings()).Msg("This is a last use of this mount, removing from map") delete(m.mountMap[fsName], options.String()) } @@ -139,17 +118,17 @@ func (m *wekaMounter) unmountWithOptions(ctx context.Context, fsName string, opt } } -func (m *wekaMounter) LogActiveMounts() { +func (m *wekafsMounter) LogActiveMounts() { if len(m.mountMap) > 0 { count := 0 for fsName := range m.mountMap { for mnt := range m.mountMap[fsName] { mapEntry := m.mountMap[fsName][mnt] - if mapEntry.refCount > 0 { - log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.refCount).Strs("mount_options", mapEntry.mountOptions.Strings()).Msg("Mount is active") + if mapEntry.getRefCount() > 0 { + log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active") count++ } else { - log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.refCount).Strs("mount_options", mapEntry.mountOptions.Strings()).Msg("Mount is not active") + log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active") } } @@ -158,16 +137,16 @@ func (m *wekaMounter) LogActiveMounts() { } } -func (m *wekaMounter) gcInactiveMounts() { +func (m *wekafsMounter) gcInactiveMounts() { if len(m.mountMap) > 0 { for fsName := range m.mountMap { for uniqueId, wekaMount := range m.mountMap[fsName] { - if wekaMount.refCount == 0 { - if wekaMount.lastUsed.Before(time.Now().Add(-inactiveMountGcPeriod)) { + if wekaMount.getRefCount() == 0 { + if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) { m.lock.Lock() - if wekaMount.refCount == 0 { - log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.mountOptions.Strings()). - Time("last_used", wekaMount.lastUsed).Msg("Removing stale mount from map") + if wekaMount.getRefCount() == 0 { + log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()). + Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map") delete(m.mountMap[fsName], uniqueId) } m.lock.Unlock() @@ -181,7 +160,7 @@ func (m *wekaMounter) gcInactiveMounts() { } } -func (m *wekaMounter) schedulePeriodicMountGc() { +func (m *wekafsMounter) schedulePeriodicMountGc() { go func() { log.Debug().Msg("Initializing periodic mount GC") for true {