Skip to content

Commit

Permalink
feat(conc): introduce conc operations with chans and mutex for queue (#7
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ehrktia authored May 22, 2024
1 parent efa8865 commit 8a19567
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 221 deletions.
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,42 @@
#### memcache
### memcache

in memory cache via http api

#### endpoints

`/save`

to save data in to cache
data format

```json
{
"key":"some-key",
"value":"some-value"
}
```

req type - **POST**

`/get`

to retrieve data from cache

data format

```json
{
"key":"some-key"
}
```

#### about

this is a concurrent implementation of cache via http.
Can be used across any service layer as a stage or landing for some data
which you require to consume / refer / lookup

**TODO**

- [ ] benchmark and test for disaster recovery
- [ ] implement a leader process to have data sync between instances
118 changes: 74 additions & 44 deletions datastructure/datastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,102 @@ package datastructure

// TODO: make it concurrent using channels
import (
"strings"
"sync"
)

type Data struct {
Key any `json:"key"`
Value any `json:"value"`
}

// inMemoryCache concurrent cache memory
var inMemoryCache sync.Map
var result = make(chan any)
var load = make(chan bool)
var storeCh = make(chan *Data)

// var errCh = make(chan error)
var done = make(chan bool)

func buildInp(k, v any) {
go func() {
d := &Data{
Key: k, Value: v,
}
storeCh <- d
}()
}

// Add inserts new element in cache.when the element
// is already present it replaces existing value with new
// incoming value and returns new value and loaded flag as true
// triggers update to queue , which adds provided data to end of
// queue
func Add(k, v any) (result any, loaded bool) {
result, loaded = inMemoryCache.LoadOrStore(k, v)
if loaded {
result = v
// update queue with new element
inMemoryIdx.add(k)
return
}
inMemoryIdx.add(k)
return
func Add(k, v any) (any, bool) {
buildInp(k, v)
go func() {
data := <-storeCh
// load data
_, l := inMemoryCache.LoadOrStore(data.Key, data.Value)
// return output
result <- data.Value
load <- l
// update queue
idx := <-inMemoryIdx.getIdx(k)
// add when missing
if idx < 0 {
inMemoryIdx.add(k)
done <- true
return
}
// when present move to top
inMemoryIdx.swap(idx)
done <- true
}()
r := <-result
l := <-load
return r, l
}

// Delete removes provided key from cache
// remove the key from queue
func Delete(k any) {
inMemoryCache.Delete(k)
}

// Get fetches matching key from cache
// update queue in such manner the recently accessed
// element from cache is moved to front of queue
func Get(k any) (v any, ok bool) {
v, ok = inMemoryCache.Load(k)
// when found in cache
if ok {
updQueue(k)
idx := <-inMemoryIdx.getIdx(k)
if idx < 0 {
return
}
return
// remove element from queue
inMemoryIdx.removeAt(idx)
}

func updQueue(k any) {
idx := inMemoryIdx.getIdx(k)
// when element is missing from queue
// upd queue and move it to first position
if idx < 0 {
lastPosition := len(inMemoryIdx.list) - 1
inMemoryIdx.list[lastPosition] = k
inMemoryIdx.swap(lastPosition)

}
const NotFound = "not found"

func fetch(k any) chan any {
out := make(chan any)
go func(key any) {
v, ok := inMemoryCache.Load(key)
// when found in cache
if ok {
out <- v
// upd queue
idx := <-inMemoryIdx.getIdx(k)
inMemoryIdx.swap(idx)
return
}
out <- NotFound
}(k)
return out
}

// func updQueue(k any) {
// // get element position in queue
// idx := inMemoryIdx.getIndex(k)
// // not found in queue
// if idx < 0 {
// // add new key to queue
// inMemoryIdx.add(k)
// } else {
// // move element to front of queue
// inMemoryIdx.swap(idx)
// }
//
// }
func Get(k any) any {
out := <-fetch(k)
if strings.EqualFold(out.(string), NotFound) {
return NotFound
}
return out
}

// NewQueue creates new inmemory store and queue
// each instance of `once` creates new store and queue
Expand Down
85 changes: 29 additions & 56 deletions datastructure/queue.go
Original file line number Diff line number Diff line change
@@ -1,92 +1,65 @@
package datastructure

// type inMemoryQueue []any
import "sync"

type queue struct {
list []any
lastAdded int
lock *sync.RWMutex
}

func new(size int) *queue {
return &queue{
list: make([]any, size),
lastAdded: 0,
lock: &sync.RWMutex{},
}
}

func (q *queue) add(k any) {
q.lock.Lock()
defer q.lock.Unlock()
q.list[q.lastAdded] = k
q.lastAdded++
}

func (q *queue) swap(idx int) {
q.lock.Lock()
defer q.lock.Unlock()
q.list[0], q.list[idx] = q.list[idx], q.list[0]
}

func (q *queue) getIdx(k any) int {
totLen := len(q.list) - 1
for i := 0; i < totLen; i++ {
if q.list[i] == k {
return i
}
func (q *queue) getIdx(k any) chan int {
out := make(chan int)
go func() {
totLen := len(q.list) - 1
for i := 0; i < totLen; i++ {
if q.list[i] == k {
out <- i
}

}
return -1
}
out <- -1
}()
return out

}

var inMemoryIdx *queue

func (q *queue) evict() {
q.lock.Lock()
defer q.lock.Unlock()
lastElementPosition := len(q.list) - 1
q.list[lastElementPosition] = nil
}

// swap replaces first value in queue with
// recently accessed index
// func (idx *inMemoryQueue) swap(i int) {
// tmp := (*idx)[0]
// (*idx)[0] = (*idx)[i]
// (*idx)[i] = tmp
// }
//
// func (idx *inMemoryQueue) getIndex(k any) int {
// totLen := len(*idx) - 1
// for i := 0; i < totLen; i++ {
// if (*idx)[i] == k {
// return i
// }
// }
// return -1
// }

// add inserts the key to last empty position in queue
// func (idx *inMemoryQueue) add(k any) {
// index := idx.check()
// (*idx)[index] = k
// }

// func (idx *inMemoryQueue) check() int {
// // when queue is empty return first position
// if (*idx)[0] == nil {
// return 0
// }
// // get len of queue
// totLen := len(*idx) - 1
// // return the first free space in queue
// for i := totLen; i >= 0; i-- {
// if (*idx)[i] == nil {
// return i
// }
// }
// return totLen
//
// }
func (q *queue) removeAt(idx int) {
q.lock.Lock()
defer q.lock.Unlock()
q.list = append(q.list[:idx], q.list[idx+1:]...)
if q.lastAdded >= 1 {
q.lastAdded = q.lastAdded - 1
}

// evict follows last accessed/final element
// in queue and remove it from queue and cache store
// this is called only when queue is full and no space a vailable
// func (idx *) evict() {
// totLen := len(*idx) - 1
// inMemoryIdx[totLen] = nil
// }
}
Loading

0 comments on commit 8a19567

Please sign in to comment.