Skip to content

Commit

Permalink
Merge pull request #82 from yowcow/fix-issue-81
Browse files Browse the repository at this point in the history
Fix bolt and berkeleydb storage locking
  • Loading branch information
yowcow authored Jul 24, 2018
2 parents 881c6e0 + 758470b commit bfdbf61
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 7 deletions.
3 changes: 3 additions & 0 deletions cmd/heavy-load/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/main
/_watcher
/_storage
13 changes: 13 additions & 0 deletions cmd/heavy-load/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
all: _watcher _storage main

_%:
mkdir -p $@

main:
go build -o $@

clean:
rm -rf main

realclean: clean
rm -rf _watcher _storage
159 changes: 159 additions & 0 deletions cmd/heavy-load/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"bytes"
"context"
"flag"
"io"
"log"
"os"
"sync"
"time"

"github.com/yowcow/goromdb/handler/simplehandler"
"github.com/yowcow/goromdb/loader"
"github.com/yowcow/goromdb/storage/boltstorage"
"github.com/yowcow/goromdb/watcher"
)

var (
concurrency int
duration int
help bool
logger *log.Logger

bucket = "goromdb"
watcherFile = "_watcher/data.db"
storagePath = "_storage"

sourceDataFile = "../../data/store/sample-boltdb.db"
sourceMD5File = "../../data/store/sample-boltdb.db.md5"
)

func init() {
flag.IntVar(&concurrency, "c", 1, "concurrency")
flag.IntVar(&duration, "d", 1, "duration in seconds")
flag.BoolVar(&help, "h", false, "show help")
flag.Parse()

if help {
flag.Usage()
os.Exit(0)
}
}

func init() {
logger = log.New(os.Stdout, "", log.Ldate|log.Ltime)
}

func init() {
if _, err := os.Stat(sourceDataFile); os.IsNotExist(err) {
logger.Println(sourceDataFile, "is not found")
os.Exit(1)
}
if _, err := os.Stat(sourceMD5File); os.IsNotExist(err) {
logger.Println(sourceMD5File, "is not found")
os.Exit(2)
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(duration)*time.Second+3*time.Second)
defer cancel()

// create watcher
wcr := watcher.New(watcherFile, 1, logger)
filein := wcr.Start(ctx)

// create storage
stg := boltstorage.New(bucket)

// create loader
ldr, err := loader.New(storagePath, "data.db")
if err != nil {
panic(err)
}

// create handler
hdr := simplehandler.New(stg, logger)

var wg sync.WaitGroup

// start goromdb handler
wg.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
done := hdr.Start(filein, ldr)
<-done
}(&wg)

// start infinite file loading
wg.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
tc := time.NewTicker(500 * time.Millisecond)
for {
// copy data.db
if _, err := os.Stat(watcherFile); os.IsNotExist(err) {
if r, err := os.Open(sourceDataFile); err == nil {
if w, err := os.OpenFile(watcherFile+".tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
io.Copy(w, r)
w.Close()
}
r.Close()
}
os.Rename(watcherFile+".tmp", watcherFile)
}
// copy data.db.md5
if _, err := os.Stat(watcherFile + ".md5"); os.IsNotExist(err) {
if r, err := os.Open(sourceMD5File); err == nil {
if w, err := os.OpenFile(watcherFile+".md5.tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
io.Copy(w, r)
w.Close()
}
r.Close()
}
os.Rename(watcherFile+".md5.tmp", watcherFile+".md5")
}

select {
case <-tc.C:
case <-ctx.Done():
tc.Stop()
return
}
}
}(&wg)

time.Sleep(3 * time.Second) // wait 3 secs

// start infinite `Get` calls
f := func(id int, w *sync.WaitGroup, c context.Context, l *log.Logger) {
defer w.Done()
for {
_, err := hdr.Get([]byte("hoge"))
if err != nil {
l.Println("worker", id, "got error:", err)
}

select {
case <-c.Done():
return
default:
}
}
}

logbuf := new(bytes.Buffer)
l := log.New(logbuf, "", log.Ldate|log.Ltime)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go f(i+1, &wg, ctx, l)
}

// wait for everybody
wg.Wait()

logger.Println("===== errors during `Get()` calls =====")
io.WriteString(os.Stdout, logbuf.String())
}
11 changes: 7 additions & 4 deletions storage/bdbstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ var (
// Storage represents a BDB storage
type Storage struct {
db *atomic.Value
mux *sync.Mutex
mux *sync.RWMutex
}

// New creates and returns a storage
func New() *Storage {
return &Storage{new(atomic.Value), new(sync.Mutex)}
return &Storage{new(atomic.Value), new(sync.RWMutex)}
}

// Load loads a new db handle into storage, and closes old db handle if exists
Expand Down Expand Up @@ -54,15 +54,18 @@ func openBDB(file string) (*bdb.BerkeleyDB, error) {

// Get finds a given key in db, and returns its value
func (s *Storage) Get(key []byte) ([]byte, error) {
s.mux.Lock()
defer s.mux.Unlock()
s.mux.RLock()
defer s.mux.RUnlock()

db := s.getDB()
if db == nil {
return nil, storage.InternalError("couldn't load db")
}

v, err := db.Get(bdb.NoTxn, key, 0)
if err != nil {
return nil, storage.KeyNotFoundError(key)
}

return v, nil
}
6 changes: 3 additions & 3 deletions storage/boltstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func openDB(file string) (*bolt.DB, error) {

// Get finds a given key in db, and returns its value
func (s *Storage) Get(key []byte) ([]byte, error) {
s.mux.RLock()
defer s.mux.RUnlock()

db := s.getDB()
if db == nil {
return nil, storage.InternalError("couldn't load db")
}

s.mux.RLock()
defer s.mux.RUnlock()

return getFromBucket(db, s.bucket, key)
}

Expand Down

0 comments on commit bfdbf61

Please sign in to comment.