Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jamesmoore/logging #49

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"

"github.com/loopholelabs/logging"
"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/expose"
"github.com/loopholelabs/silo/pkg/storage/integrity"
"github.com/loopholelabs/silo/pkg/storage/metrics"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"

"github.com/fatih/color"
Expand Down Expand Up @@ -49,6 +56,10 @@ var connectMountDev bool

var connectProgress bool

var connectDebug bool

var connectMetrics string

// List of ExposedStorage so they can be cleaned up on exit.
var dstExposed []storage.ExposedStorage

Expand All @@ -63,13 +74,48 @@ func init() {
cmdConnect.Flags().BoolVarP(&connectExposeDev, "expose", "e", false, "Expose as an nbd devices")
cmdConnect.Flags().BoolVarP(&connectMountDev, "mount", "m", false, "Mount the nbd devices")
cmdConnect.Flags().BoolVarP(&connectProgress, "progress", "p", false, "Show progress")
cmdConnect.Flags().BoolVarP(&connectDebug, "debug", "d", false, "Debug logging (trace)")
cmdConnect.Flags().StringVarP(&connectMetrics, "metrics", "M", "", "Prom metrics address")
}

/**
* Connect to a silo source and stream whatever devices are available.
*
*/
func runConnect(_ *cobra.Command, _ []string) {
var log types.RootLogger
var reg *prometheus.Registry
var siloMetrics *metrics.Metrics

if connectDebug {
log = logging.New(logging.Zerolog, "silo.connect", os.Stderr)
log.SetLevel(types.TraceLevel)
}

if connectMetrics != "" {
reg = prometheus.NewRegistry()

siloMetrics = metrics.New(reg)

// Add the default go metrics
reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
// Pass custom registry
Registry: reg,
},
))

go http.ListenAndServe(connectMetrics, nil)
}

if connectProgress {
dstProgress = mpb.New(
mpb.WithOutput(color.Output),
Expand Down Expand Up @@ -107,6 +153,10 @@ func runConnect(_ *cobra.Command, _ []string) {

protoCtx, protoCancelfn := context.WithCancel(context.TODO())

handleIncomingDevice := func(ctx context.Context, pro protocol.Protocol, dev uint32) {
handleIncomingDeviceWithLogging(ctx, pro, dev, log, siloMetrics)
}

pro := protocol.NewRW(protoCtx, []io.Reader{con}, []io.Writer{con}, handleIncomingDevice)

// Let the protocol do its thing.
Expand All @@ -121,12 +171,26 @@ func runConnect(_ *cobra.Command, _ []string) {
protoCancelfn()
}()

if siloMetrics != nil {
siloMetrics.AddProtocol("protocol", pro)
}

dstWG.Wait() // Wait until the migrations have completed...

if connectProgress {
dstProgress.Wait()
}

if log != nil {
metrics := pro.GetMetrics()
log.Debug().
Uint64("PacketsSent", metrics.PacketsSent).
Uint64("DataSent", metrics.DataSent).
Uint64("PacketsRecv", metrics.PacketsRecv).
Uint64("DataRecv", metrics.DataRecv).
Msg("protocol metrics")
}

fmt.Printf("\nMigrations completed. Please ctrl-c if you want to shut down, or wait an hour :)\n")

// We should pause here, to allow the user to do things with the devices
Expand All @@ -139,7 +203,7 @@ func runConnect(_ *cobra.Command, _ []string) {
}

// Handle a new incoming device. This is called when a packet is received for a device we haven't heard about before.
func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32) {
func handleIncomingDeviceWithLogging(ctx context.Context, pro protocol.Protocol, dev uint32, log types.RootLogger, met *metrics.Metrics) {
var destStorage storage.Provider
var destWaitingLocal *waitingcache.Local
var destWaitingRemote *waitingcache.Remote
Expand All @@ -151,6 +215,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
var bar *mpb.Bar

var blockSize uint
var deviceName string

var statusString = " "
var statusVerify = " "
Expand All @@ -174,6 +239,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
}

blockSize = uint(di.BlockSize)
deviceName = di.Name

statusFn := func(_ decor.Statistics) string {
return statusString + statusVerify
Expand Down Expand Up @@ -267,6 +333,10 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32

dest = protocol.NewFromProtocol(ctx, dev, storageFactory, pro)

if met != nil {
met.AddFromProtocol(deviceName, dest)
}

var handlerWG sync.WaitGroup

handlerWG.Add(1)
Expand Down Expand Up @@ -304,6 +374,42 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
// Check we have all data...
case packets.EventCompleted:

if log != nil {
m := destWaitingLocal.GetMetrics()
log.Debug().
Uint64("WaitForBlock", m.WaitForBlock).
Uint64("WaitForBlockHadRemote", m.WaitForBlockHadRemote).
Uint64("WaitForBlockHadLocal", m.WaitForBlockHadLocal).
Uint64("WaitForBlockTimeMS", uint64(m.WaitForBlockTime.Milliseconds())).
Uint64("WaitForBlockLock", m.WaitForBlockLock).
Uint64("WaitForBlockLockDone", m.WaitForBlockLockDone).
Uint64("MarkAvailableLocalBlock", m.MarkAvailableLocalBlock).
Uint64("MarkAvailableRemoteBlock", m.MarkAvailableRemoteBlock).
Uint64("AvailableLocal", m.AvailableLocal).
Uint64("AvailableRemote", m.AvailableRemote).
Str("name", deviceName).
Msg("waitingCacheMetrics")

fromMetrics := dest.GetMetrics()
log.Debug().
Uint64("RecvEvents", fromMetrics.RecvEvents).
Uint64("RecvHashes", fromMetrics.RecvHashes).
Uint64("RecvDevInfo", fromMetrics.RecvDevInfo).
Uint64("RecvAltSources", fromMetrics.RecvAltSources).
Uint64("RecvReadAt", fromMetrics.RecvReadAt).
Uint64("RecvWriteAtHash", fromMetrics.RecvWriteAtHash).
Uint64("RecvWriteAtComp", fromMetrics.RecvWriteAtComp).
Uint64("RecvWriteAt", fromMetrics.RecvWriteAt).
Uint64("RecvWriteAtWithMap", fromMetrics.RecvWriteAtWithMap).
Uint64("RecvRemoveFromMap", fromMetrics.RecvRemoveFromMap).
Uint64("RecvRemoveDev", fromMetrics.RecvRemoveDev).
Uint64("RecvDirtyList", fromMetrics.RecvDirtyList).
Uint64("SentNeedAt", fromMetrics.SentNeedAt).
Uint64("SentDontNeedAt", fromMetrics.SentDontNeedAt).
Str("name", deviceName).
Msg("fromProtocolMetrics")
}

// We completed the migration, but we should wait for handlers to finish before we ok things...
// fmt.Printf("Completed, now wait for handlers...\n")
go func() {
Expand Down
Loading
Loading