
chore: Merge fixes
TrekkieCoder committed Jan 1, 2025
1 parent 190a1e9 commit 94b72ec
Showing 5 changed files with 37 additions and 114 deletions.
53 changes: 19 additions & 34 deletions pkg/loxinet/dpbroker.go
@@ -18,13 +18,14 @@ package loxinet
 
 import (
 	"fmt"
-	cmn "github.com/loxilb-io/loxilb/common"
-	tk "github.com/loxilb-io/loxilib"
 	"net"
 	"os"
 	"runtime/debug"
 	"sync"
 	"time"
+
+	cmn "github.com/loxilb-io/loxilb/common"
+	tk "github.com/loxilb-io/loxilib"
 )
 
 // man names constants
@@ -41,10 +42,6 @@ const (
 	MapNameFw4 = "FW4"
 )
 
-const (
-	UseRPCPeer = false
-)
-
 // error codes
 const (
 	DpErrBase = iota - 103000
@@ -451,8 +448,6 @@ type DpHookInterface interface {
 	DpCtDel(w *DpCtInfo) int
 	DpSockVIPAdd(w *SockVIPDpWorkQ) int
 	DpSockVIPDel(w *SockVIPDpWorkQ) int
-	DpCnodeAdd(w *PeerDpWorkQ) int
-	DpCnodeDel(w *PeerDpWorkQ) int
 	DpTableGC()
 	DpCtGetAsync()
 	DpGetLock()
@@ -518,8 +513,6 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
 	var ret int
 	var err error
 
-	return 0
-
 	dp.SyncMtx.Lock()
 	defer dp.SyncMtx.Unlock()
 
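Removing the unconditional return 0 is the real fix in this hunk: everything after it, including the mutex acquisition and the actual xsync work, was dead code, so DpXsyncRPC always reported success without syncing anything. A minimal sketch of this bug class (names invented; go vet's unreachable analyzer flags the pattern):

    package main

    import "fmt"

    // syncStub mimics a routine neutered by a leftover early return.
    func syncStub() int {
        return 0 // stub short-circuit: everything below is dead code

        fmt.Println("doing the real sync work") // go vet: unreachable code
        return 1
    }

    func main() {
        fmt.Println(syncStub()) // always prints 0; the work never ran
    }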
@@ -775,35 +768,27 @@ func (dp *DpH) DpWorkOnSockVIP(vsWq *SockVIPDpWorkQ) DpRetT {
 // DpWorkOnPeerOp - routine to work on a peer request for clustering
 func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT {
 	if pWq.Work == DpCreate {
-		if UseRPCPeer {
-			var newPeer DpPeer
-			for _, pe := range dp.Peers {
-				if pe.Peer.Equal(pWq.PeerIP) {
-					return DpCreateErr
-				}
-			}
-			newPeer.Peer = pWq.PeerIP
-			dp.Peers = append(dp.Peers, newPeer)
-			tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
-			return 0
-		} else {
-			return dp.DpHooks.DpCnodeAdd(pWq)
-		}
+		var newPeer DpPeer
+		for _, pe := range dp.Peers {
+			if pe.Peer.Equal(pWq.PeerIP) {
+				return DpCreateErr
+			}
+		}
+		newPeer.Peer = pWq.PeerIP
+		dp.Peers = append(dp.Peers, newPeer)
+		tk.LogIt(tk.LogInfo, "Added cluster-peer %s\n", newPeer.Peer.String())
+		return 0
 	} else if pWq.Work == DpRemove {
-		if UseRPCPeer {
-			for idx := range dp.Peers {
-				pe := &dp.Peers[idx]
-				if pe.Peer.Equal(pWq.PeerIP) {
-					if pe.Client != nil {
-						dp.RPC.RPCHooks.RPCClose(pe)
-					}
-					dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
-					tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
-					return 0
-				}
-			}
-		} else {
-			return dp.DpHooks.DpCnodeDel(pWq)
-		}
+		for idx := range dp.Peers {
+			pe := &dp.Peers[idx]
+			if pe.Peer.Equal(pWq.PeerIP) {
+				if pe.Client != nil {
+					dp.RPC.RPCHooks.RPCClose(pe)
+				}
+				dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...)
+				tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String())
+				return 0
+			}
+		}
 	}
 
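With the UseRPCPeer toggle and the DpCnodeAdd/DpCnodeDel hooks gone, peer bookkeeping always goes through the in-process dp.Peers slice. A self-contained sketch of the resulting add/remove semantics, with the types pared down for illustration (the real DpPeer also carries an RPC client that gets closed on removal):

    package main

    import (
        "fmt"
        "net"
    )

    type peer struct{ ip net.IP }

    type peerSet struct{ peers []peer }

    // add appends a peer unless it is already tracked (the DpCreate path).
    func (s *peerSet) add(ip net.IP) error {
        for _, p := range s.peers {
            if p.ip.Equal(ip) {
                return fmt.Errorf("peer %s already exists", ip)
            }
        }
        s.peers = append(s.peers, peer{ip: ip})
        return nil
    }

    // del drops a tracked peer in place (the DpRemove path).
    func (s *peerSet) del(ip net.IP) error {
        for i := range s.peers {
            if s.peers[i].ip.Equal(ip) {
                s.peers = append(s.peers[:i], s.peers[i+1:]...)
                return nil
            }
        }
        return fmt.Errorf("peer %s not found", ip)
    }

    func main() {
        var s peerSet
        fmt.Println(s.add(net.ParseIP("192.168.20.55"))) // <nil>
        fmt.Println(s.del(net.ParseIP("192.168.20.55"))) // <nil>
    }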
62 changes: 7 additions & 55 deletions pkg/loxinet/dpebpf_linux.go
@@ -268,30 +268,14 @@ func DpEbpfSetLogLevel(logLevel tk.LogLevelT) {
 }
 
 // DpEbpfInit - initialize the ebpf dp subsystem
-func DpEbpfInit(clusterNodes string, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
+func DpEbpfInit(clusterEn, rssEn, egrHooks, localSockPolicy, sockMapEn bool, nodeNum int, disBPF bool, logLevel tk.LogLevelT) *DpEbpfH {
 	var cfg C.struct_ebpfcfg
 
-	//cNodes := strings.Split(clusterNodes, ",")
-	//for i, cNode := range cNodes {
-	//	addr := net.ParseIP(cNode)
-	//	if addr == nil {
-	//		continue
-	//	}
-	//	if utils.IsIPHostAddr(cNode) {
-	//		continue
-	//	}
-	//	if i == 0 {
-	//		cfg.cluster1 = C.CString(cNode)
-	//	} else if i == 1 {
-	//		cfg.cluster2 = C.CString(cNode)
-	//	}
-	//}
-
-	//if len(clusterEn) > 0 {
-	//	cfg.have_mtrace = 1
-	//} else {
-	//	cfg.have_mtrace = 0
-	//}
+	if clusterEn {
+		cfg.have_mtrace = 1
+	} else {
+		cfg.have_mtrace = 0
+	}
 	if egrHooks {
 		cfg.egr_hooks = 1
 	} else {
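The signature change swaps the unused clusterNodes string for a clusterEn bool, which now actually drives cfg.have_mtrace instead of the long-commented block above it. Since cgo structs carry integer flags rather than Go bools, each option is lowered by hand; a cgo-free sketch of that lowering pattern (struct and field names invented for illustration):

    package main

    import "fmt"

    // ebpfCfg stands in for C.struct_ebpfcfg: flags are plain integers.
    type ebpfCfg struct {
        haveMtrace int
        egrHooks   int
    }

    // boolToFlag lowers a Go bool to the 0/1 convention the C side expects.
    func boolToFlag(b bool) int {
        if b {
            return 1
        }
        return 0
    }

    func main() {
        cfg := ebpfCfg{
            haveMtrace: boolToFlag(true),  // clusterEn
            egrHooks:   boolToFlag(false), // egrHooks
        }
        fmt.Printf("%+v\n", cfg) // {haveMtrace:1 egrHooks:0}
    }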
@@ -1080,7 +1064,7 @@ func DpLBRuleMod(w *LBDpWorkQ) int {
 		nxfa.inactive = 1
 	}
 
-	dat.nxfrm = C.ushort(len(w.endPoints))
+	dat.nxfrm = C.uchar(len(w.endPoints))
 	if w.CsumDis {
 		dat.cdis = 1
 	} else {
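Narrowing nxfrm from C.ushort to C.uchar caps the endpoint count the datapath sees at 255; that is safe only because this commit also drops MaxLBEndPoints to 24, so len(w.endPoints) can never overflow the byte. A defensive sketch of the conversion, assuming you wanted an explicit guard instead of relying on the constant:

    package main

    import "fmt"

    // toUint8Len narrows a slice length into a one-byte field,
    // failing loudly instead of silently truncating.
    func toUint8Len(n int) (uint8, error) {
        if n < 0 || n > 255 {
            return 0, fmt.Errorf("endpoint count %d does not fit in uint8", n)
        }
        return uint8(n), nil
    }

    func main() {
        fmt.Println(toUint8Len(24))  // 24 <nil>
        fmt.Println(toUint8Len(300)) // 0, plus a range error
    }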
@@ -1943,38 +1927,6 @@ func (e *DpEbpfH) DpSockVIPDel(w *SockVIPDpWorkQ) int {
 	return ec
 }
 
-// DpCnodeAdd - routine to work on adding a cnode
-func (e *DpEbpfH) DpCnodeAdd(w *PeerDpWorkQ) int {
-	cnode := w.PeerIP.String()
-
-	cnodeStr := C.CString(cnode)
-	defer C.free(unsafe.Pointer(cnodeStr))
-
-	ec := int(C.llb_add_cnode(cnodeStr))
-	if ec != 0 {
-		*w.Status = DpCreateErr
-	} else {
-		*w.Status = 0
-	}
-	return ec
-}
-
-// DpCnodeDel - routine to work on deleting a cnode
-func (e *DpEbpfH) DpCnodeDel(w *PeerDpWorkQ) int {
-	cnode := w.PeerIP.String()
-
-	cnodeStr := C.CString(cnode)
-	defer C.free(unsafe.Pointer(cnodeStr))
-
-	ec := int(C.llb_delete_cnode(cnodeStr))
-	if ec != 0 {
-		*w.Status = DpRemoveErr
-	} else {
-		*w.Status = 0
-	}
-	return ec
-}
-
 //export goMapNotiHandler
 func goMapNotiHandler(m *mapNoti) {
 
2 changes: 1 addition & 1 deletion pkg/loxinet/loxinet.go
@@ -292,7 +292,7 @@ func loxiNetInit() {
 		RunCommand(MkMountCG2, false)
 	}
 	// Initialize the ebpf datapath subsystem
-	mh.dpEbpf = DpEbpfInit(opts.Opts.ClusterNodes, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
+	mh.dpEbpf = DpEbpfInit(clusterMode, mh.rssEn, mh.eHooks, mh.lSockPolicy, mh.sockMapEn, mh.self, mh.disBPF, logLevel)
 	mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode)
 
 	// Initialize the security zone subsystem
28 changes: 10 additions & 18 deletions pkg/loxinet/rules.go
@@ -78,8 +78,7 @@ const (
 
 // constants
 const (
-	MaxLBEndPoints = 1500
-	MaxLBEndPointsRR = 32
+	MaxLBEndPoints = 24
 	DflLbaInactiveTries = 2 // Default number of inactive tries before LB arm is turned off
 	MaxDflLbaInactiveTries = 100 // Max number of inactive tries before LB arm is turned off
 	DflLbaCheckTimeout = 10 // Default timeout for checking LB arms
@@ -91,7 +90,7 @@ const (
 	LbMaxInactiveTimeout = 24 * 3600 // Maximum inactive timeout for established sessions
 	MaxEndPointCheckers = 4 // Maximum helpers to check endpoint health
 	EndPointCheckerDuration = 2 // Duration at which ep-helpers will run
-	MaxEndPointSweeps = 40 // Maximum end-point sweeps per round
+	MaxEndPointSweeps = 20 // Maximum end-point sweeps per round
 	VIPSweepDuration = 30 // Duration of periodic VIP maintenance
 	DefaultPersistTimeOut = 10800 // Default persistent LB session timeout
 	SnatFwMark = 0x80000000 // Snat Marker
@@ -911,7 +910,7 @@ func (R *RuleH) modNatEpHost(r *ruleEnt, endpoints []ruleLBEp, doAddOp bool, liv
 			pType = HostProbeConnectTCP
 			pPort = nep.xPort
 		} else if r.tuples.l4Prot.val == 17 {
-			//pType = HostProbeConnectUDP
+			pType = HostProbeConnectUDP
 			pType = HostProbeConnectTCP // FIXME
 			pPort = nep.xPort
 		} else if r.tuples.l4Prot.val == 1 {
@@ -1261,8 +1260,7 @@ func (R *RuleH) syncEPHostState2Rule(rule *ruleEnt, checkNow bool) bool {
 		if rule.tuples.l4Prot.val == 6 {
 			sType = HostProbeConnectTCP
 		} else if rule.tuples.l4Prot.val == 17 {
-			//sType = HostProbeConnectUDP
-			sType = HostProbeConnectTCP // FIXME
+			sType = HostProbeConnectUDP
 		} else if rule.tuples.l4Prot.val == 1 {
 			sType = HostProbePing
 		} else if rule.tuples.l4Prot.val == 132 {
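Taken together, the two probe hunks make the health-check type follow the rule's L4 protocol: syncEPHostState2Rule now issues a real UDP connect probe for protocol 17, while modNatEpHost still pins UDP rules to a TCP probe behind its FIXME (the uncommented UDP assignment there is immediately overwritten). The ladder reads naturally as a switch; a sketch using the probe constants from the diff and IANA protocol numbers, with the branches the hunk truncates marked as assumptions:

    package main

    import "fmt"

    // probeTypeFor mirrors the protocol-to-probe ladder in syncEPHostState2Rule.
    func probeTypeFor(l4Proto uint8) string {
        switch l4Proto {
        case 6: // TCP
            return "HostProbeConnectTCP"
        case 17: // UDP: a real UDP probe again after this hunk
            return "HostProbeConnectUDP"
        case 1: // ICMP
            return "HostProbePing"
        case 132: // SCTP branch is truncated in the hunk; name assumed
            return "HostProbeConnectSCTP"
        default:
            return "HostProbeNone" // assumed fallback
        }
    }

    func main() {
        fmt.Println(probeTypeFor(17)) // HostProbeConnectUDP
    }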
@@ -1574,12 +1572,6 @@ func (R *RuleH) AddLbRule(serv cmn.LbServiceArg, servSecIPs []cmn.LbSecIPArg, al
 		return RuleEpCountErr, errors.New("endpoints-range error")
 	}
 
-	if (serv.Sel == cmn.LbSelRr || serv.Sel == cmn.LbSelLeastConnections ||
-		serv.Sel == cmn.LbSelPrio || serv.Sel == cmn.LbSelN2 || serv.Sel == cmn.LbSelN3) &&
-		len(servEndPoints) > MaxLBEndPointsRR {
-		return RuleEpCountErr, errors.New("endpoints-range1 error")
-	}
-
 	// Validate persist timeout
 	if serv.Sel == cmn.LbSelRrPersist {
 		if serv.PersistTimeout == 0 || serv.PersistTimeout > 24*60*60 {
@@ -2930,19 +2922,19 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
 	if at.sel == cmn.LbSelPrio {
 		j := 0
 		k := 0
-		var small [MaxLBEndPointsRR]int
-		var neps [MaxLBEndPointsRR]ruleLBEp
+		var small [MaxLBEndPoints]int
+		var neps [MaxLBEndPoints]ruleLBEp
 		for i, ep := range at.endPoints {
 			if ep.inActiveEP {
 				continue
 			}
 			oEp := &at.endPoints[i]
-			sw := (int(ep.weight) * MaxLBEndPointsRR) / 100
+			sw := (int(ep.weight) * MaxLBEndPoints) / 100
 			if sw == 0 {
 				small[k] = i
 				k++
 			}
-			for x := 0; x < sw && j < MaxLBEndPointsRR; x++ {
+			for x := 0; x < sw && j < MaxLBEndPoints; x++ {
 				neps[j].xIP = oEp.xIP
 				neps[j].rIP = oEp.rIP
 				neps[j].xPort = oEp.xPort
@@ -2955,12 +2947,12 @@ func (r *ruleEnt) LB2DP(work DpWorkT) int {
 				j++
 			}
 		}
-		if j < MaxLBEndPointsRR {
+		if j < MaxLBEndPoints {
 			v := 0
 			if k == 0 {
 				k = len(at.endPoints)
 			}
-			for j < MaxLBEndPointsRR {
+			for j < MaxLBEndPoints {
 				idx := small[v%k]
 				oEp := &at.endPoints[idx]
 				neps[j].xIP = oEp.xIP
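Both prio-selector hunks re-dimension the weighted slot table to MaxLBEndPoints (24) now that MaxLBEndPointsRR (32) is gone. The slot math is plain integer arithmetic: sw = weight * 24 / 100, so an endpoint weighted 50% gets 12 of the 24 slots, while anything under about 5% rounds down to 0 and is parked in the small[] spillover list that backfills leftover slots. A compact, runnable check of that distribution:

    package main

    import "fmt"

    const maxLBEndPoints = 24 // value after this commit

    // slotsFor mirrors the integer slot math in LB2DP's prio branch.
    func slotsFor(weight int) int {
        return weight * maxLBEndPoints / 100
    }

    func main() {
        for _, w := range []int{50, 25, 10, 4} {
            fmt.Printf("weight %d%% -> %d slots\n", w, slotsFor(w))
        }
        // weight 50% -> 12 slots
        // weight 25% -> 6 slots
        // weight 10% -> 2 slots
        // weight 4% -> 0 slots (falls back to the small[] list)
    }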
6 changes: 0 additions & 6 deletions pkg/loxinet/xsync_server.go
@@ -26,8 +26,6 @@ import (
 	"net/rpc"
 	"os"
 	"runtime/debug"
-	"time"
-
 	opts "github.com/loxilb-io/loxilb/options"
 	tk "github.com/loxilb-io/loxilib"
 	"google.golang.org/grpc"
@@ -214,10 +212,6 @@ func LoxiXsyncMain(mode string) {
 		return
 	}
 
-	for {
-		time.Sleep(1 * time.Second)
-	}
-
 	// Stack trace logger
 	defer func() {
 		if e := recover(); e != nil {
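Dropping the trailing for { time.Sleep(...) } loop (and the now-unused "time" import above) lets LoxiXsyncMain return to its caller; presumably the listeners it sets up run on their own goroutines, and blocking is left to whoever owns the process lifetime. A minimal sketch of the usual shape with gRPC, where Serve itself blocks so no sleep loop is needed (address invented for illustration):

    package main

    import (
        "log"
        "net"

        "google.golang.org/grpc"
    )

    func main() {
        lis, err := net.Listen("tcp", ":22222")
        if err != nil {
            log.Fatal(err)
        }
        s := grpc.NewServer()
        // Service registration would go here.
        if err := s.Serve(lis); err != nil { // blocks until Stop/GracefulStop
            log.Fatal(err)
        }
    }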
