Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for pcap replay #132

Merged
merged 7 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var logger *zap.Logger
// Flags
var (
cfgFile string
pcapFile string
logLevel string
logFormat string
)
Expand Down Expand Up @@ -118,6 +119,7 @@ func init() {

func initFlags() {
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)")
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
}
Expand All @@ -133,6 +135,8 @@ func initConfig() {
viper.AddConfigPath("$HOME/.opengfw")
viper.AddConfigPath("/etc/opengfw")
}

viper.SetDefault("replay.realtime", true)
}

func initLogger() {
Expand Down Expand Up @@ -167,6 +171,7 @@ type cliConfig struct {
IO cliConfigIO `mapstructure:"io"`
Workers cliConfigWorkers `mapstructure:"workers"`
Ruleset cliConfigRuleset `mapstructure:"ruleset"`
Replay cliConfigReplay `mapstructure:"replay"`
}

type cliConfigIO struct {
Expand All @@ -177,6 +182,10 @@ type cliConfigIO struct {
RST bool `mapstructure:"rst"`
}

type cliConfigReplay struct {
Realtime bool `mapstructure:"realtime"`
}

type cliConfigWorkers struct {
Count int `mapstructure:"count"`
QueueSize int `mapstructure:"queueSize"`
Expand All @@ -197,17 +206,30 @@ func (c *cliConfig) fillLogger(config *engine.Config) error {
}

func (c *cliConfig) fillIO(config *engine.Config) error {
nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
QueueSize: c.IO.QueueSize,
ReadBuffer: c.IO.ReadBuffer,
WriteBuffer: c.IO.WriteBuffer,
Local: c.IO.Local,
RST: c.IO.RST,
})
var ioImpl io.PacketIO
var err error
if pcapFile != "" {
// Setup IO for pcap file replay
logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile))
ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{
PcapFile: pcapFile,
Realtime: c.Replay.Realtime,
})
} else {
// Setup IO for nfqueue
ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
QueueSize: c.IO.QueueSize,
ReadBuffer: c.IO.ReadBuffer,
WriteBuffer: c.IO.WriteBuffer,
Local: c.IO.Local,
RST: c.IO.RST,
})
}

if err != nil {
return configError{Field: "io", Err: err}
}
config.IO = nfio
config.IO = ioImpl
return nil
}

Expand Down
11 changes: 9 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error {
}

func (e *engine) Run(ctx context.Context) error {
workerCtx, workerCancel := context.WithCancel(ctx)
defer workerCancel() // Stop workers

// Register IO shutdown
ioCtx, ioCancel := context.WithCancel(ctx)
defer ioCancel() // Stop workers & IO
e.io.SetCancelFunc(ioCancel)
defer ioCancel() // Stop IO

// Start workers
for _, w := range e.workers {
go w.Run(ioCtx)
go w.Run(workerCtx)
}

// Register IO callback
Expand All @@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error {
return err
case <-ctx.Done():
return nil
case <-ioCtx.Done():
return nil
}
}

Expand Down
3 changes: 3 additions & 0 deletions io/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type PacketIO interface {
ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
// Close closes the packet IO.
Close() error
// SetCancelFunc gives packet IO access to context cancel function, enabling it to
// trigger a shutdown
SetCancelFunc(cancelFunc context.CancelFunc) error
}

type ErrInvalidPacket struct {
Expand Down
5 changes: 5 additions & 0 deletions io/nfqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error {
return n.n.Close()
}

// nfqueue IO does not issue shutdown
func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
return nil
}

func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
rules, err := generateNftRules(local, rst)
if err != nil {
Expand Down
126 changes: 126 additions & 0 deletions io/pcap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package io

import (
"context"
"hash/crc32"
"net"
"sort"
"strings"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
)

var _ PacketIO = (*pcapPacketIO)(nil)

type pcapPacketIO struct {
pcap *pcap.Handle
lastTime *time.Time
ioCancel context.CancelFunc
config PcapPacketIOConfig
}

type PcapPacketIOConfig struct {
PcapFile string
Realtime bool
}

func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) {
handle, err := pcap.OpenOffline(config.PcapFile)

if err != nil {
return nil, err
}

return &pcapPacketIO{
pcap: handle,
lastTime: nil,
ioCancel: nil,
config: config,
}, nil
}

func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error {
go func() {
packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType())
for packet := range packetSource.Packets() {
p.wait(packet)

networkLayer := packet.NetworkLayer()
if networkLayer != nil {
src, dst := networkLayer.NetworkFlow().Endpoints()
endpoints := []string{src.String(), dst.String()}
sort.Strings(endpoints)
id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable)

cb(&pcapPacket{
streamID: id,
timestamp: packet.Metadata().Timestamp,
data: packet.LinkLayer().LayerPayload(),
}, nil)
}
}
// Give the workers a chance to finish everything
time.Sleep(time.Second)
// Stop the engine when all packets are finished
p.ioCancel()
}()

return nil
}

func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) {
tobyxdd marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error {
return nil
}

func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
p.ioCancel = cancelFunc
return nil
}

func (p *pcapPacketIO) Close() error {
tobyxdd marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Intentionally slow down the replay
// In realtime mode, this is to match the timestamps in the capture
func (p *pcapPacketIO) wait(packet gopacket.Packet) error {
if !p.config.Realtime {
return nil
}

if p.lastTime == nil {
p.lastTime = &packet.Metadata().Timestamp
} else {
t := packet.Metadata().Timestamp.Sub(*p.lastTime)
time.Sleep(t)
p.lastTime = &packet.Metadata().Timestamp
}

return nil
}

var _ Packet = (*pcapPacket)(nil)

type pcapPacket struct {
streamID uint32
timestamp time.Time
data []byte
}

func (p *pcapPacket) StreamID() uint32 {
return p.streamID
}

func (p *pcapPacket) Timestamp() time.Time {
return p.timestamp
}

func (p *pcapPacket) Data() []byte {
return p.data
}