bugfix multi con nbd, work on benchmarks
jimmyaxod committed Feb 27, 2024
1 parent e0e6b97 commit 7fb7879
Showing 5 changed files with 122 additions and 100 deletions.
10 changes: 4 additions & 6 deletions pkg/storage/expose/nbd.go
@@ -75,7 +75,6 @@ type ExposedStorageNBD struct {
flags uint64
socketPairs [][2]int
device_file uintptr
dispatch *Dispatch
prov storage.StorageProvider
}

@@ -88,7 +87,6 @@ func NewExposedStorageNBD(prov storage.StorageProvider, dev string, num_connecti
size: size,
block_size: block_size,
flags: flags,
dispatch: NewDispatch(),
}
}

@@ -176,11 +174,11 @@ func (n *ExposedStorageNBD) Handle() error {
}
n.socketPairs = append(n.socketPairs, sockPair)

rwc := os.NewFile(uintptr(sockPair[1]), "unix")
d := NewDispatch(rwc, n.prov)

// Start reading commands on the socket and dispatching them to our provider
go func(fd int) {
rwc := os.NewFile(uintptr(fd), "unix")
n.dispatch.Handle(rwc, n.prov)
}(sockPair[1])
go d.Handle()

if i == 0 {
n.setSizes(fp.Fd(), n.size, n.block_size, n.flags)
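For readability, here is the new per-connection wiring this hunk introduces, as it reads after the change (identifiers are the package's own; the removed lines are the shared-dispatch closure shown above):

    n.socketPairs = append(n.socketPairs, sockPair)

    rwc := os.NewFile(uintptr(sockPair[1]), "unix")
    d := NewDispatch(rwc, n.prov) // one Dispatch per connection, owning its socket and provider

    // Start reading commands on the socket and dispatching them to our provider
    go d.Handle()

Each NBD connection now gets its own Dispatch instance instead of sharing the single one previously stored on ExposedStorageNBD, which lines up with the "bugfix multi con nbd" part of the commit message.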
27 changes: 17 additions & 10 deletions pkg/storage/expose/nbd_dispatch.go
@@ -9,6 +9,8 @@ import (
"github.com/loopholelabs/silo/pkg/storage"
)

// TODO: Context, and handle fatal errors

type Dispatch struct {
ASYNC_READS bool
ASYNC_WRITES bool
@@ -18,14 +20,18 @@ type Dispatch struct {
prov storage.StorageProvider
fatal chan error
pendingResponses sync.WaitGroup
packets_in uint64
packets_out uint64
}

func NewDispatch() *Dispatch {
func NewDispatch(fp io.ReadWriteCloser, prov storage.StorageProvider) *Dispatch {
d := &Dispatch{
ASYNC_WRITES: true,
ASYNC_READS: true,
responseHeader: make([]byte, 16),
fatal: make(chan error, 8),
fp: fp,
prov: prov,
}
binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC)
return d
@@ -59,18 +65,19 @@ func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []by
return err
}
}

d.packets_out++
return nil
}

/**
* This dispatches incoming NBD requests sequentially to the provider.
*
*/
func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) error {
d.fp = fp

d.prov = prov

func (d *Dispatch) Handle() error {
// defer func() {
// fmt.Printf("Handle %d in %d out\n", d.packets_in, d.packets_out)
// }()
// Speed read and dispatch...

BUFFER_SIZE := 4 * 1024 * 1024
@@ -80,9 +87,9 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
request := Request{}

for {
// fmt.Printf("Read from [%d in, %d out] %v\n", d.packets_in, d.packets_out, d.fp)
n, err := d.fp.Read(buffer[wp:])
if err != nil {
fmt.Printf("Error %v\n", err)
return err
}
wp += n
@@ -109,13 +116,14 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
}

if request.Type == NBD_CMD_DISCONNECT {
fmt.Printf(" -> CMD_DISCONNECT\n")
// fmt.Printf(" -> CMD_DISCONNECT\n")
return nil // All done
} else if request.Type == NBD_CMD_FLUSH {
return fmt.Errorf("not supported: Flush")
} else if request.Type == NBD_CMD_READ {
// fmt.Printf("READ %x %d\n", request.Handle, request.Length)
rp += 28
d.packets_in++
err := d.cmdRead(request.Handle, request.From, request.Length)
if err != nil {
return err
@@ -126,6 +134,7 @@ func (d *Dispatch) Handle(fp io.ReadWriteCloser, prov storage.StorageProvider) e
rp -= 28
break // We don't have enough data yet... Wait for next read
}
d.packets_in++
data := make([]byte, request.Length)
copy(data, buffer[rp:rp+int(request.Length)])
rp += int(request.Length)
@@ -182,7 +191,6 @@ func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32
go func() {
err := performRead(cmd_handle, cmd_from, cmd_length)
if err != nil {
fmt.Printf("FATAL %v\n", err)
d.fatal <- err
}
d.pendingResponses.Done()
@@ -212,7 +220,6 @@ func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint3
go func() {
err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data)
if err != nil {
fmt.Printf("FATAL %v\n", err)
d.fatal <- err
}
d.pendingResponses.Done()
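The Request struct that Handle() decodes is defined elsewhere in this package; for reference, a self-contained sketch of the 28-byte NBD request header it corresponds to (field names here are assumptions, but the wire layout is the NBD protocol's, which is why the parser advances rp by 28 and the reply header is 16 bytes):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // Hypothetical mirror of the package's Request struct.
    type nbdRequest struct {
        Magic  uint32 // NBD_REQUEST_MAGIC (0x25609513)
        Flags  uint16 // command flags
        Type   uint16 // NBD_CMD_READ, NBD_CMD_WRITE, NBD_CMD_DISCONNECT, ...
        Handle uint64 // opaque id, echoed back in the 16-byte reply header
        From   uint64 // byte offset on the device
        Length uint32 // payload length for READ/WRITE
    }

    func parseRequest(buf []byte) nbdRequest {
        return nbdRequest{
            Magic:  binary.BigEndian.Uint32(buf[0:4]),
            Flags:  binary.BigEndian.Uint16(buf[4:6]),
            Type:   binary.BigEndian.Uint16(buf[6:8]),
            Handle: binary.BigEndian.Uint64(buf[8:16]),
            From:   binary.BigEndian.Uint64(buf[16:24]),
            Length: binary.BigEndian.Uint32(buf[24:28]),
        }
    }

    func main() {
        buf := make([]byte, 28)
        binary.BigEndian.PutUint32(buf[0:4], 0x25609513) // request magic
        binary.BigEndian.PutUint16(buf[6:8], 0)          // NBD_CMD_READ
        binary.BigEndian.PutUint64(buf[16:24], 4096)     // offset
        binary.BigEndian.PutUint32(buf[24:28], 4096)     // length
        fmt.Printf("%+v\n", parseRequest(buf))
    }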
13 changes: 6 additions & 7 deletions pkg/storage/expose/nbdnl.go
@@ -1,7 +1,6 @@
package expose

import (
"fmt"
"io"
"net"
"os"
@@ -25,7 +24,6 @@ type ExposedStorageNBDNL struct {

socks []io.Closer
device_file uintptr
dispatch *Dispatch
prov storage.StorageProvider
DevIndex int
}
@@ -37,7 +35,6 @@ func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, t
timeout: timeout,
size: size,
block_size: block_size,
dispatch: NewDispatch(),
socks: make([]io.Closer, 0),
}
}
@@ -61,10 +58,12 @@ func (n *ExposedStorageNBDNL) Handle() error {
}
server.Close()

// fmt.Printf("[%d] Socket pair %d -> %d %v %v %v\n", i, sockPair[0], sockPair[1], client, server, serverc)

d := NewDispatch(serverc, n.prov)

// Start reading commands on the socket and dispatching them to our provider
go func(num int, c net.Conn) {
n.dispatch.Handle(c, n.prov)
}(i, serverc)
go d.Handle()

n.socks = append(n.socks, serverc)
socks = append(socks, client)
@@ -107,7 +106,7 @@ func (n *ExposedStorageNBDNL) Shutdown() error {
return err
}

fmt.Printf("Closing sockets...\n")
// fmt.Printf("Closing sockets...\n")
// Close all the socket pairs...
for _, v := range n.socks {
err = v.Close()
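For orientation, a hypothetical driver for the netlink exposure touched above, mirroring the benchmark in nbdnl_dev_test.go below (the 1 GiB memory provider and 4 connections are illustrative, store.Size() is assumed from the StorageProvider usage elsewhere in this diff, and creating /dev/nbd* devices needs the nbd kernel module plus sufficient privileges):

    func exposeMemoryDevice() (*ExposedStorageNBDNL, *os.File, error) {
        store := sources.NewMemoryStorage(1024 * 1024 * 1024) // in-memory backing store
        n := NewExposedStorageNBDNL(store, 4, 0, uint64(store.Size()), 4096)

        if err := n.Handle(); err != nil {
            return nil, nil, err
        }
        n.WaitReady() // block until the kernel device is connected

        dev, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
        if err != nil {
            n.Shutdown()
            return nil, nil, err
        }
        return n, dev, nil // caller closes dev, then calls n.Shutdown()
    }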
141 changes: 74 additions & 67 deletions pkg/storage/expose/nbdnl_dev_test.go
@@ -2,6 +2,7 @@ package expose

import (
"fmt"
"math/rand"
"os"
"os/user"
"sync"
@@ -22,96 +23,102 @@ func BenchmarkDevReadNL(mb *testing.B) {
return
}

cons := []int{1, 4, 16, 32}
sizes := []int64{4096, 65536, 1024 * 1024}

for _, v := range sizes {
name := fmt.Sprintf("blocksize_%d", v)
mb.Run(name, func(b *testing.B) {
diskSize := 1024 * 1024 * 1024 * 4
for _, c := range cons {
for _, v := range sizes {
name := fmt.Sprintf("readsize_%d_cons_%d", v, c)
mb.Run(name, func(b *testing.B) {
diskSize := 1024 * 1024 * 1024 * 4

// Setup...
// Lets simulate a little latency here
store := sources.NewMemoryStorage(int(diskSize))
//store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0)
driver := modules.NewMetrics(store)
store := sources.NewMemoryStorage(int(diskSize))
driver := modules.NewMetrics(store)

n := NewExposedStorageNBDNL(driver, 1, 0, uint64(driver.Size()), 4096)
n := NewExposedStorageNBDNL(driver, c, 0, uint64(driver.Size()), 4096)

err := n.Handle()
if err != nil {
panic(err)
}

n.WaitReady()

/**
* Cleanup everything
*
*/
b.Cleanup(func() {
err := n.Shutdown()
err := n.Handle()
if err != nil {
fmt.Printf("Error cleaning up %v\n", err)
panic(err)
}
})

driver.ResetMetrics() // Only start counting from now...
n.WaitReady()

// Here's the actual benchmark...
/**
* Cleanup everything
*
*/
b.Cleanup(func() {
err := n.Shutdown()
if err != nil {
fmt.Printf("Error cleaning up %v\n", err)
}
})

devfile, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
if err != nil {
panic("Error opening dev file\n")
}
driver.ResetMetrics() // Only start counting from now...

b.Cleanup(func() {
devfile.Close()
})
// Here's the actual benchmark...

var wg sync.WaitGroup
concurrent := make(chan bool, 100) // Do max 100 reads concurrently
num_readers := 8

// Now do some timing...
b.ResetTimer()
ctime := time.Now()
devfiles := make([]*os.File, 0)

read_size := int64(v)
offset := int64(0)
totalData := int64(0)
for i := 0; i < num_readers; i++ {
df, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
if err != nil {
panic("Error opening dev file\n")
}

for i := 0; i < b.N; i++ {
wg.Add(1)
concurrent <- true
length := read_size
offset += read_size
if offset+length >= int64(diskSize) {
offset = 0
devfiles = append(devfiles, df)
}
totalData += length

// Test read speed...
go func(f_offset int64, f_length int64) {
buffer := make([]byte, f_length)
_, err := devfile.ReadAt(buffer, f_offset)
if err != nil {
fmt.Printf("Error reading file %v\n", err)
b.Cleanup(func() {
for _, df := range devfiles {
df.Close()
}
})

wg.Done()
<-concurrent
}(offset, length)
}
var wg sync.WaitGroup
concurrent := make(chan bool, 100) // Do max 100 reads concurrently

// Now do some timing...
b.ResetTimer()
// ctime := time.Now()

b.SetBytes(int64(read_size))
read_size := int64(v)

wg.Wait()
for i := 0; i < b.N; i++ {
wg.Add(1)
concurrent <- true

duration := time.Since(ctime).Seconds()
mb_per_sec := float64(totalData) / (duration * 1024 * 1024)
// Test read speed...
go func() {
offset := rand.Intn(diskSize - int(read_size))
length := read_size

buffer := make([]byte, length)
dfi := rand.Intn(len(devfiles))
_, err := devfiles[dfi].ReadAt(buffer, int64(offset))
if err != nil {
fmt.Printf("Error reading file %v\n", err)
}

wg.Done()
<-concurrent
}()
}

fmt.Printf("Total Data %d from %d ops in %dms RATE %.2fMB/s\n", totalData, b.N, time.Since(ctime).Milliseconds(), mb_per_sec)
driver.ShowStats("stats")
})
b.SetBytes(int64(read_size))

wg.Wait()

// duration := time.Since(ctime).Seconds()
// mb_per_sec := float64(totalData) / (duration * 1024 * 1024)

// fmt.Printf("Total Data %d from %d ops in %dms RATE %.2fMB/s\n", totalData, b.N, time.Since(ctime).Milliseconds(), mb_per_sec)
// driver.ShowStats("stats")
})
}
}
}

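Because the benchmark now calls b.SetBytes with the per-operation read size, go test reports MB/s for each readsize/cons sub-benchmark directly, which is what the commented-out manual rate calculation used to print. A typical invocation (standard go test flags; the package path is taken from the file names above, and creating /dev/nbd* devices generally requires root):

    sudo go test -run '^$' -bench 'BenchmarkDevReadNL' ./pkg/storage/expose/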
