From f50b114be5a726da4edb5b64d723b698d5d03cbe Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 14:29:03 -0300 Subject: [PATCH 1/8] Rename `YokiFlow` to `DockerKotlinFlow` --- src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt index 074e19c9..ff5c5815 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import me.devnatan.dockerkt.Closeable -public fun interface YokiFlow { +public fun interface DockerClientFlow { public fun onEach(value: T) public fun onStart(): Unit = Unit @@ -28,7 +28,7 @@ internal class InternalYokiFlow internal constructor() : Closeable { fun start( flow: Flow, - callback: YokiFlow, + callback: DockerClientFlow, ) { flow .onStart { callback.onStart() } From 68edd4041a72b17205e3a6b20677a565fc8cbdeb Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 14:31:33 -0300 Subject: [PATCH 2/8] Rename `YokiFlow` to `DockerKotlinFlow` --- api/docker-kotlin.api | 20 +++++++++---------- .../me/devnatan/dockerkt/io/Flow.jvm.kt | 20 +++++++++++++------ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/api/docker-kotlin.api b/api/docker-kotlin.api index 252fd192..ccb9da1c 100644 --- a/api/docker-kotlin.api +++ b/api/docker-kotlin.api @@ -61,22 +61,22 @@ public final class me/devnatan/dockerkt/DocketClientConfig$Companion { public final fun builder ()Lme/devnatan/dockerkt/DockerClientConfigBuilder; } -public final class me/devnatan/dockerkt/io/SocketUtils { - public static final field DEFAULT_DOCKER_HTTP_SOCKET Ljava/lang/String; - public static final field DEFAULT_DOCKER_UNIX_SOCKET Ljava/lang/String; -} - -public abstract interface class me/devnatan/dockerkt/io/YokiFlow { +public abstract interface class me/devnatan/dockerkt/io/DockerClientFrameListener { public fun onComplete (Ljava/lang/Throwable;)V public abstract fun onEach (Ljava/lang/Object;)V public fun onError (Ljava/lang/Throwable;)V public fun onStart ()V } -public final class me/devnatan/dockerkt/io/YokiFlow$DefaultImpls { - public static fun onComplete (Lme/devnatan/dockerkt/io/YokiFlow;Ljava/lang/Throwable;)V - public static fun onError (Lme/devnatan/dockerkt/io/YokiFlow;Ljava/lang/Throwable;)V - public static fun onStart (Lme/devnatan/dockerkt/io/YokiFlow;)V +public final class me/devnatan/dockerkt/io/DockerClientFrameListener$DefaultImpls { + public static fun onComplete (Lme/devnatan/dockerkt/io/DockerClientFrameListener;Ljava/lang/Throwable;)V + public static fun onError (Lme/devnatan/dockerkt/io/DockerClientFrameListener;Ljava/lang/Throwable;)V + public static fun onStart (Lme/devnatan/dockerkt/io/DockerClientFrameListener;)V +} + +public final class me/devnatan/dockerkt/io/SocketUtils { + public static final field DEFAULT_DOCKER_HTTP_SOCKET Ljava/lang/String; + public static final field DEFAULT_DOCKER_UNIX_SOCKET Ljava/lang/String; } public final class me/devnatan/dockerkt/models/BlkioWeightDevice { diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt index ff5c5815..4687dabc 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import me.devnatan.dockerkt.Closeable -public fun interface DockerClientFlow { +public fun interface DockerClientFrameListener { public fun onEach(value: T) public fun onStart(): Unit = Unit @@ -22,24 +22,32 @@ public fun interface DockerClientFlow { public fun onComplete(error: Throwable?): Unit = Unit } -internal class InternalYokiFlow internal constructor() : Closeable { +internal class InternalDockerClientFrameListener internal constructor() : Closeable { private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.Default) private var error: Throwable? = null fun start( flow: Flow, - callback: DockerClientFlow, + callback: DockerClientFrameListener, ) { flow .onStart { callback.onStart() } - .onCompletion { error -> callback.onComplete(error.also { this@InternalYokiFlow.error = it }) } + .onCompletion { exception -> + this@InternalDockerClientFrameListener.error = exception + callback.onComplete(exception) + } .onEach(callback::onEach) - .catch { error -> callback.onError(error.also { this@InternalYokiFlow.error = it }) } + .catch { exception -> + this@InternalDockerClientFrameListener.error = exception + callback.onError(exception) + } .launchIn(coroutineScope) } override fun close() { - val exception = error?.let { cause -> CancellationException("An error occurred while consuming flow.", cause) } + val exception = error?.let { cause -> + CancellationException("An error occurred while consuming flow.", cause) + } coroutineScope.cancel(exception) } } From d01676407cce7ed7ba9f2318b2a4c4a7047ec315 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 14:36:26 -0300 Subject: [PATCH 3/8] Uncomment container resource `logs` --- api/docker-kotlin.api | 3 + api/docker-kotlin.klib.api | 3 + .../resource/container/ContainerResource.kt | 4 + .../container/ContainerResourceExt.kt | 87 ++++--------------- .../container/ContainerResource.jvm.kt | 63 ++++++++++++++ .../container/ContainerResource.native.kt | 15 ++++ 6 files changed, 105 insertions(+), 70 deletions(-) diff --git a/api/docker-kotlin.api b/api/docker-kotlin.api index ccb9da1c..3cdb6e60 100644 --- a/api/docker-kotlin.api +++ b/api/docker-kotlin.api @@ -3830,6 +3830,7 @@ public final class me/devnatan/dockerkt/resource/container/ContainerResource { public final fun listAsync ()Ljava/util/concurrent/CompletableFuture; public final fun listAsync (Lme/devnatan/dockerkt/models/container/ContainerListOptions;)Ljava/util/concurrent/CompletableFuture; public static synthetic fun listAsync$default (Lme/devnatan/dockerkt/resource/container/ContainerResource;Lme/devnatan/dockerkt/models/container/ContainerListOptions;ILjava/lang/Object;)Ljava/util/concurrent/CompletableFuture; + public final fun logs (Ljava/lang/String;Lme/devnatan/dockerkt/models/container/ContainerLogsOptions;)Lkotlinx/coroutines/flow/Flow; public final synthetic fun pause (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun pauseAsync (Ljava/lang/String;)Ljava/util/concurrent/CompletableFuture; public final synthetic fun prune (Lme/devnatan/dockerkt/models/container/ContainerPruneFilters;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -3875,6 +3876,8 @@ public final class me/devnatan/dockerkt/resource/container/ContainerResource { public final class me/devnatan/dockerkt/resource/container/ContainerResourceExtKt { public static final fun create (Lme/devnatan/dockerkt/resource/container/ContainerResource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun list (Lme/devnatan/dockerkt/resource/container/ContainerResource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun logs (Lme/devnatan/dockerkt/resource/container/ContainerResource;Ljava/lang/String;)Lkotlinx/coroutines/flow/Flow; + public static final fun logs (Lme/devnatan/dockerkt/resource/container/ContainerResource;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun prune (Lme/devnatan/dockerkt/resource/container/ContainerResource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun remove (Lme/devnatan/dockerkt/resource/container/ContainerResource;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun resizeTTY (Lme/devnatan/dockerkt/resource/container/ContainerResource;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/api/docker-kotlin.klib.api b/api/docker-kotlin.klib.api index e9260671..bf2d333d 100644 --- a/api/docker-kotlin.klib.api +++ b/api/docker-kotlin.klib.api @@ -3917,6 +3917,7 @@ final class me.devnatan.dockerkt.resource.container/ContainerResource { // me.de constructor () // me.devnatan.dockerkt.resource.container/ContainerResource.|(){}[0] final fun attach(kotlin/String): kotlinx.coroutines.flow/Flow // me.devnatan.dockerkt.resource.container/ContainerResource.attach|attach(kotlin.String){}[0] + final fun logs(kotlin/String, me.devnatan.dockerkt.models.container/ContainerLogsOptions): kotlinx.coroutines.flow/Flow // me.devnatan.dockerkt.resource.container/ContainerResource.logs|logs(kotlin.String;me.devnatan.dockerkt.models.container.ContainerLogsOptions){}[0] final suspend fun archive(kotlin/String, kotlin/String = ...): me.devnatan.dockerkt.models.container/ContainerArchiveInfo // me.devnatan.dockerkt.resource.container/ContainerResource.archive|archive(kotlin.String;kotlin.String){}[0] final suspend fun create(me.devnatan.dockerkt.models.container/ContainerCreateOptions): kotlin/String // me.devnatan.dockerkt.resource.container/ContainerResource.create|create(me.devnatan.dockerkt.models.container.ContainerCreateOptions){}[0] final suspend fun downloadArchive(kotlin/String, kotlin/String): kotlinx.io/RawSource // me.devnatan.dockerkt.resource.container/ContainerResource.downloadArchive|downloadArchive(kotlin.String;kotlin.String){}[0] @@ -4178,10 +4179,12 @@ final fun (me.devnatan.dockerkt.models/HostConfig).me.devnatan.dockerkt.models/p final fun (me.devnatan.dockerkt.models/HostConfig).me.devnatan.dockerkt.models/portBindings(kotlin/UShort, kotlin/Function1, kotlin/Unit> = ...) // me.devnatan.dockerkt.models/portBindings|portBindings@me.devnatan.dockerkt.models.HostConfig(kotlin.UShort;kotlin.Function1,kotlin.Unit>){}[0] final fun (me.devnatan.dockerkt.models/HostConfig).me.devnatan.dockerkt.models/portBindings(me.devnatan.dockerkt.models/ExposedPort, kotlin.collections/List) // me.devnatan.dockerkt.models/portBindings|portBindings@me.devnatan.dockerkt.models.HostConfig(me.devnatan.dockerkt.models.ExposedPort;kotlin.collections.List){}[0] final fun (me.devnatan.dockerkt.models/HostConfig).me.devnatan.dockerkt.models/portBindings(me.devnatan.dockerkt.models/ExposedPort, kotlin/Function1, kotlin/Unit> = ...) // me.devnatan.dockerkt.models/portBindings|portBindings@me.devnatan.dockerkt.models.HostConfig(me.devnatan.dockerkt.models.ExposedPort;kotlin.Function1,kotlin.Unit>){}[0] +final fun (me.devnatan.dockerkt.resource.container/ContainerResource).me.devnatan.dockerkt.resource.container/logs(kotlin/String): kotlinx.coroutines.flow/Flow // me.devnatan.dockerkt.resource.container/logs|logs@me.devnatan.dockerkt.resource.container.ContainerResource(kotlin.String){}[0] final fun me.devnatan.dockerkt.models/stream(kotlin/String): me.devnatan.dockerkt.models/Stream // me.devnatan.dockerkt.models/stream|stream(kotlin.String){}[0] final fun me.devnatan.dockerkt.util/fromJsonEncodedString(kotlin/String): kotlin.collections/Map // me.devnatan.dockerkt.util/fromJsonEncodedString|fromJsonEncodedString(kotlin.String){}[0] final fun me.devnatan.dockerkt.util/toJsonEncodedString(kotlin/Any): kotlin/String // me.devnatan.dockerkt.util/toJsonEncodedString|toJsonEncodedString(kotlin.Any){}[0] final inline fun (me.devnatan.dockerkt.models.container/ContainerListOptions).me.devnatan.dockerkt.models.container/filters(kotlin/Function1) // me.devnatan.dockerkt.models.container/filters|filters@me.devnatan.dockerkt.models.container.ContainerListOptions(kotlin.Function1){}[0] +final inline fun (me.devnatan.dockerkt.resource.container/ContainerResource).me.devnatan.dockerkt.resource.container/logs(kotlin/String, kotlin/Function1): kotlinx.coroutines.flow/Flow // me.devnatan.dockerkt.resource.container/logs|logs@me.devnatan.dockerkt.resource.container.ContainerResource(kotlin.String;kotlin.Function1){}[0] final inline fun (me.devnatan.dockerkt.resource.system/SystemResource).me.devnatan.dockerkt.resource.system/events(kotlin/Function1): kotlinx.coroutines.flow/Flow // me.devnatan.dockerkt.resource.system/events|events@me.devnatan.dockerkt.resource.system.SystemResource(kotlin.Function1){}[0] final inline fun me.devnatan.dockerkt/DockerClient(crossinline kotlin/Function1): me.devnatan.dockerkt/DockerClient // me.devnatan.dockerkt/DockerClient|DockerClient(kotlin.Function1){}[0] final suspend inline fun (me.devnatan.dockerkt.resource.container/ContainerResource).me.devnatan.dockerkt.resource.container/create(kotlin/Function1): kotlin/String // me.devnatan.dockerkt.resource.container/create|create@me.devnatan.dockerkt.resource.container.ContainerResource(kotlin.Function1){}[0] diff --git a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt index 199e12a2..52de9f2c 100644 --- a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt +++ b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt @@ -1,6 +1,7 @@ package me.devnatan.dockerkt.resource.container import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import kotlinx.io.RawSource import me.devnatan.dockerkt.DockerResponseException import me.devnatan.dockerkt.models.Frame @@ -9,6 +10,7 @@ import me.devnatan.dockerkt.models.container.Container import me.devnatan.dockerkt.models.container.ContainerArchiveInfo import me.devnatan.dockerkt.models.container.ContainerCreateOptions import me.devnatan.dockerkt.models.container.ContainerListOptions +import me.devnatan.dockerkt.models.container.ContainerLogsOptions import me.devnatan.dockerkt.models.container.ContainerPruneFilters import me.devnatan.dockerkt.models.container.ContainerPruneResult import me.devnatan.dockerkt.models.container.ContainerRemoveOptions @@ -194,4 +196,6 @@ public expect class ContainerResource { inputPath: String, remotePath: String, ) + + public fun logs(container: String, options: ContainerLogsOptions): Flow } diff --git a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt index fd590965..0e01ae97 100644 --- a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt +++ b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt @@ -1,9 +1,12 @@ package me.devnatan.dockerkt.resource.container +import kotlinx.coroutines.flow.Flow import me.devnatan.dockerkt.DockerResponseException +import me.devnatan.dockerkt.models.Frame import me.devnatan.dockerkt.models.ResizeTTYOptions import me.devnatan.dockerkt.models.container.ContainerCreateOptions import me.devnatan.dockerkt.models.container.ContainerListOptions +import me.devnatan.dockerkt.models.container.ContainerLogsOptions import me.devnatan.dockerkt.models.container.ContainerPruneFilters import me.devnatan.dockerkt.models.container.ContainerPruneResult import me.devnatan.dockerkt.models.container.ContainerRemoveOptions @@ -59,74 +62,18 @@ public suspend inline fun ContainerResource.resizeTTY( resizeTTY(container, ResizeTTYOptions().apply(options)) } -// public inline fun ContainerResource.logs( -// id: String, -// block: ContainerLogsOptions.() -> Unit, -// ): Flow { -// return logs(id, ContainerLogsOptions().apply(block)) -// } - -// public fun ContainerResource.logs(id: String): Flow = logs( -// id, -// options = ContainerLogsOptions( -// follow = true, -// stderr = true, -// stdout = true, -// ), -// ) +public inline fun ContainerResource.logs( + id: String, + block: ContainerLogsOptions.() -> Unit, +): Flow { + return logs(id, ContainerLogsOptions().apply(block)) +} -// public fun ContainerResource.logs(id: String, options: ContainerLogsOptions): Flow = flow { -// httpClient.prepareGet("${ContainerResource.BASE_PATH}/$id/logs") { -// parameter("follow", options.follow) -// parameter("stdout", options.stdout) -// parameter("stderr", options.stderr) -// parameter("since", options.since) -// parameter("until", options.until) -// parameter("timestamps", options.showTimestamps) -// parameter("tail", options.tail) -// }.execute { response -> -// val channel = response.body() -// while (!channel.isClosedForRead) { -// val fb = channel.readByte() -// val stream = Stream.typeOfOrNull(fb) -// -// // Unknown stream = tty enabled -// if (stream == null) { -// val remaining = channel.availableForRead -// -// // Remaining +1 includes the previously read first byte. Reinsert the first byte since we read it -// // before but the type was not expected, so this byte is actually the first character of the line. -// val len = remaining + 1 -// val payload = ByteReadChannel( -// ByteArray(len) { -// if (it == 0) fb else channel.readByte() -// }, -// ) -// -// val line = payload.readUTF8Line() ?: error("Payload cannot be null") -// -// // Try to determine the "correct" stream since we cannot have this information. -// val stdoutEnabled = options.stdout ?: false -// val stdErrEnabled = options.stderr ?: false -// val expectedStream: Stream = stream ?: when { -// stdoutEnabled && !stdErrEnabled -> Stream.StdOut -// stdErrEnabled && !stdoutEnabled -> Stream.StdErr -// else -> Stream.Unknown -// } -// -// emit(Frame(line, len, expectedStream)) -// continue -// } -// -// val header = channel.readPacket(7) -// -// // We discard the first three bytes because the payload size is in the last four bytes -// // and the total header size is 8. -// header.discard(3) -// -// val payloadLength = header.readInt(ByteOrder.BIG_ENDIAN) -// val payloadData = channel.readUTF8Line(payloadLength)!! -// emit(Frame(payloadData, payloadLength, stream)) -// } -// } -// } +public fun ContainerResource.logs(id: String): Flow = logs( + id, + options = ContainerLogsOptions( + follow = true, + stderr = true, + stdout = true, + ), +) diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt index ba8afafc..437665c9 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt @@ -8,6 +8,7 @@ import io.ktor.client.request.get import io.ktor.client.request.head import io.ktor.client.request.parameter import io.ktor.client.request.post +import io.ktor.client.request.prepareGet import io.ktor.client.request.preparePost import io.ktor.client.request.put import io.ktor.client.request.setBody @@ -15,6 +16,11 @@ import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode import io.ktor.util.cio.toByteReadChannel import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.availableForRead +import io.ktor.utils.io.core.ByteOrder +import io.ktor.utils.io.core.discard +import io.ktor.utils.io.readByte +import io.ktor.utils.io.readPacket import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.async @@ -39,6 +45,7 @@ import me.devnatan.dockerkt.models.container.ContainerArchiveInfo import me.devnatan.dockerkt.models.container.ContainerCreateOptions import me.devnatan.dockerkt.models.container.ContainerCreateResult import me.devnatan.dockerkt.models.container.ContainerListOptions +import me.devnatan.dockerkt.models.container.ContainerLogsOptions import me.devnatan.dockerkt.models.container.ContainerPruneFilters import me.devnatan.dockerkt.models.container.ContainerPruneResult import me.devnatan.dockerkt.models.container.ContainerRemoveOptions @@ -593,4 +600,60 @@ public actual class ContainerResource( setBody(archive.buffered().asInputStream().toByteReadChannel()) } } + + public actual fun logs(container: String, options: ContainerLogsOptions): Flow = flow { + httpClient.prepareGet("$CONTAINERS/$container/logs") { + parameter("follow", options.follow) + parameter("stdout", options.stdout) + parameter("stderr", options.stderr) + parameter("since", options.since) + parameter("until", options.until) + parameter("timestamps", options.showTimestamps) + parameter("tail", options.tail) + }.execute { response -> + val channel = response.body() + while (!channel.isClosedForRead) { + val fb = channel.readByte() + val stream = Stream.typeOfOrNull(fb) + + // Unknown stream = tty enabled + if (stream == null) { + val remaining = channel.availableForRead + + // Remaining +1 includes the previously read first byte. Reinsert the first byte since we read it + // before but the type was not expected, so this byte is actually the first character of the line. + val len = remaining + 1 + val payload = ByteReadChannel( + ByteArray(len) { + if (it == 0) fb else channel.readByte() + }, + ) + + val line = payload.readUTF8Line() ?: error("Payload cannot be null") + + // Try to determine the "correct" stream since we cannot have this information. + val stdoutEnabled = options.stdout ?: false + val stdErrEnabled = options.stderr ?: false + val expectedStream: Stream = stream ?: when { + stdoutEnabled && !stdErrEnabled -> Stream.StdOut + stdErrEnabled && !stdoutEnabled -> Stream.StdErr + else -> Stream.Unknown + } + + emit(Frame(line, len, expectedStream)) + continue + } + + val header = channel.readPacket(7) + + // We discard the first three bytes because the payload size is in the last four bytes + // and the total header size is 8. + header.discard(3) + + val payloadLength = header.readInt() + val payloadData = channel.readUTF8Line(payloadLength)!! + emit(Frame(payloadData, payloadLength, stream)) + } + } + } } diff --git a/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt b/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt index 440559da..2d8b42d1 100644 --- a/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt +++ b/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt @@ -8,6 +8,7 @@ import me.devnatan.dockerkt.models.container.Container import me.devnatan.dockerkt.models.container.ContainerArchiveInfo import me.devnatan.dockerkt.models.container.ContainerCreateOptions import me.devnatan.dockerkt.models.container.ContainerListOptions +import me.devnatan.dockerkt.models.container.ContainerLogsOptions import me.devnatan.dockerkt.models.container.ContainerPruneFilters import me.devnatan.dockerkt.models.container.ContainerPruneResult import me.devnatan.dockerkt.models.container.ContainerRemoveOptions @@ -48,6 +49,7 @@ public actual class ContainerResource { container: String, options: ContainerRemoveOptions, ) { + TODO("Not yet implemented") } /** @@ -75,6 +77,7 @@ public actual class ContainerResource { container: String, detachKeys: String?, ) { + TODO("Not yet implemented") } /** @@ -87,6 +90,7 @@ public actual class ContainerResource { container: String, timeout: Duration?, ) { + TODO("Not yet implemented") } /** @@ -99,6 +103,7 @@ public actual class ContainerResource { container: String, timeout: Duration?, ) { + TODO("Not yet implemented") } /** @@ -111,6 +116,7 @@ public actual class ContainerResource { container: String, signal: String?, ) { + TODO("Not yet implemented") } /** @@ -123,6 +129,7 @@ public actual class ContainerResource { container: String, newName: String, ) { + TODO("Not yet implemented") } /** @@ -132,6 +139,7 @@ public actual class ContainerResource { * @see unpause */ public actual suspend fun pause(container: String) { + TODO("Not yet implemented") } /** @@ -141,6 +149,7 @@ public actual class ContainerResource { * @see pause */ public actual suspend fun unpause(container: String) { + TODO("Not yet implemented") } /** @@ -155,6 +164,7 @@ public actual class ContainerResource { container: String, options: ResizeTTYOptions, ) { + TODO("Not yet implemented") } public actual fun attach(container: String): Flow { @@ -210,5 +220,10 @@ public actual class ContainerResource { inputPath: String, remotePath: String, ) { + TODO("Not yet implemented") + } + + public actual fun logs(container: String, options: ContainerLogsOptions): Flow { + TODO("Not yet implemented") } } From f79ca02ff68b3c1b90acd0b64aa9fd79a3444017 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 14:47:14 -0300 Subject: [PATCH 4/8] Handle EOFException while reading bytes --- .../dockerkt/resource/container/ContainerResourceExt.kt | 8 ++++---- src/commonTest/kotlin/me/devnatan/dockerkt/Client.kt | 1 - .../dockerkt/resource/container/ContainerResource.jvm.kt | 8 +++++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt index 0e01ae97..bc958e5d 100644 --- a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt +++ b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt @@ -63,14 +63,14 @@ public suspend inline fun ContainerResource.resizeTTY( } public inline fun ContainerResource.logs( - id: String, + container: String, block: ContainerLogsOptions.() -> Unit, ): Flow { - return logs(id, ContainerLogsOptions().apply(block)) + return logs(container, ContainerLogsOptions().apply(block)) } -public fun ContainerResource.logs(id: String): Flow = logs( - id, +public fun ContainerResource.logs(container: String): Flow = logs( + container = container, options = ContainerLogsOptions( follow = true, stderr = true, diff --git a/src/commonTest/kotlin/me/devnatan/dockerkt/Client.kt b/src/commonTest/kotlin/me/devnatan/dockerkt/Client.kt index 2134eed5..323937ed 100644 --- a/src/commonTest/kotlin/me/devnatan/dockerkt/Client.kt +++ b/src/commonTest/kotlin/me/devnatan/dockerkt/Client.kt @@ -7,7 +7,6 @@ package me.devnatan.dockerkt fun createTestDockerClient(block: DockerClientConfigBuilder.() -> Unit = {}): DockerClient = runCatching { DockerClient { - debugHttpCalls(true) apply(block) } }.onFailure { diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt index 437665c9..436a8a1d 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.future.asCompletableFuture +import kotlinx.io.EOFException import kotlinx.io.RawSource import kotlinx.io.asInputStream import kotlinx.io.asSource @@ -613,7 +614,12 @@ public actual class ContainerResource( }.execute { response -> val channel = response.body() while (!channel.isClosedForRead) { - val fb = channel.readByte() + val fb: Byte = runCatching { + channel.readByte() + }.getOrElse { + break // container stopped while streaming logs? + } + val stream = Stream.typeOfOrNull(fb) // Unknown stream = tty enabled From ddbdf4d36c1df0a4720e1c1c33a26ad9d854449d Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 15:05:34 -0300 Subject: [PATCH 5/8] Test container logs --- .../resource/container/LogContainerIT.kt | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 src/commonTest/kotlin/me/devnatan/dockerkt/resource/container/LogContainerIT.kt diff --git a/src/commonTest/kotlin/me/devnatan/dockerkt/resource/container/LogContainerIT.kt b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/container/LogContainerIT.kt new file mode 100644 index 00000000..446f1f0b --- /dev/null +++ b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/container/LogContainerIT.kt @@ -0,0 +1,74 @@ +package me.devnatan.dockerkt.resource.container + +import io.ktor.utils.io.CancellationException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import me.devnatan.dockerkt.resource.ResourceIT +import me.devnatan.dockerkt.withContainer +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.test.Test +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds + +class LogContainerIT : ResourceIT() { + @OptIn(ExperimentalAtomicApi::class, ExperimentalCoroutinesApi::class) + @Test + fun `stream container logs`() = + runTest { + testClient.withContainer("nginx:latest") { containerId -> + testClient.containers.start(containerId) + + val completed = CompletableDeferred() + + launch { + testClient.containers.logs(containerId).collect { + completed.complete(true) + cancel() + } + } + + completed.await() + assertTrue(completed.getCompleted()) + } + } + + @OptIn(ExperimentalAtomicApi::class) + @Test + fun `stream stopped container logs`() = + runTest { + testClient.withContainer("nginx:latest") { containerId -> + assertFailsWith( + message = "Container stopped? Logs are not available.", + ) { + testClient.containers.logs(containerId).collect() + } + } + } + + @Test + fun `stream container logs interrupted`() = + runTest { + testClient.withContainer("nginx:latest") { containerId -> + testClient.containers.start(containerId) + + val frames = mutableListOf() + + launch { + delay(3.seconds) + + // Force container stop so we can catch EOFException + testClient.containers.stop(containerId) + } + + testClient.containers.logs(containerId).collect(frames::add) + + assertTrue(frames.isNotEmpty()) + } + } +} From a88536319b740b107a9cf0c15be6cd70090c5036 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 15:05:46 -0300 Subject: [PATCH 6/8] Fix network prune test to work in any environment --- .../me/devnatan/dockerkt/resource/network/NetworkResourceIT.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/commonTest/kotlin/me/devnatan/dockerkt/resource/network/NetworkResourceIT.kt b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/network/NetworkResourceIT.kt index 1e82414b..44f463e7 100644 --- a/src/commonTest/kotlin/me/devnatan/dockerkt/resource/network/NetworkResourceIT.kt +++ b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/network/NetworkResourceIT.kt @@ -56,7 +56,8 @@ class NetworkResourceIT : ResourceIT() { // check for >= because docker can have default networks defined assertEquals(testClient.networks.list().size, oldCount + newCount) + // just ensure prune will work correctly, comparing sizes may not + // work well in different environments testClient.networks.prune() - assertEquals(testClient.networks.list().size, oldCount) } } From 82e0a68bb049bf3fb9cc218014d4afc4a9666bee Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 15:05:50 -0300 Subject: [PATCH 7/8] Enable HTTP call debugging in resource integration tests --- .../kotlin/me/devnatan/dockerkt/resource/ResourceIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commonTest/kotlin/me/devnatan/dockerkt/resource/ResourceIT.kt b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/ResourceIT.kt index 64b98143..f8767051 100644 --- a/src/commonTest/kotlin/me/devnatan/dockerkt/resource/ResourceIT.kt +++ b/src/commonTest/kotlin/me/devnatan/dockerkt/resource/ResourceIT.kt @@ -4,7 +4,7 @@ import me.devnatan.dockerkt.DockerClient import me.devnatan.dockerkt.createTestDockerClient open class ResourceIT( - private val debugHttpCalls: Boolean = false, + private val debugHttpCalls: Boolean = true, ) { val testClient: DockerClient by lazy { createTestDockerClient { From 3553a7db9a420484f28f1ed5ec3e89cf6127fd34 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 23 Dec 2025 15:06:05 -0300 Subject: [PATCH 8/8] Handle already-stopped container logs gracefully --- .../resource/container/ContainerResource.kt | 5 +- .../container/ContainerResourceExt.kt | 22 +-- .../me/devnatan/dockerkt/io/Flow.jvm.kt | 13 +- .../container/ContainerResource.jvm.kt | 128 ++++++++++-------- .../container/ContainerResource.native.kt | 5 +- 5 files changed, 94 insertions(+), 79 deletions(-) diff --git a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt index 52de9f2c..99b690cc 100644 --- a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt +++ b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.kt @@ -197,5 +197,8 @@ public expect class ContainerResource { remotePath: String, ) - public fun logs(container: String, options: ContainerLogsOptions): Flow + public fun logs( + container: String, + options: ContainerLogsOptions, + ): Flow } diff --git a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt index bc958e5d..a6508dd0 100644 --- a/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt +++ b/src/commonMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResourceExt.kt @@ -65,15 +65,15 @@ public suspend inline fun ContainerResource.resizeTTY( public inline fun ContainerResource.logs( container: String, block: ContainerLogsOptions.() -> Unit, -): Flow { - return logs(container, ContainerLogsOptions().apply(block)) -} +): Flow = logs(container, ContainerLogsOptions().apply(block)) -public fun ContainerResource.logs(container: String): Flow = logs( - container = container, - options = ContainerLogsOptions( - follow = true, - stderr = true, - stdout = true, - ), -) +public fun ContainerResource.logs(container: String): Flow = + logs( + container = container, + options = + ContainerLogsOptions( + follow = true, + stderr = true, + stdout = true, + ), + ) diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt index 4687dabc..b5df4f34 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/io/Flow.jvm.kt @@ -35,19 +35,18 @@ internal class InternalDockerClientFrameListener internal constructor() : Closea .onCompletion { exception -> this@InternalDockerClientFrameListener.error = exception callback.onComplete(exception) - } - .onEach(callback::onEach) + }.onEach(callback::onEach) .catch { exception -> this@InternalDockerClientFrameListener.error = exception callback.onError(exception) - } - .launchIn(coroutineScope) + }.launchIn(coroutineScope) } override fun close() { - val exception = error?.let { cause -> - CancellationException("An error occurred while consuming flow.", cause) - } + val exception = + error?.let { cause -> + CancellationException("An error occurred while consuming flow.", cause) + } coroutineScope.cancel(exception) } } diff --git a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt index 436a8a1d..77f3216b 100644 --- a/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt +++ b/src/jvmMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.jvm.kt @@ -16,8 +16,8 @@ import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode import io.ktor.util.cio.toByteReadChannel import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.CancellationException import io.ktor.utils.io.availableForRead -import io.ktor.utils.io.core.ByteOrder import io.ktor.utils.io.core.discard import io.ktor.utils.io.readByte import io.ktor.utils.io.readPacket @@ -27,12 +27,10 @@ import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.future.asCompletableFuture -import kotlinx.io.EOFException import kotlinx.io.RawSource import kotlinx.io.asInputStream import kotlinx.io.asSource import kotlinx.io.buffered -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import me.devnatan.dockerkt.DockerResponseException import me.devnatan.dockerkt.io.readTarFile @@ -602,64 +600,76 @@ public actual class ContainerResource( } } - public actual fun logs(container: String, options: ContainerLogsOptions): Flow = flow { - httpClient.prepareGet("$CONTAINERS/$container/logs") { - parameter("follow", options.follow) - parameter("stdout", options.stdout) - parameter("stderr", options.stderr) - parameter("since", options.since) - parameter("until", options.until) - parameter("timestamps", options.showTimestamps) - parameter("tail", options.tail) - }.execute { response -> - val channel = response.body() - while (!channel.isClosedForRead) { - val fb: Byte = runCatching { - channel.readByte() - }.getOrElse { - break // container stopped while streaming logs? - } - - val stream = Stream.typeOfOrNull(fb) - - // Unknown stream = tty enabled - if (stream == null) { - val remaining = channel.availableForRead - - // Remaining +1 includes the previously read first byte. Reinsert the first byte since we read it - // before but the type was not expected, so this byte is actually the first character of the line. - val len = remaining + 1 - val payload = ByteReadChannel( - ByteArray(len) { - if (it == 0) fb else channel.readByte() - }, - ) - - val line = payload.readUTF8Line() ?: error("Payload cannot be null") - - // Try to determine the "correct" stream since we cannot have this information. - val stdoutEnabled = options.stdout ?: false - val stdErrEnabled = options.stderr ?: false - val expectedStream: Stream = stream ?: when { - stdoutEnabled && !stdErrEnabled -> Stream.StdOut - stdErrEnabled && !stdoutEnabled -> Stream.StdErr - else -> Stream.Unknown + public actual fun logs( + container: String, + options: ContainerLogsOptions, + ): Flow = + flow { + httpClient + .prepareGet("$CONTAINERS/$container/logs") { + parameter("follow", options.follow) + parameter("stdout", options.stdout) + parameter("stderr", options.stderr) + parameter("since", options.since) + parameter("until", options.until) + parameter("timestamps", options.showTimestamps) + parameter("tail", options.tail) + }.execute { response -> + val channel = response.body() + if (channel.isClosedForRead) { + throw CancellationException("Container stopped? Logs are not available.") } - emit(Frame(line, len, expectedStream)) - continue + while (!channel.isClosedForRead) { + val fb: Byte = + runCatching { + channel.readByte() + }.getOrElse { + break // container stopped while streaming logs? + } + + val stream = Stream.typeOfOrNull(fb) + + // Unknown stream = tty enabled + if (stream == null) { + val remaining = channel.availableForRead + + // Remaining +1 includes the previously read first byte. Reinsert the first byte since we read it + // before but the type was not expected, so this byte is actually the first character of the line. + val len = remaining + 1 + val payload = + ByteReadChannel( + ByteArray(len) { + if (it == 0) fb else channel.readByte() + }, + ) + + val line = payload.readUTF8Line() ?: error("Payload cannot be null") + + // Try to determine the "correct" stream since we cannot have this information. + val stdoutEnabled = options.stdout ?: false + val stdErrEnabled = options.stderr ?: false + val expectedStream: Stream = + stream ?: when { + stdoutEnabled && !stdErrEnabled -> Stream.StdOut + stdErrEnabled && !stdoutEnabled -> Stream.StdErr + else -> Stream.Unknown + } + + emit(Frame(line, len, expectedStream)) + continue + } + + val header = channel.readPacket(7) + + // We discard the first three bytes because the payload size is in the last four bytes + // and the total header size is 8. + header.discard(3) + + val payloadLength = header.readInt() + val payloadData = channel.readUTF8Line(payloadLength)!! + emit(Frame(payloadData, payloadLength, stream)) + } } - - val header = channel.readPacket(7) - - // We discard the first three bytes because the payload size is in the last four bytes - // and the total header size is 8. - header.discard(3) - - val payloadLength = header.readInt() - val payloadData = channel.readUTF8Line(payloadLength)!! - emit(Frame(payloadData, payloadLength, stream)) - } } - } } diff --git a/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt b/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt index 2d8b42d1..f528d38c 100644 --- a/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt +++ b/src/nativeMain/kotlin/me/devnatan/dockerkt/resource/container/ContainerResource.native.kt @@ -223,7 +223,10 @@ public actual class ContainerResource { TODO("Not yet implemented") } - public actual fun logs(container: String, options: ContainerLogsOptions): Flow { + public actual fun logs( + container: String, + options: ContainerLogsOptions, + ): Flow { TODO("Not yet implemented") } }