Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dmsgweb handle multiple tcp connections correctly #270

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions cmd/dmsgweb/commands/dmsgweb.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile
dmsgWebLog.Info("dmsg client pk: ", pk.String())
if len(resolveDmsgAddr) > 0 {
dialPK = make([]cipher.PubKey, len(resolveDmsgAddr))
dmsgPorts = make([]uint, dmsgSessions)
dmsgPorts = make([]uint, len(resolveDmsgAddr))
for i, dmsgaddr := range resolveDmsgAddr {
dmsgWebLog.Info("dmsg address to dial: ", dmsgaddr)
dmsgAddr = strings.Split(dmsgaddr, ":")
Expand All @@ -202,8 +202,7 @@ dmsgweb conf file detected: ` + dmsgwebconffile
}
}
}
dmsgWebLog.Info("test")
dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk)
dmsgC, closeDmsg, err = startDmsg(ctx, pk, sk)
if err != nil {
dmsgWebLog.WithError(err).Fatal("failed to start dmsg")
}
Expand Down Expand Up @@ -273,15 +272,19 @@ dmsgweb conf file detected: ` + dmsgwebconffile

if len(resolveDmsgAddr) == 0 && len(webPort) == 1 {
if rawTCP[0] {
dmsgWebLog.Debug("proxyTCPConn(-1)")
proxyTCPConn(-1)
} else {
dmsgWebLog.Debug("proxyHTTPConn(-1)")
proxyHTTPConn(-1)
}
} else {
for i := range resolveDmsgAddr {
if rawTCP[i] {
dmsgWebLog.Debug("proxyTCPConn(" + fmt.Sprintf("%v", i) + ")")
proxyTCPConn(i)
} else {
dmsgWebLog.Debug("proxyHTTPConn(" + fmt.Sprintf("%v", i) + ")")
proxyHTTPConn(i)
}
}
Expand Down Expand Up @@ -387,7 +390,7 @@ func proxyTCPConn(n int) {
}

wg.Add(1)
go func(conn net.Conn, n int) {
go func(conn net.Conn, n int, dmsgC *dmsg.Client) {
defer wg.Done()
defer conn.Close() //nolint

Expand All @@ -401,19 +404,19 @@ func proxyTCPConn(n int) {
go func() {
_, err := io.Copy(dmsgConn, conn)
if err != nil {
log.Printf("Error copying data to dmsg server: %v", err)
log.Printf("Error copying data to dmsg client: %v", err)
}
dmsgConn.Close() //nolint
}()

go func() {
_, err := io.Copy(conn, dmsgConn)
if err != nil {
log.Printf("Error copying data from dmsg server: %v", err)
log.Printf("Error copying data from dmsg client: %v", err)
}
conn.Close() //nolint
}()
}(conn, n)
}(conn, n, dmsgC)
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/dmsgweb/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
var (
httpC http.Client
dmsgC *dmsg.Client
closeDmsg func()
dmsgDisc string
dmsgSessions int
dmsgAddr []string
Expand Down
211 changes: 211 additions & 0 deletions examples/tcp-proxy-dmsg/tcp-proxy-dmsg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package main

import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"sync"

cc "github.com/ivanpirog/coloredcobra"
"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire-utilities/pkg/cmdutil"
"github.com/skycoin/skywire-utilities/pkg/logging"
"github.com/skycoin/skywire-utilities/pkg/skyenv"
"github.com/spf13/cobra"

"github.com/skycoin/dmsg/pkg/disc"
dmsg "github.com/skycoin/dmsg/pkg/dmsg"
)

func main() {
cc.Init(&cc.Config{
RootCmd: srvCmd,
Headings: cc.HiBlue + cc.Bold,
Commands: cc.HiBlue + cc.Bold,
CmdShortDescr: cc.HiBlue,
Example: cc.HiBlue + cc.Italic,
ExecName: cc.HiBlue + cc.Bold,
Flags: cc.HiBlue + cc.Bold,
FlagsDescr: cc.HiBlue,
NoExtraNewlines: true,
NoBottomNewline: true,
})
srvCmd.Execute()
}

const help = "Usage:\r\n" +
" {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" +
"{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" +
"Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " +
"{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" +
"Flags:\r\n" +
"{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" +
"Global Flags:\r\n" +
"{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n"

var (
httpC http.Client
dmsgC *dmsg.Client
closeDmsg func()
dmsgDisc string
dmsgSessions int
dmsgAddr []string
dialPK []cipher.PubKey
filterDomainSuffix string
sk cipher.SecKey
pk cipher.PubKey
dmsgWebLog *logging.Logger
logLvl string
webPort []uint
proxyPort uint
addProxy string
resolveDmsgAddr []string
wg sync.WaitGroup
isEnvs bool
dmsgPort uint
dmsgPorts []uint
dmsgSess int
wl []string
wlkeys []cipher.PubKey
localPort uint
err error
rawTCP []bool
RootCmd = srvCmd
)

func init() {
srvCmd.Flags().UintVarP(&localPort, "lport", "l", 8086, "local application http interface port(s)")
srvCmd.Flags().UintVarP(&dmsgPort, "dport", "d", 8086, "dmsg port(s) to serve")
srvCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "D", skyenv.DmsgDiscAddr, "dmsg discovery url")
srvCmd.Flags().IntVarP(&dmsgSess, "dsess", "e", 1, "dmsg sessions")
srvCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r")

srvCmd.CompletionOptions.DisableDefaultCmd = true
var helpflag bool
srvCmd.SetUsageTemplate(help)
srvCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgweb")
srvCmd.SetHelpCommand(&cobra.Command{Hidden: true})
srvCmd.PersistentFlags().MarkHidden("help") //nolint
}

var srvCmd = &cobra.Command{
Use: "srv",
Short: "serve raw TCP from local port over dmsg",
Long: `DMSG web server - serve http or raw TCP interface from local port over dmsg`,
Run: func(_ *cobra.Command, _ []string) {
server()
},
}

func server() {
log := logging.MustGetLogger("dmsgwebsrv")

ctx, cancel := cmdutil.SignalContext(context.Background(), log)

defer cancel()
pk, err = sk.PubKey()
if err != nil {
pk, sk = cipher.GenerateKeyPair()
}
log.Infof("dmsg client pk: %v", pk.String())

dmsgC := dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, log), dmsg.DefaultConfig())
defer func() {
if err := dmsgC.Close(); err != nil {
log.WithError(err).Error()
}
}()

go dmsgC.Serve(context.Background())

select {
case <-ctx.Done():
log.WithError(ctx.Err()).Warn()
return

case <-dmsgC.Ready():
}

lis, err := dmsgC.Listen(uint16(dmsgPort))
if err != nil {
log.Fatalf("Error listening on port %d: %v", dmsgPort, err)
}

go func(l net.Listener, port uint) {
<-ctx.Done()
if err := l.Close(); err != nil {
log.Printf("Error closing listener on port %d: %v", port, err)
log.WithError(err).Error()
}
}(lis, dmsgPort)

wg := new(sync.WaitGroup)

wg.Add(1)
go func(localPort uint, lis net.Listener) {
defer wg.Done()
proxyTCPConnections(localPort, lis, log)
}(localPort, lis)

wg.Wait()
}

func proxyTCPConnections(localPort uint, lis net.Listener, log *logging.Logger) {
for {
conn, err := lis.Accept()
if err != nil {
log.Printf("Error accepting connection: %v", err)
return
}

go handleTCPConnection(conn, localPort, log)
}
}

func handleTCPConnection(dmsgConn net.Conn, localPort uint, log *logging.Logger) {
defer dmsgConn.Close() //nolint

localConn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
if err != nil {
log.Printf("Error connecting to local port %d: %v", localPort, err)
return
}
defer localConn.Close() //nolint

copyConn := func(dst net.Conn, src net.Conn) {
_, err := io.Copy(dst, src)
if err != nil {
log.Printf("Error during copy: %v", err)
}
}

go copyConn(dmsgConn, localConn)
go copyConn(localConn, dmsgConn)
}

func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) {
dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgWebLog), &dmsg.Config{MinSessions: dmsgSessions})
go dmsgC.Serve(context.Background())

