Skip to content

Commit

Permalink
Refactor Channel creation (#377)
Browse files Browse the repository at this point in the history
- The connection creation logic has been refactored into a number of smaller methods that can be combined
- Connection creation now has a logical home. It is moved from `Utils.swift` into a `ConnectionFactory`
- There are explicit `ChannelHandlers` that are used for connection creation:
  - `TLSEventsHandler` got its own file and unit tests
  - `HTTP1ProxyConnectHandler` got its own file and unit tests
  - `SOCKSEventsHandler` got its own file and unit tests
- Some small things are already part of this pr that will get their context later. For example: 
  - `HTTPConnectionPool` is added as a namespace to not cause major renames in follow up PRs
  - `HTTPConnectionPool.Connection.ID` and its generator were added now. (This will be used later to identify a connection during its lifetime)
  - the file `HTTPConnectionPool+Manager` was added to give `HTTPConnectionPool.Connection.ID.Generator` already its final destination.
  • Loading branch information
fabianfett authored Jun 28, 2021
1 parent af837ed commit 8e4d519
Show file tree
Hide file tree
Showing 23 changed files with 1,753 additions and 559 deletions.
29 changes: 22 additions & 7 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Logging
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1
import NIOHTTPCompression
import NIOSSL
import NIOTLS
import NIOTransportServices
Expand Down Expand Up @@ -86,7 +85,9 @@ final class ConnectionPool {
let provider = HTTP1ConnectionProvider(key: key,
eventLoop: taskEventLoop,
configuration: key.config(overriding: self.configuration),
tlsConfiguration: request.tlsConfiguration,
pool: self,
sslContextCache: self.sslContextCache,
backgroundActivityLogger: self.backgroundActivityLogger)
let enqueued = provider.enqueue()
assert(enqueued)
Expand Down Expand Up @@ -213,6 +214,8 @@ class HTTP1ConnectionProvider {

private let backgroundActivityLogger: Logger

private let factory: HTTPConnectionPool.ConnectionFactory

/// Creates a new `HTTP1ConnectionProvider`
///
/// - parameters:
Expand All @@ -225,7 +228,9 @@ class HTTP1ConnectionProvider {
init(key: ConnectionPool.Key,
eventLoop: EventLoop,
configuration: HTTPClient.Configuration,
tlsConfiguration: TLSConfiguration?,
pool: ConnectionPool,
sslContextCache: SSLContextCache,
backgroundActivityLogger: Logger) {
self.eventLoop = eventLoop
self.configuration = configuration
Expand All @@ -234,6 +239,13 @@ class HTTP1ConnectionProvider {
self.closePromise = eventLoop.makePromise()
self.state = .init(eventLoop: eventLoop)
self.backgroundActivityLogger = backgroundActivityLogger

self.factory = HTTPConnectionPool.ConnectionFactory(
key: self.key,
tlsConfiguration: tlsConfiguration,
clientConfiguration: self.configuration,
sslContextCache: sslContextCache
)
}

deinit {
Expand Down Expand Up @@ -440,12 +452,15 @@ class HTTP1ConnectionProvider {

private func makeChannel(preference: HTTPClient.EventLoopPreference,
logger: Logger) -> EventLoopFuture<Channel> {
return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key,
eventLoop: self.eventLoop,
configuration: self.configuration,
sslContextCache: self.pool.sslContextCache,
preference: preference,
logger: logger)
let connectionID = HTTPConnectionPool.Connection.ID.globalGenerator.next()
let eventLoop = preference.bestEventLoop ?? self.eventLoop
let deadline = .now() + self.configuration.timeout.connectionCreationTimeout
return self.factory.makeHTTP1Channel(
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
logger: logger
)
}

/// A `Waiter` represents a request that waits for a connection when none is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHTTP1

final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHandler {
typealias OutboundIn = Never
typealias OutboundOut = HTTPClientRequestPart
typealias InboundIn = HTTPClientResponsePart

enum State {
// transitions to `.connectSent` or `.failed`
case initialized
// transitions to `.headReceived` or `.failed`
case connectSent(Scheduled<Void>)
// transitions to `.completed` or `.failed`
case headReceived(Scheduled<Void>)
// final error state
case failed(Error)
// final success state
case completed
}

private var state: State = .initialized

private let targetHost: String
private let targetPort: Int
private let proxyAuthorization: HTTPClient.Authorization?
private let deadline: NIODeadline

private var proxyEstablishedPromise: EventLoopPromise<Void>?
var proxyEstablishedFuture: EventLoopFuture<Void>? {
return self.proxyEstablishedPromise?.futureResult
}

init(targetHost: String,
targetPort: Int,
proxyAuthorization: HTTPClient.Authorization?,
deadline: NIODeadline) {
self.targetHost = targetHost
self.targetPort = targetPort
self.proxyAuthorization = proxyAuthorization
self.deadline = deadline
}

func handlerAdded(context: ChannelHandlerContext) {
self.proxyEstablishedPromise = context.eventLoop.makePromise(of: Void.self)

self.sendConnect(context: context)
}

func handlerRemoved(context: ChannelHandlerContext) {
switch self.state {
case .failed, .completed:
break
case .initialized, .connectSent, .headReceived:
struct NoResult: Error {}
self.state = .failed(NoResult())
self.proxyEstablishedPromise?.fail(NoResult())
}
}

func channelActive(context: ChannelHandlerContext) {
self.sendConnect(context: context)
}

func channelInactive(context: ChannelHandlerContext) {
switch self.state {
case .initialized:
preconditionFailure("How can we receive a channelInactive before a channelActive?")
case .connectSent(let timeout), .headReceived(let timeout):
timeout.cancel()
self.failWithError(HTTPClientError.remoteConnectionClosed, context: context, closeConnection: false)

case .failed, .completed:
break
}
}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
preconditionFailure("We don't support outgoing traffic during HTTP Proxy update.")
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch self.unwrapInboundIn(data) {
case .head(let head):
self.handleHTTPHeadReceived(head, context: context)
case .body:
self.handleHTTPBodyReceived(context: context)
case .end:
self.handleHTTPEndReceived(context: context)
}
}

private func sendConnect(context: ChannelHandlerContext) {
guard case .initialized = self.state else {
// we might run into this handler twice, once in handlerAdded and once in channelActive.
return
}

let timeout = context.eventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized:
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")

case .connectSent, .headReceived:
self.failWithError(HTTPClientError.httpProxyHandshakeTimeout, context: context)

case .failed, .completed:
break
}
}

self.state = .connectSent(timeout)

var head = HTTPRequestHead(
version: .init(major: 1, minor: 1),
method: .CONNECT,
uri: "\(self.targetHost):\(self.targetPort)"
)
if let authorization = self.proxyAuthorization {
head.headers.replaceOrAdd(name: "proxy-authorization", value: authorization.headerValue)
}
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()
}

private func handleHTTPHeadReceived(_ head: HTTPResponseHead, context: ChannelHandlerContext) {
guard case .connectSent(let scheduled) = self.state else {
preconditionFailure("HTTPDecoder should throw an error, if we have not send a request")
}

switch head.status.code {
case 200..<300:
// Any 2xx (Successful) response indicates that the sender (and all
// inbound proxies) will switch to tunnel mode immediately after the
// blank line that concludes the successful response's header section
self.state = .headReceived(scheduled)
case 407:
self.failWithError(HTTPClientError.proxyAuthenticationRequired, context: context)

default:
// Any response other than a successful response indicates that the tunnel
// has not yet been formed and that the connection remains governed by HTTP.
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
}
}

private func handleHTTPBodyReceived(context: ChannelHandlerContext) {
switch self.state {
case .headReceived(let timeout):
timeout.cancel()
// we don't expect a body
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
case .failed:
// ran into an error before... ignore this one
break
case .completed, .connectSent, .initialized:
preconditionFailure("Invalid state: \(self.state)")
}
}

private func handleHTTPEndReceived(context: ChannelHandlerContext) {
switch self.state {
case .headReceived(let timeout):
timeout.cancel()
self.state = .completed
self.proxyEstablishedPromise?.succeed(())

case .failed:
// ran into an error before... ignore this one
break
case .initialized, .connectSent, .completed:
preconditionFailure("Invalid state: \(self.state)")
}
}

private func failWithError(_ error: Error, context: ChannelHandlerContext, closeConnection: Bool = true) {
self.state = .failed(error)
self.proxyEstablishedPromise?.fail(error)
context.fireErrorCaught(error)
if closeConnection {
context.close(mode: .all, promise: nil)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOSOCKS

final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

enum State {
// transitions to channelActive or failed
case initialized
// transitions to socksEstablished or failed
case channelActive(Scheduled<Void>)
// final success state
case socksEstablished
// final success state
case failed(Error)
}

private var socksEstablishedPromise: EventLoopPromise<Void>?
var socksEstablishedFuture: EventLoopFuture<Void>? {
return self.socksEstablishedPromise?.futureResult
}

private let deadline: NIODeadline
private var state: State = .initialized

init(deadline: NIODeadline) {
self.deadline = deadline
}

func handlerAdded(context: ChannelHandlerContext) {
self.socksEstablishedPromise = context.eventLoop.makePromise(of: Void.self)

if context.channel.isActive {
self.connectionStarted(context: context)
}
}

func handlerRemoved(context: ChannelHandlerContext) {
struct NoResult: Error {}
self.socksEstablishedPromise!.fail(NoResult())
}

func channelActive(context: ChannelHandlerContext) {
self.connectionStarted(context: context)
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
guard event is SOCKSProxyEstablishedEvent else {
return context.fireUserInboundEventTriggered(event)
}

switch self.state {
case .initialized:
preconditionFailure("How can we establish a SOCKS connection, if we are not connected?")
case .socksEstablished:
preconditionFailure("`SOCKSProxyEstablishedEvent` must only be fired once.")
case .channelActive(let scheduled):
self.state = .socksEstablished
scheduled.cancel()
self.socksEstablishedPromise?.succeed(())
context.fireUserInboundEventTriggered(event)
case .failed:
// potentially a race with the timeout...
break
}
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
switch self.state {
case .initialized:
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .channelActive(let scheduled):
scheduled.cancel()
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .socksEstablished, .failed:
break
}
context.fireErrorCaught(error)
}

private func connectionStarted(context: ChannelHandlerContext) {
guard case .initialized = self.state else {
return
}

let scheduled = context.eventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized, .channelActive:
// close the connection, if the handshake timed out
context.close(mode: .all, promise: nil)
let error = HTTPClientError.socksHandshakeTimeout
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .failed, .socksEstablished:
break
}
}

self.state = .channelActive(scheduled)
}
}
Loading

0 comments on commit 8e4d519

Please sign in to comment.