diff --git a/Sources/LCLPing/HTTP/HTTPChannelHandlers.swift b/Sources/LCLPing/HTTP/HTTPChannelHandlers.swift index cb1a517..f7ba2f8 100644 --- a/Sources/LCLPing/HTTP/HTTPChannelHandlers.swift +++ b/Sources/LCLPing/HTTP/HTTPChannelHandlers.swift @@ -54,12 +54,12 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { func channelActive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel already active") + logger.debug("[HTTPDuplexer][\(#function)]: Channel already active") break case .error: - assertionFailure("[\(#function)] in an incorrect state: \(state)") + assertionFailure("[HTTPDuplexer][\(#function)] in an incorrect state: \(state)") case .inactive: - logger.debug("[\(#function)]: Channel active") + logger.debug("[HTTPDuplexer][\(#function)]: Channel active") self.state = .operational } } @@ -68,12 +68,12 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { switch self.state { case .operational: self.state = .inactive - logger.debug("[\(#function)]: Channel inactive") + logger.debug("[HTTPDuplexer][\(#function)]: Channel inactive") case .error: break case .inactive: - logger.error("[\(#function)]: received inactive signal when channel is already in inactive state.") - assertionFailure("[\(#function)]: received inactive signal when channel is already in inactive state.") + logger.error("[HTTPDuplexer][\(#function)]: received inactive signal when channel is already in inactive state.") + assertionFailure("[HTTPDuplexer][\(#function)]: received inactive signal when channel is already in inactive state.") } } @@ -89,8 +89,8 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { header.add(name: "Host", value: host) } - logger.debug("Header is \(header)") - logger.debug("url is \(self.url.absoluteString)") + logger.debug("[HTTPDuplexer]: Header is \(header)") + logger.debug("[HTTPDuplexer]: url is \(self.url.absoluteString)") let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: url.path.isEmpty ? "/" : url.path, headers: header) context.write(self.wrapOutboundOut((sequenceNumber, requestHead)), promise: promise) @@ -99,7 +99,7 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { func channelRead(context: ChannelHandlerContext, data: NIOAny) { let latencyEntry = self.unwrapInboundIn(data) guard self.state.isOperational else { - logger.debug("[\(#function)]: drop data: \(data) because channel is not in operational state") + logger.debug("[HTTPDuplexer][\(#function)]: drop data: \(data) because channel is not in operational state") return } @@ -113,7 +113,7 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { switch statusCode { case 200...299: self.state = .error - fatalError("HTTP Handler in some error state while the status code is \(statusCode). Please report this to the developer") + fatalError("[HTTPDuplexer]: HTTP Handler in some error state while the status code is \(statusCode). Please report this to the developer") case 300...399: context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.httpRedirect))) case 400...499: @@ -129,12 +129,12 @@ internal final class HTTPDuplexer: ChannelDuplexHandler { } context.channel.close(mode: .all, promise: nil) - logger.debug("[\(#function)]: Closing all channels ... because packet #\(latencyEntry.seqNum) done") + logger.debug("[HTTPDuplexer][\(#function)]: Closing all channels ... because packet #\(latencyEntry.seqNum) done") } func errorCaught(context: ChannelHandlerContext, error: Error) { guard self.state.isOperational else { - logger.debug("already in error state. ignore error \(error)") + logger.debug("[HTTPDuplexer]: already in error state. ignore error \(error)") return } self.state = .error @@ -184,10 +184,10 @@ internal final class HTTPTracingHandler: ChannelDuplexHandler { case .operational: break case .error: - logger.error("[\(#function)]: in an incorrect state: \(self.state)") + logger.error("[HTTPTracingHandler][\(#function)]: in an incorrect state: \(self.state)") assertionFailure("[\(#function)]: in an incorrect state: \(self.state)") case .inactive: - logger.debug("[\(#function)]: Channel active") + logger.debug("[HTTPTracingHandler][\(#function)]: Channel active") context.fireChannelActive() self.state = .operational } @@ -196,21 +196,21 @@ internal final class HTTPTracingHandler: ChannelDuplexHandler { func channelInactive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel inactive") + logger.debug("[HTTPTracingHandler][\(#function)]: Channel inactive") context.fireChannelInactive() self.state = .inactive case .error: break case .inactive: - logger.error("[\(#function)]: received inactive signal when channel is already in inactive state.") - assertionFailure("[\(#function)]: received inactive signal when channel is already in inactive state.") + logger.error("[HTTPTracingHandler][\(#function)]: received inactive signal when channel is already in inactive state.") + assertionFailure("[HTTPTracingHandler][\(#function)]: received inactive signal when channel is already in inactive state.") } } func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { let (sequenceNum, httpRequest) = self.unwrapOutboundIn(data) guard self.state.isOperational else { - logger.error("[\(#function)]: error: IO on closed channel") + logger.error("[HTTPTracingHandler][\(#function)]: error: IO on closed channel") context.fireErrorCaught(ChannelError.ioOnClosedChannel) return } @@ -224,7 +224,7 @@ internal final class HTTPTracingHandler: ChannelDuplexHandler { timerScheduler.schedule(delay: self.configuration.timeout, key: sequenceNum) { [weak self, context] in if let self = self, var le = self.latencyEntry { context.eventLoop.execute { - logger.debug("[\(#function)]: packet #\(le.seqNum) timed out") + logger.debug("[HTTPTracingHandler][\(#function)]: packet #\(le.seqNum) timed out") le.latencyStatus = .timeout context.fireChannelRead(self.wrapInboundOut(le)) return @@ -235,7 +235,7 @@ internal final class HTTPTracingHandler: ChannelDuplexHandler { func channelRead(context: ChannelHandlerContext, data: NIOAny) { guard self.state.isOperational else { - logger.debug("[\(#function)]: drop data: \(data) because channel is not in operational state") + logger.debug("[HTTPTracingHandler][\(#function)]: drop data: \(data) because channel is not in operational state") return } @@ -281,7 +281,7 @@ internal final class HTTPTracingHandler: ChannelDuplexHandler { func errorCaught(context: ChannelHandlerContext, error: Error) { guard self.state.isOperational else { - logger.debug("already in error state. ignore error \(error)") + logger.debug("[HTTPTracingHandler]: already in error state. ignore error \(error)") return } self.state = .error diff --git a/Sources/LCLPing/ICMP/ICMPChannelHandlers.swift b/Sources/LCLPing/ICMP/ICMPChannelHandlers.swift index 3ba62b1..ec5b536 100644 --- a/Sources/LCLPing/ICMP/ICMPChannelHandlers.swift +++ b/Sources/LCLPing/ICMP/ICMPChannelHandlers.swift @@ -21,7 +21,7 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { typealias InboundIn = ICMPHeader typealias InboundOut = PingResponse typealias OutboundIn = ICMPOutboundIn - typealias OutboundOut = ByteBuffer + typealias OutboundOut = AddressedEnvelope private enum State { case operational @@ -63,7 +63,7 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { let (identifier, sequenceNum) = self.unwrapOutboundIn(data) guard self.state.isOperational else { - logger.error("[\(#function)]: Error: IO on closed channel") + logger.error("[ICMPDuplexer][\(#function)]: Error: IO on closed channel") context.fireChannelRead(self.wrapInboundOut(.error(sequenceNum, ChannelError.ioOnClosedChannel))) return } @@ -71,13 +71,23 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { var icmpRequest = ICMPHeader(idenifier: identifier, sequenceNum: sequenceNum) icmpRequest.setChecksum() + let ipAddress: String + switch self.configuration.endpoint { + case .ipv4(let addr, _): + ipAddress = addr + case .ipv6(let addr): + ipAddress = addr + } + let buffer = context.channel.allocator.buffer(bytes: icmpRequest.toData()) - context.writeAndFlush(self.wrapOutboundOut(buffer), promise: promise) + let evelope = AddressedEnvelope(remoteAddress: try! SocketAddress(ipAddress: ipAddress, port: 0), data: buffer) + + context.writeAndFlush(self.wrapOutboundOut(evelope), promise: promise) self.seqToRequest[sequenceNum] = icmpRequest self.timerScheduler.schedule(delay: self.configuration.timeout, key: sequenceNum) { [weak self, context] in if let self = self, !self.seqToResponse.keys.contains(sequenceNum) { - logger.debug("[\(#function)]: packet #\(sequenceNum) timed out") + logger.debug("[ICMPDuplexer][\(#function)]: packet #\(sequenceNum) timed out") self.responseSeqNumSet.insert(sequenceNum) context.eventLoop.execute { context.fireChannelRead(self.wrapInboundOut(.timeout(sequenceNum))) @@ -85,19 +95,19 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { } } } - logger.debug("[\(#function)]: schedule timer for # \(sequenceNum) for \(self.configuration.timeout) second") + logger.debug("[ICMPDuplexer][\(#function)]: schedule timer for # \(sequenceNum) for \(self.configuration.timeout) second") } private func closeWhenComplete(context: ChannelHandlerContext) { if self.seqToRequest.count == self.configuration.count && self.responseSeqNumSet.count == self.configuration.count { - logger.debug("Ping finished. Closing all channels") + logger.debug("[ICMPDuplexer]: Ping finished. Closing all channels") context.close(mode: .all, promise: nil) } } func channelRead(context: ChannelHandlerContext, data: NIOAny) { guard self.state.isOperational else { - logger.debug("[\(#function)]: drop data: \(data) because channel is not in operational state") + logger.debug("[ICMPDuplexer][\(#function)]: drop data: \(data) because channel is not in operational state") return } @@ -107,7 +117,7 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { let code = icmpResponse.code let sequenceNum = icmpResponse.sequenceNum let identifier = icmpResponse.idenifier - logger.debug("[\(#function)]: received icmp response with type: \(type), code: \(code), sequence number: \(sequenceNum), identifier: \(identifier)") + logger.debug("[ICMPDuplexer][\(#function)]: received icmp response with type: \(type), code: \(code), sequence number: \(sequenceNum), identifier: \(identifier)") switch (type, code) { case (ICMPType.EchoReply.rawValue, 0): break @@ -201,7 +211,7 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { if self.responseSeqNumSet.contains(sequenceNum) { let pingResponse: PingResponse = self.seqToResponse[sequenceNum] == nil ? .timeout(sequenceNum) : .duplicated(sequenceNum) - logger.debug("[\(#function)]: response for #\(sequenceNum) is \(self.seqToResponse[sequenceNum] == nil ? "timeout" : "duplicate")") + logger.debug("[ICMPDuplexer][\(#function)]: response for #\(sequenceNum) is \(self.seqToResponse[sequenceNum] == nil ? "timeout" : "duplicate")") context.fireChannelRead(self.wrapInboundOut(pingResponse)) closeWhenComplete(context: context) return @@ -210,7 +220,7 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { self.seqToResponse[sequenceNum] = icmpResponse self.responseSeqNumSet.insert(sequenceNum) guard let icmpRequest = self.seqToRequest[sequenceNum] else { - logger.error("[\(#function)]: Unable to find matching request with sequence number \(sequenceNum)") + logger.error("[ICMPDuplexer][\(#function)]: Unable to find matching request with sequence number \(sequenceNum)") context.fireChannelRead(self.wrapInboundOut(.error(sequenceNum, PingError.invalidICMPResponse))) closeWhenComplete(context: context) return @@ -222,11 +232,13 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { return } + #if canImport(Darwin) if identifier != icmpRequest.idenifier { context.fireChannelRead(self.wrapInboundOut(.error(sequenceNum, PingError.invalidICMPIdentifier))) closeWhenComplete(context: context) return } + #endif let currentTimestamp = Date.currentTimestamp let latency = (currentTimestamp - icmpRequest.payload.timestamp) * 1000 @@ -240,13 +252,13 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { func channelActive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel already active") + logger.debug("[ICMPDuplexer][\(#function)]: Channel already active") break case .error: - logger.error("[\(#function)]: in an incorrect state: \(state)") + logger.error("[ICMPDuplexer][\(#function)]: in an incorrect state: \(state)") assertionFailure("[\(#function)]: in an incorrect state: \(state)") case .inactive: - logger.debug("[\(#function)]: Channel active") + logger.debug("[ICMPDuplexer][\(#function)]: Channel active") context.fireChannelActive() self.state = .operational } @@ -260,18 +272,18 @@ internal final class ICMPDuplexer: ChannelDuplexHandler { self.seqToRequest.removeAll() self.seqToResponse.removeAll() self.timerScheduler.reset() - logger.debug("[\(#function)]: Channel inactive") + logger.debug("[ICMPDuplexer][\(#function)]: Channel inactive") case .error: break case .inactive: - logger.error("[\(#function)] received inactive signal when channel is already in inactive state.") - assertionFailure("[\(#function)] received inactive signal when channel is already in inactive state.") + logger.error("[ICMPDuplexer][\(#function)] received inactive signal when channel is already in inactive state.") + assertionFailure("[ICMPDuplexer][\(#function)] received inactive signal when channel is already in inactive state.") } } func errorCaught(context: ChannelHandlerContext, error: Error) { guard self.state.isOperational else { - logger.debug("already in error state. ignore error \(error)") + logger.debug("[ICMPDuplexer]: already in error state. ignore error \(error)") return } self.state = .error @@ -306,13 +318,13 @@ internal final class IPDecoder: ChannelInboundHandler { func channelActive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel already active") + logger.debug("[IPDecoder][\(#function)]: Channel already active") break case .error: - logger.error("[\(#function)] in an incorrect state: \(state)") - assertionFailure("[\(#function)] in an incorrect state: \(state)") + logger.error("[IPDecoder][\(#function)] in an incorrect state: \(state)") + assertionFailure("[IPDecoder][\(#function)] in an incorrect state: \(state)") case .inactive: - logger.debug("[\(#function)]: Channel active") + logger.debug("[IPDecoder][\(#function)]: Channel active") context.fireChannelActive() self.state = .operational } @@ -321,25 +333,26 @@ internal final class IPDecoder: ChannelInboundHandler { func channelInactive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel inactive") + logger.debug("[IPDecoder][\(#function)]: Channel inactive") context.fireChannelInactive() self.state = .inactive case .error: break case .inactive: - logger.debug("[\(#function)]: received inactive signal when channel is already in inactive state.") - assertionFailure("[\(#function)]: received inactive signal when channel is already in inactive state.") + logger.debug("[IPDecoder][\(#function)]: received inactive signal when channel is already in inactive state.") + assertionFailure("[IPDecoder][\(#function)]: received inactive signal when channel is already in inactive state.") } } func channelRead(context: ChannelHandlerContext, data: NIOAny) { guard self.state.isOperational else { - logger.debug("[\(#function)]: drop data: \(data) because channel is not in operational state") + logger.debug("[IPDecoder][\(#function)]: drop data: \(data) because channel is not in operational state") return } let addressedBuffer = self.unwrapInboundIn(data) var buffer = addressedBuffer.data + #if canImport(Darwin) let ipv4Header: IPv4Header do { ipv4Header = try decodeByteBuffer(of: IPv4Header.self, data: &buffer) @@ -360,12 +373,13 @@ internal final class IPDecoder: ChannelInboundHandler { } let headerLength = (Int(ipv4Header.versionAndHeaderLength) & 0x0F) * sizeof(UInt32.self) buffer.moveReaderIndex(to: headerLength) + #endif // canImport(Darwin) context.fireChannelRead(self.wrapInboundOut(buffer.slice())) } func errorCaught(context: ChannelHandlerContext, error: Error) { guard self.state.isOperational else { - logger.debug("already in error state. ignore error \(error)") + logger.debug("[IPDecoder]: already in error state. ignore error \(error)") return } @@ -398,13 +412,13 @@ internal final class ICMPDecoder: ChannelInboundHandler { func channelActive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel already active") + logger.debug("[ICMPDecoder][\(#function)]: Channel already active") break case .error: - logger.error("[\(#function)] in an incorrect state: \(state)") + logger.error("[ICMPDecoder][\(#function)] in an incorrect state: \(state)") assertionFailure("[\(#function)] in an incorrect state: \(state)") case .inactive: - logger.debug("[\(#function)]: Channel active") + logger.debug("[ICMPDecoder][\(#function)]: Channel active") context.fireChannelActive() self.state = .operational } @@ -413,20 +427,20 @@ internal final class ICMPDecoder: ChannelInboundHandler { func channelInactive(context: ChannelHandlerContext) { switch self.state { case .operational: - logger.debug("[\(#function)]: Channel inactive") + logger.debug("[ICMPDecoder][\(#function)]: Channel inactive") context.fireChannelInactive() self.state = .inactive case .error: break case .inactive: - logger.error("[\(#function)] received inactive signal when channel is already in inactive state.") - assertionFailure("[\(#function)] received inactive signal when channel is already in inactive state.") + logger.error("[ICMPDecoder][\(#function)] received inactive signal when channel is already in inactive state.") + assertionFailure("[ICMPDecoder][\(#function)] received inactive signal when channel is already in inactive state.") } } func channelRead(context: ChannelHandlerContext, data: NIOAny) { guard self.state.isOperational else { - logger.debug("drop data: \(data) because channel is not in operational state") + logger.debug("[ICMPDecoder]: drop data: \(data) because channel is not in operational state") return } @@ -439,12 +453,12 @@ internal final class ICMPDecoder: ChannelInboundHandler { return } context.fireChannelRead(self.wrapInboundOut(icmpResponseHeader)) - logger.debug("[\(#function)] finish decoding icmp header: \(icmpResponseHeader)") + logger.debug("[ICMPDecoder][\(#function)] finish decoding icmp header: \(icmpResponseHeader)") } func errorCaught(context: ChannelHandlerContext, error: Error) { guard self.state.isOperational else { - logger.debug("already in error state. ignore error \(error)") + logger.debug("[ICMPDecoder]: already in error state. ignore error \(error)") return } diff --git a/Tests/ICMPChannelTests/ICMPDuplexerTests.swift b/Tests/ICMPChannelTests/ICMPDuplexerTests.swift index 234d0f1..cec709a 100644 --- a/Tests/ICMPChannelTests/ICMPDuplexerTests.swift +++ b/Tests/ICMPChannelTests/ICMPDuplexerTests.swift @@ -52,9 +52,12 @@ final class ICMPDuplexerTests: XCTestCase { let outboundInData: ICMPOutboundIn = (1,2) try channel.writeOutbound(outboundInData) self.loop.run() - var outboundOutData = try channel.readOutbound(as: ByteBuffer.self) + let outboundOutData = try channel.readOutbound(as: AddressedEnvelope.self) XCTAssertNotNil(outboundOutData) - let sent = try decodeByteBuffer(of: ICMPHeader.self, data: &outboundOutData!) + var data = outboundOutData!.data + let remoteAddr = outboundOutData!.remoteAddress + let sent = try decodeByteBuffer(of: ICMPHeader.self, data: &data) + XCTAssertEqual(remoteAddr, try! SocketAddress(ipAddress: "127.0.0.1", port: 0)) XCTAssertEqual(sent.type, ICMPType.EchoRequest.rawValue) XCTAssertEqual(sent.code, 0) XCTAssertEqual(sent.idenifier, 1) @@ -175,7 +178,9 @@ final class ICMPDuplexerTests: XCTestCase { } } + // On Linux platform, idenfier is unpredictable, we skip if test is run on Linux platform func testInvalidICMPIdentifier() throws { + #if canImport(Darwin) XCTAssertNoThrow(try channel.pipeline.addHandler(ICMPDuplexer(configuration: self.icmpConfiguration)).wait()) channel.pipeline.fireChannelActive() var inboundInData: ICMPHeader = ICMPHeader(type: ICMPType.EchoReply.rawValue, code: 0, idenifier: 0xbeef, sequenceNum: 2) @@ -193,6 +198,7 @@ final class ICMPDuplexerTests: XCTestCase { default: XCTFail("Should receive a PingResponse.error, but received \(inboundInResult)") } + #endif // canImport(Darwin) } func testICMPResponseTimeout() throws { diff --git a/Tests/ICMPChannelTests/IPDecoderTests.swift b/Tests/ICMPChannelTests/IPDecoderTests.swift index 7251204..b49d8a3 100644 --- a/Tests/ICMPChannelTests/IPDecoderTests.swift +++ b/Tests/ICMPChannelTests/IPDecoderTests.swift @@ -52,41 +52,57 @@ final class IPDecoderTests: XCTestCase { func testDecodeValidIPHeader() throws { XCTAssertNoThrow(try sendIPPacket(byteString: "45000054000000003a0157518efad94ec0a80067")) let remaining = try channel.readInbound(as: ByteBuffer.self) + #if canImport(Darwin) XCTAssertEqual(remaining?.readableBytes, 0) + #else // !canImport(Darwin) + XCTAssertEqual(remaining?.readableBytes, 20) + #endif // !canImport(Darwin) } func testDecodeCorrectSliceAfterDecode() throws { XCTAssertNoThrow(try sendIPPacket(byteString: "45000054000000003a0157518efad94ec0a800671122334455667788")) let remaining = try channel.readInbound(as: ByteBuffer.self) + #if canImport(Darwin) XCTAssertEqual(remaining?.readableBytes, 8) + #else // !canImport(Darwin) + XCTAssertEqual(remaining?.readableBytes, 28) + #endif // !canImport(Darwin) } func testDecodeInvalidIPVersion() throws { + #if canImport(Darwin) let expectedError: PingError = .invalidIPVersion XCTAssertThrowsError(try sendIPPacket(byteString: "15000054000000003a0157518efad94ec0a80067")) { error in XCTAssertEqual(error.localizedDescription, expectedError.localizedDescription) } + #endif } func testDecodeInvalidIPProtocol() throws { + #if canImport(Darwin) let expectedError: PingError = .invalidIPProtocol XCTAssertThrowsError(try sendIPPacket(byteString: "45000054000000003a0257518efad94ec0a80067")) { error in XCTAssertEqual(error.localizedDescription, expectedError.localizedDescription) } + #endif } func testDecodeInsufficientByteLength() throws { + #if canImport(Darwin) let expectedError: RuntimeError = .insufficientBytes("Not enough bytes in the reponse message. Need 20 bytes. But received 8") XCTAssertThrowsError(try sendIPPacket(byteString: "4500005400000000")) { error in XCTAssertEqual(error as? RuntimeError, expectedError) } + #endif } func testDecodeEmptyPacket() throws { + #if canImport(Darwin) let expectedError: RuntimeError = .insufficientBytes("Not enough bytes in the reponse message. Need 20 bytes. But received 0") XCTAssertThrowsError(try sendIPPacket(byteString: "")) { error in XCTAssertEqual(error as? RuntimeError, expectedError) } + #endif } }