From 7fb7879b534de891deb4004cb022e67222f3fd25 Mon Sep 17 00:00:00 2001
From: Jimmy Moore
Date: Tue, 27 Feb 2024 14:27:34 +0000
Subject: [PATCH] bugfix multi con nbd, work on benchmarks

---
 pkg/storage/expose/nbd.go            |  10 +-
 pkg/storage/expose/nbd_dispatch.go   |  27 +++--
 pkg/storage/expose/nbdnl.go          |  13 ++-
 pkg/storage/expose/nbdnl_dev_test.go | 141 ++++++++++++++---------
 pkg/storage/expose/nbdnl_test.go     |  31 ++++--
 5 files changed, 122 insertions(+), 100 deletions(-)

diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go
index 511d5e8b..05c07d2e 100644
--- a/pkg/storage/expose/nbd.go
+++ b/pkg/storage/expose/nbd.go
@@ -75,7 +75,6 @@ type ExposedStorageNBD struct {
 	flags        uint64
 	socketPairs  [][2]int
 	device_file  uintptr
-	dispatch     *Dispatch
 	prov         storage.StorageProvider
 }
@@ -88,7 +87,6 @@ func NewExposedStorageNBD(prov storage.StorageProvider, dev string, num_connecti
 		size:       size,
 		block_size: block_size,
 		flags:      flags,
-		dispatch:   NewDispatch(),
 	}
 }
@@ -176,11 +174,11 @@ func (n *ExposedStorageNBD) Handle() error {
 		}
 		n.socketPairs = append(n.socketPairs, sockPair)
 
+		rwc := os.NewFile(uintptr(sockPair[1]), "unix")
+		d := NewDispatch(rwc, n.prov)
+
 		// Start reading commands on the socket and dispatching them to our provider
-		go func(fd int) {
-			rwc := os.NewFile(uintptr(fd), "unix")
-			n.dispatch.Handle(rwc, n.prov)
-		}(sockPair[1])
+		go d.Handle()
 
 		if i == 0 {
 			n.setSizes(fp.Fd(), n.size, n.block_size, n.flags)
diff --git a/pkg/storage/expose/nbd_dispatch.go b/pkg/storage/expose/nbd_dispatch.go
index a7917662..0e2b48a9 100644
--- a/pkg/storage/expose/nbd_dispatch.go
+++ b/pkg/storage/expose/nbd_dispatch.go
@@ -9,6 +9,8 @@ import (
 	"github.com/loopholelabs/silo/pkg/storage"
 )
 
+// TODO: Context, and handle fatal errors
+
 type Dispatch struct {
 	ASYNC_READS  bool
 	ASYNC_WRITES bool
@@ -18,14 +20,18 @@ type Dispatch struct {
 	prov             storage.StorageProvider
 	fatal            chan error
 	pendingResponses sync.WaitGroup
+	packets_in       uint64
+	packets_out      uint64
 }
 
-func NewDispatch() *Dispatch {
+func NewDispatch(fp io.ReadWriteCloser, prov storage.StorageProvider) *Dispatch {
 	d := &Dispatch{
 		ASYNC_WRITES:   true,
 		ASYNC_READS:    true,
 		responseHeader: make([]byte, 16),
 		fatal:          make(chan error, 8),
+		fp:             fp,
+		prov:           prov,
 	}
 	binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC)
 	return d
@@ -59,6 +65,8 @@ func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []by
 			return err
 		}
 	}
+
+	d.packets_out++
 	return nil
 }
@@ -66,11 +74,10 @@ func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []by
 /**
  * This dispatches incoming NBD requests sequentially to the provider.
  *
  */
-func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) error {
-	d.fp = fp
-
-	d.prov = prov
-
+func (d *Dispatch) Handle() error {
+	// defer func() {
+	// 	fmt.Printf("Handle %d in %d out\n", d.packets_in, d.packets_out)
+	// }()
 	// Speed read and dispatch...
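In the read loop that follows, wp tracks how many bytes the socket has delivered into the buffer and rp tracks how far request parsing has consumed them; a write whose payload has not fully arrived rewinds rp past the 28-byte request header and waits for the next Read. The change here is that fp and prov now arrive through the constructor above, so every connection parses in its own buffer with its own cursors.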
 	BUFFER_SIZE := 4 * 1024 * 1024
@@ -80,9 +87,9 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
 	request := Request{}
 
 	for {
+		// fmt.Printf("Read from [%d in, %d out] %v\n", d.packets_in, d.packets_out, d.fp)
 		n, err := d.fp.Read(buffer[wp:])
 		if err != nil {
-			fmt.Printf("Error %v\n", err)
 			return err
 		}
 		wp += n
@@ -109,13 +116,14 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
 			}
 
 			if request.Type == NBD_CMD_DISCONNECT {
-				fmt.Printf(" -> CMD_DISCONNECT\n")
+				// fmt.Printf(" -> CMD_DISCONNECT\n")
 				return nil // All done
 			} else if request.Type == NBD_CMD_FLUSH {
 				return fmt.Errorf("not supported: Flush")
 			} else if request.Type == NBD_CMD_READ {
 				// fmt.Printf("READ %x %d\n", request.Handle, request.Length)
 				rp += 28
+				d.packets_in++
 				err := d.cmdRead(request.Handle, request.From, request.Length)
 				if err != nil {
 					return err
@@ -126,6 +134,7 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
 					rp -= 28
 					break // We don't have enough data yet... Wait for next read
 				}
+				d.packets_in++
 				data := make([]byte, request.Length)
 				copy(data, buffer[rp:rp+int(request.Length)])
 				rp += int(request.Length)
@@ -182,7 +191,6 @@ func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32
 		go func() {
 			err := performRead(cmd_handle, cmd_from, cmd_length)
 			if err != nil {
-				fmt.Printf("FATAL %v\n", err)
 				d.fatal <- err
 			}
 			d.pendingResponses.Done()
@@ -212,7 +220,6 @@ func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint3
 		go func() {
 			err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data)
 			if err != nil {
-				fmt.Printf("FATAL %v\n", err)
 				d.fatal <- err
 			}
 			d.pendingResponses.Done()
diff --git a/pkg/storage/expose/nbdnl.go b/pkg/storage/expose/nbdnl.go
index 4a2e54a4..71a965ad 100644
--- a/pkg/storage/expose/nbdnl.go
+++ b/pkg/storage/expose/nbdnl.go
@@ -1,7 +1,6 @@
 package expose
 
 import (
-	"fmt"
 	"io"
 	"net"
 	"os"
@@ -25,7 +24,6 @@ type ExposedStorageNBDNL struct {
 	socks       []io.Closer
 	device_file uintptr
 
-	dispatch *Dispatch
 	prov     storage.StorageProvider
 	DevIndex int
 }
@@ -37,7 +35,6 @@ func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, t
 		timeout:    timeout,
 		size:       size,
 		block_size: block_size,
-		dispatch:   NewDispatch(),
 		socks:      make([]io.Closer, 0),
 	}
 }
@@ -61,10 +58,12 @@ func (n *ExposedStorageNBDNL) Handle() error {
 		}
 		server.Close()
 
+		// fmt.Printf("[%d] Socket pair %d -> %d %v %v %v\n", i, sockPair[0], sockPair[1], client, server, serverc)
+
+		d := NewDispatch(serverc, n.prov)
+
 		// Start reading commands on the socket and dispatching them to our provider
-		go func(num int, c net.Conn) {
-			n.dispatch.Handle(c, n.prov)
-		}(i, serverc)
+		go d.Handle()
 
 		n.socks = append(n.socks, serverc)
 		socks = append(socks, client)
@@ -107,7 +106,7 @@ func (n *ExposedStorageNBDNL) Shutdown() error {
 		return err
 	}
 
-	fmt.Printf("Closing sockets...\n")
+	// fmt.Printf("Closing sockets...\n")
 	// Close all the socket pairs...
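With one Dispatch per connection there is nothing else to tear down below: closing each server-side socket makes that connection's Read fail, which is exactly what ends its Handle loop.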
 	for _, v := range n.socks {
 		err = v.Close()
diff --git a/pkg/storage/expose/nbdnl_dev_test.go b/pkg/storage/expose/nbdnl_dev_test.go
index ced7eb19..74d1386b 100644
--- a/pkg/storage/expose/nbdnl_dev_test.go
+++ b/pkg/storage/expose/nbdnl_dev_test.go
@@ -2,6 +2,7 @@ package expose
 
 import (
 	"fmt"
+	"math/rand"
 	"os"
 	"os/user"
 	"sync"
@@ -22,96 +23,102 @@ func BenchmarkDevReadNL(mb *testing.B) {
 		return
 	}
 
+	cons := []int{1, 4, 16, 32}
 	sizes := []int64{4096, 65536, 1024 * 1024}
 
-	for _, v := range sizes {
-		name := fmt.Sprintf("blocksize_%d", v)
-		mb.Run(name, func(b *testing.B) {
-			diskSize := 1024 * 1024 * 1024 * 4
+	for _, c := range cons {
+		for _, v := range sizes {
+			name := fmt.Sprintf("readsize_%d_cons_%d", v, c)
+			mb.Run(name, func(b *testing.B) {
+				diskSize := 1024 * 1024 * 1024 * 4
 
-			// Setup...
-			// Lets simulate a little latency here
-			store := sources.NewMemoryStorage(int(diskSize))
-			//store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0)
-			driver := modules.NewMetrics(store)
+				store := sources.NewMemoryStorage(int(diskSize))
+				driver := modules.NewMetrics(store)
 
-			n := NewExposedStorageNBDNL(driver, 1, 0, uint64(driver.Size()), 4096)
+				n := NewExposedStorageNBDNL(driver, c, 0, uint64(driver.Size()), 4096)
 
-			err := n.Handle()
-			if err != nil {
-				panic(err)
-			}
-
-			n.WaitReady()
-
-			/**
-			 * Cleanup everything
-			 *
-			 */
-			b.Cleanup(func() {
-				err := n.Shutdown()
+				err := n.Handle()
 				if err != nil {
-					fmt.Printf("Error cleaning up %v\n", err)
+					panic(err)
 				}
-			})
+				n.WaitReady()
 
-			driver.ResetMetrics() // Only start counting from now...
+				/**
+				 * Cleanup everything
+				 *
+				 */
+				b.Cleanup(func() {
+					err := n.Shutdown()
+					if err != nil {
+						fmt.Printf("Error cleaning up %v\n", err)
+					}
+				})
 
-			// Here's the actual benchmark...
+				driver.ResetMetrics() // Only start counting from now...
 
-			devfile, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
-			if err != nil {
-				panic("Error opening dev file\n")
-			}
+				// Here's the actual benchmark...
 
-			b.Cleanup(func() {
-				devfile.Close()
-			})
+				num_readers := 8
 
-			var wg sync.WaitGroup
-			concurrent := make(chan bool, 100) // Do max 100 reads concurrently
+				devfiles := make([]*os.File, 0)
 
-			// Now do some timing...
-			b.ResetTimer()
-			ctime := time.Now()
+				for i := 0; i < num_readers; i++ {
+					df, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
+					if err != nil {
+						panic("Error opening dev file\n")
+					}
 
-			read_size := int64(v)
-			offset := int64(0)
-			totalData := int64(0)
+					devfiles = append(devfiles, df)
 				}
 
-			for i := 0; i < b.N; i++ {
-				wg.Add(1)
-				concurrent <- true
-				length := read_size
-				offset += read_size
-				if offset+length >= int64(diskSize) {
-					offset = 0
+				b.Cleanup(func() {
+					for _, df := range devfiles {
+						df.Close()
 					}
-				totalData += length
+				})
 
-				// Test read speed...
-				go func(f_offset int64, f_length int64) {
-					buffer := make([]byte, f_length)
-					_, err := devfile.ReadAt(buffer, f_offset)
-					if err != nil {
-						fmt.Printf("Error reading file %v\n", err)
+				var wg sync.WaitGroup
+				concurrent := make(chan bool, 100) // Do max 100 reads concurrently
+
+				// Now do some timing...
+				b.ResetTimer()
+				// ctime := time.Now()
 
-					wg.Done()
-					<-concurrent
-				}(offset, length)
-			}
+				read_size := int64(v)
 
-			b.SetBytes(int64(read_size))
+				for i := 0; i < b.N; i++ {
+					wg.Add(1)
+					concurrent <- true
 
-			wg.Wait()
+					// Test read speed...
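Each worker below reads read_size bytes at a random offset through a randomly chosen entry in devfiles, so up to 100 in-flight reads are spread across the eight open handles (and, with cons > 1, across the device's NBD connections) instead of stepping through the disk sequentially as the old code did.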
+					go func() {
+						offset := rand.Intn(diskSize - int(read_size))
+						length := read_size
+
+						buffer := make([]byte, length)
+						dfi := rand.Intn(len(devfiles))
+						_, err := devfiles[dfi].ReadAt(buffer, int64(offset))
+						if err != nil {
+							fmt.Printf("Error reading file %v\n", err)
+						}
+
+						wg.Done()
+						<-concurrent
+					}()
+				}
 
-			duration := time.Since(ctime).Seconds()
-			mb_per_sec := float64(totalData) / (duration * 1024 * 1024)
+				b.SetBytes(int64(read_size))
+
+				wg.Wait()
 
-			fmt.Printf("Total Data %d from %d ops in %dms RATE %.2fMB/s\n", totalData, b.N, time.Since(ctime).Milliseconds(), mb_per_sec)
-			driver.ShowStats("stats")
-		})
+				// duration := time.Since(ctime).Seconds()
+				// mb_per_sec := float64(totalData) / (duration * 1024 * 1024)
+
+				// fmt.Printf("Total Data %d from %d ops in %dms RATE %.2fMB/s\n", totalData, b.N, time.Since(ctime).Milliseconds(), mb_per_sec)
+				// driver.ShowStats("stats")
+			})
+		}
 	}
 }
diff --git a/pkg/storage/expose/nbdnl_test.go b/pkg/storage/expose/nbdnl_test.go
index af56e2ef..d7a9157a 100644
--- a/pkg/storage/expose/nbdnl_test.go
+++ b/pkg/storage/expose/nbdnl_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"os/user"
+	"sync"
 	"testing"
 
 	"github.com/loopholelabs/silo/pkg/storage/sources"
@@ -31,7 +32,7 @@ func TestNBDNLDevice(t *testing.T) {
 	size := 4096 * 1024 * 1024
 	prov := sources.NewMemoryStorage(size)
 
-	n = NewExposedStorageNBDNL(prov, 1, 0, uint64(size), 4096)
+	n = NewExposedStorageNBDNL(prov, 8, 0, uint64(size), 4096)
 
 	err = n.Handle()
 	assert.NoError(t, err)
@@ -39,15 +40,25 @@ func TestNBDNLDevice(t *testing.T) {
 	fmt.Printf("WaitReady...\n")
 	n.WaitReady()
 
-	devfile, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
-	assert.NoError(t, err)
+	var wg sync.WaitGroup
 
-	// Try doing a read...
-	off := 12
-	buffer := make([]byte, 4096)
-	num, err := devfile.ReadAt(buffer, int64(off))
-	assert.NoError(t, err)
-	assert.Equal(t, len(buffer), num)
-	devfile.Close()
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			fmt.Printf("Open dev\n")
+			devfile, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
+			assert.NoError(t, err)
+
+			// Try doing a read...
+			off := 12
+			buffer := make([]byte, 4096)
+			num, err := devfile.ReadAt(buffer, int64(off))
+			assert.NoError(t, err)
+			assert.Equal(t, len(buffer), num)
+			devfile.Close()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
 }
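The thrust of the fix, visible in both nbd.go and nbdnl.go above, is that Dispatch carries per-connection state (the rolling request buffer, the response header, and the new packets_in/packets_out counters), so each NBD connection now gets its own instance instead of every connection funnelling through one shared dispatcher. A condensed sketch of the pattern, assuming the NewDispatch constructor and storage.StorageProvider interface from this patch; serveConnections is a hypothetical helper name, and unix.Socketpair (golang.org/x/sys/unix) stands in for however the socket pairs are actually created in nbd.go:

	import (
		"os"

		"github.com/loopholelabs/silo/pkg/storage"
		"golang.org/x/sys/unix"
	)

	// serveConnections wires up one socket pair and one Dispatch per NBD
	// connection. The kernel gets one end of each pair; Dispatch reads NBD
	// requests from the other and forwards them to the provider.
	func serveConnections(numConnections int, prov storage.StorageProvider) ([][2]int, error) {
		pairs := make([][2]int, 0, numConnections)
		for i := 0; i < numConnections; i++ {
			sp, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0)
			if err != nil {
				return nil, err
			}
			pairs = append(pairs, sp)

			// A fresh Dispatch per connection: a single shared instance would
			// race on its buffer cursors and packet counters when the kernel
			// issues requests on several connections at once.
			rwc := os.NewFile(uintptr(sp[1]), "unix")
			d := NewDispatch(rwc, prov)
			go d.Handle()
		}
		return pairs, nil
	}

Shutdown then only needs to close the sockets: each Handle loop exits when its Read fails, which is what the Shutdown change above relies on.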