stop = func() {
err := dmsgC.Close()
dmsgWebLog.WithError(err).Debug("Disconnected from dmsg network.")
fmt.Printf("\n")
}
dmsgWebLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc).
Debug("Connecting to dmsg network...")

select {
case <-ctx.Done():
stop()
os.Exit(0)
return nil, nil, ctx.Err()

case <-dmsgC.Ready():
dmsgWebLog.Debug("Dmsg network ready.")
return dmsgC, stop, nil
}
}
88 changes: 88 additions & 0 deletions examples/tcp-proxy/tcp-proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"sync"
)

func main() {
if len(os.Args) < 3 {
log.Fatalf("requires two arguments; usage: tcp1 <target-port> <source-port>")
}
sourcePort, err := strconv.Atoi(os.Args[2])
if err != nil {
log.Fatalf("Failed to parse tcp source port string \"%v\" to int: %v", sourcePort, err)
}
targetPort, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("Failed to parse tcp target port string \"%v\" to int: %v", targetPort, err)
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", sourcePort))
if err != nil {
log.Fatalf("Failed to start TCP listener on port %d: %v", sourcePort, err)
}
defer listener.Close()
log.Printf("TCP proxy started: Listening on port %d and forwarding to port %d", sourcePort, targetPort)

for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Failed to accept connection: %v", err)
continue
}

go handleConnection(conn, targetPort)
}
}

func handleConnection(conn net.Conn, targetPort int) {
defer conn.Close()

targetAddr := fmt.Sprintf("localhost:%d", targetPort)
target, err := net.Dial("tcp", targetAddr)
if err != nil {
log.Printf("Failed to dial target server %s: %v", targetAddr, err)
return
}
defer target.Close()

var wg sync.WaitGroup
wg.Add(2)

// Copy from client to target
go func() {
_, err := io.Copy(target, conn)
if err != nil && !isClosedConnErr(err) {
log.Printf("Error copying from client to target: %v", err)
}
target.Close() // Close target side after copy
wg.Done()
}()

// Copy from target to client
go func() {
_, err := io.Copy(conn, target)
if err != nil && !isClosedConnErr(err) {
log.Printf("Error copying from target to client: %v", err)
}
conn.Close() // Close client side after copy
wg.Done()
}()

// Wait for both copies to finish
wg.Wait()
}

// isClosedConnErr checks if the error indicates a closed connection.
func isClosedConnErr(err error) bool {
if err == io.EOF {
return true
}
netErr, ok := err.(net.Error)
return ok && netErr.Timeout() // Check for timeout error indicating closed connection
}
Loading
Loading