Skip to content

Commit

Permalink
Some more work on nbd read bench
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 27, 2024
1 parent 84b2f08 commit a370b45
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 11 deletions.
6 changes: 3 additions & 3 deletions pkg/storage/expose/nbd_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

// TODO: Context, and handle fatal errors

const READ_POOL_BUFFER_SIZE = 1 * 1024 * 1024
const READ_POOL_SIZE = 32
const READ_POOL_BUFFER_SIZE = 256 * 1024
const READ_POOL_SIZE = 128

type Dispatch struct {
ASYNC_READS bool
Expand Down Expand Up @@ -202,7 +202,7 @@ func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32
// Couldn't get one from the pool
if b == nil {
// We'll have to alloc it
fmt.Printf("Alloc %d\n", length)
// fmt.Printf("Alloc %d\n", length)
b = make([]byte, length)
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/expose/nbdnl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ type ExposedStorageNBDNL struct {
device_file uintptr
prov storage.StorageProvider
DevIndex int
async bool
}

func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, timeout uint64, size uint64, block_size uint64) *ExposedStorageNBDNL {
func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, timeout uint64, size uint64, block_size uint64, async bool) *ExposedStorageNBDNL {
return &ExposedStorageNBDNL{
prov: prov,
num_connections: num_connections,
timeout: timeout,
size: size,
block_size: block_size,
socks: make([]io.Closer, 0),
async: async,
}
}

Expand All @@ -61,7 +63,8 @@ func (n *ExposedStorageNBDNL) Handle() error {
// fmt.Printf("[%d] Socket pair %d -> %d %v %v %v\n", i, sockPair[0], sockPair[1], client, server, serverc)

d := NewDispatch(serverc, n.prov)

d.ASYNC_READS = n.async
d.ASYNC_WRITES = n.async
// Start reading commands on the socket and dispatching them to our provider
go d.Handle()

Expand Down
134 changes: 129 additions & 5 deletions pkg/storage/expose/nbdnl_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func BenchmarkDevReadNL(mb *testing.B) {
}

cons := []int{1, 4, 16, 32}
sizes := []int64{4096, 65536, 1024 * 1024}
sizes := []int64{4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024, 256 * 1024, 1024 * 1024}
diskSize := 1024 * 1024 * 1024 * 4

for _, c := range cons {
Expand All @@ -35,7 +35,7 @@ func BenchmarkDevReadNL(mb *testing.B) {
store := sources.NewMemoryStorage(int(diskSize))
driver := modules.NewMetrics(store)

n := NewExposedStorageNBDNL(driver, c, 0, uint64(driver.Size()), 4096)
n := NewExposedStorageNBDNL(driver, c, 0, uint64(driver.Size()), 4096, true)

err := n.Handle()
if err != nil {
Expand Down Expand Up @@ -91,8 +91,7 @@ func BenchmarkDevReadNL(mb *testing.B) {
}

for i := 0; i < b.N; i++ {
offset := (rand.Intn(diskSize-int(read_size)) / 4096) * 4096
offsets = append(offsets, offset)
offsets[i] = (rand.Intn(diskSize-int(read_size)) / 4096) * 4096
}

b.ResetTimer()
Expand Down Expand Up @@ -127,6 +126,131 @@ func BenchmarkDevReadNL(mb *testing.B) {
}
}

func BenchmarkDevReadNLLatency(mb *testing.B) {
currentUser, err := user.Current()
if err != nil {
panic(err)
}
if currentUser.Username != "root" {
fmt.Printf("Cannot run test unless we are root.\n")
return
}

latencies := []time.Duration{0, 1 * time.Millisecond, 5 * time.Millisecond}
cons := []int{1, 16}
asyncs := []bool{false, true}
diskSize := 1024 * 1024 * 1024 * 8

for _, lts := range latencies {
for _, asy := range asyncs {
for _, cns := range cons {
name := fmt.Sprintf("latency_%dms_cons_%d_async_%t", lts.Milliseconds(), cns, asy)
mb.Run(name, func(b *testing.B) {

store := sources.NewMemoryStorage(int(diskSize))
lstore := modules.NewArtificialLatency(store, lts, 0, lts, 0)
// logstore := modules.NewLogger(lstore)
driver := modules.NewMetrics(lstore)

data := make([]byte, 4096)
for p := 0; p < len(data); p++ {
data[p] = 0x9a
}
for off := 0; off < diskSize; off += 4096 {
n, err := store.WriteAt(data, int64(off))
if n != len(data) || err != nil {
panic(fmt.Sprintf("Can't WriteAt %v\n", err))
}
}

n := NewExposedStorageNBDNL(driver, cns, 0, uint64(driver.Size()), 4096, asy)

err := n.Handle()
if err != nil {
panic(err)
}

n.WaitReady()

/**
* Cleanup everything
*
*/
b.Cleanup(func() {
err := n.Shutdown()
if err != nil {
fmt.Printf("Error cleaning up %v\n", err)
}
// driver.ShowStats(fmt.Sprintf("Metrics-%d", b.N))
})

// driver.ResetMetrics() // Only start counting from now...

// Here's the actual benchmark...

num_readers := 8

devfiles := make([]*os.File, 0)

for i := 0; i < num_readers; i++ {
df, err := os.OpenFile(fmt.Sprintf("/dev/nbd%d", n.DevIndex), os.O_RDWR, 0666)
if err != nil {
panic("Error opening dev file\n")
}

devfiles = append(devfiles, df)
}

b.Cleanup(func() {
for _, df := range devfiles {
df.Close()
}
})

var wg sync.WaitGroup

read_size := int64(65536)

offsets := make([]int, b.N)
buffers := make(chan []byte, 100)

for i := 0; i < 100; i++ {
buffer := make([]byte, read_size)
buffers <- buffer
}

for i := 0; i < b.N; i++ {
offsets[i] = (rand.Intn(diskSize-int(read_size)) / 4096) * 4096
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
wg.Add(1)

// Test read speed...
go func(buff []byte, off int) {
dfi := rand.Intn(len(devfiles))
// newbuff := make([]byte, read_size)
n, err := devfiles[dfi].ReadAt(buff, int64(off))
if n != len(buff) || err != nil {
fmt.Printf("Error reading file %d %v\n", n, err)
}

wg.Done()
buffers <- buff
}(<-buffers, offsets[i])
}

b.SetBytes(int64(read_size))

wg.Wait()
})
}
}
}
}

func BenchmarkDevWriteNL(b *testing.B) {
currentUser, err := user.Current()
if err != nil {
Expand All @@ -145,7 +269,7 @@ func BenchmarkDevWriteNL(b *testing.B) {
// store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0)
driver := modules.NewMetrics(store)

n := NewExposedStorageNBDNL(driver, 1, 0, uint64(driver.Size()), 4096)
n := NewExposedStorageNBDNL(driver, 1, 0, uint64(driver.Size()), 4096, true)

go func() {
err := n.Handle()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/expose/nbdnl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestNBDNLDevice(t *testing.T) {
size := 4096 * 1024 * 1024
prov := sources.NewMemoryStorage(size)

n = NewExposedStorageNBDNL(prov, 8, 0, uint64(size), 4096)
n = NewExposedStorageNBDNL(prov, 8, 0, uint64(size), 4096, true)

err = n.Handle()
assert.NoError(t, err)
Expand Down

0 comments on commit a370b45

Please sign in to comment.