Skip to content

Commit

Permalink
Merge pull request #96 from sallylsy/gnmi_ma_impl
Browse files Browse the repository at this point in the history
Implementation of Master Arbitration.
  • Loading branch information
tomek-US authored Aug 17, 2023
2 parents 6e9ef05 + 9943ae1 commit 54c6c91
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 2 deletions.
90 changes: 90 additions & 0 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Server struct {
config *Config
cMu sync.Mutex
clients map[string]*Client
// ReqFromMaster point to a function that is called to verify if the request
// comes from a master controller.
ReqFromMaster func(req *gnmipb.SetRequest, masterEID *uint128) error
masterEID uint128
}
type AuthTypes map[string]bool

Expand All @@ -57,6 +61,7 @@ type Config struct {
}

var AuthLock sync.Mutex
var maMu sync.Mutex

func (i AuthTypes) String() string {
if i["none"] {
Expand Down Expand Up @@ -137,6 +142,10 @@ func NewServer(config *Config, opts []grpc.ServerOption) (*Server, error) {
s: s,
config: config,
clients: map[string]*Client{},
// ReqFromMaster point to a function that is called to verify if
// the request comes from a master controller.
ReqFromMaster: ReqFromMasterDisabledMA,
masterEID: uint128{High: 0, Low: 0},
}
var err error
if srv.config.Port < 0 {
Expand Down Expand Up @@ -379,6 +388,11 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
}

func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetResponse, error) {
e := s.ReqFromMaster(req, &s.masterEID)
if e != nil {
return nil, e
}

common_utils.IncCounter(common_utils.GNMI_SET)
if s.config.EnableTranslibWrite == false && s.config.EnableNativeWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -518,3 +532,79 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
GNMIVersion: "0.7.0",
Extension: exts}, nil
}

type uint128 struct {
High uint64
Low uint64
}

func (lh *uint128) Compare(rh *uint128) int {
if rh == nil {
// For MA disabled case, EID supposed to be 0.
rh = &uint128{High: 0, Low: 0}
}
if lh.High > rh.High {
return 1
}
if lh.High < rh.High {
return -1
}
if lh.Low > rh.Low {
return 1
}
if lh.Low < rh.Low {
return -1
}
return 0
}

// ReqFromMasterEnabledMA returns true if the request is sent by the master
// controller.
func ReqFromMasterEnabledMA(req *gnmipb.SetRequest, masterEID *uint128) error {
// Read the election_id.
reqEID := uint128{High: 0, Low: 0}
hasMaExt := false
// It can be one of many extensions, so iterate through them to find it.
for _, e := range req.GetExtension() {
ma := e.GetMasterArbitration()
if ma == nil {
continue
}

hasMaExt = true
// The Master Arbitration descriptor has been found.
if ma.ElectionId == nil {
return status.Errorf(codes.InvalidArgument, "MA: ElectionId missing")
}

if ma.Role != nil {
// Role will be implemented later.
return status.Errorf(codes.Unimplemented, "MA: Role is not implemented")
}

reqEID = uint128{High: ma.ElectionId.High, Low: ma.ElectionId.Low}
// Use the election ID that is in the last extension, so, no 'break' here.
}

if !hasMaExt {
log.V(0).Infof("MA: No Master Arbitration in setRequest extension, masterEID %v is not updated", masterEID)
return nil
}

maMu.Lock()
defer maMu.Unlock()
switch masterEID.Compare(&reqEID) {
case 1: // This Election ID is smaller than the known Master Election ID.
return status.Errorf(codes.PermissionDenied, "Election ID is smaller than the current master. Rejected. Master EID: %v. Current EID: %v.", masterEID, reqEID)
case -1: // New Master Election ID received!
log.V(0).Infof("New master has been elected with %v\n", reqEID)
*masterEID = reqEID
}
return nil
}

// ReqFromMasterDisabledMA always returns true. It is used when Master Arbitration
// is disabled.
func ReqFromMasterDisabledMA(req *gnmipb.SetRequest, masterEID *uint128) error {
return nil
}
195 changes: 193 additions & 2 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path/filepath"
"flag"
"fmt"
"sync"
"sync"
"strings"
"unsafe"

Expand Down Expand Up @@ -3781,11 +3781,202 @@ func TestParseOrigin(t *testing.T) {
}
}

func TestMasterArbitration(t *testing.T) {
s := createServer(t, 8088)
// Turn on Master Arbitration
s.ReqFromMaster = ReqFromMasterEnabledMA
go runServer(t, s)
defer s.s.Stop()

tlsConfig := &tls.Config{InsecureSkipVerify: true}
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}

//targetAddr := "30.57.185.38:8080"
targetAddr := "127.0.0.1:8088"
conn, err := grpc.Dial(targetAddr, opts...)
if err != nil {
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
}
defer conn.Close()

gClient := pb.NewGNMIClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

maExt0 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 0},
},
},
}
maExt1 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 1},
},
},
}
maExt1H0L := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 1, Low: 0},
},
},
}
regExt := &ext_pb.Extension{
Ext: &ext_pb.Extension_RegisteredExt{
RegisteredExt: &ext_pb.RegisteredExtension{},
},
}

// By default ElectionID starts from 0 so this test does not change it.
t.Run("MasterArbitrationOnElectionIdZero", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
})
// After this test ElectionID is one.
t.Run("MasterArbitrationOnElectionIdZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Set gRPC failed")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
})
// Multiple ElectionIDs with the last being one.
t.Run("MasterArbitrationOnElectionIdMultipleIdsZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0, maExt1, regExt},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
})
// ElectionIDs with the high word set to 1 and low word to 0.
t.Run("MasterArbitrationOnElectionIdHighOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1H0L},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
// As the ElectionID is one, a request with ElectionID==0 will fail.
// Also a request without Election ID will fail.
t.Run("MasterArbitrationOnElectionIdZeroThenNone", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err == nil {
t.Fatal("Expected a PermissionDenied error")
}
ret, ok := status.FromError(err)
if !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
if ret.Code() != codes.PermissionDenied {
t.Fatalf("Expected PermissionDenied. Got %v", ret.Code())
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Expected a successful set call.")
}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
}

func init() {
// Enable logs at UT setup
flag.Lookup("v").Value.Set("10")
flag.Lookup("log_dir").Value.Set("/tmp/telemetrytest")

// Inform gNMI server to use redis tcp localhost connection
sdc.UseRedisLocalTcpPort = true
}
}
5 changes: 5 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
gnmi_translib_write = flag.Bool("gnmi_translib_write", gnmi.ENABLE_TRANSLIB_WRITE, "Enable gNMI translib write for management framework")
gnmi_native_write = flag.Bool("gnmi_native_write", gnmi.ENABLE_NATIVE_WRITE, "Enable gNMI native write")
threshold = flag.Int("threshold", 100, "max number of client connections")
withMasterArbitration = flag.Bool("with-master-arbitration", false, "Enables master arbitration policy.")
idle_conn_duration = flag.Int("idle_conn_duration", 5, "Seconds before server closes idle connections")
)

Expand Down Expand Up @@ -179,6 +180,10 @@ func main() {
return
}

if *withMasterArbitration {
s.ReqFromMaster = gnmi.ReqFromMasterEnabledMA
}

log.V(1).Infof("Auth Modes: ", userAuth)
log.V(1).Infof("Starting RPC server on address: %s", s.Address())
s.Serve() // blocks until close
Expand Down

0 comments on commit 54c6c91

Please sign in to comment.