diff --git a/cmd/connect.go b/cmd/connect.go index d007a46c..6404e67c 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "math/rand" "net" "net/http" "os" @@ -29,35 +28,35 @@ var ( ) var connect_addr string -var connect_dev string +var connect_expose_dev bool func init() { rootCmd.AddCommand(cmdConnect) cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from") - cmdConnect.Flags().StringVarP(&connect_dev, "dev", "d", "", "Device eg nbd1") + cmdConnect.Flags().BoolVarP(&connect_expose_dev, "expose", "e", false, "Expose as an nbd devices") } func runConnect(ccmd *cobra.Command, args []string) { - fmt.Printf("Starting silo connect %s at %s\n", connect_dev, connect_addr) + fmt.Printf("Starting silo connect from %s\n", connect_addr) // Setup some statistics output http.Handle("/metrics", promhttp.Handler()) go http.ListenAndServe(":4114", nil) + // Size of migration blocks block_size := 1024 * 64 - var p storage.ExposedStorage - - var destStorage storage.StorageProvider - var destWaitingLocal *modules.WaitingCacheLocal - var destWaitingRemote *modules.WaitingCacheRemote - var destStorageMetrics *modules.Metrics - + // Connect to the source con, err := net.Dial("tcp", connect_addr) if err != nil { panic("Error connecting") } + var p storage.ExposedStorage + var destStorage storage.StorageProvider + var destWaitingLocal *modules.WaitingCacheLocal + var destWaitingRemote *modules.WaitingCacheRemote + var destStorageMetrics *modules.Metrics var dest *modules.FromProtocol destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider { @@ -71,25 +70,6 @@ func runConnect(ccmd *cobra.Command, args []string) { destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size) destStorageMetrics = modules.NewMetrics(destWaitingLocal) - // Set something up to randomly read... - go func() { - for { - - o := rand.Intn(int(di.Size)) - - b := make([]byte, 1) - - // rtime := time.Now() - destStorageMetrics.ReadAt(b, int64(o)) - - // fmt.Printf("DATA READ %d %d %v %dms\n", o, n, err, time.Since(rtime).Milliseconds()) - - w := rand.Intn(10) - - time.Sleep(time.Duration(w) * time.Millisecond) - } - }() - // Connect the waitingCache to the FromProtocol destWaitingLocal.NeedAt = func(offset int64, length int32) { dest.NeedAt(offset, length) @@ -98,18 +78,30 @@ func runConnect(ccmd *cobra.Command, args []string) { destWaitingLocal.DontNeedAt = func(offset int64, length int32) { dest.DontNeedAt(offset, length) } + + // Expose it if we should... + if connect_expose_dev { + p, err = setup(destWaitingLocal, false) + if err != nil { + fmt.Printf("Error during setup (expose nbd) %v\n", err) + } + } + + fmt.Printf("Returning destWaitingRemote...\n") return destWaitingRemote } - pro := protocol.NewProtocolRW(context.TODO(), con, con) + pro := protocol.NewProtocolRW(context.TODO(), con, con, func(dev uint32) { + fmt.Printf("NEW DEVICE %d\n", dev) + }) + + // TODO: Need to allow for DevInfo on different IDs better here... dest = modules.NewFromProtocol(777, destWaitingRemoteFactory, pro) go func() { err := pro.Handle() fmt.Printf("PROTOCOL ERROR %v\n", err) - // If it's EOF then the migration is completed - }() go dest.HandleSend(context.TODO()) @@ -122,6 +114,7 @@ func runConnect(ccmd *cobra.Command, args []string) { go dest.HandleDirtyList(func(dirty []uint) { fmt.Printf("GOT LIST OF DIRTY BLOCKS %v\n", dirty) + destWaitingLocal.DirtyBlocks(dirty) }) c := make(chan os.Signal) @@ -129,47 +122,38 @@ func runConnect(ccmd *cobra.Command, args []string) { go func() { <-c - if connect_dev != "" { + if connect_expose_dev { fmt.Printf("\nShutting down cleanly...\n") - shutdown(connect_dev, p) + shutdown(p) } destStorageMetrics.ShowStats("Source") os.Exit(1) }() - if connect_dev != "" { - p, err = setup(connect_dev, destStorageMetrics, false) - if err != nil { - fmt.Printf("Error during setup %v\n", err) - return - } - fmt.Printf("Ready on %s...\n", connect_dev) - } - ticker := time.NewTicker(time.Second) for { select { case <-ticker.C: // Show some stats... - destStorageMetrics.ShowStats("Dest") - - s := destStorageMetrics.Snapshot() - prom_read_ops.Set(float64(s.Read_ops)) - prom_read_bytes.Set(float64(s.Read_bytes)) - prom_read_time.Set(float64(s.Read_time)) - prom_read_errors.Set(float64(s.Read_errors)) - - prom_write_ops.Set(float64(s.Write_ops)) - prom_write_bytes.Set(float64(s.Write_bytes)) - prom_write_time.Set(float64(s.Write_time)) - prom_write_errors.Set(float64(s.Write_errors)) - - prom_flush_ops.Set(float64(s.Flush_ops)) - prom_flush_time.Set(float64(s.Flush_time)) - prom_flush_errors.Set(float64(s.Flush_errors)) - + if destStorageMetrics != nil { + destStorageMetrics.ShowStats("Dest") + + s := destStorageMetrics.Snapshot() + prom_read_ops.Set(float64(s.Read_ops)) + prom_read_bytes.Set(float64(s.Read_bytes)) + prom_read_time.Set(float64(s.Read_time)) + prom_read_errors.Set(float64(s.Read_errors)) + + prom_write_ops.Set(float64(s.Write_ops)) + prom_write_bytes.Set(float64(s.Write_bytes)) + prom_write_time.Set(float64(s.Write_time)) + prom_write_errors.Set(float64(s.Write_errors)) + + prom_flush_ops.Set(float64(s.Flush_ops)) + prom_flush_time.Set(float64(s.Flush_time)) + prom_flush_errors.Set(float64(s.Flush_errors)) + } } } - } diff --git a/cmd/serve.go b/cmd/serve.go index 0d458602..195afad3 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -36,13 +36,13 @@ var ( ) var serve_addr string -var serve_dev string +var serve_expose_dev bool var serve_size int func init() { rootCmd.AddCommand(cmdServe) cmdServe.Flags().StringVarP(&serve_addr, "addr", "a", ":5170", "Address to serve from") - cmdServe.Flags().StringVarP(&serve_dev, "dev", "d", "", "Device eg nbd1") + cmdServe.Flags().BoolVarP(&serve_expose_dev, "expose", "e", false, "Expose as nbd dev") cmdServe.Flags().IntVarP(&serve_size, "size", "s", 1024*1024*1024, "Size") } @@ -63,7 +63,7 @@ var ( ) func runServe(ccmd *cobra.Command, args []string) { - fmt.Printf("Starting silo serve %s at %s size %d\n", serve_dev, serve_addr, serve_size) + fmt.Printf("Starting silo serve %s size %d\n", serve_addr, serve_size) // Setup some statistics output http.Handle("/metrics", promhttp.Handler()) @@ -101,17 +101,17 @@ func runServe(ccmd *cobra.Command, args []string) { go func() { <-c - if serve_dev != "" { + if serve_expose_dev { fmt.Printf("\nShutting down cleanly...\n") - shutdown(serve_dev, p) + shutdown(p) } sourceMetrics.ShowStats("Source") os.Exit(1) }() - if serve_dev != "" { + if serve_expose_dev { var err error - p, err = setup(serve_dev, sourceStorage, true) + p, err = setup(sourceStorage, true) if err != nil { fmt.Printf("Error during setup %v\n", err) return @@ -123,9 +123,9 @@ func runServe(ccmd *cobra.Command, args []string) { l, err := net.Listen("tcp", serve_addr) if err != nil { - if serve_dev != "" { + if serve_expose_dev { fmt.Printf("\nShutting down cleanly...\n") - shutdown(serve_dev, p) + shutdown(p) } panic("Listener issue...") } @@ -137,7 +137,7 @@ func runServe(ccmd *cobra.Command, args []string) { fmt.Printf("GOT CONNECTION\n") // Now we can migrate to the client... - pro := protocol.NewProtocolRW(context.TODO(), c, c) + pro := protocol.NewProtocolRW(context.TODO(), c, c, nil) dest := modules.NewToProtocol(uint64(serve_size), 777, pro) dest.SendDevInfo() @@ -267,19 +267,20 @@ func runServe(ccmd *cobra.Command, args []string) { * Setup a disk * */ -func setup(device string, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { - p := expose.NewExposedStorageNBD(prov, device, 1, 0, prov.Size(), 4096, 0) +func setup(prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { + p := expose.NewExposedStorageNBDNL(prov, 1, 0, prov.Size(), 4096, true) - go func() { - err := p.Handle() - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() + err := p.Handle() + if err != nil { + fmt.Printf("p.Handle returned %v\n", err) + } p.WaitReady() - err := os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600) + device := p.Device() + fmt.Printf("* Device ready on /dev/%s\n", device) + + err = os.Mkdir(fmt.Sprintf("/mnt/mount%s", device), 0600) if err != nil { return nil, fmt.Errorf("Error mkdir %v", err) } @@ -297,17 +298,24 @@ func setup(device string, prov storage.StorageProvider, server bool) (storage.Ex return nil, fmt.Errorf("Error mount %v", err) } } else { - cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device)) - err = cmd.Run() - if err != nil { - return nil, fmt.Errorf("Error mount %v", err) - } + go func() { + fmt.Printf("Mounting device...") + cmd := exec.Command("mount", "-r", fmt.Sprintf("/dev/%s", device), fmt.Sprintf("/mnt/mount%s", device)) + err = cmd.Run() + if err != nil { + fmt.Printf("Could not mount device %v\n", err) + return + } + fmt.Printf("* Device is mounted at /mnt/mount%s\n", device) + }() } return p, nil } -func shutdown(device string, p storage.ExposedStorage) error { +func shutdown(p storage.ExposedStorage) error { + device := p.Device() + fmt.Printf("shutdown %s\n", device) cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", device)) err := cmd.Run() diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go index 05c07d2e..f4400fa9 100644 --- a/pkg/storage/expose/nbd.go +++ b/pkg/storage/expose/nbd.go @@ -230,6 +230,10 @@ func (n *ExposedStorageNBD) WaitReady() error { return nil } +func (n *ExposedStorageNBD) Device() string { + return n.dev +} + func (n *ExposedStorageNBD) Shutdown() error { /* diff --git a/pkg/storage/expose/nbdnl.go b/pkg/storage/expose/nbdnl.go index 6a2f3b98..4be8d9da 100644 --- a/pkg/storage/expose/nbdnl.go +++ b/pkg/storage/expose/nbdnl.go @@ -1,6 +1,7 @@ package expose import ( + "fmt" "io" "net" "os" @@ -41,6 +42,10 @@ func NewExposedStorageNBDNL(prov storage.StorageProvider, num_connections int, t } } +func (n *ExposedStorageNBDNL) Device() string { + return fmt.Sprintf("nbd%d", n.DevIndex) +} + func (n *ExposedStorageNBDNL) Handle() error { socks := make([]*os.File, 0) @@ -88,7 +93,7 @@ func (n *ExposedStorageNBDNL) Handle() error { return nil } -// Wait until it's connected... +// Wait until it's connected... (Handle must have been called already) func (n *ExposedStorageNBDNL) WaitReady() error { for { s, err := nbdnl.Status(uint32(n.DevIndex)) diff --git a/pkg/storage/migrator/migrator_test.go b/pkg/storage/migrator/migrator_test.go index 46319cc7..9f1e7082 100644 --- a/pkg/storage/migrator/migrator_test.go +++ b/pkg/storage/migrator/migrator_test.go @@ -103,8 +103,8 @@ func TestMigratorSimplePipe(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - prSource := protocol.NewProtocolRW(context.TODO(), r1, w2) - prDest := protocol.NewProtocolRW(context.TODO(), r2, w1) + prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil) + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, nil) go prSource.Handle() go prDest.Handle() diff --git a/pkg/storage/modules/waiting_cache.go b/pkg/storage/modules/waiting_cache.go index 0d14e598..fab71be5 100644 --- a/pkg/storage/modules/waiting_cache.go +++ b/pkg/storage/modules/waiting_cache.go @@ -46,6 +46,12 @@ func (wcl *WaitingCacheLocal) Size() uint64 { return wcl.wc.localSize() } +func (wcl *WaitingCacheLocal) DirtyBlocks(blocks []uint) { + for _, v := range blocks { + wcl.wc.haveNotBlock(v) + } +} + type WaitingCacheRemote struct { wc *WaitingCache } @@ -207,12 +213,6 @@ func (i *WaitingCache) remoteWriteAt(buffer []byte, offset int64) (int, error) { return n, err } -func (i *WaitingCache) DirtyBlocks(blocks []uint) { - for _, v := range blocks { - i.haveNotBlock(v) - } -} - func (i *WaitingCache) localFlush() error { return i.prov.Flush() } diff --git a/pkg/storage/protocol/mock_protocol.go b/pkg/storage/protocol/mock_protocol.go index 25142108..ce1e2506 100644 --- a/pkg/storage/protocol/mock_protocol.go +++ b/pkg/storage/protocol/mock_protocol.go @@ -5,16 +5,6 @@ import ( "sync/atomic" ) -type packetinfo struct { - id uint32 - data []byte -} - -type Waiters struct { - by_cmd map[byte]chan packetinfo - by_id map[uint32]chan packetinfo -} - type MockProtocol struct { waiters map[uint32]Waiters waiters_lock sync.Mutex diff --git a/pkg/storage/protocol/protocol_rw.go b/pkg/storage/protocol/protocol_rw.go index fe88f2b7..10438113 100644 --- a/pkg/storage/protocol/protocol_rw.go +++ b/pkg/storage/protocol/protocol_rw.go @@ -8,22 +8,36 @@ import ( "sync/atomic" ) +type packetinfo struct { + id uint32 + data []byte +} + +type Waiters struct { + by_cmd map[byte]chan packetinfo + by_id map[uint32]chan packetinfo +} + type ProtocolRW struct { ctx context.Context r io.Reader w io.Writer w_lock sync.Mutex tx_id uint32 + active_devs map[uint32]bool waiters map[uint32]Waiters waiters_lock sync.Mutex + newdev_fn func(uint32) } -func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer) *ProtocolRW { +func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer, newdev_fn func(uint32)) *ProtocolRW { return &ProtocolRW{ - ctx: ctx, - r: r, - w: w, - waiters: make(map[uint32]Waiters), + ctx: ctx, + r: r, + w: w, + waiters: make(map[uint32]Waiters), + newdev_fn: newdev_fn, + active_devs: make(map[uint32]bool), } } @@ -80,6 +94,14 @@ func (p *ProtocolRW) Handle() error { } // Now queue it up in a channel + _, ok := p.active_devs[dev] + if !ok { + p.active_devs[dev] = true + if p.newdev_fn != nil { + p.newdev_fn(dev) + } + } + cmd := data[0] p.waiters_lock.Lock() diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 05cc9ecc..622f81e3 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -28,6 +28,7 @@ type ExposedStorage interface { Handle() error WaitReady() error Shutdown() error + Device() string } type BlockOrder interface { diff --git a/testing/protocol_test.go b/testing/protocol_test.go index 8171c44b..651ba1e0 100644 --- a/testing/protocol_test.go +++ b/testing/protocol_test.go @@ -3,6 +3,7 @@ package storage import ( "context" "crypto/rand" + "fmt" "io" "testing" @@ -103,6 +104,7 @@ func TestProtocolReadAt(t *testing.T) { } func TestProtocolRWWriteAt(t *testing.T) { + fmt.Printf("TestProtocolRWWriteAt\n") size := 1024 * 1024 var store storage.StorageProvider @@ -112,8 +114,12 @@ func TestProtocolRWWriteAt(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - prSource := protocol.NewProtocolRW(context.TODO(), r1, w2) - prDest := protocol.NewProtocolRW(context.TODO(), r2, w1) + destDev := make(chan uint32, 8) + + prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil) + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, func(dev uint32) { + destDev <- dev + }) sourceToProtocol := modules.NewToProtocol(uint64(size), 1, prSource) @@ -138,6 +144,10 @@ func TestProtocolRWWriteAt(t *testing.T) { sourceToProtocol.SendDevInfo() + // Should know the dev now... + assert.Equal(t, uint32(1), <-destDev) + assert.Equal(t, 0, len(destDev)) + buff := make([]byte, 4096) rand.Read(buff) n, err := sourceToProtocol.WriteAt(buff, 12) @@ -167,8 +177,8 @@ func TestProtocolRWReadAt(t *testing.T) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - prSource := protocol.NewProtocolRW(context.TODO(), r1, w2) - prDest := protocol.NewProtocolRW(context.TODO(), r2, w1) + prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil) + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, nil) sourceToProtocol := modules.NewToProtocol(uint64(size), 1, prSource)