Skip to content

Commit

Permalink
New newdev notification on protocol handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 29, 2024
1 parent a370b45 commit 972b71f
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 115 deletions.
108 changes: 46 additions & 62 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"os"
Expand All @@ -29,35 +28,35 @@ var (
)

var connect_addr string
var connect_dev string
var connect_expose_dev bool

func init() {
rootCmd.AddCommand(cmdConnect)
cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from")
cmdConnect.Flags().StringVarP(&connect_dev, "dev", "d", "", "Device eg nbd1")
cmdConnect.Flags().BoolVarP(&connect_expose_dev, "expose", "e", false, "Expose as an nbd devices")
}

func runConnect(ccmd *cobra.Command, args []string) {
fmt.Printf("Starting silo connect %s at %s\n", connect_dev, connect_addr)
fmt.Printf("Starting silo connect from %s\n", connect_addr)

// Setup some statistics output
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":4114", nil)

// Size of migration blocks
block_size := 1024 * 64

var p storage.ExposedStorage

var destStorage storage.StorageProvider
var destWaitingLocal *modules.WaitingCacheLocal
var destWaitingRemote *modules.WaitingCacheRemote
var destStorageMetrics *modules.Metrics

// Connect to the source
con, err := net.Dial("tcp", connect_addr)
if err != nil {
panic("Error connecting")
}

var p storage.ExposedStorage
var destStorage storage.StorageProvider
var destWaitingLocal *modules.WaitingCacheLocal
var destWaitingRemote *modules.WaitingCacheRemote
var destStorageMetrics *modules.Metrics
var dest *modules.FromProtocol

destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider {
Expand All @@ -71,25 +70,6 @@ func runConnect(ccmd *cobra.Command, args []string) {
destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size)
destStorageMetrics = modules.NewMetrics(destWaitingLocal)

// Set something up to randomly read...
go func() {
for {

o := rand.Intn(int(di.Size))

b := make([]byte, 1)

// rtime := time.Now()
destStorageMetrics.ReadAt(b, int64(o))

// fmt.Printf("DATA READ %d %d %v %dms\n", o, n, err, time.Since(rtime).Milliseconds())

w := rand.Intn(10)

time.Sleep(time.Duration(w) * time.Millisecond)
}
}()

// Connect the waitingCache to the FromProtocol
destWaitingLocal.NeedAt = func(offset int64, length int32) {
dest.NeedAt(offset, length)
Expand All @@ -98,18 +78,30 @@ func runConnect(ccmd *cobra.Command, args []string) {
destWaitingLocal.DontNeedAt = func(offset int64, length int32) {
dest.DontNeedAt(offset, length)
}

// Expose it if we should...
if connect_expose_dev {
p, err = setup(destWaitingLocal, false)
if err != nil {
fmt.Printf("Error during setup (expose nbd) %v\n", err)
}
}

fmt.Printf("Returning destWaitingRemote...\n")
return destWaitingRemote
}

pro := protocol.NewProtocolRW(context.TODO(), con, con)
pro := protocol.NewProtocolRW(context.TODO(), con, con, func(dev uint32) {
fmt.Printf("NEW DEVICE %d\n", dev)
})

// TODO: Need to allow for DevInfo on different IDs better here...
dest = modules.NewFromProtocol(777, destWaitingRemoteFactory, pro)

go func() {
err := pro.Handle()
fmt.Printf("PROTOCOL ERROR %v\n", err)

// If it's EOF then the migration is completed

}()

go dest.HandleSend(context.TODO())
Expand All @@ -122,54 +114,46 @@ func runConnect(ccmd *cobra.Command, args []string) {

go dest.HandleDirtyList(func(dirty []uint) {
fmt.Printf("GOT LIST OF DIRTY BLOCKS %v\n", dirty)
destWaitingLocal.DirtyBlocks(dirty)
})

c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c

if connect_dev != "" {
if connect_expose_dev {
fmt.Printf("\nShutting down cleanly...\n")
shutdown(connect_dev, p)
shutdown(p)
}
destStorageMetrics.ShowStats("Source")
os.Exit(1)
}()

if connect_dev != "" {
p, err = setup(connect_dev, destStorageMetrics, false)
if err != nil {
fmt.Printf("Error during setup %v\n", err)
return
}
fmt.Printf("Ready on %s...\n", connect_dev)
}

ticker := time.NewTicker(time.Second)

for {
select {
case <-ticker.C:
// Show some stats...
destStorageMetrics.ShowStats("Dest")

s := destStorageMetrics.Snapshot()
prom_read_ops.Set(float64(s.Read_ops))
prom_read_bytes.Set(float64(s.Read_bytes))
prom_read_time.Set(float64(s.Read_time))
prom_read_errors.Set(float64(s.Read_errors))

prom_write_ops.Set(float64(s.Write_ops))
prom_write_bytes.Set(float64(s.Write_bytes))
prom_write_time.Set(float64(s.Write_time))
prom_write_errors.Set(float64(s.Write_errors))

prom_flush_ops.Set(float64(s.Flush_ops))
prom_flush_time.Set(float64(s.Flush_time))
prom_flush_errors.Set(float64(s.Flush_errors))

if destStorageMetrics != nil {
destStorageMetrics.ShowStats("Dest")

s := destStorageMetrics.Snapshot()
prom_read_ops.Set(float64(s.Read_ops))
prom_read_bytes.Set(float64(s.Read_bytes))
prom_read_time.Set(float64(s.Read_time))
prom_read_errors.Set(float64(s.Read_errors))

prom_write_ops.Set(float64(s.Write_ops))
prom_write_bytes.Set(float64(s.Write_bytes))
prom_write_time.Set(float64(s.Write_time))
prom_write_errors.Set(float64(s.Write_errors))

prom_flush_ops.Set(float64(s.Flush_ops))
prom_flush_time.Set(float64(s.Flush_time))
prom_flush_errors.Set(float64(s.Flush_errors))
}
}
}

}
58 changes: 33 additions & 25 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ var (
)

var serve_addr string
var serve_dev string
var serve_expose_dev bool
var serve_size int

func init() {
rootCmd.AddCommand(cmdServe)
cmdServe.Flags().StringVarP(&serve_addr, "addr", "a", ":5170", "Address to serve from")
cmdServe.Flags().StringVarP(&serve_dev, "dev", "d", "", "Device eg nbd1")
cmdServe.Flags().BoolVarP(&serve_expose_dev, "expose", "e", false, "Expose as nbd dev")
cmdServe.Flags().IntVarP(&serve_size, "size", "s", 1024*1024*1024, "Size")
}

Expand All @@ -63,7 +63,7 @@ var (
)

func runServe(ccmd *cobra.Command, args []string) {
fmt.Printf("Starting silo serve %s at %s size %d\n", serve_dev, serve_addr, serve_size)
fmt.Printf("Starting silo serve %s size %d\n", serve_addr, serve_size)

// Setup some statistics output
http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -101,17 +101,17 @@ func runServe(ccmd *cobra.Command, args []string) {
go func() {
<-c

if serve_dev != "" {
if serve_expose_dev {
fmt.Printf("\nShutting down cleanly...\n")
shutdown(serve_dev, p)
shutdown(p)
}
sourceMetrics.ShowStats("Source")
os.Exit(1)
}()

if serve_dev != "" {
if serve_expose_dev {
var err error
p, err = setup(serve_dev, sourceStorage, true)
p, err = setup(sourceStorage, true)
if err != nil {
fmt.Printf("Error during setup %v\n", err)
return
Expand All @@ -123,9 +123,9 @@ func runServe(ccmd *cobra.Command, args []string) {

l, err := net.Listen("tcp", serve_addr)
if err != nil {
if serve_dev != "" {
if serve_expose_dev {
fmt.Printf("\nShutting down cleanly...\n")
shutdown(serve_dev, p)
shutdown(p)
}
panic("Listener issue...")
}
Expand All @@ -137,7 +137,7 @@ func runServe(ccmd *cobra.Command, args []string) {
fmt.Printf("GOT CONNECTION\n")
// Now we can migrate to the client...

pro := protocol.NewProtocolRW(context.TODO(), c, c)
pro := protocol.NewProtocolRW(context.TODO(), c, c, nil)
dest := modules.NewToProtocol(uint64(serve_size), 777, pro)

dest.SendDevInfo()
Expand Down Expand Up @@ -267,19 +267,20 @@ func runServe(ccmd *cobra.Command, args []string) {
* Setup a disk
*
*/
func setup(device string, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) {
p := expose.NewExposedStorageNBD(prov, device, 1, 0, prov.Size(), 4096, 0)
func setup(prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) {
p := expose.NewExposedStorageNBDNL(prov, 1, 0, prov.Size(), 4096, true)

go func() {
err := p.Handle()
if err != nil {
fmt.Printf("p.Handle returned %v\n", err)
}
}()
err := p.Handle()
if err != nil {
fmt.Printf("p.Handle returned %v\n", err)
}

p.WaitReady()

err := os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600)
device := p.Device()
fmt.Printf("* Device ready on /dev/%s\n", device)

err = os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600)
if err != nil {
return nil, fmt.Errorf("Error mkdir %v", err)
}
Expand All @@ -297,17 +298,24 @@ func setup(device string, prov storage.StorageProvider, server bool) (storage.Ex
return nil, fmt.Errorf("Error mount %v", err)
}
} else {
cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device))
err = cmd.Run()
if err != nil {
return nil, fmt.Errorf("Error mount %v", err)
}
go func() {
fmt.Printf("Mounting device...")
cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device))
err = cmd.Run()
if err != nil {
fmt.Printf("Could not mount device %v\n", err)
return
}
fmt.Printf("* Device is mounted at /mnt/mount%s\n", device)
}()
}

return p, nil
}

func shutdown(device string, p storage.ExposedStorage) error {
func shutdown(p storage.ExposedStorage) error {
device := p.Device()

fmt.Printf("shutdown %s\n", device)
cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", device))
err := cmd.Run()
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/expose/nbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (n *ExposedStorageNBD) WaitReady() error {
return nil
}

func (n *ExposedStorageNBD) Device() string {
return n.dev
}

func (n *ExposedStorageNBD) Shutdown() error {

/*
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/expose/nbdnl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expose

import (
"fmt"
"io"
"net"
"os"
Expand Down Expand Up @@ -41,6 +42,10 @@ func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, t
}
}

func (n *ExposedStorageNBDNL) Device() string {
return fmt.Sprintf("nbd%d", n.DevIndex)
}

func (n *ExposedStorageNBDNL) Handle() error {

socks := make([]*os.File, 0)
Expand Down Expand Up @@ -88,7 +93,7 @@ func (n *ExposedStorageNBDNL) Handle() error {
return nil
}

// Wait until it's connected...
// Wait until it's connected... (Handle must have been called already)
func (n *ExposedStorageNBDNL) WaitReady() error {
for {
s, err := nbdnl.Status(uint32(n.DevIndex))
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func TestMigratorSimplePipe(t *testing.T) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()

prSource := protocol.NewProtocolRW(context.TODO(), r1, w2)
prDest := protocol.NewProtocolRW(context.TODO(), r2, w1)
prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil)
prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, nil)

go prSource.Handle()
go prDest.Handle()
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/modules/waiting_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func (wcl *WaitingCacheLocal) Size() uint64 {
return wcl.wc.localSize()
}

func (wcl *WaitingCacheLocal) DirtyBlocks(blocks []uint) {
for _, v := range blocks {
wcl.wc.haveNotBlock(v)
}
}

type WaitingCacheRemote struct {
wc *WaitingCache
}
Expand Down Expand Up @@ -207,12 +213,6 @@ func (i *WaitingCache) remoteWriteAt(buffer []byte, offset int64) (int, error) {
return n, err
}

func (i *WaitingCache) DirtyBlocks(blocks []uint) {
for _, v := range blocks {
i.haveNotBlock(v)
}
}

func (i *WaitingCache) localFlush() error {
return i.prov.Flush()
}
Expand Down
Loading

0 comments on commit 972b71f

Please sign in to comment.