Skip to content

Commit

Permalink
Lots of rework
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 committed Feb 26, 2021
1 parent bfab45f commit 36f8c66
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 165 deletions.
10 changes: 9 additions & 1 deletion deploy/kubernetes/csi-driver-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ spec:
mountPath: /registration/
- name: csi-driver-s3
securityContext:
runAsUser: 0
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: majst01/csi-driver-s3:v0.3.1
image: majst01/csi-driver-s3:v0.3.2
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"
Expand All @@ -100,6 +101,9 @@ spec:
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
- name: plugins-mount-dir
mountPath: /var/lib/kubelet/plugins
mountPropagation: "Bidirectional"
- name: fuse-device
mountPath: /dev/fuse
volumes:
Expand All @@ -115,6 +119,10 @@ spec:
hostPath:
path: /var/lib/kubelet/pods
type: Directory
- name: plugins-mount-dir
hostPath:
path: /var/lib/kubelet/plugins
type: Directory
- name: fuse-device
hostPath:
path: /dev/fuse
2 changes: 1 addition & 1 deletion deploy/kubernetes/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ spec:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/s3.csi.metal-stack.io
- name: csi-driver-s3
image: majst01/csi-driver-s3:v0.3.1
image: majst01/csi-driver-s3:v0.3.2
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/minio/minio-go/v7 v7.0.10
github.com/onsi/ginkgo v1.15.0
github.com/onsi/gomega v1.10.5
golang.org/x/net v0.0.0-20210222171744-9060382bd457
google.golang.org/grpc v1.35.0
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
google.golang.org/grpc v1.36.0
k8s.io/klog/v2 v2.5.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/Lt
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210222171744-9060382bd457 h1:hMm9lBjyNLe/c9C6bElQxp4wsrleaJn1vXMZIQkNN44=
golang.org/x/net v0.0.0-20210222171744-9060382bd457/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 h1:OgUuv8lsRpBibGNbSizVwKWlysjaNzmC9gYMhPVfqFM=
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -418,6 +420,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.29.0/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0 h1:o1bcQ6imQMIOpdrO3SWf2z5RV72WbDwdXuK0MDlc8As=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
85 changes: 50 additions & 35 deletions pkg/s3/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,12 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}

capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes())
params := req.GetParameters()
mounter := params[mounterTypeKey]

klog.Infof("Got a request to create volume %s", volumeID)

s3, err := newS3ClientFromSecrets(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %w", err)
}
exists, err := s3.bucketExists(volumeID)
err := ensureBucketWithMetadata(volumeID, req.GetSecrets(), capacityBytes)
if err != nil {
return nil, fmt.Errorf("failed to check if bucket %s exists: %w", volumeID, err)
}
if exists {
var b *bucket
b, err = s3.getBucket(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to get bucket metadata of bucket %s: %w", volumeID, err)
}
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > b.CapacityBytes {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
}
} else {
if err = s3.createBucket(volumeID); err != nil {
return nil, fmt.Errorf("failed to create volume %s: %w", volumeID, err)
}
if err = s3.createPrefix(volumeID, fsPrefix); err != nil {
return nil, fmt.Errorf("failed to create prefix %s: %w", fsPrefix, err)
}
b := &bucket{
Name: volumeID,
Mounter: mounter,
CapacityBytes: capacityBytes,
FSPath: fsPrefix,
}
if err := s3.setBucket(b); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %w", err)
}
return nil, status.Errorf(codes.Internal, "cannot create backup and metadata:%v", err)
}
klog.Infof("create volume %s", volumeID)
return &csi.CreateVolumeResponse{
Expand Down Expand Up @@ -199,3 +166,51 @@ func sanitizeVolumeID(volumeID string) string {
}
return volumeID
}

