Skip to content

Commit

Permalink
WritePCAPHandler: support logging more than 4GiB of data (#85)
Browse files Browse the repository at this point in the history
Motivation:

Previously, WritePCAPHandler would crash if more than 4GiB of data were
either received or sent through the same instance of the
WritePCAPHandler because of a UInt32 overflow representing the TCP
sequence/ACK numbers.

Modifications:

Make TCP sequence/ACK numbers wrap around correctly.

Result:

- now you can send/receive up to 16 EiB of data :P.
- fixes rdar://61887658
  • Loading branch information
weissi authored May 18, 2020
1 parent f21a87d commit 7cd24c0
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 15 deletions.
30 changes: 18 additions & 12 deletions Sources/NIOExtras/WritePCAPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ struct TCPHeader {
}

var flags: Flags
var ackNumber: Int?
var sequenceNumber: Int
var ackNumber: UInt32?
var sequenceNumber: UInt32
var srcPort: UInt16
var dstPort: UInt16
}
Expand Down Expand Up @@ -174,8 +174,8 @@ public class NIOWritePCAPHandler: RemovableChannelHandler {
private let maxPayloadSize = Int(UInt16.max - 40 /* needs to fit into the IPv4 header which adds 40 */)
private let settings: Settings
private var buffer: ByteBuffer!
private var readInboundBytes = 0
private var writtenOutboundBytes = 0
private var readInboundBytes: UInt64 = 0
private var writtenOutboundBytes: UInt64 = 0
private var closeState = CloseState.notClosing

private static let fakeLocalAddress = try! SocketAddress(ipAddress: "111.111.111.111", port: 1111)
Expand Down Expand Up @@ -283,6 +283,10 @@ public class NIOWritePCAPHandler: RemovableChannelHandler {

return buffer.readSlice(length: min(buffer.readableBytes, self.maxPayloadSize))
}

private func sequenceNumber(byteCount: UInt64) -> UInt32 {
return UInt32(byteCount % (UInt64(UInt32.max) + 1))
}
}

extension NIOWritePCAPHandler: ChannelDuplexHandler {
Expand Down Expand Up @@ -349,8 +353,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler {
do {
let closeInitiatorAddress = didLocalInitiateTheClose ? self.localAddress(context: context) : self.remoteAddress(context: context)
let closeRecipientAddress = didLocalInitiateTheClose ? self.remoteAddress(context: context) : self.localAddress(context: context)
let initiatorSeq = didLocalInitiateTheClose ? self.writtenOutboundBytes : self.readInboundBytes
let recipientSeq = didLocalInitiateTheClose ? self.readInboundBytes : self.writtenOutboundBytes
let initiatorSeq = self.sequenceNumber(byteCount: didLocalInitiateTheClose ?
self.writtenOutboundBytes : self.readInboundBytes)
let recipientSeq = self.sequenceNumber(byteCount: didLocalInitiateTheClose ?
self.readInboundBytes : self.writtenOutboundBytes)

// terminate the connection cleanly
try self.buffer.writePCAPRecord(.init(payloadLength: 0,
Expand Down Expand Up @@ -406,10 +412,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler {
dst: self.localAddress(context: context),
tcp: TCPHeader(flags: [],
ackNumber: nil,
sequenceNumber: self.readInboundBytes,
sequenceNumber: self.sequenceNumber(byteCount: self.readInboundBytes),
srcPort: .init(self.remoteAddress(context: context).port!),
dstPort: .init(self.localAddress(context: context).port!))))
self.readInboundBytes += payloadToSend.readableBytes
self.readInboundBytes += UInt64(payloadToSend.readableBytes)
self.buffer.writeBuffer(&payloadToSend)
}
assert(data.readableBytes == 0)
Expand All @@ -431,10 +437,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler {
dst: self.remoteAddress(context: context),
tcp: TCPHeader(flags: [],
ackNumber: nil,
sequenceNumber: self.writtenOutboundBytes,
sequenceNumber: self.sequenceNumber(byteCount: self.writtenOutboundBytes),
srcPort: .init(self.localAddress(context: context).port!),
dstPort: .init(self.remoteAddress(context: context).port!))))
self.writtenOutboundBytes += payloadToSend.readableBytes
self.writtenOutboundBytes += UInt64(payloadToSend.readableBytes)
self.buffer.writeBuffer(&payloadToSend)
}
self.writeBuffer(self.buffer)
Expand Down Expand Up @@ -573,8 +579,8 @@ extension ByteBuffer {
self.writeInteger(record.tcp.srcPort, as: UInt16.self)
self.writeInteger(record.tcp.dstPort, as: UInt16.self)

self.writeInteger(.init(record.tcp.sequenceNumber), as: UInt32.self) // seq no
self.writeInteger(.init(record.tcp.ackNumber ?? 0), as: UInt32.self) // ack no
self.writeInteger(record.tcp.sequenceNumber, as: UInt32.self) // seq no
self.writeInteger(record.tcp.ackNumber ?? 0, as: UInt32.self) // ack no

self.writeInteger(5 << 12 | UInt16(record.tcp.flags.rawValue), as: UInt16.self) // data offset + reserved bits + fancy stuff
self.writeInteger(.max /* we don't do actual window sizes */, as: UInt16.self) // window size
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOExtrasTests/WritePCAPHandlerTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extension WritePCAPHandlerTest {
("testUnflushedOutboundDataIsNotWritten", testUnflushedOutboundDataIsNotWritten),
("testDataWrittenAfterCloseIsDiscarded", testDataWrittenAfterCloseIsDiscarded),
("testUnflushedOutboundDataIsWrittenWhenEmittingWritesOnIssue", testUnflushedOutboundDataIsWrittenWhenEmittingWritesOnIssue),
("testWeDoNotCrashIfMoreThan4GBOfDataGoThrough", testWeDoNotCrashIfMoreThan4GBOfDataGoThrough),
]
}
}
Expand Down
54 changes: 51 additions & 3 deletions Tests/NIOExtrasTests/WritePCAPHandlerTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,54 @@ class WritePCAPHandlerTest: XCTestCase {
XCTAssertNotNil(packet2Bytes?.readPCAPRecord())
}

