Skip to content

Commit

Permalink
Sync now passes logger into migrator
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added logging to nbd, tidied up nbd config

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added logging to device

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added log for connect/serve commands

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

storage logger now uses loopholelabs lib

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

lint

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Updated with threadsafe bytes buffer for log in tests

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

nbd dispatch returns fatal errors to caller

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added optional logging to waiting cache.

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added metrics to ProtocolRW and ToProtocol. Logged in cmd/serve

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

lint fix

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added metrics to waitingcache

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

connect now logs protocol metrics at end

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added metrics to FromProtocol and logs at end of cmd/connect

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added metrics for nbd expose. Logged in cmd/serve

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Latest metrics

Added metrics on dirtyTracker

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added metrics to volatilitymonitor

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added general provider metrics

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added other toProtocol metris

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

metrics tidy and new buffered writer

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Add buffered writer option delay timer

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

RW can now use buffering

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Buffered writer working

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Fix

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

nbd lock

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Added nbd and waitingcache metrics

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Removed buffered write and rebase to new log pr

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>

Removed write buffer bits

Signed-off-by: Jimmy Moore <jamesmoore@loopholelabs.io>
  • Loading branch information
jimmyaxod committed Nov 19, 2024
1 parent 88bdb38 commit d504fe4
Show file tree
Hide file tree
Showing 25 changed files with 1,484 additions and 244 deletions.
108 changes: 107 additions & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"

"github.com/loopholelabs/logging"
"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/expose"
"github.com/loopholelabs/silo/pkg/storage/integrity"
"github.com/loopholelabs/silo/pkg/storage/metrics"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"

"github.com/fatih/color"
Expand Down Expand Up @@ -49,6 +56,10 @@ var connectMountDev bool

var connectProgress bool

var connectDebug bool

var connectMetrics string

// List of ExposedStorage so they can be cleaned up on exit.
var dstExposed []storage.ExposedStorage

Expand All @@ -63,13 +74,48 @@ func init() {
cmdConnect.Flags().BoolVarP(&connectExposeDev, "expose", "e", false, "Expose as an nbd devices")
cmdConnect.Flags().BoolVarP(&connectMountDev, "mount", "m", false, "Mount the nbd devices")
cmdConnect.Flags().BoolVarP(&connectProgress, "progress", "p", false, "Show progress")
cmdConnect.Flags().BoolVarP(&connectDebug, "debug", "d", false, "Debug logging (trace)")
cmdConnect.Flags().StringVarP(&connectMetrics, "metrics", "M", "", "Prom metrics address")
}

/**
* Connect to a silo source and stream whatever devices are available.
*
*/
func runConnect(_ *cobra.Command, _ []string) {
var log types.RootLogger
var reg *prometheus.Registry
var siloMetrics *metrics.Metrics

if connectDebug {
log = logging.New(logging.Zerolog, "silo.connect", os.Stderr)
log.SetLevel(types.TraceLevel)
}

if connectMetrics != "" {
reg = prometheus.NewRegistry()

siloMetrics = metrics.New(reg)

// Add the default go metrics
reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
// Pass custom registry
Registry: reg,
},
))

go http.ListenAndServe(connectMetrics, nil)
}

if connectProgress {
dstProgress = mpb.New(
mpb.WithOutput(color.Output),
Expand Down Expand Up @@ -107,6 +153,10 @@ func runConnect(_ *cobra.Command, _ []string) {

protoCtx, protoCancelfn := context.WithCancel(context.TODO())

handleIncomingDevice := func(ctx context.Context, pro protocol.Protocol, dev uint32) {
handleIncomingDeviceWithLogging(ctx, pro, dev, log, siloMetrics)
}

pro := protocol.NewRW(protoCtx, []io.Reader{con}, []io.Writer{con}, handleIncomingDevice)

// Let the protocol do its thing.
Expand All @@ -121,12 +171,26 @@ func runConnect(_ *cobra.Command, _ []string) {
protoCancelfn()
}()

if siloMetrics != nil {
siloMetrics.AddProtocol("protocol", pro)
}

dstWG.Wait() // Wait until the migrations have completed...

if connectProgress {
dstProgress.Wait()
}

if log != nil {
metrics := pro.GetMetrics()
log.Debug().
Uint64("PacketsSent", metrics.PacketsSent).
Uint64("DataSent", metrics.DataSent).
Uint64("PacketsRecv", metrics.PacketsRecv).
Uint64("DataRecv", metrics.DataRecv).
Msg("protocol metrics")
}

fmt.Printf("\nMigrations completed. Please ctrl-c if you want to shut down, or wait an hour :)\n")

// We should pause here, to allow the user to do things with the devices
Expand All @@ -139,7 +203,7 @@ func runConnect(_ *cobra.Command, _ []string) {
}

// Handle a new incoming device. This is called when a packet is received for a device we haven't heard about before.
func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32) {
func handleIncomingDeviceWithLogging(ctx context.Context, pro protocol.Protocol, dev uint32, log types.RootLogger, met *metrics.Metrics) {
var destStorage storage.Provider
var destWaitingLocal *waitingcache.Local
var destWaitingRemote *waitingcache.Remote
Expand All @@ -151,6 +215,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
var bar *mpb.Bar

var blockSize uint
var deviceName string

var statusString = " "
var statusVerify = " "
Expand All @@ -174,6 +239,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
}

blockSize = uint(di.BlockSize)
deviceName = di.Name

statusFn := func(_ decor.Statistics) string {
return statusString + statusVerify
Expand Down Expand Up @@ -267,6 +333,10 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32

dest = protocol.NewFromProtocol(ctx, dev, storageFactory, pro)

if met != nil {
met.AddFromProtocol(deviceName, dest)
}

var handlerWG sync.WaitGroup

handlerWG.Add(1)
Expand Down Expand Up @@ -304,6 +374,42 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
// Check we have all data...
case packets.EventCompleted:

if log != nil {
m := destWaitingLocal.GetMetrics()
log.Debug().
Uint64("WaitForBlock", m.WaitForBlock).
Uint64("WaitForBlockHadRemote", m.WaitForBlockHadRemote).
Uint64("WaitForBlockHadLocal", m.WaitForBlockHadLocal).
Uint64("WaitForBlockTimeMS", uint64(m.WaitForBlockTime.Milliseconds())).
Uint64("WaitForBlockLock", m.WaitForBlockLock).
Uint64("WaitForBlockLockDone", m.WaitForBlockLockDone).
Uint64("MarkAvailableLocalBlock", m.MarkAvailableLocalBlock).
Uint64("MarkAvailableRemoteBlock", m.MarkAvailableRemoteBlock).
Uint64("AvailableLocal", m.AvailableLocal).
Uint64("AvailableRemote", m.AvailableRemote).
Str("name", deviceName).
Msg("waitingCacheMetrics")

fromMetrics := dest.GetMetrics()
log.Debug().
Uint64("RecvEvents", fromMetrics.RecvEvents).
Uint64("RecvHashes", fromMetrics.RecvHashes).
Uint64("RecvDevInfo", fromMetrics.RecvDevInfo).
Uint64("RecvAltSources", fromMetrics.RecvAltSources).
Uint64("RecvReadAt", fromMetrics.RecvReadAt).
Uint64("RecvWriteAtHash", fromMetrics.RecvWriteAtHash).
Uint64("RecvWriteAtComp", fromMetrics.RecvWriteAtComp).
Uint64("RecvWriteAt", fromMetrics.RecvWriteAt).
Uint64("RecvWriteAtWithMap", fromMetrics.RecvWriteAtWithMap).
Uint64("RecvRemoveFromMap", fromMetrics.RecvRemoveFromMap).
Uint64("RecvRemoveDev", fromMetrics.RecvRemoveDev).
Uint64("RecvDirtyList", fromMetrics.RecvDirtyList).
Uint64("SentNeedAt", fromMetrics.SentNeedAt).
Uint64("SentDontNeedAt", fromMetrics.SentDontNeedAt).
Str("name", deviceName).
Msg("fromProtocolMetrics")
}

// We completed the migration, but we should wait for handlers to finish before we ok things...
// fmt.Printf("Completed, now wait for handlers...\n")
go func() {
Expand Down
Loading

0 comments on commit d504fe4

Please sign in to comment.