func ensureBucketWithMetadata(volumeID string, secrets map[string]string, capacityBytes int64) error {
s3, err := newS3ClientFromSecrets(secrets)
if err != nil {
return fmt.Errorf("failed to initialize S3 client: %w", err)
}
exists, err := s3.bucketExists(volumeID)
if err != nil {
return fmt.Errorf("failed to check if bucket %s exists: %w", volumeID, err)
}
if exists {
var meta *metadata
if !s3.metadataExist(volumeID) {
b := &metadata{
Name: volumeID,
CapacityBytes: capacityBytes,
FSPath: fsPrefix,
}
if err := s3.writeMetadata(b); err != nil {
return fmt.Errorf("Error setting volume metadata: %w", err)
}
}
meta, err = s3.getMetadata(volumeID)
if err != nil {
return fmt.Errorf("failed to get metadata of volume %s: %w", volumeID, err)
}
// Check if volume capacity requested is bigger than the already existing capacity
if capacityBytes > meta.CapacityBytes {
return status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID))
}
} else {
if err = s3.createBucket(volumeID); err != nil {
return fmt.Errorf("failed to create bucket for volume %s: %w", volumeID, err)
}
if err = s3.createPrefix(volumeID, fsPrefix); err != nil {
return fmt.Errorf("failed to create prefix %s for volume %s: %w", fsPrefix, volumeID, err)
}
meta := &metadata{
Name: volumeID,
CapacityBytes: capacityBytes,
FSPath: fsPrefix,
}
if err := s3.writeMetadata(meta); err != nil {
return fmt.Errorf("Error setting volume metadata: %w", err)
}
}
return nil
}
90 changes: 75 additions & 15 deletions pkg/s3/mounter.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,89 @@
package s3

// Mounter interface which can be implemented
// by the different mounter types
import (
"fmt"
"os"
"os/exec"

"k8s.io/klog/v2"
)

// Mounter interface
type Mounter interface {
Stage(stagePath string) error
Unstage(stagePath string) error
Mount(source string, target string) error
}

// newMounter returns a new mounter
func newMounter(meta *metadata, cfg *Config) Mounter {
return &s3fsMounter{
metadata: meta,
url: cfg.Endpoint,
region: cfg.Region,
pwFileContent: cfg.AccessKeyID + ":" + cfg.SecretAccessKey,
}
}

// Implements Mounter
type s3fsMounter struct {
metadata *metadata
url string
region string
pwFileContent string
}

const (
s3fsMounterType = "s3fs"
mounterTypeKey = "mounter"
s3fsCmd = "s3fs"
)

// newMounter returns a new mounter depending on the mounterType parameter
func newMounter(bucket *bucket, cfg *Config) (Mounter, error) {
mounter := bucket.Mounter
// Fall back to mounterType in cfg
if len(bucket.Mounter) == 0 {
mounter = cfg.Mounter
func (s3fs *s3fsMounter) Stage(stageTarget string) error {
return nil
}

func (s3fs *s3fsMounter) Unstage(stageTarget string) error {
return nil
}

func (s3fs *s3fsMounter) Mount(source string, target string) error {
if err := writes3fsPass(s3fs.pwFileContent); err != nil {
return err
}
args := []string{
fmt.Sprintf("%s:/%s", s3fs.metadata.Name, s3fs.metadata.FSPath),
target,
"-o", "use_path_request_style",
"-o", fmt.Sprintf("url=%s", s3fs.url),
"-o", fmt.Sprintf("endpoint=%s", s3fs.region),
"-o", "allow_other",
"-o", "mp_umask=000",
}
return fuseMount(s3fsCmd, args)
}

func writes3fsPass(pwFileContent string) error {
pwFileName := fmt.Sprintf("%s/.passwd-s3fs", os.Getenv("HOME"))
pwFile, err := os.OpenFile(pwFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
switch mounter {
case s3fsMounterType:
return newS3fsMounter(bucket, cfg)
default:
return newS3fsMounter(bucket, cfg)
_, err = pwFile.WriteString(pwFileContent)
if err != nil {
return err
}
pwFile.Close()
return nil
}

func fuseMount(command string, args []string) error {
cmd := exec.Command(command, args...)
klog.Infof("mounting fuse with command:%s with args:%s", command, args)

out, err := cmd.CombinedOutput()
if err != nil {
klog.Errorf("mounting fuse with command:%s with args:%s error:%s", command, args, string(out))
return fmt.Errorf("fuseMount command:%s with args:%s error:%s", command, args, string(out))
}

return nil
}
81 changes: 0 additions & 81 deletions pkg/s3/mounter_s3fs.go

This file was deleted.

Loading

0 comments on commit 36f8c66

Please sign in to comment.