Skip to content

Commit

Permalink
refactor: refactor the parsing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed Nov 16, 2024
1 parent edf54e0 commit 20f7090
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 103 deletions.
177 changes: 112 additions & 65 deletions src/net/parser/fluereflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use log::trace;
use pnet::packet::{
arp::ArpPacket,
ethernet::{EtherTypes, EthernetPacket},
ip::IpNextHeaderProtocols,
ipv4::Ipv4Packet,
ipv6::Ipv6Packet,
udp::UdpPacket,
Expand Down Expand Up @@ -38,113 +39,121 @@ pub fn parse_fluereflow(packet: pcap::Packet) -> Result<(usize, [u8; 9], FluereR

let is_udp: bool = match ethernet_packet_unpack.get_ethertype() {
EtherTypes::Ipv6 => {
let i = Ipv6Packet::new(ethernet_packet_unpack.payload()).unwrap();
if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
}
let is_udp = UdpPacket::new(i.payload()).is_some();

is_udp
let i = match Ipv6Packet::new(ethernet_packet_unpack.payload()) {
Some(packet) => packet,
None => return Err(NetError::InvalidPacket),
};
UdpPacket::new(i.payload()).is_some()
}
EtherTypes::Ipv4 => {
let i = Ipv4Packet::new(ethernet_packet_unpack.payload()).unwrap();
if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
}

let is_udp = UdpPacket::new(i.payload()).is_some();

is_udp
let i = match Ipv4Packet::new(ethernet_packet_unpack.payload()) {
Some(packet) => packet,
None => return Err(NetError::InvalidPacket),
};
UdpPacket::new(i.payload()).is_some()
}
_ => false,
};

let mut decapsulated_data: Option<Vec<u8>> = None;

if is_udp {
let udp_payload = match ethernet_packet_unpack.get_ethertype() {
EtherTypes::Ipv6 => {
let i = Ipv6Packet::new(ethernet_packet_unpack.payload()).unwrap();
if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
let i = match Ipv6Packet::new(ethernet_packet_unpack.payload()) {
Some(packet) => packet,
None => return Err(NetError::InvalidPacket),
};

match UdpPacket::new(i.payload()) {
Some(udp) => {
trace!("UDP payload length: {}", udp.payload().len());
udp.payload().to_vec()
}
None => return Err(NetError::InvalidPacket),
}

UdpPacket::new(i.payload()).unwrap().payload().to_vec()
}
EtherTypes::Ipv4 => {
let i = Ipv4Packet::new(ethernet_packet_unpack.payload()).unwrap();
if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
let i = match Ipv4Packet::new(ethernet_packet_unpack.payload()) {
Some(packet) => packet,
None => return Err(NetError::InvalidPacket),
};

match UdpPacket::new(i.payload()) {
Some(udp) => {
trace!("UDP payload length: {}", udp.payload().len());
udp.payload().to_vec()
}
None => return Err(NetError::InvalidPacket),
}

UdpPacket::new(i.payload()).unwrap().payload().to_vec()
}
_ => Vec::new(),
};
if udp_payload.is_empty() {
return Err(NetError::EmptyPacket);

// Only check if UDP payload is empty when we expect it to have content
if udp_payload.is_empty() && is_udp {
trace!("Empty UDP payload detected");
}
//UdpPacket::new(ethernet_packet_unpack.payload()).unwrap().payload().to_vec();
//println!("UDP payload: {:?}", udp_payload);

decapsulated_data = decapsulate_vxlan(&udp_payload);
}

let ethernet_packet_decapsulated = if let Some(data) = &decapsulated_data {
let ethernet_packet = if let Some(data) = &decapsulated_data {
match EthernetPacket::new(data) {
None => return Err(NetError::EmptyPacket),
None => ethernet_packet_unpack, // Fall back to original packet if decapsulation fails
Some(e) => e,
}
} else {
ethernet_packet_unpack
};

let ethernet_packet = ethernet_packet_decapsulated;

let time = parse_microseconds(
packet.header.ts.tv_sec as u64,
packet.header.ts.tv_usec as u64,
);

let record_result = match ethernet_packet.get_ethertype() {
EtherTypes::Ipv4 => {
let i = Ipv4Packet::new(ethernet_packet.payload()).unwrap();

if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
}

let i = match Ipv4Packet::new(ethernet_packet.payload()) {
Some(packet) => packet,
None => {
trace!("Failed to parse IPv4 packet");
return Err(NetError::InvalidPacket);
}
};
ipv4_packet(time, i)
}
EtherTypes::Ipv6 => {
let i = Ipv6Packet::new(ethernet_packet.payload()).unwrap();
if i.payload().is_empty() {
return Err(NetError::EmptyPacket);
}

let i = match Ipv6Packet::new(ethernet_packet.payload()) {
Some(packet) => packet,
None => {
trace!("Failed to parse IPv6 packet");
return Err(NetError::InvalidPacket);
}
};
ipv6_packet(time, i)
}
EtherTypes::Arp => {
let i = ArpPacket::new(ethernet_packet.payload()).unwrap();
//if i.payload().is_empty() {
// return Err(NetError::EmptyPacket);
//}

let i = match ArpPacket::new(ethernet_packet.payload()) {
Some(packet) => packet,
None => {
trace!("Failed to parse ARP packet");
return Err(NetError::InvalidPacket);
}
};
arp_packet(time, i)
}

_ => {
trace!("Unknown EtherType: {}", ethernet_packet.get_ethertype());
return Err(NetError::UnknownEtherType(
ethernet_packet.get_ethertype().to_string(),
))
));
}
};
}?;

