Skip to content

Commit

Permalink
upload
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamdy-khader committed Aug 22, 2023
1 parent 9bb108f commit 50a77d4
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 259 deletions.
4 changes: 2 additions & 2 deletions deploy/kubernetes/config-map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ data:
{
"nodes": [
{
"name": "localhost",
"rpcURL": "http://127.0.0.1:9009",
"name": "pool1",
"rpcURL": "http://127.0.0.1",
"targetType": "nvme-tcp",
"targetAddr": "127.0.0.1"
}
Expand Down
2 changes: 1 addition & 1 deletion deploy/kubernetes/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ stringData:
{
"rpcTokens": [
{
"name": "localhost",
"name": "pool1",
"username": "spdkcsiuser",
"password": "spdkcsipass"
}
Expand Down
128 changes: 112 additions & 16 deletions pkg/util/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -162,7 +163,7 @@ func (client *rpcClient) lvStores() ([]LvStore, error) {
UUID string `json:"uuid"`
}

err := client.call("bdev_lvol_get_lvstores", nil, &result)
err := client.callSBCLI("GET", "csi/get_pools", nil, &result)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -195,29 +196,64 @@ func (client *rpcClient) createVolume(lvolName, lvsName string, sizeMiB int64) (
}

var lvolID string

err := client.call("bdev_lvol_create", &params, &lvolID)
err := client.callSBCLI("POST", "csi/create_volume", &params, &lvolID)
if errorMatches(err, ErrJSONNoSpaceLeft) {
err = ErrJSONNoSpaceLeft // may happen in concurrency
}

return lvolID, err
}

// get a volume and return a BDev
// get a volume and return a BDev,, lvsName/lvolName
func (client *rpcClient) getVolume(lvolID string) (*BDev, error) {
var result []BDev
err := client.callSBCLI("GET",
fmt.Sprintf("csi/get_volume_info/%s", lvolID), nil, &result)

params := struct {
Name string `json:"name"`
}{
Name: lvolID,
if errorMatches(err, ErrJSONNoSuchDevice) {
return nil, ErrJSONNoSuchDevice
}
if err != nil {
return nil, err
}
return &result[0], err
}

// get a volume and return a BDev
func (client *rpcClient) getVolumeInfo(lvolID string) (map[string]string, error) {
var result []struct {
Name string `json:"name"`
UUID string `json:"uuid"`
BlockSize int64 `json:"block_size"`
NumBlocks int64 `json:"num_blocks"`
PoolID string `json:"pool_id"`
TargetType string `json:"targetType"`
TargetAddr string `json:"targetAddr"`
TargetPort string `json:"targetPort"`
Nqn string `json:"nqn"`
Model string `json:"model"`
LvolSize string `json:"lvolSize"`
}
err := client.call("bdev_get_bdevs", &params, &result)

err := client.callSBCLI("GET",
fmt.Sprintf("csi/get_volume_info/%s", lvolID), nil, &result)

if errorMatches(err, ErrJSONNoSuchDevice) {
return nil, ErrJSONNoSuchDevice
}
return &result[0], nil
r := &result[0]
return map[string]string{
"name": r.Name,
"uuid": r.UUID,
"pool_id": r.PoolID,

"targetType": r.TargetType,
"targetAddr": r.TargetAddr,
"targetPort": r.TargetPort,
"nqn": r.Nqn,
"model": r.Model,
"lvolSize": strconv.FormatInt(r.BlockSize*r.NumBlocks, 10),
}, nil
}

func (client *rpcClient) isVolumeCreated(lvolID string) (bool, error) {
Expand All @@ -232,13 +268,9 @@ func (client *rpcClient) isVolumeCreated(lvolID string) (bool, error) {
}

func (client *rpcClient) deleteVolume(lvolID string) error {
params := struct {
Name string `json:"name"`
}{
Name: lvolID,
}
err := client.callSBCLI("DELETE",
fmt.Sprintf("csi/delete_lvol/%s", lvolID), nil, nil)

err := client.call("bdev_lvol_delete", &params, nil)
if errorMatches(err, ErrJSONNoSuchDevice) {
err = ErrJSONNoSuchDevice // may happen in concurrency
}
Expand Down Expand Up @@ -360,6 +392,70 @@ func (client *rpcClient) call(method string, args, result interface{}) error {
return nil
}

func (client *rpcClient) callSBCLI(method string, path string, args, result interface{}) error {
//type rpcRequest struct {
// Ver string `json:"jsonrpc"`
// ID int32 `json:"id"`
// Method string `json:"method"`
//}
//
//id := atomic.AddInt32(&client.rpcID, 1)
//request := rpcRequest{
// Ver: "2.0",
// ID: id,
// Method: method,
//}

var data = []byte(`{}`)
var err error

if args != nil {
data, err = json.Marshal(args)
if err != nil {
return fmt.Errorf("%s: %w", method, err)
}
}

requestURL := fmt.Sprintf("%s/%s", client.rpcURL, path)
req, err := http.NewRequest(method, requestURL, bytes.NewReader(data))
if err != nil {
return fmt.Errorf("%s: %w", method, err)
}

//req.SetBasicAuth(client.rpcUser, client.rpcPass)
req.Header.Set("Content-Type", "application/json")

resp, err := client.httpClient.Do(req)
if err != nil {
return fmt.Errorf("%s: %w", method, err)
}

defer resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest {
return fmt.Errorf("%s: HTTP error code: %d", method, resp.StatusCode)
}

response := struct {
Error struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
Result interface{} `json:"result"`
}{
Result: result,
}

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return fmt.Errorf("%s: %w", method, err)
}
if response.Error.Code != 0 {
return fmt.Errorf("%s: json response error: %s", method, response.Error.Message)
}

return nil
}

func errorMatches(errFull, errJSON error) bool {
if errFull == nil {
return false
Expand Down
Loading

0 comments on commit 50a77d4

Please sign in to comment.