From 243e469676fd6d5587467d3f4f3978afeda6f235 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 9 Feb 2024 17:50:38 +0000 Subject: [PATCH 1/5] Removed nbd tests from workflow (need root etc) --- .github/workflows/tests.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 58cda9eb..9ca2809c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -35,7 +35,6 @@ jobs: key: ${{ runner.os }}-go-mod-${{ hashFiles('**/go.sum') }} - name: Test - run: go test -v ./... - + run: go test -v $(go list ./... | grep -v expose) - name: Test with Race Conditions - run: go test -race -v ./... + run: go test -race -v $(go list ./... | grep -v expose) From 1468f97c46bade024662a4ced64946a9bcdcfd30 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 12 Feb 2024 12:35:24 +0000 Subject: [PATCH 2/5] Removed old nbd. Updated to new --- cmd/serve.go | 11 +- internal/expose/nbd.go | 433 --------------------- internal/expose/nbd_test.go | 336 ---------------- internal/expose/ndb_dev_test.go | 199 ---------- main.go | 13 +- pkg/storage/expose/nbd.go | 7 +- pkg/storage/expose/nbd_dev_test.go | 175 +++++---- pkg/storage/expose/nbd_test.go | 6 +- pkg/storage/modules/block_splitter_test.go | 10 +- pkg/storage/storage.go | 2 +- 10 files changed, 115 insertions(+), 1077 deletions(-) delete mode 100644 internal/expose/nbd.go delete mode 100644 internal/expose/nbd_test.go delete mode 100644 internal/expose/ndb_dev_test.go diff --git a/cmd/serve.go b/cmd/serve.go index ac4a5f88..1fce5993 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -13,9 +13,9 @@ import ( "syscall" "time" - "github.com/loopholelabs/silo/internal/expose" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/blocks" + "github.com/loopholelabs/silo/pkg/storage/expose" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol" "github.com/loopholelabs/silo/pkg/storage/sources" @@ -234,13 +234,10 @@ func runServe(ccmd *cobra.Command, args []string) { * */ func setup(device string, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { - p, err := expose.NewNBD(fmt.Sprintf("/dev/%s", device)) - if err != nil { - return nil, err - } + p := expose.NewExposedStorageNBD(prov, device, 1, 0, prov.Size(), 4096, 0) go func() { - err := p.Handle(prov) + err := p.Handle() if err != nil { fmt.Printf("p.Handle returned %v\n", err) } @@ -248,7 +245,7 @@ func setup(device string, prov storage.StorageProvider, server bool) (storage.Ex p.WaitReady() - err = os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600) + err := os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600) if err != nil { return nil, fmt.Errorf("Error mkdir %v", err) } diff --git a/internal/expose/nbd.go b/internal/expose/nbd.go deleted file mode 100644 index 6f1abeb9..00000000 --- a/internal/expose/nbd.go +++ /dev/null @@ -1,433 +0,0 @@ -package expose - -import ( - "encoding/binary" - "fmt" - "os" - "sync" - "syscall" - - "github.com/loopholelabs/silo/pkg/storage" -) - -/** - * Exposes a storage provider as an nbd device - * - */ -const NBD_COMMAND = 0xab00 -const NBD_SET_SOCK = 0 | NBD_COMMAND -const NBD_SET_BLKSIZE = 1 | NBD_COMMAND -const NBD_SET_SIZE = 2 | NBD_COMMAND -const NBD_DO_IT = 3 | NBD_COMMAND -const NBD_CLEAR_SOCK = 4 | NBD_COMMAND -const NBD_CLEAR_QUE = 5 | NBD_COMMAND -const NBD_PRINT_DEBUG = 6 | NBD_COMMAND -const NBD_SET_SIZE_BLOCKS = 7 | NBD_COMMAND -const NBD_DISCONNECT = 8 | 
NBD_COMMAND -const NBD_SET_TIMEOUT = 9 | NBD_COMMAND -const NBD_SET_FLAGS = 10 | NBD_COMMAND - -// NBD Commands -const NBD_CMD_READ = 0 -const NBD_CMD_WRITE = 1 -const NBD_CMD_DISCONNECT = 2 -const NBD_CMD_FLUSH = 3 -const NBD_CMD_TRIM = 4 - -// NBD Flags -const NBD_FLAG_HAS_FLAGS = (1 << 0) -const NBD_FLAG_READ_ONLY = (1 << 1) -const NBD_FLAG_SEND_FLUSH = (1 << 2) -const NBD_FLAG_SEND_TRIM = (1 << 5) - -const NBD_REQUEST_MAGIC = 0x25609513 -const NBD_RESPONSE_MAGIC = 0x67446698 - -// NBD Request packet -type Request struct { - Magic uint32 - Type uint32 - Handle uint64 - From uint64 - Length uint32 -} - -// NBD Response packet -type Response struct { - Magic uint32 - Error uint32 - Handle uint64 -} - -// IOctl call info -type IoctlCall struct { - Cmd uintptr - Value uintptr -} - -type NBDExposedStorage struct { - device *os.File - fp uintptr - socketPair [2]int - readyChannel chan error - dispatcher *Dispatch -} - -/** - * Handle storage requests using the provider - * - */ -func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error { - go func() { - err := s.dispatcher.Handle(s.socketPair[0], prov) - if err != nil { - fmt.Printf("RequestHandler quit unexpectedly %v\n", err) - // fmt.Printf("Sleeping for a min...\n") - // time.Sleep(time.Minute) - // Shutdown properly... - s.Shutdown() - } - /* - fmt.Printf("Close socketPair[0] %d\n", s.socketPair[0]) - err = syscall.Close(s.socketPair[0]) - if err != nil { - fmt.Printf("RequestHandler close error %v\n", err) - } - */ - }() - - // Issue ioctl calls to set it up - calls := []IoctlCall{ - {NBD_SET_BLKSIZE, 4096}, - {NBD_SET_TIMEOUT, 0}, - {NBD_PRINT_DEBUG, 1}, - {NBD_SET_SIZE, uintptr(prov.Size())}, - {NBD_CLEAR_QUE, 0}, - {NBD_CLEAR_SOCK, 0}, - {NBD_SET_SOCK, uintptr(s.socketPair[1])}, - {NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM}, - {NBD_DO_IT, 0}, - } - - for _, c := range calls { - if c.Cmd == NBD_DO_IT { - s.readyChannel <- nil - } - // fmt.Printf("IOCTL %d %d\n", c.Cmd, c.Value) - _, _, en := syscall.Syscall(syscall.SYS_IOCTL, s.fp, c.Cmd, c.Value) - if en != 0 { - err := fmt.Errorf("syscall error %s", syscall.Errno(en)) - if c.Cmd != NBD_DO_IT { - s.readyChannel <- err - } - return err - } - } - return nil -} - -/** - * Wait until things are running - * This can only be called ONCE - */ -func (s *NBDExposedStorage) WaitReady() error { - // fmt.Printf("WaitReady\n") - // Wait for a ready signal (The ioctl DO_IT has just about been sent) - return <-s.readyChannel -} - -/** - * Shutdown the nbd device - * - */ -func (s *NBDExposedStorage) Shutdown() error { - s.dispatcher.Wait() - - calls := []IoctlCall{ - {NBD_DISCONNECT, 0}, - {NBD_CLEAR_SOCK, 0}, - } - - for _, c := range calls { - fmt.Printf("IOCTL %d %d\n", c.Cmd, c.Value) - _, _, en := syscall.Syscall(syscall.SYS_IOCTL, s.fp, c.Cmd, c.Value) - if en != 0 { - return fmt.Errorf("syscall error %s", syscall.Errno(en)) - } - } - fmt.Printf("CLOSING DEVICE...\n") - /* - fmt.Printf("Close socketPair[1] %d\n", s.socketPair[1]) - err := syscall.Close(s.socketPair[1]) - if err != nil { - return err - } - */ - return s.device.Close() -} - -/** - * Create a new NBD device - * - */ -func NewNBD(dev string) (storage.ExposedStorage, error) { - // Create a pair of sockets to communicate with the NBD device over - // fmt.Printf("Create socketpair\n") - sockPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) - if err != nil { - return nil, err - } - // fmt.Printf("Socketpair %d %d\n", sockPair[0], sockPair[1]) - - // Open the nbd device - // fmt.Printf("Open device %s\n", 
dev) - fp, err := os.OpenFile(dev, os.O_RDWR, 0600) - if err != nil { - return nil, err - } - - es := &NBDExposedStorage{ - socketPair: sockPair, - device: fp, - fp: fp.Fd(), - readyChannel: make(chan error), - dispatcher: NewDispatch(), - } - - return es, nil -} - -type Dispatch struct { - ASYNC_READS bool - ASYNC_WRITES bool - fp *os.File - responseHeader []byte - writeLock sync.Mutex - prov storage.StorageProvider - fatal chan error - pendingResponses sync.WaitGroup -} - -func NewDispatch() *Dispatch { - d := &Dispatch{ - ASYNC_WRITES: true, - ASYNC_READS: true, - responseHeader: make([]byte, 16), - fatal: make(chan error, 8), - } - binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC) - return d -} - -func (d *Dispatch) Wait() { - // Wait for any pending responses - d.pendingResponses.Wait() -} - -/** - * Write a response... - * - */ -func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []byte) error { - d.writeLock.Lock() - defer d.writeLock.Unlock() - - // fmt.Printf("WriteResponse %x -> %d\n", respHandle, len(chunk)) - - binary.BigEndian.PutUint32(d.responseHeader[4:], respError) - binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle) - - _, err := d.fp.Write(d.responseHeader) - if err != nil { - return err - } - if len(chunk) > 0 { - _, err = d.fp.Write(chunk) - if err != nil { - return err - } - } - return nil -} - -/** - * This dispatches incoming NBD requests sequentially to the provider. - * - */ -func (d *Dispatch) Handle(fd int, prov storage.StorageProvider) error { - d.prov = prov - d.fp = os.NewFile(uintptr(fd), "unix") - - // Speed read and dispatch... - - BUFFER_SIZE := 4 * 1024 * 1024 - buffer := make([]byte, BUFFER_SIZE) - wp := 0 - - request := Request{} - - for { - // fmt.Printf("Calling read...\n") - n, err := d.fp.Read(buffer[wp:]) - if err != nil { - fmt.Printf("Error %v\n", err) - return err - } - wp += n - - // fmt.Printf("Read %d\n", n) - - // Now go through processing complete packets - rp := 0 - for { - // fmt.Printf("Processing data %d %d\n", rp, wp) - // Make sure we have a complete header - if wp-rp >= 28 { - // We can read the neader... - - header := buffer[rp : rp+28] - request.Magic = binary.BigEndian.Uint32(header) - request.Type = binary.BigEndian.Uint32(header[4:8]) - request.Handle = binary.BigEndian.Uint64(header[8:16]) - request.From = binary.BigEndian.Uint64(header[16:24]) - request.Length = binary.BigEndian.Uint32(header[24:28]) - - if request.Magic != NBD_REQUEST_MAGIC { - return fmt.Errorf("Received invalid MAGIC") - } - - if request.Type == NBD_CMD_DISCONNECT { - // fmt.Printf("CMD_DISCONNECT") - return nil // All done - } else if request.Type == NBD_CMD_FLUSH { - return fmt.Errorf("Not supported: Flush") - } else if request.Type == NBD_CMD_READ { - // fmt.Printf("READ %x %d\n", request.Handle, request.Length) - rp += 28 - err := d.cmdRead(request.Handle, request.From, request.Length) - if err != nil { - return err - } - } else if request.Type == NBD_CMD_WRITE { - rp += 28 - if wp-rp < int(request.Length) { - rp -= 28 - break // We don't have enough data yet... 
Wait for next read - } - data := make([]byte, request.Length) - copy(data, buffer[rp:rp+int(request.Length)]) - rp += int(request.Length) - // fmt.Printf("WRITE %x %d\n", request.Handle, request.Length) - err := d.cmdWrite(request.Handle, request.From, request.Length, data) - if err != nil { - return err - } - } else if request.Type == NBD_CMD_TRIM { - // fmt.Printf("TRIM\n") - rp += 28 - err = d.cmdTrim(request.Handle, request.From, request.Length) - if err != nil { - return err - } - } else { - return fmt.Errorf("NBD Not implemented %d\n", request.Type) - } - - } else { - break // Try again when we have more data... - } - } - // Now we need to move any partial to the start - if rp != 0 && rp != wp { - // fmt.Printf("Copy partial %d %d\n", rp, wp) - - copy(buffer, buffer[rp:wp]) - } - wp -= rp - - } -} - -/** - * cmdRead - * - */ -func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32) error { - - performRead := func(handle uint64, from uint64, length uint32) error { - data := make([]byte, length) - _, e := d.prov.ReadAt(data, int64(from)) - errorValue := uint32(0) - if e != nil { - errorValue = 1 - data = make([]byte, 0) // If there was an error, don't send data - } - return d.writeResponse(errorValue, handle, data) - } - - if d.ASYNC_READS { - d.pendingResponses.Add(1) - go func() { - err := performRead(cmd_handle, cmd_from, cmd_length) - if err != nil { - d.fatal <- err - } - d.pendingResponses.Done() - }() - } else { - return performRead(cmd_handle, cmd_from, cmd_length) - } - return nil -} - -/** - * cmdWrite - * - */ -func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint32, cmd_data []byte) error { - performWrite := func(handle uint64, from uint64, length uint32, data []byte) error { - _, e := d.prov.WriteAt(data, int64(from)) - errorValue := uint32(0) - if e != nil { - errorValue = 1 - } - return d.writeResponse(errorValue, handle, []byte{}) - } - - if d.ASYNC_WRITES { - d.pendingResponses.Add(1) - go func() { - err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) - if err != nil { - d.fatal <- err - } - d.pendingResponses.Done() - }() - } else { - return performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) - } - return nil -} - -/** - * cmdTrim - * - */ -func (d *Dispatch) cmdTrim(handle uint64, from uint64, length uint32) error { - // Ask the provider - /* - e := d.prov.Trim(from, length) - if e != storage.StorageError_SUCCESS { - err := d.writeResponse(1, handle, []byte{}) - if err != nil { - return err - } - } else { - */ - err := d.writeResponse(0, handle, []byte{}) - if err != nil { - return err - } - // } - return nil -} diff --git a/internal/expose/nbd_test.go b/internal/expose/nbd_test.go deleted file mode 100644 index d8657768..00000000 --- a/internal/expose/nbd_test.go +++ /dev/null @@ -1,336 +0,0 @@ -package expose - -import ( - crand "crypto/rand" - "fmt" - "math/rand" - "os" - "os/exec" - "sync" - "testing" - "time" - - "github.com/loopholelabs/silo/pkg/storage" - "github.com/loopholelabs/silo/pkg/storage/modules" - "github.com/loopholelabs/silo/pkg/storage/sources" -) - -const NBDdevice = "nbd2" - -/** - * Setup a disk with some files created. 
- * - */ -func setup(prov storage.StorageProvider, fileMin uint64, fileMax uint64, maxSize uint64) (storage.ExposedStorage, []string, error) { - defer func() { - // fmt.Printf("umount\n") - cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", NBDdevice)) - err := cmd.Run() - if err != nil { - fmt.Printf("Error cleaning up unount %v\n", err) - } - - err = os.Remove(fmt.Sprintf("/mnt/setup%s", NBDdevice)) - if err != nil { - fmt.Printf("Error cleaning up remove %v\n", err) - } - }() - - p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) - if err != nil { - return nil, nil, err - } - - go func() { - err := p.Handle(prov) - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() - - p.WaitReady() - - err = os.Mkdir(fmt.Sprintf("/mnt/setup%s", NBDdevice), 0600) - if err != nil { - return nil, nil, err - } - - // fmt.Printf("mkfs.ext4\n") - cmd := exec.Command("mkfs.ext4", fmt.Sprintf("/dev/%s", NBDdevice)) - err = cmd.Run() - if err != nil { - return nil, nil, err - } - - // fmt.Printf("mount\n") - cmd = exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/setup%s", NBDdevice)) - err = cmd.Run() - if err != nil { - return nil, nil, err - } - - totalData := 0 - files := make([]string, 0) - fileno := 0 - // Create some files fill up the disk... - for { - length := int(fileMin) - if fileMax > fileMin { - length += rand.Intn(int(fileMax - fileMin)) - } - - if totalData > int(maxSize) { - // We have enough data... - break - } - - randomData := make([]byte, length) - crand.Read(randomData) - name := fmt.Sprintf("/mnt/setup%s/file_%d", NBDdevice, fileno) - err := os.WriteFile(name, randomData, 0600) - if err != nil { - // Remove the file if it was created... - os.Remove(name) - - break // Out of disk space maybe. FIXME - } - - totalData += length - - files = append(files, fmt.Sprintf("/mnt/bench%s/file_%d", NBDdevice, fileno)) - fileno++ - } - - return p, files, nil -} - -func shutdown(p storage.ExposedStorage) error { - // fmt.Printf("umount\n") - cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", NBDdevice)) - err := cmd.Run() - if err != nil { - return err - } - err = os.Remove(fmt.Sprintf("/mnt/bench%s", NBDdevice)) - if err != nil { - return err - } - - err = p.Shutdown() - if err != nil { - return err - } - return nil -} - -type TestConfig struct { - Name string - Size uint64 - FileMin uint64 - FileMax uint64 -} - -func BenchmarkRead(mb *testing.B) { - - tests := []TestConfig{ - {"BigFiles", 100 * 1024 * 1024, 20 * 1024 * 1024, 20 * 1024 * 1024}, - {"MediumFiles", 100 * 1024 * 1024, 1024 * 1024, 1024 * 1024}, - {"SmallFiles", 100 * 1024 * 1024, 8 * 1024, 8 * 1024}, - } - - for _, c := range tests { - - mb.Run(c.Name, func(b *testing.B) { - - // Setup... - driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) - - p, files, err := setup(driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) - if err != nil { - fmt.Printf("Error setup %v\n", err) - return - } - - // Now mount the disk... - err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) - if err != nil { - panic(err) - } - - // fmt.Printf("mount\n") - cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) - err = cmd.Run() - if err != nil { - panic(err) - } - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - shutdown(p) - driver.ShowStats("stats") - }) - - driver.ResetMetrics() // Only start counting from now... - - // Here's the actual benchmark... 
- - var wg sync.WaitGroup - concurrent := make(chan bool, 32) - - // Now do some timing... - b.ResetTimer() - - totalData := int64(0) - - for i := 0; i < b.N; i++ { - fileno := rand.Intn(len(files)) - name := files[fileno] - - fi, err := os.Stat(name) - if err != nil { - fmt.Printf("Error statting file %v\n", err) - } else { - - wg.Add(1) - concurrent <- true - totalData += fi.Size() - - // Test read speed... - go func(filename string) { - - data, err := os.ReadFile(filename) - if err != nil { - fmt.Printf("Error reading file %v\n", err) - } - - if len(data) == 0 { - fmt.Printf("Warning: The file %s was not there\n", filename) - } - - wg.Done() - <-concurrent - }(name) - } - } - - b.SetBytes(totalData) - - wg.Wait() - - fmt.Printf("Total Data %d\n", totalData) - - }) - } -} - -func BenchmarkWrite(mb *testing.B) { - tests := []TestConfig{ - {"BigFiles", 100 * 1024 * 1024, 20 * 1024 * 1024, 20 * 1024 * 1024}, - {"MediumFiles", 100 * 1024 * 1024, 1024 * 1024, 1024 * 1024}, - {"SmallFiles", 100 * 1024 * 1024, 8 * 1024, 8 * 1024}, - } - - for _, c := range tests { - - mb.Run(c.Name, func(b *testing.B) { - - // Setup... - driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) - - p, files, err := setup(driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) - - if err != nil { - fmt.Printf("Error setup %v\n", err) - return - } - - // Now mount the disk... - err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) - if err != nil { - panic(err) - } - - cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) - err = cmd.Run() - if err != nil { - panic(err) - } - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - shutdown(p) - driver.ShowStats("stats") - }) - - driver.ResetMetrics() - - // Here's the actual benchmark... - - var wg sync.WaitGroup - concurrent := make(chan bool, 32) - - // Now do some timing... - b.ResetTimer() - - totalData := int64(0) - - for i := 0; i < b.N; i++ { - fileno := rand.Intn(len(files)) - name := files[fileno] - - fi, err := os.Stat(name) - if err != nil { - fmt.Printf("Error statting file %v\n", err) - } else { - - totalData += fi.Size() - - newData := make([]byte, fi.Size()) - crand.Read(newData) - - wg.Add(1) - concurrent <- true - - // Test write speed... FIXME: Concurrent access to same file - go func(filename string, data []byte) { - - af, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - fmt.Printf("Error opening file %v\n", err) - } else { - _, err = af.Write(data) - if err != nil { - fmt.Printf("Error writing file %v\n", err) - time.Sleep(1 * time.Minute) - } - err = af.Close() - if err != nil { - fmt.Printf("Error closing file %v\n", err) - } - } - - //err := os.WriteFile(filename, data, 0600) - if err != nil { - fmt.Printf("Error writing file %v\n", err) - } - - wg.Done() - <-concurrent - }(name, newData) - } - } - - b.SetBytes(totalData) - - wg.Wait() - - fmt.Printf("Total Data %d\n", totalData) - }) - } -} diff --git a/internal/expose/ndb_dev_test.go b/internal/expose/ndb_dev_test.go deleted file mode 100644 index 431705b8..00000000 --- a/internal/expose/ndb_dev_test.go +++ /dev/null @@ -1,199 +0,0 @@ -package expose - -import ( - "fmt" - "os" - "sync" - "testing" - "time" - - "github.com/loopholelabs/silo/pkg/storage/modules" - "github.com/loopholelabs/silo/pkg/storage/sources" -) - -func BenchmarkDevRead(mb *testing.B) { - NBDdevice := "nbd1" - diskSize := 1024 * 1024 * 1024 * 4 - - mb.Run("devRead", func(b *testing.B) { - - // Setup... 
- // Lets simulate a little latency - store := sources.NewMemoryStorage(int(diskSize)) - store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) - driver := modules.NewMetrics(store_latency) - - p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) - if err != nil { - panic(err) - } - - go func() { - err := p.Handle(driver) - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() - - p.WaitReady() - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - err = p.Shutdown() - if err != nil { - fmt.Printf("Error cleaning up %v\n", err) - } - driver.ShowStats("stats") - }) - - driver.ResetMetrics() // Only start counting from now... - - // Here's the actual benchmark... - - devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) - if err != nil { - panic("Error opening dev file\n") - } - - b.Cleanup(func() { - devfile.Close() - }) - - var wg sync.WaitGroup - concurrent := make(chan bool, 100) // Do max 100 reads concurrently - - // Now do some timing... - b.ResetTimer() - - offset := int64(0) - totalData := int64(0) - - for i := 0; i < b.N; i++ { - wg.Add(1) - concurrent <- true - length := int64(4096) - offset += 4096 - if offset+length >= int64(diskSize) { - offset = 0 - } - totalData += length - - // Test read speed... - go func(f_offset int64, f_length int64) { - buffer := make([]byte, f_length) - _, err := devfile.ReadAt(buffer, f_offset) - if err != nil { - fmt.Printf("Error reading file %v\n", err) - } - - wg.Done() - <-concurrent - }(offset, length) - } - - b.SetBytes(totalData) - - wg.Wait() - - fmt.Printf("Total Data %d\n", totalData) - }) -} - -func BenchmarkDevWrite(mb *testing.B) { - NBDdevice := "nbd2" - diskSize := 1024 * 1024 * 1024 * 4 - - mb.Run("devWrite", func(b *testing.B) { - - // Setup... - // Lets simulate a little latency - store := sources.NewMemoryStorage(int(diskSize)) - store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) - driver := modules.NewMetrics(store_latency) - - p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) - if err != nil { - panic(err) - } - - go func() { - err := p.Handle(driver) - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() - - p.WaitReady() - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - err = p.Shutdown() - if err != nil { - fmt.Printf("Error cleaning up %v\n", err) - } - driver.ShowStats("stats") - }) - - driver.ResetMetrics() // Only start counting from now... - - // Here's the actual benchmark... - - devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) - if err != nil { - panic("Error opening dev file\n") - } - - b.Cleanup(func() { - devfile.Close() - }) - - var wg sync.WaitGroup - concurrent := make(chan bool, 100) // Do max 100 reads concurrently - - // Now do some timing... - b.ResetTimer() - - offset := int64(0) - totalData := int64(0) - - for i := 0; i < b.N; i++ { - wg.Add(1) - concurrent <- true - length := int64(4096) - offset += 4096 - if offset+length >= int64(diskSize) { - offset = 0 - } - totalData += length - - // Test read speed... 
- go func(f_offset int64, f_length int64) { - buffer := make([]byte, f_length) - _, err := devfile.WriteAt(buffer, f_offset) - if err != nil { - fmt.Printf("Error writing file %d %d | %v\n", f_offset, f_length, err) - } - - wg.Done() - <-concurrent - }(offset, length) - } - - err = devfile.Sync() - if err != nil { - fmt.Printf("Error syncing %v\n", err) - } - - b.SetBytes(totalData) - - wg.Wait() - - fmt.Printf("Total Data %d\n", totalData) - }) -} diff --git a/main.go b/main.go index 284ba63b..e07e92fe 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ import ( "syscall" "time" - "github.com/loopholelabs/silo/internal/expose" "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/expose" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/sources" ) @@ -99,21 +99,18 @@ func main() { * */ func setup(prov storage.StorageProvider) (storage.ExposedStorage, error) { - p, err := expose.NewNBD(device) - if err != nil { - return nil, err - } + p := expose.NewExposedStorageNBD(prov, device, 1, 0, prov.Size(), 4096, 0) go func() { - err := p.Handle(prov) + err := p.Handle() if err != nil { - fmt.Printf("p.Handle returned %v\n", err) + fmt.Printf("p.Start returned %v\n", err) } }() p.WaitReady() - err = os.Mkdir(mountpoint, 0600) + err := os.Mkdir(mountpoint, 0600) if err != nil { return nil, fmt.Errorf("Error mkdir %v", err) } diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go index 3cad9619..3441f7f5 100644 --- a/pkg/storage/expose/nbd.go +++ b/pkg/storage/expose/nbd.go @@ -154,7 +154,7 @@ func (n *ExposedStorageNBD) checkConn(dev string) error { return err } -func (n *ExposedStorageNBD) Start() error { +func (n *ExposedStorageNBD) Handle() error { device_file := fmt.Sprintf("/dev/%s", n.dev) fp, err := os.OpenFile(device_file, os.O_RDWR, 0600) @@ -216,7 +216,7 @@ func (n *ExposedStorageNBD) Start() error { } // Wait until it's ready... -func (n *ExposedStorageNBD) Ready() { +func (n *ExposedStorageNBD) WaitReady() error { for { err := n.checkConn(n.dev) if err == nil { @@ -224,9 +224,10 @@ func (n *ExposedStorageNBD) Ready() { } time.Sleep(100 * time.Nanosecond) } + return nil } -func (n *ExposedStorageNBD) Disconnect() error { +func (n *ExposedStorageNBD) Shutdown() error { fmt.Printf("Closing sockets...\n") // Close all the socket pairs... diff --git a/pkg/storage/expose/nbd_dev_test.go b/pkg/storage/expose/nbd_dev_test.go index d22a1b52..2cf3eff4 100644 --- a/pkg/storage/expose/nbd_dev_test.go +++ b/pkg/storage/expose/nbd_dev_test.go @@ -5,95 +5,108 @@ import ( "os" "sync" "testing" + "time" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/sources" ) -func BenchmarkDevRead(b *testing.B) { - NBDdevice := "nbd1" - diskSize := 1024 * 1024 * 1024 * 4 - - // Setup... 
- // Lets simulate a little latency here - store := sources.NewMemoryStorage(int(diskSize)) - //store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) - driver := modules.NewMetrics(store) - - n := NewExposedStorageNBD(driver, NBDdevice, 1, 0, uint64(driver.Size()), 4096, 0) - - go func() { - err := n.Start() - if err != nil { - panic(err) - } - }() +func BenchmarkDevRead(mb *testing.B) { - n.Ready() + sizes := []int64{4096, 65536, 1024 * 1024} - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - err := n.Disconnect() - if err != nil { - fmt.Printf("Error cleaning up %v\n", err) - } - driver.ShowStats("stats") - }) - - driver.ResetMetrics() // Only start counting from now... + for _, v := range sizes { + name := fmt.Sprintf("blocksize_%d", v) + mb.Run(name, func(b *testing.B) { + NBDdevice := "nbd1" + diskSize := 1024 * 1024 * 1024 * 4 - // Here's the actual benchmark... + // Setup... + // Lets simulate a little latency here + store := sources.NewMemoryStorage(int(diskSize)) + //store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) + driver := modules.NewMetrics(store) - devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) - if err != nil { - panic("Error opening dev file\n") - } + n := NewExposedStorageNBD(driver, NBDdevice, 1, 0, uint64(driver.Size()), 4096, 0) - b.Cleanup(func() { - devfile.Close() - }) + go func() { + err := n.Handle() + if err != nil { + panic(err) + } + }() - var wg sync.WaitGroup - concurrent := make(chan bool, 100) // Do max 100 reads concurrently + n.WaitReady() - // Now do some timing... - b.ResetTimer() + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + err := n.Shutdown() + if err != nil { + fmt.Printf("Error cleaning up %v\n", err) + } + }) - read_size := 4096 - offset := int64(0) - totalData := int64(0) + driver.ResetMetrics() // Only start counting from now... - for i := 0; i < b.N; i++ { - wg.Add(1) - concurrent <- true - length := int64(read_size) - offset += int64(read_size) - if offset+length >= int64(diskSize) { - offset = 0 - } - totalData += length + // Here's the actual benchmark... - // Test read speed... - go func(f_offset int64, f_length int64) { - buffer := make([]byte, f_length) - _, err := devfile.ReadAt(buffer, f_offset) + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) if err != nil { - fmt.Printf("Error reading file %v\n", err) + panic("Error opening dev file\n") } - wg.Done() - <-concurrent - }(offset, length) - } + b.Cleanup(func() { + devfile.Close() + }) + + var wg sync.WaitGroup + concurrent := make(chan bool, 100) // Do max 100 reads concurrently + + // Now do some timing... + b.ResetTimer() + ctime := time.Now() + + read_size := int64(v) + offset := int64(0) + totalData := int64(0) + + for i := 0; i < b.N; i++ { + wg.Add(1) + concurrent <- true + length := read_size + offset += read_size + if offset+length >= int64(diskSize) { + offset = 0 + } + totalData += length + + // Test read speed... 
+ go func(f_offset int64, f_length int64) { + buffer := make([]byte, f_length) + _, err := devfile.ReadAt(buffer, f_offset) + if err != nil { + fmt.Printf("Error reading file %v\n", err) + } + + wg.Done() + <-concurrent + }(offset, length) + } - b.SetBytes(totalData) + b.SetBytes(int64(read_size)) - wg.Wait() + wg.Wait() - fmt.Printf("Total Data %d from %d ops\n", totalData, b.N) + duration := time.Since(ctime).Seconds() + mb_per_sec := float64(totalData) / (duration * 1024 * 1024) + + fmt.Printf("Total Data %d from %d ops in %dms RATE %.2fMB/s\n", totalData, b.N, time.Since(ctime).Milliseconds(), mb_per_sec) + driver.ShowStats("stats") + }) + } } func BenchmarkDevWrite(b *testing.B) { @@ -109,24 +122,23 @@ func BenchmarkDevWrite(b *testing.B) { n := NewExposedStorageNBD(driver, NBDdevice, 1, 0, uint64(driver.Size()), 4096, 0) go func() { - err := n.Start() + err := n.Handle() if err != nil { panic(err) } }() - n.Ready() + n.WaitReady() /** * Cleanup everything * */ b.Cleanup(func() { - err := n.Disconnect() + err := n.Shutdown() if err != nil { fmt.Printf("Error cleaning up %v\n", err) } - driver.ShowStats("stats") }) driver.ResetMetrics() // Only start counting from now... @@ -138,26 +150,22 @@ func BenchmarkDevWrite(b *testing.B) { panic("Error opening dev file\n") } - /* - b.Cleanup(func() { - devfile.Close() - }) - */ - var wg sync.WaitGroup concurrent := make(chan bool, 100) // Do max 100 writes concurrently // Now do some timing... b.ResetTimer() + ctime := time.Now() offset := int64(0) totalData := int64(0) + write_size := int64(4096) for i := 0; i < b.N; i++ { wg.Add(1) concurrent <- true - length := int64(4096) - offset += 4096 + length := write_size + offset += write_size if offset+length >= int64(diskSize) { offset = 0 } @@ -176,7 +184,7 @@ func BenchmarkDevWrite(b *testing.B) { }(offset, length) } - b.SetBytes(totalData) + b.SetBytes(int64(write_size)) wg.Wait() @@ -185,10 +193,13 @@ func BenchmarkDevWrite(b *testing.B) { if err != nil { fmt.Printf("Error performing sync %v\n", err) } + err = devfile.Close() if err != nil { fmt.Printf("Error closing dev %v\n", err) } - fmt.Printf("Total Data %d from %d ops\n", totalData, b.N) + fmt.Printf("Total Data %d from %d ops in %dms\n", totalData, b.N, time.Since(ctime).Milliseconds()) + driver.ShowStats("stats") + } diff --git a/pkg/storage/expose/nbd_test.go b/pkg/storage/expose/nbd_test.go index 3f5e9575..cc498fad 100644 --- a/pkg/storage/expose/nbd_test.go +++ b/pkg/storage/expose/nbd_test.go @@ -14,7 +14,7 @@ func TestNBDDevice(t *testing.T) { dev := "nbd1" defer func() { fmt.Printf("Shutting down properly...\n") - err := n.Disconnect() + err := n.Shutdown() assert.NoError(t, err) fmt.Printf("Shutdown complete\n") }() @@ -25,11 +25,11 @@ func TestNBDDevice(t *testing.T) { n = NewExposedStorageNBD(prov, dev, 1, 0, uint64(size), 4096, 0) go func() { - err := n.Start() + err := n.Handle() assert.NoError(t, err) }() - n.Ready() + n.WaitReady() devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", dev), os.O_RDWR, 0666) assert.NoError(t, err) diff --git a/pkg/storage/modules/block_splitter_test.go b/pkg/storage/modules/block_splitter_test.go index 5076f4d5..62959ab1 100644 --- a/pkg/storage/modules/block_splitter_test.go +++ b/pkg/storage/modules/block_splitter_test.go @@ -15,10 +15,10 @@ func TestBlockSplitter(t *testing.T) { // Create a new block storage, backed by memory storage size := 1024 * 1024 * 32 mem := sources.NewMemoryStorage(size) - source := NewArtificialLatency(mem, 10*time.Millisecond, 100*time.Nanosecond, 
10*time.Millisecond, 100*time.Nanosecond) - metrics := NewMetrics(source) - log := NewLogger(metrics) - split := NewBlockSplitter(log, 100) // Block size of 100 bytes + // source := NewArtificialLatency(mem, 10*time.Millisecond, 100*time.Nanosecond, 10*time.Millisecond, 100*time.Nanosecond) + metrics := NewMetrics(mem) + //log := NewLogger(metrics) + split := NewBlockSplitter(metrics, 100) // Block size of 100 bytes // Fill it with stuff data := make([]byte, size) @@ -39,7 +39,7 @@ func TestBlockSplitter(t *testing.T) { // Read in a single go from the source itself buffer2 := make([]byte, read_len) ctime_2 := time.Now() - _, err = source.ReadAt(buffer2, 10) + _, err = mem.ReadAt(buffer2, 10) read_duration := time.Since(ctime_2) assert.Equal(t, buffer2, buffer) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d931fc5d..5dd0c7da 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -25,7 +25,7 @@ type TrackingStorageProvider interface { } type ExposedStorage interface { - Handle(prov StorageProvider) error + Handle() error WaitReady() error Shutdown() error } From 8182755d41270a8d4f60f00945fb1f8cce75fd56 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 12 Feb 2024 13:20:32 +0000 Subject: [PATCH 3/5] Added writeAt block splitter and test --- cmd/serve.go | 1 + pkg/storage/modules/block_splitter.go | 62 ++++++++++++++++++++-- pkg/storage/modules/block_splitter_test.go | 38 ++++++++++++- 3 files changed, 96 insertions(+), 5 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 1fce5993..fbea96dc 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -85,6 +85,7 @@ func runServe(ccmd *cobra.Command, args []string) { sourceMonitor := modules.NewVolatilityMonitor(sourceDirty, block_size, 10*time.Second) sourceStorage := modules.NewLockable(sourceMonitor) + // Write some random stuff to the device... go writeRandom(sourceStorage) // Start monitoring blocks. diff --git a/pkg/storage/modules/block_splitter.go b/pkg/storage/modules/block_splitter.go index a593eb52..477c9962 100644 --- a/pkg/storage/modules/block_splitter.go +++ b/pkg/storage/modules/block_splitter.go @@ -75,12 +75,66 @@ func (i *BlockSplitter) ReadAt(buffer []byte, offset int64) (n int, err error) { return len(buffer), nil } -func (i *BlockSplitter) WriteAt(p []byte, off int64) (n int, err error) { - // Split the write up into blocks, and concurrenty perform the writes... +func (i *BlockSplitter) WriteAt(buffer []byte, offset int64) (n int, err error) { + // Split the read up into blocks, and concurrenty perform the reads... 
+ end := uint64(offset + int64(len(buffer))) + if end > i.size { + end = i.size + } + + b_start := uint(offset / int64(i.block_size)) + b_end := uint((end-1)/uint64(i.block_size)) + 1 + + blocks := b_end - b_start + errs := make(chan error, blocks) + + for b := b_start; b < b_end; b++ { + go func(block_no uint) { + block_offset := int64(block_no) * int64(i.block_size) + var err error + if block_offset > offset { + // Partial write at the end + if len(buffer[block_offset-offset:]) < i.block_size { + block_buffer := make([]byte, i.block_size) + // Read existing data + _, err = i.prov.ReadAt(block_buffer, block_offset) + if err == nil { + // Update the data + copy(block_buffer, buffer[block_offset-offset:]) + // Write back + _, err = i.prov.WriteAt(block_buffer, block_offset) + } + } else { + // Complete write in the middle + s := block_offset - offset + e := s + int64(i.block_size) + if e > int64(len(buffer)) { + e = int64(len(buffer)) + } + _, err = i.prov.WriteAt(buffer[s:e], block_offset) + } + } else { + // Partial write at the start + block_buffer := make([]byte, i.block_size) + _, err = i.prov.ReadAt(block_buffer, block_offset) + if err == nil { + copy(block_buffer[offset-block_offset:], buffer) + _, err = i.prov.WriteAt(block_buffer, block_offset) + } + } + errs <- err + }(b) + } - // TODO + // Wait for completion, Check for errors and return... + for b := b_start; b < b_end; b++ { + e := <-errs + if e != nil { + return 0, e + } + } - return i.prov.WriteAt(p, off) + return len(buffer), nil } func (i *BlockSplitter) Flush() error { diff --git a/pkg/storage/modules/block_splitter_test.go b/pkg/storage/modules/block_splitter_test.go index 62959ab1..08f14981 100644 --- a/pkg/storage/modules/block_splitter_test.go +++ b/pkg/storage/modules/block_splitter_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBlockSplitter(t *testing.T) { +func TestBlockSplitterRead(t *testing.T) { // Create a new block storage, backed by memory storage size := 1024 * 1024 * 32 @@ -47,3 +47,39 @@ func TestBlockSplitter(t *testing.T) { fmt.Printf("Read took %dms. Read split took %dms.\n", read_duration.Milliseconds(), read_duration_split.Milliseconds()) metrics.ShowStats("Source") } + +func TestBlockSplitterWrite(t *testing.T) { + + // Create a new block storage, backed by memory storage + size := 1024 * 1024 * 32 + mem := sources.NewMemoryStorage(size) + // metrics := NewMetrics(mem) + split := NewBlockSplitter(mem, 100) // Block size of 100 bytes + + write_len := 1234567 + + // Write some stuff to the mem... + b := make([]byte, size) + rand.Read(b) + _, err := mem.WriteAt(b, 0) + assert.NoError(t, err) + + offset := int64(10) + // Write using a splitter (splits the read up into concurrent block writes) + buffer := make([]byte, write_len) + rand.Read(buffer) + _, err = split.WriteAt(buffer, offset) + assert.NoError(t, err) + + // Check that the write was performed properly and didn't mess up anything + + copy(b[offset:], buffer) // Perform the write in the b buffer. 
+ + check := make([]byte, size) + _, err = mem.ReadAt(check, 0) + assert.NoError(t, err) + + assert.Equal(t, b, check) + + // metrics.ShowStats("source") +} From a75a41e2317edac7e1e6db81e8a004e3630259d3 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 12 Feb 2024 14:08:50 +0000 Subject: [PATCH 4/5] Added unit test for readDirtyTracker --- pkg/storage/modules/block_splitter_test.go | 3 +- pkg/storage/modules/read_dirty_tracker.go | 2 +- .../modules/read_dirty_tracker_test.go | 47 +++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 pkg/storage/modules/read_dirty_tracker_test.go diff --git a/pkg/storage/modules/block_splitter_test.go b/pkg/storage/modules/block_splitter_test.go index 08f14981..8eca4a87 100644 --- a/pkg/storage/modules/block_splitter_test.go +++ b/pkg/storage/modules/block_splitter_test.go @@ -45,7 +45,8 @@ func TestBlockSplitterRead(t *testing.T) { assert.Equal(t, buffer2, buffer) fmt.Printf("Read took %dms. Read split took %dms.\n", read_duration.Milliseconds(), read_duration_split.Milliseconds()) - metrics.ShowStats("Source") + // + // metrics.ShowStats("Source") } func TestBlockSplitterWrite(t *testing.T) { diff --git a/pkg/storage/modules/read_dirty_tracker.go b/pkg/storage/modules/read_dirty_tracker.go index 42282dd7..8fb2d4a9 100644 --- a/pkg/storage/modules/read_dirty_tracker.go +++ b/pkg/storage/modules/read_dirty_tracker.go @@ -9,7 +9,7 @@ import ( /** * This can track writes so that we can get a list of dirty areas after some period. - * Tracking is enabled for a block when a Read is performed + * Tracking is enabled for a block when a Read is performed on that block */ type FilterReadDirtyTracker struct { diff --git a/pkg/storage/modules/read_dirty_tracker_test.go b/pkg/storage/modules/read_dirty_tracker_test.go new file mode 100644 index 00000000..c3bb1a6e --- /dev/null +++ b/pkg/storage/modules/read_dirty_tracker_test.go @@ -0,0 +1,47 @@ +package modules + +import ( + "testing" + + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/stretchr/testify/assert" +) + +func TestReadDirtyTracker(t *testing.T) { + + // Create a new block storage, backed by memory storage + size := 1024 * 1024 * 32 + mem := sources.NewMemoryStorage(size) + metrics := NewMetrics(mem) + tracker := NewFilterReadDirtyTracker(metrics, 4096) + + b := tracker.Sync() + + // There should be no dirty blocks + assert.Equal(t, 0, b.Count(0, b.Length())) + + // Perform a read to start tracking dirty writes + buffer := make([]byte, 1234567) + _, err := tracker.ReadAt(buffer, 10) + assert.NoError(t, err) + + // Now do a few writes to make dirty blocks... 
+ locs := []int64{10, 10000, 40000} + for _, l := range locs { + w_buffer := make([]byte, 9000) + _, err = tracker.WriteAt(w_buffer, l) + assert.NoError(t, err) + } + + // Check the dirty blocks + b = tracker.Sync() + assert.Equal(t, 8, b.Count(0, b.Length())) + blocks := b.Collect(0, b.Length()) + expected_blocks := []uint{0, 1, 2, 3, 4, 9, 10, 11} + assert.Equal(t, expected_blocks, blocks) + + b = tracker.Sync() + + // There should be no dirty blocks + assert.Equal(t, 0, b.Count(0, b.Length())) +} From 25c30a876dcf9177698ad7e03f87cf0d216cc77b Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 12 Feb 2024 14:44:09 +0000 Subject: [PATCH 5/5] Added unit test for waitingCache --- pkg/storage/modules/waiting_cache_test.go | 52 +++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 pkg/storage/modules/waiting_cache_test.go diff --git a/pkg/storage/modules/waiting_cache_test.go b/pkg/storage/modules/waiting_cache_test.go new file mode 100644 index 00000000..51e86876 --- /dev/null +++ b/pkg/storage/modules/waiting_cache_test.go @@ -0,0 +1,52 @@ +package modules + +import ( + "crypto/rand" + "testing" + "time" + + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/stretchr/testify/assert" +) + +func TestWaitingCache(t *testing.T) { + + // Create a new block storage, backed by memory storage + size := 1024 * 1024 * 32 + mem := sources.NewMemoryStorage(size) + metrics := NewMetrics(mem) + waiting := NewWaitingCache(metrics, 4096) + + data := make([]byte, 12000) + rand.Read(data) + + // We'll write something in 50ms + go func() { + time.Sleep(50 * time.Millisecond) + _, err := waiting.WriteAt(data, 0) + assert.NoError(t, err) + }() + + // The waiting cache will wait for data to be available. + offset := int64(20) + buffer := make([]byte, 8000) + ctime := time.Now() + _, err := waiting.ReadAt(buffer, offset) + assert.NoError(t, err) + wait_time := time.Since(ctime).Milliseconds() + + // We'd expect this read to take around 50ms (It's waiting for the data) + assert.InDelta(t, wait_time, 50, 10) + + assert.Equal(t, data[offset:int(offset)+len(buffer)], buffer) + + // Read again + ctime2 := time.Now() + _, err = waiting.ReadAt(buffer, offset) + assert.NoError(t, err) + wait_time2 := time.Since(ctime2).Milliseconds() + + // We'd expect this read to be instant (The data exists now) + assert.InDelta(t, wait_time2, 0, 10) + +}
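
Note on the refactored interface (patch 2/5 onward): storage.ExposedStorage is now driven as Handle() / WaitReady() / Shutdown(), with the provider passed at construction time rather than to Handle(). Below is a minimal sketch of the intended call pattern, based on the usage in cmd/serve.go and the tests above; the device name, provider size, and error handling are illustrative, and exposing a device still requires the nbd kernel module and root.

package main

import (
	"fmt"
	"os"

	"github.com/loopholelabs/silo/pkg/storage/expose"
	"github.com/loopholelabs/silo/pkg/storage/sources"
)

func main() {
	// Any storage.StorageProvider will do; memory storage keeps the sketch self-contained.
	prov := sources.NewMemoryStorage(64 * 1024 * 1024)

	// Constructor arguments mirror cmd/serve.go; "nbd1" is an illustrative device name.
	p := expose.NewExposedStorageNBD(prov, "nbd1", 1, 0, uint64(prov.Size()), 4096, 0)

	// Handle() serves NBD requests until disconnect, so it runs in its own goroutine.
	go func() {
		if err := p.Handle(); err != nil {
			fmt.Printf("p.Handle returned %v\n", err)
		}
	}()

	// Block until /dev/nbd1 is connected and ready for I/O.
	if err := p.WaitReady(); err != nil {
		panic(err)
	}

	// Ordinary file I/O against the device now goes through the provider.
	devfile, err := os.OpenFile("/dev/nbd1", os.O_RDWR, 0666)
	if err == nil {
		buffer := make([]byte, 4096)
		_, _ = devfile.ReadAt(buffer, 0)
		devfile.Close()
	}

	// Disconnect the device and clean up.
	if err := p.Shutdown(); err != nil {
		fmt.Printf("Shutdown error %v\n", err)
	}
}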