diff --git a/workers/sniffer_afpacket_linux.go b/workers/sniffer_afpacket_linux.go index 806479c1..d92fd69d 100644 --- a/workers/sniffer_afpacket_linux.go +++ b/workers/sniffer_afpacket_linux.go @@ -108,10 +108,13 @@ func (w *AfpacketSniffer) StartCollect() { // defrag ipv4 go netutils.IPDefragger(fragIP4Chan, udpChan, tcpChan, w.GetConfig().Collectors.AfpacketLiveCapture.Port) + // defrag ipv6 go netutils.IPDefragger(fragIP6Chan, udpChan, tcpChan, w.GetConfig().Collectors.AfpacketLiveCapture.Port) + // tcp assembly go netutils.TCPAssembler(tcpChan, dnsChan, 0) + // udp processor go netutils.UDPProcessor(udpChan, dnsChan, 0) @@ -119,11 +122,10 @@ func (w *AfpacketSniffer) StartCollect() { done := make(chan struct{}) go func(ctx context.Context) { defer func() { - dnsProcessor.Stop() netutils.RemoveBpfFilter(w.fd) syscall.Close(w.fd) w.LogInfo("read data terminated") - defer close(done) + close(done) }() buf := make([]byte, 65536) @@ -133,7 +135,7 @@ func (w *AfpacketSniffer) StartCollect() { select { case <-ctx.Done(): w.LogInfo("stopping sniffer...") - syscall.Close(w.fd) + // syscall.Close(w.fd) return default: var fdSet syscall.FdSet @@ -144,7 +146,7 @@ func (w *AfpacketSniffer) StartCollect() { if errors.Is(err, syscall.EINTR) { continue } - panic(err) + w.LogFatal(pkgconfig.PrefixLogWorker+"["+w.GetName()+"select", err) } if nReady == 0 { continue @@ -227,11 +229,19 @@ func (w *AfpacketSniffer) StartCollect() { // tcp or udp packets ? if packet.TransportLayer().LayerType() == layers.LayerTypeUDP { - udpChan <- packet + select { + case <-ctx.Done(): + return + case udpChan <- packet: + } } if packet.TransportLayer().LayerType() == layers.LayerTypeTCP { - tcpChan <- packet + select { + case <-ctx.Done(): + return + case tcpChan <- packet: + } } } }