From 5e932dbbb9b5aac03ec9dde73335d5011b51fbde Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 10:53:40 -0500 Subject: [PATCH 1/7] improve dmsgweb --- cmd/dmsgweb/commands/dmsgweb.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/dmsgweb/commands/dmsgweb.go b/cmd/dmsgweb/commands/dmsgweb.go index 8f74616a..d85f2a50 100644 --- a/cmd/dmsgweb/commands/dmsgweb.go +++ b/cmd/dmsgweb/commands/dmsgweb.go @@ -180,7 +180,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile dmsgWebLog.Info("dmsg client pk: ", pk.String()) if len(resolveDmsgAddr) > 0 { dialPK = make([]cipher.PubKey, len(resolveDmsgAddr)) - dmsgPorts = make([]uint, dmsgSessions) + dmsgPorts = make([]uint, len(resolveDmsgAddr)) for i, dmsgaddr := range resolveDmsgAddr { dmsgWebLog.Info("dmsg address to dial: ", dmsgaddr) dmsgAddr = strings.Split(dmsgaddr, ":") @@ -201,7 +201,6 @@ dmsgweb conf file detected: ` + dmsgwebconffile } } } - dmsgWebLog.Info("test") dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) if err != nil { dmsgWebLog.WithError(err).Fatal("failed to start dmsg") @@ -272,15 +271,19 @@ dmsgweb conf file detected: ` + dmsgwebconffile if len(resolveDmsgAddr) == 0 && len(webPort) == 1 { if rawTCP[0] { + dmsgWebLog.Debug("proxyTCPConn(-1)") proxyTCPConn(-1) } else { + dmsgWebLog.Debug("proxyHTTPConn(-1)") proxyHTTPConn(-1) } } else { for i := range resolveDmsgAddr { if rawTCP[i] { + dmsgWebLog.Debug("proxyTCPConn("+fmt.Sprintf("%v",i)+")") proxyTCPConn(i) } else { + dmsgWebLog.Debug("proxyHTTPConn("+fmt.Sprintf("%v",i)+")") proxyHTTPConn(i) } } From d9b55f338b30544bd08d0ad6fa8f3b0b2b188561 Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 12:16:23 -0500 Subject: [PATCH 2/7] make format --- cmd/dmsgweb/commands/dmsgweb.go | 79 +++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/cmd/dmsgweb/commands/dmsgweb.go b/cmd/dmsgweb/commands/dmsgweb.go index d85f2a50..22438a26 100644 --- a/cmd/dmsgweb/commands/dmsgweb.go +++ b/cmd/dmsgweb/commands/dmsgweb.go @@ -205,7 +205,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile if err != nil { dmsgWebLog.WithError(err).Fatal("failed to start dmsg") } - defer closeDmsg() + // defer closeDmsg() go func() { <-ctx.Done() @@ -280,10 +280,10 @@ dmsgweb conf file detected: ` + dmsgwebconffile } else { for i := range resolveDmsgAddr { if rawTCP[i] { - dmsgWebLog.Debug("proxyTCPConn("+fmt.Sprintf("%v",i)+")") + dmsgWebLog.Debug("proxyTCPConn(" + fmt.Sprintf("%v", i) + ")") proxyTCPConn(i) } else { - dmsgWebLog.Debug("proxyHTTPConn("+fmt.Sprintf("%v",i)+")") + dmsgWebLog.Debug("proxyHTTPConn(" + fmt.Sprintf("%v", i) + ")") proxyHTTPConn(i) } } @@ -378,45 +378,56 @@ func proxyTCPConn(n int) { if err != nil { dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", thiswebport, err) } - defer listener.Close() //nolint log.Printf("Serving TCP on 127.0.0.1:%v", thiswebport) - for { - conn, err := listener.Accept() - if err != nil { - log.Printf("Failed to accept connection: %v", err) - continue - } - - wg.Add(1) - go func(conn net.Conn, n int) { - defer wg.Done() - defer conn.Close() //nolint - - dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: uint16(dmsgPorts[n])}) + go func() { + for { + conn, err := listener.Accept() if err != nil { - log.Printf("Failed to dial dmsg address %v:%v %v", dialPK[n].String(), dmsgPorts[n], err) - return + if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" { + log.Printf("Listener on port %v closed", thiswebport) + return + } + log.Printf("Failed to accept connection: %v", err) + continue } - defer dmsgConn.Close() //nolint + wg.Add(1) + go func(conn net.Conn, n int) { + defer wg.Done() + defer conn.Close() //nolint - go func() { - _, err := io.Copy(dmsgConn, conn) + dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: uint16(dmsgPorts[n])}) if err != nil { - log.Printf("Error copying data to dmsg server: %v", err) + log.Printf("Failed to dial dmsg address %v:%v %v", dialPK[n].String(), dmsgPorts[n], err) + return } - dmsgConn.Close() //nolint - }() + defer dmsgConn.Close() //nolint - go func() { - _, err := io.Copy(conn, dmsgConn) - if err != nil { - log.Printf("Error copying data from dmsg server: %v", err) - } - conn.Close() //nolint - }() - }(conn, n) - } + go func() { + _, err := io.Copy(dmsgConn, conn) + if err != nil { + log.Printf("Error copying data to dmsg server: %v", err) + } + dmsgConn.Close() //nolint + }() + + go func() { + _, err := io.Copy(conn, dmsgConn) + if err != nil { + log.Printf("Error copying data from dmsg server: %v", err) + } + conn.Close() //nolint + }() + }(conn, n) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-context.Background().Done() + listener.Close() + }() } const envfileLinux = ` From 7c1b82c8df0986323a73d1f8d5780871305f3ada Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 13:06:05 -0500 Subject: [PATCH 3/7] fix nil pointer error --- cmd/dmsgweb/commands/dmsgweb.go | 77 ++++++++++++++------------------- cmd/dmsgweb/commands/root.go | 1 + 2 files changed, 34 insertions(+), 44 deletions(-) diff --git a/cmd/dmsgweb/commands/dmsgweb.go b/cmd/dmsgweb/commands/dmsgweb.go index 22438a26..5029e615 100644 --- a/cmd/dmsgweb/commands/dmsgweb.go +++ b/cmd/dmsgweb/commands/dmsgweb.go @@ -201,11 +201,11 @@ dmsgweb conf file detected: ` + dmsgwebconffile } } } - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) + dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) if err != nil { dmsgWebLog.WithError(err).Fatal("failed to start dmsg") } - // defer closeDmsg() + defer closeDmsg() go func() { <-ctx.Done() @@ -378,56 +378,45 @@ func proxyTCPConn(n int) { if err != nil { dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", thiswebport, err) } + defer listener.Close() //nolint log.Printf("Serving TCP on 127.0.0.1:%v", thiswebport) - go func() { - for { - conn, err := listener.Accept() + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + wg.Add(1) + go func(conn net.Conn, n int, dmsgC *dmsg.Client) { + defer wg.Done() + defer conn.Close() //nolint + + dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: uint16(dmsgPorts[n])}) if err != nil { - if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" { - log.Printf("Listener on port %v closed", thiswebport) - return - } - log.Printf("Failed to accept connection: %v", err) - continue + log.Printf("Failed to dial dmsg address %v:%v %v", dialPK[n].String(), dmsgPorts[n], err) + return } - wg.Add(1) - go func(conn net.Conn, n int) { - defer wg.Done() - defer conn.Close() //nolint + defer dmsgConn.Close() //nolint - dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: uint16(dmsgPorts[n])}) + go func() { + _, err := io.Copy(dmsgConn, conn) if err != nil { - log.Printf("Failed to dial dmsg address %v:%v %v", dialPK[n].String(), dmsgPorts[n], err) - return + log.Printf("Error copying data to dmsg client: %v", err) } - defer dmsgConn.Close() //nolint - - go func() { - _, err := io.Copy(dmsgConn, conn) - if err != nil { - log.Printf("Error copying data to dmsg server: %v", err) - } - dmsgConn.Close() //nolint - }() - - go func() { - _, err := io.Copy(conn, dmsgConn) - if err != nil { - log.Printf("Error copying data from dmsg server: %v", err) - } - conn.Close() //nolint - }() - }(conn, n) - } - }() + dmsgConn.Close() //nolint + }() - wg.Add(1) - go func() { - defer wg.Done() - <-context.Background().Done() - listener.Close() - }() + go func() { + _, err := io.Copy(conn, dmsgConn) + if err != nil { + log.Printf("Error copying data from dmsg client: %v", err) + } + conn.Close() //nolint + }() + }(conn, n, dmsgC) + } } const envfileLinux = ` diff --git a/cmd/dmsgweb/commands/root.go b/cmd/dmsgweb/commands/root.go index 091a3123..00d5e7c5 100644 --- a/cmd/dmsgweb/commands/root.go +++ b/cmd/dmsgweb/commands/root.go @@ -26,6 +26,7 @@ import ( var ( httpC http.Client dmsgC *dmsg.Client + closeDmsg func() dmsgDisc string dmsgSessions int dmsgAddr []string From 5232456ec1ec7b7d2f18bd0c5f6031d955fb5d30 Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 13:10:43 -0500 Subject: [PATCH 4/7] add example tcp server & proxy --- examples/tcp/tcp-proxy.go | 88 +++++++++++++++++++++++++++++++++++++++ examples/tcp/tcp.go | 40 ++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 examples/tcp/tcp-proxy.go create mode 100644 examples/tcp/tcp.go diff --git a/examples/tcp/tcp-proxy.go b/examples/tcp/tcp-proxy.go new file mode 100644 index 00000000..96271c1a --- /dev/null +++ b/examples/tcp/tcp-proxy.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "io" + "log" + "net" + "os" + "strconv" + "sync" +) + +func main() { + if len(os.Args) < 3 { + log.Fatalf("requires two arguments; usage: tcp1 ") + } + sourcePort, err := strconv.Atoi(os.Args[2]) + if err != nil { + log.Fatalf("Failed to parse tcp source port string \"%v\" to int: %v", sourcePort, err) + } + targetPort, err := strconv.Atoi(os.Args[1]) + if err != nil { + log.Fatalf("Failed to parse tcp target port string \"%v\" to int: %v", targetPort, err) + } + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", sourcePort)) + if err != nil { + log.Fatalf("Failed to start TCP listener on port %d: %v", sourcePort, err) + } + defer listener.Close() + log.Printf("TCP proxy started: Listening on port %d and forwarding to port %d", sourcePort, targetPort) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + go handleConnection(conn, targetPort) + } +} + +func handleConnection(conn net.Conn, targetPort int) { + defer conn.Close() + + targetAddr := fmt.Sprintf("localhost:%d", targetPort) + target, err := net.Dial("tcp", targetAddr) + if err != nil { + log.Printf("Failed to dial target server %s: %v", targetAddr, err) + return + } + defer target.Close() + + var wg sync.WaitGroup + wg.Add(2) + + // Copy from client to target + go func() { + _, err := io.Copy(target, conn) + if err != nil && !isClosedConnErr(err) { + log.Printf("Error copying from client to target: %v", err) + } + target.Close() // Close target side after copy + wg.Done() + }() + + // Copy from target to client + go func() { + _, err := io.Copy(conn, target) + if err != nil && !isClosedConnErr(err) { + log.Printf("Error copying from target to client: %v", err) + } + conn.Close() // Close client side after copy + wg.Done() + }() + + // Wait for both copies to finish + wg.Wait() +} + +// isClosedConnErr checks if the error indicates a closed connection. +func isClosedConnErr(err error) bool { + if err == io.EOF { + return true + } + netErr, ok := err.(net.Error) + return ok && netErr.Timeout() // Check for timeout error indicating closed connection +} diff --git a/examples/tcp/tcp.go b/examples/tcp/tcp.go new file mode 100644 index 00000000..03f4acb6 --- /dev/null +++ b/examples/tcp/tcp.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + "net" + "os" +) + +func main() { + // Start a TCP server listening on port 8000 + listener, err := net.Listen("tcp", os.Args[1]) //":8000") + if err != nil { + fmt.Printf("Failed to start server: %v\n", err) + return + } + defer listener.Close() + fmt.Println("TCP server started on port " + os.Args[1]) + + // Accept and handle incoming connections + for { + conn, err := listener.Accept() + if err != nil { + fmt.Printf("Failed to accept connection: %v\n", err) + continue + } + go handleConnection(conn) + } +} + +func handleConnection(conn net.Conn) { + defer conn.Close() + + // Send a greeting message to the client + message := "Hello, World!\n" + _, err := conn.Write([]byte(message)) + if err != nil { + fmt.Printf("Error writing response: %v\n", err) + return + } +} From 4bb2a0cbee5347361e9a15b20d01d224be508bda Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 13:13:39 -0500 Subject: [PATCH 5/7] fix ci errors --- examples/{tcp => tcp-proxy}/tcp-proxy.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/{tcp => tcp-proxy}/tcp-proxy.go (100%) diff --git a/examples/tcp/tcp-proxy.go b/examples/tcp-proxy/tcp-proxy.go similarity index 100% rename from examples/tcp/tcp-proxy.go rename to examples/tcp-proxy/tcp-proxy.go From 1b40ba1af4d0c6275019ada68e9ce846e4e9f8ec Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Sun, 30 Jun 2024 14:48:36 -0500 Subject: [PATCH 6/7] update examples --- examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go | 219 ++++++++++++++++ .../tcp-reverse-proxy-dmsg.go | 236 ++++++++++++++++++ 2 files changed, 455 insertions(+) create mode 100644 examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go create mode 100644 examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go diff --git a/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go new file mode 100644 index 00000000..710569dc --- /dev/null +++ b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go @@ -0,0 +1,219 @@ +package main + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "os" + "sync" + + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/skyenv" + cc "github.com/ivanpirog/coloredcobra" + "github.com/spf13/cobra" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" + +) + + + +func main() { + cc.Init(&cc.Config{ + RootCmd: srvCmd, + Headings: cc.HiBlue + cc.Bold, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + srvCmd.Execute() +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" + + + +var ( + httpC http.Client + dmsgC *dmsg.Client + closeDmsg func() + dmsgDisc string + dmsgSessions int + dmsgAddr []string + dialPK []cipher.PubKey + filterDomainSuffix string + sk cipher.SecKey + pk cipher.PubKey + dmsgWebLog *logging.Logger + logLvl string + webPort []uint + proxyPort uint + addProxy string + resolveDmsgAddr []string + wg sync.WaitGroup + isEnvs bool + dmsgPort uint + dmsgPorts []uint + dmsgSess int + wl []string + wlkeys []cipher.PubKey + localPort uint + err error + rawTCP []bool + RootCmd = srvCmd +) + + +func init() { + srvCmd.Flags().UintVarP(&localPort, "lport", "l", 8086, "local application http interface port(s)") + srvCmd.Flags().UintVarP(&dmsgPort, "dport", "d", 8086, "dmsg port(s) to serve") + srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", skyenv.DmsgDiscAddr, "dmsg discovery url") + srvCmd.Flags().IntVarP(&dmsgSess, "dsess", "e", 1, "dmsg sessions") + srvCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + + srvCmd.CompletionOptions.DisableDefaultCmd = true + var helpflag bool + srvCmd.SetUsageTemplate(help) + srvCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgweb") + srvCmd.SetHelpCommand(&cobra.Command{Hidden: true}) + srvCmd.PersistentFlags().MarkHidden("help") //nolint +} +var srvCmd = &cobra.Command{ + Use: "srv", + Short: "serve raw TCP from local port over dmsg", + Long: `DMSG web server - serve http or raw TCP interface from local port over dmsg`, + Run: func(_ *cobra.Command, _ []string) { + server() + }, +} + +func server() { + log := logging.MustGetLogger("dmsgwebsrv") + + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + + defer cancel() + pk, err = sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + log.Infof("dmsg client pk: %v", pk.String()) + + + dmsgC := dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, log), dmsg.DefaultConfig()) + defer func() { + if err := dmsgC.Close(); err != nil { + log.WithError(err).Error() + } + }() + + go dmsgC.Serve(context.Background()) + + select { + case <-ctx.Done(): + log.WithError(ctx.Err()).Warn() + return + + case <-dmsgC.Ready(): + } + + lis, err := dmsgC.Listen(uint16(dmsgPort)) + if err != nil { + log.Fatalf("Error listening on port %d: %v", dmsgPort, err) + } + + + go func(l net.Listener, port uint) { + <-ctx.Done() + if err := l.Close(); err != nil { + log.Printf("Error closing listener on port %d: %v", port, err) + log.WithError(err).Error() + } + }(lis, dmsgPort) + + + wg := new(sync.WaitGroup) + + wg.Add(1) + go func(localPort uint, lis net.Listener) { + defer wg.Done() + proxyTCPConnections(localPort, lis, log) + }(localPort, lis) + + wg.Wait() +} + +func proxyTCPConnections(localPort uint, lis net.Listener, log *logging.Logger) { + for { + conn, err := lis.Accept() + if err != nil { + log.Printf("Error accepting connection: %v", err) + return + } + + go handleTCPConnection(conn, localPort, log) + } +} + +func handleTCPConnection(dmsgConn net.Conn, localPort uint, log *logging.Logger) { + defer dmsgConn.Close() //nolint + + localConn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort)) + if err != nil { + log.Printf("Error connecting to local port %d: %v", localPort, err) + return + } + defer localConn.Close() //nolint + + copyConn := func(dst net.Conn, src net.Conn) { + _, err := io.Copy(dst, src) + if err != nil { + log.Printf("Error during copy: %v", err) + } + } + + go copyConn(dmsgConn, localConn) + go copyConn(localConn, dmsgConn) +} + +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + os.Exit(0) + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgWebLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } +} diff --git a/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go new file mode 100644 index 00000000..e9c74a97 --- /dev/null +++ b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go @@ -0,0 +1,236 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "sync" + + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/skyenv" + "github.com/spf13/cobra" + cc "github.com/ivanpirog/coloredcobra" + + dmsg "github.com/skycoin/dmsg/pkg/dmsg" + "github.com/skycoin/dmsg/pkg/disc" + + +) + + + +func main() { + cc.Init(&cc.Config{ + RootCmd: RootCmd, + Headings: cc.HiBlue + cc.Bold, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + RootCmd.Execute() +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" + + + var ( + httpC http.Client + dmsgC *dmsg.Client + closeDmsg func() + dmsgDisc string + dmsgSessions int + dmsgAddr []string + dialPK cipher.PubKey + sk cipher.SecKey + pk cipher.PubKey + dmsgWebLog *logging.Logger + logLvl string + webPort uint + resolveDmsgAddr string + wg sync.WaitGroup + dmsgPort uint + dmsgSess int + err error + ) + + func init() { + RootCmd.Flags().UintVarP(&webPort, "port", "p", 8080, "port to serve the web application") + RootCmd.Flags().StringVarP(&resolveDmsgAddr, "resolve", "t", "", "resolve the specified dmsg address:port on the local port & disable proxy") + RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", skyenv.DmsgDiscAddr, "dmsg discovery url") + RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + } + + // RootCmd contains the root command for dmsgweb + var RootCmd = &cobra.Command{ + Use: func() string { + return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] + }(), + Short: "DMSG reverse tcp proxy", + Long: "DMSG reverse tcp proxy", + SilenceErrors: true, + SilenceUsage: true, + DisableSuggestions: true, + DisableFlagsInUseLine: true, + Version: buildinfo.Version(), + Run: func(cmd *cobra.Command, _ []string) { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) //nolint + go func() { + <-c + os.Exit(1) + }() + if dmsgWebLog == nil { + dmsgWebLog = logging.MustGetLogger("dmsgweb") + } + if logLvl != "" { + if lvl, err := logging.LevelFromString(logLvl); err == nil { + logging.SetLevel(lvl) + } + } + + if dmsgDisc == "" { + dmsgDisc = skyenv.DmsgDiscAddr + } + ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog) + defer cancel() + + pk, err := sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + dmsgWebLog.Info("dmsg client pk: ", pk.String()) + + dmsgWebLog.Info("dmsg address to dial: ", resolveDmsgAddr) + dmsgAddr = strings.Split(resolveDmsgAddr, ":") + var setpk cipher.PubKey + err = setpk.Set(dmsgAddr[0]) + if err != nil { + log.Fatalf("failed to parse dmsg
: : %v", err) + } + dialPK = setpk + if len(dmsgAddr) > 1 { + dport, err := strconv.ParseUint(dmsgAddr[1], 10, 64) + if err != nil { + log.Fatalf("Failed to parse dmsg port: %v", err) + } + dmsgPort = uint(dport) + } else { + dmsgPort = uint(80) + } + + dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) + if err != nil { + dmsgWebLog.WithError(err).Fatal("failed to start dmsg") + } + defer closeDmsg() + + go func() { + <-ctx.Done() + cancel() + closeDmsg() + os.Exit(0) + }() + + proxyTCPConn() + wg.Wait() + }, + } + + func proxyTCPConn() { + listener, err := net.Listen("tcp", fmt.Sprintf(":%v", webPort)) + if err != nil { + dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", webPort, err) + } + defer listener.Close() //nolint + log.Printf("Serving TCP on 127.0.0.1:%v", webPort) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Failed to accept connection: %v", err) + continue + } + + wg.Add(1) + go func(conn net.Conn) { + defer wg.Done() + defer conn.Close() //nolint + + dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK, Port: uint16(dmsgPort)}) + if err != nil { + log.Printf("Failed to dial dmsg address %v:%v %v", dialPK.String(), dmsgPort, err) + return + } + defer dmsgConn.Close() //nolint + + go func() { + _, err := io.Copy(dmsgConn, conn) + if err != nil { + log.Printf("Error copying data to dmsg client: %v", err) + } + dmsgConn.Close() //nolint + }() + + go func() { + _, err := io.Copy(conn, dmsgConn) + if err != nil { + log.Printf("Error copying data from dmsg client: %v", err) + } + conn.Close() //nolint + }() + }(conn) + } + } + + + func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + os.Exit(0) + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgWebLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } + } From 8ed7718e78f666ee456ecc9435c9c9fb72b6fa51 Mon Sep 17 00:00:00 2001 From: Moses Narrow Date: Tue, 2 Jul 2024 11:03:00 -0500 Subject: [PATCH 7/7] update examples --- examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go | 16 +- .../tcp-reverse-proxy-dmsg.go | 310 +++++++++--------- 2 files changed, 156 insertions(+), 170 deletions(-) diff --git a/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go index 710569dc..0b04782a 100644 --- a/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go +++ b/examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go @@ -9,20 +9,17 @@ import ( "os" "sync" + cc "github.com/ivanpirog/coloredcobra" "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/cmdutil" "github.com/skycoin/skywire-utilities/pkg/logging" "github.com/skycoin/skywire-utilities/pkg/skyenv" - cc "github.com/ivanpirog/coloredcobra" "github.com/spf13/cobra" "github.com/skycoin/dmsg/pkg/disc" dmsg "github.com/skycoin/dmsg/pkg/dmsg" - ) - - func main() { cc.Init(&cc.Config{ RootCmd: srvCmd, @@ -49,8 +46,6 @@ const help = "Usage:\r\n" + "Global Flags:\r\n" + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" - - var ( httpC http.Client dmsgC *dmsg.Client @@ -78,10 +73,9 @@ var ( localPort uint err error rawTCP []bool - RootCmd = srvCmd + RootCmd = srvCmd ) - func init() { srvCmd.Flags().UintVarP(&localPort, "lport", "l", 8086, "local application http interface port(s)") srvCmd.Flags().UintVarP(&dmsgPort, "dport", "d", 8086, "dmsg port(s) to serve") @@ -96,10 +90,11 @@ func init() { srvCmd.SetHelpCommand(&cobra.Command{Hidden: true}) srvCmd.PersistentFlags().MarkHidden("help") //nolint } + var srvCmd = &cobra.Command{ Use: "srv", Short: "serve raw TCP from local port over dmsg", - Long: `DMSG web server - serve http or raw TCP interface from local port over dmsg`, + Long: `DMSG web server - serve http or raw TCP interface from local port over dmsg`, Run: func(_ *cobra.Command, _ []string) { server() }, @@ -117,7 +112,6 @@ func server() { } log.Infof("dmsg client pk: %v", pk.String()) - dmsgC := dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, log), dmsg.DefaultConfig()) defer func() { if err := dmsgC.Close(); err != nil { @@ -140,7 +134,6 @@ func server() { log.Fatalf("Error listening on port %d: %v", dmsgPort, err) } - go func(l net.Listener, port uint) { <-ctx.Done() if err := l.Close(); err != nil { @@ -149,7 +142,6 @@ func server() { } }(lis, dmsgPort) - wg := new(sync.WaitGroup) wg.Add(1) diff --git a/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go index e9c74a97..155829a0 100644 --- a/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go +++ b/examples/tcp-reverse-proxy-dmsg/tcp-reverse-proxy-dmsg.go @@ -12,25 +12,21 @@ import ( "path/filepath" "strconv" "strings" - "syscall" "sync" + "syscall" + cc "github.com/ivanpirog/coloredcobra" "github.com/skycoin/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/cmdutil" "github.com/skycoin/skywire-utilities/pkg/logging" "github.com/skycoin/skywire-utilities/pkg/skyenv" "github.com/spf13/cobra" - cc "github.com/ivanpirog/coloredcobra" - dmsg "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/disc" - - + dmsg "github.com/skycoin/dmsg/pkg/dmsg" ) - - func main() { cc.Init(&cc.Config{ RootCmd: RootCmd, @@ -57,180 +53,178 @@ const help = "Usage:\r\n" + "Global Flags:\r\n" + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" +var ( + httpC http.Client + dmsgC *dmsg.Client + closeDmsg func() + dmsgDisc string + dmsgSessions int + dmsgAddr []string + dialPK cipher.PubKey + sk cipher.SecKey + pk cipher.PubKey + dmsgWebLog *logging.Logger + logLvl string + webPort uint + resolveDmsgAddr string + wg sync.WaitGroup + dmsgPort uint + dmsgSess int + err error +) - var ( - httpC http.Client - dmsgC *dmsg.Client - closeDmsg func() - dmsgDisc string - dmsgSessions int - dmsgAddr []string - dialPK cipher.PubKey - sk cipher.SecKey - pk cipher.PubKey - dmsgWebLog *logging.Logger - logLvl string - webPort uint - resolveDmsgAddr string - wg sync.WaitGroup - dmsgPort uint - dmsgSess int - err error - ) - - func init() { - RootCmd.Flags().UintVarP(&webPort, "port", "p", 8080, "port to serve the web application") - RootCmd.Flags().StringVarP(&resolveDmsgAddr, "resolve", "t", "", "resolve the specified dmsg address:port on the local port & disable proxy") - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", skyenv.DmsgDiscAddr, "dmsg discovery url") - RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") - RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") - } +func init() { + RootCmd.Flags().UintVarP(&webPort, "port", "p", 8080, "port to serve the web application") + RootCmd.Flags().StringVarP(&resolveDmsgAddr, "resolve", "t", "", "resolve the specified dmsg address:port on the local port & disable proxy") + RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", skyenv.DmsgDiscAddr, "dmsg discovery url") + RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") +} - // RootCmd contains the root command for dmsgweb - var RootCmd = &cobra.Command{ - Use: func() string { - return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] - }(), - Short: "DMSG reverse tcp proxy", - Long: "DMSG reverse tcp proxy", - SilenceErrors: true, - SilenceUsage: true, - DisableSuggestions: true, - DisableFlagsInUseLine: true, - Version: buildinfo.Version(), - Run: func(cmd *cobra.Command, _ []string) { - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) //nolint - go func() { - <-c - os.Exit(1) - }() - if dmsgWebLog == nil { - dmsgWebLog = logging.MustGetLogger("dmsgweb") - } - if logLvl != "" { - if lvl, err := logging.LevelFromString(logLvl); err == nil { - logging.SetLevel(lvl) - } +// RootCmd contains the root command for dmsgweb +var RootCmd = &cobra.Command{ + Use: func() string { + return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] + }(), + Short: "DMSG reverse tcp proxy", + Long: "DMSG reverse tcp proxy", + SilenceErrors: true, + SilenceUsage: true, + DisableSuggestions: true, + DisableFlagsInUseLine: true, + Version: buildinfo.Version(), + Run: func(cmd *cobra.Command, _ []string) { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) //nolint + go func() { + <-c + os.Exit(1) + }() + if dmsgWebLog == nil { + dmsgWebLog = logging.MustGetLogger("dmsgweb") + } + if logLvl != "" { + if lvl, err := logging.LevelFromString(logLvl); err == nil { + logging.SetLevel(lvl) } + } - if dmsgDisc == "" { - dmsgDisc = skyenv.DmsgDiscAddr - } - ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog) - defer cancel() + if dmsgDisc == "" { + dmsgDisc = skyenv.DmsgDiscAddr + } + ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgWebLog) + defer cancel() - pk, err := sk.PubKey() - if err != nil { - pk, sk = cipher.GenerateKeyPair() - } - dmsgWebLog.Info("dmsg client pk: ", pk.String()) + pk, err := sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + dmsgWebLog.Info("dmsg client pk: ", pk.String()) - dmsgWebLog.Info("dmsg address to dial: ", resolveDmsgAddr) - dmsgAddr = strings.Split(resolveDmsgAddr, ":") - var setpk cipher.PubKey - err = setpk.Set(dmsgAddr[0]) + dmsgWebLog.Info("dmsg address to dial: ", resolveDmsgAddr) + dmsgAddr = strings.Split(resolveDmsgAddr, ":") + var setpk cipher.PubKey + err = setpk.Set(dmsgAddr[0]) + if err != nil { + log.Fatalf("failed to parse dmsg
: : %v", err) + } + dialPK = setpk + if len(dmsgAddr) > 1 { + dport, err := strconv.ParseUint(dmsgAddr[1], 10, 64) if err != nil { - log.Fatalf("failed to parse dmsg
: : %v", err) - } - dialPK = setpk - if len(dmsgAddr) > 1 { - dport, err := strconv.ParseUint(dmsgAddr[1], 10, 64) - if err != nil { - log.Fatalf("Failed to parse dmsg port: %v", err) - } - dmsgPort = uint(dport) - } else { - dmsgPort = uint(80) + log.Fatalf("Failed to parse dmsg port: %v", err) } + dmsgPort = uint(dport) + } else { + dmsgPort = uint(80) + } - dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) - if err != nil { - dmsgWebLog.WithError(err).Fatal("failed to start dmsg") - } - defer closeDmsg() + dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk) + if err != nil { + dmsgWebLog.WithError(err).Fatal("failed to start dmsg") + } + defer closeDmsg() - go func() { - <-ctx.Done() - cancel() - closeDmsg() - os.Exit(0) - }() + go func() { + <-ctx.Done() + cancel() + closeDmsg() + os.Exit(0) + }() + + proxyTCPConn() + wg.Wait() + }, +} - proxyTCPConn() - wg.Wait() - }, +func proxyTCPConn() { + listener, err := net.Listen("tcp", fmt.Sprintf(":%v", webPort)) + if err != nil { + dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", webPort, err) } + defer listener.Close() //nolint + log.Printf("Serving TCP on 127.0.0.1:%v", webPort) - func proxyTCPConn() { - listener, err := net.Listen("tcp", fmt.Sprintf(":%v", webPort)) + for { + conn, err := listener.Accept() if err != nil { - dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", webPort, err) + log.Printf("Failed to accept connection: %v", err) + continue } - defer listener.Close() //nolint - log.Printf("Serving TCP on 127.0.0.1:%v", webPort) - for { - conn, err := listener.Accept() + wg.Add(1) + go func(conn net.Conn) { + defer wg.Done() + defer conn.Close() //nolint + + dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK, Port: uint16(dmsgPort)}) if err != nil { - log.Printf("Failed to accept connection: %v", err) - continue + log.Printf("Failed to dial dmsg address %v:%v %v", dialPK.String(), dmsgPort, err) + return } + defer dmsgConn.Close() //nolint - wg.Add(1) - go func(conn net.Conn) { - defer wg.Done() - defer conn.Close() //nolint + go func() { + _, err := io.Copy(dmsgConn, conn) + if err != nil { + log.Printf("Error copying data to dmsg client: %v", err) + } + dmsgConn.Close() //nolint + }() - dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK, Port: uint16(dmsgPort)}) + go func() { + _, err := io.Copy(conn, dmsgConn) if err != nil { - log.Printf("Failed to dial dmsg address %v:%v %v", dialPK.String(), dmsgPort, err) - return + log.Printf("Error copying data from dmsg client: %v", err) } - defer dmsgConn.Close() //nolint - - go func() { - _, err := io.Copy(dmsgConn, conn) - if err != nil { - log.Printf("Error copying data to dmsg client: %v", err) - } - dmsgConn.Close() //nolint - }() - - go func() { - _, err := io.Copy(conn, dmsgConn) - if err != nil { - log.Printf("Error copying data from dmsg client: %v", err) - } - conn.Close() //nolint - }() - }(conn) - } + conn.Close() //nolint + }() + }(conn) } +} +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) - func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions}) - go dmsgC.Serve(context.Background()) - - stop = func() { - err := dmsgC.Close() - dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") - fmt.Printf("\n") - } - dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). - Debug("Connecting to dmsg network...") - - select { - case <-ctx.Done(): - stop() - os.Exit(0) - return nil, nil, ctx.Err() - - case <-dmsgC.Ready(): - dmsgWebLog.Debug("Dmsg network ready.") - return dmsgC, stop, nil - } + stop = func() { + err := dmsgC.Close() + dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + os.Exit(0) + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgWebLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil } +}