Skip to content

Commit

Permalink
Merge pull request #2 from loopholelabs/james/arch-51-silo-optimizations
Browse files Browse the repository at this point in the history
James/arch 51 silo optimizations
  • Loading branch information
jimmyaxod authored Mar 18, 2024
2 parents c074a77 + 2c499c3 commit 7e39e48
Show file tree
Hide file tree
Showing 63 changed files with 2,359 additions and 1,243 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
name: Tests

on:
push:
branches:
- "*"
on: [pull_request]

jobs:
tests:
Expand Down Expand Up @@ -34,7 +31,10 @@ jobs:
path: ${{ steps.go-cache-paths.outputs.go-mod }}
key: ${{ runner.os }}-go-mod-${{ hashFiles('**/go.sum') }}

- name: Setup nbd
run: sudo modprobe nbd

- name: Test
run: go test -v $(go list ./... | grep -v expose)
run: go test -exec sudo -v ./...
- name: Test with Race Conditions
run: go test -race -v $(go list ./... | grep -v expose)
170 changes: 141 additions & 29 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"

Expand All @@ -16,14 +17,19 @@ import (
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
"github.com/spf13/cobra"

"github.com/fatih/color"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
)

var (
cmdConnect = &cobra.Command{
Use: "connect",
Short: "Start up connect",
Long: ``,
Short: "Conndct and stream Silo devices.",
Long: `Connect to a Silo instance, and stream available devices.`,
Run: runConnect,
}
)
Expand All @@ -40,6 +46,11 @@ var connect_mount_dev bool
// List of ExposedStorage so they can be cleaned up on exit.
var dst_exposed []storage.ExposedStorage

var dst_progress *mpb.Progress
var dst_bars []*mpb.Bar
var dst_wg sync.WaitGroup
var dst_wg_first bool

