Merge pull request #18 from OpenMPDK/wip-mem-alloc-replication
Memory pool support for Replication IO
somnathr authored Oct 25, 2023
2 parents 488ad0b + 9b1638c commit a8f809a
Showing 10 changed files with 187 additions and 32 deletions.
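At a glance: StorageAPI.ReadAndCopy (and every implementation — debugStorage, KVStorage, posix, storageRESTClient) gains a sizehint int64 parameter; cmd/erasure-utils.go adds three sync.Pool-backed buffer tiers (min/mid/max) for the replication path; the non-EC read and write paths in cmd/kv-storage.go and cmd/xl-v1-object.go draw their buffers from those pools; and two environment variables, MINIO_DONT_USE_REP_MEM_POOL and MINIO_REP_MEM_POOL_DIV_FACTOR, control the feature.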
4 changes: 2 additions & 2 deletions cmd/debug-storage.go
@@ -188,8 +188,8 @@ func (d *debugStorage) UpdateStats() error {
 	return err
 }
 
-func (d *debugStorage) ReadAndCopy(volume string, filePath string, writer io.Writer) (err error) {
-	err = d.s.ReadAndCopy(volume, filePath, writer)
+func (d *debugStorage) ReadAndCopy(volume string, filePath string, writer io.Writer, sizehint int64) (err error) {
+	err = d.s.ReadAndCopy(volume, filePath, writer, sizehint)
 	if d.enable {
 		fmt.Printf("%s: ReadAndCopy(%s, %s) (%s)\n", d.path, volume, filePath, errStr(err))
 	}
119 changes: 119 additions & 0 deletions cmd/erasure-utils.go
@@ -73,6 +73,125 @@ func (k *kvMetaPoolECPoolType) PrintCount() {
 	fmt.Println("## EC-Meta-Pool count", k.count)
 }
 
+// Pools used in the Replication path.
+
+type kvMinSizePoolRepPoolType struct {
+	*sync.Pool
+}
+
+func (k *kvMinSizePoolRepPoolType) Get() interface{} {
+	return k.Pool.Get()
+}
+
+func (k *kvMinSizePoolRepPoolType) Put(x interface{}) {
+	k.Pool.Put(x)
+}
+
+type kvMidSizePoolRepPoolType struct {
+	*sync.Pool
+}
+
+func (k *kvMidSizePoolRepPoolType) Get() interface{} {
+	return k.Pool.Get()
+}
+
+func (k *kvMidSizePoolRepPoolType) Put(x interface{}) {
+	k.Pool.Put(x)
+}
+
+type kvMaxSizePoolRepPoolType struct {
+	*sync.Pool
+}
+
+func (k *kvMaxSizePoolRepPoolType) Get() interface{} {
+	return k.Pool.Get()
+}
+
+func (k *kvMaxSizePoolRepPoolType) Put(x interface{}) {
+	k.Pool.Put(x)
+}
+
+var kvMinPoolRep *kvMinSizePoolRepPoolType = nil
+var kvMidPoolRep *kvMidSizePoolRepPoolType = nil
+var kvMaxPoolRep *kvMaxSizePoolRepPoolType = nil
+var midSize int64 = 0
+var minSize int64 = 0
+
+func initRepPool(divFactor int) {
+	fmt.Println("### Creating Max Rep Pool with object size = ", globalMaxKVObject)
+	kvMaxPoolRep = &kvMaxSizePoolRepPoolType{
+		Pool: &sync.Pool{
+			New: func() interface{} {
+				b := make([]byte, globalMaxKVObject)
+				return b
+			},
+		},
+	}
+
+	// Mid pool: the max object size divided by divFactor, doubled until it
+	// clears 128 KiB. Doubling midSize itself guarantees the loop terminates.
+	midSize = globalMaxKVObject / int64(divFactor)
+	for midSize <= 131072 {
+		midSize = 2 * midSize
+	}
+	fmt.Println("### Creating Mid Rep Pool with object size = ", midSize)
+	kvMidPoolRep = &kvMidSizePoolRepPoolType{
+		Pool: &sync.Pool{
+			New: func() interface{} {
+				b := make([]byte, midSize)
+				return b
+			},
+		},
+	}
+
+	// Min pool: halve again via 2*divFactor, doubled until it clears 32 KiB.
+	divFactor = 2 * divFactor
+	minSize = globalMaxKVObject / int64(divFactor)
+	for minSize <= 32768 {
+		minSize = 2 * minSize
+	}
+	fmt.Println("### Creating Min Rep Pool with object size = ", minSize)
+	kvMinPoolRep = &kvMinSizePoolRepPoolType{
+		Pool: &sync.Pool{
+			New: func() interface{} {
+				b := make([]byte, minSize)
+				return b
+			},
+		},
+	}
+}
+
+// poolAllocRep returns a pooled buffer from the smallest tier that fits size.
+func poolAllocRep(size int64) []byte {
+	if size <= minSize {
+		return kvMinPoolRep.Get().([]byte)
+	} else if size <= midSize {
+		return kvMidPoolRep.Get().([]byte)
+	}
+	return kvMaxPoolRep.Get().([]byte)
+}
+
+// poolDeAllocRep routes a buffer back to the tier poolAllocRep drew it from,
+// keyed by the same size value.
+func poolDeAllocRep(buffer []byte, size int64) {
+	if size <= minSize {
+		kvMinPoolRep.Put(buffer)
+	} else if size <= midSize {
+		kvMidPoolRep.Put(buffer)
+	} else {
+		kvMaxPoolRep.Put(buffer)
+	}
+}
+
+// End Rep pool
+
 var kvPoolEC *kvPoolECPoolType = nil
 var kvMetaPoolEC *kvMetaPoolECPoolType = nil
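To make the tier arithmetic concrete, here is a small standalone sketch (not part of the commit) that reproduces the initRepPool sizing and the poolAllocRep selection; the 2 MiB value for globalMaxKVObject is an assumption for illustration only.

package main

import "fmt"

const maxObject int64 = 2 * 1024 * 1024 // assumed globalMaxKVObject (2 MiB)

// tierSizes mirrors initRepPool: mid = max/divFactor, min = max/(2*divFactor),
// each doubled until it clears its floor (128 KiB for mid, 32 KiB for min).
func tierSizes(divFactor int64) (minSize, midSize int64) {
	midSize = maxObject / divFactor
	for midSize <= 131072 {
		midSize *= 2
	}
	minSize = maxObject / (2 * divFactor)
	for minSize <= 32768 {
		minSize *= 2
	}
	return minSize, midSize
}

func main() {
	minSize, midSize := tierSizes(4) // 4 is the default div factor
	fmt.Println("min/mid/max tiers:", minSize, midSize, maxObject) // 262144 524288 2097152
	// Tier selection, as in poolAllocRep: smallest pool whose buffers fit the hint.
	for _, hint := range []int64{16 << 10, 300 << 10, 1 << 20} {
		switch {
		case hint <= minSize:
			fmt.Println(hint, "-> min pool")
		case hint <= midSize:
			fmt.Println(hint, "-> mid pool")
		default:
			fmt.Println(hint, "-> max pool")
		}
	}
}

With the defaults, every replication read or write costs one pooled 256 KiB, 512 KiB, or 2 MiB buffer rather than a fresh allocation per request.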
1 change: 1 addition & 0 deletions cmd/globals.go
@@ -174,6 +174,7 @@ var (
 	globalNotransaction_write bool
 	globalDo_Write_Opt bool
 	globalDontUseECMemPool bool
+	globalDontUseRepMemPool bool
 	globalZeroCopyReader bool
 	globalOptimizedMetaReader bool
 	globalMetaOptNoStat bool
29 changes: 18 additions & 11 deletions cmd/kv-storage.go
@@ -1345,28 +1345,35 @@ func (k *KVStorage) WriteAll(volume string, filePath string, buf []byte) (err er
 	return nil
 }
 
-func (k *KVStorage) ReadAndCopy(volume string, filePath string, writer io.Writer) (err error) {
+func (k *KVStorage) ReadAndCopy(volume string, filePath string, writer io.Writer, sizehint int64) (err error) {
 
 	var is_meta bool = false
-	var bufp *[]byte = nil
+	var buf []byte
+	var err_kv error
 
 	nskey := pathJoin(volume, filePath)
 	if strings.HasSuffix(nskey, xlMetaJSONFile) || strings.Contains(nskey, ".minio.sys") {
 		is_meta = true
 	} else {
 		nskey = k.DataKey(nskey)
 	}
 	if is_meta || strings.Contains(nskey, ".minio.sys") {
-		bufp = kvValuePoolMeta.Get().(*[]byte)
+		bufp := kvValuePoolMeta.Get().(*[]byte)
 		defer kvValuePoolMeta.Put(bufp)
+		buf, err_kv = k.kv.Get(nskey, *bufp)
+		if err_kv != nil {
+			return err_kv
+		}
 	} else {
-		//bufp = kvValuePool.Get().(*[]byte)
-		//defer kvValuePool.Put(bufp)
-		bufp = kvValuePoolNoEC.Get().(*[]byte)
-		defer kvValuePoolNoEC.Put(bufp)
-	}
-	buf, err_kv := k.kv.Get(nskey, *bufp)
-	if err_kv != nil {
-		return err_kv
+		// Replication read: draw a buffer from the size-tiered pools.
+		buffer := poolAllocRep(sizehint)
+		defer poolDeAllocRep(buffer, sizehint)
+		buf, err_kv = k.kv.Get(nskey, buffer)
+		if err_kv != nil {
+			return err_kv
+		}
 	}
 
 	_, err = io.Copy(writer, bytes.NewReader(buf))
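The shape of the new non-metadata read path, isolated for clarity — a sketch, not the committed code. kvGet stands in for k.kv.Get, the pool helpers are the ones from cmd/erasure-utils.go above, and the bytes and io imports are assumed.

// kvGet is a stand-in for k.kv.Get: it fills dst and returns the valid prefix.
func kvGet(key string, dst []byte) ([]byte, error) {
	n := copy(dst, "value-for-"+key)
	return dst[:n], nil
}

// readWithHint mirrors the replication branch of ReadAndCopy: the size hint
// picks the pool tier on Get, and the same hint routes the buffer back on Put.
func readWithHint(key string, sizehint int64, w io.Writer) error {
	buffer := poolAllocRep(sizehint)
	defer poolDeAllocRep(buffer, sizehint)
	buf, err := kvGet(key, buffer)
	if err != nil {
		return err
	}
	_, err = io.Copy(w, bytes.NewReader(buf))
	return err
}

Because deallocation is keyed off the same sizehint, the hint must be identical on the alloc and dealloc calls, or a buffer would be returned to the wrong tier.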
8 changes: 4 additions & 4 deletions cmd/kv.go
@@ -364,9 +364,9 @@ var kvPadding bool = os.Getenv("MINIO_NKV_PADDING") != "off"
 var kvMaxValueSize = getKVMaxValueSize()
 
 func getKVMaxValueSize() int {
-	str := os.Getenv("MINIO_NKV_MAX_VALUE_SIZE")
+	str := os.Getenv("MINIO_NKV_MAX_VALUE_SIZE_WITHOUT_RDD")
 	if str == "" {
-		return 2 * 1024 * 1024
+		return 1 * 1024 * 1024
 	}
 	valSize, err := strconv.Atoi(str)
 	logger.FatalIf(err, "parsing MINIO_NKV_MAX_VALUE_SIZE")
@@ -725,7 +725,7 @@ func (k *KV) Put(keyStr string, value []byte) error {
 	size := len(value)
 
 	if (!globalNoEC || (size > int (globalMaxKVObject))) {
-        max_supported_size = kvMaxValueSize
+		max_supported_size = kvMaxValueSize
 	}
 	if len(value) > max_supported_size {
 		fmt.Println("##### invalid value length during PUT", keyStr, len(value), max_supported_size)
@@ -822,7 +822,7 @@ func (k *KV) Get(keyStr string, value []byte) ([]byte, error) {
 	size := len(value)
 
 	if (!globalNoEC || (size > int (globalMaxKVObject))) {
-        max_supported_size = kvMaxValueSize
+		max_supported_size = kvMaxValueSize
 	}
 	if len(value) > max_supported_size {
 		fmt.Println("##### invalid value length during GET", keyStr, len(value), max_supported_size)
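One behavioral consequence worth spelling out: the non-RDD default value cap drops from 2 MiB to 1 MiB, while replicated (no-EC) I/O is allowed to exceed it up to globalMaxKVObject. (Note the unchanged logger.FatalIf message still cites the old MINIO_NKV_MAX_VALUE_SIZE name.) A hedged sketch of the cap selection in Put/Get follows; initializing max_supported_size to globalMaxKVObject is an assumption, since that line sits outside the hunk.

// maxSupported sketches the cap logic in KV.Put/KV.Get: with EC enabled, or
// when the buffer already exceeds the replication ceiling, the stricter
// kvMaxValueSize (default 1 MiB) applies; otherwise replication I/O may go
// up to maxKVObject.
func maxSupported(bufLen int, noEC bool, maxKVObject, kvMaxValueSize int) int {
	maxSize := maxKVObject // assumed initial value (not shown in the hunk)
	if !noEC || bufLen > maxKVObject {
		maxSize = kvMaxValueSize
	}
	return maxSize
}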
2 changes: 1 addition & 1 deletion cmd/posix.go
@@ -1410,7 +1410,7 @@ func (s *posix) UpdateStats() error {
 	return nil
 }
 
-func (s *posix) ReadAndCopy(volume string, filePath string, writer io.Writer) (err error) {
+func (s *posix) ReadAndCopy(volume string, filePath string, writer io.Writer, sizehint int64) (err error) {
 
 	return nil
 }
2 changes: 1 addition & 1 deletion cmd/storage-interface.go
@@ -57,7 +57,7 @@ type StorageAPI interface {
 
 	// Read all.
 	ReadAll(volume string, path string) (buf []byte, err error)
-	ReadAndCopy(volume string, path string, writer io.Writer) (err error)
+	ReadAndCopy(volume string, path string, writer io.Writer, sizehint int64) (err error)
 	ReadRDDWay(volume string, filePath string, remoteAddress uint64, valueLen uint64, rKey uint32, remoteClientId string) (err error)
 	AddRDDParam(remoteClientId string, NQNId string, rQhandle uint16) (err error)
 }
2 changes: 1 addition & 1 deletion cmd/storage-rest-client.go
@@ -392,7 +392,7 @@ func (client *storageRESTClient) UpdateStats() error {
 	return nil
 }
 
-func (client *storageRESTClient) ReadAndCopy(volume string, filePath string, writer io.Writer) (err error) {
+func (client *storageRESTClient) ReadAndCopy(volume string, filePath string, writer io.Writer, sizehint int64) (err error) {
 
 	return nil
 }
25 changes: 25 additions & 0 deletions cmd/xl-sets.go
@@ -713,6 +713,31 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
 	fmt.Printf("### RDD Key separator = %s\n", globalRddSeparator)
 
 	globalDummy_read = -1
+
+	globalDontUseRepMemPool = false
+	if os.Getenv("MINIO_DONT_USE_REP_MEM_POOL") != "" {
+		fmt.Println("### Setting up *not to* use Rep mem-pool.. ###")
+		globalDontUseRepMemPool = true
+	}
+
+	if !globalDontUseRepMemPool {
+		var divFactor int64
+		fmt.Println("### Setting up to *use* Rep mem-pool.. ###")
+		if v := os.Getenv("MINIO_REP_MEM_POOL_DIV_FACTOR"); v != "" {
+			var err error
+			divFactor, err = strconv.ParseInt(v, 10, 64)
+			if err != nil {
+				fmt.Printf("### Wrong value of MINIO_REP_MEM_POOL_DIV_FACTOR = %s\n", v)
+				divFactor = 4
+			}
+		} else {
+			divFactor = 4
+		}
+		fmt.Println("### Div Factor used = ###", divFactor)
+
+		initRepPool(int(divFactor))
+	}
 
 	return s, nil
 }
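The two knobs in one place — a standalone sketch (not the committed code) of how newXLSets resolves them, including the silent fallback to a div factor of 4 on a malformed value.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// repPoolConfig mirrors the initialization above: any non-empty
// MINIO_DONT_USE_REP_MEM_POOL disables the pools, and a malformed
// MINIO_REP_MEM_POOL_DIV_FACTOR falls back to the default of 4.
func repPoolConfig() (disabled bool, divFactor int64) {
	if os.Getenv("MINIO_DONT_USE_REP_MEM_POOL") != "" {
		return true, 0
	}
	divFactor = 4
	if v := os.Getenv("MINIO_REP_MEM_POOL_DIV_FACTOR"); v != "" {
		if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
			divFactor = parsed
		}
	}
	return false, divFactor
}

func main() {
	os.Setenv("MINIO_REP_MEM_POOL_DIV_FACTOR", "8")
	disabled, div := repPoolConfig()
	fmt.Println(disabled, div) // false 8
}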
27 changes: 15 additions & 12 deletions cmd/xl-v1-object.go
@@ -410,7 +410,7 @@ func (xl xlObjects) getObjectNoMeta(ctx context.Context, bucket, object string,
 		}
 	} else {
 
-		err := disk.ReadAndCopy(bucket, object, writer)
+		err := disk.ReadAndCopy(bucket, object, writer, length)
 		if err != nil {
 			// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
 			if err != io.ErrClosedPipe {
@@ -606,7 +606,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
 	index := int(keyCrc % uint32(len(disks)))
 	disk := disks[index]
 	//fmt.Println(" ### getObject::Non EC Read :: ", index, onlineDisks, disks, disk, bucket, object)
-	err := disk.ReadAndCopy(bucket, object, writer)
+	err := disk.ReadAndCopy(bucket, object, writer, length)
 	if err != nil {
 		// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
 		if err != io.ErrClosedPipe {
@@ -1131,16 +1131,16 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
 			}
 		}
 	} else {
+		var buff_size int64 = data.Size()
 		var buffer []byte
-		switch size := data.Size(); {
-		case size == 0:
-			buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
-		case size == -1 || size >= blockSizeV1:
-			buffer = xl.bp.Get()
-			defer xl.bp.Put(buffer)
-		case size < blockSizeV1:
-			// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
-			buffer = make([]byte, size, 2*size)
+		if !globalDontUseRepMemPool {
+			buffer = poolAllocRep(buff_size)
+		} else if buff_size == 0 {
+			buffer = make([]byte, 1) // allocate at least a byte to reach EOF
+		} else if buff_size < 0 {
+			buffer = make([]byte, blockSizeV1) // unknown size: fall back to one block
+		} else {
+			buffer = make([]byte, buff_size)
 		}
 
 		n, err := io.ReadFull(reader, buffer)
@@ -1217,8 +1217,11 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
 	index := int(keyCrc % uint32(len(disks)))
 	onlineDisks = make([]StorageAPI, 1)
 	onlineDisks[0] = disks[index]
-	//fmt.Println(" ### Non EC write :: ", n, index, onlineDisks, len(onlineDisks))
+	//fmt.Println(" ### Non EC write :: ", n, len(buffer), index, onlineDisks, len(onlineDisks))
 	err = disks[index].WriteAll(bucket, object, buffer[:n])
+	if !globalDontUseRepMemPool {
+		poolDeAllocRep(buffer, buff_size)
+	}
 	if err != nil {
 		logger.LogIf(ctx, err)
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
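The write path frees its pooled buffer with an explicit poolDeAllocRep after WriteAll, so any early-return error paths between alloc and write never Put the buffer back; with sync.Pool that is a missed reuse rather than a leak, since unreturned buffers are simply garbage-collected. For comparison, a defer-based variant of the same lifecycle — a sketch assuming the StorageAPI interface and pool helpers from this diff, not the committed code:

func writeReplicated(disk StorageAPI, bucket, object string, reader io.Reader, size int64) error {
	buffer := poolAllocRep(size)
	defer poolDeAllocRep(buffer, size) // runs on every exit path
	n, err := io.ReadFull(reader, buffer)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return err
	}
	return disk.WriteAll(bucket, object, buffer[:n])
}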
