From 7cd24c0efcf9700033f671b6a8eaa64a77dd0b72 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Mon, 18 May 2020 11:37:20 +0100 Subject: [PATCH] WritePCAPHandler: support logging more than 4GiB of data (#85) Motivation: Previously, WritePCAPHandler would crash if more than 4GiB of data were either received or sent through the same instance of the WritePCAPHandler because of a UInt32 overflow representing the TCP sequence/ACK numbers. Modifications: Make TCP sequence/ACK numbers wrap around correctly. Result: - now you can send/receive up to 16 EiB of data :P. - fixes rdar://61887658 --- Sources/NIOExtras/WritePCAPHandler.swift | 30 ++++++----- .../WritePCAPHandlerTest+XCTest.swift | 1 + .../NIOExtrasTests/WritePCAPHandlerTest.swift | 54 +++++++++++++++++-- 3 files changed, 70 insertions(+), 15 deletions(-) diff --git a/Sources/NIOExtras/WritePCAPHandler.swift b/Sources/NIOExtras/WritePCAPHandler.swift index 2d7d3880..936cc9bf 100644 --- a/Sources/NIOExtras/WritePCAPHandler.swift +++ b/Sources/NIOExtras/WritePCAPHandler.swift @@ -42,8 +42,8 @@ struct TCPHeader { } var flags: Flags - var ackNumber: Int? - var sequenceNumber: Int + var ackNumber: UInt32? + var sequenceNumber: UInt32 var srcPort: UInt16 var dstPort: UInt16 } @@ -174,8 +174,8 @@ public class NIOWritePCAPHandler: RemovableChannelHandler { private let maxPayloadSize = Int(UInt16.max - 40 /* needs to fit into the IPv4 header which adds 40 */) private let settings: Settings private var buffer: ByteBuffer! - private var readInboundBytes = 0 - private var writtenOutboundBytes = 0 + private var readInboundBytes: UInt64 = 0 + private var writtenOutboundBytes: UInt64 = 0 private var closeState = CloseState.notClosing private static let fakeLocalAddress = try! SocketAddress(ipAddress: "111.111.111.111", port: 1111) @@ -283,6 +283,10 @@ public class NIOWritePCAPHandler: RemovableChannelHandler { return buffer.readSlice(length: min(buffer.readableBytes, self.maxPayloadSize)) } + + private func sequenceNumber(byteCount: UInt64) -> UInt32 { + return UInt32(byteCount % (UInt64(UInt32.max) + 1)) + } } extension NIOWritePCAPHandler: ChannelDuplexHandler { @@ -349,8 +353,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler { do { let closeInitiatorAddress = didLocalInitiateTheClose ? self.localAddress(context: context) : self.remoteAddress(context: context) let closeRecipientAddress = didLocalInitiateTheClose ? self.remoteAddress(context: context) : self.localAddress(context: context) - let initiatorSeq = didLocalInitiateTheClose ? self.writtenOutboundBytes : self.readInboundBytes - let recipientSeq = didLocalInitiateTheClose ? self.readInboundBytes : self.writtenOutboundBytes + let initiatorSeq = self.sequenceNumber(byteCount: didLocalInitiateTheClose ? + self.writtenOutboundBytes : self.readInboundBytes) + let recipientSeq = self.sequenceNumber(byteCount: didLocalInitiateTheClose ? + self.readInboundBytes : self.writtenOutboundBytes) // terminate the connection cleanly try self.buffer.writePCAPRecord(.init(payloadLength: 0, @@ -406,10 +412,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler { dst: self.localAddress(context: context), tcp: TCPHeader(flags: [], ackNumber: nil, - sequenceNumber: self.readInboundBytes, + sequenceNumber: self.sequenceNumber(byteCount: self.readInboundBytes), srcPort: .init(self.remoteAddress(context: context).port!), dstPort: .init(self.localAddress(context: context).port!)))) - self.readInboundBytes += payloadToSend.readableBytes + self.readInboundBytes += UInt64(payloadToSend.readableBytes) self.buffer.writeBuffer(&payloadToSend) } assert(data.readableBytes == 0) @@ -431,10 +437,10 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler { dst: self.remoteAddress(context: context), tcp: TCPHeader(flags: [], ackNumber: nil, - sequenceNumber: self.writtenOutboundBytes, + sequenceNumber: self.sequenceNumber(byteCount: self.writtenOutboundBytes), srcPort: .init(self.localAddress(context: context).port!), dstPort: .init(self.remoteAddress(context: context).port!)))) - self.writtenOutboundBytes += payloadToSend.readableBytes + self.writtenOutboundBytes += UInt64(payloadToSend.readableBytes) self.buffer.writeBuffer(&payloadToSend) } self.writeBuffer(self.buffer) @@ -573,8 +579,8 @@ extension ByteBuffer { self.writeInteger(record.tcp.srcPort, as: UInt16.self) self.writeInteger(record.tcp.dstPort, as: UInt16.self) - self.writeInteger(.init(record.tcp.sequenceNumber), as: UInt32.self) // seq no - self.writeInteger(.init(record.tcp.ackNumber ?? 0), as: UInt32.self) // ack no + self.writeInteger(record.tcp.sequenceNumber, as: UInt32.self) // seq no + self.writeInteger(record.tcp.ackNumber ?? 0, as: UInt32.self) // ack no self.writeInteger(5 << 12 | UInt16(record.tcp.flags.rawValue), as: UInt16.self) // data offset + reserved bits + fancy stuff self.writeInteger(.max /* we don't do actual window sizes */, as: UInt16.self) // window size diff --git a/Tests/NIOExtrasTests/WritePCAPHandlerTest+XCTest.swift b/Tests/NIOExtrasTests/WritePCAPHandlerTest+XCTest.swift index b5dd03bd..3321682b 100644 --- a/Tests/NIOExtrasTests/WritePCAPHandlerTest+XCTest.swift +++ b/Tests/NIOExtrasTests/WritePCAPHandlerTest+XCTest.swift @@ -40,6 +40,7 @@ extension WritePCAPHandlerTest { ("testUnflushedOutboundDataIsNotWritten", testUnflushedOutboundDataIsNotWritten), ("testDataWrittenAfterCloseIsDiscarded", testDataWrittenAfterCloseIsDiscarded), ("testUnflushedOutboundDataIsWrittenWhenEmittingWritesOnIssue", testUnflushedOutboundDataIsWrittenWhenEmittingWritesOnIssue), + ("testWeDoNotCrashIfMoreThan4GBOfDataGoThrough", testWeDoNotCrashIfMoreThan4GBOfDataGoThrough), ] } } diff --git a/Tests/NIOExtrasTests/WritePCAPHandlerTest.swift b/Tests/NIOExtrasTests/WritePCAPHandlerTest.swift index 720a2cd0..64d2c7eb 100644 --- a/Tests/NIOExtrasTests/WritePCAPHandlerTest.swift +++ b/Tests/NIOExtrasTests/WritePCAPHandlerTest.swift @@ -640,6 +640,54 @@ class WritePCAPHandlerTest: XCTestCase { XCTAssertNotNil(packet2Bytes?.readPCAPRecord()) } + func testWeDoNotCrashIfMoreThan4GBOfDataGoThrough() { + let channel = EmbeddedChannel() + var numberOfBytesLogged: Int64 = 0 + + final class DropAllChannelReads: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + + func channelRead(context: ChannelHandlerContext, data: NIOAny) {} + } + final class DropAllWritesAndFlushes: ChannelOutboundHandler { + typealias OutboundIn = ByteBuffer + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + promise?.succeed(()) + } + + func flush(context: ChannelHandlerContext) {} + } + + // Let's drop all writes/flushes so EmbeddedChannel won't accumulate them. + XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllWritesAndFlushes()).wait()) + XCTAssertNoThrow(try channel.pipeline.addHandler(NIOWritePCAPHandler(mode: .client, + fakeLocalAddress: .init(ipAddress: "::1", port: 1), + fakeRemoteAddress: .init(ipAddress: "::2", port: 2), + fileSink: { + numberOfBytesLogged += Int64($0.readableBytes) + })).wait()) + // Let's also drop all channelReads to prevent accumulation of all the data. + XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllChannelReads()).wait()) + + let chunkSize = Int(UInt16.max - 40 /* needs to fit into the IPv4 header which adds 40 */) + self.scratchBuffer = channel.allocator.buffer(capacity: chunkSize) + self.scratchBuffer.writeString(String(repeating: "X", count: chunkSize)) + + let fourGB: Int64 = 4 * 1024 * 1024 * 1024 + + // Let's send 4 GiB inbound, ... + for _ in 0..<((fourGB / Int64(chunkSize)) + 2) { + XCTAssertNoThrow(try channel.writeInbound(self.scratchBuffer)) + } + // ... and 4 GiB outbound. + for _ in 0..<((fourGB / Int64(chunkSize)) + 2) { + XCTAssertNoThrow(try channel.writeOutbound(self.scratchBuffer)) + } + XCTAssertGreaterThan(numberOfBytesLogged, 2 * (fourGB + 1000)) + XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean)) + } + } struct PCAPRecord { @@ -687,8 +735,8 @@ extension ByteBuffer { } return TCPHeader(flags: .init(rawValue: UInt8(flagsAndFriends & 0xfff)), - ackNumber: ackNo == 0 ? nil : Int(ackNo), - sequenceNumber: Int(seqNo), + ackNumber: ackNo == 0 ? nil : ackNo, + sequenceNumber: seqNo, srcPort: srcPort, dstPort: dstPort) } @@ -785,7 +833,7 @@ extension ByteBuffer { assert(lenPacket == lenDisk, "\(lenPacket) != \(lenDisk)") let notImplementedAddress = try! SocketAddress(ipAddress: "9.9.9.9", port: 0xbad) - let tcp = TCPHeader(flags: [], ackNumber: nil, sequenceNumber: -1, srcPort: 0xbad, dstPort: 0xbad) + let tcp = TCPHeader(flags: [], ackNumber: nil, sequenceNumber: 0xbad, srcPort: 0xbad, dstPort: 0xbad) return .init(time: timeval(tv_sec: .init(timeSecs), tv_usec: .init(timeUSecs)), header: try! PCAPRecordHeader(payloadLength: .init(lenPacket), src: notImplementedAddress,