Skip to content

Commit

Permalink
Partial revert "KTOR-5199 re-run multi perform loop to schedule reque…
Browse files Browse the repository at this point in the history
…sts" (commit 6438787) but leaves test to fix HTTP timeout tests
  • Loading branch information
whyoleg committed Feb 7, 2025
1 parent c9ca137 commit ef6380a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ internal class RequestContainer(
val completionHandler: CompletableDeferred<CurlSuccess>
)

/**
* A class responsible for processing requests asynchronously.
*
* It holds a dispatcher interacting with curl multi interface API,
* which requires API calls from single thread.
*/
internal class CurlProcessor(coroutineContext: CoroutineContext) {
@OptIn(InternalAPI::class)
private val curlDispatcher: CloseableCoroutineDispatcher =
Expand All @@ -34,7 +28,6 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {

private val curlScope = CoroutineScope(coroutineContext + curlDispatcher)
private val requestQueue: Channel<RequestContainer> = Channel(Channel.UNLIMITED)
private val requestCounter = atomic(0L)

init {
val init = curlScope.launch {
Expand All @@ -50,9 +43,8 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {

suspend fun executeRequest(request: CurlRequestData): CurlSuccess {
val result = CompletableDeferred<CurlSuccess>()
nextRequest {
requestQueue.send(RequestContainer(request, result))
}
requestQueue.send(RequestContainer(request, result))
curlApi!!.wakeup()
return result.await()
}

Expand All @@ -62,7 +54,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
val api = curlApi!!
while (!requestQueue.isClosedForReceive) {
drainRequestQueue(api)
api.perform(requestCounter)
api.perform()
}
}
}
Expand Down Expand Up @@ -94,7 +86,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
if (!closed.compareAndSet(false, true)) return

requestQueue.close()
nextRequest()
curlApi!!.wakeup()

GlobalScope.launch(curlDispatcher) {
curlScope.coroutineContext[Job]!!.join()
Expand All @@ -110,10 +102,4 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
curlApi!!.cancelRequest(easyHandle, cause)
}
}

private inline fun nextRequest(body: (Long) -> Unit = {}) = try {
body(requestCounter.incrementAndGet())
} finally {
curlApi!!.wakeup()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import io.ktor.client.plugins.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.locks.*
import kotlinx.atomicfu.*
import kotlinx.cinterop.*
import kotlinx.coroutines.*
import kotlinx.io.*
Expand All @@ -26,11 +25,6 @@ private class RequestHolder @OptIn(ExperimentalForeignApi::class) constructor(
}
}

/**
* Handles requests using libcurl with multi interface.
*
* @see <a href="https://curl.se/libcurl/c/libcurl-multi.html">Multi interface overview</a>
*/
@OptIn(InternalAPI::class)
internal class CurlMultiApiHandler : Closeable {
@OptIn(ExperimentalForeignApi::class)
Expand Down Expand Up @@ -136,12 +130,11 @@ internal class CurlMultiApiHandler : Closeable {
}

@OptIn(ExperimentalForeignApi::class)
internal fun perform(counter: AtomicLong) {
internal fun perform() {
if (activeHandles.isEmpty()) return

memScoped {
val transfersRunning = alloc<IntVar>()
val requestId = counter.value
do {
synchronized(easyHandlesToUnpauseLock) {
var handle = easyHandlesToUnpause.removeFirstOrNull()
Expand All @@ -157,7 +150,7 @@ internal class CurlMultiApiHandler : Closeable {
if (transfersRunning.value < activeHandles.size) {
handleCompleted()
}
} while (transfersRunning.value != 0 && requestId == counter.value)
} while (transfersRunning.value != 0)
}
}

Expand Down

0 comments on commit ef6380a

Please sign in to comment.