Skip to content
This repository was archived by the owner on Mar 23, 2025. It is now read-only.

Use operation queue instead of lock #315

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ApolloDeveloperKit.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
objects = {

/* Begin PBXBuildFile section */
5B00A17D266D0132009A4DCD /* HTTPConnectionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B00A17C266D0132009A4DCD /* HTTPConnectionTests.swift */; };
5B05890824EB53220071DB57 /* ConsoleDidWriteNotification.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B05890724EB53220071DB57 /* ConsoleDidWriteNotification.swift */; };
5B05890A24EB5DC60071DB57 /* HTTPRequestMessage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B05890924EB5DC60071DB57 /* HTTPRequestMessage.swift */; };
5B05890C24EB5DCF0071DB57 /* HTTPResponseMessage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B05890B24EB5DCF0071DB57 /* HTTPResponseMessage.swift */; };
5B0EC95E22B6D728003D7933 /* DebuggableRequestChainNetworkTransportTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B0EC95D22B6D728003D7933 /* DebuggableRequestChainNetworkTransportTests.swift */; };
5B15067424F149960081B1E8 /* Schema+JSONEncodable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B15067324F149960081B1E8 /* Schema+JSONEncodable.swift */; };
5B15067624F14B520081B1E8 /* ErrorLike+Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B15067524F14B520081B1E8 /* ErrorLike+Error.swift */; };
5B16DECD23F45C5300EFEA16 /* MockNetworkTransport.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B16DECC23F45C5300EFEA16 /* MockNetworkTransport.swift */; };
5B1DFA0E2526377E00594E80 /* AddressInfoErrorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B1DFA0D2526377E00594E80 /* AddressInfoErrorTests.swift */; };
5B207730236E098700817E45 /* HTTPChunkedResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B20772F236E098700817E45 /* HTTPChunkedResponse.swift */; };
5B207732236E0FD800817E45 /* HTTPChunkedResponseTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5B207731236E0FD800817E45 /* HTTPChunkedResponseTests.swift */; };
Expand Down Expand Up @@ -173,6 +173,7 @@
/* End PBXCopyFilesBuildPhase section */

