Skip to content

Commit

Permalink
add more integration tests for icmp and http; fixed errors on http an…
Browse files Browse the repository at this point in the history
…d icmp channel handlers; improve tests
  • Loading branch information
johnnzhou committed Dec 20, 2023
1 parent 234371d commit 6453395
Show file tree
Hide file tree
Showing 16 changed files with 691 additions and 146 deletions.
22 changes: 11 additions & 11 deletions Sources/LCLPing/HTTP/HTTPChannelHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ internal final class HTTPDuplexer: ChannelDuplexHandler {
}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let sequenceNumber = self.unwrapOutboundIn(data)
guard self.state.isOperational else {
context.fireChannelRead(self.wrapInboundOut(.error(ChannelError.ioOnClosedChannel)))
context.fireChannelRead(self.wrapInboundOut(.error(sequenceNumber, ChannelError.ioOnClosedChannel)))
return
}

let sequenceNumber = self.unwrapOutboundIn(data)
var header = HTTPHeaders(self.httpOptions.httpHeaders.map { ($0.key, $0.value) })
if !self.httpOptions.httpHeaders.keys.contains("Host"), let host = self.url.host {
header.add(name: "Host", value: host)
Expand Down Expand Up @@ -115,17 +115,17 @@ internal final class HTTPDuplexer: ChannelDuplexHandler {
self.state = .error
fatalError("HTTP Handler in some error state while the status code is \(statusCode). Please report this to the developer")
case 300...399:
context.fireChannelRead(self.wrapInboundOut(.error(PingError.httpRedirect)))
context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.httpRedirect)))
case 400...499:
context.fireChannelRead(self.wrapInboundOut(.error(PingError.httpClientError)))
context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.httpClientError)))
case 500...599:
context.fireChannelRead(self.wrapInboundOut(.error(PingError.httpServerError)))
context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.httpServerError)))
default:
context.fireChannelRead(self.wrapInboundOut(.error(PingError.httpUnknownStatus(statusCode))))
context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.httpUnknownStatus(statusCode))))
}
case .waiting:
self.state = .error
context.fireChannelRead(self.wrapInboundOut(.error(PingError.invalidLatencyResponseState)))
context.fireChannelRead(self.wrapInboundOut(.error(latencyEntry.seqNum, PingError.invalidLatencyResponseState)))
}

context.channel.close(mode: .all, promise: nil)
Expand All @@ -138,7 +138,7 @@ internal final class HTTPDuplexer: ChannelDuplexHandler {
return
}
self.state = .error
let pingResponse: PingResponse = .error(error)
let pingResponse: PingResponse = .error(nil, error)
context.fireChannelRead(self.wrapInboundOut(pingResponse))
context.channel.close(mode: .all, promise: nil)
}
Expand Down Expand Up @@ -466,13 +466,13 @@ extension HTTPHandler: URLSessionTaskDelegate, URLSessionDataDelegate {
self.continuation?.yield(.timeout(taskToSeqNum[taskIdentifier]!))
default:
logger.debug("task \(taskIdentifier) has error: \(urlError.localizedDescription)")
self.continuation?.yield(.error(urlError))
self.continuation?.yield(.error(UInt16(taskIdentifier), urlError))
}

} else {
// no error, let's check the data received
guard let response = task.response else {
self.continuation?.yield(.error(PingError.httpNoResponse))
self.continuation?.yield(.error(UInt16(taskIdentifier), PingError.httpNoResponse))
logger.debug("request #\(taskIdentifier) doesnt have response")
return
}
Expand All @@ -488,7 +488,7 @@ extension HTTPHandler: URLSessionTaskDelegate, URLSessionDataDelegate {
self.continuation?.yield(.ok(seqNum, latency, Date.currentTimestamp))
}
default:
self.continuation?.yield(.error(PingError.httpRequestFailed(statusCode)))
self.continuation?.yield(.error(UInt16(taskIdentifier), PingError.httpRequestFailed(statusCode)))
}

