From 1bded81d6d659fe5d62f4d84a31f307820113150 Mon Sep 17 00:00:00 2001 From: icodening Date: Sat, 21 Mar 2026 18:42:55 +0800 Subject: [PATCH 1/3] refactor(http): non-blocking transport for JdkHttpTransport --- .../model/transport/JdkHttpTransport.java | 233 +++++++++++------- 1 file changed, 140 insertions(+), 93 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index f6a374fe3..2189a1438 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -15,10 +15,7 @@ */ package io.agentscope.core.model.transport; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.PasswordAuthentication; import java.net.Proxy; @@ -28,7 +25,9 @@ import java.net.http.HttpClient; import java.net.http.HttpClient.Redirect; import java.net.http.HttpClient.Version; +import java.net.http.HttpHeaders; import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; @@ -38,17 +37,19 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; +import java.util.StringJoiner; import java.util.concurrent.CompletionException; +import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Sinks; /** * Pure JDK implementation of the HttpTransport interface. @@ -195,86 +196,89 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request); - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), - statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); - } - return response; - }); - - return Mono.fromCompletionStage(future) - .flatMapMany(response -> processStreamResponse(response, request)) - .publishOn(Schedulers.boundedElastic()) + Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + java.net.http.HttpResponse.BodyHandler responseBodyHandler = + responseInfo -> + java.net.http.HttpResponse.BodySubscribers.fromLineSubscriber(new ResponseLineSubscriber(sink, responseInfo), + ResponseLineSubscriber::getErrorMessage, + charsetFrom(responseInfo.headers()), + null); + client.sendAsync(jdkRequest, responseBodyHandler) + .whenComplete((response, throwable) -> { + if (throwable != null) { + //e.g. java.util.concurrent.CompletionException: java.net.ConnectException + sink.tryEmitError(throwable); + } else { + int statusCode = response.statusCode(); + log.debug("Received HTTP response with status code {}", statusCode); + if (statusCode >= 200 && statusCode < 300) { + //ignore successful + return; + } + String errorBody = response.body(); + log.warn("HTTP request failed. URL: {} | Status: {} | Error: {}", + request.getUrl(), + statusCode, + errorBody); + + } + }); + Flux lineFlux = sink.asFlux() .onErrorMap( e -> !(e instanceof HttpTransportException), e -> { Throwable cause = e instanceof CompletionException ? e.getCause() : e; - if (cause instanceof HttpTransportException) { - return (HttpTransportException) cause; + if (cause instanceof HttpTransportException hte) { + return hte; } return new HttpTransportException( "SSE/NDJSON stream failed: " + e.getMessage(), e); - }) - .subscribeOn(Schedulers.boundedElastic()); - } - - private Flux processStreamResponse( - java.net.http.HttpResponse response, HttpRequest request) { - InputStream inputStream = response.body(); - if (inputStream == null) { - return Flux.empty(); - } - - // Check if the request has the NDJSON format header + }); boolean isNdjson = - TransportConstants.STREAM_FORMAT_NDJSON.equals( - request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER)); - - // Use Flux.using to manage resource lifecycle - return Flux.using( - () -> - new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)), - reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), - this::closeQuietly); - } - - private Flux readSseLines(BufferedReader reader) { - return Flux.fromStream(reader.lines()) - .filter(line -> line.startsWith(SSE_DATA_PREFIX)) + TransportConstants.STREAM_FORMAT_NDJSON.equals(request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER)); + if (isNdjson) { + return lineFlux + .doOnNext(line -> log.debug("Received NDJSON line:{}", line)) + .filter(line -> !line.isEmpty()); + } + return lineFlux.filter(line -> line.startsWith(SSE_DATA_PREFIX)) .map(line -> line.substring(SSE_DATA_PREFIX.length()).trim()) .takeWhile(data -> !SSE_DONE_MARKER.equals(data)) - .doOnNext(data -> log.debug("Received SSE data chunk")) + .doOnNext(data -> log.debug("Received SSE data chunk:{}", data)) .filter(data -> !data.isEmpty()); } - private Flux readNdJsonLines(BufferedReader reader) { - return Flux.fromStream(reader.lines()) - .doOnNext(line -> log.debug("Received NDJSON line")) - .filter(line -> !line.isEmpty()); + private static Charset charsetFrom(HttpHeaders headers) { + String type = headers.firstValue("Content-Type").orElse("text/html; charset=utf-8"); + int i = type.indexOf(';'); + if (i >= 0) { + type = type.substring(i + 1); + } + try { + String value = null; + for (String param : type.split(";")) { + String[] keyValue = param.split("=", 2); + if (keyValue.length != 2) { + continue; + } + if ("charset".equalsIgnoreCase(keyValue[0].trim())) { + value = keyValue[1].trim(); + if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) { + value = value.substring(1, value.length() - 1).trim(); + } + break; + } + } + if (value == null || value.isEmpty()) { + return StandardCharsets.UTF_8; + } + return Charset.forName(value); + } catch (Exception x) { + return StandardCharsets.UTF_8; + } } + @Override public void close() { closed.set(true); @@ -370,28 +374,6 @@ private HttpResponse buildHttpResponse(java.net.http.HttpResponse respon return builder.build(); } - private String readInputStream(InputStream inputStream) { - if (inputStream == null) { - return null; - } - try (inputStream) { - return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); - } catch (IOException e) { - log.warn("Failed to read response body: {}", e.getMessage()); - return null; - } - } - - private void closeQuietly(AutoCloseable closeable) { - if (closeable != null) { - try { - closeable.close(); - } catch (Exception e) { - log.debug("Error closing resource: {}", e.getMessage()); - } - } - } - /** * Create a new builder for JdkHttpTransport. * @@ -497,4 +479,69 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { log.warn("Proxy connection failed: uri={}, address={}", uri, sa, ioe); } } + + private static class ResponseLineSubscriber implements Flow.Subscriber { + + private final AtomicReference subscriptionRef = new AtomicReference<>(); + + private final Sinks.Many sink; + + private final java.net.http.HttpResponse.ResponseInfo responseInfo; + + private final StringJoiner errorBodyJoiner = new StringJoiner("\n"); + + ResponseLineSubscriber(Sinks.Many sink, + java.net.http.HttpResponse.ResponseInfo responseInfo) { + this.sink = Objects.requireNonNull(sink, "sink must not be null"); + this.responseInfo = responseInfo; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + if (subscriptionRef.compareAndSet(null, subscription)) { + subscription.request(Long.MAX_VALUE); + } else { + subscription.cancel(); + } + } + + @Override + public void onNext(String item) { + if (is2xxSuccessful()) { + sink.tryEmitNext(item); + } else { + errorBodyJoiner.add(item); + } + } + + @Override + public void onError(Throwable throwable) { + sink.tryEmitError(throwable); + } + + @Override + public void onComplete() { + if (is2xxSuccessful()) { + sink.tryEmitComplete(); + } else { + int statusCode = responseInfo.statusCode(); + HttpTransportException ex = new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBodyJoiner, + statusCode, + errorBodyJoiner.toString()); + sink.tryEmitError(ex); + } + } + + public String getErrorMessage() { + return errorBodyJoiner.toString(); + } + + private boolean is2xxSuccessful() { + return responseInfo.statusCode() >= 200 && responseInfo.statusCode() < 300; + } + } } From db087bfd6950b9d456fc3c70645fb547a5287f81 Mon Sep 17 00:00:00 2001 From: icodening Date: Sat, 21 Mar 2026 19:01:49 +0800 Subject: [PATCH 2/3] refactor(http): non-blocking transport for JdkHttpTransport --- .../model/transport/JdkHttpTransport.java | 90 ++++++++++--------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 2189a1438..15bc7d74e 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -45,7 +45,6 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -199,46 +198,51 @@ public Flux stream(HttpRequest request) { Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); java.net.http.HttpResponse.BodyHandler responseBodyHandler = responseInfo -> - java.net.http.HttpResponse.BodySubscribers.fromLineSubscriber(new ResponseLineSubscriber(sink, responseInfo), + java.net.http.HttpResponse.BodySubscribers.fromLineSubscriber( + new ResponseLineSubscriber(sink, responseInfo), ResponseLineSubscriber::getErrorMessage, charsetFrom(responseInfo.headers()), null); client.sendAsync(jdkRequest, responseBodyHandler) - .whenComplete((response, throwable) -> { - if (throwable != null) { - //e.g. java.util.concurrent.CompletionException: java.net.ConnectException - sink.tryEmitError(throwable); - } else { - int statusCode = response.statusCode(); - log.debug("Received HTTP response with status code {}", statusCode); - if (statusCode >= 200 && statusCode < 300) { - //ignore successful - return; - } - String errorBody = response.body(); - log.warn("HTTP request failed. URL: {} | Status: {} | Error: {}", - request.getUrl(), - statusCode, - errorBody); - - } - }); - Flux lineFlux = sink.asFlux() - .onErrorMap( - e -> !(e instanceof HttpTransportException), - e -> { - Throwable cause = e instanceof CompletionException ? e.getCause() : e; - if (cause instanceof HttpTransportException hte) { - return hte; + .whenComplete( + (response, throwable) -> { + if (throwable != null) { + // e.g. java.util.concurrent.CompletionException: + // java.net.ConnectException + sink.tryEmitError(throwable); + } else { + int statusCode = response.statusCode(); + log.debug("Received HTTP response with status code {}", statusCode); + if (statusCode >= 200 && statusCode < 300) { + // ignore successful + return; + } + String errorBody = response.body(); + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + request.getUrl(), + statusCode, + errorBody); } - return new HttpTransportException( - "SSE/NDJSON stream failed: " + e.getMessage(), e); }); + Flux lineFlux = + sink.asFlux() + .onErrorMap( + e -> !(e instanceof HttpTransportException), + e -> { + Throwable cause = + e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException hte) { + return hte; + } + return new HttpTransportException( + "SSE/NDJSON stream failed: " + e.getMessage(), e); + }); boolean isNdjson = - TransportConstants.STREAM_FORMAT_NDJSON.equals(request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER)); + TransportConstants.STREAM_FORMAT_NDJSON.equals( + request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER)); if (isNdjson) { - return lineFlux - .doOnNext(line -> log.debug("Received NDJSON line:{}", line)) + return lineFlux.doOnNext(line -> log.debug("Received NDJSON line:{}", line)) .filter(line -> !line.isEmpty()); } return lineFlux.filter(line -> line.startsWith(SSE_DATA_PREFIX)) @@ -278,7 +282,6 @@ private static Charset charsetFrom(HttpHeaders headers) { } } - @Override public void close() { closed.set(true); @@ -490,8 +493,8 @@ private static class ResponseLineSubscriber implements Flow.Subscriber { private final StringJoiner errorBodyJoiner = new StringJoiner("\n"); - ResponseLineSubscriber(Sinks.Many sink, - java.net.http.HttpResponse.ResponseInfo responseInfo) { + ResponseLineSubscriber( + Sinks.Many sink, java.net.http.HttpResponse.ResponseInfo responseInfo) { this.sink = Objects.requireNonNull(sink, "sink must not be null"); this.responseInfo = responseInfo; } @@ -525,13 +528,14 @@ public void onComplete() { sink.tryEmitComplete(); } else { int statusCode = responseInfo.statusCode(); - HttpTransportException ex = new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBodyJoiner, - statusCode, - errorBodyJoiner.toString()); + HttpTransportException ex = + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBodyJoiner, + statusCode, + errorBodyJoiner.toString()); sink.tryEmitError(ex); } } From 4df665a5d6a95eb37673ac9674690fa835643842 Mon Sep 17 00:00:00 2001 From: icodening Date: Sat, 21 Mar 2026 19:31:12 +0800 Subject: [PATCH 3/3] refactor(http): non-blocking transport for JdkHttpTransport --- .../model/transport/JdkHttpTransport.java | 98 ++++++++++++------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 15bc7d74e..1111ab24d 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -42,13 +42,14 @@ import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; +import reactor.core.publisher.FluxSink; /** * Pure JDK implementation of the HttpTransport interface. @@ -195,38 +196,59 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request); - Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); - java.net.http.HttpResponse.BodyHandler responseBodyHandler = - responseInfo -> - java.net.http.HttpResponse.BodySubscribers.fromLineSubscriber( - new ResponseLineSubscriber(sink, responseInfo), - ResponseLineSubscriber::getErrorMessage, - charsetFrom(responseInfo.headers()), - null); - client.sendAsync(jdkRequest, responseBodyHandler) - .whenComplete( - (response, throwable) -> { - if (throwable != null) { - // e.g. java.util.concurrent.CompletionException: - // java.net.ConnectException - sink.tryEmitError(throwable); - } else { - int statusCode = response.statusCode(); - log.debug("Received HTTP response with status code {}", statusCode); - if (statusCode >= 200 && statusCode < 300) { - // ignore successful - return; - } - String errorBody = response.body(); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error: {}", - request.getUrl(), - statusCode, - errorBody); - } - }); Flux lineFlux = - sink.asFlux() + Flux.create( + (Consumer>) + sink -> { + java.net.http.HttpResponse.BodyHandler + responseBodyHandler = + responseInfo -> + java.net.http.HttpResponse + .BodySubscribers + .fromLineSubscriber( + new ResponseLineSubscriber( + sink, + responseInfo), + ResponseLineSubscriber + ::getErrorMessage, + charsetFrom( + responseInfo + .headers()), + null); + client.sendAsync(jdkRequest, responseBodyHandler) + .whenComplete( + (response, throwable) -> { + if (throwable != null) { + // e.g. + // java.util.concurrent.CompletionException: + // java.net.ConnectException + sink.error(throwable); + } else { + int statusCode = + response.statusCode(); + log.debug( + "Received HTTP response" + + " with status" + + " code {}", + statusCode); + if (statusCode >= 200 + && statusCode < 300) { + // ignore successful + return; + } + String errorBody = + response.body(); + log.warn( + "HTTP request failed." + + " URL: {} |" + + " Status: {} |" + + " Error: {}", + request.getUrl(), + statusCode, + errorBody); + } + }); + }) .onErrorMap( e -> !(e instanceof HttpTransportException), e -> { @@ -487,14 +509,14 @@ private static class ResponseLineSubscriber implements Flow.Subscriber { private final AtomicReference subscriptionRef = new AtomicReference<>(); - private final Sinks.Many sink; + private final FluxSink sink; private final java.net.http.HttpResponse.ResponseInfo responseInfo; private final StringJoiner errorBodyJoiner = new StringJoiner("\n"); ResponseLineSubscriber( - Sinks.Many sink, java.net.http.HttpResponse.ResponseInfo responseInfo) { + FluxSink sink, java.net.http.HttpResponse.ResponseInfo responseInfo) { this.sink = Objects.requireNonNull(sink, "sink must not be null"); this.responseInfo = responseInfo; } @@ -511,7 +533,7 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(String item) { if (is2xxSuccessful()) { - sink.tryEmitNext(item); + sink.next(item); } else { errorBodyJoiner.add(item); } @@ -519,13 +541,13 @@ public void onNext(String item) { @Override public void onError(Throwable throwable) { - sink.tryEmitError(throwable); + sink.error(throwable); } @Override public void onComplete() { if (is2xxSuccessful()) { - sink.tryEmitComplete(); + sink.complete(); } else { int statusCode = responseInfo.statusCode(); HttpTransportException ex = @@ -536,7 +558,7 @@ public void onComplete() { + errorBodyJoiner, statusCode, errorBodyJoiner.toString()); - sink.tryEmitError(ex); + sink.error(ex); } }