func init() {
rootCmd.AddCommand(cmdConnect)
cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from")
Expand All @@ -52,6 +63,14 @@ func init() {
*
*/
func runConnect(ccmd *cobra.Command, args []string) {

dst_progress = mpb.New(
mpb.WithOutput(color.Output),
mpb.WithAutoRefresh(),
)

dst_bars = make([]*mpb.Bar, 0)

fmt.Printf("Starting silo connect from source %s\n", connect_addr)

dst_exposed = make([]storage.ExposedStorage, 0)
Expand All @@ -61,7 +80,6 @@ func runConnect(ccmd *cobra.Command, args []string) {
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c

for _, e := range dst_exposed {
dst_device_shutdown(e)
}
Expand All @@ -76,18 +94,29 @@ func runConnect(ccmd *cobra.Command, args []string) {
}

// Wrap the connection in a protocol, and handle incoming devices
pro := protocol.NewProtocolRW(context.TODO(), con, con, handleIncomingDevice)
dst_wg_first = true
dst_wg.Add(1) // We need to at least wait for one to complete.

pro := protocol.NewProtocolRW(context.TODO(), []io.Reader{con}, []io.Writer{con}, handleIncomingDevice)

// Let the protocol do its thing.
err = pro.Handle()
if err != nil && err != io.EOF {
fmt.Printf("Silo protocol error %v\n", err)
}
go func() {
err = pro.Handle()
if err != nil && err != io.EOF {
fmt.Printf("Silo protocol error %v\n", err)
return
}
// We should get an io.EOF here once the migrations have all completed.
}()

dst_wg.Wait() // Wait until the migrations have completed...

dst_progress.Wait()

fmt.Printf("Migrations completed.\n")
fmt.Printf("\nMigrations completed. Please ctrl-c if you want to shut down, or wait an hour :)\n")

// We should pause here, to allow the user to do things with the devices
time.Sleep(10 * time.Minute)
time.Sleep(10 * time.Hour)

// Shutdown any storage exposed as devices
for _, e := range dst_exposed {
Expand All @@ -98,24 +127,85 @@ func runConnect(ccmd *cobra.Command, args []string) {
// Handle a new incoming device. This is called when a packet is received for a device we haven't heard about before.
func handleIncomingDevice(pro protocol.Protocol, dev uint32) {
var destStorage storage.StorageProvider
var destWaitingLocal *modules.WaitingCacheLocal
var destWaitingRemote *modules.WaitingCacheRemote
var destWaitingLocal *waitingcache.WaitingCacheLocal
var destWaitingRemote *waitingcache.WaitingCacheRemote
var destMonitorStorage *modules.Hooks
var dest *protocol.FromProtocol

var bar *mpb.Bar

var statusString = " "
var statusExposed = " "

if !dst_wg_first {
// We have a new migration to deal with
dst_wg.Add(1)
}
dst_wg_first = false

// This is a storage factory which will be called when we recive DevInfo.
storageFactory := func(di *protocol.DevInfo) storage.StorageProvider {
fmt.Printf("= %d = Received DevInfo name=%s size=%d blocksize=%d\n", dev, di.Name, di.Size, di.BlockSize)
// fmt.Printf("= %d = Received DevInfo name=%s size=%d blocksize=%d\n", dev, di.Name, di.Size, di.BlockSize)

statusFn := func(s decor.Statistics) string {
return statusString
}

bar = dst_progress.AddBar(int64(di.Size),
mpb.PrependDecorators(
decor.Name(di.Name, decor.WCSyncSpaceR),
decor.Name(" "),
decor.Any(func(s decor.Statistics) string { return statusExposed }, decor.WC{W: 4}),
decor.Name(" "),
decor.CountersKiloByte("%d/%d", decor.WCSyncWidth),
),
mpb.AppendDecorators(
decor.EwmaETA(decor.ET_STYLE_GO, 30),
decor.Name(" "),
decor.EwmaSpeed(decor.SizeB1024(0), "% .2f", 60, decor.WCSyncWidth),
decor.OnComplete(decor.Percentage(decor.WC{W: 5}), "done"),
decor.Name(" "),
decor.Any(statusFn, decor.WC{W: 2}),
),
)

dst_bars = append(dst_bars, bar)

// You can change this to use sources.NewFileStorage etc etc
cr := func(s int) storage.StorageProvider {
return sources.NewMemoryStorage(s)
cr := func(i int, s int) (storage.StorageProvider, error) {
return sources.NewMemoryStorage(s), nil
}
// Setup some sharded memory storage (for concurrent write speed)
destStorage = modules.NewShardedStorage(int(di.Size), int(di.Size/1024), cr)
shard_size := di.Size
if di.Size > 64*1024 {
shard_size = di.Size / 1024
}
var err error
destStorage, err = modules.NewShardedStorage(int(di.Size), int(shard_size), cr)
if err != nil {
panic(err) // FIXME
}

destMonitorStorage = modules.NewHooks(destStorage)

last_value := uint64(0)
last_time := time.Now()

destMonitorStorage.Post_write = func(buffer []byte, offset int64, n int, err error) (int, error) {
// Update the progress bar
available, total := destWaitingLocal.Availability()
v := uint64(available) * di.Size / uint64(total)
bar.SetCurrent(int64(v))
bar.EwmaIncrInt64(int64(v-last_value), time.Since(last_time))
last_time = time.Now()
last_value = v

return n, err
}

// Use a WaitingCache which will wait for migration blocks, send priorities etc
// A WaitingCache has two ends - local and remote.
destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, int(di.BlockSize))
destWaitingLocal, destWaitingRemote = waitingcache.NewWaitingCache(destMonitorStorage, int(di.BlockSize))

// Connect the waitingCache to the FromProtocol.
// Note that since these are hints, errors don't matter too much.
Expand All @@ -132,8 +222,10 @@ func handleIncomingDevice(pro protocol.Protocol, dev uint32) {
p, err := dst_device_setup(destWaitingLocal)
if err != nil {
fmt.Printf("= %d = Error during setup (expose nbd) %v\n", dev, err)
} else {
statusExposed = p.Device()
dst_exposed = append(dst_exposed, p)
}
dst_exposed = append(dst_exposed, p)
}
return destWaitingRemote
}
Expand All @@ -148,33 +240,53 @@ func handleIncomingDevice(pro protocol.Protocol, dev uint32) {

// Handle events from the source
go dest.HandleEvent(func(e protocol.EventType) {
fmt.Printf("= %d = Event %s\n", dev, protocol.EventsByType[e])
if e == protocol.EventPostLock {
statusString = "L" //red.Sprintf("L")
} else if e == protocol.EventPreLock {
statusString = "l" //red.Sprintf("l")
} else if e == protocol.EventPostUnlock {
statusString = "U" //green.Sprintf("U")
} else if e == protocol.EventPreUnlock {
statusString = "u" //green.Sprintf("u")
}
// fmt.Printf("= %d = Event %s\n", dev, protocol.EventsByType[e])
// Check we have all data...
if e == protocol.EventCompleted {
available, total := destWaitingLocal.Availability()
fmt.Printf("= %d = Availability (%d/%d)\n", dev, available, total)
// We completed the migration...
dst_wg.Done()
// available, total := destWaitingLocal.Availability()
// fmt.Printf("= %d = Availability (%d/%d)\n", dev, available, total)
// Set bar to completed
bar.SetCurrent(int64(destWaitingLocal.Size()))
}
})

// Handle dirty list by invalidating local waiting cache
go dest.HandleDirtyList(func(dirty []uint) {
fmt.Printf("= %d = LIST OF DIRTY BLOCKS %v\n", dev, dirty)
// fmt.Printf("= %d = LIST OF DIRTY BLOCKS %v\n", dev, dirty)
destWaitingLocal.DirtyBlocks(dirty)
})
}

// Called to setup an exposed storage device
func dst_device_setup(prov storage.StorageProvider) (storage.ExposedStorage, error) {
p := expose.NewExposedStorageNBDNL(prov, 1, 0, prov.Size(), 4096, true)
var err error

err := p.Init()
if err != nil {
fmt.Printf("p.Init returned %v\n", err)
return nil, err
for {
// Try it a few times... FIXME
err = p.Init()
if err != nil {
fmt.Printf("\n\n\np.Init returned %v\n\n\n", err)
//return nil, err
} else {
break
}
time.Sleep(100 * time.Millisecond)
}

device := p.Device()
fmt.Printf("* Device ready on /dev/%s\n", device)
// fmt.Printf("* Device ready on /dev/%s\n", device)

// We could also mount the device, but we should do so inside a goroutine, so that it doesn't block things...
if connect_mount_dev {
Expand All @@ -184,14 +296,14 @@ func dst_device_setup(prov storage.StorageProvider) (storage.ExposedStorage, err
}

go func() {
fmt.Printf("Mounting device...")
// fmt.Printf("Mounting device...")
cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device))
err = cmd.Run()
if err != nil {
fmt.Printf("Could not mount device %v\n", err)
return
}
fmt.Printf("* Device is mounted at /mnt/mount%s\n", device)
// fmt.Printf("* Device is mounted at /mnt/mount%s\n", device)
}()
}

Expand Down
Loading

0 comments on commit 7e39e48

Please sign in to comment.