// completes the async stream
Expand Down
72 changes: 49 additions & 23 deletions Sources/LCLPing/HTTP/HTTPPing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ internal struct HTTPPing: Pingable {
private var pingSummary: PingSummary?
private let httpOptions: LCLPing.PingConfiguration.HTTPOptions

#if INTEGRATION_TEST
private var networkLinkConfig: TrafficControllerChannelHandler.NetworkLinkConfiguration?
internal init(httpOptions: LCLPing.PingConfiguration.HTTPOptions, networkLinkConfig: TrafficControllerChannelHandler.NetworkLinkConfiguration) {
self.httpOptions = httpOptions
self.networkLinkConfig = networkLinkConfig
}
#endif

internal init(httpOptions: LCLPing.PingConfiguration.HTTPOptions) {
self.httpOptions = httpOptions
}
Expand Down Expand Up @@ -74,6 +82,9 @@ internal struct HTTPPing: Pingable {
}

let httpOptions = self.httpOptions
#if INTEGRATION_TEST
let networkLinkConfig = self.networkLinkConfig
#endif
let enableTLS = schema == "https"
port = enableTLS && port == 80 ? 443 : port
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
Expand All @@ -82,6 +93,7 @@ internal struct HTTPPing: Pingable {
}

let resolvedAddress = try SocketAddress.makeAddressResolvingHost(host, port: Int(port))
logger.debug("resolved address is \(resolvedAddress)")

pingStatus = .running
do {
Expand All @@ -94,34 +106,46 @@ internal struct HTTPPing: Pingable {
group.cancelAll()
return pingResponses
}


logger.debug("added task #\(cnt)")
group.addTask {
let asyncChannel = try await ClientBootstrap(group: eventLoopGroup).connect(to: resolvedAddress) { channel in
channel.eventLoop.makeCompletedFuture {
if enableTLS {
do {
let tlsConfiguration = TLSConfiguration.makeClientConfiguration()
let sslContext = try NIOSSLContext(configuration: tlsConfiguration)
let tlsHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
try channel.pipeline.syncOperations.addHandlers(tlsHandler)
} catch {
throw PingError.httpUnableToEstablishTLSConnection
}
var asyncChannel: NIOAsyncChannel<PingResponse, HTTPOutboundIn>
let channel = try await ClientBootstrap(group: eventLoopGroup).connect(to: resolvedAddress).get()
logger.debug("in event loop: \(channel.eventLoop.inEventLoop)")
asyncChannel = try await channel.eventLoop.submit {
if enableTLS {
do {
let tlsConfiguration = TLSConfiguration.makeClientConfiguration()
let sslContext = try NIOSSLContext(configuration: tlsConfiguration)
let tlsHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
try channel.pipeline.syncOperations.addHandlers(tlsHandler)
} catch {
throw PingError.httpUnableToEstablishTLSConnection
}

try channel.pipeline.syncOperations.addHTTPClientHandlers(position: .last)
try channel.pipeline.syncOperations.addHandlers([HTTPTracingHandler(configuration: pingConfiguration, httpOptions: httpOptions), HTTPDuplexer(url: url, httpOptions: httpOptions, configuration: pingConfiguration)], position: .last)

return try NIOAsyncChannel<PingResponse, HTTPOutboundIn>(wrappingChannelSynchronously: channel)
}
}

#if INTEGRATION_TEST
try channel.pipeline.syncOperations.addHandler(TrafficControllerChannelHandler(networkLinkConfig: networkLinkConfig!))
#endif

try channel.pipeline.syncOperations.addHTTPClientHandlers(position: .last)
try channel.pipeline.syncOperations.addHandlers([HTTPTracingHandler(configuration: pingConfiguration, httpOptions: httpOptions), HTTPDuplexer(url: url, httpOptions: httpOptions, configuration: pingConfiguration)], position: .last)

return try NIOAsyncChannel<PingResponse, HTTPOutboundIn>(wrappingChannelSynchronously: channel)
}.get()
asyncChannel.channel.pipeline.fireChannelActive()

logger.debug("pipeline is: \(asyncChannel.channel.pipeline.debugDescription)")

// Task.sleep respects cooperative cancellation. That is, it will throw a cancellation error and finish early if its current task is cancelled.
// NOTE: Task.sleep respects cooperative cancellation. That is, it will throw a cancellation error and finish early if its current task is cancelled.
try await Task.sleep(nanoseconds: UInt64(cnt) * pingConfiguration.interval.nanosecond)
// logger.trace("write packet #\(cnt)")
logger.debug("write packet #\(cnt)")
let result = try await asyncChannel.executeThenClose { inbound, outbound in
try await outbound.write(cnt)

defer {
outbound.finish()
}

var asyncItr = inbound.makeAsyncIterator()
guard let next = try await asyncItr.next() else {
throw PingError.httpMissingResult
Expand All @@ -132,14 +156,16 @@ internal struct HTTPPing: Pingable {
return result
}
}

logger.debug("now waiting to receive data from the channel")
do {
while pingStatus != .stopped, let next = try await group.next() {
logger.debug("received \(next)")
pingResponses.append(next)
}
} catch is CancellationError {
logger.info("Task is cancelled while waiting")
logger.debug("Task is cancelled while waiting")
} catch {
print("received error: \(error)")
throw error
}

Expand Down
Loading

0 comments on commit 6453395

Please sign in to comment.