diff --git a/cmd/connect.go b/cmd/connect.go index 397661af..81eafc73 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "net/http" "os" "os/exec" "os/signal" @@ -13,15 +14,21 @@ import ( "syscall" "time" + "github.com/loopholelabs/logging" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/config" "github.com/loopholelabs/silo/pkg/storage/expose" "github.com/loopholelabs/silo/pkg/storage/integrity" + "github.com/loopholelabs/silo/pkg/storage/metrics" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" "github.com/loopholelabs/silo/pkg/storage/sources" "github.com/loopholelabs/silo/pkg/storage/waitingcache" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "github.com/fatih/color" @@ -49,6 +56,10 @@ var connectMountDev bool var connectProgress bool +var connectDebug bool + +var connectMetrics string + // List of ExposedStorage so they can be cleaned up on exit. var dstExposed []storage.ExposedStorage @@ -63,6 +74,8 @@ func init() { cmdConnect.Flags().BoolVarP(&connectExposeDev, "expose", "e", false, "Expose as an nbd devices") cmdConnect.Flags().BoolVarP(&connectMountDev, "mount", "m", false, "Mount the nbd devices") cmdConnect.Flags().BoolVarP(&connectProgress, "progress", "p", false, "Show progress") + cmdConnect.Flags().BoolVarP(&connectDebug, "debug", "d", false, "Debug logging (trace)") + cmdConnect.Flags().StringVarP(&connectMetrics, "metrics", "M", "", "Prom metrics address") } /** @@ -70,6 +83,39 @@ func init() { * */ func runConnect(_ *cobra.Command, _ []string) { + var log types.RootLogger + var reg *prometheus.Registry + var siloMetrics *metrics.Metrics + + if connectDebug { + log = logging.New(logging.Zerolog, "silo.connect", os.Stderr) + log.SetLevel(types.TraceLevel) + } + + if connectMetrics != "" { + reg = prometheus.NewRegistry() + + siloMetrics = metrics.New(reg) + + // Add the default go metrics + reg.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + + http.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + // Opt into OpenMetrics to support exemplars. + EnableOpenMetrics: true, + // Pass custom registry + Registry: reg, + }, + )) + + go http.ListenAndServe(connectMetrics, nil) + } + if connectProgress { dstProgress = mpb.New( mpb.WithOutput(color.Output), @@ -107,6 +153,10 @@ func runConnect(_ *cobra.Command, _ []string) { protoCtx, protoCancelfn := context.WithCancel(context.TODO()) + handleIncomingDevice := func(ctx context.Context, pro protocol.Protocol, dev uint32) { + handleIncomingDeviceWithLogging(ctx, pro, dev, log, siloMetrics) + } + pro := protocol.NewRW(protoCtx, []io.Reader{con}, []io.Writer{con}, handleIncomingDevice) // Let the protocol do its thing. @@ -121,12 +171,26 @@ func runConnect(_ *cobra.Command, _ []string) { protoCancelfn() }() + if siloMetrics != nil { + siloMetrics.AddProtocol("protocol", pro) + } + dstWG.Wait() // Wait until the migrations have completed... if connectProgress { dstProgress.Wait() } + if log != nil { + metrics := pro.GetMetrics() + log.Debug(). + Uint64("PacketsSent", metrics.PacketsSent). + Uint64("DataSent", metrics.DataSent). + Uint64("PacketsRecv", metrics.PacketsRecv). + Uint64("DataRecv", metrics.DataRecv). + Msg("protocol metrics") + } + fmt.Printf("\nMigrations completed. Please ctrl-c if you want to shut down, or wait an hour :)\n") // We should pause here, to allow the user to do things with the devices @@ -139,7 +203,7 @@ func runConnect(_ *cobra.Command, _ []string) { } // Handle a new incoming device. This is called when a packet is received for a device we haven't heard about before. -func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32) { +func handleIncomingDeviceWithLogging(ctx context.Context, pro protocol.Protocol, dev uint32, log types.RootLogger, met *metrics.Metrics) { var destStorage storage.Provider var destWaitingLocal *waitingcache.Local var destWaitingRemote *waitingcache.Remote @@ -151,6 +215,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32 var bar *mpb.Bar var blockSize uint + var deviceName string var statusString = " " var statusVerify = " " @@ -174,6 +239,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32 } blockSize = uint(di.BlockSize) + deviceName = di.Name statusFn := func(_ decor.Statistics) string { return statusString + statusVerify @@ -267,6 +333,10 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32 dest = protocol.NewFromProtocol(ctx, dev, storageFactory, pro) + if met != nil { + met.AddFromProtocol(deviceName, dest) + } + var handlerWG sync.WaitGroup handlerWG.Add(1) @@ -304,6 +374,42 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32 // Check we have all data... case packets.EventCompleted: + if log != nil { + m := destWaitingLocal.GetMetrics() + log.Debug(). + Uint64("WaitForBlock", m.WaitForBlock). + Uint64("WaitForBlockHadRemote", m.WaitForBlockHadRemote). + Uint64("WaitForBlockHadLocal", m.WaitForBlockHadLocal). + Uint64("WaitForBlockTimeMS", uint64(m.WaitForBlockTime.Milliseconds())). + Uint64("WaitForBlockLock", m.WaitForBlockLock). + Uint64("WaitForBlockLockDone", m.WaitForBlockLockDone). + Uint64("MarkAvailableLocalBlock", m.MarkAvailableLocalBlock). + Uint64("MarkAvailableRemoteBlock", m.MarkAvailableRemoteBlock). + Uint64("AvailableLocal", m.AvailableLocal). + Uint64("AvailableRemote", m.AvailableRemote). + Str("name", deviceName). + Msg("waitingCacheMetrics") + + fromMetrics := dest.GetMetrics() + log.Debug(). + Uint64("RecvEvents", fromMetrics.RecvEvents). + Uint64("RecvHashes", fromMetrics.RecvHashes). + Uint64("RecvDevInfo", fromMetrics.RecvDevInfo). + Uint64("RecvAltSources", fromMetrics.RecvAltSources). + Uint64("RecvReadAt", fromMetrics.RecvReadAt). + Uint64("RecvWriteAtHash", fromMetrics.RecvWriteAtHash). + Uint64("RecvWriteAtComp", fromMetrics.RecvWriteAtComp). + Uint64("RecvWriteAt", fromMetrics.RecvWriteAt). + Uint64("RecvWriteAtWithMap", fromMetrics.RecvWriteAtWithMap). + Uint64("RecvRemoveFromMap", fromMetrics.RecvRemoveFromMap). + Uint64("RecvRemoveDev", fromMetrics.RecvRemoveDev). + Uint64("RecvDirtyList", fromMetrics.RecvDirtyList). + Uint64("SentNeedAt", fromMetrics.SentNeedAt). + Uint64("SentDontNeedAt", fromMetrics.SentDontNeedAt). + Str("name", deviceName). + Msg("fromProtocolMetrics") + } + // We completed the migration, but we should wait for handlers to finish before we ok things... // fmt.Printf("Completed, now wait for handlers...\n") go func() { diff --git a/cmd/serve.go b/cmd/serve.go index 9ccdf3e5..19d6a746 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "net/http" "os" "os/signal" "sync" @@ -19,11 +20,16 @@ import ( "github.com/loopholelabs/silo/pkg/storage/config" "github.com/loopholelabs/silo/pkg/storage/device" "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + "github.com/loopholelabs/silo/pkg/storage/expose" + "github.com/loopholelabs/silo/pkg/storage/metrics" "github.com/loopholelabs/silo/pkg/storage/migrator" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" @@ -43,6 +49,9 @@ var serveConf string var serveProgress bool var serveContinuous bool var serveAnyOrder bool +var serveCompress bool + +var serveMetrics string var srcExposed []storage.ExposedStorage var srcStorage []*storageInfo @@ -60,6 +69,8 @@ func init() { cmdServe.Flags().BoolVarP(&serveContinuous, "continuous", "C", false, "Continuous sync") cmdServe.Flags().BoolVarP(&serveAnyOrder, "order", "o", false, "Any order (faster)") cmdServe.Flags().BoolVarP(&serveDebug, "debug", "d", false, "Debug logging (trace)") + cmdServe.Flags().StringVarP(&serveMetrics, "metrics", "m", "", "Prom metrics address") + cmdServe.Flags().BoolVarP(&serveCompress, "compress", "x", false, "Compress") } type storageInfo struct { @@ -75,11 +86,38 @@ type storageInfo struct { func runServe(_ *cobra.Command, _ []string) { var log types.RootLogger + var reg *prometheus.Registry + var siloMetrics *metrics.Metrics + if serveDebug { log = logging.New(logging.Zerolog, "silo.serve", os.Stderr) log.SetLevel(types.TraceLevel) } + if serveMetrics != "" { + reg = prometheus.NewRegistry() + + siloMetrics = metrics.New(reg) + + // Add the default go metrics + reg.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + + http.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + // Opt into OpenMetrics to support exemplars. + EnableOpenMetrics: true, + // Pass custom registry + Registry: reg, + }, + )) + + go http.ListenAndServe(serveMetrics, nil) + } + if serveProgress { serveProgressBar = mpb.New( mpb.WithOutput(color.Output), @@ -107,7 +145,7 @@ func runServe(_ *cobra.Command, _ []string) { for i, s := range siloConf.Device { fmt.Printf("Setup storage %d [%s] size %s - %d\n", i, s.Name, s.Size, s.ByteSize()) - sinfo, err := setupStorageDevice(s, log) + sinfo, err := setupStorageDevice(s, log, siloMetrics) if err != nil { panic(fmt.Sprintf("Could not setup storage. %v", err)) } @@ -136,6 +174,10 @@ func runServe(_ *cobra.Command, _ []string) { _ = pro.Handle() }() + if siloMetrics != nil { + siloMetrics.AddProtocol("serve", pro) + } + // Lets go through each of the things we want to migrate... ctime := time.Now() @@ -144,7 +186,7 @@ func runServe(_ *cobra.Command, _ []string) { for i, s := range srcStorage { wg.Add(1) go func(index int, src *storageInfo) { - err := migrateDevice(log, uint32(index), src.name, pro, src) + err := migrateDevice(log, siloMetrics, uint32(index), src.name, pro, src) if err != nil { fmt.Printf("There was an issue migrating the storage %d %v\n", index, err) } @@ -158,12 +200,22 @@ func runServe(_ *cobra.Command, _ []string) { } fmt.Printf("\n\nMigration completed in %dms\n", time.Since(ctime).Milliseconds()) + if log != nil { + metrics := pro.GetMetrics() + log.Debug(). + Uint64("PacketsSent", metrics.PacketsSent). + Uint64("DataSent", metrics.DataSent). + Uint64("PacketsRecv", metrics.PacketsRecv). + Uint64("DataRecv", metrics.DataRecv). + Msg("protocol metrics") + } + con.Close() } shutdownEverything(log) } -func shutdownEverything(_ types.RootLogger) { +func shutdownEverything(log types.RootLogger) { // first unlock everything fmt.Printf("Unlocking devices...\n") for _, i := range srcStorage { @@ -177,11 +229,30 @@ func shutdownEverything(_ types.RootLogger) { fmt.Printf("Shutdown nbd device %s\n", device) _ = p.Shutdown() + + // Show some metrics... + if log != nil { + nbdDevice, ok := p.(*expose.ExposedStorageNBDNL) + if ok { + m := nbdDevice.GetMetrics() + log.Debug(). + Uint64("PacketsIn", m.PacketsIn). + Uint64("PacketsOut", m.PacketsOut). + Uint64("ReadAt", m.ReadAt). + Uint64("ReadAtBytes", m.ReadAtBytes). + Uint64("ReadAtTimeMS", uint64(m.ReadAtTime.Milliseconds())). + Uint64("WriteAt", m.WriteAt). + Uint64("WriteAtBytes", m.WriteAtBytes). + Uint64("WriteAtTimeMS", uint64(m.WriteAtTime.Milliseconds())). + Str("device", p.Device()). + Msg("NBD metrics") + } + } } } -func setupStorageDevice(conf *config.DeviceSchema, log types.RootLogger) (*storageInfo, error) { - source, ex, err := device.NewDeviceWithLogging(conf, log) +func setupStorageDevice(conf *config.DeviceSchema, log types.RootLogger, met *metrics.Metrics) (*storageInfo, error) { + source, ex, err := device.NewDeviceWithLoggingMetrics(conf, log, met) if err != nil { return nil, err } @@ -203,6 +274,12 @@ func setupStorageDevice(conf *config.DeviceSchema, log types.RootLogger) (*stora sourceMonitor := volatilitymonitor.NewVolatilityMonitor(sourceDirtyLocal, blockSize, 10*time.Second) sourceStorage := modules.NewLockable(sourceMonitor) + if met != nil { + met.AddDirtyTracker(conf.Name, sourceDirtyRemote) + met.AddVolatilityMonitor(conf.Name, sourceMonitor) + met.AddMetrics(conf.Name, sourceMetrics) + } + if ex != nil { ex.SetProvider(sourceStorage) } @@ -234,12 +311,15 @@ func setupStorageDevice(conf *config.DeviceSchema, log types.RootLogger) (*stora } // Migrate a device -func migrateDevice(log types.RootLogger, devID uint32, name string, +func migrateDevice(log types.RootLogger, met *metrics.Metrics, devID uint32, name string, pro protocol.Protocol, sinfo *storageInfo) error { size := sinfo.lockable.Size() dest := protocol.NewToProtocol(size, devID, pro) + // Maybe compress writes + dest.CompressedWrites = serveCompress + err := dest.SendDevInfo(name, uint32(sinfo.blockSize), sinfo.schema) if err != nil { return err @@ -316,7 +396,7 @@ func migrateDevice(log types.RootLogger, devID uint32, name string, _ = dest.SendEvent(&packets.Event{Type: packets.EventPostUnlock}) } conf.Concurrency = map[int]int{ - storage.BlockTypeAny: 1000000, + storage.BlockTypeAny: 1000, } conf.ErrorHandler = func(_ *storage.BlockInfo, err error) { // For now... @@ -357,6 +437,11 @@ func migrateDevice(log types.RootLogger, devID uint32, name string, return err } + if met != nil { + met.AddToProtocol(name, dest) + met.AddMigrator(name, mig) + } + migrateBlocks := sinfo.numBlocks // Now do the migration... @@ -420,5 +505,26 @@ func migrateDevice(log types.RootLogger, devID uint32, name string, // bar.EwmaIncrInt64(int64(size-last_value), time.Since(last_time)) } */ + + if log != nil { + toMetrics := dest.GetMetrics() + log.Debug(). + Str("name", name). + Uint64("SentEvents", toMetrics.SentEvents). + Uint64("SentHashes", toMetrics.SentHashes). + Uint64("SentDevInfo", toMetrics.SentDevInfo). + Uint64("SentRemoveDev", toMetrics.SentRemoveDev). + Uint64("SentDirtyList", toMetrics.SentDirtyList). + Uint64("SentReadAt", toMetrics.SentReadAt). + Uint64("SentWriteAtHash", toMetrics.SentWriteAtHash). + Uint64("SentWriteAtComp", toMetrics.SentWriteAtComp). + Uint64("SentWriteAt", toMetrics.SentWriteAt). + Uint64("SentWriteAtWithMap", toMetrics.SentWriteAtWithMap). + Uint64("SentRemoveFromMap", toMetrics.SentRemoveFromMap). + Uint64("SentNeedAt", toMetrics.RecvNeedAt). + Uint64("SentDontNeedAt", toMetrics.RecvDontNeedAt). + Msg("ToProtocol metrics") + } + return nil } diff --git a/go.mod b/go.mod index cc13ed05..945f3df5 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,9 @@ require ( github.com/agext/levenshtein v1.2.1 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/cli v26.1.4+incompatible // indirect @@ -57,11 +59,16 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.13 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/rs/xid v1.6.0 // indirect @@ -78,6 +85,7 @@ require ( golang.org/x/sync v0.8.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 9fbbbed6..33adfd05 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,12 @@ github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6 github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8= github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -111,6 +115,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= @@ -124,6 +130,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -207,6 +221,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/storage/device/device.go b/pkg/storage/device/device.go index e53c7e79..a66ef7e0 100644 --- a/pkg/storage/device/device.go +++ b/pkg/storage/device/device.go @@ -18,6 +18,7 @@ import ( "github.com/loopholelabs/silo/pkg/storage/config" "github.com/loopholelabs/silo/pkg/storage/dirtytracker" "github.com/loopholelabs/silo/pkg/storage/expose" + "github.com/loopholelabs/silo/pkg/storage/metrics" "github.com/loopholelabs/silo/pkg/storage/migrator" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -32,6 +33,8 @@ const ( DefaultBlockSize = 4096 ) +var syncConcurrency = map[int]int{storage.BlockTypeAny: 1000} + type Device struct { Provider storage.Provider Exposed storage.ExposedStorage @@ -68,6 +71,11 @@ func NewDevice(ds *config.DeviceSchema) (storage.Provider, storage.ExposedStorag } func NewDeviceWithLogging(ds *config.DeviceSchema, log types.RootLogger) (storage.Provider, storage.ExposedStorage, error) { + return NewDeviceWithLoggingMetrics(ds, log, nil) +} + +func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.RootLogger, met *metrics.Metrics) (storage.Provider, storage.ExposedStorage, error) { + if log != nil { log.Debug().Str("name", ds.Name).Msg("creating new device") } @@ -257,6 +265,9 @@ func NewDeviceWithLogging(ds *config.DeviceSchema, log types.RootLogger) (storag prov.Close() return nil, nil, err } + if log != nil { + log.Debug().Str("name", ds.Name).Str("device", ex.Device()).Msg("device exposed as nbd device") + } } // Optionally sync the device to S3 @@ -279,6 +290,10 @@ func NewDeviceWithLogging(ds *config.DeviceSchema, log types.RootLogger) (storag return nil, nil, err } + if met != nil { + met.AddS3Storage(ds.Name, s3dest) + } + dirtyBlockSize := bs >> ds.Sync.Config.BlockShift numBlocks := (int(prov.Size()) + bs - 1) / bs @@ -306,6 +321,7 @@ func NewDeviceWithLogging(ds *config.DeviceSchema, log types.RootLogger) (storag // Start doing the sync... syncer := migrator.NewSyncer(ctx, &migrator.SyncConfig{ + Concurrency: syncConcurrency, Logger: log, Name: ds.Name, Integrity: false, diff --git a/pkg/storage/device/device_sync_test.go b/pkg/storage/device/device_sync_test.go index 982236a8..dbde3e9d 100644 --- a/pkg/storage/device/device_sync_test.go +++ b/pkg/storage/device/device_sync_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/loopholelabs/logging" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/config" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -49,7 +51,12 @@ func TestDeviceSync(t *testing.T) { s := new(config.SiloSchema) err := s.Decode([]byte(testSyncSchema)) assert.NoError(t, err) - devs, err := NewDevices(s.Device) + + logBuffer := &testutils.SafeWriteBuffer{} + l := logging.New(logging.Zerolog, "device", logBuffer) + l.SetLevel(types.TraceLevel) + + devs, err := NewDevicesWithLogging(s.Device, l) assert.NoError(t, err) t.Cleanup(func() { os.Remove("./testdata/testfile_sync") @@ -119,6 +126,8 @@ func TestDeviceSync(t *testing.T) { assert.Equal(t, false, storage.SendSiloEvent(prov, "sync.running", nil)[0].(bool)) prov.Close() + + assert.Greater(t, logBuffer.Len(), 0) } func TestDeviceSyncClose(t *testing.T) { diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index f1ab5bea..2b552907 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -28,6 +28,26 @@ type DirtyTracker struct { writeLock sync.RWMutex } +type Metrics struct { + BlockSize uint64 + Size uint64 + TrackingBlocks uint64 + DirtyBlocks uint64 + MaxAgeDirty time.Duration +} + +func (dtr *Remote) GetMetrics() *Metrics { + minAge := dtr.MeasureDirtyAge() + + return &Metrics{ + BlockSize: uint64(dtr.dt.blockSize), + Size: dtr.dt.size, + TrackingBlocks: uint64(dtr.dt.tracking.Count(0, dtr.dt.tracking.Length())), + DirtyBlocks: uint64(dtr.dt.dirtyLog.Count(0, dtr.dt.dirtyLog.Length())), + MaxAgeDirty: time.Since(minAge), + } +} + type Local struct { storage.ProviderWithEvents dt *DirtyTracker diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go index 7f3379fd..d9daba1b 100644 --- a/pkg/storage/expose/nbd.go +++ b/pkg/storage/expose/nbd.go @@ -129,6 +129,14 @@ func NewExposedStorageNBDNL(prov storage.Provider, conf *Config) *ExposedStorage } } +func (n *ExposedStorageNBDNL) GetMetrics() *DispatchMetrics { + dm := &DispatchMetrics{} + for _, d := range n.dispatchers { + dm.Add(d.GetMetrics()) + } + return dm +} + func (n *ExposedStorageNBDNL) SetProvider(prov storage.Provider) { n.provLock.Lock() n.prov = prov @@ -179,7 +187,6 @@ func (n *ExposedStorageNBDNL) Device() string { func (n *ExposedStorageNBDNL) Init() error { for { - socks := make([]*os.File, 0) n.dispatchers = make([]*Dispatch, 0) diff --git a/pkg/storage/expose/nbd_dispatch.go b/pkg/storage/expose/nbd_dispatch.go index d5b15cff..12b8095b 100644 --- a/pkg/storage/expose/nbd_dispatch.go +++ b/pkg/storage/expose/nbd_dispatch.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "sync" + "sync/atomic" + "time" "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage" @@ -63,19 +65,47 @@ type Response struct { } type Dispatch struct { - logger types.RootLogger - dev string - ctx context.Context - asyncReads bool - asyncWrites bool - fp io.ReadWriteCloser - responseHeader []byte - writeLock sync.Mutex - prov storage.Provider - fatal chan error - pendingResponses sync.WaitGroup - metricPacketsIn uint64 - metricPacketsOut uint64 + logger types.RootLogger + dev string + ctx context.Context + asyncReads bool + asyncWrites bool + fp io.ReadWriteCloser + responseHeader []byte + writeLock sync.Mutex + prov storage.Provider + fatal chan error + pendingResponses sync.WaitGroup + metricPacketsIn uint64 + metricPacketsOut uint64 + metricReadAt uint64 + metricReadAtBytes uint64 + metricReadAtTime uint64 + metricWriteAt uint64 + metricWriteAtBytes uint64 + metricWriteAtTime uint64 +} + +type DispatchMetrics struct { + PacketsIn uint64 + PacketsOut uint64 + ReadAt uint64 + ReadAtBytes uint64 + ReadAtTime time.Duration + WriteAt uint64 + WriteAtBytes uint64 + WriteAtTime time.Duration +} + +func (dm *DispatchMetrics) Add(delta *DispatchMetrics) { + dm.PacketsIn += delta.PacketsIn + dm.PacketsOut += delta.PacketsOut + dm.ReadAt += delta.ReadAt + dm.ReadAtBytes += delta.ReadAtBytes + dm.ReadAtTime += delta.ReadAtTime + dm.WriteAt += delta.WriteAt + dm.WriteAtBytes += delta.WriteAtBytes + dm.WriteAtTime += delta.WriteAtTime } func NewDispatch(ctx context.Context, name string, logger types.RootLogger, fp io.ReadWriteCloser, prov storage.Provider) *Dispatch { @@ -86,7 +116,7 @@ func NewDispatch(ctx context.Context, name string, logger types.RootLogger, fp i asyncWrites: true, asyncReads: true, responseHeader: make([]byte, 16), - fatal: make(chan error, 8), + fatal: make(chan error, 1), fp: fp, prov: prov, ctx: ctx, @@ -96,6 +126,19 @@ func NewDispatch(ctx context.Context, name string, logger types.RootLogger, fp i return d } +func (d *Dispatch) GetMetrics() *DispatchMetrics { + return &DispatchMetrics{ + PacketsIn: atomic.LoadUint64(&d.metricPacketsIn), + PacketsOut: atomic.LoadUint64(&d.metricPacketsOut), + ReadAt: atomic.LoadUint64(&d.metricReadAt), + ReadAtBytes: atomic.LoadUint64(&d.metricReadAtBytes), + ReadAtTime: time.Duration(atomic.LoadUint64(&d.metricReadAtTime)), + WriteAt: atomic.LoadUint64(&d.metricWriteAt), + WriteAtBytes: atomic.LoadUint64(&d.metricWriteAtBytes), + WriteAtTime: time.Duration(atomic.LoadUint64(&d.metricWriteAtTime)), + } +} + func (d *Dispatch) Wait() { if d.logger != nil { d.logger.Trace().Str("device", d.dev).Msg("nbd waiting for pending responses") @@ -184,6 +227,11 @@ func (d *Dispatch) Handle() error { // If the context has been cancelled, quit select { + + // Check if there is a fatal error from an async read/write to return + case err := <-d.fatal: + return err + case <-d.ctx.Done(): if d.logger != nil { d.logger.Trace(). @@ -305,15 +353,29 @@ func (d *Dispatch) cmdRead(cmdHandle uint64, cmdFrom uint64, cmdLength uint32) e if d.asyncReads { d.pendingResponses.Add(1) go func() { + ctime := time.Now() err := performRead(cmdHandle, cmdFrom, cmdLength) - if err != nil { - d.fatal <- err + if err == nil { + atomic.AddUint64(&d.metricReadAt, 1) + atomic.AddUint64(&d.metricReadAtBytes, uint64(cmdLength)) + atomic.AddUint64(&d.metricReadAtTime, uint64(time.Since(ctime))) + } else { + select { + case d.fatal <- err: + default: + } } d.pendingResponses.Done() }() } else { d.pendingResponses.Add(1) + ctime := time.Now() err := performRead(cmdHandle, cmdFrom, cmdLength) + if err == nil { + atomic.AddUint64(&d.metricReadAt, 1) + atomic.AddUint64(&d.metricReadAtBytes, uint64(cmdLength)) + atomic.AddUint64(&d.metricReadAtTime, uint64(time.Since(ctime))) + } d.pendingResponses.Done() return err } @@ -359,15 +421,29 @@ func (d *Dispatch) cmdWrite(cmdHandle uint64, cmdFrom uint64, cmdLength uint32, if d.asyncWrites { d.pendingResponses.Add(1) go func() { + ctime := time.Now() err := performWrite(cmdHandle, cmdFrom, cmdLength, cmdData) - if err != nil { - d.fatal <- err + if err == nil { + atomic.AddUint64(&d.metricWriteAt, 1) + atomic.AddUint64(&d.metricWriteAtBytes, uint64(cmdLength)) + atomic.AddUint64(&d.metricWriteAtTime, uint64(time.Since(ctime))) + } else { + select { + case d.fatal <- err: + default: + } } d.pendingResponses.Done() }() } else { d.pendingResponses.Add(1) + ctime := time.Now() err := performWrite(cmdHandle, cmdFrom, cmdLength, cmdData) + if err == nil { + atomic.AddUint64(&d.metricWriteAt, 1) + atomic.AddUint64(&d.metricWriteAtBytes, uint64(cmdLength)) + atomic.AddUint64(&d.metricWriteAtTime, uint64(time.Since(ctime))) + } d.pendingResponses.Done() return err } diff --git a/pkg/storage/expose/nbd_test.go b/pkg/storage/expose/nbd_test.go index 5012f6fb..88c30684 100644 --- a/pkg/storage/expose/nbd_test.go +++ b/pkg/storage/expose/nbd_test.go @@ -9,8 +9,11 @@ import ( "testing" "time" + "github.com/loopholelabs/logging" + "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/loopholelabs/silo/pkg/testutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -34,7 +37,12 @@ func TestNBDNLDevice(t *testing.T) { size := 4096 * 1024 * 1024 prov := sources.NewMemoryStorage(size) - n = NewExposedStorageNBDNL(prov, DefaultConfig) + logBuffer := &testutils.SafeWriteBuffer{} + // Enable logging here to make sure it doesn't break anything... + l := logging.New(logging.Zerolog, "silo", logBuffer) + l.SetLevel(types.TraceLevel) + + n = NewExposedStorageNBDNL(prov, DefaultConfig.WithLogger(l)) err = n.Init() require.NoError(t, err) @@ -59,6 +67,9 @@ func TestNBDNLDevice(t *testing.T) { } wg.Wait() + + // There should be some log entries here + assert.Greater(t, logBuffer.Len(), 0) } func TestNBDNLDeviceBlocksizes(t *testing.T) { diff --git a/pkg/storage/metrics/metrics.go b/pkg/storage/metrics/metrics.go new file mode 100644 index 00000000..99232937 --- /dev/null +++ b/pkg/storage/metrics/metrics.go @@ -0,0 +1,622 @@ +package metrics + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + "github.com/loopholelabs/silo/pkg/storage/expose" + "github.com/loopholelabs/silo/pkg/storage/migrator" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/protocol" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" + "github.com/loopholelabs/silo/pkg/storage/waitingcache" + "github.com/prometheus/client_golang/prometheus" +) + +// How often to poll metrics. We may want to do some less often etc +const migratorTick = 100 * time.Millisecond +const protocolTick = 100 * time.Millisecond +const toProtocolTick = 100 * time.Millisecond +const fromProtocolTick = 100 * time.Millisecond +const s3Tick = 100 * time.Millisecond +const dirtyTrackerTick = 100 * time.Millisecond +const volatilityMonitorTick = 100 * time.Millisecond +const metricsTick = 100 * time.Millisecond +const nbdTick = 100 * time.Millisecond +const waitingCacheTick = 100 * time.Millisecond + +const promNamespace = "silo" + +const promSubMigrator = "migrator" +const promSubProtocol = "protocol" +const promSubToProtocol = "toProtocol" +const promSubFromProtocol = "fromProtocol" +const promSubS3 = "s3" +const promSubDirtyTracker = "dirtyTracker" +const promSubVolatilityMonitor = "volatilityMonitor" +const promSubMetrics = "metrics" +const promSubNbd = "nbd" +const promSubWaitingCache = "waitingCache" + +type Metrics struct { + reg prometheus.Registerer + lock sync.Mutex + + // migrator + migratorBlockSize *prometheus.GaugeVec + migratorTotalBlocks *prometheus.GaugeVec + migratorMigratedBlocks *prometheus.GaugeVec + migratorReadyBlocks *prometheus.GaugeVec + migratorActiveBlocks *prometheus.GaugeVec + migratorTotalMigratedBlocks *prometheus.GaugeVec + + // protocol + protocolPacketsSent *prometheus.GaugeVec + protocolDataSent *prometheus.GaugeVec + protocolPacketsRecv *prometheus.GaugeVec + protocolDataRecv *prometheus.GaugeVec + protocolWrites *prometheus.GaugeVec + protocolWriteErrors *prometheus.GaugeVec + protocolWaitingForId *prometheus.GaugeVec + + // s3 + s3BlocksR *prometheus.GaugeVec + s3BlocksRBytes *prometheus.GaugeVec + s3BlocksW *prometheus.GaugeVec + s3BlocksWBytes *prometheus.GaugeVec + + // toProtocol + toProtocolSentEvents *prometheus.GaugeVec + toProtocolSentAltSources *prometheus.GaugeVec + toProtocolSentHashes *prometheus.GaugeVec + toProtocolSentDevInfo *prometheus.GaugeVec + toProtocolSentDirtyList *prometheus.GaugeVec + toProtocolSentReadAt *prometheus.GaugeVec + toProtocolSentWriteAtHash *prometheus.GaugeVec + toProtocolSentWriteAtHashBytes *prometheus.GaugeVec + toProtocolSentWriteAtComp *prometheus.GaugeVec + toProtocolSentWriteAtCompBytes *prometheus.GaugeVec + toProtocolSentWriteAtCompDataBytes *prometheus.GaugeVec + toProtocolSentWriteAt *prometheus.GaugeVec + toProtocolSentWriteAtBytes *prometheus.GaugeVec + toProtocolSentWriteAtWithMap *prometheus.GaugeVec + toProtocolSentRemoveFromMap *prometheus.GaugeVec + toProtocolRecvNeedAt *prometheus.GaugeVec + toProtocolRecvDontNeedAt *prometheus.GaugeVec + + // fromProtocol + fromProtocolRecvEvents *prometheus.GaugeVec + fromProtocolRecvHashes *prometheus.GaugeVec + fromProtocolRecvDevInfo *prometheus.GaugeVec + fromProtocolRecvAltSources *prometheus.GaugeVec + fromProtocolRecvReadAt *prometheus.GaugeVec + fromProtocolRecvWriteAtHash *prometheus.GaugeVec + fromProtocolRecvWriteAtComp *prometheus.GaugeVec + fromProtocolRecvWriteAt *prometheus.GaugeVec + fromProtocolRecvWriteAtWithMap *prometheus.GaugeVec + fromProtocolRecvRemoveFromMap *prometheus.GaugeVec + fromProtocolRecvRemoveDev *prometheus.GaugeVec + fromProtocolRecvDirtyList *prometheus.GaugeVec + fromProtocolSentNeedAt *prometheus.GaugeVec + fromProtocolSentDontNeedAt *prometheus.GaugeVec + + // dirtyTracker + dirtyTrackerBlockSize *prometheus.GaugeVec + dirtyTrackerTrackingBlocks *prometheus.GaugeVec + dirtyTrackerDirtyBlocks *prometheus.GaugeVec + dirtyTrackerMaxAgeDirtyMS *prometheus.GaugeVec + + // volatilityMonitor + volatilityMonitorBlockSize *prometheus.GaugeVec + volatilityMonitorAvailable *prometheus.GaugeVec + volatilityMonitorVolatility *prometheus.GaugeVec + + // metrics + metricsReadOps *prometheus.GaugeVec + metricsReadBytes *prometheus.GaugeVec + metricsReadTime *prometheus.GaugeVec + metricsReadErrors *prometheus.GaugeVec + metricsWriteOps *prometheus.GaugeVec + metricsWriteBytes *prometheus.GaugeVec + metricsWriteTime *prometheus.GaugeVec + metricsWriteErrors *prometheus.GaugeVec + metricsFlushOps *prometheus.GaugeVec + metricsFlushTime *prometheus.GaugeVec + metricsFlushErrors *prometheus.GaugeVec + + // nbd + nbdPacketsIn *prometheus.GaugeVec + nbdPacketsOut *prometheus.GaugeVec + nbdReadAt *prometheus.GaugeVec + nbdReadAtBytes *prometheus.GaugeVec + nbdWriteAt *prometheus.GaugeVec + nbdWriteAtBytes *prometheus.GaugeVec + + // waitingCache + waitingCacheWaitForBlock *prometheus.GaugeVec + waitingCacheWaitForBlockHadRemote *prometheus.GaugeVec + waitingCacheWaitForBlockHadLocal *prometheus.GaugeVec + waitingCacheWaitForBlockLock *prometheus.GaugeVec + waitingCacheWaitForBlockLockDone *prometheus.GaugeVec + waitingCacheMarkAvailableLocalBlock *prometheus.GaugeVec + waitingCacheMarkAvailableRemoteBlock *prometheus.GaugeVec + waitingCacheAvailableLocal *prometheus.GaugeVec + waitingCacheAvailableRemote *prometheus.GaugeVec + + // vm + vmRunning prometheus.Gauge + + cancelfns map[string]context.CancelFunc +} + +func New(reg prometheus.Registerer) *Metrics { + met := &Metrics{ + reg: reg, + // Migrator + migratorBlockSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "block_size", Help: "Block size"}, []string{"device"}), + migratorTotalBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "total_blocks", Help: "Total blocks"}, []string{"device"}), + migratorActiveBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "active_blocks", Help: "Active blocks"}, []string{"device"}), + migratorMigratedBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "migrated_blocks", Help: "Migrated blocks"}, []string{"device"}), + migratorReadyBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "ready_blocks", Help: "Ready blocks"}, []string{"device"}), + migratorTotalMigratedBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMigrator, Name: "total_migrated_blocks", Help: "Total migrated blocks"}, []string{"device"}), + + // Protocol + protocolPacketsSent: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "packets_sent", Help: "Packets sent"}, []string{"device"}), + protocolDataSent: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "data_sent", Help: "Data sent"}, []string{"device"}), + protocolPacketsRecv: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "packets_recv", Help: "Packets recv"}, []string{"device"}), + protocolDataRecv: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "data_recv", Help: "Data recv"}, []string{"device"}), + protocolWrites: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "writes", Help: "Writes"}, []string{"device"}), + protocolWriteErrors: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "write_errors", Help: "Write errors"}, []string{"device"}), + protocolWaitingForId: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubProtocol, Name: "waiting_for_id", Help: "Waiting for ID"}, []string{"device"}), + + // ToProtocol + toProtocolSentEvents: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_events", Help: "sentEvents"}, []string{"device"}), + toProtocolSentAltSources: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_alt_sources", Help: "sentAltSources"}, []string{"device"}), + toProtocolSentHashes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_hashes", Help: "sentHashes"}, []string{"device"}), + toProtocolSentDevInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_dev_info", Help: "sentDevInfo"}, []string{"device"}), + toProtocolSentDirtyList: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_dirty_list", Help: "sentDirtyList"}, []string{"device"}), + toProtocolSentReadAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_read_at", Help: "sentReadAt"}, []string{"device"}), + toProtocolSentWriteAtHash: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_write_at_hash", Help: "sentWriteAtHash"}, []string{"device"}), + toProtocolSentWriteAtHashBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_hash_bytes", Help: "sentWriteAtHashBytes"}, []string{"device"}), + toProtocolSentWriteAtComp: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_comp", Help: "sentWriteAtComp"}, []string{"device"}), + toProtocolSentWriteAtCompBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_comp_bytes", Help: "sentWriteAtCompBytes"}, []string{"device"}), + toProtocolSentWriteAtCompDataBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_comp_data_bytes", Help: "sentWriteAtCompDataBytes"}, []string{"device"}), + toProtocolSentWriteAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at", Help: "sentWriteAt"}, []string{"device"}), + toProtocolSentWriteAtBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_bytes", Help: "sentWriteAtBytes"}, []string{"device"}), + toProtocolSentWriteAtWithMap: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "write_at_with_map", Help: "sentWriteAtWithMap"}, []string{"device"}), + toProtocolSentRemoveFromMap: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "sent_remove_from_map", Help: "sentRemoveFromMap"}, []string{"device"}), + toProtocolRecvNeedAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "recv_need_at", Help: "recvNeedAt"}, []string{"device"}), + toProtocolRecvDontNeedAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubToProtocol, Name: "recv_dont_need_at", Help: "recvDontNeedAt"}, []string{"device"}), + + // fromProtocol + fromProtocolRecvEvents: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_events", Help: "recvEvents"}, []string{"device"}), + fromProtocolRecvHashes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_hashes", Help: "recvHashes"}, []string{"device"}), + fromProtocolRecvDevInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_dev_info", Help: "recvDevInfo"}, []string{"device"}), + fromProtocolRecvAltSources: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_alt_sources", Help: "recvAltSources"}, []string{"device"}), + fromProtocolRecvReadAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_read_at", Help: "recvReadAt"}, []string{"device"}), + fromProtocolRecvWriteAtHash: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_write_at_hash", Help: "recvWriteAtHash"}, []string{"device"}), + fromProtocolRecvWriteAtComp: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_write_at_comp", Help: "recvWriteAtComp"}, []string{"device"}), + fromProtocolRecvWriteAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_write_at", Help: "recvWriteAt"}, []string{"device"}), + fromProtocolRecvWriteAtWithMap: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_write_at_with_map", Help: "recvWriteAtWithMap"}, []string{"device"}), + fromProtocolRecvRemoveFromMap: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_remove_from_map", Help: "recvRemoveFromMap"}, []string{"device"}), + fromProtocolRecvRemoveDev: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_remove_dev", Help: "recvRemoveDev"}, []string{"device"}), + fromProtocolRecvDirtyList: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "recv_dirty_list", Help: "recvDirtyList"}, []string{"device"}), + fromProtocolSentNeedAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "sent_need_at", Help: "sentNeedAt"}, []string{"device"}), + fromProtocolSentDontNeedAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubFromProtocol, Name: "sent_dont_need_at", Help: "sentDontNeedAt"}, []string{"device"}), + + // S3Storage + s3BlocksW: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_w", Help: "Blocks w"}, []string{"device"}), + s3BlocksWBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_w_bytes", Help: "Blocks w bytes"}, []string{"device"}), + s3BlocksR: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_r", Help: "Blocks r"}, []string{"device"}), + s3BlocksRBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubS3, Name: "blocks_r_bytes", Help: "Blocks r bytes"}, []string{"device"}), + + // DirtyTracker + dirtyTrackerBlockSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubDirtyTracker, Name: "block_size", Help: "Block size"}, []string{"device"}), + dirtyTrackerTrackingBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubDirtyTracker, Name: "tracking_blocks", Help: "Blocks being tracked"}, []string{"device"}), + dirtyTrackerDirtyBlocks: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubDirtyTracker, Name: "dirty_blocks", Help: "Blocks dirty"}, []string{"device"}), + dirtyTrackerMaxAgeDirtyMS: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubDirtyTracker, Name: "block_max_age", Help: "Block dirty max age"}, []string{"device"}), + + // VolatilityMonitor + volatilityMonitorBlockSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubVolatilityMonitor, Name: "block_size", Help: "Block size"}, []string{"device"}), + volatilityMonitorAvailable: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubVolatilityMonitor, Name: "available", Help: "Blocks available"}, []string{"device"}), + volatilityMonitorVolatility: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubVolatilityMonitor, Name: "volatility", Help: "Volatility"}, []string{"device"}), + + // Metrics + metricsReadOps: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "read_ops", Help: "ReadOps"}, []string{"device"}), + metricsReadBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "read_bytes", Help: "ReadBytes"}, []string{"device"}), + metricsReadErrors: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "read_errors", Help: "ReadErrors"}, []string{"device"}), + metricsReadTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "read_time", Help: "ReadTime"}, []string{"device"}), + metricsWriteOps: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "write_ops", Help: "WriteOps"}, []string{"device"}), + metricsWriteBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "write_bytes", Help: "WriteBytes"}, []string{"device"}), + metricsWriteErrors: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "write_errors", Help: "WriteErrors"}, []string{"device"}), + metricsWriteTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "write_time", Help: "WriteTime"}, []string{"device"}), + metricsFlushOps: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "flush_ops", Help: "FlushOps"}, []string{"device"}), + metricsFlushErrors: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "flush_errors", Help: "FlushErrors"}, []string{"device"}), + metricsFlushTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubMetrics, Name: "flush_time", Help: "FlushTime"}, []string{"device"}), + + // nbd + nbdPacketsIn: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "packets_in", Help: "PacketsIn"}, []string{"device"}), + nbdPacketsOut: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "packets_out", Help: "PacketsOut"}, []string{"device"}), + nbdReadAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "read_at", Help: "ReadAt"}, []string{"device"}), + nbdReadAtBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "read_at_bytes", Help: "ReadAtBytes"}, []string{"device"}), + nbdWriteAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "write_at", Help: "WriteAt"}, []string{"device"}), + nbdWriteAtBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubNbd, Name: "write_at_bytes", Help: "WriteAtBytes"}, []string{"device"}), + + // waitingCache + waitingCacheWaitForBlock: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "waiting_for_block", Help: "WaitingForBlock"}, []string{"device"}), + waitingCacheWaitForBlockHadRemote: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "waiting_for_block_had_remote", Help: "WaitingForBlockHadRemote"}, []string{"device"}), + waitingCacheWaitForBlockHadLocal: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "waiting_for_block_had_local", Help: "WaitingForBlockHadLocal"}, []string{"device"}), + waitingCacheWaitForBlockLock: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "waiting_for_block_lock", Help: "WaitingForBlockLock"}, []string{"device"}), + waitingCacheWaitForBlockLockDone: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "waiting_for_block_lock_done", Help: "WaitingForBlockLockDone"}, []string{"device"}), + waitingCacheMarkAvailableLocalBlock: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "mark_available_local_block", Help: "MarkAvailableLocalBlock"}, []string{"device"}), + waitingCacheMarkAvailableRemoteBlock: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "mark_available_remote_block", Help: "MarkAvailableRemoteBlock"}, []string{"device"}), + waitingCacheAvailableLocal: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "available_local", Help: "AvailableLocal"}, []string{"device"}), + waitingCacheAvailableRemote: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNamespace, Subsystem: promSubWaitingCache, Name: "available_remote", Help: "AvailableRemote"}, []string{"device"}), + + // vm + vmRunning: prometheus.NewGauge(prometheus.GaugeOpts{Namespace: promNamespace, Subsystem: "vm", Name: "running", Help: "running"}), + + cancelfns: make(map[string]context.CancelFunc), + } + + // Register all the metrics + reg.MustRegister(met.migratorBlockSize, met.migratorActiveBlocks, met.migratorTotalBlocks, met.migratorMigratedBlocks, met.migratorTotalMigratedBlocks, met.migratorReadyBlocks) + + reg.MustRegister(met.protocolPacketsSent, met.protocolDataSent, met.protocolPacketsRecv, met.protocolDataRecv, met.protocolWrites, met.protocolWriteErrors, met.protocolWaitingForId) + + reg.MustRegister(met.s3BlocksR, met.s3BlocksRBytes, met.s3BlocksW, met.s3BlocksWBytes) + + reg.MustRegister(met.toProtocolSentEvents, met.toProtocolSentAltSources, met.toProtocolSentHashes, met.toProtocolSentDevInfo, + met.toProtocolSentDirtyList, met.toProtocolSentReadAt, met.toProtocolSentWriteAtHash, met.toProtocolSentWriteAtHashBytes, + met.toProtocolSentWriteAtComp, met.toProtocolSentWriteAtCompBytes, met.toProtocolSentWriteAtCompDataBytes, + met.toProtocolSentWriteAt, met.toProtocolSentWriteAtBytes, met.toProtocolSentWriteAtWithMap, + met.toProtocolSentRemoveFromMap, met.toProtocolRecvNeedAt, met.toProtocolRecvDontNeedAt, + ) + + reg.MustRegister(met.fromProtocolRecvEvents, met.fromProtocolRecvHashes, met.fromProtocolRecvDevInfo, + met.fromProtocolRecvAltSources, met.fromProtocolRecvReadAt, met.fromProtocolRecvWriteAtHash, + met.fromProtocolRecvWriteAtComp, met.fromProtocolRecvWriteAt, met.fromProtocolRecvWriteAtWithMap, + met.fromProtocolRecvRemoveFromMap, met.fromProtocolRecvRemoveDev, met.fromProtocolRecvDirtyList, + met.fromProtocolSentNeedAt, met.fromProtocolSentDontNeedAt) + + reg.MustRegister(met.dirtyTrackerBlockSize, met.dirtyTrackerDirtyBlocks, met.dirtyTrackerTrackingBlocks, met.dirtyTrackerMaxAgeDirtyMS) + + reg.MustRegister(met.volatilityMonitorBlockSize, met.volatilityMonitorAvailable, met.volatilityMonitorVolatility) + + reg.MustRegister( + met.metricsReadOps, + met.metricsReadBytes, + met.metricsReadTime, + met.metricsReadErrors, + met.metricsWriteOps, + met.metricsWriteBytes, + met.metricsWriteTime, + met.metricsWriteErrors, + met.metricsFlushOps, + met.metricsFlushTime, + met.metricsFlushErrors) + + reg.MustRegister( + met.nbdPacketsIn, met.nbdPacketsOut, met.nbdReadAt, met.nbdReadAtBytes, met.nbdWriteAt, met.nbdWriteAtBytes) + + reg.MustRegister( + met.waitingCacheWaitForBlock, + met.waitingCacheWaitForBlockHadRemote, + met.waitingCacheWaitForBlockHadLocal, + met.waitingCacheWaitForBlockLock, + met.waitingCacheWaitForBlockLockDone, + met.waitingCacheMarkAvailableLocalBlock, + met.waitingCacheMarkAvailableRemoteBlock, + met.waitingCacheAvailableLocal, + met.waitingCacheAvailableRemote, + ) + + reg.MustRegister(met.vmRunning) + return met +} + +func (m *Metrics) remove(subsystem string, name string) { + m.lock.Lock() + cancelfn, ok := m.cancelfns[fmt.Sprintf("%s_%s", subsystem, name)] + if ok { + cancelfn() + delete(m.cancelfns, fmt.Sprintf("%s_%s", subsystem, name)) + } + m.lock.Unlock() +} + +func (m *Metrics) add(subsystem string, name string, interval time.Duration, tickfn func()) { + ctx, cancelfn := context.WithCancel(context.TODO()) + m.lock.Lock() + m.cancelfns[fmt.Sprintf("%s_%s", subsystem, name)] = cancelfn + m.lock.Unlock() + + ticker := time.NewTicker(interval) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + tickfn() + } + } + }() +} + +func (m *Metrics) AddMigrator(name string, mig *migrator.Migrator) { + m.add(promSubMigrator, name, migratorTick, func() { + met := mig.GetMetrics() + m.migratorBlockSize.WithLabelValues(name).Set(float64(met.BlockSize)) + m.migratorTotalBlocks.WithLabelValues(name).Set(float64(met.TotalBlocks)) + m.migratorMigratedBlocks.WithLabelValues(name).Set(float64(met.MigratedBlocks)) + m.migratorReadyBlocks.WithLabelValues(name).Set(float64(met.ReadyBlocks)) + m.migratorActiveBlocks.WithLabelValues(name).Set(float64(met.ActiveBlocks)) + m.migratorTotalMigratedBlocks.WithLabelValues(name).Set(float64(met.TotalMigratedBlocks)) + }) +} + +func (m *Metrics) RemoveMigrator(name string) { + m.remove(promSubMigrator, name) +} + +func (m *Metrics) AddProtocol(name string, proto *protocol.RW) { + m.add(promSubProtocol, name, protocolTick, func() { + met := proto.GetMetrics() + m.protocolPacketsSent.WithLabelValues(name).Set(float64(met.PacketsSent)) + m.protocolDataSent.WithLabelValues(name).Set(float64(met.DataSent)) + m.protocolPacketsRecv.WithLabelValues(name).Set(float64(met.PacketsRecv)) + m.protocolDataRecv.WithLabelValues(name).Set(float64(met.DataRecv)) + m.protocolWrites.WithLabelValues(name).Set(float64(met.Writes)) + m.protocolWriteErrors.WithLabelValues(name).Set(float64(met.WriteErrors)) + m.protocolWaitingForId.WithLabelValues(name).Set(float64(met.WaitingForID)) + }) +} + +func (m *Metrics) RemoveProtocol(name string) { + m.remove(promSubProtocol, name) +} + +func (m *Metrics) AddToProtocol(name string, proto *protocol.ToProtocol) { + m.add(promSubToProtocol, name, toProtocolTick, func() { + met := proto.GetMetrics() + + m.toProtocolSentEvents.WithLabelValues(name).Set(float64(met.SentEvents)) + m.toProtocolSentAltSources.WithLabelValues(name).Set(float64(met.SentAltSources)) + m.toProtocolSentHashes.WithLabelValues(name).Set(float64(met.SentHashes)) + m.toProtocolSentDevInfo.WithLabelValues(name).Set(float64(met.SentDevInfo)) + m.toProtocolSentDirtyList.WithLabelValues(name).Set(float64(met.SentDirtyList)) + m.toProtocolSentReadAt.WithLabelValues(name).Set(float64(met.SentReadAt)) + m.toProtocolSentWriteAtHash.WithLabelValues(name).Set(float64(met.SentWriteAtHash)) + m.toProtocolSentWriteAtHashBytes.WithLabelValues(name).Set(float64(met.SentWriteAtHashBytes)) + m.toProtocolSentWriteAtComp.WithLabelValues(name).Set(float64(met.SentWriteAtComp)) + m.toProtocolSentWriteAtCompBytes.WithLabelValues(name).Set(float64(met.SentWriteAtCompBytes)) + m.toProtocolSentWriteAtCompDataBytes.WithLabelValues(name).Set(float64(met.SentWriteAtCompDataBytes)) + m.toProtocolSentWriteAt.WithLabelValues(name).Set(float64(met.SentWriteAt)) + m.toProtocolSentWriteAtBytes.WithLabelValues(name).Set(float64(met.SentWriteAtBytes)) + m.toProtocolSentWriteAtWithMap.WithLabelValues(name).Set(float64(met.SentWriteAtWithMap)) + m.toProtocolSentRemoveFromMap.WithLabelValues(name).Set(float64(met.SentRemoveFromMap)) + m.toProtocolRecvNeedAt.WithLabelValues(name).Set(float64(met.RecvNeedAt)) + m.toProtocolRecvDontNeedAt.WithLabelValues(name).Set(float64(met.RecvDontNeedAt)) + }) +} + +func (m *Metrics) RemoveToProtocol(name string) { + m.remove(promSubToProtocol, name) +} + +func (m *Metrics) AddFromProtocol(name string, proto *protocol.FromProtocol) { + m.add(promSubFromProtocol, name, fromProtocolTick, func() { + met := proto.GetMetrics() + + m.fromProtocolRecvEvents.WithLabelValues(name).Set(float64(met.RecvEvents)) + m.fromProtocolRecvHashes.WithLabelValues(name).Set(float64(met.RecvHashes)) + m.fromProtocolRecvDevInfo.WithLabelValues(name).Set(float64(met.RecvDevInfo)) + m.fromProtocolRecvAltSources.WithLabelValues(name).Set(float64(met.RecvAltSources)) + m.fromProtocolRecvReadAt.WithLabelValues(name).Set(float64(met.RecvReadAt)) + m.fromProtocolRecvWriteAtHash.WithLabelValues(name).Set(float64(met.RecvWriteAtHash)) + m.fromProtocolRecvWriteAtComp.WithLabelValues(name).Set(float64(met.RecvWriteAtComp)) + m.fromProtocolRecvWriteAt.WithLabelValues(name).Set(float64(met.RecvWriteAt)) + m.fromProtocolRecvWriteAtWithMap.WithLabelValues(name).Set(float64(met.RecvWriteAtWithMap)) + m.fromProtocolRecvRemoveFromMap.WithLabelValues(name).Set(float64(met.RecvRemoveFromMap)) + m.fromProtocolRecvRemoveDev.WithLabelValues(name).Set(float64(met.RecvRemoveDev)) + m.fromProtocolRecvDirtyList.WithLabelValues(name).Set(float64(met.RecvDirtyList)) + m.fromProtocolSentNeedAt.WithLabelValues(name).Set(float64(met.SentNeedAt)) + m.fromProtocolSentDontNeedAt.WithLabelValues(name).Set(float64(met.SentDontNeedAt)) + }) +} + +func (m *Metrics) RemoveFromProtocol(name string) { + m.remove(promSubFromProtocol, name) +} + +func (m *Metrics) AddS3Storage(name string, s3 *sources.S3Storage) { + m.add(promSubS3, name, s3Tick, func() { + met := s3.Metrics() + m.s3BlocksW.WithLabelValues(name).Set(float64(met.BlocksWCount)) + m.s3BlocksWBytes.WithLabelValues(name).Set(float64(met.BlocksWBytes)) + m.s3BlocksR.WithLabelValues(name).Set(float64(met.BlocksRCount)) + m.s3BlocksRBytes.WithLabelValues(name).Set(float64(met.BlocksRBytes)) + }) + +} + +func (m *Metrics) RemoveS3Storage(name string) { + m.remove(promSubS3, name) +} + +func (m *Metrics) AddDirtyTracker(name string, dt *dirtytracker.Remote) { + m.add(promSubDirtyTracker, name, dirtyTrackerTick, func() { + met := dt.GetMetrics() + m.dirtyTrackerBlockSize.WithLabelValues(name).Set(float64(met.BlockSize)) + m.dirtyTrackerTrackingBlocks.WithLabelValues(name).Set(float64(met.TrackingBlocks)) + m.dirtyTrackerDirtyBlocks.WithLabelValues(name).Set(float64(met.DirtyBlocks)) + m.dirtyTrackerMaxAgeDirtyMS.WithLabelValues(name).Set(float64(met.MaxAgeDirty)) + }) +} + +func (m *Metrics) RemoveDirtyTracker(name string) { + m.remove(promSubDirtyTracker, name) +} + +func (m *Metrics) AddVolatilityMonitor(name string, vm *volatilitymonitor.VolatilityMonitor) { + m.add(promSubVolatilityMonitor, name, volatilityMonitorTick, func() { + met := vm.GetMetrics() + m.volatilityMonitorBlockSize.WithLabelValues(name).Set(float64(met.BlockSize)) + m.volatilityMonitorAvailable.WithLabelValues(name).Set(float64(met.Available)) + m.volatilityMonitorVolatility.WithLabelValues(name).Set(float64(met.Volatility)) + }) +} + +func (m *Metrics) RemoveVolatilityMonitor(name string) { + m.remove(promSubVolatilityMonitor, name) +} + +func (m *Metrics) AddMetrics(name string, mm *modules.Metrics) { + m.add(promSubMetrics, name, metricsTick, func() { + met := mm.GetMetrics() + m.metricsReadOps.WithLabelValues(name).Set(float64(met.ReadOps)) + m.metricsReadBytes.WithLabelValues(name).Set(float64(met.ReadBytes)) + m.metricsReadErrors.WithLabelValues(name).Set(float64(met.ReadErrors)) + m.metricsReadTime.WithLabelValues(name).Set(float64(met.ReadTime)) + m.metricsWriteOps.WithLabelValues(name).Set(float64(met.WriteOps)) + m.metricsWriteBytes.WithLabelValues(name).Set(float64(met.WriteBytes)) + m.metricsWriteErrors.WithLabelValues(name).Set(float64(met.WriteErrors)) + m.metricsWriteTime.WithLabelValues(name).Set(float64(met.WriteTime)) + m.metricsFlushOps.WithLabelValues(name).Set(float64(met.FlushOps)) + m.metricsFlushErrors.WithLabelValues(name).Set(float64(met.FlushErrors)) + m.metricsFlushTime.WithLabelValues(name).Set(float64(met.FlushTime)) + }) +} + +func (m *Metrics) RemoveMetrics(name string) { + m.remove(promSubMetrics, name) +} + +func (m *Metrics) AddNBD(name string, mm *expose.ExposedStorageNBDNL) { + m.add(promSubNbd, name, nbdTick, func() { + met := mm.GetMetrics() + m.nbdPacketsIn.WithLabelValues(name).Set(float64(met.PacketsIn)) + m.nbdPacketsOut.WithLabelValues(name).Set(float64(met.PacketsOut)) + m.nbdReadAt.WithLabelValues(name).Set(float64(met.ReadAt)) + m.nbdReadAtBytes.WithLabelValues(name).Set(float64(met.ReadAtBytes)) + m.nbdWriteAt.WithLabelValues(name).Set(float64(met.WriteAt)) + m.nbdWriteAtBytes.WithLabelValues(name).Set(float64(met.WriteAtBytes)) + }) +} + +func (m *Metrics) RemoveNBD(name string) { + m.remove(promSubNbd, name) +} + +func (m *Metrics) AddWaitingCache(name string, wc *waitingcache.Remote) { + m.add(promSubWaitingCache, name, waitingCacheTick, func() { + met := wc.GetMetrics() + m.waitingCacheWaitForBlock.WithLabelValues(name).Set(float64(met.WaitForBlock)) + m.waitingCacheWaitForBlockHadRemote.WithLabelValues(name).Set(float64(met.WaitForBlockHadRemote)) + m.waitingCacheWaitForBlockHadLocal.WithLabelValues(name).Set(float64(met.WaitForBlockHadLocal)) + m.waitingCacheWaitForBlockLock.WithLabelValues(name).Set(float64(met.WaitForBlockLock)) + m.waitingCacheWaitForBlockLockDone.WithLabelValues(name).Set(float64(met.WaitForBlockLockDone)) + m.waitingCacheMarkAvailableLocalBlock.WithLabelValues(name).Set(float64(met.MarkAvailableLocalBlock)) + m.waitingCacheMarkAvailableRemoteBlock.WithLabelValues(name).Set(float64(met.MarkAvailableRemoteBlock)) + m.waitingCacheAvailableLocal.WithLabelValues(name).Set(float64(met.AvailableLocal)) + m.waitingCacheAvailableRemote.WithLabelValues(name).Set(float64(met.AvailableRemote)) + }) +} + +func (m *Metrics) RemoveWaitingCache(name string) { + m.remove(promSubWaitingCache, name) +} + +func (m *Metrics) SetVMRunning(running bool) { + if running { + m.vmRunning.Set(float64(1)) + } else { + m.vmRunning.Set(float64(0)) + } +} diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index e2896c84..2e1afa89 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -10,7 +10,6 @@ import ( "github.com/google/uuid" "github.com/loopholelabs/logging/types" - "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/integrity" "github.com/loopholelabs/silo/pkg/storage/modules" @@ -61,6 +60,7 @@ func (mc *Config) WithBlockSize(bs int) *Config { } type MigrationProgress struct { + BlockSize int TotalBlocks int // Total blocks MigratedBlocks int // Number of blocks that have been migrated MigratedBlocksPerc float64 @@ -107,6 +107,7 @@ type Migrator struct { dedupeWrites bool recentWriteAge time.Duration migrationStarted bool + migrationStartTime time.Time } func NewMigrator(source storage.TrackingProvider, @@ -119,6 +120,7 @@ func NewMigrator(source storage.TrackingProvider, uuid: uuid.New(), logger: config.Logger, migrationStarted: false, + migrationStartTime: time.Now(), dest: dest, sourceTracker: source, sourceLockFn: config.LockerHandler, @@ -197,6 +199,7 @@ func (m *Migrator) startMigration() { return } m.migrationStarted = true + m.migrationStartTime = time.Now() if m.logger != nil { m.logger.Debug(). @@ -384,6 +387,7 @@ func (m *Migrator) WaitForCompletion() error { m.logger.Debug(). Str("uuid", m.uuid.String()). Uint64("size", m.sourceTracker.Size()). + Int64("TimeMigratingMS", time.Since(m.migrationStartTime).Milliseconds()). Msg("Migration complete") } return nil @@ -417,6 +421,7 @@ func (m *Migrator) reportProgress(forced bool) { m.progressLast = time.Now() m.progressLastStatus = &MigrationProgress{ + BlockSize: m.blockSize, TotalBlocks: m.numBlocks, MigratedBlocks: migrated, MigratedBlocksPerc: percMig, @@ -441,6 +446,7 @@ func (m *Migrator) reportProgress(forced bool) { Int("TotalCanceledBlocks", m.progressLastStatus.TotalCanceledBlocks). Int("TotalMigratedBlocks", m.progressLastStatus.TotalMigratedBlocks). Int("TotalDuplicatedBlocks", m.progressLastStatus.TotalDuplicatedBlocks). + Int64("TimeMigratingMS", time.Since(m.migrationStartTime).Milliseconds()). Msg("Migration progress") } @@ -452,7 +458,7 @@ func (m *Migrator) reportProgress(forced bool) { * Get overall status of the migration * */ -func (m *Migrator) Status() *MigrationProgress { +func (m *Migrator) GetMetrics() *MigrationProgress { m.progressLock.Lock() defer m.progressLock.Unlock() @@ -463,6 +469,7 @@ func (m *Migrator) Status() *MigrationProgress { percComplete := float64(completed*100) / float64(m.numBlocks) return &MigrationProgress{ + BlockSize: m.blockSize, TotalBlocks: m.numBlocks, MigratedBlocks: migrated, MigratedBlocksPerc: percMig, diff --git a/pkg/storage/migrator/migrator_benchmark_test.go b/pkg/storage/migrator/migrator_benchmark_test.go index a7a58349..5ff4bd69 100644 --- a/pkg/storage/migrator/migrator_benchmark_test.go +++ b/pkg/storage/migrator/migrator_benchmark_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "testing" - "time" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/blocks" @@ -186,11 +185,8 @@ func BenchmarkMigrationPipe(mb *testing.B) { _ = prDestRW.Handle() }() - prSource := protocol.NewTestProtocolLatency(prSourceRW, 80*time.Millisecond) - prDest := protocol.NewTestProtocolLatency(prDestRW, 80*time.Millisecond) - - prSource = protocol.NewTestProtocolBandwidth(prSource, 1024*1024*1024) // 1GB/sec - prDest = protocol.NewTestProtocolBandwidth(prDest, 1024*1024*1024) // 1GB/sec + prSource := protocol.NewTestProtocolBandwidth(prSourceRW, 1024*1024*1024) // 1GB/sec + prDest := protocol.NewTestProtocolBandwidth(prDestRW, 1024*1024*1024) // 1GB/sec // Make sure new devs get given the latency/bandwidth protocol... diff --git a/pkg/storage/migrator/sync.go b/pkg/storage/migrator/sync.go index 10a9fba6..f3b15096 100644 --- a/pkg/storage/migrator/sync.go +++ b/pkg/storage/migrator/sync.go @@ -43,6 +43,7 @@ type Syncer struct { blockStatusLock sync.Mutex blockStatus []BlockStatus currentDirtyID uint64 + migrator *Migrator } type BlockStatus struct { @@ -88,6 +89,13 @@ func (s *Syncer) GetSafeBlockMap() map[uint][sha256.Size]byte { return blocks } +func (s *Syncer) GetMetrics() *MigrationProgress { + if s.migrator == nil { + return nil + } + return s.migrator.GetMetrics() +} + /** * * @@ -122,7 +130,7 @@ func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, e conf.ProgressHandler = func(p *MigrationProgress) { if s.config.Logger != nil { - s.config.Logger.Info(). + s.config.Logger.Debug(). Str("name", s.config.Name). Float64("migrated_blocks_perc", p.MigratedBlocksPerc). Int("ready_blocks", p.ReadyBlocks). @@ -173,13 +181,28 @@ func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, e return nil, err } + s.migrator = mig + numBlocks := (s.config.Tracker.Size() + uint64(s.config.BlockSize) - 1) / uint64(s.config.BlockSize) if syncAllFirst { - // Now do the initial migration... - err = mig.Migrate(int(numBlocks)) - if err != nil { - return nil, err + // Do initial migration + for b := 0; b < int(numBlocks); b++ { + select { + case <-s.ctx.Done(): + // Context has been cancelled. We should wait for any pending migrations to complete + err = mig.WaitForCompletion() + if err != nil { + return mig.GetMetrics(), err + } + return mig.GetMetrics(), s.ctx.Err() + default: + } + + err = mig.Migrate(1) + if err != nil { + return nil, err + } } // Wait for completion. @@ -209,9 +232,9 @@ func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, e // Context has been cancelled. We should wait for any pending migrations to complete err = mig.WaitForCompletion() if err != nil { - return mig.Status(), err + return mig.GetMetrics(), err } - return mig.Status(), s.ctx.Err() + return mig.GetMetrics(), s.ctx.Err() default: } blocks := mig.GetLatestDirtyFunc(s.config.DirtyBlockGetter) @@ -226,13 +249,13 @@ func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, e s.blockStatusLock.Unlock() err = mig.MigrateDirtyWithID(blocks, id) if err != nil { - return mig.Status(), err + return mig.GetMetrics(), err } } else { if !continuous { // We are done! Everything is synced, and the source is locked. err = mig.WaitForCompletion() - return mig.Status(), err + return mig.GetMetrics(), err } mig.Unlock() } diff --git a/pkg/storage/modules/metrics.go b/pkg/storage/modules/metrics.go index 37563d87..de0406a2 100644 --- a/pkg/storage/modules/metrics.go +++ b/pkg/storage/modules/metrics.go @@ -111,7 +111,7 @@ func (i *Metrics) ShowStats(prefix string) { ) } -func (i *Metrics) Snapshot() *MetricsSnapshot { +func (i *Metrics) GetMetrics() *MetricsSnapshot { return &MetricsSnapshot{ ReadOps: atomic.LoadUint64(&i.metricReadOps), ReadBytes: atomic.LoadUint64(&i.metricReadBytes), diff --git a/pkg/storage/protocol/from_protocol.go b/pkg/storage/protocol/from_protocol.go index 64214d39..66b782e6 100644 --- a/pkg/storage/protocol/from_protocol.go +++ b/pkg/storage/protocol/from_protocol.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "sync" + "sync/atomic" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -24,6 +25,38 @@ type FromProtocol struct { alternateSourcesLock sync.Mutex alternateSources []packets.AlternateSource + + metricRecvEvents uint64 + metricRecvHashes uint64 + metricRecvDevInfo uint64 + metricRecvAltSources uint64 + metricRecvReadAt uint64 + metricRecvWriteAtHash uint64 + metricRecvWriteAtComp uint64 + metricRecvWriteAt uint64 + metricRecvWriteAtWithMap uint64 + metricRecvRemoveFromMap uint64 + metricRecvRemoveDev uint64 + metricRecvDirtyList uint64 + metricSentNeedAt uint64 + metricSentDontNeedAt uint64 +} + +type FromProtocolMetrics struct { + RecvEvents uint64 + RecvHashes uint64 + RecvDevInfo uint64 + RecvAltSources uint64 + RecvReadAt uint64 + RecvWriteAtHash uint64 + RecvWriteAtComp uint64 + RecvWriteAt uint64 + RecvWriteAtWithMap uint64 + RecvRemoveFromMap uint64 + RecvRemoveDev uint64 + RecvDirtyList uint64 + SentNeedAt uint64 + SentDontNeedAt uint64 } func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets.DevInfo) storage.Provider, protocol Protocol) *FromProtocol { @@ -38,6 +71,25 @@ func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets. return fp } +func (fp *FromProtocol) GetMetrics() *FromProtocolMetrics { + return &FromProtocolMetrics{ + RecvEvents: atomic.LoadUint64(&fp.metricRecvEvents), + RecvHashes: atomic.LoadUint64(&fp.metricRecvHashes), + RecvDevInfo: atomic.LoadUint64(&fp.metricRecvDevInfo), + RecvAltSources: atomic.LoadUint64(&fp.metricRecvAltSources), + RecvReadAt: atomic.LoadUint64(&fp.metricRecvReadAt), + RecvWriteAtHash: atomic.LoadUint64(&fp.metricRecvWriteAtHash), + RecvWriteAtComp: atomic.LoadUint64(&fp.metricRecvWriteAtComp), + RecvWriteAt: atomic.LoadUint64(&fp.metricRecvWriteAt), + RecvWriteAtWithMap: atomic.LoadUint64(&fp.metricRecvWriteAtWithMap), + RecvRemoveFromMap: atomic.LoadUint64(&fp.metricRecvRemoveFromMap), + RecvRemoveDev: atomic.LoadUint64(&fp.metricRecvRemoveDev), + RecvDirtyList: atomic.LoadUint64(&fp.metricRecvDirtyList), + SentNeedAt: atomic.LoadUint64(&fp.metricSentNeedAt), + SentDontNeedAt: atomic.LoadUint64(&fp.metricSentDontNeedAt), + } +} + func (fp *FromProtocol) GetAlternateSources() []packets.AlternateSource { fp.alternateSourcesLock.Lock() defer fp.alternateSourcesLock.Unlock() @@ -161,6 +213,8 @@ func (fp *FromProtocol) HandleEvent(cb func(*packets.Event)) error { return err } + atomic.AddUint64(&fp.metricRecvEvents, 1) + if ev.Type == packets.EventCompleted { // Deal with the sync here, and WAIT if needed. storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{ @@ -196,6 +250,8 @@ func (fp *FromProtocol) HandleHashes(cb func(map[uint][sha256.Size]byte)) error return err } + atomic.AddUint64(&fp.metricRecvHashes, 1) + // Relay the hashes, wait and then respond cb(hashes) @@ -217,6 +273,8 @@ func (fp *FromProtocol) HandleDevInfo() error { return err } + atomic.AddUint64(&fp.metricRecvDevInfo, 1) + // Create storage fp.prov = fp.providerFactory(di) numBlocks := (int(fp.prov.Size()) + fp.presentBlockSize - 1) / fp.presentBlockSize @@ -236,6 +294,8 @@ func (fp *FromProtocol) HandleDevInfo() error { return } + atomic.AddUint64(&fp.metricRecvAltSources, 1) + // For now just set it. It only gets sent ONCE at the start of a migration at the moment. fp.alternateSourcesLock.Lock() fp.alternateSources = altSources @@ -274,6 +334,8 @@ func (fp *FromProtocol) HandleReadAt() error { return err } + atomic.AddUint64(&fp.metricRecvReadAt, 1) + // Handle them in goroutines go func(goffset int64, glength int32, gid uint32) { buff := make([]byte, glength) @@ -324,6 +386,8 @@ func (fp *FromProtocol) HandleWriteAt() error { return err } + atomic.AddUint64(&fp.metricRecvWriteAtHash, 1) + // For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved. // fp.mark_range_present(int(offset), int(length)) @@ -342,8 +406,14 @@ func (fp *FromProtocol) HandleWriteAt() error { if len(data) > 1 && data[1] == packets.WriteAtCompRLE { offset, writeData, err = packets.DecodeWriteAtComp(data) + if err == nil { + atomic.AddUint64(&fp.metricRecvWriteAtComp, 1) + } } else { offset, writeData, err = packets.DecodeWriteAt(data) + if err == nil { + atomic.AddUint64(&fp.metricRecvWriteAt, 1) + } } if err != nil { return err @@ -388,6 +458,8 @@ func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte, return err } + atomic.AddUint64(&fp.metricRecvWriteAtWithMap, 1) + err = cb(offset, writeData, idMap) if err == nil { fp.markRangePresent(int(offset), len(writeData)) @@ -421,6 +493,8 @@ func (fp *FromProtocol) HandleRemoveFromMap(cb func(ids []uint64)) error { return err } + atomic.AddUint64(&fp.metricRecvRemoveFromMap, 1) + cb(ids) /* // TODO: Should probably do this @@ -448,6 +522,8 @@ func (fp *FromProtocol) HandleRemoveDev(cb func()) error { return err } + atomic.AddUint64(&fp.metricRecvRemoveDev, 1) + cb() return nil } @@ -469,6 +545,8 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error { return err } + atomic.AddUint64(&fp.metricRecvDirtyList, 1) + // Mark these as non-present (useful for debugging issues) for _, b := range blocks { offset := int(b) * blockSize @@ -488,11 +566,17 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error { func (fp *FromProtocol) NeedAt(offset int64, length int32) error { b := packets.EncodeNeedAt(offset, length) _, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + atomic.AddUint64(&fp.metricSentNeedAt, 1) + } return err } func (fp *FromProtocol) DontNeedAt(offset int64, length int32) error { b := packets.EncodeDontNeedAt(offset, length) _, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + atomic.AddUint64(&fp.metricSentDontNeedAt, 1) + } return err } diff --git a/pkg/storage/protocol/protocol_rw.go b/pkg/storage/protocol/protocol_rw.go index 679abe0a..55ff356c 100644 --- a/pkg/storage/protocol/protocol_rw.go +++ b/pkg/storage/protocol/protocol_rw.go @@ -11,6 +11,8 @@ import ( "github.com/loopholelabs/silo/pkg/storage/protocol/packets" ) +const packetBufferSize = 32 + type packetinfo struct { id uint32 data []byte @@ -22,18 +24,40 @@ type Waiters struct { } type RW struct { - ctx context.Context - readers []io.Reader - writers []io.Writer - writerHeaders [][]byte - writerLocks []sync.Mutex - txID uint32 - activeDevs map[uint32]bool - activeDevsLock sync.Mutex - waiters map[uint32]Waiters - waitersLock sync.Mutex - newdevFn func(context.Context, Protocol, uint32) - newdevProtocol Protocol + ctx context.Context + readers []io.Reader + writers []io.Writer + writerHeaders [][]byte + writerLocks []sync.Mutex + txID uint32 + activeDevs map[uint32]bool + activeDevsLock sync.Mutex + waiters map[uint32]Waiters + waitersLock sync.Mutex + newdevFn func(context.Context, Protocol, uint32) + newdevProtocol Protocol + metricPacketsSent uint64 + metricDataSent uint64 + metricPacketsRecv uint64 + metricDataRecv uint64 + metricWaitingForID int64 + metricWrites uint64 + metricWriteErrors uint64 +} + +// Wrap the writers and gather metrics on them. +type writeWrapper struct { + w io.Writer + rw *RW +} + +func (ww *writeWrapper) Write(buffer []byte) (int, error) { + n, err := ww.w.Write(buffer) + atomic.AddUint64(&ww.rw.metricWrites, 1) + if err != nil { + atomic.AddUint64(&ww.rw.metricWriteErrors, 1) + } + return n, err } func NewRW(ctx context.Context, readers []io.Reader, writers []io.Writer, newdevFN func(context.Context, Protocol, uint32)) *RW { @@ -52,10 +76,11 @@ func NewRWWithBuffering(ctx context.Context, readers []io.Reader, writers []io.W prw.writers = make([]io.Writer, 0) for _, w := range writers { + ww := &writeWrapper{w: w, rw: prw} if bufferConfig != nil { - prw.writers = append(prw.writers, NewBufferedWriter(w, bufferConfig)) + prw.writers = append(prw.writers, NewBufferedWriter(ww, bufferConfig)) } else { - prw.writers = append(prw.writers, w) + prw.writers = append(prw.writers, ww) } } @@ -68,6 +93,30 @@ func NewRWWithBuffering(ctx context.Context, readers []io.Reader, writers []io.W return prw } +type Metrics struct { + PacketsSent uint64 + DataSent uint64 + UrgentPacketsSent uint64 + UrgentDataSent uint64 + PacketsRecv uint64 + DataRecv uint64 + Writes uint64 + WriteErrors uint64 + WaitingForID int64 +} + +func (p *RW) GetMetrics() *Metrics { + return &Metrics{ + PacketsSent: atomic.LoadUint64(&p.metricPacketsSent), + DataSent: atomic.LoadUint64(&p.metricDataSent), + PacketsRecv: atomic.LoadUint64(&p.metricPacketsRecv), + DataRecv: atomic.LoadUint64(&p.metricDataRecv), + Writes: atomic.LoadUint64(&p.metricWrites), + WriteErrors: atomic.LoadUint64(&p.metricWriteErrors), + WaitingForID: atomic.LoadInt64(&p.metricWaitingForID), + } +} + func (p *RW) SetNewDevProtocol(proto Protocol) { p.newdevProtocol = proto } @@ -120,7 +169,6 @@ func (p *RW) SendPacket(dev uint32, id uint32, data []byte, urgency Urgency) (ui binary.LittleEndian.PutUint32(p.writerHeaders[i][8:], uint32(len(data))) _, err := p.writers[i].Write(p.writerHeaders[i]) - if err != nil { return 0, err } @@ -133,6 +181,11 @@ func (p *RW) SendPacket(dev uint32, id uint32, data []byte, urgency Urgency) (ui _, err = p.writers[i].Write(data) } + if err == nil { + atomic.AddUint64(&p.metricPacketsSent, 1) + atomic.AddUint64(&p.metricDataSent, 4+4+4+uint64(len(data))) + } + return id, err } @@ -168,6 +221,9 @@ func (p *RW) Handle() error { return } + atomic.AddUint64(&p.metricPacketsRecv, 1) + atomic.AddUint64(&p.metricDataRecv, 4+4+4+uint64(length)) + err = p.handlePacket(dev, id, data) if err != nil { errs <- err @@ -205,48 +261,56 @@ func (p *RW) handlePacket(dev uint32, id uint32, data []byte) error { p.waiters[dev] = w } - wqID, okk := w.byID[id] - if !okk { - wqID = make(chan packetinfo, 8) // Some buffer here... - w.byID[id] = wqID - } - - wqCmd, okk := w.byCmd[cmd] - if !okk { - wqCmd = make(chan packetinfo, 8) // Some buffer here... - w.byCmd[cmd] = wqCmd + pi := packetinfo{ + id: id, + data: data, } - p.waitersLock.Unlock() - if packets.IsResponse(cmd) { - wqID <- packetinfo{ - id: id, - data: data, + wqID, okk := w.byID[id] + if !okk { + wqID = make(chan packetinfo, packetBufferSize) + w.byID[id] = wqID } + p.waitersLock.Unlock() + + wqID <- pi } else { - wqCmd <- packetinfo{ - id: id, - data: data, + wqCmd, okk := w.byCmd[cmd] + if !okk { + wqCmd = make(chan packetinfo, packetBufferSize) + w.byCmd[cmd] = wqCmd } + + p.waitersLock.Unlock() + + wqCmd <- pi } return nil } func (p *RW) WaitForPacket(dev uint32, id uint32) ([]byte, error) { + atomic.AddInt64(&p.metricWaitingForID, 1) + defer atomic.AddInt64(&p.metricWaitingForID, -1) + p.waitersLock.Lock() w := p.waiters[dev] wq, okk := w.byID[id] if !okk { - wq = make(chan packetinfo, 8) // Some buffer here... + wq = make(chan packetinfo, packetBufferSize) w.byID[id] = wq } p.waitersLock.Unlock() select { - case p := <-wq: - // TODO: Remove the channel, as we only expect a SINGLE response with this ID. - return p.data, nil + case pack := <-wq: + // Remove the channel, as we only expect a SINGLE response with this ID. + p.waitersLock.Lock() + w := p.waiters[dev] + delete(w.byID, id) + p.waitersLock.Unlock() + + return pack.data, nil case <-p.ctx.Done(): return nil, p.ctx.Err() } @@ -257,7 +321,7 @@ func (p *RW) WaitForCommand(dev uint32, cmd byte) (uint32, []byte, error) { w := p.waiters[dev] wq, okk := w.byCmd[cmd] if !okk { - wq = make(chan packetinfo, 8) // Some buffer here... + wq = make(chan packetinfo, packetBufferSize) w.byCmd[cmd] = wq } p.waitersLock.Unlock() diff --git a/pkg/storage/protocol/test_protocol_latency.go b/pkg/storage/protocol/test_protocol_latency.go deleted file mode 100644 index 4f3cc197..00000000 --- a/pkg/storage/protocol/test_protocol_latency.go +++ /dev/null @@ -1,47 +0,0 @@ -package protocol - -import ( - "sync/atomic" - "time" -) - -/** - * FIXME: This is lame, doesn't accurately model latency. - * - */ - -type TestProtocolLatency struct { - proto Protocol - recvLatency time.Duration - isFirst atomic.Bool -} - -func NewTestProtocolLatency(proto Protocol, recvLatency time.Duration) Protocol { - p := &TestProtocolLatency{ - proto: proto, - recvLatency: recvLatency, - } - - p.isFirst.Store(true) - return p -} - -func (p *TestProtocolLatency) SendPacket(dev uint32, id uint32, data []byte, urgency Urgency) (uint32, error) { - return p.proto.SendPacket(dev, id, data, urgency) -} - -func (p *TestProtocolLatency) WaitForPacket(dev uint32, id uint32) ([]byte, error) { - if p.isFirst.Load() { - time.Sleep(p.recvLatency) - p.isFirst.Store(false) - } - return p.proto.WaitForPacket(dev, id) -} - -func (p *TestProtocolLatency) WaitForCommand(dev uint32, cmd byte) (uint32, []byte, error) { - if p.isFirst.Load() { - time.Sleep(p.recvLatency) - p.isFirst.Store(false) - } - return p.proto.WaitForCommand(dev, cmd) -} diff --git a/pkg/storage/protocol/test_protocol_latency_test.go b/pkg/storage/protocol/test_protocol_latency_test.go deleted file mode 100644 index 72288c95..00000000 --- a/pkg/storage/protocol/test_protocol_latency_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package protocol - -import ( - "context" - "crypto/rand" - "testing" - "time" - - "github.com/loopholelabs/silo/pkg/storage" - "github.com/loopholelabs/silo/pkg/storage/protocol/packets" - "github.com/loopholelabs/silo/pkg/storage/sources" - "github.com/stretchr/testify/assert" -) - -func TestTestProtocolLatency(t *testing.T) { - - size := 1024 * 1024 - var store storage.Provider - - // Setup a protocol in the middle, and make sure our reads/writes get through ok - - prm := NewMockProtocol(context.TODO()) - // Add some recv latency - pr := NewTestProtocolLatency(prm, 50*time.Millisecond) - - sourceToProtocol := NewToProtocol(uint64(size), 1, pr) - - storeFactory := func(di *packets.DevInfo) storage.Provider { - store = sources.NewMemoryStorage(int(di.Size)) - return store - } - - destFromProtocol := NewFromProtocol(context.TODO(), 1, storeFactory, pr) - - go func() { - _ = destFromProtocol.HandleDevInfo() - }() - go func() { - _ = destFromProtocol.HandleReadAt() - }() - go func() { - _ = destFromProtocol.HandleWriteAt() - }() - - ctime := time.Now() - - // Send devInfo - err := sourceToProtocol.SendDevInfo("test", 4096, "") - assert.NoError(t, err) - - buff := make([]byte, 4096) - _, err = rand.Read(buff) - assert.NoError(t, err) - n, err := sourceToProtocol.WriteAt(buff, 12) - - assert.NoError(t, err) - assert.Equal(t, len(buff), n) - - // Check it took some time - assert.WithinDuration(t, ctime.Add(50*time.Millisecond), time.Now(), 10*time.Millisecond) -} diff --git a/pkg/storage/protocol/to_protocol.go b/pkg/storage/protocol/to_protocol.go index 705d1b7d..50202f7b 100644 --- a/pkg/storage/protocol/to_protocol.go +++ b/pkg/storage/protocol/to_protocol.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "sync/atomic" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -11,11 +12,29 @@ import ( type ToProtocol struct { storage.ProviderWithEvents - size uint64 - dev uint32 - protocol Protocol - CompressedWrites bool - alternateSources []packets.AlternateSource + size uint64 + dev uint32 + protocol Protocol + CompressedWrites bool + alternateSources []packets.AlternateSource + metricSentEvents uint64 + metricSentAltSources uint64 + metricSentHashes uint64 + metricSentDevInfo uint64 + metricSentRemoveDev uint64 + metricSentDirtyList uint64 + metricSentReadAt uint64 + metricSentWriteAtHash uint64 + metricSentWriteAtHashBytes uint64 + metricSentWriteAtComp uint64 + metricSentWriteAtCompBytes uint64 + metricSentWriteAtCompDataBytes uint64 + metricSentWriteAt uint64 + metricSentWriteAtBytes uint64 + metricSentWriteAtWithMap uint64 + metricSentRemoveFromMap uint64 + metricRecvNeedAt uint64 + metricRecvDontNeedAt uint64 } func NewToProtocol(size uint64, deviceID uint32, p Protocol) *ToProtocol { @@ -27,18 +46,70 @@ func NewToProtocol(size uint64, deviceID uint32, p Protocol) *ToProtocol { } } +type ToProtocolMetrics struct { + SentEvents uint64 + SentAltSources uint64 + SentHashes uint64 + SentDevInfo uint64 + SentRemoveDev uint64 + SentDirtyList uint64 + SentReadAt uint64 + SentWriteAtHash uint64 + SentWriteAtHashBytes uint64 + SentWriteAtComp uint64 + SentWriteAtCompBytes uint64 + SentWriteAtCompDataBytes uint64 + SentWriteAt uint64 + SentWriteAtBytes uint64 + SentWriteAtWithMap uint64 + SentRemoveFromMap uint64 + RecvNeedAt uint64 + RecvDontNeedAt uint64 +} + +func (i *ToProtocol) GetMetrics() *ToProtocolMetrics { + return &ToProtocolMetrics{ + SentEvents: atomic.LoadUint64(&i.metricSentEvents), + SentAltSources: atomic.LoadUint64(&i.metricSentAltSources), + SentHashes: atomic.LoadUint64(&i.metricSentHashes), + SentDevInfo: atomic.LoadUint64(&i.metricSentDevInfo), + SentRemoveDev: atomic.LoadUint64(&i.metricSentRemoveDev), + SentDirtyList: atomic.LoadUint64(&i.metricSentDirtyList), + SentReadAt: atomic.LoadUint64(&i.metricSentReadAt), + SentWriteAtHash: atomic.LoadUint64(&i.metricSentWriteAtHash), + SentWriteAtHashBytes: atomic.LoadUint64(&i.metricSentWriteAtHashBytes), + SentWriteAtComp: atomic.LoadUint64(&i.metricSentWriteAtComp), + SentWriteAtCompBytes: atomic.LoadUint64(&i.metricSentWriteAtCompBytes), + SentWriteAtCompDataBytes: atomic.LoadUint64(&i.metricSentWriteAtCompDataBytes), + SentWriteAt: atomic.LoadUint64(&i.metricSentWriteAt), + SentWriteAtBytes: atomic.LoadUint64(&i.metricSentWriteAtBytes), + SentWriteAtWithMap: atomic.LoadUint64(&i.metricSentWriteAtWithMap), + SentRemoveFromMap: atomic.LoadUint64(&i.metricSentRemoveFromMap), + RecvNeedAt: atomic.LoadUint64(&i.metricRecvNeedAt), + RecvDontNeedAt: atomic.LoadUint64(&i.metricRecvDontNeedAt), + } +} + // Support Silo Events func (i *ToProtocol) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { if eventType == storage.EventType("sources") { i.alternateSources = eventData.([]packets.AlternateSource) // Send the list of alternate sources here... - h := packets.EncodeAlternateSources(i.alternateSources) - _, _ = i.protocol.SendPacket(i.dev, IDPickAny, h, UrgencyUrgent) + i.SendAltSources(i.alternateSources) // For now, we do not check the error. If there was a protocol / io error, we should see it on the next send } return nil } +func (i *ToProtocol) SendAltSources(s []packets.AlternateSource) error { + h := packets.EncodeAlternateSources(s) + _, err := i.protocol.SendPacket(i.dev, IDPickAny, h, UrgencyUrgent) + if err == nil { + atomic.AddUint64(&i.metricSentAltSources, 1) + } + return err +} + func (i *ToProtocol) SendEvent(e *packets.Event) error { b := packets.EncodeEvent(e) id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, UrgencyUrgent) @@ -46,6 +117,8 @@ func (i *ToProtocol) SendEvent(e *packets.Event) error { return err } + atomic.AddUint64(&i.metricSentEvents, 1) + // Wait for acknowledgement r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -61,6 +134,9 @@ func (i *ToProtocol) SendHashes(hashes map[uint][sha256.Size]byte) error { if err != nil { return err } + + atomic.AddUint64(&i.metricSentHashes, 1) + // Wait for an ack r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -79,13 +155,24 @@ func (i *ToProtocol) SendDevInfo(name string, blockSize uint32, schema string) e } b := packets.EncodeDevInfo(di) _, err := i.protocol.SendPacket(i.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + return err + } + + atomic.AddUint64(&i.metricSentDevInfo, 1) return err } func (i *ToProtocol) RemoveDev() error { f := packets.EncodeRemoveDev() _, err := i.protocol.SendPacket(i.dev, IDPickAny, f, UrgencyUrgent) - return err + if err != nil { + return err + } + + atomic.AddUint64(&i.metricSentRemoveDev, 1) + + return nil } func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error { @@ -95,6 +182,8 @@ func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error { return err } + atomic.AddUint64(&i.metricSentDirtyList, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -112,6 +201,9 @@ func (i *ToProtocol) ReadAt(buffer []byte, offset int64) (int, error) { if err != nil { return 0, err } + + atomic.AddUint64(&i.metricSentReadAt, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -143,6 +235,10 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) { if bytes.Equal(hash[:], as.Hash[:]) { data := packets.EncodeWriteAtHash(as.Offset, as.Length, as.Hash[:]) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAtHash, 1) + atomic.AddUint64(&i.metricSentWriteAtHashBytes, uint64(as.Length)) + } dontSendData = true } break @@ -153,9 +249,18 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) { if i.CompressedWrites { data := packets.EncodeWriteAtComp(offset, buffer) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAtComp, 1) + atomic.AddUint64(&i.metricSentWriteAtCompBytes, uint64(len(buffer))) + atomic.AddUint64(&i.metricSentWriteAtCompDataBytes, uint64(len(data))) + } } else { data := packets.EncodeWriteAt(offset, buffer) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAt, 1) + atomic.AddUint64(&i.metricSentWriteAtBytes, uint64(len(buffer))) + } } } if err != nil { @@ -190,6 +295,9 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6 if err != nil { return 0, err } + + atomic.AddUint64(&i.metricSentWriteAtWithMap, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -214,6 +322,9 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6 func (i *ToProtocol) RemoveFromMap(ids []uint64) error { f := packets.EncodeRemoveFromMap(ids) _, err := i.protocol.SendPacket(i.dev, IDPickAny, f, UrgencyUrgent) + if err == nil { + atomic.AddUint64(&i.metricSentRemoveFromMap, 1) + } return err } @@ -246,6 +357,8 @@ func (i *ToProtocol) HandleNeedAt(cb func(offset int64, length int32)) error { return err } + atomic.AddUint64(&i.metricRecvNeedAt, 1) + // We could spin up a goroutine here, but the assumption is that cb won't take long. cb(offset, length) } @@ -263,6 +376,8 @@ func (i *ToProtocol) HandleDontNeedAt(cb func(offset int64, length int32)) error return err } + atomic.AddUint64(&i.metricRecvDontNeedAt, 1) + // We could spin up a goroutine here, but the assumption is that cb won't take long. cb(offset, length) } diff --git a/pkg/storage/storage_events_test.go b/pkg/storage/storage_events_test.go index 4bb16958..7209925d 100644 --- a/pkg/storage/storage_events_test.go +++ b/pkg/storage/storage_events_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/loopholelabs/logging" - "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/dirtytracker" "github.com/loopholelabs/silo/pkg/storage/modules" @@ -199,6 +198,8 @@ func TestStorageEventsForModules(tt *testing.T) { } } + log := logging.New(logging.Zerolog, "storage", os.Stderr) + // Start with some memory storage, and register a handler on it sourceStorageMem := sources.NewMemoryStorage(size) addModule(sourceStorageMem) @@ -228,7 +229,7 @@ func TestStorageEventsForModules(tt *testing.T) { addModule(mod7) mod8 := modules.NewLockable(mod7) addModule(mod8) - mod9 := modules.NewLogger(mod8, "prefix", logging.New(logging.Zerolog, "silo", os.Stdout)) + mod9 := modules.NewLogger(mod8, "prefix", log) addModule(mod9) mod10 := modules.NewMetrics(mod9) addModule(mod10) diff --git a/pkg/storage/volatilitymonitor/volatility_monitor.go b/pkg/storage/volatilitymonitor/volatility_monitor.go index 90a3fdc9..3fab9c4c 100644 --- a/pkg/storage/volatilitymonitor/volatility_monitor.go +++ b/pkg/storage/volatilitymonitor/volatility_monitor.go @@ -21,6 +21,12 @@ type VolatilityMonitor struct { totalData *volatilityData } +type Metrics struct { + BlockSize uint64 + Available uint64 + Volatility uint64 +} + // Relay events to embedded StorageProvider func (i *VolatilityMonitor) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { data := i.ProviderWithEvents.SendSiloEvent(eventType, eventData) @@ -41,6 +47,14 @@ func NewVolatilityMonitor(prov storage.Provider, blockSize int, expiry time.Dura } } +func (i *VolatilityMonitor) GetMetrics() *Metrics { + return &Metrics{ + BlockSize: uint64(i.blockSize), + Available: uint64(i.available.Count(0, i.available.Length())), + Volatility: uint64(i.GetTotalVolatility()), + } +} + // from storage.BlockOrder func (i *VolatilityMonitor) GetNext() *storage.BlockInfo { diff --git a/pkg/storage/waitingcache/waiting_cache.go b/pkg/storage/waitingcache/waiting_cache.go index 38a7025f..471c77ea 100644 --- a/pkg/storage/waitingcache/waiting_cache.go +++ b/pkg/storage/waitingcache/waiting_cache.go @@ -2,6 +2,8 @@ package waitingcache import ( "sync" + "sync/atomic" + "time" "github.com/google/uuid" "github.com/loopholelabs/logging/types" @@ -16,17 +18,38 @@ import ( * */ type WaitingCache struct { - logger types.RootLogger - uuid uuid.UUID - prov storage.Provider - local *Local - remote *Remote - writeLock sync.Mutex - blockSize int - size uint64 - lockers map[uint]*sync.RWMutex - lockersLock sync.Mutex - allowLocalWrites bool + logger types.RootLogger + uuid uuid.UUID + prov storage.Provider + local *Local + remote *Remote + writeLock sync.Mutex + blockSize int + size uint64 + lockers map[uint]*sync.RWMutex + lockersLock sync.Mutex + allowLocalWrites bool + metricWaitForBlock uint64 + metricWaitForBlockHadRemote uint64 + metricWaitForBlockHadLocal uint64 + metricWaitForBlockTime uint64 + metricWaitForBlockLock uint64 + metricWaitForBlockLockDone uint64 + metricMarkAvailableLocalBlock uint64 + metricMarkAvailableRemoteBlock uint64 +} + +type Metrics struct { + WaitForBlock uint64 + WaitForBlockHadRemote uint64 + WaitForBlockHadLocal uint64 + WaitForBlockTime time.Duration + WaitForBlockLock uint64 + WaitForBlockLockDone uint64 + MarkAvailableLocalBlock uint64 + MarkAvailableRemoteBlock uint64 + AvailableLocal uint64 + AvailableRemote uint64 } func NewWaitingCache(prov storage.Provider, blockSize int) (*Local, *Remote) { @@ -57,6 +80,21 @@ func NewWaitingCacheWithLogger(prov storage.Provider, blockSize int, log types.R return wc.local, wc.remote } +func (i *WaitingCache) GetMetrics() *Metrics { + return &Metrics{ + WaitForBlock: atomic.LoadUint64(&i.metricWaitForBlock), + WaitForBlockHadRemote: atomic.LoadUint64(&i.metricWaitForBlockHadRemote), + WaitForBlockHadLocal: atomic.LoadUint64(&i.metricWaitForBlockHadLocal), + WaitForBlockTime: time.Duration(atomic.LoadUint64(&i.metricWaitForBlockTime)), + WaitForBlockLock: atomic.LoadUint64(&i.metricWaitForBlockLock), + WaitForBlockLockDone: atomic.LoadUint64(&i.metricWaitForBlockLockDone), + MarkAvailableLocalBlock: atomic.LoadUint64(&i.metricMarkAvailableLocalBlock), + MarkAvailableRemoteBlock: atomic.LoadUint64(&i.metricMarkAvailableRemoteBlock), + AvailableLocal: uint64(i.local.available.Count(0, i.local.available.Length())), + AvailableRemote: uint64(i.remote.available.Count(0, i.local.available.Length())), + } +} + func (i *WaitingCache) waitForBlocks(bStart uint, bEnd uint, lockCB func(b uint)) { // TODO: Optimize this for b := bStart; b < bEnd; b++ { @@ -76,17 +114,23 @@ func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) { Msg("waitForBlock complete") } + atomic.AddUint64(&i.metricWaitForBlock, 1) + + i.lockersLock.Lock() // If we have it locally, return. if i.local.available.BitSet(int(b)) { + i.lockersLock.Unlock() + atomic.AddUint64(&i.metricWaitForBlockHadLocal, 1) return } - i.lockersLock.Lock() - avail := i.remote.available.BitSet(int(b)) - if avail { + // If we have it remote, return. + if i.remote.available.BitSet(int(b)) { i.lockersLock.Unlock() + atomic.AddUint64(&i.metricWaitForBlockHadRemote, 1) return } + rwl, ok := i.lockers[b] if !ok { // The first waiter will call .Lock() @@ -98,55 +142,62 @@ func (i *WaitingCache) waitForBlock(b uint, lockCB func(b uint)) { i.lockersLock.Unlock() // Lock for reading (This will wait until the write lock has been unlocked by a writer for this block). + atomic.AddUint64(&i.metricWaitForBlockLock, 1) + ctime := time.Now() rwl.RLock() + atomic.AddUint64(&i.metricWaitForBlockTime, uint64(time.Since(ctime))) + atomic.AddUint64(&i.metricWaitForBlockLockDone, 1) } -func (i *WaitingCache) markAvailableBlockLocal(b uint) { +func (i *WaitingCache) markAvailableRemoteBlocks(bStart uint, bEnd uint) { + // TODO: Optimize this + for b := bStart; b < bEnd; b++ { + i.markAvailableRemoteBlock(b) + } +} + +func (i *WaitingCache) markAvailableRemoteBlock(b uint) { if i.logger != nil { i.logger.Trace(). Str("uuid", i.uuid.String()). Uint("block", b). - Msg("markAvailableLocalBlock") + Msg("markAvailableRemoteBlock") } + atomic.AddUint64(&i.metricMarkAvailableRemoteBlock, 1) + i.lockersLock.Lock() - avail := i.local.available.BitSet(int(b)) + avail := i.remote.available.BitSet(int(b)) rwl, ok := i.lockers[b] if !avail { - i.local.available.SetBit(int(b)) + i.remote.available.SetBit(int(b)) } - i.lockersLock.Unlock() + // If we have waiters for it, we can go ahead and unlock to allow them to read it. if !avail && ok { rwl.Unlock() } - // Now we can get rid of the lock... - i.lockersLock.Lock() + // Now we can get rid of the lock on this block... delete(i.lockers, b) i.lockersLock.Unlock() } -func (i *WaitingCache) markAvailableRemoteBlocks(bStart uint, bEnd uint) { - // TODO: Optimize this - for b := bStart; b < bEnd; b++ { - i.markAvailableRemoteBlock(b) - } -} - -func (i *WaitingCache) markAvailableRemoteBlock(b uint) { +func (i *WaitingCache) markAvailableLocalBlock(b uint) { if i.logger != nil { i.logger.Trace(). Str("uuid", i.uuid.String()). Uint("block", b). - Msg("markAvailableRemoteBlock") + Msg("markAvailableLocalBlock") } + atomic.AddUint64(&i.metricMarkAvailableLocalBlock, 1) + i.lockersLock.Lock() - avail := i.remote.available.BitSet(int(b)) + avail := i.local.available.BitSet(int(b)) rwl, ok := i.lockers[b] if !avail { - i.remote.available.SetBit(int(b)) + i.local.available.SetBit(int(b)) } // If we have waiters for it, we can go ahead and unlock to allow them to read it. @@ -154,7 +205,7 @@ func (i *WaitingCache) markAvailableRemoteBlock(b uint) { rwl.Unlock() } - // Now we can get rid of the lock on this block... + // Now we can get rid of the lock... delete(i.lockers, b) i.lockersLock.Unlock() } diff --git a/pkg/storage/waitingcache/waiting_cache_local.go b/pkg/storage/waitingcache/waiting_cache_local.go index bae8f9ec..18236c37 100644 --- a/pkg/storage/waitingcache/waiting_cache_local.go +++ b/pkg/storage/waitingcache/waiting_cache_local.go @@ -19,6 +19,10 @@ func (wcl *Local) SendSiloEvent(eventType storage.EventType, eventData storage.E return append(data, storage.SendSiloEvent(wcl.wc.prov, eventType, eventData)...) } +func (wcl *Local) GetMetrics() *Metrics { + return wcl.wc.GetMetrics() +} + func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) { if wcl.wc.logger != nil { wcl.wc.logger.Trace(). @@ -32,7 +36,6 @@ func (wcl *Local) ReadAt(buffer []byte, offset int64) (int, error) { Int("length", len(buffer)). Msg("local ReadAt complete") } - end := uint64(offset + int64(len(buffer))) if end > wcl.wc.size { end = wcl.wc.size @@ -90,7 +93,7 @@ func (wcl *Local) WriteAt(buffer []byte, offset int64) (int, error) { if bEnd > bStart { num := int32(0) for b := bStart; b < bEnd; b++ { - wcl.wc.markAvailableBlockLocal(b) + wcl.wc.markAvailableLocalBlock(b) num += int32(wcl.wc.blockSize) } wcl.DontNeedAt(int64(bStart)*int64(wcl.wc.blockSize), num) diff --git a/pkg/storage/waitingcache/waiting_cache_remote.go b/pkg/storage/waitingcache/waiting_cache_remote.go index f408bc57..0e5fa5cc 100644 --- a/pkg/storage/waitingcache/waiting_cache_remote.go +++ b/pkg/storage/waitingcache/waiting_cache_remote.go @@ -19,6 +19,10 @@ func (wcr *Remote) SendSiloEvent(eventType storage.EventType, eventData storage. return append(data, storage.SendSiloEvent(wcr.wc.prov, eventType, eventData)...) } +func (wcr *Remote) GetMetrics() *Metrics { + return wcr.wc.GetMetrics() +} + func (wcr *Remote) ReadAt(_ []byte, _ int64) (int, error) { // Remote reads are unsupported at the moment. return 0, io.EOF