/* Begin PBXFileReference section */
5B00A17C266D0132009A4DCD /* HTTPConnectionTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HTTPConnectionTests.swift; sourceTree = "<group>"; };
5B05890724EB53220071DB57 /* ConsoleDidWriteNotification.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConsoleDidWriteNotification.swift; sourceTree = "<group>"; };
5B05890924EB5DC60071DB57 /* HTTPRequestMessage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HTTPRequestMessage.swift; sourceTree = "<group>"; };
5B05890B24EB5DCF0071DB57 /* HTTPResponseMessage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HTTPResponseMessage.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -522,6 +523,7 @@
children = (
5B1DFA0D2526377E00594E80 /* AddressInfoErrorTests.swift */,
5B207731236E0FD800817E45 /* HTTPChunkedResponseTests.swift */,
5B00A17C266D0132009A4DCD /* HTTPConnectionTests.swift */,
5B3A83AC22C93361002B4FFB /* HTTPServerTests.swift */,
5B71522122DA00CB00002BA6 /* HTTPServerErrorTests.swift */,
5B2E1B2C230CC93C003C85CB /* InterfaceAddressTests.swift */,
Expand Down Expand Up @@ -910,6 +912,7 @@
5BF1E71226682BB80071E99A /* GraphQLResult+AnyGraphQLOperation.swift in Sources */,
5B7AE3942375F38A00EF6D43 /* ApolloDebugServerLoadTests.swift in Sources */,
5B3A837322C6721D002B4FFB /* AnyGraphQLSelectionSetTests.swift in Sources */,
5B00A17D266D0132009A4DCD /* HTTPConnectionTests.swift in Sources */,
5B3A83AD22C93361002B4FFB /* HTTPServerTests.swift in Sources */,
5B2E1B38230CD1D0003C85CB /* sockaddr+Factory.swift in Sources */,
5B2E1B36230CD1BA003C85CB /* ifaddrs+Factory.swift in Sources */,
Expand All @@ -925,7 +928,6 @@
5BA76BC2240ABAB600501193 /* BackgroundTaskTests.swift in Sources */,
5B2E1B29230C2CE0003C85CB /* InterfaceAddressIteratorTests.swift in Sources */,
5B0EC95E22B6D728003D7933 /* DebuggableRequestChainNetworkTransportTests.swift in Sources */,
5B16DECD23F45C5300EFEA16 /* MockNetworkTransport.swift in Sources */,
5B3A839322C72197002B4FFB /* Reference+JSONEncodableTests.swift in Sources */,
5B53B15524FBFD0D00A946D4 /* ApolloDeveloperKitTests.swift in Sources */,
5BCE500E24F4C06400310E26 /* MockFileDescriptorDuplicator.swift in Sources */,
Expand Down
54 changes: 20 additions & 34 deletions Sources/ApolloDeveloperKit/WebServer/HTTPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ final class HTTPConnection {
weak var delegate: HTTPConnectionDelegate?
private let incomingRequest = HTTPRequestMessage()
private let socket: Socket
private var eventQueue = ArraySlice<Event>()
private let eventQueueLock = NSLock()
private let operationQueue = OperationQueue()

init(httpVersion: String, nativeHandle: CFSocketNativeHandle) throws {
init(httpVersion: String, nativeHandle: CFSocketNativeHandle, queue: DispatchQueue) throws {
self.httpVersion = httpVersion
self.operationQueue.maxConcurrentOperationCount = 1
self.operationQueue.qualityOfService = .userInitiated
self.operationQueue.underlyingQueue = queue
let socket = try Socket(nativeHandle: nativeHandle, callbackTypes: [.dataCallBack, .writeCallBack])
self.socket = socket
socket.isNonBlocking = true
Expand All @@ -48,10 +50,9 @@ final class HTTPConnection {

extension HTTPConnection: HTTPOutputStream {
func write(data: Data) {
eventQueueLock.lock()
eventQueue.append(.write(data))
eventQueueLock.unlock()
tryFlush()
operationQueue.addOperation { [weak self] in
self?.sendOrSuspend(data: data, timeout: 0)
}
}

func writeAndClose(contentsOf url: URL) throws {
Expand All @@ -61,13 +62,13 @@ extension HTTPConnection: HTTPOutputStream {
}

func close() {
eventQueueLock.lock()
eventQueue.append(.close)
eventQueueLock.unlock()
tryFlush()
operationQueue.addOperation { [weak self] in
self?.closeImmediately()
}
}

func closeImmediately() {
operationQueue.cancelAllOperations()
delegate?.httpConnectionWillClose(self)
socket.invalidate()
}
Expand All @@ -84,31 +85,16 @@ extension HTTPConnection: HTTPOutputStream {
close()
}

private func tryFlush() {
eventQueueLock.lock()
switch eventQueue.first {
case .write(let data)?:
if sendOrClose(data: data, timeout: 0) {
eventQueue = eventQueue[eventQueue.startIndex.advanced(by: 1)..<eventQueue.endIndex]
eventQueueLock.unlock()
tryFlush()
} else {
eventQueueLock.unlock()
}
case .close?:
closeImmediately()
eventQueueLock.unlock()
case nil:
eventQueueLock.unlock()
}
}

private func sendOrClose(data: Data, timeout: TimeInterval) -> Bool {
private func sendOrSuspend(data: Data, timeout: TimeInterval) {
do {
return try socket.send(data: data, timeout: timeout)
if try !socket.send(data: data, timeout: timeout) {
operationQueue.isSuspended = true
operationQueue.addOperation { [weak self] in
self?.sendOrSuspend(data: data, timeout: timeout)
}
}
} catch {
closeImmediately()
return false
}
}
}
Expand Down Expand Up @@ -154,6 +140,6 @@ extension HTTPConnection: SocketDelegate {
}

func socketDidBecomeWritable(_ socket: Socket) {
tryFlush()
operationQueue.isSuspended = false
}
}
3 changes: 2 additions & 1 deletion Sources/ApolloDeveloperKit/WebServer/HTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protocol HTTPServerDelegate: class {
*/
final class HTTPServer {
weak var delegate: HTTPServerDelegate?
private let connectionQueue = DispatchQueue(label: "com.github.manicmaniac.ApolloDeveloperKit.HTTPConnection")
private var socket: Socket?
private var connections = Set<HTTPConnection>()

Expand Down Expand Up @@ -242,7 +243,7 @@ extension HTTPServer: HTTPConnectionDelegate {

extension HTTPServer: SocketDelegate {
func socket(_ socket: Socket, didAccept nativeHandle: CFSocketNativeHandle, address: Data) {
guard let connection = try? HTTPConnection(httpVersion: kCFHTTPVersion1_1 as String, nativeHandle: nativeHandle) else {
guard let connection = try? HTTPConnection(httpVersion: kCFHTTPVersion1_1 as String, nativeHandle: nativeHandle, queue: connectionQueue) else {
return
}
connection.delegate = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,15 @@ private class MockOperationStore: OperationStore {
return State(mutations: [], queries: [])
}
}

private class MockNetworkTransport: NetworkTransport {
func send<Operation>(operation: Operation, cachePolicy: CachePolicy, contextIdentifier: UUID?, callbackQueue: DispatchQueue, completionHandler: @escaping (Result<GraphQLResult<Operation.Data>, Error>) -> Void) -> Cancellable where Operation : GraphQLOperation {
return MockCancellable()
}
}

private class MockCancellable: Cancellable {
func cancel() {
// Do nothing.
}
}

This file was deleted.

144 changes: 144 additions & 0 deletions Tests/ApolloDeveloperKitTests/WebServer/HTTPConnectionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
// HTTPConnectionTests.swift
// ApolloDeveloperKitTests
//
// Created by Ryosuke Ito on 6/6/21.
// Copyright © 2021 Ryosuke Ito. All rights reserved.
//

import XCTest
@testable import ApolloDeveloperKit

class HTTPConnectionTests: XCTestCase {
private let httpVersion = kCFHTTPVersion1_1 as String
private var queue: DispatchQueue!
private var writerFileDescriptor: Int32!
private var readerFileDescriptor: Int32!

override func setUpWithError() throws {
var fileDescriptors = [Int32](repeating: 0, count: 2)
errno = 0
guard socketpair(AF_UNIX, SOCK_STREAM, 0, &fileDescriptors) == 0 else {
throw POSIXError(POSIXErrorCode(rawValue: errno)!)
}
writerFileDescriptor = fileDescriptors[0]
readerFileDescriptor = fileDescriptors[1]
}

override func setUp() {
queue = DispatchQueue(label: "com.github.manicmaniac.ApolloDeveloperKitTests.HTTPConnectionTests")
}

override func tearDownWithError() throws {
for case let handle? in [writerFileDescriptor, readerFileDescriptor] {
errno = 0
guard close(handle) == 0 || errno == EBADF else {
throw POSIXError(POSIXErrorCode(rawValue: errno)!)
}
}
}

func testWrite() throws {
let connection = try HTTPConnection(httpVersion: httpVersion, nativeHandle: writerFileDescriptor, queue: queue)
connection.write(data: httpGetRequestMessage)
connection.close()
let fileHandle = FileHandle(fileDescriptor: readerFileDescriptor)
let data: Data?
if #available(macOS 10.15.4, *, iOS 13.4, *) {
data = try fileHandle.read(upToCount: httpGetRequestMessage.count)
} else {
data = fileHandle.readData(ofLength: httpGetRequestMessage.count)
}
XCTAssertEqual(data, httpGetRequestMessage)
}

func testWrite_threadSafety() throws {
let connection = try HTTPConnection(httpVersion: httpVersion, nativeHandle: writerFileDescriptor, queue: queue)
let iterations = 100
for _ in 0..<iterations {
let thread: Thread
if #available(macOS 10.12, *, iOS 10, *) {
thread = Thread { [unowned self] in
self.writeData(into: connection)
}
} else {
thread = Thread(target: self, selector: #selector(writeData(into:)), object: connection)
}
expectation(forNotification: .NSThreadWillExit, object: thread)
thread.start()
}
waitForExpectations(timeout: 0.5)
connection.close()
let fileHandle = FileHandle(fileDescriptor: readerFileDescriptor)
let expectedBytesCount = httpGetRequestMessage.count * iterations
let data: Data?
if #available(macOS 10.15.4, *, iOS 13.4, *) {
data = try fileHandle.read(upToCount: expectedBytesCount)
} else {
data = fileHandle.readData(ofLength: expectedBytesCount)
}
XCTAssertEqual(data?.count, expectedBytesCount)
}

func testWriteAndClose() throws {
let fileManager = FileManager.default
let temporaryDirectoryURL: URL
if #available(macOS 10.12, *, iOS 10, *) {
temporaryDirectoryURL = fileManager.temporaryDirectory
} else {
temporaryDirectoryURL = URL(fileURLWithPath: NSTemporaryDirectory())
}
let itemReplacementURL = try fileManager.url(for: .itemReplacementDirectory,
in: .userDomainMask,
appropriateFor: temporaryDirectoryURL,
create: true)
addTeardownBlock {
do {
try fileManager.removeItem(at: itemReplacementURL)
} catch let error {
XCTFail(String(describing: error))
}
}
let temporaryFileURL = itemReplacementURL.appendingPathComponent(ProcessInfo().globallyUniqueString)
try httpGetRequestMessage.write(to: temporaryFileURL)
let connection = try HTTPConnection(httpVersion: httpVersion, nativeHandle: writerFileDescriptor, queue: queue)
let fileHandle = FileHandle(fileDescriptor: readerFileDescriptor)
expectation(forNotification: .NSFileHandleReadToEndOfFileCompletion, object: fileHandle) { notification in
let data = notification.userInfo?[NSFileHandleNotificationDataItem] as? Data
XCTAssertEqual(data, httpGetRequestMessage)
return true
}
try connection.writeAndClose(contentsOf: temporaryFileURL)
fileHandle.readToEndOfFileInBackgroundAndNotify()
waitForExpectations(timeout: 0.5)
}

@objc private func writeData(into connection: AnyObject) {
let connection = connection as! HTTPConnection
connection.write(data: httpGetRequestMessage)
}
}

private class HTTPConnectionDelegateHandler: HTTPConnectionDelegate {
var didReceiveRequest: ((HTTPConnection, HTTPRequestMessage) -> Void)?
var willClose: ((HTTPConnection) -> Void)?
var didFailToHandleRequest: ((HTTPConnection, HTTPRequestMessage, Error) -> Void)?

func httpConnection(_ connection: HTTPConnection, didReceive request: HTTPRequestMessage) {
didReceiveRequest?(connection, request)
}

func httpConnectionWillClose(_ connection: HTTPConnection) {
willClose?(connection)
}

func httpConnection(_ connection: HTTPConnection, didFailToHandle request: HTTPRequestMessage, error: Error) {
didFailToHandleRequest?(connection, request, error)
}
}

private let httpGetRequestMessage = """
GET / HTTP/1.1
Host: localhost

""".data(using: .utf8)!