Skip to content

Commit

Permalink
feat: Telemetry for operations and HTTP (#735)
Browse files Browse the repository at this point in the history
* chore: add `SelectNoAuthScheme` in test utils

* fix: change `LongAsyncMeasurement` to `Int`

* feat: wire telemetry in Orchestrator

* feat: add HTTP client telemetry

* feat: wire `CRTClientEngine` with telemetry

* feat: wire `URLSessionHTTPClient` with telemetry

* feat: add telemetry client codegen

* Fix platform related error.

* Add default value

---------

Co-authored-by: Sichan Yoo <chanyoo@amazon.com>
  • Loading branch information
syall and Sichan Yoo authored Jul 9, 2024
1 parent 38c83b1 commit 9e3ea22
Show file tree
Hide file tree
Showing 22 changed files with 1,036 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public extension DefaultSDKRuntimeConfiguration {
let connectTimeoutMs = httpClientConfiguration.connectTimeout.map { UInt32($0 * 1000) }
let socketTimeout = UInt32(httpClientConfiguration.socketTimeout)
let config = CRTClientEngineConfig(
telemetry: httpClientConfiguration.telemetry ?? CRTClientEngine.noOpCrtClientEngineTelemetry,
connectTimeoutMs: connectTimeoutMs,
crtTlsOptions: httpClientConfiguration.tlsConfiguration as? CRTClientTLSOptions,
socketTimeout: socketTimeout
Expand Down
393 changes: 280 additions & 113 deletions Sources/ClientRuntime/Networking/Http/CRT/CRTClientEngine.swift

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0.
*/

import AwsCommonRuntimeKit

struct CRTClientEngineConfig {

/// Max connections the manager can contain per endpoint
Expand All @@ -13,6 +11,9 @@ struct CRTClientEngineConfig {
/// The IO channel window size to use for connections in the connection pool
let windowSize: Int

/// HTTP Client Telemetry
let telemetry: HttpTelemetry

/// The default is true for clients and false for servers.
/// You should not change this default for clients unless
/// you're testing and don't want to fool around with CA trust stores.
Expand All @@ -31,13 +32,15 @@ struct CRTClientEngineConfig {
public init(
maxConnectionsPerEndpoint: Int = 50,
windowSize: Int = 16 * 1024 * 1024,
telemetry: HttpTelemetry = CRTClientEngine.noOpCrtClientEngineTelemetry,
verifyPeer: Bool = true,
connectTimeoutMs: UInt32? = nil,
crtTlsOptions: CRTClientTLSOptions? = nil,
socketTimeout: UInt32? = nil
) {
self.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint
self.windowSize = windowSize
self.telemetry = telemetry
self.verifyPeer = verifyPeer
self.connectTimeoutMs = connectTimeoutMs
self.crtTlsOptions = crtTlsOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//

import enum Smithy.ByteStream
import struct Smithy.Attributes
import AwsCommonRuntimeKit

extension HTTP2Stream {
Expand All @@ -19,13 +20,24 @@ extension HTTP2Stream {
/// There is no recommended size for the data to write. The data will be written in chunks of `manualWriteBufferSize` bytes.
/// - Parameter body: The body to write
/// - Throws: Throws an error if the write fails
func write(body: ByteStream) async throws {
func write(body: ByteStream, telemetry: HttpTelemetry, serverAddress: String) async throws {
let context = telemetry.contextManager.current()
var bytesSentAttributes = Attributes()
bytesSentAttributes.set(key: HttpMetricsAttributesKeys.serverAddress, value: serverAddress)
switch body {
case .data(let data):
try await writeChunk(chunk: data ?? .init(), endOfStream: true)
telemetry.bytesSent.add(
value: data?.count ?? 0,
attributes: bytesSentAttributes,
context: context)
case .stream(let stream):
while let data = try await stream.readAsync(upToCount: manualWriteBufferSize) {
try await writeChunk(chunk: data, endOfStream: false)
telemetry.bytesSent.add(
value: data.count,
attributes: bytesSentAttributes,
context: context)
}
try await writeChunk(chunk: .init(), endOfStream: true)
case .noStream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class HttpClientConfiguration {
/// Defaults to system's TLS settings if `nil`.
public var tlsConfiguration: (any TLSConfiguration)?

/// HTTP Client Telemetry
public var telemetry: HttpTelemetry?

/// Creates a configuration object for a SDK HTTP client.
///
/// Not all configuration settings may be followed by all clients.
Expand All @@ -58,12 +61,14 @@ public class HttpClientConfiguration {
socketTimeout: TimeInterval = 60.0,
protocolType: URIScheme = .https,
defaultHeaders: Headers = Headers(),
tlsConfiguration: (any TLSConfiguration)? = nil
tlsConfiguration: (any TLSConfiguration)? = nil,
telemetry: HttpTelemetry? = nil
) {
self.socketTimeout = socketTimeout
self.protocolType = protocolType
self.defaultHeaders = defaultHeaders
self.connectTimeout = connectTimeout
self.tlsConfiguration = tlsConfiguration
self.telemetry = telemetry
}
}
169 changes: 169 additions & 0 deletions Sources/ClientRuntime/Networking/Http/HttpTelemetry.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

import class Foundation.NSLock
import protocol SmithyHTTPAPI.HTTPClient
import struct Smithy.Attributes
import struct Smithy.AttributeKey

/// Container for HTTPClient telemetry, including configurable attributes and names.
///
/// On creation it obtains a meter from the supplied telemetry provider and registers the
/// HTTP client instruments (histograms, monotonic counters and asynchronous up/down counters).
/// The asynchronous measurement handles are stopped when the instance is deallocated.
///
/// Note: This is intended to be used within generated code, not directly.
public class HttpTelemetry {
    /// Reference-type holder for the current `HttpMetricsUsage` snapshot.
    ///
    /// `HttpMetricsUsage` is a struct, so a callback that captured a local copy of it would keep
    /// reporting the values it had at init time. The callbacks capture this holder instead, and
    /// `httpMetricsUsage` reads and writes through it, so updates are visible to the callbacks.
    /// Every access is guarded by `lock`, which is what backs the `@unchecked Sendable` conformance.
    private final class UsageStorage: @unchecked Sendable {
        private let lock = NSLock()
        private var snapshot = HttpMetricsUsage()

        /// The current usage snapshot; reads and writes are serialized by `lock`.
        var value: HttpMetricsUsage {
            get {
                lock.lock()
                defer { lock.unlock() }
                return snapshot
            }
            set {
                lock.lock()
                defer { lock.unlock() }
                snapshot = newValue
            }
        }
    }

    /// Builds an attribute set holding only the `state` attribute with the given value.
    private static func attributes(forState state: String) -> Attributes {
        var attributes = Attributes()
        attributes.set(key: HttpMetricsAttributesKeys.state, value: state)
        return attributes
    }

    internal let contextManager: any TelemetryContextManager
    internal let tracerProvider: any TracerProvider
    internal let loggerProvider: any LoggerProvider

    internal let tracerScope: String
    internal let tracerAttributes: Attributes?
    internal let spanName: String
    internal let spanAttributes: Attributes?

    internal let connectionsAcquireDuration: any Histogram
    private let connectionsLimit: any AsyncMeasurementHandle
    private let connectionsUsage: any AsyncMeasurementHandle
    private let usageStorage: UsageStorage
    /// Current connection / request usage values reported by the asynchronous instruments.
    ///
    /// Backed by shared storage so that assignments and member mutations made through this
    /// property are observed by the measurement callbacks registered in `init`.
    internal var httpMetricsUsage: HttpMetricsUsage {
        get { usageStorage.value }
        set { usageStorage.value = newValue }
    }
    internal let connectionsUptime: any Histogram
    private let requestsUsage: any AsyncMeasurementHandle
    internal let requestsQueuedDuration: any Histogram
    internal let bytesSent: any MonotonicCounter
    internal let bytesReceived: any MonotonicCounter

    /// Creates the telemetry container and registers all HTTP client instruments.
    ///
    /// - Parameters:
    ///   - httpScope: Scope name used for the meter and the tracer when `meterScope` /
    ///     `tracerScope` are not provided.
    ///   - telemetryProvider: Source of the context manager, tracer provider, logger provider
    ///     and meter provider. Defaults to `DefaultTelemetry.provider`.
    ///   - meterScope: Scope passed to `getMeter`; falls back to `httpScope` when `nil`.
    ///   - meterAttributes: Attributes passed to `getMeter`.
    ///   - tracerScope: Scope stored for tracing; falls back to `httpScope` when `nil`.
    ///   - tracerAttributes: Attributes stored for tracing.
    ///   - spanName: Span name stored for tracing; falls back to `"HTTP"` when `nil`.
    ///   - spanAttributes: Attributes stored for spans.
    public init(
        httpScope: String,
        telemetryProvider: any TelemetryProvider = DefaultTelemetry.provider,
        meterScope: String? = nil,
        meterAttributes: Attributes? = nil,
        tracerScope: String? = nil,
        tracerAttributes: Attributes? = nil,
        spanName: String? = nil,
        spanAttributes: Attributes? = nil
    ) {
        self.contextManager = telemetryProvider.contextManager
        self.tracerProvider = telemetryProvider.tracerProvider
        self.loggerProvider = telemetryProvider.loggerProvider
        let resolvedMeterScope = meterScope ?? httpScope
        self.tracerScope = tracerScope ?? httpScope
        self.tracerAttributes = tracerAttributes
        self.spanName = spanName ?? "HTTP"
        self.spanAttributes = spanAttributes

        // The callbacks below capture this local reference (not `self`, which is not yet fully
        // initialized), and `httpMetricsUsage` is backed by the same object.
        let usageStorage = UsageStorage()
        self.usageStorage = usageStorage

        let meter = telemetryProvider.meterProvider.getMeter(
            scope: resolvedMeterScope,
            attributes: meterAttributes)
        self.connectionsAcquireDuration = meter.createHistogram(
            name: "smithy.client.http.connections.acquire_duration",
            units: "s",
            description: "The time it takes a request to acquire a connection")
        self.connectionsLimit = meter.createAsyncUpDownCounter(
            name: "smithy.client.http.connections.limit",
            callback: { handle in
                handle.record(
                    value: usageStorage.value.connectionsLimit,
                    attributes: Attributes(),
                    context: telemetryProvider.contextManager.current()
                )
            },
            units: "{connection}",
            description: "The maximum open connections allowed/configured for the HTTP client")
        self.connectionsUsage = meter.createAsyncUpDownCounter(
            name: "smithy.client.http.connections.usage",
            callback: { handle in
                // Read once so both measurements come from the same snapshot.
                let usage = usageStorage.value
                handle.record(
                    value: usage.idleConnections,
                    attributes: HttpTelemetry.attributes(forState: ConnectionState.idle),
                    context: telemetryProvider.contextManager.current()
                )
                handle.record(
                    value: usage.acquiredConnections,
                    attributes: HttpTelemetry.attributes(forState: ConnectionState.acquired),
                    context: telemetryProvider.contextManager.current()
                )
            },
            units: "{connection}",
            description: "Current state of connections pool")
        self.connectionsUptime = meter.createHistogram(
            name: "smithy.client.http.connections.uptime",
            units: "s",
            description: "The amount of time a connection has been open")
        self.requestsUsage = meter.createAsyncUpDownCounter(
            name: "smithy.client.http.requests.usage",
            callback: { handle in
                // Request concurrency is reported from the request counters with the request
                // state attributes (previously this recorded the connection counters again).
                let usage = usageStorage.value
                handle.record(
                    value: usage.inflightRequests,
                    attributes: HttpTelemetry.attributes(forState: RequestState.inflight),
                    context: telemetryProvider.contextManager.current()
                )
                handle.record(
                    value: usage.queuedRequests,
                    attributes: HttpTelemetry.attributes(forState: RequestState.queued),
                    context: telemetryProvider.contextManager.current()
                )
            },
            units: "{request}",
            description: "The current state of HTTP client request concurrency")
        self.requestsQueuedDuration = meter.createHistogram(
            name: "smithy.client.http.requests.queued_duration",
            units: "s",
            description: "The amount of time a request spent queued waiting to be executed by the HTTP client")
        self.bytesSent = meter.createCounter(
            name: "smithy.client.http.bytes_sent",
            units: "By",
            description: "The total number of bytes sent by the HTTP client")
        self.bytesReceived = meter.createCounter(
            name: "smithy.client.http.bytes_received",
            units: "By",
            description: "The total number of bytes received by the HTTP client")
    }

    /// Stops the asynchronous measurement handles created in `init`.
    deinit {
        self.connectionsLimit.stop()
        self.connectionsUsage.stop()
        self.requestsUsage.stop()
    }
}

/// Values written to the `state` attribute of the connection usage measurements.
private enum ConnectionState {
    /// `state` value recorded alongside the idle connection count.
    fileprivate static let idle: String = "idle"
    /// `state` value recorded alongside the acquired connection count.
    fileprivate static let acquired: String = "acquired"
}

/// Values available for the `state` attribute of request measurements.
private enum RequestState {
    /// `state` value for requests that are in flight.
    fileprivate static let inflight: String = "inflight"
    /// `state` value for requests that are queued.
    fileprivate static let queued: String = "queued"
}

/// Typed attribute keys attached to HTTP client metric measurements.
internal enum HttpMetricsAttributesKeys {
    /// Key named `"state"`; only reachable from within this file.
    fileprivate static let state: AttributeKey<String> = .init(name: "state")
    /// Key named `"server.address"`, set by the code that records bytes sent.
    internal static let serverAddress: AttributeKey<String> = .init(name: "server.address")
}

/// Plain value type holding the usage numbers read by the asynchronous HTTP client instruments.
///
/// Every field starts at zero.
internal struct HttpMetricsUsage {
    /// Value reported for `smithy.client.http.connections.limit`.
    public var connectionsLimit = 0
    /// Number of idle connections.
    public var idleConnections = 0
    /// Number of acquired connections.
    public var acquiredConnections = 0
    /// Number of in-flight requests.
    public var inflightRequests = 0
    /// Number of queued requests.
    public var queuedRequests = 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import protocol Smithy.LogAgent
import protocol Smithy.ReadableStream
import struct Smithy.Attributes
import func Foundation.CFWriteStreamSetDispatchQueue
import class Foundation.DispatchQueue
import class Foundation.NSObject
Expand Down Expand Up @@ -61,6 +62,12 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
/// A Logger for logging events.
private let logger: LogAgent

/// HTTP Client Telemetry
private let telemetry: HttpTelemetry

/// Server address
private let serverAddress: String

/// Actor used to ensure writes are performed in series, one at a time.
private actor WriteCoordinator {
var task: Task<Void, Error>?
Expand Down Expand Up @@ -119,13 +126,17 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
readableStream: ReadableStream,
bridgeBufferSize: Int = 65_536,
boundStreamBufferSize: Int? = nil,
logger: LogAgent
logger: LogAgent,
telemetry: HttpTelemetry,
serverAddress: String = "unknown"
) {
self.bridgeBufferSize = bridgeBufferSize
self.boundStreamBufferSize = boundStreamBufferSize ?? bridgeBufferSize
self.buffer = Data(capacity: bridgeBufferSize)
self.readableStream = readableStream
self.logger = logger
self.telemetry = telemetry
self.serverAddress = serverAddress
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue)
}

Expand Down Expand Up @@ -272,6 +283,15 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
if writeCount > 0 {
logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
buffer.removeFirst(writeCount)
// TICK - smithy.client.http.bytes_sent
var attributes = Attributes()
attributes.set(
key: HttpMetricsAttributesKeys.serverAddress,
value: serverAddress)
telemetry.bytesSent.add(
value: writeCount,
attributes: attributes,
context: telemetry.contextManager.current())
}

// Resume the caller now that the write is complete, returning the stream error, if any.
Expand Down
Loading

0 comments on commit 9e3ea22

Please sign in to comment.