Skip to content

Commit

Permalink
fix: using multiplexer in network layer
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Mar 1, 2021
1 parent 4be5b31 commit 9f869eb
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 118 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ jobs:
go-version: ${{ matrix.go }}

- name: Run go test
run: make test
run: make test

- name: Run example test
run:
- cd example
-
6 changes: 2 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ type Config struct {
JoinAddress string
// DataDir holds raft data.
DataDir string
// RaftListenAddress is a network address for raft server.
// It should be noted that we will use the port of this address plus an offset of 1 as the listen address of the HTTP server.
// If set to 10.0.10.10:6790, the Raft server runs on 10.0.10.10:6790, the HTTP server runs on10.0.10.10:6791.
RaftListenAddress string
// ListenAddress is a network address for raft server and HTTP server.
ListenAddress string
// TLSConfig is used to configure a TLS server and client.
// You have to provide a peer certificate.
// We recommend using cfssl tool to create this certificates.
Expand Down
51 changes: 30 additions & 21 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package hraftdispatcher
import (
"context"
"crypto/tls"
"github.com/soheilhy/cmux"
"net"

"github.com/hashicorp/go-multierror"

Expand Down Expand Up @@ -41,26 +43,30 @@ func NewHRaftDispatcher(config *Config) (*HRaftDispatcher, error) {
return nil, errors.New("DataDir is not provided in config")
}

if len(config.RaftListenAddress) == 0 {
return nil, errors.New("RaftListenAddress is not provided in config")
if len(config.ListenAddress) == 0 {
return nil, errors.New("ListenAddress is not provided in config")
}

if config.TLSConfig == nil {
return nil, errors.New("TLSConfig is not provided in config")
if len(config.ServerID) == 0 {
config.ServerID = config.ListenAddress
}

httpListenAddress, err := http.ConvertRaftAddressToHTTPAddress(config.RaftListenAddress)
if err != nil {
return nil, err
}
logger := zap.NewExample()

if len(config.ServerID) == 0 {
config.ServerID = config.RaftListenAddress
var ln net.Listener
var err error
if config.TLSConfig == nil {
ln, err = net.Listen("tcp", config.ListenAddress)
} else {
ln, err = tls.Listen("tcp", config.ListenAddress, config.TLSConfig)
}

logger := zap.NewExample()
mux := cmux.New(ln)
httpLn := mux.Match(cmux.HTTP1Fast())
raftLn := mux.Match(cmux.Any())
go mux.Serve()

streamLayer, err := store.NewTCPStreamLayer(config.RaftListenAddress, config.TLSConfig)
streamLayer, err := store.NewTCPStreamLayer(raftLn, config.TLSConfig)
if err != nil {
logger.Error(err.Error())
return nil, err
Expand Down Expand Up @@ -112,22 +118,17 @@ func NewHRaftDispatcher(config *Config) (*HRaftDispatcher, error) {
}
}

if isNewCluster && config.JoinAddress != config.RaftListenAddress && len(config.JoinAddress) != 0 {
if isNewCluster && config.JoinAddress != config.ListenAddress && len(config.JoinAddress) != 0 {
logger.Info("start joining the current node to existing cluster")
entryAddress, err := http.ConvertRaftAddressToHTTPAddress(config.JoinAddress)
err = http.DoJoinNodeRequest(config.JoinAddress, config.ServerID, config.ListenAddress, config.TLSConfig)
if err != nil {
logger.Error("failed to convert the Raft address to HTTP address", zap.String("nodeID", config.ServerID), zap.String("nodeAddress", config.RaftListenAddress), zap.String("clusterAddress", config.JoinAddress), zap.Error(err))
return nil, err
}
err = http.DoJoinNodeRequest(entryAddress, config.ServerID, config.RaftListenAddress, config.TLSConfig)
if err != nil {
logger.Error("failed to join the current node to existing cluster", zap.String("nodeID", config.ServerID), zap.String("nodeAddress", config.RaftListenAddress), zap.String("clusterAddress", config.JoinAddress), zap.Error(err))
logger.Error("failed to join the current node to existing cluster", zap.String("nodeID", config.ServerID), zap.String("nodeAddress", config.ListenAddress), zap.String("clusterAddress", config.JoinAddress), zap.Error(err))
return nil, err
}
logger.Info("the current node has joined to existing cluster")
}

httpService, err := http.NewService(httpListenAddress, config.TLSConfig, s)
httpService, err := http.NewService(httpLn, config.TLSConfig, s)
if err != nil {
return nil, err
}
Expand All @@ -146,14 +147,22 @@ func NewHRaftDispatcher(config *Config) (*HRaftDispatcher, error) {

h.shutdownFn = func() error {
var ret error

err := s.Stop()
if err != nil {
ret = multierror.Append(ret, err)
}

err = httpService.Stop(context.Background())
if err != nil {
ret = multierror.Append(ret, err)
}

err = ln.Close()
if err != nil {
ret = multierror.Append(ret, err)
}

return ret
}

Expand Down
10 changes: 5 additions & 5 deletions dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,11 @@ m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
}

dispatcher, err := NewHRaftDispatcher(&Config{
Enforcer: e,
JoinAddress: joinAddress,
RaftListenAddress: raftListenAddress,
TLSConfig: tlsConfig,
DataDir: dir,
Enforcer: e,
JoinAddress: joinAddress,
ListenAddress: raftListenAddress,
TLSConfig: tlsConfig,
DataDir: dir,
})
if err != nil {
return nil, nil, err
Expand Down
20 changes: 10 additions & 10 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type TLS struct {
}

type Config struct {
TLS TLS `yaml:"tls"`
ServerID string `yaml:"serverID"`
DataDir string `yaml:"dataDir"`
JoinAddress string `yaml:"joinAddress"`
RaftListenAddress string `yaml:"raftListenAddress"`
TLS TLS `yaml:"tls"`
ServerID string `yaml:"serverID"`
DataDir string `yaml:"dataDir"`
JoinAddress string `yaml:"joinAddress"`
ListenAddress string `yaml:"listenAddress"`

HTTPListenAddress string `yaml:"httpListenAddress"`
}
Expand Down Expand Up @@ -106,11 +106,11 @@ m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act

// New a Dispatcher
dispatcher, err := hraftdispatcher.NewHRaftDispatcher(&hraftdispatcher.Config{
Enforcer: e,
JoinAddress: config.JoinAddress,
RaftListenAddress: config.RaftListenAddress,
TLSConfig: tlsConfig,
DataDir: config.DataDir,
Enforcer: e,
JoinAddress: config.JoinAddress,
ListenAddress: config.ListenAddress,
TLSConfig: tlsConfig,
DataDir: config.DataDir,
})
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion example/node-follower-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ tls:

serverID: node-leader
dataDir: ./tmp/follow-data
raftListenAddress: 127.0.0.1:6791
listenAddress: 127.0.0.1:6791
joinAddress: 127.0.0.1:6781
2 changes: 1 addition & 1 deletion example/node-leader-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ tls:

serverID: node-leader
dataDir: ./tmp/leader-data
raftListenAddress: 127.0.0.1:6781
listenAddress: 127.0.0.1:6781
joinAddress: ""

4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ require (
github.com/json-iterator/go v1.1.10
github.com/pkg/errors v0.8.1
github.com/smartystreets/goconvey v1.6.4
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/testify v1.6.1
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.4.0
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -135,6 +137,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -146,6 +150,8 @@ golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
Expand Down
Loading

0 comments on commit 9f869eb

Please sign in to comment.