Skip to content

Commit

Permalink
nbd reorg/benchmarking start
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 9, 2024
1 parent f29ce29 commit 6df8425
Show file tree
Hide file tree
Showing 11 changed files with 1,261 additions and 707 deletions.
4 changes: 1 addition & 3 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"syscall"
"time"

"github.com/loopholelabs/silo/internal/expose"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
Expand Down Expand Up @@ -122,8 +121,7 @@ func runConnect(ccmd *cobra.Command, args []string) {
}()

if connect_dev != "" {
d := expose.NewDispatch()
p, err = setup(connect_dev, d, destStorageMetrics, false)
p, err = setup(connect_dev, destStorageMetrics, false)
if err != nil {
fmt.Printf("Error during setup %v\n", err)
return
Expand Down
7 changes: 3 additions & 4 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func runServe(ccmd *cobra.Command, args []string) {

if serve_dev != "" {
var err error
d := expose.NewDispatch()
p, err = setup(serve_dev, d, sourceStorage, true)
p, err = setup(serve_dev, sourceStorage, true)
if err != nil {
fmt.Printf("Error during setup %v\n", err)
return
Expand Down Expand Up @@ -234,8 +233,8 @@ func runServe(ccmd *cobra.Command, args []string) {
* Setup a disk
*
*/
func setup(device string, dispatch expose.NBDDispatcher, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) {
p, err := expose.NewNBD(dispatch, fmt.Sprintf("/dev/%s", device))
func setup(device string, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) {
p, err := expose.NewNBD(fmt.Sprintf("/dev/%s", device))
if err != nil {
return nil, err
}
Expand Down
258 changes: 244 additions & 14 deletions internal/expose/nbd.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package expose

import (
"encoding/binary"
"fmt"
"os"
"sync"
"syscall"

"github.com/loopholelabs/silo/pkg/storage"
Expand All @@ -25,12 +27,14 @@ const NBD_DISCONNECT = 8 | NBD_COMMAND
const NBD_SET_TIMEOUT = 9 | NBD_COMMAND
const NBD_SET_FLAGS = 10 | NBD_COMMAND

// NBD Commands
const NBD_CMD_READ = 0
const NBD_CMD_WRITE = 1
const NBD_CMD_DISCONNECT = 2
const NBD_CMD_FLUSH = 3
const NBD_CMD_TRIM = 4

// NBD Flags
const NBD_FLAG_HAS_FLAGS = (1 << 0)
const NBD_FLAG_READ_ONLY = (1 << 1)
const NBD_FLAG_SEND_FLUSH = (1 << 2)
Expand Down Expand Up @@ -66,22 +70,14 @@ type NBDExposedStorage struct {
fp uintptr
socketPair [2]int
readyChannel chan error
dispatcher NBDDispatcher
}

// Want to swap in and out different versions here so we can benchmark
type NBDDispatcher interface {
Handle(fd int, prov storage.StorageProvider) error
Name() string
Wait()
dispatcher *Dispatch
}

/**
* Handle storage requests using the provider
*
*/
func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error {
// Handle incoming requests...
go func() {
err := s.dispatcher.Handle(s.socketPair[0], prov)
if err != nil {
Expand All @@ -102,7 +98,7 @@ func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error {

// Issue ioctl calls to set it up
calls := []IoctlCall{
// {NBD_SET_BLKSIZE, 4096},
{NBD_SET_BLKSIZE, 4096},
{NBD_SET_TIMEOUT, 0},
{NBD_PRINT_DEBUG, 1},
{NBD_SET_SIZE, uintptr(prov.Size())},
Expand Down Expand Up @@ -136,7 +132,7 @@ func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error {
*/
func (s *NBDExposedStorage) WaitReady() error {
// fmt.Printf("WaitReady\n")
// Wait for a ready signal
// Wait for a ready signal (The ioctl DO_IT has just about been sent)
return <-s.readyChannel
}

Expand All @@ -148,7 +144,6 @@ func (s *NBDExposedStorage) Shutdown() error {
s.dispatcher.Wait()

calls := []IoctlCall{
{NBD_CLEAR_QUE, 0},
{NBD_DISCONNECT, 0},
{NBD_CLEAR_SOCK, 0},
}
Expand All @@ -175,7 +170,7 @@ func (s *NBDExposedStorage) Shutdown() error {
* Create a new NBD device
*
*/
func NewNBD(d NBDDispatcher, dev string) (storage.ExposedStorage, error) {
func NewNBD(dev string) (storage.ExposedStorage, error) {
// Create a pair of sockets to communicate with the NBD device over
// fmt.Printf("Create socketpair\n")
sockPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
Expand All @@ -196,8 +191,243 @@ func NewNBD(d NBDDispatcher, dev string) (storage.ExposedStorage, error) {
device: fp,
fp: fp.Fd(),
readyChannel: make(chan error),
dispatcher: d,
dispatcher: NewDispatch(),
}

return es, nil
}

type Dispatch struct {
ASYNC_READS bool
ASYNC_WRITES bool
fp *os.File
responseHeader []byte
writeLock sync.Mutex
prov storage.StorageProvider
fatal chan error
pendingResponses sync.WaitGroup
}

func NewDispatch() *Dispatch {
d := &Dispatch{
ASYNC_WRITES: true,
ASYNC_READS: true,
responseHeader: make([]byte, 16),
fatal: make(chan error, 8),
}
binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC)
return d
}

func (d *Dispatch) Wait() {
// Wait for any pending responses
d.pendingResponses.Wait()
}

/**
* Write a response...
*
*/
func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []byte) error {
d.writeLock.Lock()
defer d.writeLock.Unlock()

// fmt.Printf("WriteResponse %x -> %d\n", respHandle, len(chunk))

binary.BigEndian.PutUint32(d.responseHeader[4:], respError)
binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle)

_, err := d.fp.Write(d.responseHeader)
if err != nil {
return err
}
if len(chunk) > 0 {
_, err = d.fp.Write(chunk)
if err != nil {
return err
}
}
return nil
}

/**
* This dispatches incoming NBD requests sequentially to the provider.
*
*/
func (d *Dispatch) Handle(fd int, prov storage.StorageProvider) error {
d.prov = prov
d.fp = os.NewFile(uintptr(fd), "unix")

// Speed read and dispatch...

BUFFER_SIZE := 4 * 1024 * 1024
buffer := make([]byte, BUFFER_SIZE)
wp := 0

request := Request{}

for {
// fmt.Printf("Calling read...\n")
n, err := d.fp.Read(buffer[wp:])
if err != nil {
fmt.Printf("Error %v\n", err)
return err
}
wp += n

// fmt.Printf("Read %d\n", n)

// Now go through processing complete packets
rp := 0
for {
// fmt.Printf("Processing data %d %d\n", rp, wp)
// Make sure we have a complete header
if wp-rp >= 28 {
// We can read the neader...

header := buffer[rp : rp+28]
request.Magic = binary.BigEndian.Uint32(header)
request.Type = binary.BigEndian.Uint32(header[4:8])
request.Handle = binary.BigEndian.Uint64(header[8:16])
request.From = binary.BigEndian.Uint64(header[16:24])
request.Length = binary.BigEndian.Uint32(header[24:28])

if request.Magic != NBD_REQUEST_MAGIC {
return fmt.Errorf("Received invalid MAGIC")
}

if request.Type == NBD_CMD_DISCONNECT {
// fmt.Printf("CMD_DISCONNECT")
return nil // All done
} else if request.Type == NBD_CMD_FLUSH {
return fmt.Errorf("Not supported: Flush")
} else if request.Type == NBD_CMD_READ {
// fmt.Printf("READ %x %d\n", request.Handle, request.Length)
rp += 28
err := d.cmdRead(request.Handle, request.From, request.Length)
if err != nil {
return err
}
} else if request.Type == NBD_CMD_WRITE {
rp += 28
if wp-rp < int(request.Length) {
rp -= 28
break // We don't have enough data yet... Wait for next read
}
data := make([]byte, request.Length)
copy(data, buffer[rp:rp+int(request.Length)])
rp += int(request.Length)
// fmt.Printf("WRITE %x %d\n", request.Handle, request.Length)
err := d.cmdWrite(request.Handle, request.From, request.Length, data)
if err != nil {
return err
}
} else if request.Type == NBD_CMD_TRIM {
// fmt.Printf("TRIM\n")
rp += 28
err = d.cmdTrim(request.Handle, request.From, request.Length)
if err != nil {
return err
}
} else {
return fmt.Errorf("NBD Not implemented %d\n", request.Type)
}

} else {
break // Try again when we have more data...
}
}
// Now we need to move any partial to the start
if rp != 0 && rp != wp {
// fmt.Printf("Copy partial %d %d\n", rp, wp)

copy(buffer, buffer[rp:wp])
}
wp -= rp

}
}

/**
* cmdRead
*
*/
func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32) error {

performRead := func(handle uint64, from uint64, length uint32) error {
data := make([]byte, length)
_, e := d.prov.ReadAt(data, int64(from))
errorValue := uint32(0)
if e != nil {
errorValue = 1
data = make([]byte, 0) // If there was an error, don't send data
}
return d.writeResponse(errorValue, handle, data)
}

if d.ASYNC_READS {
d.pendingResponses.Add(1)
go func() {
err := performRead(cmd_handle, cmd_from, cmd_length)
if err != nil {
d.fatal <- err
}
d.pendingResponses.Done()
}()
} else {
return performRead(cmd_handle, cmd_from, cmd_length)
}
return nil
}

/**
* cmdWrite
*
*/
func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint32, cmd_data []byte) error {
performWrite := func(handle uint64, from uint64, length uint32, data []byte) error {
_, e := d.prov.WriteAt(data, int64(from))
errorValue := uint32(0)
if e != nil {
errorValue = 1
}
return d.writeResponse(errorValue, handle, []byte{})
}

if d.ASYNC_WRITES {
d.pendingResponses.Add(1)
go func() {
err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data)
if err != nil {
d.fatal <- err
}
d.pendingResponses.Done()
}()
} else {
return performWrite(cmd_handle, cmd_from, cmd_length, cmd_data)
}
return nil
}

/**
* cmdTrim
*
*/
func (d *Dispatch) cmdTrim(handle uint64, from uint64, length uint32) error {
// Ask the provider
/*
e := d.prov.Trim(from, length)
if e != storage.StorageError_SUCCESS {
err := d.writeResponse(1, handle, []byte{})
if err != nil {
return err
}
} else {
*/
err := d.writeResponse(0, handle, []byte{})
if err != nil {
return err
}
// }
return nil
}
Loading

0 comments on commit 6df8425

Please sign in to comment.