func testWeDoNotCrashIfMoreThan4GBOfDataGoThrough() {
let channel = EmbeddedChannel()
var numberOfBytesLogged: Int64 = 0

final class DropAllChannelReads: ChannelInboundHandler {
typealias InboundIn = ByteBuffer

func channelRead(context: ChannelHandlerContext, data: NIOAny) {}
}
final class DropAllWritesAndFlushes: ChannelOutboundHandler {
typealias OutboundIn = ByteBuffer

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
promise?.succeed(())
}

func flush(context: ChannelHandlerContext) {}
}

// Let's drop all writes/flushes so EmbeddedChannel won't accumulate them.
XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllWritesAndFlushes()).wait())
XCTAssertNoThrow(try channel.pipeline.addHandler(NIOWritePCAPHandler(mode: .client,
fakeLocalAddress: .init(ipAddress: "::1", port: 1),
fakeRemoteAddress: .init(ipAddress: "::2", port: 2),
fileSink: {
numberOfBytesLogged += Int64($0.readableBytes)
})).wait())
// Let's also drop all channelReads to prevent accumulation of all the data.
XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllChannelReads()).wait())

let chunkSize = Int(UInt16.max - 40 /* needs to fit into the IPv4 header which adds 40 */)
self.scratchBuffer = channel.allocator.buffer(capacity: chunkSize)
self.scratchBuffer.writeString(String(repeating: "X", count: chunkSize))

let fourGB: Int64 = 4 * 1024 * 1024 * 1024

// Let's send 4 GiB inbound, ...
for _ in 0..<((fourGB / Int64(chunkSize)) + 2) {
XCTAssertNoThrow(try channel.writeInbound(self.scratchBuffer))
}
// ... and 4 GiB outbound.
for _ in 0..<((fourGB / Int64(chunkSize)) + 2) {
XCTAssertNoThrow(try channel.writeOutbound(self.scratchBuffer))
}
XCTAssertGreaterThan(numberOfBytesLogged, 2 * (fourGB + 1000))
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

}

struct PCAPRecord {
Expand Down Expand Up @@ -687,8 +735,8 @@ extension ByteBuffer {
}

return TCPHeader(flags: .init(rawValue: UInt8(flagsAndFriends & 0xfff)),
ackNumber: ackNo == 0 ? nil : Int(ackNo),
sequenceNumber: Int(seqNo),
ackNumber: ackNo == 0 ? nil : ackNo,
sequenceNumber: seqNo,
srcPort: srcPort,
dstPort: dstPort)
}
Expand Down Expand Up @@ -785,7 +833,7 @@ extension ByteBuffer {
assert(lenPacket == lenDisk, "\(lenPacket) != \(lenDisk)")

let notImplementedAddress = try! SocketAddress(ipAddress: "9.9.9.9", port: 0xbad)
let tcp = TCPHeader(flags: [], ackNumber: nil, sequenceNumber: -1, srcPort: 0xbad, dstPort: 0xbad)
let tcp = TCPHeader(flags: [], ackNumber: nil, sequenceNumber: 0xbad, srcPort: 0xbad, dstPort: 0xbad)
return .init(time: timeval(tv_sec: .init(timeSecs), tv_usec: .init(timeUSecs)),
header: try! PCAPRecordHeader(payloadLength: .init(lenPacket),
src: notImplementedAddress,
Expand Down

0 comments on commit 7cd24c0

Please sign in to comment.