diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index f4fe7e774..f8d4b611d 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -16,8 +16,8 @@ jobs: - name: Login to Docker Hub uses: docker/login-action@v2 with: - username: ${{ secrets.DOCKERHUB_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} + username: ${{ secrets.USERNAME }} + password: ${{ secrets.TOKEN }} - uses: actions/checkout@v3 - name: deploy to docker run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 68050beea..1fbedb1ae 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,6 +18,7 @@ jobs: run: make build darwin: + needs: linux runs-on: macos-latest steps: - uses: actions/setup-go@v3 @@ -34,6 +35,7 @@ jobs: run: make build windows: + needs: linux runs-on: windows-latest steps: - uses: actions/setup-go@v3 @@ -50,4 +52,4 @@ jobs: set GO111MODULE=on make test-windows - name: Build - run: make build-windows \ No newline at end of file + run: make build-windows diff --git a/cmd/dmsg-server/commands/start/root.go b/cmd/dmsg-server/commands/start/root.go index 8938cac32..b1e4815c3 100644 --- a/cmd/dmsg-server/commands/start/root.go +++ b/cmd/dmsg-server/commands/start/root.go @@ -27,11 +27,13 @@ import ( ) var ( - sf cmdutil.ServiceFlags + sf cmdutil.ServiceFlags + limitIP int ) func init() { - sf.Init(RootCmd, "dmsg_srv", "") + sf.Init(RootCmd, "dmsg_srv", dmsgserver.DefaultConfigPath) + RootCmd.Flags().IntVar(&limitIP, "limit-ip", 15, "set limitation of IPs want connect to specific dmsg-server, default value is 15") } // RootCmd contains commands for dmsg-server @@ -90,6 +92,7 @@ var RootCmd = &cobra.Command{ srvConf := dmsg.ServerConfig{ MaxSessions: conf.MaxSessions, UpdateInterval: conf.UpdateInterval, + LimitIP: limitIP, } srv := dmsg.NewServer(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.Discovery, &http.Client{}, log), &srvConf, m) srv.SetLogger(log) diff --git a/internal/dmsg-discovery/api/api.go b/internal/dmsg-discovery/api/api.go index 837394779..8ac21b3fc 100644 --- a/internal/dmsg-discovery/api/api.go +++ b/internal/dmsg-discovery/api/api.go @@ -110,7 +110,7 @@ func (a *API) RunBackgroundTasks(ctx context.Context, log logrus.FieldLogger) { } // AllServers is used to get all the available servers registered to the dmsg-discovery. -func (a *API) AllServers(ctx context.Context, log logrus.FieldLogger) (entries []*disc.Entry, err error) { +func (a *API) AllServers(ctx context.Context, _ logrus.FieldLogger) (entries []*disc.Entry, err error) { entries, err = a.db.AllServers(ctx) if err != nil { return entries, err diff --git a/internal/dmsg-discovery/store/testing.go b/internal/dmsg-discovery/store/testing.go index ba8c6ba97..69c5e4c63 100644 --- a/internal/dmsg-discovery/store/testing.go +++ b/internal/dmsg-discovery/store/testing.go @@ -57,7 +57,7 @@ func NewMock() Storer { } // Entry implements Storer Entry method for MockStore -func (ms *MockStore) Entry(ctx context.Context, staticPubKey cipher.PubKey) (*disc.Entry, error) { +func (ms *MockStore) Entry(_ context.Context, staticPubKey cipher.PubKey) (*disc.Entry, error) { payload, ok := ms.entry(staticPubKey.Hex()) if !ok { return nil, disc.ErrKeyNotFound @@ -80,7 +80,7 @@ func (ms *MockStore) Entry(ctx context.Context, staticPubKey cipher.PubKey) (*di } // SetEntry implements Storer SetEntry method for MockStore -func (ms *MockStore) SetEntry(ctx context.Context, entry *disc.Entry, timeout time.Duration) error { +func (ms *MockStore) SetEntry(_ context.Context, entry *disc.Entry, _ time.Duration) error { payload, err := json.Marshal(entry) if err != nil { return disc.ErrUnexpected @@ -96,13 +96,13 @@ func (ms *MockStore) SetEntry(ctx context.Context, entry *disc.Entry, timeout ti } // DelEntry implements Storer DelEntry method for MockStore -func (ms *MockStore) DelEntry(ctx context.Context, staticPubKey cipher.PubKey) error { +func (ms *MockStore) DelEntry(_ context.Context, staticPubKey cipher.PubKey) error { ms.delEntry(staticPubKey.Hex()) return nil } // RemoveOldServerEntries implements Storer RemoveOldServerEntries method for MockStore -func (ms *MockStore) RemoveOldServerEntries(ctx context.Context) error { +func (ms *MockStore) RemoveOldServerEntries(_ context.Context) error { return nil } @@ -113,7 +113,7 @@ func (ms *MockStore) Clear() { } // AvailableServers implements Storer AvailableServers method for MockStore -func (ms *MockStore) AvailableServers(ctx context.Context, maxCount int) ([]*disc.Entry, error) { +func (ms *MockStore) AvailableServers(_ context.Context, _ int) ([]*disc.Entry, error) { entries := make([]*disc.Entry, 0) ms.serversLock.RLock() @@ -135,7 +135,7 @@ func (ms *MockStore) AvailableServers(ctx context.Context, maxCount int) ([]*dis } // AllServers implements Storer AllServers method for MockStore -func (ms *MockStore) AllServers(ctx context.Context) ([]*disc.Entry, error) { +func (ms *MockStore) AllServers(_ context.Context) ([]*disc.Entry, error) { entries := make([]*disc.Entry, 0) ms.serversLock.RLock() @@ -157,7 +157,7 @@ func (ms *MockStore) AllServers(ctx context.Context) ([]*disc.Entry, error) { } // CountEntries implements Storer CountEntries method for MockStore -func (ms *MockStore) CountEntries(ctx context.Context) (int64, int64, error) { +func (ms *MockStore) CountEntries(_ context.Context) (int64, int64, error) { var numberOfServers int64 var numberOfClients int64 ms.serversLock.RLock() @@ -198,7 +198,7 @@ func arrayFromMap(m map[string][]byte) [][]byte { } // AllEntries implements Storer CountEntries method for MockStore -func (ms *MockStore) AllEntries(ctx context.Context) ([]string, error) { +func (ms *MockStore) AllEntries(_ context.Context) ([]string, error) { entries := []string{} ms.mLock.RLock() diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index bc44b9a42..af40b75a2 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -4,6 +4,7 @@ package dmsg import ( "context" "net" + "strings" "sync" "time" @@ -19,6 +20,7 @@ import ( type ServerConfig struct { MaxSessions int UpdateInterval time.Duration + LimitIP int } // DefaultServerConfig returns the default server config. @@ -48,6 +50,10 @@ type Server struct { addrDone chan struct{} maxSessions int + + limitIP int + ipCounter map[string]int + ipCounterLocker sync.RWMutex } // NewServer creates a new dmsg server entity. @@ -73,6 +79,8 @@ func NewServer(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Serv s.delSessionCallback = func(ctx context.Context) error { return s.updateServerEntry(ctx, s.AdvertisedAddr(), s.maxSessions) } + s.ipCounter = make(map[string]int) + s.limitIP = conf.LimitIP return s } @@ -150,10 +158,21 @@ func (s *Server) Serve(lis net.Listener, addr string) error { WithField("remote_tcp", conn.RemoteAddr()). Debug("Max sessions is reached, but still accepting so clients who delegated us can still listen.") } - + connIP := strings.Split(conn.RemoteAddr().String(), ":")[0] + s.ipCounterLocker.Lock() + if s.ipCounter[connIP] >= s.limitIP { + log.Warnf("Maximum client per IP for %s reached.", connIP) + s.ipCounterLocker.Unlock() + continue + } + s.ipCounter[connIP]++ + s.ipCounterLocker.Unlock() s.wg.Add(1) go func(conn net.Conn) { defer func() { + s.ipCounterLocker.Lock() + s.ipCounter[connIP]-- + s.ipCounterLocker.Unlock() err := recover() if err != nil { log.Warnf("panic in handleSession: %+v", err) diff --git a/pkg/dmsg/stream_test.go b/pkg/dmsg/stream_test.go index c1081dffe..2356f4b08 100644 --- a/pkg/dmsg/stream_test.go +++ b/pkg/dmsg/stream_test.go @@ -29,6 +29,7 @@ func TestStream(t *testing.T) { srvConf := &ServerConfig{ MaxSessions: maxSessions, UpdateInterval: 0, + LimitIP: 200, } srv := NewServer(pkSrv, skSrv, dc, srvConf, nil) srv.SetLogger(logging.MustGetLogger("server")) diff --git a/pkg/dmsgget/dmsgget_test.go b/pkg/dmsgget/dmsgget_test.go index c194e0022..1f165ad59 100644 --- a/pkg/dmsgget/dmsgget_test.go +++ b/pkg/dmsgget/dmsgget_test.go @@ -110,6 +110,7 @@ func startDmsgEnv(t *testing.T, nSrvs, maxSessions int) disc.APIClient { conf := dmsg.ServerConfig{ MaxSessions: maxSessions, UpdateInterval: 0, + LimitIP: 200, } srv := dmsg.NewServer(pk, sk, dc, &conf, nil) srv.SetLogger(logging.MustGetLogger(fmt.Sprintf("server_%d", i))) diff --git a/pkg/dmsghttp/examples_test.go b/pkg/dmsghttp/examples_test.go index df9493137..010bd4855 100644 --- a/pkg/dmsghttp/examples_test.go +++ b/pkg/dmsghttp/examples_test.go @@ -33,6 +33,7 @@ func ExampleMakeHTTPTransport() { srvConf := dmsg.ServerConfig{ MaxSessions: maxSessions, UpdateInterval: 0, + LimitIP: 200, } srv := dmsg.NewServer(srvPK, srvSK, dc, &srvConf, nil) defer func() { diff --git a/pkg/dmsghttp/http_transport_test.go b/pkg/dmsghttp/http_transport_test.go index a7edcd3a6..4e4364cef 100644 --- a/pkg/dmsghttp/http_transport_test.go +++ b/pkg/dmsghttp/http_transport_test.go @@ -107,6 +107,7 @@ func startDmsgEnv(t *testing.T, nSrvs, maxSessions int) disc.APIClient { conf := dmsg.ServerConfig{ MaxSessions: maxSessions, UpdateInterval: 0, + LimitIP: 200, } srv := dmsg.NewServer(pk, sk, dc, &conf, nil) srv.SetLogger(logging.MustGetLogger(fmt.Sprintf("server_%d", i))) diff --git a/pkg/dmsgtest/env.go b/pkg/dmsgtest/env.go index 9a294ea84..97520be36 100644 --- a/pkg/dmsgtest/env.go +++ b/pkg/dmsgtest/env.go @@ -90,6 +90,7 @@ func (env *Env) newServer(ctx context.Context, updateInterval time.Duration) (*d conf := dmsg.ServerConfig{ MaxSessions: maxSessions, UpdateInterval: updateInterval, + LimitIP: 200, } srv := dmsg.NewServer(pk, sk, env.d, &conf, nil) env.s[pk] = srv