-
Notifications
You must be signed in to change notification settings - Fork 35
feat: implement send function #980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
fcf90db
4296287
9e86e23
00d2b11
b8df172
172a026
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,20 +7,25 @@ | |
|
|
||
| import AsyncHTTPClient | ||
| import NIOCore | ||
| import NIOHTTP1 | ||
| import NIOPosix | ||
| import NIOSSL | ||
| import struct Smithy.Attributes | ||
| import struct Smithy.SwiftLogger | ||
| import protocol Smithy.LogAgent | ||
| import struct SmithyHTTPAPI.Headers | ||
| import struct SmithyHTTPAPI.Header | ||
| import protocol SmithyHTTPAPI.HTTPClient | ||
| import class SmithyHTTPAPI.HTTPResponse | ||
| import class SmithyHTTPAPI.HTTPRequest | ||
| import enum SmithyHTTPAPI.HTTPStatusCode | ||
| import enum SmithyHTTPAPI.HTTPMethodType | ||
| import protocol Smithy.ReadableStream | ||
| import enum Smithy.ByteStream | ||
| import class SmithyStreams.BufferedStream | ||
| import struct Foundation.Date | ||
| import struct Foundation.URLComponents | ||
| import struct Foundation.URLQueryItem | ||
| import AwsCommonRuntimeKit | ||
|
|
||
| /// AsyncHTTPClient-based HTTP client implementation that conforms to SmithyHTTPAPI.HTTPClient | ||
|
|
@@ -49,7 +54,7 @@ | |
| /// - httpClientConfiguration: The configuration to use for the client's `AsyncHTTPClient` setup. | ||
| public init( | ||
| httpClientConfiguration: HttpClientConfiguration | ||
| ) throws { | ||
| ) { | ||
| self.config = httpClientConfiguration | ||
| self.telemetry = httpClientConfiguration.telemetry ?? NIOHTTPClient.noOpNIOHTTPClientTelemetry | ||
| self.logger = self.telemetry.loggerProvider.getLogger(name: "NIOHTTPClient") | ||
|
|
@@ -60,84 +65,172 @@ | |
|
|
||
| // Configure TLS if options are provided | ||
| if let tlsOptions = tlsConfiguration { | ||
| clientConfig.tlsConfiguration = try tlsOptions.makeNIOSSLConfiguration() | ||
| do { | ||
| clientConfig.tlsConfiguration = try tlsOptions.makeNIOSSLConfiguration() | ||
| } catch { | ||
| // Log TLS configuration error but continue with default TLS settings | ||
| self.logger.error("Failed to configure TLS: \(error). Using default TLS configuration.") | ||
| } | ||
| } | ||
|
|
||
| self.client = AsyncHTTPClient.HTTPClient(configuration: clientConfig) | ||
| } | ||
|
|
||
| deinit { | ||
| try? client.syncShutdown() | ||
| } | ||
|
|
||
| public func send(request: SmithyHTTPAPI.HTTPRequest) async throws -> SmithyHTTPAPI.HTTPResponse { | ||
| let telemetryContext = telemetry.contextManager.current() | ||
| let tracer = telemetry.tracerProvider.getTracer( | ||
| scope: telemetry.tracerScope | ||
| ) | ||
| do { | ||
| // START - smithy.client.http.requests.queued_duration | ||
| let queuedStart = Date().timeIntervalSinceReferenceDate | ||
| let span = tracer.createSpan( | ||
| name: telemetry.spanName, | ||
| initialAttributes: telemetry.spanAttributes, | ||
| spanKind: SpanKind.internal, | ||
| parentContext: telemetryContext) | ||
| defer { | ||
| span.end() | ||
| } | ||
|
|
||
| // START - smithy.client.http.connections.acquire_duration | ||
| let acquireConnectionStart = Date().timeIntervalSinceReferenceDate | ||
| // START - smithy.client.http.requests.queued_duration | ||
| let queuedStart = Date().timeIntervalSinceReferenceDate | ||
| let span = tracer.createSpan( | ||
| name: telemetry.spanName, | ||
| initialAttributes: telemetry.spanAttributes, | ||
| spanKind: SpanKind.internal, | ||
| parentContext: telemetryContext) | ||
| defer { | ||
| span.end() | ||
| } | ||
|
|
||
| // TODO: Convert Smithy HTTPRequest to AsyncHTTPClient HTTPClientRequest | ||
| // START - smithy.client.http.connections.acquire_duration | ||
| let acquireConnectionStart = Date().timeIntervalSinceReferenceDate | ||
|
|
||
| // Convert Smithy HTTPRequest to AsyncHTTPClient HTTPClientRequest | ||
| let nioRequest = try await makeNIORequest(from: request) | ||
|
|
||
| let acquireConnectionEnd = Date().timeIntervalSinceReferenceDate | ||
| telemetry.connectionsAcquireDuration.record( | ||
| value: acquireConnectionEnd - acquireConnectionStart, | ||
| attributes: Attributes(), | ||
| context: telemetryContext) | ||
| // END - smithy.client.http.connections.acquire_duration | ||
|
|
||
| let queuedEnd = acquireConnectionEnd | ||
| telemetry.requestsQueuedDuration.record( | ||
| value: queuedEnd - queuedStart, | ||
| attributes: Attributes(), | ||
| context: telemetryContext) | ||
| // END - smithy.client.http.requests.queued_duration | ||
|
|
||
| // Update connection and request usage metrics | ||
| telemetry.updateHTTPMetricsUsage { httpMetricsUsage in | ||
| // TICK - smithy.client.http.connections.limit | ||
| // Note: AsyncHTTPClient doesn't expose connection pool configuration publicly | ||
| httpMetricsUsage.connectionsLimit = 0 | ||
|
|
||
| // TICK - smithy.client.http.connections.usage | ||
| // Note: AsyncHTTPClient doesn't expose current connection counts | ||
| httpMetricsUsage.acquiredConnections = 0 | ||
| httpMetricsUsage.idleConnections = 0 | ||
|
|
||
| // TICK - smithy.client.http.requests.usage | ||
| httpMetricsUsage.inflightRequests = httpMetricsUsage.acquiredConnections | ||
| httpMetricsUsage.queuedRequests = httpMetricsUsage.idleConnections | ||
| } | ||
|
|
||
| let acquireConnectionEnd = Date().timeIntervalSinceReferenceDate | ||
| telemetry.connectionsAcquireDuration.record( | ||
| value: acquireConnectionEnd - acquireConnectionStart, | ||
| // DURATION - smithy.client.http.connections.uptime | ||
| let connectionUptimeStart = acquireConnectionEnd | ||
| defer { | ||
| telemetry.connectionsUptime.record( | ||
| value: Date().timeIntervalSinceReferenceDate - connectionUptimeStart, | ||
| attributes: Attributes(), | ||
| context: telemetryContext) | ||
| // END - smithy.client.http.connections.acquire_duration | ||
| } | ||
|
|
||
| let queuedEnd = acquireConnectionEnd | ||
| telemetry.requestsQueuedDuration.record( | ||
| value: queuedEnd - queuedStart, | ||
| attributes: Attributes(), | ||
| context: telemetryContext) | ||
| // END - smithy.client.http.requests.queued_duration | ||
| let httpMethod = request.method.rawValue | ||
| let url = request.destination.url | ||
| logger.debug("NIOHTTPClient(\(httpMethod) \(url)) started") | ||
|
Check warning on line 147 in Sources/ClientRuntime/Networking/Http/NIO/NIOHTTPClient.swift
|
||
| logBodyDescription(request.body) | ||
|
|
||
| // TODO: Update connection and request usage metrics based on AsyncHTTPClient configuration | ||
| telemetry.updateHTTPMetricsUsage { httpMetricsUsage in | ||
| // TICK - smithy.client.http.connections.limit | ||
| httpMetricsUsage.connectionsLimit = 0 // TODO: Get from AsyncHTTPClient configuration | ||
| do { | ||
| let timeout: TimeAmount = .seconds(Int64(config.socketTimeout)) | ||
| let nioResponse = try await client.execute(nioRequest, timeout: timeout) | ||
|
|
||
| // Convert NIO response to Smithy HTTPResponse | ||
| let statusCode = HTTPStatusCode(rawValue: Int(nioResponse.status.code)) ?? .insufficientStorage | ||
| var headers = Headers() | ||
| for (name, value) in nioResponse.headers { | ||
| headers.add(name: name, value: value) | ||
| } | ||
|
|
||
| // TICK - smithy.client.http.connections.usage | ||
| httpMetricsUsage.acquiredConnections = 0 // TODO: Get from AsyncHTTPClient | ||
| httpMetricsUsage.idleConnections = 0 // TODO: Get from AsyncHTTPClient | ||
| let body = await NIOHTTPClientStreamBridge.convertResponseBody(from: nioResponse) | ||
|
|
||
| // TICK - smithy.client.http.requests.usage | ||
| httpMetricsUsage.inflightRequests = httpMetricsUsage.acquiredConnections | ||
| httpMetricsUsage.queuedRequests = httpMetricsUsage.idleConnections | ||
| } | ||
| let response = HTTPResponse(headers: headers, body: body, statusCode: statusCode) | ||
| logger.debug("NIOHTTPClient(\(httpMethod) \(url)) succeeded") | ||
|
Check warning on line 164 in Sources/ClientRuntime/Networking/Http/NIO/NIOHTTPClient.swift
|
||
|
|
||
| return response | ||
| } catch { | ||
| logger.error("NIOHTTPClient(\(httpMethod) \(url)) failed with error: \(error)") | ||
|
Check warning on line 168 in Sources/ClientRuntime/Networking/Http/NIO/NIOHTTPClient.swift
|
||
| throw error | ||
| } | ||
| } | ||
|
|
||
| // DURATION - smithy.client.http.connections.uptime | ||
| let connectionUptimeStart = acquireConnectionEnd | ||
| defer { | ||
| telemetry.connectionsUptime.record( | ||
| value: Date().timeIntervalSinceReferenceDate - connectionUptimeStart, | ||
| attributes: Attributes(), | ||
| context: telemetryContext) | ||
| /// Create an AsyncHTTPClient request from a Smithy HTTPRequest | ||
| private func makeNIORequest( | ||
| from request: SmithyHTTPAPI.HTTPRequest | ||
| ) async throws -> AsyncHTTPClient.HTTPClientRequest { | ||
| var components = URLComponents() | ||
| components.scheme = config.protocolType?.rawValue ?? request.destination.scheme.rawValue | ||
| components.host = request.endpoint.uri.host | ||
| components.port = port(for: request) | ||
| components.percentEncodedPath = request.destination.path | ||
| if let queryItems = request.queryItems, !queryItems.isEmpty { | ||
| components.percentEncodedQueryItems = queryItems.map { | ||
| URLQueryItem(name: $0.name, value: $0.value) | ||
| } | ||
| } | ||
| guard let url = components.url else { throw NIOHTTPClientError.incompleteHTTPRequest } | ||
|
|
||
| // TODO: Execute the HTTP request using AsyncHTTPClient | ||
| let method = NIOHTTP1.HTTPMethod(rawValue: request.method.rawValue) | ||
| var nioRequest = AsyncHTTPClient.HTTPClientRequest(url: url.absoluteString) | ||
| nioRequest.method = method | ||
|
|
||
| for header in request.headers.headers + config.defaultHeaders.headers { | ||
|
||
| for value in header.value { | ||
| nioRequest.headers.add(name: header.name, value: value) | ||
| } | ||
| } | ||
|
|
||
| // TODO: Log body description | ||
| nioRequest.body = try await NIOHTTPClientStreamBridge.convertRequestBody( | ||
| from: request.body, | ||
| allocator: allocator | ||
| ) | ||
|
|
||
| // TODO: Handle response | ||
| // TODO: Record bytes sent during request body streaming with server address attributes | ||
| // TODO: Record bytes received during response streaming with server address attributes | ||
| return nioRequest | ||
| } | ||
|
|
||
| // TODO: Convert NIO response to Smithy HTTPResponse | ||
| private func port(for request: SmithyHTTPAPI.HTTPRequest) -> Int? { | ||
| switch (request.destination.scheme, request.destination.port) { | ||
| case (.https, 443), (.http, 80): | ||
| return nil | ||
| default: | ||
| return request.destination.port.map { Int($0) } | ||
| } | ||
| } | ||
|
|
||
| return HTTPResponse() // TODO: Return actual response | ||
| } catch { | ||
| // TODO: Handle catch | ||
| private func logBodyDescription(_ body: ByteStream) { | ||
| switch body { | ||
| case .stream(let stream): | ||
| let lengthString: String | ||
| if let length = stream.length { | ||
| lengthString = "\(length) bytes" | ||
| } else { | ||
| lengthString = "unknown length" | ||
| } | ||
| logger.debug("body is Stream (\(lengthString))") | ||
| case .data(let data): | ||
| if let data { | ||
| logger.debug("body is Data (\(data.count) bytes)") | ||
| } else { | ||
| logger.debug("body is empty") | ||
| } | ||
| case .noStream: | ||
| logger.debug("body is empty") | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol is this really all it takes? Will it also do HTTP/2 bidi streaming out of the box for us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup :) All integration tests are passing for me!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is performance? We should look at the tests where we run huge #s of S3 streaming ops at the same time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll be doing performance testing for the whole implementation prior to merging the feature branch to main. So that answer is incoming! But I dont think we need to hold off on merging this to the feature branch until we get those numbers.