match record_result {
Ok(_) => {}
Err(e) => return Err(e),
}
let (doctets, raw_flags, record) = record_result.unwrap();
Ok((doctets, raw_flags, record))
Ok(record_result)
}

fn arp_packet(time: u64, packet: ArpPacket) -> Result<(usize, [u8; 9], FluereRecord), NetError> {
let src_ip = packet.get_sender_proto_addr();
let dst_ip = packet.get_target_proto_addr();
Expand Down Expand Up @@ -197,16 +206,54 @@ fn ipv4_packet(time: u64, packet: Ipv4Packet) -> Result<(usize, [u8; 9], FluereR
let protocol = packet.get_next_level_protocol().0;
let src_ip = packet.get_source();
let dst_ip = packet.get_destination();
trace!("src_ip: {:?}", src_ip);
trace!("dst_ip: {:?}", dst_ip);

// ports parsing
let (src_port, dst_port) = parse_ports(protocol, packet.payload())?;
// Special handling for DNS over UDP
if packet.get_next_level_protocol() == IpNextHeaderProtocols::Udp {
if let Some(udp) = UdpPacket::new(packet.payload()) {
if udp.get_destination() == 53 || udp.get_source() == 53 {
return Ok((
packet.packet_size(),
[0; 9], // DNS doesn't use TCP flags
FluereRecord::new(
std::net::IpAddr::V4(src_ip),
std::net::IpAddr::V4(dst_ip),
0,
0,
time,
time,
udp.get_source(),
udp.get_destination(),
udp.packet_size() as u32,
udp.packet_size() as u32,
packet.get_ttl(),
packet.get_ttl(),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
17, // UDP
0, // No TOS for DNS
),
));
}
}
}

// TCP flags Fin Syn Rst Psh Ack Urg Ece Cwr Ns
// Continue with normal packet processing...
let (src_port, dst_port) = parse_ports(protocol, packet.payload()).unwrap_or((0, 0));

// TCP flags
let flags = parse_flags(protocol, packet.payload());

// Autonomous system number of the source and destination, either origin or peer
let doctets = packet.packet_size();
let tos_convert_result = dscp_to_tos(packet.get_dscp());
let tos = tos_convert_result.unwrap_or(0);
Expand Down Expand Up @@ -261,7 +308,7 @@ fn ipv6_packet(time: u64, packet: Ipv6Packet) -> Result<(usize, [u8; 9], FluereR
//first six bits in the 8-bit Traffic Class field
let dscp = packet.get_traffic_class() >> 2;
let tos_convert_result = dscp_to_tos(dscp);
let tos = tos_convert_result.unwrap_or(0);
let tos = tos_convert_result.unwrap_or_default();

Ok((
doctets,
Expand Down
Loading

0 comments on commit 20f7090

Please sign in to comment.