From 50a77d4b915d04421973ba60036509751de46dcb Mon Sep 17 00:00:00 2001 From: Hamdy Khader Date: Tue, 22 Aug 2023 04:54:30 +0300 Subject: [PATCH] upload --- deploy/kubernetes/config-map.yaml | 4 +- deploy/kubernetes/secret.yaml | 2 +- pkg/util/jsonrpc.go | 128 ++++++++++-- pkg/util/nvmf.go | 316 +++++++----------------------- 4 files changed, 191 insertions(+), 259 deletions(-) diff --git a/deploy/kubernetes/config-map.yaml b/deploy/kubernetes/config-map.yaml index 85a82d4..fb4e282 100644 --- a/deploy/kubernetes/config-map.yaml +++ b/deploy/kubernetes/config-map.yaml @@ -14,8 +14,8 @@ data: { "nodes": [ { - "name": "localhost", - "rpcURL": "http://127.0.0.1:9009", + "name": "pool1", + "rpcURL": "http://127.0.0.1", "targetType": "nvme-tcp", "targetAddr": "127.0.0.1" } diff --git a/deploy/kubernetes/secret.yaml b/deploy/kubernetes/secret.yaml index 008d001..b86ee51 100644 --- a/deploy/kubernetes/secret.yaml +++ b/deploy/kubernetes/secret.yaml @@ -17,7 +17,7 @@ stringData: { "rpcTokens": [ { - "name": "localhost", + "name": "pool1", "username": "spdkcsiuser", "password": "spdkcsipass" } diff --git a/pkg/util/jsonrpc.go b/pkg/util/jsonrpc.go index 4546ad6..bc6d409 100644 --- a/pkg/util/jsonrpc.go +++ b/pkg/util/jsonrpc.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "net/http" + "strconv" "strings" "sync/atomic" "time" @@ -162,7 +163,7 @@ func (client *rpcClient) lvStores() ([]LvStore, error) { UUID string `json:"uuid"` } - err := client.call("bdev_lvol_get_lvstores", nil, &result) + err := client.callSBCLI("GET", "csi/get_pools", nil, &result) if err != nil { return nil, err } @@ -195,8 +196,7 @@ func (client *rpcClient) createVolume(lvolName, lvsName string, sizeMiB int64) ( } var lvolID string - - err := client.call("bdev_lvol_create", ¶ms, &lvolID) + err := client.callSBCLI("POST", "csi/create_volume", ¶ms, &lvolID) if errorMatches(err, ErrJSONNoSpaceLeft) { err = ErrJSONNoSpaceLeft // may happen in concurrency } @@ -204,20 +204,56 @@ func (client *rpcClient) createVolume(lvolName, lvsName string, sizeMiB int64) ( return lvolID, err } -// get a volume and return a BDev +// get a volume and return a BDev,, lvsName/lvolName func (client *rpcClient) getVolume(lvolID string) (*BDev, error) { var result []BDev + err := client.callSBCLI("GET", + fmt.Sprintf("csi/get_volume_info/%s", lvolID), nil, &result) - params := struct { - Name string `json:"name"` - }{ - Name: lvolID, + if errorMatches(err, ErrJSONNoSuchDevice) { + return nil, ErrJSONNoSuchDevice + } + if err != nil { + return nil, err + } + return &result[0], err +} + +// get a volume and return a BDev +func (client *rpcClient) getVolumeInfo(lvolID string) (map[string]string, error) { + var result []struct { + Name string `json:"name"` + UUID string `json:"uuid"` + BlockSize int64 `json:"block_size"` + NumBlocks int64 `json:"num_blocks"` + PoolID string `json:"pool_id"` + TargetType string `json:"targetType"` + TargetAddr string `json:"targetAddr"` + TargetPort string `json:"targetPort"` + Nqn string `json:"nqn"` + Model string `json:"model"` + LvolSize string `json:"lvolSize"` } - err := client.call("bdev_get_bdevs", ¶ms, &result) + + err := client.callSBCLI("GET", + fmt.Sprintf("csi/get_volume_info/%s", lvolID), nil, &result) + if errorMatches(err, ErrJSONNoSuchDevice) { return nil, ErrJSONNoSuchDevice } - return &result[0], nil + r := &result[0] + return map[string]string{ + "name": r.Name, + "uuid": r.UUID, + "pool_id": r.PoolID, + + "targetType": r.TargetType, + "targetAddr": r.TargetAddr, + "targetPort": r.TargetPort, + "nqn": r.Nqn, + "model": r.Model, + "lvolSize": strconv.FormatInt(r.BlockSize*r.NumBlocks, 10), + }, nil } func (client *rpcClient) isVolumeCreated(lvolID string) (bool, error) { @@ -232,13 +268,9 @@ func (client *rpcClient) isVolumeCreated(lvolID string) (bool, error) { } func (client *rpcClient) deleteVolume(lvolID string) error { - params := struct { - Name string `json:"name"` - }{ - Name: lvolID, - } + err := client.callSBCLI("DELETE", + fmt.Sprintf("csi/delete_lvol/%s", lvolID), nil, nil) - err := client.call("bdev_lvol_delete", ¶ms, nil) if errorMatches(err, ErrJSONNoSuchDevice) { err = ErrJSONNoSuchDevice // may happen in concurrency } @@ -360,6 +392,70 @@ func (client *rpcClient) call(method string, args, result interface{}) error { return nil } +func (client *rpcClient) callSBCLI(method string, path string, args, result interface{}) error { + //type rpcRequest struct { + // Ver string `json:"jsonrpc"` + // ID int32 `json:"id"` + // Method string `json:"method"` + //} + // + //id := atomic.AddInt32(&client.rpcID, 1) + //request := rpcRequest{ + // Ver: "2.0", + // ID: id, + // Method: method, + //} + + var data = []byte(`{}`) + var err error + + if args != nil { + data, err = json.Marshal(args) + if err != nil { + return fmt.Errorf("%s: %w", method, err) + } + } + + requestURL := fmt.Sprintf("%s/%s", client.rpcURL, path) + req, err := http.NewRequest(method, requestURL, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("%s: %w", method, err) + } + + //req.SetBasicAuth(client.rpcUser, client.rpcPass) + req.Header.Set("Content-Type", "application/json") + + resp, err := client.httpClient.Do(req) + if err != nil { + return fmt.Errorf("%s: %w", method, err) + } + + defer resp.Body.Close() + if resp.StatusCode >= http.StatusBadRequest { + return fmt.Errorf("%s: HTTP error code: %d", method, resp.StatusCode) + } + + response := struct { + Error struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + Result interface{} `json:"result"` + }{ + Result: result, + } + + err = json.NewDecoder(resp.Body).Decode(&response) + if err != nil { + return fmt.Errorf("%s: %w", method, err) + } + if response.Error.Code != 0 { + return fmt.Errorf("%s: json response error: %s", method, response.Error.Message) + } + + return nil +} + func errorMatches(errFull, errJSON error) bool { if errFull == nil { return false diff --git a/pkg/util/nvmf.go b/pkg/util/nvmf.go index b9df58c..32b869e 100644 --- a/pkg/util/nvmf.go +++ b/pkg/util/nvmf.go @@ -18,20 +18,15 @@ package util import ( "fmt" - "strconv" - "strings" - "sync/atomic" - "k8s.io/klog" ) type nodeNVMf struct { client *rpcClient - targetType string // RDMA, TCP - targetAddr string - targetPort string - transCreated int32 + targetType string // RDMA, TCP + targetAddr string + targetPort string } func newNVMf(client *rpcClient, targetType, targetAddr string) *nodeNVMf { @@ -54,19 +49,12 @@ func (node *nodeNVMf) LvStores() ([]LvStore, error) { // VolumeInfo returns a string:string map containing information necessary // for CSI node(initiator) to connect to this target and identify the disk. func (node *nodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) { - lvol, err := node.client.getVolume(lvolID) + lvol, err := node.client.getVolumeInfo(lvolID) if err != nil { return nil, err } - return map[string]string{ - "targetType": node.targetType, - "targetAddr": node.targetAddr, - "targetPort": node.targetPort, - "nqn": node.getVolumeNqn(lvolID), - "model": node.getVolumeModel(lvolID), - "lvolSize": strconv.FormatInt(lvol.BlockSize*lvol.NumBlocks, 10), - }, nil + return lvol, nil } // CreateVolume creates a logical volume and returns volume ID @@ -129,63 +117,58 @@ func (node *nodeNVMf) DeleteVolume(lvolID string) error { // PublishVolume exports a volume through NVMf target func (node *nodeNVMf) PublishVolume(lvolID string) error { - exists, err := node.isVolumeCreated(lvolID) - if err != nil { - return err - } - if !exists { - return ErrVolumeDeleted - } - published, err := node.isVolumePublished(lvolID) - if err != nil { - return err - } - if published { - return nil - } - - err = node.createTransport() - if err != nil { - return err - } - - err = node.createSubsystem(lvolID) - if err != nil { - return err - } - - _, err = node.subsystemAddNs(lvolID) - if err != nil { - node.deleteSubsystem(lvolID) //nolint:errcheck // we can do few - return err - } - - err = node.subsystemAddListener(lvolID) + var isPublished bool + //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) + err := node.client.callSBCLI("GET", "csi/publish_volume/"+lvolID, nil, &isPublished) if err != nil { - node.subsystemRemoveNs(lvolID) //nolint:errcheck // ditto - node.deleteSubsystem(lvolID) //nolint:errcheck // ditto return err } + //exists, err := node.isVolumeCreated(lvolID) + //if err != nil { + // return err + //} + //if !exists { + // return ErrVolumeDeleted + //} + //published, err := node.isVolumePublished(lvolID) + //if err != nil { + // return err + //} + //if published { + // return nil + //} + // + //err = node.createTransport() + //if err != nil { + // return err + //} + // + //err = node.createSubsystem(lvolID) + //if err != nil { + // return err + //} + // + //_, err = node.subsystemAddNs(lvolID) + //if err != nil { + // node.deleteSubsystem(lvolID) //nolint:errcheck // we can do few + // return err + //} + // + //err = node.subsystemAddListener(lvolID) + //if err != nil { + // node.subsystemRemoveNs(lvolID) //nolint:errcheck // ditto + // node.deleteSubsystem(lvolID) //nolint:errcheck // ditto + // return err + //} klog.V(5).Infof("volume published: %s", lvolID) return nil } func (node *nodeNVMf) isVolumePublished(lvolID string) (bool, error) { - var result []struct { - Address struct { - TrType string `json:"trtype"` - AdrFam string `json:"adrfam"` - TrAddr string `json:"traddr"` - TrSvcID string `json:"trsvcid"` - } `json:"address"` - } - params := struct { - Nqn string `json:"nqn"` - }{ - Nqn: node.getVolumeNqn(lvolID), - } - err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) + var isPublished bool + //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) + err := node.client.callSBCLI("GET", "csi/is_volume_published/"+lvolID, nil, &isPublished) if err != nil { // querying nqn that does not exist, an invalid parameters error will be thrown if errorMatches(err, ErrInvalidParameters) { @@ -193,187 +176,40 @@ func (node *nodeNVMf) isVolumePublished(lvolID string) (bool, error) { } return false, err } - for i := range result { - if result[i].Address.TrType == node.targetType && - result[i].Address.TrAddr == node.targetAddr && - result[i].Address.TrSvcID == node.targetPort && - result[i].Address.AdrFam == cfgAddrFamily { - return true, nil - } - } - return false, nil + return isPublished, nil } func (node *nodeNVMf) UnpublishVolume(lvolID string) error { - exists, err := node.isVolumeCreated(lvolID) - if err != nil { - return err - } - if !exists { - return ErrVolumeDeleted - } - published, err := node.isVolumePublished(lvolID) - if err != nil { - return err - } - if !published { - // already unpublished - return nil - } - err = node.subsystemRemoveNs(lvolID) - if err != nil { - // we should try deleting subsystem even if we fail here - klog.Errorf("failed to remove namespace(nqn=%s): %s", node.getVolumeNqn(lvolID), err) - } - err = node.deleteSubsystem(lvolID) + var isPublished bool + //err := node.client.call("nvmf_subsystem_get_listeners", ¶ms, &result) + err := node.client.callSBCLI("GET", "csi/unpublish_volume/"+lvolID, nil, &isPublished) if err != nil { return err } + //exists, err := node.isVolumeCreated(lvolID) + //if err != nil { + // return err + //} + //if !exists { + // return ErrVolumeDeleted + //} + //published, err := node.isVolumePublished(lvolID) + //if err != nil { + // return err + //} + //if !published { + // // already unpublished + // return nil + //} + //err = node.subsystemRemoveNs(lvolID) + //if err != nil { + // // we should try deleting subsystem even if we fail here + // klog.Errorf("failed to remove namespace(nqn=%s): %s", node.getVolumeNqn(lvolID), err) + //} + //err = node.deleteSubsystem(lvolID) + //if err != nil { + // return err + //} klog.V(5).Infof("volume unpublished: %s", lvolID) return nil } - -func (node *nodeNVMf) getVolumeModel(lvolID string) string { - return lvolID -} - -func (node *nodeNVMf) getVolumeNqn(lvolID string) string { - return "nqn.2020-04.io.spdk.csi:uuid:" + node.getVolumeModel(lvolID) -} - -func (node *nodeNVMf) createSubsystem(lvolID string) error { - params := struct { - Nqn string `json:"nqn"` - AllowAnyHost bool `json:"allow_any_host"` - SerialNumber string `json:"serial_number"` - ModelNumber string `json:"model_number"` - }{ - Nqn: node.getVolumeNqn(lvolID), - AllowAnyHost: cfgAllowAnyHost, - SerialNumber: "spdkcsi-sn", - ModelNumber: node.getVolumeModel(lvolID), // client matches imported disk with model string - } - - return node.client.call("nvmf_create_subsystem", ¶ms, nil) -} - -func (node *nodeNVMf) subsystemAddNs(lvolID string) (int, error) { - type namespace struct { - BdevName string `json:"bdev_name"` - } - - params := struct { - Nqn string `json:"nqn"` - Namespace namespace `json:"namespace"` - }{ - Nqn: node.getVolumeNqn(lvolID), - Namespace: namespace{ - BdevName: lvolID, - }, - } - var nsID int - err := node.client.call("nvmf_subsystem_add_ns", ¶ms, &nsID) - return nsID, err -} - -func (node *nodeNVMf) subsystemGetNsID(lvolID string) (int, error) { - var results []struct { - Nqn string `json:"nqn"` - Namespace []struct { - NSID int `json:"nsid"` - BdevName string `json:"bdev_name"` - } `json:"namespaces"` - } - err := node.client.call("nvmf_get_subsystems", nil, &results) - if err != nil { - return 0, err - } - nqn := node.getVolumeNqn(lvolID) - for i := range results { - result := &results[i] - if result.Nqn == nqn { - for i := range result.Namespace { - if result.Namespace[i].BdevName == lvolID { - return result.Namespace[i].NSID, nil - } - } - } - } - return 0, fmt.Errorf("no such namespace") -} - -func (node *nodeNVMf) subsystemAddListener(lvolID string) error { - type listenAddress struct { - TrType string `json:"trtype"` - AdrFam string `json:"adrfam"` - TrAddr string `json:"traddr"` - TrSvcID string `json:"trsvcid"` - } - - params := struct { - Nqn string `json:"nqn"` - ListenAddress listenAddress `json:"listen_address"` - }{ - Nqn: node.getVolumeNqn(lvolID), - ListenAddress: listenAddress{ - TrType: node.targetType, - TrAddr: node.targetAddr, - TrSvcID: node.targetPort, - AdrFam: cfgAddrFamily, - }, - } - - return node.client.call("nvmf_subsystem_add_listener", ¶ms, nil) -} - -func (node *nodeNVMf) subsystemRemoveNs(lvolID string) error { - nsID, err := node.subsystemGetNsID(lvolID) - if err != nil { - return err - } - - params := struct { - Nqn string `json:"nqn"` - NsID int `json:"nsid"` - }{ - Nqn: node.getVolumeNqn(lvolID), - NsID: nsID, - } - return node.client.call("nvmf_subsystem_remove_ns", ¶ms, nil) -} - -func (node *nodeNVMf) deleteSubsystem(lvolID string) error { - params := struct { - Nqn string `json:"nqn"` - }{ - Nqn: node.getVolumeNqn(lvolID), - } - - return node.client.call("nvmf_delete_subsystem", ¶ms, nil) -} - -func (node *nodeNVMf) createTransport() error { - // concurrent requests can happen despite this fast path check - if atomic.LoadInt32(&node.transCreated) != 0 { - return nil - } - - // TODO: support transport parameters - params := struct { - TrType string `json:"trtype"` - }{ - TrType: node.targetType, - } - - err := node.client.call("nvmf_create_transport", ¶ms, nil) - - if err == nil { - klog.V(5).Infof("Transport created: %s,%s", node.targetAddr, node.targetType) - atomic.StoreInt32(&node.transCreated, 1) - } else if strings.Contains(err.Error(), "already exists") { - err = nil // ignore transport already exists error - atomic.StoreInt32(&node.transCreated, 1) - } - - return err -}