Skip to content

Commit

Permalink
Merge pull request #61 from bgokden/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
bgokden authored Jul 22, 2021
2 parents b22714b + 5c5822c commit 833f8ba
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 2 deletions.
2 changes: 2 additions & 0 deletions data/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func TestData2(t *testing.T) {
for _, e := range collector.List {
log.Printf("label: %v score: %v\n", string(e.Datum.Value.Label), e.Score)
keyByte, _ := data.GetKeyAsBytes(e.Datum)
valueByte, _ := data.GetValueAsBytes(e.Datum)
log.Printf("len key normal: %v len key compressed: %v info: %v\n", len(keyByte), len(util.Compress(keyByte)), string(e.Datum.Key.GroupLabel))
log.Printf("len value normal: %v len value compressed: %v info: %v\n", len(valueByte), len(util.Compress(valueByte)), string(e.Datum.Value.Label))
}
assert.Equal(t, config.Limit, uint32(len(collector.List)))

Expand Down
34 changes: 33 additions & 1 deletion data/newsync.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package data

import (
"errors"
"io/ioutil"
"log"
"math/rand"
"os"
"time"
"unsafe"

"github.com/bgokden/veri/annoyindex"
"github.com/bgokden/veri/util"
Expand All @@ -17,19 +19,48 @@ type DBMapEntry struct {
Datum *pb.Datum
}

func NewAllocadtedDatum(datum *pb.Datum) *pb.Datum {
ptrKey := (*pb.DatumKey)(util.GlobalMemoli.New(unsafe.Sizeof(*(datum.Key))))
if ptrKey == nil {
return nil
}
ptrValue := (*pb.DatumValue)(util.GlobalMemoli.New(unsafe.Sizeof(*(datum.Value))))
if ptrValue == nil {
util.GlobalMemoli.Free(unsafe.Pointer(ptrKey))
return nil
}
newDatum := &pb.Datum{
Key: ptrKey,
Value: ptrValue,
}
*(newDatum.Key) = *datum.Key
*(newDatum.Value) = *datum.Value
return newDatum
}

func FreeAllocadtedDatum(datum *pb.Datum) {
util.GlobalMemoli.Free(unsafe.Pointer(datum.Key))
util.GlobalMemoli.Free(unsafe.Pointer(datum.Value))
}

func (dt *Data) InsertBDMap(datum *pb.Datum, config *pb.InsertConfig) error {
exprireAt := int64(0)
if config != nil && config.TTL != 0 {
exprireAt = time.Now().Unix() + int64(config.TTL)
}
newDatum := NewAllocadtedDatum(datum)
if newDatum == nil {
return errors.New("Running out of reserved memory")
}
entry := &DBMapEntry{
ExprireAt: exprireAt,
Datum: datum,
Datum: newDatum,
}
keyByte, err := GetKeyAsBytes(datum)
if err != nil {
return err
}

dt.DBMap.Store(util.EncodeToString(keyByte), entry)
return nil
}
Expand All @@ -40,6 +71,7 @@ func (dt *Data) DeleteBDMap(datum *pb.Datum) error {
return err
}
dt.DBMap.Delete(util.EncodeToString(keyByte))
FreeAllocadtedDatum(datum)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/bgokden/go-cache v2.1.1+incompatible
github.com/chewxy/math32 v1.0.8
github.com/goburrow/cache v0.1.3
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/protobuf v1.4.1
github.com/google/go-cmp v0.5.4 // indirect
github.com/google/uuid v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ github.com/goburrow/cache v0.1.3 h1:v/fTGH/k6gMjd+GO2YysVYv+p11LK7vpT56WiLHir50=
github.com/goburrow/cache v0.1.3/go.mod h1:cDFesZDnIlrHoNlMYqqMpCRawuXulgx+y7mXU8HZ+/c=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4=
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
2 changes: 2 additions & 0 deletions server/veriserviceserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/bgokden/veri/node"
"github.com/bgokden/veri/state"
"github.com/bgokden/veri/util"
)

func RunServer(configMap map[string]interface{}) {
Expand Down Expand Up @@ -93,6 +94,7 @@ func RunServer(configMap map[string]interface{}) {
log.Printf("Closing services started.")
// Cleap up here
s.Close()
util.GlobalMemoli.Close()
os.Exit(0)
}()
log.Printf("Server started.")
Expand Down
179 changes: 179 additions & 0 deletions util/memoli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package util

import (
"io/ioutil"
"log"
"os"
"sync"
"sync/atomic"
"syscall"
"unsafe"

"github.com/golang-collections/collections/stack"
)

var GlobalMemoli *Memoli

func init() {
GlobalMemoli = NewGlobalMemoli()
}

type MemoliArena struct {
BlockSize uintptr
Length int
MaxSize uintptr
MmapHandle []byte
StartPointer uintptr
Filename string
Index uintptr
resuableIndexMutex sync.Mutex
Reuseables *stack.Stack
}

func getNewMemoliArena(blockSize uintptr, length int) *MemoliArena {
ma, err := NewMemoliArena(int(blockSize), length)
if err != nil {
log.Printf("Memoli Creation Error: %v", err)
return nil
}
return ma
}

func NewMemoliArena(blockSize int, length int) (*MemoliArena, error) {
memoliArena := &MemoliArena{
BlockSize: uintptr(blockSize),
Length: length,
MaxSize: uintptr(blockSize * length),
}
ti := blockSize * memoliArena.Length
mapFile, err := ioutil.TempFile("", "memoliarena")
if err != nil {
return nil, err
}
memoliArena.Filename = mapFile.Name()
_, err = mapFile.Seek(int64(ti-1), 0)
if err != nil {
return nil, err
}
_, err = mapFile.Write([]byte(" "))
if err != nil {
return nil, err
}
mmap, err := syscall.Mmap(int(mapFile.Fd()), 0, int(ti), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
memoliArena.MmapHandle = mmap
memoliArena.StartPointer = uintptr(unsafe.Pointer(&mmap[0]))

memoliArena.Reuseables = stack.New()
memoliArena.Reuseables.Push(uintptr(0))
return memoliArena, nil
}

func (ma *MemoliArena) Close(clean bool) error {
err := syscall.Munmap(ma.MmapHandle)
if err != nil {
return err
}
if clean {
os.Remove(ma.Filename)
}
return nil
}

func (ma *MemoliArena) GetNewIndex() uintptr {
ma.resuableIndexMutex.Lock()
defer ma.resuableIndexMutex.Unlock()
if reusbaleInterface := ma.Reuseables.Pop(); reusbaleInterface != nil {
if resuable, ok := reusbaleInterface.(uintptr); ok {
return resuable
}
}
index := atomic.AddUintptr(&ma.Index, ma.BlockSize)
return index
}

func (ma *MemoliArena) GetByteSlicePtr() *[]byte {
index := ma.GetNewIndex()
return (*[]byte)(unsafe.Pointer(ma.StartPointer + index))
}

// This is only for testing
func (ma *MemoliArena) GetByteSlicePtrByIndex(index int) *[]byte {
diff := uintptr(index) * ma.BlockSize
return (*[]byte)(unsafe.Pointer(ma.StartPointer + diff))
}

func (ma *MemoliArena) Delete(byteSlicePtr *[]byte) error {
ma.resuableIndexMutex.Lock()
defer ma.resuableIndexMutex.Unlock()
index := uintptr(unsafe.Pointer(byteSlicePtr)) - ma.StartPointer
ma.Reuseables.Push(index)
return nil
}

func (ma *MemoliArena) New() unsafe.Pointer {
index := ma.GetNewIndex()
if index >= ma.MaxSize {
return nil // run out of space
}
return unsafe.Pointer(ma.StartPointer + index)
}

func (ma *MemoliArena) Free(ptr unsafe.Pointer) {
ma.resuableIndexMutex.Lock()
defer ma.resuableIndexMutex.Unlock()
index := uintptr(ptr) - ma.StartPointer
ma.Reuseables.Push(index)
}

type Memoli struct {
ArenaMap sync.Map
Length int
BucketSize uintptr
}

func NewGlobalMemoli() *Memoli {
return &Memoli{
Length: 1e+6,
BucketSize: uintptr(256),
}
}

func (m *Memoli) ArenaKey(size uintptr) uintptr {
return size % m.BucketSize
}

func (m *Memoli) New(size uintptr) unsafe.Pointer {
maInterface, _ := m.ArenaMap.LoadOrStore(m.ArenaKey(size), getNewMemoliArena(size, m.Length))
if maInterface == nil {
return nil
}
if ma, ok := maInterface.(*MemoliArena); ok {
return ma.New()
}
return nil
}

func (m *Memoli) Free(ptr unsafe.Pointer) {
size := unsafe.Sizeof(ptr)
if maInterface, ok := m.ArenaMap.Load(m.ArenaKey(size)); ok {
if ma, ok2 := maInterface.(*MemoliArena); ok2 {
ma.Free(ptr)
}
}
}

func (m *Memoli) Close() error {
m.ArenaMap.Range(func(_, value interface{}) bool {
if ma, ok := value.(*MemoliArena); ok {
err := ma.Close(true)
if err != nil {
log.Printf("Error %v\n", err) // There is not much to do
}
}
return true
})
return nil // For future
}
99 changes: 99 additions & 0 deletions util/memoli_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package util_test

import (
"fmt"
"log"
"testing"
"unsafe"

"github.com/bgokden/veri/util"
"github.com/stretchr/testify/assert"
)

// This is a test for MemoliArena development
func TestMemoliArena(t *testing.T) {
n := int(1000)
size := int(8192)

b, err := util.NewMemoliArena(size, n)
assert.Nil(t, err)

arrayBackedByMmap := []*[]byte{}

log.Println("Start")
testN := 10
for i := 0; i < testN; i++ {
a := []byte(fmt.Sprintf("hello world %v", i))
slicePtr := b.GetByteSlicePtr()
if slicePtr != nil {
log.Printf("slicePtr: %X\n", unsafe.Pointer(slicePtr))
*slicePtr = a
}
arrayBackedByMmap = append(arrayBackedByMmap, slicePtr)
}

for i := 0; i < testN; i++ {
slicePtr := b.GetByteSlicePtrByIndex(i)
fmt.Printf("_%v_\n", string(*slicePtr))
}

// delete first 3
for i := 0; i < 3; i++ {
slicePtr := arrayBackedByMmap[i]
b.Delete(slicePtr)
}

// re-write
for i := 20; i < 20+testN; i++ {
a := []byte(fmt.Sprintf("hello world %v", i))
slicePtr := b.GetByteSlicePtr()
if slicePtr != nil {
*slicePtr = a
}
arrayBackedByMmap = append(arrayBackedByMmap, slicePtr)
}

// re-print
for i, byteSlice := range arrayBackedByMmap {
fmt.Printf("%v -> %p = _%v_\n", i, byteSlice, string(*byteSlice))
}

fmt.Println("done")

err = b.Close(true)
assert.Nil(t, err)
}

func TestGlobalMemoli(t *testing.T) {
arrayBackedByMmap := []*string{}
log.Println("Start")
testN := 10
for i := 0; i < testN; i++ {
a := fmt.Sprintf("hello world %v", i)
slicePtr := (*string)(util.GlobalMemoli.New(unsafe.Sizeof(a)))
if slicePtr != nil {
log.Printf("slicePtr: %X\n", unsafe.Pointer(slicePtr))
*slicePtr = a
}
arrayBackedByMmap = append(arrayBackedByMmap, slicePtr)
}

for i := 0; i < 3; i++ {
slicePtr := arrayBackedByMmap[i]
util.GlobalMemoli.Free(unsafe.Pointer(slicePtr))
}

for i := 20; i < 20+testN; i++ {
a := fmt.Sprintf("hello world %v", i)
slicePtr := (*string)(util.GlobalMemoli.New(unsafe.Sizeof(a)))
if slicePtr != nil {
log.Printf("slicePtr: %X\n", unsafe.Pointer(slicePtr))
*slicePtr = a
}
arrayBackedByMmap = append(arrayBackedByMmap, slicePtr)
}

for i, slicePtr := range arrayBackedByMmap {
fmt.Printf("%v -> %p = _%v_\n", i, slicePtr, *slicePtr)
}
}
Loading

0 comments on commit 833f8ba

Please sign in to comment.