diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..a9ab2fbe --- /dev/null +++ b/Makefile @@ -0,0 +1,35 @@ +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +GO=go +CLANG_FORMAT=clang-format + +default: build + +build: generate + +fetch-libbpf-headers: + @if ! find internal/offload/xdp/headers/bpf_* >/dev/null 2>&1; then\ + cd internal/offload/xdp/headers && \ + ./fetch-libbpf-headers.sh;\ + fi + +generate: fetch-libbpf-headers + cd internal/offload/xdp/ && \ + $(GO) generate + +format-offload: + $(CLANG_FORMAT) -i --style=file internal/offload/xdp/xdp.c + +clean-offload: + rm -vf internal/offload/xdp/bpf_bpfe*.o + rm -vf internal/offload/xdp/bpf_bpfe*.go + +purge-offload: clean-offload + rm -vf internal/offload/xdp/headers/bpf_* + +test: + go test -v + +bench: build + go test -bench=. diff --git a/errors.go b/errors.go index 3ebd26ae..1ce848b2 100644 --- a/errors.go +++ b/errors.go @@ -29,4 +29,5 @@ var ( errFailedToDecodeSTUN = errors.New("failed to decode STUN message") errUnexpectedSTUNRequestMessage = errors.New("unexpected STUN request message") errRelayAddressGeneratorNil = errors.New("RelayAddressGenerator is nil") + errUnsupportedOffloadMechanism = errors.New("unsupported offload mechanism") ) diff --git a/examples/turn-server/xdp/Dockerfile b/examples/turn-server/xdp/Dockerfile new file mode 100644 index 00000000..bb05706f --- /dev/null +++ b/examples/turn-server/xdp/Dockerfile @@ -0,0 +1,79 @@ +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +##### builder +FROM golang:alpine as builder + +ARG VERSION=master + +RUN apk update && \ + apk upgrade && \ + apk add --no-cache \ + clang \ + llvm \ + linux-headers \ + bsd-compat-headers \ + musl-dev \ + make \ + git \ + bash \ + curl \ + tar + + +WORKDIR /build +# Clone Source using GIT +#RUN git clone --branch=$VERSION --depth=1 https://github.com/pion/turn.git turn && rm -rf turn/.git +RUN git clone --branch=server-ebpf-offload --depth=1 https://github.com/l7mp/turn.git turn && rm -rf turn/.git + +WORKDIR /build/turn + +#RUN rm internal/offload/xdp/*.o +RUN make + +WORKDIR /build/turn/examples/turn-server/xdp + +# Download all the dependencies +# RUN go get -d -v ./... + + + +# Build static binary +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-w -s" -o turn-server main.go + +##### main +FROM alpine + +ARG BUILD_DATE +ARG VCS_REF +ARG VERSION=master + +LABEL org.label-schema.build-date="${BUILD_DATE}" \ + org.label-schema.name="pion-turn" \ + org.label-schema.description="A toolkit for building TURN clients and servers in Go" \ + org.label-schema.usage="https://github.com/pion/turn#readme" \ + org.label-schema.vcs-ref="${VCS_REF}" \ + org.label-schema.vcs-url="https://github.com/pion/turn" \ + org.label-schema.vendor="Sean-Der" \ + org.label-schema.version="${VERSION}" \ + maintainer="https://github.com/pion" + +ENV REALM localhost +ENV USERS username=password +ENV UDP_PORT 3478 +ENV PUBLIC_IP 127.0.0.1 + +EXPOSE 3478 +#EXPOSE 49152:65535/tcp +#EXPOSE 49152:65535/udp + +USER nobody + +# Copy the executable +COPY --from=builder /build/turn/examples/turn-server/xdp/turn-server /usr/bin/ + +# Run the executable +CMD turn-server -public-ip $PUBLIC_IP -users $USERS -realm $REALM -port $UDP_PORT + +# docker build -t pion-turn -f Dockerfile . +# docker run --rm --cap-add=NET_ADMIN --cap-add=SYS_ADMIN --cap-add=BPF --privileged -e REALM="localhost" -e USERS="username=password" -e UDP_PORT="3478" -e PUBLIC_IP="127.0.0.1" -p 3478:3478 pion-turn diff --git a/examples/turn-server/xdp/docker-compose.yml b/examples/turn-server/xdp/docker-compose.yml new file mode 100644 index 00000000..cc92b79e --- /dev/null +++ b/examples/turn-server/xdp/docker-compose.yml @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +version: "3.1" + +services: + pion-turn: + container_name: "pion-turn" + image: pion-turn:${VERSION:-latest} + build: + context: ./ + stdin_open: true + environment: + - VERSION=${PION_TURN_VERSION:-master} + - REALM=${PION_TURN_REALM:-localhost} + - USERS=${PION_TURN_USERS:-username=password} + - PUBLIC_IP=${PION_TURN_PUBLIC_IP:-127.0.0.1} + - UDP_PORT=${PION_TURN_UDP_PORT:-3478} + network_mode: host + ports: + # STUN + - "${PION_TURN_UDP_PORT:-3478}:${PION_TURN_UDP_PORT:-3478}" + # TURN + - "49152-65535:49152-65535" + cap_add: + - NET_ADMIN + - SYS_ADMIN + - NET_RAW diff --git a/examples/turn-server/xdp/main.go b/examples/turn-server/xdp/main.go new file mode 100644 index 00000000..dd7dd03c --- /dev/null +++ b/examples/turn-server/xdp/main.go @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// Package main implements a simple TURN server with XDP offload +package main + +import ( + "flag" + "log" + "net" + "os" + "os/signal" + "regexp" + "strconv" + "syscall" + + "github.com/pion/logging" + "github.com/pion/turn/v3" +) + +func main() { + publicIP := flag.String("public-ip", "", "IP Address that TURN can be contacted by.") + port := flag.Int("port", 3478, "Listening port.") + users := flag.String("users", "", "List of username and password (e.g. \"user=pass,user=pass\")") + realm := flag.String("realm", "pion.ly", "Realm (defaults to \"pion.ly\")") + flag.Parse() + + if len(*publicIP) == 0 { + log.Fatalf("'public-ip' is required") + } else if len(*users) == 0 { + log.Fatalf("'users' is required") + } + + // Create a UDP listener to pass into pion/turn + // pion/turn itself doesn't allocate any UDP sockets, but lets the user pass them in + // this allows us to add logging, storage or modify inbound/outbound traffic + udpListener, err := net.ListenPacket("udp4", "0.0.0.0:"+strconv.Itoa(*port)) + if err != nil { + log.Panicf("Failed to create TURN server listener: %s", err) + } + + // Cache -users flag for easy lookup later + // If passwords are stored they should be saved to your DB hashed using turn.GenerateAuthKey + usersMap := map[string][]byte{} + for _, kv := range regexp.MustCompile(`(\w+)=(\w+)`).FindAllStringSubmatch(*users, -1) { + usersMap[kv[1]] = turn.GenerateAuthKey(kv[1], *realm, kv[2]) + } + + // Init the XDP offload engine + loggerFactory := logging.NewDefaultLoggerFactory() + err = turn.InitOffload(turn.OffloadConfig{Log: loggerFactory.NewLogger("offload")}) + if err != nil { + log.Fatalf("Failed to init offload engine: %s", err) + } + defer turn.ShutdownOffload() + + s, err := turn.NewServer(turn.ServerConfig{ + Realm: *realm, + // Set AuthHandler callback + // This is called every time a user tries to authenticate with the TURN server + // Return the key for that user, or false when no user is found + AuthHandler: func(username string, realm string, srcAddr net.Addr) ([]byte, bool) { + if key, ok := usersMap[username]; ok { + return key, true + } + return nil, false + }, + // PacketConnConfigs is a list of UDP Listeners and the configuration around them + PacketConnConfigs: []turn.PacketConnConfig{ + { + PacketConn: udpListener, + RelayAddressGenerator: &turn.RelayAddressGeneratorStatic{ + RelayAddress: net.ParseIP(*publicIP), // Claim that we are listening on IP passed by user (This should be your Public IP) + Address: "0.0.0.0", // But actually be listening on every interface + }, + }, + }, + }) + if err != nil { + log.Panic(err) + } + + // Block until user sends SIGINT or SIGTERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + + if err = s.Close(); err != nil { + log.Panic(err) + } +} diff --git a/go.mod b/go.mod index 43f967a7..204c776d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/pion/turn/v3 -go 1.19 +go 1.21.0 require ( github.com/pion/logging v0.2.2 @@ -17,5 +17,9 @@ require ( github.com/pion/transport/v2 v2.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/crypto v0.21.0 // indirect + golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +// ebpf/xdp offload +require github.com/cilium/ebpf v0.15.0 diff --git a/go.sum b/go.sum index bb9dcc87..c5b16652 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,16 @@ +github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk= +github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -16,6 +26,8 @@ github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkL github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -34,6 +46,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI= +golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/internal/allocation/allocation.go b/internal/allocation/allocation.go index 9e542fb3..5d22e5f6 100644 --- a/internal/allocation/allocation.go +++ b/internal/allocation/allocation.go @@ -13,6 +13,7 @@ import ( "github.com/pion/logging" "github.com/pion/stun/v2" "github.com/pion/turn/v3/internal/ipnet" + "github.com/pion/turn/v3/internal/offload" "github.com/pion/turn/v3/internal/proto" ) @@ -111,6 +112,21 @@ func (a *Allocation) AddChannelBind(c *ChannelBind, lifetime time.Duration) erro a.channelBindings = append(a.channelBindings, c) c.start(lifetime) + // enable offload + // currently we support offload for UDP connections only + peer := offload.Connection{ + RemoteAddr: c.Peer, + LocalAddr: a.RelayAddr, + Protocol: proto.ProtoUDP, + } + client := offload.Connection{ + RemoteAddr: a.fiveTuple.SrcAddr, + LocalAddr: a.fiveTuple.DstAddr, + Protocol: proto.ProtoUDP, + ChannelID: uint32(c.Number), + } + _ = offload.Engine.Upsert(client, peer) + // Channel binds also refresh permissions. a.AddPermission(NewPermission(c.Peer, a.log)) } else { @@ -128,14 +144,33 @@ func (a *Allocation) RemoveChannelBind(number proto.ChannelNumber) bool { a.channelBindingsLock.Lock() defer a.channelBindingsLock.Unlock() + var cAddr net.Addr + ret := false + for i := len(a.channelBindings) - 1; i >= 0; i-- { if a.channelBindings[i].Number == number { + cAddr = a.channelBindings[i].Peer a.channelBindings = append(a.channelBindings[:i], a.channelBindings[i+1:]...) - return true + ret = true + break } } - return false + // disable offload + peer := offload.Connection{ + RemoteAddr: cAddr, + LocalAddr: a.RelayAddr, + Protocol: proto.ProtoUDP, + ChannelID: uint32(number), + } + client := offload.Connection{ + RemoteAddr: a.RelayAddr, + LocalAddr: cAddr, + Protocol: proto.ProtoUDP, + } + _ = offload.Engine.Remove(client, peer) + + return ret } // GetChannelByNumber gets the ChannelBind from this allocation by id diff --git a/internal/allocation/allocation_test.go b/internal/allocation/allocation_test.go index 70c383a2..97005ff2 100644 --- a/internal/allocation/allocation_test.go +++ b/internal/allocation/allocation_test.go @@ -19,6 +19,12 @@ import ( "github.com/stretchr/testify/assert" ) +//nolint:gochecknoglobals +var testFiveTuple = FiveTuple{ + SrcAddr: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 2000}, + DstAddr: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4000}, +} + func TestAllocation(t *testing.T) { tt := []struct { name string @@ -46,7 +52,7 @@ func TestAllocation(t *testing.T) { } func subTestGetPermission(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -88,7 +94,7 @@ func subTestGetPermission(t *testing.T) { } func subTestAddPermission(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -107,7 +113,7 @@ func subTestAddPermission(t *testing.T) { } func subTestRemovePermission(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -130,7 +136,7 @@ func subTestRemovePermission(t *testing.T) { } func subTestAddChannelBind(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -154,7 +160,7 @@ func subTestAddChannelBind(t *testing.T) { } func subTestGetChannelByNumber(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -173,7 +179,7 @@ func subTestGetChannelByNumber(t *testing.T) { } func subTestGetChannelByAddr(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -193,7 +199,7 @@ func subTestGetChannelByAddr(t *testing.T) { } func subTestRemoveChannelBind(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") if err != nil { @@ -214,7 +220,7 @@ func subTestRemoveChannelBind(t *testing.T) { } func subTestAllocationRefresh(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) var wg sync.WaitGroup wg.Add(1) @@ -236,7 +242,7 @@ func subTestAllocationClose(t *testing.T) { panic(err) } - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) a.RelaySocket = l // Add mock lifetimeTimer a.lifetimeTimer = time.AfterFunc(proto.DefaultLifetime, func() {}) @@ -357,7 +363,7 @@ func subTestPacketHandler(t *testing.T) { } func subTestResponseCache(t *testing.T) { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) transactionID := [stun.TransactionIDSize]byte{1, 2, 3} responseAttrs := []stun.Setter{ &proto.Lifetime{ diff --git a/internal/allocation/channel_bind_test.go b/internal/allocation/channel_bind_test.go index 342e7612..64ce927f 100644 --- a/internal/allocation/channel_bind_test.go +++ b/internal/allocation/channel_bind_test.go @@ -42,7 +42,7 @@ func TestChannelBindReset(t *testing.T) { } func newChannelBind(lifetime time.Duration) *ChannelBind { - a := NewAllocation(nil, nil, nil) + a := NewAllocation(nil, &testFiveTuple, nil) addr, _ := net.ResolveUDPAddr("udp", "0.0.0.0:0") c := &ChannelBind{ diff --git a/internal/offload/errors.go b/internal/offload/errors.go new file mode 100644 index 00000000..9a94ac0f --- /dev/null +++ b/internal/offload/errors.go @@ -0,0 +1,14 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package offload + +import "errors" + +//nolint:revive +var ( + ErrUnsupportedProtocol = errors.New("offload: protocol not supported") + ErrConnectionNotFound = errors.New("offload: connection not found") + ErrXDPAlreadyInitialized = errors.New("offload: XDP engine is already initialized") + ErrXDPLocalRedirectProhibited = errors.New("offload: XDP local redirect not allowed") +) diff --git a/internal/offload/null.go b/internal/offload/null.go new file mode 100644 index 00000000..985b526b --- /dev/null +++ b/internal/offload/null.go @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package offload + +import ( + "github.com/pion/logging" +) + +// NullEngine is a null offload engine +type NullEngine struct { + conntrack map[Connection]Connection + log logging.LeveledLogger +} + +// NewNullEngine creates an uninitialized null offload engine +func NewNullEngine(log logging.LeveledLogger) (*NullEngine, error) { + c := make(map[Connection]Connection) + return &NullEngine{conntrack: c, log: log}, nil +} + +// Init initializes the Null engine +func (o *NullEngine) Init() error { + o.log.Info("(NullOffload) Init done") + return nil +} + +// Shutdown stops the null offloading engine +func (o *NullEngine) Shutdown() { + if o.log == nil { + return + } + o.log.Info("(NullOffload) Shutdown done") +} + +// Upsert imitates an offload creation between a client and a peer +func (o *NullEngine) Upsert(client, peer Connection) error { + o.log.Debugf("Would create offload between client: %+v and peer: %+v", client, peer) + o.conntrack[client] = peer + return nil +} + +// Remove imitates offload deletion between a client and a peer +func (o *NullEngine) Remove(client, peer Connection) error { + o.log.Debugf("Would remove offload between client: %+v and peer: %+v", client, peer) + + if _, ok := o.conntrack[client]; !ok { + return ErrConnectionNotFound + } + delete(o.conntrack, client) + + return nil +} + +// List returns the internal conntrack map, which keeps track of all +// the connections through the proxy +func (o *NullEngine) List() (map[Connection]Connection, error) { + r := make(map[Connection]Connection) + for k, v := range o.conntrack { + r[k] = v + } + + return r, nil +} diff --git a/internal/offload/null_test.go b/internal/offload/null_test.go new file mode 100644 index 00000000..f77a7b43 --- /dev/null +++ b/internal/offload/null_test.go @@ -0,0 +1,92 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package offload + +import ( + "net" + "testing" + + "github.com/pion/logging" + "github.com/pion/turn/v3/internal/proto" + "github.com/stretchr/testify/assert" +) + +// TestNullOffload executes Null offload unit tests +func TestNullOffload(t *testing.T) { + loggerFactory := logging.NewDefaultLoggerFactory() + logger := loggerFactory.NewLogger("null-test") + + nullEngine, err := NewNullEngine(logger) + assert.NoError(t, err, "cannot instantiate Null offload engine") + defer nullEngine.Shutdown() + + err = nullEngine.Init() + assert.NoError(t, err, "cannot init Null offload engine") + + clientAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 2000} + turnListenAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 3478} + turnRelayAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4000} + peerAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5000} + + client := Connection{ + LocalAddr: turnListenAddr, + RemoteAddr: clientAddr, + Protocol: proto.ProtoUDP, + ChannelID: 0x4000, + } + peer := Connection{ + LocalAddr: turnRelayAddr, + RemoteAddr: peerAddr, + Protocol: proto.ProtoUDP, + } + + t.Run("remove from conntrack map", func(t *testing.T) { + assert.NoError(t, + nullEngine.Upsert(client, peer), + "error in upserting client connection") + + assert.NoError(t, + nullEngine.Upsert(peer, client), + "error in upserting peer connection") + + assert.NoError(t, + nullEngine.Remove(client, peer), + "error in removing client connection") + + assert.Error(t, + nullEngine.Remove(client, peer), + "error in removing non-existing client connection") + + assert.NoError(t, + nullEngine.Remove(peer, client), + "error in removing peer connection") + + assert.Error(t, + nullEngine.Remove(peer, client), + "error in removing non-existing peer connection") + }) + + t.Run("upsert/remove entries of the conntrack map", func(t *testing.T) { + ct, _ := nullEngine.List() + assert.Equal(t, 0, len(ct), "map should be empty at start") + + assert.NoError(t, + nullEngine.Upsert(client, peer), + "error in upserting client connection") + + assert.NoError(t, + nullEngine.Upsert(peer, client), + "error in upserting peer connection") + + ct, _ = nullEngine.List() + assert.Equal(t, 2, len(ct), "map should have two elements") + + assert.NoError(t, + nullEngine.Remove(client, peer), + "error in removing non-existing client connection") + + ct, _ = nullEngine.List() + assert.Equal(t, 1, len(ct), "map should have two elements") + }) +} diff --git a/internal/offload/offload.go b/internal/offload/offload.go new file mode 100644 index 00000000..47489237 --- /dev/null +++ b/internal/offload/offload.go @@ -0,0 +1,52 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// Package offload implements a kernel-offload engine to speed up transporting ChannelData messages +// +//nolint:gochecknoinits +package offload + +import ( + "fmt" + "net" + + "github.com/pion/logging" + "github.com/pion/turn/v3/internal/proto" +) + +// Engine represents the network offloading engine +// +//nolint:gochecknoglobals +var Engine OffloadEngine + +// Init Engine as NullOffload +func init() { + log := logging.NewDefaultLoggerFactory().NewLogger("offload") + Engine, _ = NewNullEngine(log) +} + +// OffloadEngine provides a general interface for offloading techniques (e.g., XDP) +// +//nolint:revive +type OffloadEngine interface { + Init() error + Shutdown() + Upsert(client, peer Connection) error + Remove(client, peer Connection) error + List() (map[Connection]Connection, error) +} + +// Connection combines offload engine identifiers required for uinquely identifying allocation channel bindings. Depending of the used offload engine, some values are not required. For example, the SockFd has no role for an XDP offload +type Connection struct { + RemoteAddr net.Addr + LocalAddr net.Addr + Protocol proto.Protocol + SocketFd uintptr + ChannelID uint32 +} + +func (c *Connection) String() string { + return fmt.Sprintf("%s:local:%s-remote:%s-chan:%d", + c.RemoteAddr.Network(), c.LocalAddr.String(), c.RemoteAddr.String(), + c.ChannelID) +} diff --git a/internal/offload/xdp.go b/internal/offload/xdp.go new file mode 100644 index 00000000..f24c0b67 --- /dev/null +++ b/internal/offload/xdp.go @@ -0,0 +1,398 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package offload + +import ( + "encoding/binary" + "net" + "os" + "strconv" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + "github.com/pion/logging" + "github.com/pion/turn/v3/internal/offload/xdp" + "github.com/pion/turn/v3/internal/proto" +) + +// XdpEngine represents an XDP offload engine; implements OffloadEngine +type XdpEngine struct { + interfaces []net.Interface + upstreamMap *ebpf.Map + downstreamMap *ebpf.Map + ipaddrsMap *ebpf.Map + statsMap *ebpf.Map + objs xdp.BpfObjects + links []link.Link + log logging.LeveledLogger +} + +// NewXdpEngine creates an uninitialized XDP offload engine +func NewXdpEngine(ifs []net.Interface, log logging.LeveledLogger) (*XdpEngine, error) { + if xdp.IsInitialized { + return nil, ErrXDPAlreadyInitialized + } + e := &XdpEngine{ + interfaces: ifs, + log: log, + } + return e, nil +} + +func (o *XdpEngine) unpinMaps() error { + // unlink maps + if o.downstreamMap != nil { + if err := o.downstreamMap.Unpin(); err != nil { + return err + } + } + if o.upstreamMap != nil { + if err := o.upstreamMap.Unpin(); err != nil { + return err + } + } + if o.statsMap != nil { + if err := o.statsMap.Unpin(); err != nil { + return err + } + } + + if o.ipaddrsMap != nil { + if err := o.ipaddrsMap.Unpin(); err != nil { + return err + } + } + + return nil +} + +// Init sets up the environment for the XDP program: enables IPv4 +// forwarding in the kernel; links maps of the XDP program; and, +// starts the XDP program on network interfaces. +// Based on https://github.com/l7mp/l7mp/blob/master/udp-offload.js#L232 +func (o *XdpEngine) Init() error { + if xdp.IsInitialized { + return ErrXDPAlreadyInitialized + } + // enable ipv4 forwarding + f := "/proc/sys/net/ipv4/conf/all/forwarding" + data, err := os.ReadFile(f) + if err != nil { + return err + } + val, err := strconv.Atoi(string(data[:len(data)-1])) + if err != nil { + return err + } + if val != 1 { + //nolint:gosec + if e := os.WriteFile(f, []byte("1"), 0o644); e != nil { + return e + } + } + + // unlink maps if they exist + if err = o.unpinMaps(); err != nil { + return err + } + + // Load pre-compiled programs into the kernel + o.objs = xdp.BpfObjects{} + opts := ebpf.CollectionOptions{Maps: ebpf.MapOptions{PinPath: xdp.BpfMapPinPath}} + if err = xdp.LoadBpfObjects(&o.objs, &opts); err != nil { + return err + } + o.downstreamMap = o.objs.TurnServerDownstreamMap + o.upstreamMap = o.objs.TurnServerUpstreamMap + o.ipaddrsMap = o.objs.TurnServerInterfaceIpAddressesMap + o.statsMap = o.objs.TurnServerStatsMap + + ifNames := []string{} + // Attach program to interfaces + for _, iface := range o.interfaces { + l, linkErr := link.AttachXDP(link.XDPOptions{ + Program: o.objs.XdpProgFunc, + Interface: iface.Index, + }) + if linkErr != nil { + return linkErr + } + o.links = append(o.links, l) + ifNames = append(ifNames, iface.Name) + } + + // populate interface IP addresses map + ifIPMap, err := collectInterfaceIpv4Addrs() + if err != nil { + return err + } + for idx, addr := range ifIPMap { + err := o.ipaddrsMap.Put(uint32(idx), + binary.LittleEndian.Uint32(addr)) + if err != nil { + return err + } + } + + xdp.IsInitialized = true + + o.log.Infof("Init done on interfaces: %s", ifNames) + return nil +} + +// Shutdown stops the XDP offloading engine +func (o *XdpEngine) Shutdown() { + if !xdp.IsInitialized { + return + } + + // close objects + if err := o.objs.Close(); err != nil { + o.log.Errorf("Error during shutdown: %s", err.Error()) + return + } + + // close links + for _, l := range o.links { + if err := l.Close(); err != nil { + o.log.Errorf("Error during shutdown: %s", err.Error()) + return + } + } + + // unlink maps + if err := o.unpinMaps(); err != nil { + o.log.Errorf("Error during shutdown: %s", err.Error()) + return + } + + xdp.IsInitialized = false + + o.log.Info("Shutdown done") +} + +// Upsert creates a new XDP offload between a client and a peer +func (o *XdpEngine) Upsert(client, peer Connection) error { + err := o.validate(client, peer) + if err != nil { + return err + } + p, err := bpfFourTuple(peer) + if err != nil { + return err + } + cft, err := bpfFourTuple(client) + if err != nil { + return err + } + c := xdp.BpfFourTupleWithChannelId{ + FourTuple: *cft, + ChannelId: client.ChannelID, + } + + if err := o.downstreamMap.Put(p, c); err != nil { + o.log.Errorf("Error in upsert (downstream map): %s", err.Error()) + return err + } + if err := o.upstreamMap.Put(c, p); err != nil { + o.log.Errorf("Error in upsert (upstream map): %s", err.Error()) + return err + } + + // register with Local IP = 0 to support multi NIC setups + p.LocalIp = 0 + if err := o.downstreamMap.Put(p, c); err != nil { + o.log.Errorf("Error in upsert (downstream map): %s", err.Error()) + return err + } + + o.log.Infof("Create offload between client: %+v and peer: %+v", client, peer) + return nil +} + +// Remove removes an XDP offload between a client and a peer +func (o *XdpEngine) Remove(client, peer Connection) error { + p, err := bpfFourTuple(peer) + if err != nil { + return err + } + cft, err := bpfFourTuple(client) + if err != nil { + return err + } + c := xdp.BpfFourTupleWithChannelId{ + FourTuple: *cft, + ChannelId: client.ChannelID, + } + + if err := o.downstreamMap.Delete(p); err != nil { + return err + } + + if err := o.upstreamMap.Delete(c); err != nil { + return err + } + + p.LocalIp = 0 + if err := o.downstreamMap.Delete(p); err != nil { + return err + } + + o.log.Infof("Remove offload between client: %+v and peer: %+v", client, peer) + return nil +} + +// List returns all upstream/downstream offloads stored in the corresponding eBPF maps +func (o *XdpEngine) List() (map[Connection]Connection, error) { + var p xdp.BpfFourTuple + var c xdp.BpfFourTupleWithChannelId + + r := make(map[Connection]Connection) + + iterD := o.downstreamMap.Iterate() + for iterD.Next(&p, &c) { + k := Connection{ + RemoteAddr: &net.UDPAddr{IP: ipv4(p.RemoteIp), Port: int(p.RemotePort)}, + LocalAddr: &net.UDPAddr{IP: ipv4(p.LocalIp), Port: int(p.LocalPort)}, + Protocol: proto.ProtoUDP, + } + v := Connection{ + RemoteAddr: &net.UDPAddr{ + IP: ipv4(c.FourTuple.RemoteIp), + Port: int(c.FourTuple.RemotePort), + }, + LocalAddr: &net.UDPAddr{ + IP: ipv4(c.FourTuple.LocalIp), + Port: int(c.FourTuple.LocalPort), + }, + Protocol: proto.ProtoUDP, + ChannelID: c.ChannelId, + } + r[k] = v + } + if err := iterD.Err(); err != nil { + return nil, err + } + + iterU := o.upstreamMap.Iterate() + for iterU.Next(&c, &p) { + k := Connection{ + RemoteAddr: &net.UDPAddr{ + IP: ipv4(c.FourTuple.RemoteIp), + Port: int(c.FourTuple.RemotePort), + }, + LocalAddr: &net.UDPAddr{ + IP: ipv4(c.FourTuple.LocalIp), + Port: int(c.FourTuple.LocalPort), + }, + Protocol: proto.ProtoUDP, + ChannelID: c.ChannelId, + } + v := Connection{ + RemoteAddr: &net.UDPAddr{IP: ipv4(p.RemoteIp), Port: int(p.RemotePort)}, + LocalAddr: &net.UDPAddr{IP: ipv4(p.LocalIp), Port: int(p.LocalPort)}, + Protocol: proto.ProtoUDP, + } + r[k] = v + } + if err := iterU.Err(); err != nil { + return nil, err + } + + return r, nil +} + +// validate checks the eligibility of an offload between a client and a peer +func (o *XdpEngine) validate(client, peer Connection) error { + // check UDP + if (client.Protocol != proto.ProtoUDP) || (peer.Protocol != proto.ProtoUDP) { + err := ErrUnsupportedProtocol + o.log.Warn(err.Error()) + return err + } + // check this is not a local redirect + p, ok := peer.RemoteAddr.(*net.UDPAddr) + if !ok { + err := ErrUnsupportedProtocol + o.log.Warn(err.Error()) + return err + } + localNet := net.IPNet{IP: net.ParseIP("127.0.0.1"), Mask: net.CIDRMask(8, 32)} + ifAddrs, err := collectInterfaceIpv4Addrs() + if err != nil { + o.log.Warn(err.Error()) + return err + } + for _, ip := range ifAddrs { + if (p.IP.Equal(ip)) && (!localNet.Contains(p.IP)) { + err := ErrXDPLocalRedirectProhibited + o.log.Warn(err.Error()) + return err + } + } + + return nil +} + +// collectInterfaceIpv4Addrs creates a map of interface ids to interface IPv4 addresses +func collectInterfaceIpv4Addrs() (map[int]net.IP, error) { + ifs, err := net.Interfaces() + if err != nil { + return nil, err + } + m := make(map[int]net.IP) + for _, netIf := range ifs { + addrs, err := netIf.Addrs() + if err == nil && len(addrs) > 0 { + n, ok := addrs[0].(*net.IPNet) + if !ok { + continue + } + if addr := n.IP.To4(); addr != nil { + m[netIf.Index] = addr + } + } + } + return m, nil +} + +func hostToNetShort(i uint16) uint16 { + b := make([]byte, 2) + binary.LittleEndian.PutUint16(b, i) + return binary.BigEndian.Uint16(b) +} + +func ipv4(i uint32) net.IP { + ip := make(net.IP, 4) + binary.LittleEndian.PutUint32(ip, i) + return ip +} + +// bpfFourTuple creates an xdp.BpfFourTuple struct that can be used in the XDP offload maps +func bpfFourTuple(c Connection) (*xdp.BpfFourTuple, error) { + if c.Protocol != proto.ProtoUDP { + return nil, ErrUnsupportedProtocol + } + l, lok := c.LocalAddr.(*net.UDPAddr) + r, rok := c.RemoteAddr.(*net.UDPAddr) + if !lok || !rok { + return nil, ErrUnsupportedProtocol + } + var localIP uint32 + if l.IP.To4() != nil { + localIP = binary.LittleEndian.Uint32(l.IP.To4()) + } + var remoteIP uint32 + if r.IP.To4() != nil { + remoteIP = binary.LittleEndian.Uint32(r.IP.To4()) + } + + t := xdp.BpfFourTuple{ + RemoteIp: remoteIP, + LocalIp: localIP, + RemotePort: hostToNetShort(uint16(r.Port)), + LocalPort: hostToNetShort(uint16(l.Port)), + } + return &t, nil +} diff --git a/internal/offload/xdp/.clang-format b/internal/offload/xdp/.clang-format new file mode 100644 index 00000000..242e6e16 --- /dev/null +++ b/internal/offload/xdp/.clang-format @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +BasedOnStyle: LLVM +IndentWidth: 8 +UseTab: Always +BreakBeforeBraces: Linux +AllowShortIfStatementsOnASingleLine: false +IndentCaseLabels: false +AlwaysBreakBeforeMultilineStrings: true +AllowShortBlocksOnASingleLine: false +ColumnLimit: 103 +ContinuationIndentWidth: 8 diff --git a/internal/offload/xdp/bpf_bpfeb.go b/internal/offload/xdp/bpf_bpfeb.go new file mode 100644 index 00000000..2ad8b1ff --- /dev/null +++ b/internal/offload/xdp/bpf_bpfeb.go @@ -0,0 +1,146 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build mips || mips64 || ppc64 || s390x + +package xdp + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfFourTuple struct { + RemoteIp uint32 + LocalIp uint32 + RemotePort uint16 + LocalPort uint16 +} + +type bpfFourTupleStat struct { + Pkts uint64 + Bytes uint64 + TimestampLast uint64 +} + +type bpfFourTupleWithChannelId struct { + FourTuple bpfFourTuple + ChannelId uint32 +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + TurnServerDownstreamMap *ebpf.MapSpec `ebpf:"turn_server_downstream_map"` + TurnServerInterfaceIpAddressesMap *ebpf.MapSpec `ebpf:"turn_server_interface_ip_addresses_map"` + TurnServerStatsMap *ebpf.MapSpec `ebpf:"turn_server_stats_map"` + TurnServerUpstreamMap *ebpf.MapSpec `ebpf:"turn_server_upstream_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + TurnServerDownstreamMap *ebpf.Map `ebpf:"turn_server_downstream_map"` + TurnServerInterfaceIpAddressesMap *ebpf.Map `ebpf:"turn_server_interface_ip_addresses_map"` + TurnServerStatsMap *ebpf.Map `ebpf:"turn_server_stats_map"` + TurnServerUpstreamMap *ebpf.Map `ebpf:"turn_server_upstream_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.TurnServerDownstreamMap, + m.TurnServerInterfaceIpAddressesMap, + m.TurnServerStatsMap, + m.TurnServerUpstreamMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_bpfeb.o +var _BpfBytes []byte diff --git a/internal/offload/xdp/bpf_bpfeb.o b/internal/offload/xdp/bpf_bpfeb.o new file mode 100644 index 00000000..02f0f3ac Binary files /dev/null and b/internal/offload/xdp/bpf_bpfeb.o differ diff --git a/internal/offload/xdp/bpf_bpfel.go b/internal/offload/xdp/bpf_bpfel.go new file mode 100644 index 00000000..f2b2bb84 --- /dev/null +++ b/internal/offload/xdp/bpf_bpfel.go @@ -0,0 +1,146 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64 + +package xdp + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfFourTuple struct { + RemoteIp uint32 + LocalIp uint32 + RemotePort uint16 + LocalPort uint16 +} + +type bpfFourTupleStat struct { + Pkts uint64 + Bytes uint64 + TimestampLast uint64 +} + +type bpfFourTupleWithChannelId struct { + FourTuple bpfFourTuple + ChannelId uint32 +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + TurnServerDownstreamMap *ebpf.MapSpec `ebpf:"turn_server_downstream_map"` + TurnServerInterfaceIpAddressesMap *ebpf.MapSpec `ebpf:"turn_server_interface_ip_addresses_map"` + TurnServerStatsMap *ebpf.MapSpec `ebpf:"turn_server_stats_map"` + TurnServerUpstreamMap *ebpf.MapSpec `ebpf:"turn_server_upstream_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + TurnServerDownstreamMap *ebpf.Map `ebpf:"turn_server_downstream_map"` + TurnServerInterfaceIpAddressesMap *ebpf.Map `ebpf:"turn_server_interface_ip_addresses_map"` + TurnServerStatsMap *ebpf.Map `ebpf:"turn_server_stats_map"` + TurnServerUpstreamMap *ebpf.Map `ebpf:"turn_server_upstream_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.TurnServerDownstreamMap, + m.TurnServerInterfaceIpAddressesMap, + m.TurnServerStatsMap, + m.TurnServerUpstreamMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_bpfel.o +var _BpfBytes []byte diff --git a/internal/offload/xdp/bpf_bpfel.o b/internal/offload/xdp/bpf_bpfel.o new file mode 100644 index 00000000..02690cbe Binary files /dev/null and b/internal/offload/xdp/bpf_bpfel.o differ diff --git a/internal/offload/xdp/headers/.gitignore b/internal/offload/xdp/headers/.gitignore new file mode 100644 index 00000000..67d728ce --- /dev/null +++ b/internal/offload/xdp/headers/.gitignore @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +# headers automatically fetched from libbpf +bpf_endian.h +bpf_helper_defs.h +bpf_helpers.h +bpf_tracing.h \ No newline at end of file diff --git a/internal/offload/xdp/headers/common.h b/internal/offload/xdp/headers/common.h new file mode 100644 index 00000000..936de5be --- /dev/null +++ b/internal/offload/xdp/headers/common.h @@ -0,0 +1,149 @@ +/* SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) */ +// This is a compact version of `vmlinux.h` to be used in the examples using C code. + +#pragma once + +typedef unsigned char __u8; +typedef short int __s16; +typedef short unsigned int __u16; +typedef int __s32; +typedef unsigned int __u32; +typedef long long int __s64; +typedef long long unsigned int __u64; +typedef __u8 u8; +typedef __s16 s16; +typedef __u16 u16; +typedef __s32 s32; +typedef __u32 u32; +typedef __s64 s64; +typedef __u64 u64; +typedef __u16 __le16; +typedef __u16 __be16; +typedef __u32 __be32; +typedef __u64 __be64; +typedef __u32 __wsum; + +#include "bpf_helpers.h" + +enum bpf_map_type { + BPF_MAP_TYPE_UNSPEC = 0, + BPF_MAP_TYPE_HASH = 1, + BPF_MAP_TYPE_ARRAY = 2, + BPF_MAP_TYPE_PROG_ARRAY = 3, + BPF_MAP_TYPE_PERF_EVENT_ARRAY = 4, + BPF_MAP_TYPE_PERCPU_HASH = 5, + BPF_MAP_TYPE_PERCPU_ARRAY = 6, + BPF_MAP_TYPE_STACK_TRACE = 7, + BPF_MAP_TYPE_CGROUP_ARRAY = 8, + BPF_MAP_TYPE_LRU_HASH = 9, + BPF_MAP_TYPE_LRU_PERCPU_HASH = 10, + BPF_MAP_TYPE_LPM_TRIE = 11, + BPF_MAP_TYPE_ARRAY_OF_MAPS = 12, + BPF_MAP_TYPE_HASH_OF_MAPS = 13, + BPF_MAP_TYPE_DEVMAP = 14, + BPF_MAP_TYPE_SOCKMAP = 15, + BPF_MAP_TYPE_CPUMAP = 16, + BPF_MAP_TYPE_XSKMAP = 17, + BPF_MAP_TYPE_SOCKHASH = 18, + BPF_MAP_TYPE_CGROUP_STORAGE = 19, + BPF_MAP_TYPE_REUSEPORT_SOCKARRAY = 20, + BPF_MAP_TYPE_PERCPU_CGROUP_STORAGE = 21, + BPF_MAP_TYPE_QUEUE = 22, + BPF_MAP_TYPE_STACK = 23, + BPF_MAP_TYPE_SK_STORAGE = 24, + BPF_MAP_TYPE_DEVMAP_HASH = 25, + BPF_MAP_TYPE_STRUCT_OPS = 26, + BPF_MAP_TYPE_RINGBUF = 27, + BPF_MAP_TYPE_INODE_STORAGE = 28, +}; + +enum xdp_action { + XDP_ABORTED = 0, + XDP_DROP = 1, + XDP_PASS = 2, + XDP_TX = 3, + XDP_REDIRECT = 4, +}; + +struct xdp_md { + __u32 data; + __u32 data_end; + __u32 data_meta; + __u32 ingress_ifindex; + __u32 rx_queue_index; + __u32 egress_ifindex; +}; + +typedef __u16 __sum16; + +#define ETH_P_IP 0x0800 + +struct ethhdr { + unsigned char h_dest[6]; + unsigned char h_source[6]; + __be16 h_proto; +}; + +struct iphdr { + __u8 ihl: 4; + __u8 version: 4; + __u8 tos; + __be16 tot_len; + __be16 id; + __be16 frag_off; + __u8 ttl; + __u8 protocol; + __sum16 check; + __be32 saddr; + __be32 daddr; +}; + +enum { + BPF_ANY = 0, + BPF_NOEXIST = 1, + BPF_EXIST = 2, + BPF_F_LOCK = 4, +}; + +/* BPF_FUNC_perf_event_output, BPF_FUNC_perf_event_read and + * BPF_FUNC_perf_event_read_value flags. + */ +#define BPF_F_INDEX_MASK 0xffffffffULL +#define BPF_F_CURRENT_CPU BPF_F_INDEX_MASK + +#if defined(__TARGET_ARCH_x86) +struct pt_regs { + /* + * C ABI says these regs are callee-preserved. They aren't saved on kernel entry + * unless syscall needs a complete, fully filled "struct pt_regs". + */ + unsigned long r15; + unsigned long r14; + unsigned long r13; + unsigned long r12; + unsigned long rbp; + unsigned long rbx; + /* These regs are callee-clobbered. Always saved on kernel entry. */ + unsigned long r11; + unsigned long r10; + unsigned long r9; + unsigned long r8; + unsigned long rax; + unsigned long rcx; + unsigned long rdx; + unsigned long rsi; + unsigned long rdi; + /* + * On syscall entry, this is syscall#. On CPU exception, this is error code. + * On hw interrupt, it's IRQ number: + */ + unsigned long orig_rax; + /* Return frame for iretq */ + unsigned long rip; + unsigned long cs; + unsigned long eflags; + unsigned long rsp; + unsigned long ss; + /* top of stack page */ +}; +#endif /* __TARGET_ARCH_x86 */ diff --git a/internal/offload/xdp/headers/fetch-libbpf-headers.sh b/internal/offload/xdp/headers/fetch-libbpf-headers.sh new file mode 100755 index 00000000..1ecad3fa --- /dev/null +++ b/internal/offload/xdp/headers/fetch-libbpf-headers.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# SPDX-FileCopyrightText: 2023 The Pion community +# SPDX-License-Identifier: MIT + +# Version of libbpf to fetch headers from +LIBBPF_VERSION=1.4.3 + +# The headers we want +prefix=libbpf-"$LIBBPF_VERSION" +headers=( + "$prefix"/src/bpf_endian.h + "$prefix"/src/bpf_helper_defs.h + "$prefix"/src/bpf_helpers.h + "$prefix"/src/bpf_tracing.h +) + +# Fetch libbpf release and extract the desired headers +curl -sL "https://github.com/libbpf/libbpf/archive/refs/tags/v${LIBBPF_VERSION}.tar.gz" | \ + tar -xz --xform='s#.*/##' "${headers[@]}" diff --git a/internal/offload/xdp/headers/parsing_helpers.h b/internal/offload/xdp/headers/parsing_helpers.h new file mode 100644 index 00000000..70432333 --- /dev/null +++ b/internal/offload/xdp/headers/parsing_helpers.h @@ -0,0 +1,277 @@ +/* SPDX-License-Identifier: (GPL-2.0-or-later OR BSD-2-clause) */ +/* + * This file contains parsing functions that are used in the packetXX XDP + * programs. The functions are marked as __always_inline, and fully defined in + * this header file to be included in the BPF program. + * + * Each helper parses a packet header, including doing bounds checking, and + * returns the type of its contents if successful, and -1 otherwise. + * + * For Ethernet and IP headers, the content type is the type of the payload + * (h_proto for Ethernet, nexthdr for IPv6), for ICMP it is the ICMP type field. + * All return values are in host byte order. + * + * The versions of the functions included here are slightly expanded versions of + * the functions in the packet01 lesson. For instance, the Ethernet header + * parsing has support for parsing VLAN tags. + */ + +#ifndef __PARSING_HELPERS_H +#define __PARSING_HELPERS_H + +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wcompare-distinct-pointer-types" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Header cursor to keep track of current parsing position */ +struct hdr_cursor { + void *pos; +}; + +/* + * struct vlan_hdr - vlan header + * @h_vlan_TCI: priority and VLAN ID + * @h_vlan_encapsulated_proto: packet type ID or len + */ +struct vlan_hdr { + __be16 h_vlan_TCI; + __be16 h_vlan_encapsulated_proto; +}; + +/* + * Struct icmphdr_common represents the common part of the icmphdr and icmp6hdr + * structures. + */ +struct icmphdr_common { + __u8 type; + __u8 code; + __sum16 cksum; +}; + +/* Allow users of header file to redefine VLAN max depth */ +#ifndef VLAN_MAX_DEPTH +#define VLAN_MAX_DEPTH 2 +#endif + +#define VLAN_VID_MASK 0x0fff /* VLAN Identifier */ +/* Struct for collecting VLANs after parsing via parse_ethhdr_vlan */ +struct collect_vlans { + __u16 id[VLAN_MAX_DEPTH]; +}; + +static __always_inline int proto_is_vlan(__u16 h_proto) +{ + return !!(h_proto == bpf_htons(ETH_P_8021Q) || + h_proto == bpf_htons(ETH_P_8021AD)); +} + +/* Notice, parse_ethhdr() will skip VLAN tags, by advancing nh->pos and returns + * next header EtherType, BUT the ethhdr pointer supplied still points to the + * Ethernet header. Thus, caller can look at eth->h_proto to see if this was a + * VLAN tagged packet. + */ +static __always_inline int parse_ethhdr_vlan(struct hdr_cursor *nh, + void *data_end, + struct ethhdr **ethhdr, + struct collect_vlans *vlans) +{ + struct ethhdr *eth = nh->pos; + int hdrsize = sizeof(*eth); + struct vlan_hdr *vlh; + __u16 h_proto; + int i; + + /* Byte-count bounds check; check if current pointer + size of header + * is after data_end. + */ + if (nh->pos + hdrsize > data_end) + return -1; + + nh->pos += hdrsize; + *ethhdr = eth; + vlh = nh->pos; + h_proto = eth->h_proto; + + /* Use loop unrolling to avoid the verifier restriction on loops; + * support up to VLAN_MAX_DEPTH layers of VLAN encapsulation. + */ + #pragma unroll + for (i = 0; i < VLAN_MAX_DEPTH; i++) { + if (!proto_is_vlan(h_proto)) + break; + + if (vlh + 1 > data_end) + break; + + h_proto = vlh->h_vlan_encapsulated_proto; + if (vlans) /* collect VLAN ids */ + vlans->id[i] = + (bpf_ntohs(vlh->h_vlan_TCI) & VLAN_VID_MASK); + + vlh++; + } + + nh->pos = vlh; + return h_proto; /* network-byte-order */ +} + +static __always_inline int parse_ethhdr(struct hdr_cursor *nh, + void *data_end, + struct ethhdr **ethhdr) +{ + /* Expect compiler removes the code that collects VLAN ids */ + return parse_ethhdr_vlan(nh, data_end, ethhdr, NULL); +} + +static __always_inline int parse_ip6hdr(struct hdr_cursor *nh, + void *data_end, + struct ipv6hdr **ip6hdr) +{ + struct ipv6hdr *ip6h = nh->pos; + + /* Pointer-arithmetic bounds check; pointer +1 points to after end of + * thing being pointed to. We will be using this style in the remainder + * of the tutorial. + */ + if (ip6h + 1 > data_end) + return -1; + + nh->pos = ip6h + 1; + *ip6hdr = ip6h; + + return ip6h->nexthdr; +} + +static __always_inline int parse_iphdr(struct hdr_cursor *nh, + void *data_end, + struct iphdr **iphdr) +{ + struct iphdr *iph = nh->pos; + int hdrsize; + + if (iph + 1 > data_end) + return -1; + + hdrsize = iph->ihl * 4; + /* Sanity check packet field is valid */ + if(hdrsize < sizeof(*iph)) + return -1; + + /* Variable-length IPv4 header, need to use byte-based arithmetic */ + if (nh->pos + hdrsize > data_end) + return -1; + + nh->pos += hdrsize; + *iphdr = iph; + + return iph->protocol; +} + +static __always_inline int parse_icmp6hdr(struct hdr_cursor *nh, + void *data_end, + struct icmp6hdr **icmp6hdr) +{ + struct icmp6hdr *icmp6h = nh->pos; + + if (icmp6h + 1 > data_end) + return -1; + + nh->pos = icmp6h + 1; + *icmp6hdr = icmp6h; + + return icmp6h->icmp6_type; +} + +static __always_inline int parse_icmphdr(struct hdr_cursor *nh, + void *data_end, + struct icmphdr **icmphdr) +{ + struct icmphdr *icmph = nh->pos; + + if (icmph + 1 > data_end) + return -1; + + nh->pos = icmph + 1; + *icmphdr = icmph; + + return icmph->type; +} + +static __always_inline int parse_icmphdr_common(struct hdr_cursor *nh, + void *data_end, + struct icmphdr_common **icmphdr) +{ + struct icmphdr_common *h = nh->pos; + + if (h + 1 > data_end) + return -1; + + nh->pos = h + 1; + *icmphdr = h; + + return h->type; +} + +/* + * parse_udphdr: parse the udp header and return the length of the udp payload + */ +static __always_inline int parse_udphdr(struct hdr_cursor *nh, + void *data_end, + struct udphdr **udphdr) +{ + int len; + struct udphdr *h = nh->pos; + + if (h + 1 > data_end) + return -1; + + nh->pos = h + 1; + *udphdr = h; + + len = bpf_ntohs(h->len) - sizeof(struct udphdr); + if (len < 0) + return -1; + + return len; +} + +/* + * parse_tcphdr: parse and return the length of the tcp header + */ +static __always_inline int parse_tcphdr(struct hdr_cursor *nh, + void *data_end, + struct tcphdr **tcphdr) +{ + int len; + struct tcphdr *h = nh->pos; + + if (h + 1 > data_end) + return -1; + + len = h->doff * 4; + /* Sanity check packet field is valid */ + if(len < sizeof(*h)) + return -1; + + /* Variable-length TCP header, need to use byte-based arithmetic */ + if (nh->pos + len > data_end) + return -1; + + nh->pos += len; + *tcphdr = h; + + return len; +} + +#pragma clang diagnostic pop + +#endif /* __PARSING_HELPERS_H */ diff --git a/internal/offload/xdp/utils.h b/internal/offload/xdp/utils.h new file mode 100644 index 00000000..0a297990 --- /dev/null +++ b/internal/offload/xdp/utils.h @@ -0,0 +1,76 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// +build ignore + +#ifndef __XDP_TURN_OFFLOAD_UTILS__ +#define __XDP_TURN_OFFLOAD_UTILS__ + + +#ifndef memcpy +#define memcpy(dest, src, n) __builtin_memcpy((dest), (src), (n)) +#endif +#ifndef memmove +#define memmove(dest, src, n) __builtin_memmove((dest), (src), (n)) +#endif + +#ifndef likely +#define likely(x) __builtin_expect((x),1) +#endif +#ifndef unlikely +#define unlikely(x) __builtin_expect((x),0) +#endif +/* from katran/lib/bpf/csum_helpers.h */ +__attribute__((__always_inline__)) static inline __u16 csum_fold_helper(__u64 csum) +{ + int i; +#pragma unroll + for (i = 0; i < 4; i++) { + if (csum >> 16) + csum = (csum & 0xffff) + (csum >> 16); + } + return ~csum; +} + +/* from katran/lib/bpf/csum_helpers.h */ +__attribute__((__always_inline__)) static inline void ipv4_csum(void *data_start, int data_size, + __u64 *csum) +{ + *csum = bpf_csum_diff(0, 0, data_start, data_size, *csum); + *csum = csum_fold_helper(*csum); +} + +/* from AirVantage/sbulb/sbulb/bpf/checksum.c */ +// Update checksum following RFC 1624 (Eqn. 3): +// https://tools.ietf.org/html/rfc1624 +// HC' = ~(~HC + ~m + m') +// Where : +// HC - old checksum in header +// HC' - new checksum in header +// m - old value +// m' - new value +__attribute__((__always_inline__)) static inline void update_csum(__u64 *csum, __be32 old_addr, + __be32 new_addr) +{ + // ~HC + *csum = ~*csum; + *csum = *csum & 0xffff; + // + ~m + __u32 tmp; + tmp = ~old_addr; + *csum += tmp; + // + m + *csum += new_addr; + // then fold and complement result ! + *csum = csum_fold_helper(*csum); +} + +/* from AirVantage/sbulb/sbulb/bpf/ipv4.c */ +__attribute__((__always_inline__)) static inline int update_udp_checksum(__u64 cs, int old_addr, + int new_addr) +{ + update_csum(&cs, old_addr, new_addr); + return cs; +} + +#endif /* __XDP_TURN_OFFLOAD_UTILS__ */ diff --git a/internal/offload/xdp/xdp.c b/internal/offload/xdp/xdp.c new file mode 100644 index 00000000..34fac0d2 --- /dev/null +++ b/internal/offload/xdp/xdp.c @@ -0,0 +1,494 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +// +build ignore + +#include +#include +#include + +#include "bpf_endian.h" +#include "bpf_helpers.h" +#include "parsing_helpers.h" // taken from xdp-tutorial + +#include "utils.h" + +char __license[] SEC("license") = "Dual MIT/GPL"; + +#define MAX_MAP_ENTRIES 10240 +#define MAX_UDP_SIZE 1480 + +struct FourTuple { + __u32 remote_ip; + __u32 local_ip; + __u16 remote_port; + __u16 local_port; +}; + +struct FourTupleWithChannelId { + struct FourTuple four_tuple; + __u32 channel_id; +}; + +struct FourTupleStat { + __u64 pkts; + __u64 bytes; + __u64 timestamp_last; +}; + +enum ChanHdrAction { HDR_ADD, HDR_REMOVE }; + +// TURN TURN Peer Peer +// client server A B +// | | | | +// |-- ChannelBind req --------------->| | | +// | (Peer A to 0x4001) | | | +// | | | | +// |<---------- ChannelBind succ resp -| | | +// | | | | +// |-- (0x4001) data ----------------->| | | +// | |=== data ===>| | +// | | | | +// | |<== data ====| | +// |<------------------ (0x4001) data -| | | +// | | | | +// |--- Send ind (Peer A)------------->| | | +// | |=== data ===>| | +// | | | | +// | |<== data ====| | +// |<------------------ (0x4001) data -| | | +// | | | | +// RFC 8656 Figure 4 + +// to client +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __uint(pinning, LIBBPF_PIN_BY_NAME); + __type(key, struct FourTuple); + __type(value, struct FourTupleWithChannelId); +} turn_server_downstream_map SEC(".maps"); + +// to media server +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __uint(pinning, LIBBPF_PIN_BY_NAME); + __type(key, struct FourTupleWithChannelId); + __type(value, struct FourTuple); +} turn_server_upstream_map SEC(".maps"); + +// fourtuple stats +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __uint(pinning, LIBBPF_PIN_BY_NAME); + __type(key, struct FourTuple); + __type(value, struct FourTupleStat); +} turn_server_stats_map SEC(".maps"); + +// interface IP addresses +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __uint(pinning, LIBBPF_PIN_BY_NAME); + __type(key, __u32); + __type(value, __be32); +} turn_server_interface_ip_addresses_map SEC(".maps"); + +SEC("xdp") +int xdp_prog_func(struct xdp_md *ctx) +{ + void *data_end = (void *)(long)ctx->data_end; + void *data = (void *)(long)ctx->data; + int action = XDP_PASS; + // fib lookup + struct bpf_fib_lookup fib_params = {}; + // parsing + int eth_type, ip_type, udp_payload_len; + struct hdr_cursor nh; + struct ethhdr *eth; + struct iphdr *iphdr; + struct udphdr *udphdr; + __u32 *udp_payload; + // store original values of pkt fields + __be32 orig_saddr, orig_daddr; + __be16 orig_udphdr_len; + // return values + int rc; + long r; + // TURN processing + struct FourTuple *out_tuple = NULL; + struct FourTupleStat *stat; + struct FourTupleStat stat_new; + enum ChanHdrAction chan_hdr_action; + __u32 chan_data_hdr; + __u32 chan_id; + __u16 chan_len; + __u16 padding; + + /* These keep track of the next header type and iterator pointer */ + nh.pos = data; + + eth_type = parse_ethhdr(&nh, data_end, ð); + if (eth_type < 0) { + action = XDP_DROP; + goto out; + } + if (eth_type != bpf_htons(ETH_P_IP)) + goto out; + + ip_type = parse_iphdr(&nh, data_end, &iphdr); + if (ip_type < 0) { + action = XDP_DROP; + goto out; + } + if (ip_type != IPPROTO_UDP) + goto out; + + udp_payload_len = parse_udphdr(&nh, data_end, &udphdr); + if (udp_payload_len < 0) { + action = XDP_DROP; + goto out; + } else if (udp_payload_len > MAX_UDP_SIZE) { + goto out; + } + orig_udphdr_len = udphdr->len; + + // construct four tuple + struct FourTuple in_tuple = {.remote_ip = iphdr->saddr, + .local_ip = iphdr->daddr, + .remote_port = udphdr->source, + .local_port = udphdr->dest}; + + // downstream? + struct FourTupleWithChannelId *out_tuplec_ds; + out_tuplec_ds = bpf_map_lookup_elem(&turn_server_downstream_map, &in_tuple); + if (likely(!out_tuplec_ds)) { + // to overcome the situation of TURN server not knowing its local IP address: + // try lookup '0.0.0.0' + in_tuple.local_ip = 0; + out_tuplec_ds = bpf_map_lookup_elem(&turn_server_downstream_map, &in_tuple); + in_tuple.local_ip = iphdr->daddr; + } + if (out_tuplec_ds) { + chan_id = out_tuplec_ds->channel_id; + // add 4-byte space for the channel ID + r = bpf_xdp_adjust_head(ctx, -4); + if (r != 0) + goto out; + udp_payload_len += 4; + data_end = (void *)(long)ctx->data_end; + data = (void *)(long)ctx->data; + // note: data_end - data is the NIC-padded length of the packet + __u16 pkt_buf_len = data_end - data; + udp_payload = + data + sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr); + + // shift headers by -4 bytes (this extend UDP payload by 4 bytes) + int bytes_left = sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr); + int hdrs_len = bytes_left; + while (bytes_left > 0) { + __u8 *c = (__u8 *)data + (hdrs_len - bytes_left) + 4; + if (c - 4 < (__u8 *)data) + goto out; + if (bytes_left >= 32) { + if (c + 32 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 32); + bytes_left -= 32; + } else if (bytes_left >= 16) { + if (c + 16 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 16); + bytes_left -= 16; + } else if (bytes_left >= 8) { + if (c + 8 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 8); + bytes_left -= 8; + } else if (bytes_left >= 4) { + if (c + 4 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 4); + bytes_left -= 4; + } else if (bytes_left >= 2) { + if (c + 2 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 2); + bytes_left -= 2; + } else if (bytes_left >= 1) { + if (c + 1 > (__u8 *)data_end) + goto out; + memmove(c - 4, c, 1); + bytes_left -= 1; + } else { + break; + } + } + + // write ChannelData header with fields Channel Number and Length + // Details: https://www.rfc-editor.org/rfc/rfc8656.html#section-12.4 + if ((__u8 *)udp_payload + 4 > (__u8 *)data_end) { + goto out; + } + chan_len = (__u16)(udp_payload_len - 4); + udp_payload[0] = bpf_htonl(((__u16)chan_id << 16) | chan_len); + chan_data_hdr = udp_payload[0]; + chan_hdr_action = HDR_ADD; + + // add padding + + // check if new padding is necessary + // e.g., in case of NIC-padded packets we can reuse existing padding + int useful_len = hdrs_len + udp_payload_len; + int existing_padding = pkt_buf_len - useful_len; + + __u16 padded_len = 4 * ((__u16)udp_payload_len / 4); + if (padded_len < udp_payload_len) { + padded_len += 4; + } + padding = padded_len - (__u16)udp_payload_len; + udp_payload_len += padding; + + if ((existing_padding > 0) && (padding != 0)) { + padding -= existing_padding - ((__u32)existing_padding / padding * padding); + if (existing_padding > padding) { + padding = 0; + } + } + + // add padding + r = bpf_xdp_adjust_tail(ctx, padding); + if (r != 0) + goto out; + + // set out_tuple for further processing + out_tuple = &out_tuplec_ds->four_tuple; + } else { + // read channel id + udp_payload = + data + sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr); + if ((__u8 *)udp_payload + 4 > (__u8 *)data_end) { + goto out; + } + chan_id = (bpf_ntohl(udp_payload[0]) >> 16) & 0xFFFF; + chan_len = bpf_ntohl(udp_payload[0]); // last 16 bits only + chan_data_hdr = udp_payload[0]; + chan_hdr_action = HDR_REMOVE; + + // upstream? + struct FourTupleWithChannelId in_tuplec_us = {.four_tuple = in_tuple, + .channel_id = chan_id}; + out_tuple = bpf_map_lookup_elem(&turn_server_upstream_map, &in_tuplec_us); + if (!out_tuple) { + // to overcome the situation of TURN server not knowing its local IP address: + // try lookup '0.0.0.0' + in_tuplec_us.four_tuple.local_ip = 0; + out_tuple = bpf_map_lookup_elem(&turn_server_upstream_map, &in_tuplec_us); + } + if (!out_tuple) { + goto out; + } + + // remove channel id + // step1: shift the headers + int bytes_left = sizeof(struct ethhdr) + sizeof(struct iphdr) + sizeof(struct udphdr); + while (bytes_left > 0) { + __u8 *c = (__u8 *)data + bytes_left; + if (bytes_left >= 32) { + memmove(c - 28, c - 32, 32); + bytes_left -= 32; + } else if (bytes_left >= 16) { + memmove(c - 12, c - 16, 16); + bytes_left -= 16; + } else if (bytes_left >= 8) { + memmove(c - 4, c - 8, 8); + bytes_left -= 8; + } else if (bytes_left >= 4) { + memmove(c, c - 4, 4); + bytes_left -= 4; + } else if (bytes_left >= 2) { + memmove(c + 2, c - 2, 2); + bytes_left -= 2; + } else if (bytes_left >= 1) { + memmove(c + 1, c - 3, 1); + bytes_left -= 1; + } else { + break; + } + } + + // step2: trim packet + r = bpf_xdp_adjust_head(ctx, 4); + if (r != 0) + goto out; + udp_payload_len -= 4; + + // remove padding + padding = (__u16)udp_payload_len - chan_len; + if (padding >= 0 && padding <= 3) { + r = bpf_xdp_adjust_tail(ctx, -padding); + if (r != 0) + goto out; + udp_payload_len -= padding; + } else { + goto out; + } + } + + // Update header fields + + // reparse headers to please the verifier + data_end = (void *)(long)ctx->data_end; + data = (void *)(long)ctx->data; + nh.pos = data; + eth_type = parse_ethhdr(&nh, data_end, ð); + if (eth_type != bpf_htons(ETH_P_IP)) { + action = XDP_DROP; + goto out; + } + ip_type = parse_iphdr(&nh, data_end, &iphdr); + if (ip_type != IPPROTO_UDP) { + action = XDP_DROP; + goto out; + } + int orig_udp_data_len = parse_udphdr(&nh, data_end, &udphdr); + if (orig_udp_data_len < 0) { + action = XDP_DROP; + goto out; + } else if (orig_udp_data_len > MAX_UDP_SIZE) { + action = XDP_DROP; + goto out; + } + + // udp_payload_len contains the padded UDP data, + // orig_udp_data_len is the Data length of the incoming UDP packet + short len_diff = udp_payload_len - orig_udp_data_len; + // update IP len: payload + header size + iphdr->tot_len = bpf_htons(bpf_ntohs(iphdr->tot_len) + len_diff); + // update UDP len: payload (data and padding) changes + header size + udphdr->len = bpf_htons(bpf_ntohs(udphdr->len) + len_diff); + + // update IP addresses + orig_saddr = iphdr->saddr; + orig_daddr = iphdr->daddr; + iphdr->saddr = out_tuple->local_ip; + iphdr->daddr = out_tuple->remote_ip; + iphdr->check = 0; + __u64 ip_csum = 0; + ipv4_csum(iphdr, sizeof(*iphdr), &ip_csum); + iphdr->check = ip_csum; + + // update UDP ports and checksum + udphdr->source = out_tuple->local_port; + udphdr->dest = out_tuple->remote_port; + udphdr->check = update_udp_checksum(udphdr->check, in_tuple.local_port, udphdr->source); + udphdr->check = update_udp_checksum(udphdr->check, in_tuple.remote_port, udphdr->dest); + + udphdr->check = update_udp_checksum(udphdr->check, orig_saddr, iphdr->saddr); + udphdr->check = update_udp_checksum(udphdr->check, orig_daddr, iphdr->daddr); + + /* Note: we have to account two changes: + 1 - update of the len field + 2 - addition of new \0 blocks (e.g., padding and chan data) + + To demo this phenomenon with Scapy: + pkt1 = IP()/UDP()/Raw("a") + pkt2 = IP()/UDP()/Raw("a\0") + pkt1.show2() + pkt2.show2() + Checksums: + - pkt1: 0xa06f + - pkt2: 0xa06d + diff: 2 + */ + udphdr->check = update_udp_checksum(udphdr->check, orig_udphdr_len, udphdr->len); + udphdr->check = update_udp_checksum(udphdr->check, orig_udphdr_len, udphdr->len); + + switch (chan_hdr_action) { + case HDR_ADD: + udphdr->check = update_udp_checksum(udphdr->check, 0, chan_data_hdr); + break; + case HDR_REMOVE: + udphdr->check = update_udp_checksum(udphdr->check, chan_data_hdr, 0); + break; + default: + // something has really really gone wrong + action = XDP_DROP; + goto out; + break; + } + + // Send packet + fib_params.family = AF_INET; + fib_params.tos = iphdr->tos; + fib_params.l4_protocol = iphdr->protocol; + fib_params.sport = 0; + fib_params.dport = 0; + fib_params.tot_len = bpf_ntohs(iphdr->tot_len); + fib_params.ipv4_src = iphdr->saddr; + fib_params.ipv4_dst = iphdr->daddr; + + fib_params.ifindex = ctx->ingress_ifindex; + + rc = bpf_fib_lookup(ctx, &fib_params, sizeof(fib_params), 0); + switch (rc) { + case BPF_FIB_LKUP_RET_SUCCESS: /* lookup successful */ + // set eth addrs + memcpy(eth->h_dest, fib_params.dmac, ETH_ALEN); + memcpy(eth->h_source, fib_params.smac, ETH_ALEN); + + // update IP source address with the interface's address + orig_saddr = iphdr->saddr; + __be32 *new_saddr; + new_saddr = bpf_map_lookup_elem(&turn_server_interface_ip_addresses_map, + &fib_params.ifindex); + if (!new_saddr) { + action = XDP_DROP; + goto out; + } + iphdr->saddr = *new_saddr; + + // update ip and udp checksums + iphdr->check = update_udp_checksum(iphdr->check, orig_saddr, iphdr->saddr); + udphdr->check = update_udp_checksum(udphdr->check, orig_saddr, iphdr->saddr); + + // redirect packet + action = bpf_redirect(fib_params.ifindex, 0); + break; + + case BPF_FIB_LKUP_RET_BLACKHOLE: /* dest is blackholed; can be dropped */ + case BPF_FIB_LKUP_RET_UNREACHABLE: /* dest is unreachable; can be dropped */ + case BPF_FIB_LKUP_RET_PROHIBIT: /* dest not allowed; can be dropped */ + action = XDP_DROP; + break; + + case BPF_FIB_LKUP_RET_NOT_FWDED: /* packet is not forwarded */ + case BPF_FIB_LKUP_RET_FWD_DISABLED: /* fwding is not enabled on ingress */ + case BPF_FIB_LKUP_RET_UNSUPP_LWT: /* fwd requires encapsulation */ + case BPF_FIB_LKUP_RET_NO_NEIGH: /* no neighbor entry for nh */ + case BPF_FIB_LKUP_RET_FRAG_NEEDED: /* fragmentation required to fwd */ + break; + } + + // Account sent packet + if (((action == XDP_PASS) || (action == XDP_REDIRECT))) { + stat = bpf_map_lookup_elem(&turn_server_stats_map, out_tuple); + __u64 bytes = data_end - data; + __u64 ts = bpf_ktime_get_ns(); + if (stat) { + stat->pkts += 1; + stat->bytes += bytes; + stat->timestamp_last = ts; + } else { + stat_new.pkts = 1; + stat_new.bytes = bytes; + stat_new.timestamp_last = ts; + bpf_map_update_elem(&turn_server_stats_map, out_tuple, &stat_new, BPF_ANY); + } + } + +out: + return action; +} diff --git a/internal/offload/xdp/xdp.go b/internal/offload/xdp/xdp.go new file mode 100644 index 00000000..a64cfcad --- /dev/null +++ b/internal/offload/xdp/xdp.go @@ -0,0 +1,44 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cflags "-Wall -O2" bpf xdp.c -- -I./headers + +// Package xdp package implements the XDP offload. +package xdp + +import "github.com/cilium/ebpf" + +type ( + // BpfObjects is the exposed internal bpfObjects generated by bpf2go. + // Original description: bpfObjects contains all objects after they have been loaded into the kernel. + BpfObjects = bpfObjects + + // BpfFourTuple holds a 4-tuple: local IP and port, remote IP and port. + // Used by the XDP offload in its eBPF maps. + BpfFourTuple = bpfFourTuple + + // BpfFourTupleWithChannelId holds a 4-tuple and a channel number. + // Used by the XDP offload in its eBPF maps. + // + //nolint:stylecheck,revive + BpfFourTupleWithChannelId = bpfFourTupleWithChannelId + + // BpfFourTupleStat contains statistics (num bytes, num packets) of + // forwarded traffic by 4-tuples. + // Used by the XDP offload in its eBPF maps. + BpfFourTupleStat = bpfFourTupleStat +) + +// BpfMapPinPath is the path where eBPF maps will be pinned +const BpfMapPinPath = "/sys/fs/bpf" + +// IsInitialized is a package local variable +// +//nolint:gochecknoglobals +var IsInitialized bool + +// LoadBpfObjects is the exposed internal loadBpfObjects generated by bpf2go. +// Original description: loadBpfObjects loads bpf and converts it into a struct. +func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + return loadBpfObjects(obj, opts) +} diff --git a/internal/offload/xdp_test.go b/internal/offload/xdp_test.go new file mode 100644 index 00000000..6f52d992 --- /dev/null +++ b/internal/offload/xdp_test.go @@ -0,0 +1,175 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +//go:build offloadxdp && !js + +package offload + +import ( + "net" + "testing" + "time" + + "github.com/pion/logging" + "github.com/pion/turn/v3/internal/proto" + "github.com/stretchr/testify/assert" +) + +// TestXDPOffload executes XDP offload unit tests on localhost +// +// The tests include forwarding clients ChannelData traffic to peer, and peer UDP traffic to a client. +// +// The test setup on localhost exercising the 'lo' interface: +// client (port 2000) <-> turn listen (3478) <- XDP offload -> turn relay (4000) <-> peer (5000) +// +// The loopback interface tends to generate incorrect UDP checksums, which is fine in most cases. +// However, the XDP offload incrementally updates this checksum resulting errnous checksum on the +// receiver side. This leads to packet drops. To mitigate this issue, tests use raw sockets to send +// pre-crafted packets having correct checksums. +// +// Note: Running these tests require privileged users (e.g., root) to run the XDP offload. +func TestXDPOffload(t *testing.T) { + // timeout of connnection read deadlines + const readTimeoutInterval = 500 * time.Millisecond + + clientAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:2000") + assert.NoError(t, err) + turnListenAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:3478") + assert.NoError(t, err) + turnRelayAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:4000") + assert.NoError(t, err) + peerAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5000") + assert.NoError(t, err) + + // IP addrs for raw sockets + clientIPAddr, err := net.ResolveIPAddr("ip4", "127.0.0.1") + assert.NoError(t, err) + turnListenIPAddr, err := net.ResolveIPAddr("ip4", "127.0.0.1") + assert.NoError(t, err) + turnRelayIPAddr, err := net.ResolveIPAddr("ip4", "127.0.0.1") + assert.NoError(t, err) + peerIPAddr, err := net.ResolveIPAddr("ip4", "127.0.0.1") + assert.NoError(t, err) + + // Channel Data Header (channel number: 0x4000, message length: 7) + chanDataHdr := []byte{0x40, 0x00, 0x00, 0x07} + // payload is the string 'payload' + payload := []byte{0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64} + + // ChannelData header + 'payload' + padding + chanDataRecv := append(append(chanDataHdr, payload...), 0x00) + + // a UDP header + ChannelData header + 'payload' + padding + chanDataSent := append( + []byte{0x07, 0xd0, 0x0d, 0x96, 0x00, 0x14, 0xef, 0x26}, + append(append(chanDataHdr, payload...), 0x00)...) + + // UDP Data is 'payload' + udpRecv := payload + + // a UDP header + 'payload' + udpSent := append( + []byte{0x13, 0x88, 0x0f, 0xa0, 0x00, 0x0f, 0x21, 0x76}, + payload...) + + // setup client side + clientIPConn, err := net.ListenIP("ip4:udp", clientIPAddr) + assert.NoError(t, err, "cannot create client connection") + defer clientIPConn.Close() //nolint:errcheck + + // setup peer side + peerIPConn, err := net.ListenIP("ip4:udp", peerIPAddr) + assert.NoError(t, err, "cannot create peer connection") + defer peerIPConn.Close() //nolint:errcheck + + clientConn, err := net.ListenUDP("udp", clientAddr) + assert.NoError(t, err, "cannot create client connection") + defer clientConn.Close() //nolint:errcheck + + peerConn, err := net.ListenUDP("udp", peerAddr) + assert.NoError(t, err, "cannot create peer connection") + defer peerConn.Close() //nolint:errcheck + + // init offload + client := Connection{ + LocalAddr: turnListenAddr, + RemoteAddr: clientAddr, + Protocol: proto.ProtoUDP, + ChannelID: 0x4000, + } + peer := Connection{ + LocalAddr: turnRelayAddr, + RemoteAddr: peerAddr, + Protocol: proto.ProtoUDP, + } + + loggerFactory := logging.NewDefaultLoggerFactory() + logger := loggerFactory.NewLogger("xdp-test") + lo, err := net.InterfaceByName("lo") + assert.NoError(t, err, "no interface: 'lo'") + + xdpEngine, err := NewXdpEngine([]net.Interface{*lo}, logger) + assert.NoError(t, err, "cannot instantiate XDP offload engine") + defer xdpEngine.Shutdown() + + err = xdpEngine.Init() + assert.NoError(t, err, "cannot init XDP offload engine") + + t.Run("upsert/remove entries of the eBPF maps", func(t *testing.T) { + c, err := xdpEngine.List() + assert.NoError(t, err, "cannot list XDP offload maps") + assert.Equal(t, 0, len(c), "map should be empty at start") + + assert.NoError(t, xdpEngine.Upsert(client, peer), "cannot upsert new offload") + + c, err = xdpEngine.List() + assert.NoError(t, err, "cannot list XDP offload maps") + assert.Equal(t, 3, len(c), + "maps should have three elements (1 upstream and 2 downstreams)") + + assert.NoError(t, + xdpEngine.Remove(client, peer), + "error in removing peer connection") + + assert.Error(t, + xdpEngine.Remove(client, peer), + "error in removing non-existing peer connection") + }) + + t.Run("pass packet from peer to client", func(t *testing.T) { + assert.NoError(t, xdpEngine.Upsert(client, peer), "cannot upsert new offload") + + received := make([]byte, len(chanDataRecv)) + + _, err := peerIPConn.WriteToIP(udpSent, turnRelayIPAddr) + assert.NoError(t, err, "error in sending peer data") + + err = clientConn.SetReadDeadline(time.Now().Add(readTimeoutInterval)) + assert.NoError(t, err) + n, addr, err := clientConn.ReadFromUDP(received) + assert.NoError(t, err, "error in receiving data on client side") + + assert.Equal(t, chanDataRecv, received, "expect match") + assert.True(t, proto.IsChannelData(received), "expect channel data") + assert.Equal(t, len(chanDataRecv), n, "expect payload length padded to align 4B") + assert.Equal(t, turnListenAddr.String(), addr.String(), "expect server listener address") + }) + + t.Run("pass packet from client to peer", func(t *testing.T) { + assert.NoError(t, xdpEngine.Upsert(client, peer), "cannot upsert new offload") + + received := make([]byte, len(udpRecv)) + + _, err := clientIPConn.WriteToIP(chanDataSent, turnListenIPAddr) + assert.NoError(t, err, "error in sending channel data") + + err = peerConn.SetReadDeadline(time.Now().Add(readTimeoutInterval)) + assert.NoError(t, err) + n, addr, err := peerConn.ReadFromUDP(received) + assert.NoError(t, err, "error in receiving data on client side") + + assert.Equal(t, udpRecv, received, "expect match") + assert.Equal(t, len(payload), n, "expect payload length") + assert.Equal(t, turnRelayAddr.String(), addr.String(), "expect server relay address") + }) +} diff --git a/offload.go b/offload.go new file mode 100644 index 00000000..cc981962 --- /dev/null +++ b/offload.go @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package turn + +import ( + "net" + + "github.com/pion/logging" + "github.com/pion/turn/v3/internal/offload" +) + +// OffloadConfig defines various offload options +type OffloadConfig struct { + // Logger is a leveled logger + Log logging.LeveledLogger + // Mechanisms are the offload mechanisms to be used. First element has the highest priority. + // Available mechanisms are: + // - "xdp": XDP/eBPF offload for UDP traffic + // - "null": no offload + Mechanisms []string + // Interfaces on which to enable offload. Unless set, it is set to all available interfaces + Interfaces []net.Interface +} + +// InitOffload initializes offloading engine (e.g., eBPF kernel offload engine) to speed up networking +func InitOffload(o OffloadConfig) error { + var err error + offload.Engine, err = newEngine(o) + if err != nil { + return err + } + err = offload.Engine.Init() + return err +} + +// newEngine instantiates a new offload engine. It probes strategies until a fitting one is ousable one is found +func newEngine(opt OffloadConfig) (offload.OffloadEngine, error) { + // set defaults + if len(opt.Mechanisms) == 0 { + opt.Mechanisms = []string{"xdp", "null"} + } + if len(opt.Interfaces) == 0 { + ifs, err := net.Interfaces() + if err != nil { + return nil, err + } + opt.Interfaces = ifs + } + // iterate over mechanisms until a working solution is found + var off offload.OffloadEngine + var err error + for _, m := range opt.Mechanisms { + switch m { + case "xdp": + // try XDP/eBPF + off, err = offload.NewXdpEngine(opt.Interfaces, opt.Log) + // in case it is already running, restart it + _, isExist := offload.Engine.(*offload.XdpEngine) + if err != nil && isExist { + offload.Engine.Shutdown() + off, err = offload.NewXdpEngine(opt.Interfaces, opt.Log) + } + case "null": + // no offload + off, err = offload.NewNullEngine(opt.Log) + default: + opt.Log.Error("unsupported mechanism") + off, err = nil, errUnsupportedOffloadMechanism + } + if off != nil && err == nil { + break + } + } + // fallback to no offload + if err != nil { + return offload.NewNullEngine(opt.Log) + } + return off, err +} + +// ShutdownOffload shuts down the offloading engine +func ShutdownOffload() { + offload.Engine.Shutdown() +}