diff --git a/cmd/heavy-load/.gitignore b/cmd/heavy-load/.gitignore new file mode 100644 index 0000000..c44904f --- /dev/null +++ b/cmd/heavy-load/.gitignore @@ -0,0 +1,3 @@ +/main +/_watcher +/_storage diff --git a/cmd/heavy-load/Makefile b/cmd/heavy-load/Makefile new file mode 100644 index 0000000..0cf94c6 --- /dev/null +++ b/cmd/heavy-load/Makefile @@ -0,0 +1,13 @@ +all: _watcher _storage main + +_%: + mkdir -p $@ + +main: + go build -o $@ + +clean: + rm -rf main + +realclean: clean + rm -rf _watcher _storage diff --git a/cmd/heavy-load/main.go b/cmd/heavy-load/main.go new file mode 100644 index 0000000..4420cda --- /dev/null +++ b/cmd/heavy-load/main.go @@ -0,0 +1,159 @@ +package main + +import ( + "bytes" + "context" + "flag" + "io" + "log" + "os" + "sync" + "time" + + "github.com/yowcow/goromdb/handler/simplehandler" + "github.com/yowcow/goromdb/loader" + "github.com/yowcow/goromdb/storage/boltstorage" + "github.com/yowcow/goromdb/watcher" +) + +var ( + concurrency int + duration int + help bool + logger *log.Logger + + bucket = "goromdb" + watcherFile = "_watcher/data.db" + storagePath = "_storage" + + sourceDataFile = "../../data/store/sample-boltdb.db" + sourceMD5File = "../../data/store/sample-boltdb.db.md5" +) + +func init() { + flag.IntVar(&concurrency, "c", 1, "concurrency") + flag.IntVar(&duration, "d", 1, "duration in seconds") + flag.BoolVar(&help, "h", false, "show help") + flag.Parse() + + if help { + flag.Usage() + os.Exit(0) + } +} + +func init() { + logger = log.New(os.Stdout, "", log.Ldate|log.Ltime) +} + +func init() { + if _, err := os.Stat(sourceDataFile); os.IsNotExist(err) { + logger.Println(sourceDataFile, "is not found") + os.Exit(1) + } + if _, err := os.Stat(sourceMD5File); os.IsNotExist(err) { + logger.Println(sourceMD5File, "is not found") + os.Exit(2) + } +} + +func main() { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(duration)*time.Second+3*time.Second) + defer cancel() + + // create watcher + wcr := watcher.New(watcherFile, 1, logger) + filein := wcr.Start(ctx) + + // create storage + stg := boltstorage.New(bucket) + + // create loader + ldr, err := loader.New(storagePath, "data.db") + if err != nil { + panic(err) + } + + // create handler + hdr := simplehandler.New(stg, logger) + + var wg sync.WaitGroup + + // start goromdb handler + wg.Add(1) + go func(w *sync.WaitGroup) { + defer w.Done() + done := hdr.Start(filein, ldr) + <-done + }(&wg) + + // start infinite file loading + wg.Add(1) + go func(w *sync.WaitGroup) { + defer w.Done() + tc := time.NewTicker(500 * time.Millisecond) + for { + // copy data.db + if _, err := os.Stat(watcherFile); os.IsNotExist(err) { + if r, err := os.Open(sourceDataFile); err == nil { + if w, err := os.OpenFile(watcherFile+".tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil { + io.Copy(w, r) + w.Close() + } + r.Close() + } + os.Rename(watcherFile+".tmp", watcherFile) + } + // copy data.db.md5 + if _, err := os.Stat(watcherFile + ".md5"); os.IsNotExist(err) { + if r, err := os.Open(sourceMD5File); err == nil { + if w, err := os.OpenFile(watcherFile+".md5.tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil { + io.Copy(w, r) + w.Close() + } + r.Close() + } + os.Rename(watcherFile+".md5.tmp", watcherFile+".md5") + } + + select { + case <-tc.C: + case <-ctx.Done(): + tc.Stop() + return + } + } + }(&wg) + + time.Sleep(3 * time.Second) // wait 3 secs + + // start infinite `Get` calls + f := func(id int, w *sync.WaitGroup, c context.Context, l *log.Logger) { + defer w.Done() + for { + _, err := hdr.Get([]byte("hoge")) + if err != nil { + l.Println("worker", id, "got error:", err) + } + + select { + case <-c.Done(): + return + default: + } + } + } + + logbuf := new(bytes.Buffer) + l := log.New(logbuf, "", log.Ldate|log.Ltime) + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go f(i+1, &wg, ctx, l) + } + + // wait for everybody + wg.Wait() + + logger.Println("===== errors during `Get()` calls =====") + io.WriteString(os.Stdout, logbuf.String()) +} diff --git a/storage/bdbstorage/storage.go b/storage/bdbstorage/storage.go index 5bb74a9..59eb3ba 100644 --- a/storage/bdbstorage/storage.go +++ b/storage/bdbstorage/storage.go @@ -15,12 +15,12 @@ var ( // Storage represents a BDB storage type Storage struct { db *atomic.Value - mux *sync.Mutex + mux *sync.RWMutex } // New creates and returns a storage func New() *Storage { - return &Storage{new(atomic.Value), new(sync.Mutex)} + return &Storage{new(atomic.Value), new(sync.RWMutex)} } // Load loads a new db handle into storage, and closes old db handle if exists @@ -54,15 +54,18 @@ func openBDB(file string) (*bdb.BerkeleyDB, error) { // Get finds a given key in db, and returns its value func (s *Storage) Get(key []byte) ([]byte, error) { - s.mux.Lock() - defer s.mux.Unlock() + s.mux.RLock() + defer s.mux.RUnlock() + db := s.getDB() if db == nil { return nil, storage.InternalError("couldn't load db") } + v, err := db.Get(bdb.NoTxn, key, 0) if err != nil { return nil, storage.KeyNotFoundError(key) } + return v, nil } diff --git a/storage/boltstorage/storage.go b/storage/boltstorage/storage.go index 696fdd2..672d23f 100644 --- a/storage/boltstorage/storage.go +++ b/storage/boltstorage/storage.go @@ -56,14 +56,14 @@ func openDB(file string) (*bolt.DB, error) { // Get finds a given key in db, and returns its value func (s *Storage) Get(key []byte) ([]byte, error) { + s.mux.RLock() + defer s.mux.RUnlock() + db := s.getDB() if db == nil { return nil, storage.InternalError("couldn't load db") } - s.mux.RLock() - defer s.mux.RUnlock() - return getFromBucket(db, s.bucket, key) }