From 8cafaa203be2bee396b8877764fbde7a0a99a0a1 Mon Sep 17 00:00:00 2001 From: Jimmy Moore <67790371+jimmyaxod@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:24:14 +0000 Subject: [PATCH] arch-211-add-logging-to-silo (#53) * Added logging to migrator Signed-off-by: Jimmy Moore * Sync now passes logger to migrator Signed-off-by: Jimmy Moore * Updated logger module Signed-off-by: Jimmy Moore * Added logging to device Signed-off-by: Jimmy Moore * Added logging to nbd expose Signed-off-by: Jimmy Moore * Added logging to cmd/serve Signed-off-by: Jimmy Moore * Fix nbd config, and added logging output for tests Signed-off-by: Jimmy Moore * Couple of logs in device.go Signed-off-by: Jimmy Moore * Added logging to waitingCache Signed-off-by: Jimmy Moore --------- Signed-off-by: Jimmy Moore --- cmd/connect.go | 2 +- cmd/serve.go | 30 ++- cmd/sync.go | 5 +- go.mod | 3 +- go.sum | 7 + pkg/storage/device/device.go | 41 +++- pkg/storage/expose/nbd.go | 177 +++++++++++++++--- pkg/storage/expose/nbd_dev_test.go | 6 +- pkg/storage/expose/nbd_dispatch.go | 67 ++++++- pkg/storage/expose/nbd_test.go | 17 +- pkg/storage/expose/process_memory_test.go | 2 +- pkg/storage/migrator/migrator.go | 90 +++++++++ pkg/storage/migrator/sync.go | 1 + pkg/storage/modules/copy_on_write_test.go | 4 +- pkg/storage/modules/logger.go | 59 +++++- pkg/storage/storage_events_test.go | 4 +- pkg/storage/waitingcache/waiting_cache.go | 43 ++++- .../waitingcache/waiting_cache_local.go | 26 +++ .../waitingcache/waiting_cache_remote.go | 69 ++++--- pkg/testutils/logging_output.go | 29 +++ 20 files changed, 583 insertions(+), 99 deletions(-) create mode 100644 pkg/testutils/logging_output.go diff --git a/cmd/connect.go b/cmd/connect.go index 8ef356e0..397661af 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -356,7 +356,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32 // Called to setup an exposed storage device func dstDeviceSetup(prov storage.Provider) (storage.ExposedStorage, error) { - p := expose.NewExposedStorageNBDNL(prov, 1, 0, prov.Size(), 4096, true) + p := expose.NewExposedStorageNBDNL(prov, expose.DefaultConfig) var err error err = p.Init() diff --git a/cmd/serve.go b/cmd/serve.go index 9fa41d50..9ccdf3e5 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -12,6 +12,8 @@ import ( "time" "github.com/fatih/color" + "github.com/loopholelabs/logging" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/blocks" "github.com/loopholelabs/silo/pkg/storage/config" @@ -48,6 +50,8 @@ var srcStorage []*storageInfo var serveProgressBar *mpb.Progress var serveBars []*mpb.Bar +var serveDebug bool + func init() { rootCmd.AddCommand(cmdServe) cmdServe.Flags().StringVarP(&serveAddr, "addr", "a", ":5170", "Address to serve from") @@ -55,6 +59,7 @@ func init() { cmdServe.Flags().BoolVarP(&serveProgress, "progress", "p", false, "Show progress") cmdServe.Flags().BoolVarP(&serveContinuous, "continuous", "C", false, "Continuous sync") cmdServe.Flags().BoolVarP(&serveAnyOrder, "order", "o", false, "Any order (faster)") + cmdServe.Flags().BoolVarP(&serveDebug, "debug", "d", false, "Debug logging (trace)") } type storageInfo struct { @@ -69,6 +74,12 @@ type storageInfo struct { } func runServe(_ *cobra.Command, _ []string) { + var log types.RootLogger + if serveDebug { + log = logging.New(logging.Zerolog, "silo.serve", os.Stderr) + log.SetLevel(types.TraceLevel) + } + if serveProgress { serveProgressBar = mpb.New( mpb.WithOutput(color.Output), @@ -85,7 +96,7 @@ func runServe(_ *cobra.Command, _ []string) { signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c - shutdownEverything() + shutdownEverything(log) os.Exit(1) }() @@ -96,7 +107,7 @@ func runServe(_ *cobra.Command, _ []string) { for i, s := range siloConf.Device { fmt.Printf("Setup storage %d [%s] size %s - %d\n", i, s.Name, s.Size, s.ByteSize()) - sinfo, err := setupStorageDevice(s) + sinfo, err := setupStorageDevice(s, log) if err != nil { panic(fmt.Sprintf("Could not setup storage. %v", err)) } @@ -108,7 +119,7 @@ func runServe(_ *cobra.Command, _ []string) { l, err := net.Listen("tcp", serveAddr) if err != nil { - shutdownEverything() + shutdownEverything(log) panic("Listener issue...") } @@ -133,7 +144,7 @@ func runServe(_ *cobra.Command, _ []string) { for i, s := range srcStorage { wg.Add(1) go func(index int, src *storageInfo) { - err := migrateDevice(uint32(index), src.name, pro, src) + err := migrateDevice(log, uint32(index), src.name, pro, src) if err != nil { fmt.Printf("There was an issue migrating the storage %d %v\n", index, err) } @@ -149,10 +160,10 @@ func runServe(_ *cobra.Command, _ []string) { con.Close() } - shutdownEverything() + shutdownEverything(log) } -func shutdownEverything() { +func shutdownEverything(_ types.RootLogger) { // first unlock everything fmt.Printf("Unlocking devices...\n") for _, i := range srcStorage { @@ -169,8 +180,8 @@ func shutdownEverything() { } } -func setupStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { - source, ex, err := device.NewDevice(conf) +func setupStorageDevice(conf *config.DeviceSchema, log types.RootLogger) (*storageInfo, error) { + source, ex, err := device.NewDeviceWithLogging(conf, log) if err != nil { return nil, err } @@ -223,7 +234,7 @@ func setupStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { } // Migrate a device -func migrateDevice(devID uint32, name string, +func migrateDevice(log types.RootLogger, devID uint32, name string, pro protocol.Protocol, sinfo *storageInfo) error { size := sinfo.lockable.Size() @@ -293,6 +304,7 @@ func migrateDevice(devID uint32, name string, }() conf := migrator.NewConfig().WithBlockSize(sinfo.blockSize) + conf.Logger = log conf.LockerHandler = func() { _ = dest.SendEvent(&packets.Event{Type: packets.EventPreLock}) sinfo.lockable.Lock() diff --git a/cmd/sync.go b/cmd/sync.go index f617f2bc..e324f592 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -260,10 +260,7 @@ func syncMigrateS3(_ uint32, name string, fmt.Printf("[%s] Error for block %d error %v\n", name, b.Block, err) } - // Show logging for S3 writes - logDest := modules.NewLogger(destMetrics, "S3") - - mig, err := migrator.NewMigrator(sinfo.tracker, logDest, sinfo.orderer, conf) + mig, err := migrator.NewMigrator(sinfo.tracker, destMetrics, sinfo.orderer, conf) if err != nil { return err diff --git a/go.mod b/go.mod index 0b265ee8..cc13ed05 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 github.com/vbauerster/mpb/v8 v8.8.3 + golang.org/x/sys v0.25.0 ) require ( @@ -64,6 +65,7 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/rs/xid v1.6.0 // indirect + github.com/rs/zerolog v1.33.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect @@ -74,7 +76,6 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index c455c049..9fbbbed6 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,7 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8= github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= @@ -51,6 +52,7 @@ github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -86,6 +88,7 @@ github.com/loopholelabs/logging v0.3.1/go.mod h1:uRDUydiqPqKbZkb0WoQ3dfyAcJ2iOMh github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= @@ -127,8 +130,11 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -184,6 +190,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/storage/device/device.go b/pkg/storage/device/device.go index 85076017..e53c7e79 100644 --- a/pkg/storage/device/device.go +++ b/pkg/storage/device/device.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/blocks" "github.com/loopholelabs/silo/pkg/storage/config" @@ -37,9 +38,13 @@ type Device struct { } func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) { + return NewDevicesWithLogging(ds, nil) +} + +func NewDevicesWithLogging(ds []*config.DeviceSchema, log types.RootLogger) (map[string]*Device, error) { devices := make(map[string]*Device) for _, c := range ds { - dev, ex, err := NewDevice(c) + dev, ex, err := NewDeviceWithLogging(c, log) if err != nil { // Close/shutdown any we already setup, but we'll ignore any close errors here. for _, cc := range devices { @@ -59,6 +64,13 @@ func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) { } func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorage, error) { + return NewDeviceWithLogging(ds, nil) +} + +func NewDeviceWithLogging(ds *config.DeviceSchema, log types.RootLogger) (storage.Provider, storage.ExposedStorage, error) { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("creating new device") + } var prov storage.Provider var err error @@ -175,6 +187,9 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag // Optionally use a copy on write RO source... if ds.ROSource != nil { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("setting up CopyOnWrite") + } // Create the ROSource... rodev, _, err := NewDevice(ds.ROSource) @@ -201,6 +216,10 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag // Make sure the cow data gets dumped on close... cow.CloseFn = func() { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("Writing CopyOnWrite state") + } + blocks := cow.GetBlockExists() // Write it out to file data := make([]byte, 0) @@ -216,6 +235,10 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag // Optionally binlog this dev to a file if ds.Binlog != "" { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("logging to binlog") + } + prov, err = modules.NewBinLog(prov, ds.Binlog) if err != nil { return nil, nil, err @@ -227,7 +250,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag var ex storage.ExposedStorage if ds.Expose { - ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBDDefaultBlockSize, true) + ex = expose.NewExposedStorageNBDNL(prov, expose.DefaultConfig.WithLogger(log)) err := ex.Init() if err != nil { @@ -238,6 +261,9 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag // Optionally sync the device to S3 if ds.Sync != nil { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("setting up S3 sync") + } s3dest, err := sources.NewS3StorageCreate(ds.Sync.Secure, ds.Sync.Endpoint, @@ -280,6 +306,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag // Start doing the sync... syncer := migrator.NewSyncer(ctx, &migrator.SyncConfig{ + Logger: log, Name: ds.Name, Integrity: false, CancelWrites: true, @@ -306,6 +333,9 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag var wg sync.WaitGroup startSync := func(_ storage.EventType, data storage.EventData) storage.EventReturnData { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("sync.start called") + } if data != nil { startConfig := data.(storage.SyncStartConfig) @@ -356,6 +386,9 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag } stopSync := func(_ storage.EventType, _ storage.EventData) storage.EventReturnData { + if log != nil { + log.Debug().Str("name", ds.Name).Msg("sync.stop called") + } syncLock.Lock() if !syncRunning { syncLock.Unlock() @@ -381,6 +414,10 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag altSources = append(altSources, as) } + if log != nil { + log.Debug().Str("name", ds.Name).Int("sources", len(altSources)).Msg("sync.stop returning altSources") + } + return altSources } diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go index 26f1d01e..7f3379fd 100644 --- a/pkg/storage/expose/nbd.go +++ b/pkg/storage/expose/nbd.go @@ -7,10 +7,13 @@ import ( "net" "os" "strings" + "sync" "syscall" "time" "github.com/Merovius/nbd/nbdnl" + "github.com/google/uuid" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" ) @@ -18,26 +21,97 @@ const NBDDefaultBlockSize = 4096 const NBDAlignSectorSize = 512 +var DefaultConfig = &Config{ + NumConnections: 8, + Timeout: 0, + BlockSize: NBDDefaultBlockSize, + AsyncReads: true, + AsyncWrites: true, +} + +type Config struct { + Logger types.RootLogger + NumConnections int + Timeout time.Duration + BlockSize uint64 + AsyncReads bool + AsyncWrites bool +} + +func (c *Config) WithLogger(l types.RootLogger) *Config { + return &Config{Logger: l, + NumConnections: c.NumConnections, + Timeout: c.Timeout, + BlockSize: c.BlockSize, + AsyncReads: c.AsyncReads, + AsyncWrites: c.AsyncWrites} +} + +func (c *Config) WithNumConnections(cons int) *Config { + return &Config{Logger: c.Logger, + NumConnections: cons, + Timeout: c.Timeout, + BlockSize: c.BlockSize, + AsyncReads: c.AsyncReads, + AsyncWrites: c.AsyncWrites} +} + +func (c *Config) WithTimeout(t time.Duration) *Config { + return &Config{Logger: c.Logger, + NumConnections: c.NumConnections, + Timeout: t, + BlockSize: c.BlockSize, + AsyncReads: c.AsyncReads, + AsyncWrites: c.AsyncWrites} +} + +func (c *Config) WithBlockSize(bs uint64) *Config { + return &Config{Logger: c.Logger, + NumConnections: c.NumConnections, + Timeout: c.Timeout, + BlockSize: bs, + AsyncReads: c.AsyncReads, + AsyncWrites: c.AsyncWrites} +} + +func (c *Config) WithAsyncReads(t bool) *Config { + return &Config{Logger: c.Logger, + NumConnections: c.NumConnections, + Timeout: c.Timeout, + BlockSize: c.BlockSize, + AsyncReads: t, + AsyncWrites: c.AsyncWrites} +} + +func (c *Config) WithAsyncWrites(t bool) *Config { + return &Config{Logger: c.Logger, + NumConnections: c.NumConnections, + Timeout: c.Timeout, + BlockSize: c.BlockSize, + AsyncReads: c.AsyncReads, + AsyncWrites: t} +} + /** * Exposes a storage provider as an nbd device using netlink * */ type ExposedStorageNBDNL struct { - ctx context.Context - cancelfn context.CancelFunc - numConnections int - timeout time.Duration - size uint64 - blockSize uint64 + config *Config + uuid uuid.UUID + ctx context.Context + cancelfn context.CancelFunc + size uint64 socks []io.Closer prov storage.Provider + provLock sync.RWMutex deviceIndex int - async bool dispatchers []*Dispatch } -func NewExposedStorageNBDNL(prov storage.Provider, numConnections int, timeout time.Duration, size uint64, blockSize uint64, async bool) *ExposedStorageNBDNL { +func NewExposedStorageNBDNL(prov storage.Provider, conf *Config) *ExposedStorageNBDNL { + size := prov.Size() // The size must be a multiple of sector size size = NBDAlignSectorSize * ((size + NBDAlignSectorSize - 1) / NBDAlignSectorSize) @@ -45,44 +119,56 @@ func NewExposedStorageNBDNL(prov storage.Provider, numConnections int, timeout t ctx, cancelfn := context.WithCancel(context.TODO()) return &ExposedStorageNBDNL{ - ctx: ctx, - cancelfn: cancelfn, - prov: prov, - numConnections: numConnections, - timeout: timeout, - size: size, - blockSize: blockSize, - socks: make([]io.Closer, 0), - async: async, + config: conf, + uuid: uuid.New(), + ctx: ctx, + cancelfn: cancelfn, + prov: prov, + size: size, + socks: make([]io.Closer, 0), } } func (n *ExposedStorageNBDNL) SetProvider(prov storage.Provider) { + n.provLock.Lock() n.prov = prov + n.provLock.Unlock() } // Impl StorageProvider here so we can route calls to provider func (n *ExposedStorageNBDNL) ReadAt(buffer []byte, offset int64) (int, error) { + n.provLock.RLock() + defer n.provLock.RUnlock() return n.prov.ReadAt(buffer, offset) } func (n *ExposedStorageNBDNL) WriteAt(buffer []byte, offset int64) (int, error) { + n.provLock.RLock() + defer n.provLock.RUnlock() return n.prov.WriteAt(buffer, offset) } func (n *ExposedStorageNBDNL) Flush() error { + n.provLock.RLock() + defer n.provLock.RUnlock() return n.prov.Flush() } func (n *ExposedStorageNBDNL) Size() uint64 { + n.provLock.RLock() + defer n.provLock.RUnlock() return n.prov.Size() } func (n *ExposedStorageNBDNL) Close() error { + n.provLock.RLock() + defer n.provLock.RUnlock() return n.prov.Close() } func (n *ExposedStorageNBDNL) CancelWrites(offset int64, length int64) { + n.provLock.RLock() + defer n.provLock.RUnlock() n.prov.CancelWrites(offset, length) } @@ -99,7 +185,7 @@ func (n *ExposedStorageNBDNL) Init() error { n.dispatchers = make([]*Dispatch, 0) // Create the socket pairs - for i := 0; i < n.numConnections; i++ { + for i := 0; i < n.config.NumConnections; i++ { sockPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) if err != nil { return err @@ -113,21 +199,27 @@ func (n *ExposedStorageNBDNL) Init() error { } server.Close() - d := NewDispatch(n.ctx, serverc, n) - d.asyncReads = n.async - d.asyncWrites = n.async + d := NewDispatch(n.ctx, n.uuid.String(), n.config.Logger, serverc, n) + d.asyncReads = n.config.AsyncReads + d.asyncWrites = n.config.AsyncWrites // Start reading commands on the socket and dispatching them to our provider go func() { - _ = d.Handle() + err := d.Handle() + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Err(err). + Msg("nbd dispatch completed") + } }() n.socks = append(n.socks, serverc) socks = append(socks, client) n.dispatchers = append(n.dispatchers, d) } var opts []nbdnl.ConnectOption - opts = append(opts, nbdnl.WithBlockSize(n.blockSize)) - opts = append(opts, nbdnl.WithTimeout(n.timeout)) - opts = append(opts, nbdnl.WithDeadconnTimeout(n.timeout)) + opts = append(opts, nbdnl.WithBlockSize(n.config.BlockSize)) + opts = append(opts, nbdnl.WithTimeout(n.config.Timeout)) + opts = append(opts, nbdnl.WithDeadconnTimeout(n.config.Timeout)) serverFlags := nbdnl.FlagHasFlags | nbdnl.FlagCanMulticonn @@ -137,6 +229,13 @@ func (n *ExposedStorageNBDNL) Init() error { break } + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Err(err). + Msg("error from nbdnl.Connect") + } + // Sometimes (rare), there seems to be a BADF error here. Lets just retry for now... // Close things down and try again... for _, s := range socks { @@ -150,6 +249,13 @@ func (n *ExposedStorageNBDNL) Init() error { time.Sleep(50 * time.Millisecond) } + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Int("deviceIndex", n.deviceIndex). + Msg("Waiting for nbd device to init") + } + // Wait until it's connected... for { s, err := nbdnl.Status(uint32(n.deviceIndex)) @@ -159,10 +265,24 @@ func (n *ExposedStorageNBDNL) Init() error { time.Sleep(100 * time.Nanosecond) } + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Int("deviceIndex", n.deviceIndex). + Msg("nbd device connected") + } + return nil } func (n *ExposedStorageNBDNL) Shutdown() error { + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Int("deviceIndex", n.deviceIndex). + Msg("nbd device shutdown initiated") + } + // First cancel the context, which will stop waiting on pending readAt/writeAt... n.cancelfn() @@ -194,5 +314,12 @@ func (n *ExposedStorageNBDNL) Shutdown() error { time.Sleep(100 * time.Nanosecond) } + if n.config.Logger != nil { + n.config.Logger.Trace(). + Str("uuid", n.uuid.String()). + Int("deviceIndex", n.deviceIndex). + Msg("nbd device disconnected") + } + return nil } diff --git a/pkg/storage/expose/nbd_dev_test.go b/pkg/storage/expose/nbd_dev_test.go index d4ea4528..1240e611 100644 --- a/pkg/storage/expose/nbd_dev_test.go +++ b/pkg/storage/expose/nbd_dev_test.go @@ -35,7 +35,7 @@ func BenchmarkDevReadNL(mb *testing.B) { store := sources.NewMemoryStorage(diskSize) driver := modules.NewMetrics(store) - n := NewExposedStorageNBDNL(driver, c, 0, driver.Size(), 4096, true) + n := NewExposedStorageNBDNL(driver, DefaultConfig.WithNumConnections(c)) err := n.Init() if err != nil { @@ -161,7 +161,7 @@ func BenchmarkDevReadNLLatency(mb *testing.B) { } } - n := NewExposedStorageNBDNL(driver, cns, 0, driver.Size(), 4096, asy) + n := NewExposedStorageNBDNL(driver, DefaultConfig.WithNumConnections(cns).WithAsyncReads(asy).WithAsyncWrites(asy)) err := n.Init() if err != nil { @@ -265,7 +265,7 @@ func BenchmarkDevWriteNL(b *testing.B) { // store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) driver := modules.NewMetrics(store) - n := NewExposedStorageNBDNL(driver, 1, 0, driver.Size(), 4096, true) + n := NewExposedStorageNBDNL(driver, DefaultConfig.WithNumConnections(1)) err = n.Init() if err != nil { diff --git a/pkg/storage/expose/nbd_dispatch.go b/pkg/storage/expose/nbd_dispatch.go index 6e506de7..d5b15cff 100644 --- a/pkg/storage/expose/nbd_dispatch.go +++ b/pkg/storage/expose/nbd_dispatch.go @@ -7,6 +7,7 @@ import ( "io" "sync" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" ) @@ -62,6 +63,8 @@ type Response struct { } type Dispatch struct { + logger types.RootLogger + dev string ctx context.Context asyncReads bool asyncWrites bool @@ -75,9 +78,11 @@ type Dispatch struct { metricPacketsOut uint64 } -func NewDispatch(ctx context.Context, fp io.ReadWriteCloser, prov storage.Provider) *Dispatch { +func NewDispatch(ctx context.Context, name string, logger types.RootLogger, fp io.ReadWriteCloser, prov storage.Provider) *Dispatch { d := &Dispatch{ + logger: logger, + dev: name, asyncWrites: true, asyncReads: true, responseHeader: make([]byte, 16), @@ -92,8 +97,14 @@ func NewDispatch(ctx context.Context, fp io.ReadWriteCloser, prov storage.Provid } func (d *Dispatch) Wait() { + if d.logger != nil { + d.logger.Trace().Str("device", d.dev).Msg("nbd waiting for pending responses") + } // Wait for any pending responses d.pendingResponses.Wait() + if d.logger != nil { + d.logger.Trace().Str("device", d.dev).Msg("nbd all responses sent") + } } /** @@ -104,18 +115,43 @@ func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []by d.writeLock.Lock() defer d.writeLock.Unlock() - // fmt.Printf("WriteResponse %v %x -> %d\n", d.fp, respHandle, len(chunk)) + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Uint32("respError", respError). + Uint64("respHandle", respHandle). + Int("data", len(chunk)). + Msg("nbd writing response") + } binary.BigEndian.PutUint32(d.responseHeader[4:], respError) binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle) _, err := d.fp.Write(d.responseHeader) if err != nil { + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Uint32("respError", respError). + Uint64("respHandle", respHandle). + Int("data", len(chunk)). + Err(err). + Msg("nbd error writing response header") + } return err } if len(chunk) > 0 { _, err = d.fp.Write(chunk) if err != nil { + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Uint32("respError", respError). + Uint64("respHandle", respHandle). + Int("data", len(chunk)). + Err(err). + Msg("nbd error writing response data") + } return err } } @@ -149,6 +185,11 @@ func (d *Dispatch) Handle() error { // If the context has been cancelled, quit select { case <-d.ctx.Done(): + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Msg("nbd handler context cancelled") + } return d.ctx.Err() default: } @@ -170,6 +211,11 @@ func (d *Dispatch) Handle() error { switch request.Type { case NBDCmdDisconnect: + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Msg("nbd disconnect received") + } return nil // All done case NBDCmdFlush: return fmt.Errorf("not supported: Flush") @@ -222,6 +268,14 @@ func (d *Dispatch) Handle() error { * */ func (d *Dispatch) cmdRead(cmdHandle uint64, cmdFrom uint64, cmdLength uint32) error { + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Uint64("cmdHandle", cmdHandle). + Uint64("cmdFrom", cmdFrom). + Uint32("cmdLength", cmdLength). + Msg("nbd cmdRead") + } performRead := func(handle uint64, from uint64, length uint32) error { errchan := make(chan error) @@ -271,6 +325,15 @@ func (d *Dispatch) cmdRead(cmdHandle uint64, cmdFrom uint64, cmdLength uint32) e * */ func (d *Dispatch) cmdWrite(cmdHandle uint64, cmdFrom uint64, cmdLength uint32, cmdData []byte) error { + if d.logger != nil { + d.logger.Trace(). + Str("device", d.dev). + Uint64("cmdHandle", cmdHandle). + Uint64("cmdFrom", cmdFrom). + Uint32("cmdLength", cmdLength). + Msg("nbd cmdWrite") + } + performWrite := func(handle uint64, from uint64, _ uint32, data []byte) error { errchan := make(chan error) go func() { diff --git a/pkg/storage/expose/nbd_test.go b/pkg/storage/expose/nbd_test.go index b9a93c2b..5012f6fb 100644 --- a/pkg/storage/expose/nbd_test.go +++ b/pkg/storage/expose/nbd_test.go @@ -12,6 +12,7 @@ import ( "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/sources" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNBDNLDevice(t *testing.T) { @@ -33,10 +34,10 @@ func TestNBDNLDevice(t *testing.T) { size := 4096 * 1024 * 1024 prov := sources.NewMemoryStorage(size) - n = NewExposedStorageNBDNL(prov, 8, 0, uint64(size), 4096, true) + n = NewExposedStorageNBDNL(prov, DefaultConfig) err = n.Init() - assert.NoError(t, err) + require.NoError(t, err) var wg sync.WaitGroup @@ -83,7 +84,7 @@ func TestNBDNLDeviceBlocksizes(t *testing.T) { assert.NoError(t, err) }() - n = NewExposedStorageNBDNL(prov, 8, 0, uint64(size), uint64(bs), true) + n = NewExposedStorageNBDNL(prov, DefaultConfig.WithBlockSize(uint64(bs))) err = n.Init() assert.NoError(tt, err) @@ -117,7 +118,7 @@ func TestNBDNLDeviceSmall(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 900, n) - ndev = NewExposedStorageNBDNL(prov, 1, 0, uint64(size), 4096, true) + ndev = NewExposedStorageNBDNL(prov, DefaultConfig.WithNumConnections(1)) err = ndev.Init() assert.NoError(t, err) @@ -162,7 +163,7 @@ func TestNBDNLDeviceUnalignedPartialRead(t *testing.T) { assert.NoError(t, err) assert.Equal(t, size, n) - ndev = NewExposedStorageNBDNL(prov, 1, 0, uint64(size), 4096, true) + ndev = NewExposedStorageNBDNL(prov, DefaultConfig.WithNumConnections(1)) err = ndev.Init() assert.NoError(t, err) @@ -199,7 +200,7 @@ func TestNBDNLDeviceUnalignedPartialWrite(t *testing.T) { size := 900 prov := sources.NewMemoryStorage(size) - ndev = NewExposedStorageNBDNL(prov, 1, 0, uint64(size), 4096, true) + ndev = NewExposedStorageNBDNL(prov, DefaultConfig.WithNumConnections(1)) err = ndev.Init() assert.NoError(t, err) @@ -258,7 +259,7 @@ func TestNBDNLDeviceShutdownRead(t *testing.T) { return false, 0, nil } - n = NewExposedStorageNBDNL(hooks, 8, 0, uint64(size), 4096, true) + n = NewExposedStorageNBDNL(hooks, DefaultConfig) err = n.Init() assert.NoError(t, err) @@ -323,7 +324,7 @@ func TestNBDNLDeviceShutdownWrite(t *testing.T) { return false, 0, nil } - n = NewExposedStorageNBDNL(hooks, 8, 0, uint64(size), 4096, true) + n = NewExposedStorageNBDNL(hooks, DefaultConfig) err = n.Init() assert.NoError(t, err) diff --git a/pkg/storage/expose/process_memory_test.go b/pkg/storage/expose/process_memory_test.go index 3fbe1259..8d7a575e 100644 --- a/pkg/storage/expose/process_memory_test.go +++ b/pkg/storage/expose/process_memory_test.go @@ -18,7 +18,7 @@ import ( func setupDevTest(t *testing.T, size int) (*ExposedStorageNBDNL, storage.Provider, []byte) { prov := sources.NewMemoryStorage(size) - n := NewExposedStorageNBDNL(prov, 8, 0, uint64(size), NBDDefaultBlockSize, true) + n := NewExposedStorageNBDNL(prov, DefaultConfig) err := n.Init() assert.NoError(t, err) diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index 3ea5adcc..e2896c84 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -8,13 +8,18 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" + "github.com/loopholelabs/logging/types" + "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/integrity" "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/protocol/packets" "github.com/loopholelabs/silo/pkg/storage/util" ) type Config struct { + Logger types.RootLogger BlockSize int LockerHandler func() UnlockerHandler func() @@ -30,6 +35,7 @@ type Config struct { func NewConfig() *Config { return &Config{ + Logger: nil, BlockSize: 0, LockerHandler: func() {}, UnlockerHandler: func() {}, @@ -67,6 +73,8 @@ type MigrationProgress struct { } type Migrator struct { + uuid uuid.UUID + logger types.RootLogger sourceTracker storage.TrackingProvider // Tracks writes so we know which are dirty sourceMapped *modules.MappedStorage destWriteWithMap func([]byte, int64, map[uint64]uint64) (int, error) @@ -108,6 +116,8 @@ func NewMigrator(source storage.TrackingProvider, numBlocks := (int(source.Size()) + config.BlockSize - 1) / config.BlockSize m := &Migrator{ + uuid: uuid.New(), + logger: config.Logger, migrationStarted: false, dest: dest, sourceTracker: source, @@ -147,6 +157,14 @@ func NewMigrator(source storage.TrackingProvider, if config.Integrity { m.integrity = integrity.NewChecker(int64(m.dest.Size()), m.blockSize) } + + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", source.Size()). + Msg("Created migrator") + } + return m, nil } @@ -180,10 +198,24 @@ func (m *Migrator) startMigration() { } m.migrationStarted = true + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Msg("Migration started") + } + // Tell the source to stop sync, and send alternateSources to the destination. as := storage.SendSiloEvent(m.sourceTracker, "sync.stop", nil) if len(as) == 1 { storage.SendSiloEvent(m.dest, "sources", as[0]) + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Int("sources", len(as[0].([]packets.AlternateSource))). + Msg("Migrator alternate sources sent to destination") + } } } @@ -191,6 +223,14 @@ func (m *Migrator) startMigration() { * Migrate storage to dest. */ func (m *Migrator) Migrate(numBlocks int) error { + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Int("blocks", numBlocks). + Msg("Migrate") + } + m.startMigration() m.ctime = time.Now() @@ -271,6 +311,14 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { * You can give a tracking ID which will turn up at block_fn on success */ func (m *Migrator) MigrateDirtyWithID(blocks []uint, tid uint64) error { + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Int("blocks", len(blocks)). + Msg("Migrate dirty") + } + m.startMigration() for _, pos := range blocks { @@ -324,8 +372,20 @@ func (m *Migrator) MigrateDirtyWithID(blocks []uint, tid uint64) error { } func (m *Migrator) WaitForCompletion() error { + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Msg("Migration waiting for completion") + } m.wg.Wait() m.reportProgress(true) + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Msg("Migration complete") + } return nil } @@ -367,6 +427,23 @@ func (m *Migrator) reportProgress(forced bool) { TotalMigratedBlocks: int(atomic.LoadInt64(&m.metricBlocksMigrated)), TotalDuplicatedBlocks: int(atomic.LoadInt64(&m.metricBlocksDuplicates)), } + + if m.logger != nil { + m.logger.Debug(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Int("TotalBlocks", m.progressLastStatus.TotalBlocks). + Int("MigratedBlocks", m.progressLastStatus.MigratedBlocks). + Float64("MigratedBlocksPerc", m.progressLastStatus.MigratedBlocksPerc). + Int("ReadyBlocks", m.progressLastStatus.ReadyBlocks). + Float64("ReadyBlocksPerc", m.progressLastStatus.ReadyBlocksPerc). + Int("ActiveBlocks", m.progressLastStatus.ActiveBlocks). + Int("TotalCanceledBlocks", m.progressLastStatus.TotalCanceledBlocks). + Int("TotalMigratedBlocks", m.progressLastStatus.TotalMigratedBlocks). + Int("TotalDuplicatedBlocks", m.progressLastStatus.TotalDuplicatedBlocks). + Msg("Migration progress") + } + // Callback m.progressFn(m.progressLastStatus) } @@ -415,6 +492,13 @@ func (m *Migrator) migrateBlock(block int) ([]byte, error) { // Read from source n, err := m.sourceTracker.ReadAt(buff, int64(offset)) if err != nil { + if m.logger != nil { + m.logger.Error(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Err(err). + Msg("Migration error reading from source") + } return nil, err } @@ -436,6 +520,12 @@ func (m *Migrator) migrateBlock(block int) ([]byte, error) { if n != len(buff) || err != nil { if errors.Is(err, context.Canceled) { atomic.AddInt64(&m.metricBlocksCanceled, 1) + } else if m.logger != nil { + m.logger.Error(). + Str("uuid", m.uuid.String()). + Uint64("size", m.sourceTracker.Size()). + Err(err). + Msg("Migration error writing to destination") } return nil, err } diff --git a/pkg/storage/migrator/sync.go b/pkg/storage/migrator/sync.go index f2bcddba..10a9fba6 100644 --- a/pkg/storage/migrator/sync.go +++ b/pkg/storage/migrator/sync.go @@ -94,6 +94,7 @@ func (s *Syncer) GetSafeBlockMap() map[uint][sha256.Size]byte { */ func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, error) { conf := NewConfig().WithBlockSize(s.config.BlockSize) + conf.Logger = s.config.Logger conf.LockerHandler = func() { if s.config.LockerHandler != nil { s.config.LockerHandler() diff --git a/pkg/storage/modules/copy_on_write_test.go b/pkg/storage/modules/copy_on_write_test.go index 0a4ae29f..e6cf9e19 100644 --- a/pkg/storage/modules/copy_on_write_test.go +++ b/pkg/storage/modules/copy_on_write_test.go @@ -167,9 +167,7 @@ func TestCopyOnWriteCRCIssue(t *testing.T) { rosource := sources.NewMemoryStorage(size) - fstoreLog := NewLogger(fstore, "fstore") - - cow := NewCopyOnWrite(rosource, fstoreLog, blockSize) + cow := NewCopyOnWrite(rosource, fstore, blockSize) reference := sources.NewMemoryStorage(size) diff --git a/pkg/storage/modules/logger.go b/pkg/storage/modules/logger.go index de5b8dcd..db99735b 100644 --- a/pkg/storage/modules/logger.go +++ b/pkg/storage/modules/logger.go @@ -1,10 +1,9 @@ package modules import ( - "fmt" "sync/atomic" - "time" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" ) @@ -12,6 +11,7 @@ type Logger struct { storage.ProviderWithEvents prov storage.Provider prefix string + log types.RootLogger enabled atomic.Bool } @@ -21,9 +21,10 @@ func (i *Logger) SendSiloEvent(eventType storage.EventType, eventData storage.Ev return append(data, storage.SendSiloEvent(i.prov, eventType, eventData)...) } -func NewLogger(prov storage.Provider, prefix string) *Logger { +func NewLogger(prov storage.Provider, prefix string, log types.RootLogger) *Logger { l := &Logger{ prov: prov, + log: log, prefix: prefix, } l.enabled.Store(true) @@ -31,31 +32,56 @@ func NewLogger(prov storage.Provider, prefix string) *Logger { } func (i *Logger) Disable() { + if i.enabled.Load() && i.log != nil { + i.log.Debug().Str("device", i.prefix).Msg("logging disabled") + } i.enabled.Store(false) } func (i *Logger) Enable() { i.enabled.Store(true) + if i.enabled.Load() && i.log != nil { + i.log.Debug().Str("device", i.prefix).Msg("logging enabled") + } } func (i *Logger) ReadAt(buffer []byte, offset int64) (int, error) { n, err := i.prov.ReadAt(buffer, offset) - if i.enabled.Load() { - fmt.Printf("%v: %s ReadAt(%d, offset=%d) -> %d, %v\n", time.Now(), i.prefix, len(buffer), offset, n, err) + if i.enabled.Load() && i.log != nil { + i.log.Debug(). + Str("device", i.prefix). + Int("length", len(buffer)). + Int64("offset", offset). + Int("n", n). + Err(err). + Msg("ReadAt") } return n, err } func (i *Logger) WriteAt(buffer []byte, offset int64) (int, error) { n, err := i.prov.WriteAt(buffer, offset) - if i.enabled.Load() { - fmt.Printf("%v: %s WriteAt(%d, offset=%d) -> %d, %v\n", time.Now(), i.prefix, len(buffer), offset, n, err) + if i.enabled.Load() && i.log != nil { + i.log.Debug(). + Str("device", i.prefix). + Int("length", len(buffer)). + Int64("offset", offset). + Int("n", n). + Err(err). + Msg("WriteAt") } return n, err } func (i *Logger) Flush() error { - return i.prov.Flush() + err := i.prov.Flush() + if i.enabled.Load() && i.log != nil { + i.log.Debug(). + Str("device", i.prefix). + Err(err). + Msg("Flush") + } + return err } func (i *Logger) Size() uint64 { @@ -63,10 +89,23 @@ func (i *Logger) Size() uint64 { } func (i *Logger) Close() error { - return i.prov.Close() + err := i.prov.Close() + if i.enabled.Load() && i.log != nil { + i.log.Debug(). + Str("device", i.prefix). + Err(err). + Msg("Close") + } + return err } func (i *Logger) CancelWrites(offset int64, length int64) { i.prov.CancelWrites(offset, length) - // TODO: Implement + if i.enabled.Load() && i.log != nil { + i.log.Debug(). + Str("device", i.prefix). + Int64("offset", offset). + Int64("length", length). + Msg("CancelWrites") + } } diff --git a/pkg/storage/storage_events_test.go b/pkg/storage/storage_events_test.go index c7514186..4bb16958 100644 --- a/pkg/storage/storage_events_test.go +++ b/pkg/storage/storage_events_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/loopholelabs/logging" + "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/dirtytracker" "github.com/loopholelabs/silo/pkg/storage/modules" @@ -226,7 +228,7 @@ func TestStorageEventsForModules(tt *testing.T) { addModule(mod7) mod8 := modules.NewLockable(mod7) addModule(mod8) - mod9 := modules.NewLogger(mod8, "prefix") + mod9 := modules.NewLogger(mod8, "prefix", logging.New(logging.Zerolog, "silo", os.Stdout)) addModule(mod9) mod10 := modules.NewMetrics(mod9) addModule(mod10) diff --git a/pkg/storage/waitingcache/waiting_cache.go b/pkg/storage/waitingcache/waiting_cache.go index e2b4da47..38a7025f 100644 --- a/pkg/storage/waitingcache/waiting_cache.go +++ b/pkg/storage/waitingcache/waiting_cache.go @@ -3,6 +3,8 @@ package waitingcache import ( "sync" + "github.com/google/uuid" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/util" ) @@ -14,6 +16,8 @@ import ( * */ type WaitingCache struct { + logger types.RootLogger + uuid uuid.UUID prov storage.Provider local *Local remote *Remote @@ -26,8 +30,14 @@ type WaitingCache struct { } func NewWaitingCache(prov storage.Provider, blockSize int) (*Local, *Remote) { + return NewWaitingCacheWithLogger(prov, blockSize, nil) +} + +func NewWaitingCacheWithLogger(prov storage.Provider, blockSize int, log types.RootLogger) (*Local, *Remote) { numBlocks := (int(prov.Size()) + blockSize - 1) / blockSize wc := &WaitingCache{ + logger: log, + uuid: uuid.New(), prov: prov, blockSize: blockSize, size: prov.Size(), @@ -55,6 +65,17 @@ func (i *WaitingCache) waitForBlocks(bStart uint, bEnd uint, lockCB func(b uint) } func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) { + if i.logger != nil { + i.logger.Trace(). + Str("uuid", i.uuid.String()). + Uint("block", b). + Msg("waitForBlock") + defer i.logger.Trace(). + Str("uuid", i.uuid.String()). + Uint("block", b). + Msg("waitForBlock complete") + } + // If we have it locally, return. if i.local.available.BitSet(int(b)) { return @@ -80,8 +101,14 @@ func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) { rwl.RLock() } -// TODO: Fix the logic here a bit func (i *WaitingCache) markAvailableBlockLocal(b uint) { + if i.logger != nil { + i.logger.Trace(). + Str("uuid", i.uuid.String()). + Uint("block", b). + Msg("markAvailableLocalBlock") + } + i.lockersLock.Lock() avail := i.local.available.BitSet(int(b)) rwl, ok := i.lockers[b] @@ -108,6 +135,13 @@ func (i *WaitingCache) markAvailableRemoteBlocks(bStart uint, bEnd uint) { } func (i *WaitingCache) markAvailableRemoteBlock(b uint) { + if i.logger != nil { + i.logger.Trace(). + Str("uuid", i.uuid.String()). + Uint("block", b). + Msg("markAvailableRemoteBlock") + } + i.lockersLock.Lock() avail := i.remote.available.BitSet(int(b)) rwl, ok := i.lockers[b] @@ -126,6 +160,13 @@ func (i *WaitingCache) markAvailableRemoteBlock(b uint) { } func (i *WaitingCache) markUnavailableRemoteBlock(b uint) { + if i.logger != nil { + i.logger.Trace(). + Str("uuid", i.uuid.String()). + Uint("block", b). + Msg("markUnavailableRemoteBlock") + } + i.lockersLock.Lock() avail := i.remote.available.BitSet(int(b)) if avail { diff --git a/pkg/storage/waitingcache/waiting_cache_local.go b/pkg/storage/waitingcache/waiting_cache_local.go index a752161b..bae8f9ec 100644 --- a/pkg/storage/waitingcache/waiting_cache_local.go +++ b/pkg/storage/waitingcache/waiting_cache_local.go @@ -20,6 +20,19 @@ func (wcl *Local) SendSiloEvent(eventType storage.EventType, eventData storage.E } func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) { + if wcl.wc.logger != nil { + wcl.wc.logger.Trace(). + Str("uuid", wcl.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("local ReadAt") + defer wcl.wc.logger.Trace(). + Str("uuid", wcl.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("local ReadAt complete") + } + end := uint64(offset + int64(len(buffer))) if end > wcl.wc.size { end = wcl.wc.size @@ -38,6 +51,19 @@ func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) { } func (wcl *Local) WriteAt(buffer []byte, offset int64) (int, error) { + if wcl.wc.logger != nil { + wcl.wc.logger.Trace(). + Str("uuid", wcl.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("local WriteAt") + defer wcl.wc.logger.Trace(). + Str("uuid", wcl.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("local WriteAt complete") + } + end := uint64(offset + int64(len(buffer))) if end > wcl.wc.size { end = wcl.wc.size diff --git a/pkg/storage/waitingcache/waiting_cache_remote.go b/pkg/storage/waitingcache/waiting_cache_remote.go index 2bc9adf6..f408bc57 100644 --- a/pkg/storage/waitingcache/waiting_cache_remote.go +++ b/pkg/storage/waitingcache/waiting_cache_remote.go @@ -14,34 +14,47 @@ type Remote struct { } // Relay events to embedded StorageProvider -func (wcl *Remote) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { - data := wcl.ProviderWithEvents.SendSiloEvent(eventType, eventData) - return append(data, storage.SendSiloEvent(wcl.wc.prov, eventType, eventData)...) +func (wcr *Remote) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { + data := wcr.ProviderWithEvents.SendSiloEvent(eventType, eventData) + return append(data, storage.SendSiloEvent(wcr.wc.prov, eventType, eventData)...) } -func (wcl *Remote) ReadAt(_ []byte, _ int64) (int, error) { +func (wcr *Remote) ReadAt(_ []byte, _ int64) (int, error) { // Remote reads are unsupported at the moment. return 0, io.EOF } -func (wcl *Remote) WriteAt(buffer []byte, offset int64) (int, error) { +func (wcr *Remote) WriteAt(buffer []byte, offset int64) (int, error) { + if wcr.wc.logger != nil { + wcr.wc.logger.Trace(). + Str("uuid", wcr.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("remote WriteAt") + defer wcr.wc.logger.Trace(). + Str("uuid", wcr.wc.uuid.String()). + Int64("offset", offset). + Int("length", len(buffer)). + Msg("remote WriteAt complete") + } + end := uint64(offset + int64(len(buffer))) - if end > wcl.wc.size { - end = wcl.wc.size + if end > wcr.wc.size { + end = wcr.wc.size } - bStart := uint(offset / int64(wcl.wc.blockSize)) - bEnd := uint((end-1)/uint64(wcl.wc.blockSize)) + 1 + bStart := uint(offset / int64(wcr.wc.blockSize)) + bEnd := uint((end-1)/uint64(wcr.wc.blockSize)) + 1 align := 0 // If the first block is incomplete, we won't mark it. - if offset > (int64(bStart) * int64(wcl.wc.blockSize)) { + if offset > (int64(bStart) * int64(wcr.wc.blockSize)) { bStart++ - align = int(offset - (int64(bStart) * int64(wcl.wc.blockSize))) + align = int(offset - (int64(bStart) * int64(wcr.wc.blockSize))) } // If the last block is incomplete, we won't mark it. *UNLESS* It's the last block in the storage - if (end % uint64(wcl.wc.blockSize)) > 0 { - if uint64(offset)+uint64(len(buffer)) < wcl.wc.size { + if (end % uint64(wcr.wc.blockSize)) > 0 { + if uint64(offset)+uint64(len(buffer)) < wcr.wc.size { bEnd-- } } @@ -49,19 +62,19 @@ func (wcl *Remote) WriteAt(buffer []byte, offset int64) (int, error) { var err error var n int - if wcl.wc.allowLocalWrites { + if wcr.wc.allowLocalWrites { // Check if we have local data that needs merging (From local writes) - avail := wcl.wc.local.available.Collect(bStart, bEnd) + avail := wcr.wc.local.available.Collect(bStart, bEnd) if len(avail) != 0 { pbuffer := make([]byte, len(buffer)) - _, err = wcl.wc.prov.ReadAt(pbuffer, offset) + _, err = wcr.wc.prov.ReadAt(pbuffer, offset) if err == nil { for _, b := range avail { - s := align + (int(b-bStart) * wcl.wc.blockSize) + s := align + (int(b-bStart) * wcr.wc.blockSize) // Merge the data in. We know these are complete blocks. // NB This does modify the callers buffer. - copy(buffer[s:s+wcl.wc.blockSize], pbuffer[s:s+wcl.wc.blockSize]) + copy(buffer[s:s+wcr.wc.blockSize], pbuffer[s:s+wcr.wc.blockSize]) } } } @@ -69,31 +82,31 @@ func (wcl *Remote) WriteAt(buffer []byte, offset int64) (int, error) { // Perform the WriteAt if err == nil { - n, err = wcl.wc.prov.WriteAt(buffer, offset) + n, err = wcr.wc.prov.WriteAt(buffer, offset) } // Signal that we have blocks available from remote if err == nil { if bEnd > bStart { - wcl.wc.markAvailableRemoteBlocks(bStart, bEnd) + wcr.wc.markAvailableRemoteBlocks(bStart, bEnd) } } return n, err } -func (wcl *Remote) Flush() error { - return wcl.wc.prov.Flush() +func (wcr *Remote) Flush() error { + return wcr.wc.prov.Flush() } -func (wcl *Remote) Size() uint64 { - return wcl.wc.prov.Size() +func (wcr *Remote) Size() uint64 { + return wcr.wc.prov.Size() } -func (wcl *Remote) Close() error { - return wcl.wc.prov.Close() +func (wcr *Remote) Close() error { + return wcr.wc.prov.Close() } -func (wcl *Remote) CancelWrites(offset int64, length int64) { - wcl.wc.prov.CancelWrites(offset, length) +func (wcr *Remote) CancelWrites(offset int64, length int64) { + wcr.wc.prov.CancelWrites(offset, length) } diff --git a/pkg/testutils/logging_output.go b/pkg/testutils/logging_output.go new file mode 100644 index 00000000..f18100ff --- /dev/null +++ b/pkg/testutils/logging_output.go @@ -0,0 +1,29 @@ +package testutils + +import ( + "bytes" + "sync" +) + +type SafeWriteBuffer struct { + bufferLock sync.Mutex + buffer bytes.Buffer +} + +func (swb *SafeWriteBuffer) Write(p []byte) (n int, err error) { + swb.bufferLock.Lock() + defer swb.bufferLock.Unlock() + return swb.buffer.Write(p) +} + +func (swb *SafeWriteBuffer) Bytes() []byte { + swb.bufferLock.Lock() + defer swb.bufferLock.Unlock() + return swb.buffer.Bytes() +} + +func (swb *SafeWriteBuffer) Len() int { + swb.bufferLock.Lock() + defer swb.bufferLock.Unlock() + return swb.buffer.Len() +}