diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index f6a374fe3..1111ab24d 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -15,10 +15,7 @@ */ package io.agentscope.core.model.transport; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.PasswordAuthentication; import java.net.Proxy; @@ -28,7 +25,9 @@ import java.net.http.HttpClient; import java.net.http.HttpClient.Redirect; import java.net.http.HttpClient.Version; +import java.net.http.HttpHeaders; import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; @@ -38,17 +37,19 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; +import java.util.StringJoiner; import java.util.concurrent.CompletionException; +import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.FluxSink; /** * Pure JDK implementation of the HttpTransport interface. @@ -195,84 +196,112 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request); - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), - statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); + Flux lineFlux = + Flux.create( + (Consumer>) + sink -> { + java.net.http.HttpResponse.BodyHandler + responseBodyHandler = + responseInfo -> + java.net.http.HttpResponse + .BodySubscribers + .fromLineSubscriber( + new ResponseLineSubscriber( + sink, + responseInfo), + ResponseLineSubscriber + ::getErrorMessage, + charsetFrom( + responseInfo + .headers()), + null); + client.sendAsync(jdkRequest, responseBodyHandler) + .whenComplete( + (response, throwable) -> { + if (throwable != null) { + // e.g. + // java.util.concurrent.CompletionException: + // java.net.ConnectException + sink.error(throwable); + } else { + int statusCode = + response.statusCode(); + log.debug( + "Received HTTP response" + + " with status" + + " code {}", + statusCode); + if (statusCode >= 200 + && statusCode < 300) { + // ignore successful + return; + } + String errorBody = + response.body(); + log.warn( + "HTTP request failed." + + " URL: {} |" + + " Status: {} |" + + " Error: {}", + request.getUrl(), + statusCode, + errorBody); + } + }); + }) + .onErrorMap( + e -> !(e instanceof HttpTransportException), + e -> { + Throwable cause = + e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException hte) { + return hte; } - return response; + return new HttpTransportException( + "SSE/NDJSON stream failed: " + e.getMessage(), e); }); - - return Mono.fromCompletionStage(future) - .flatMapMany(response -> processStreamResponse(response, request)) - .publishOn(Schedulers.boundedElastic()) - .onErrorMap( - e -> !(e instanceof HttpTransportException), - e -> { - Throwable cause = e instanceof CompletionException ? e.getCause() : e; - if (cause instanceof HttpTransportException) { - return (HttpTransportException) cause; - } - return new HttpTransportException( - "SSE/NDJSON stream failed: " + e.getMessage(), e); - }) - .subscribeOn(Schedulers.boundedElastic()); - } - - private Flux processStreamResponse( - java.net.http.HttpResponse response, HttpRequest request) { - InputStream inputStream = response.body(); - if (inputStream == null) { - return Flux.empty(); - } - - // Check if the request has the NDJSON format header boolean isNdjson = TransportConstants.STREAM_FORMAT_NDJSON.equals( request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER)); - - // Use Flux.using to manage resource lifecycle - return Flux.using( - () -> - new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)), - reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), - this::closeQuietly); - } - - private Flux readSseLines(BufferedReader reader) { - return Flux.fromStream(reader.lines()) - .filter(line -> line.startsWith(SSE_DATA_PREFIX)) + if (isNdjson) { + return lineFlux.doOnNext(line -> log.debug("Received NDJSON line:{}", line)) + .filter(line -> !line.isEmpty()); + } + return lineFlux.filter(line -> line.startsWith(SSE_DATA_PREFIX)) .map(line -> line.substring(SSE_DATA_PREFIX.length()).trim()) .takeWhile(data -> !SSE_DONE_MARKER.equals(data)) - .doOnNext(data -> log.debug("Received SSE data chunk")) + .doOnNext(data -> log.debug("Received SSE data chunk:{}", data)) .filter(data -> !data.isEmpty()); } - private Flux readNdJsonLines(BufferedReader reader) { - return Flux.fromStream(reader.lines()) - .doOnNext(line -> log.debug("Received NDJSON line")) - .filter(line -> !line.isEmpty()); + private static Charset charsetFrom(HttpHeaders headers) { + String type = headers.firstValue("Content-Type").orElse("text/html; charset=utf-8"); + int i = type.indexOf(';'); + if (i >= 0) { + type = type.substring(i + 1); + } + try { + String value = null; + for (String param : type.split(";")) { + String[] keyValue = param.split("=", 2); + if (keyValue.length != 2) { + continue; + } + if ("charset".equalsIgnoreCase(keyValue[0].trim())) { + value = keyValue[1].trim(); + if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) { + value = value.substring(1, value.length() - 1).trim(); + } + break; + } + } + if (value == null || value.isEmpty()) { + return StandardCharsets.UTF_8; + } + return Charset.forName(value); + } catch (Exception x) { + return StandardCharsets.UTF_8; + } } @Override @@ -370,28 +399,6 @@ private HttpResponse buildHttpResponse(java.net.http.HttpResponse respon return builder.build(); } - private String readInputStream(InputStream inputStream) { - if (inputStream == null) { - return null; - } - try (inputStream) { - return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); - } catch (IOException e) { - log.warn("Failed to read response body: {}", e.getMessage()); - return null; - } - } - - private void closeQuietly(AutoCloseable closeable) { - if (closeable != null) { - try { - closeable.close(); - } catch (Exception e) { - log.debug("Error closing resource: {}", e.getMessage()); - } - } - } - /** * Create a new builder for JdkHttpTransport. * @@ -497,4 +504,70 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { log.warn("Proxy connection failed: uri={}, address={}", uri, sa, ioe); } } + + private static class ResponseLineSubscriber implements Flow.Subscriber { + + private final AtomicReference subscriptionRef = new AtomicReference<>(); + + private final FluxSink sink; + + private final java.net.http.HttpResponse.ResponseInfo responseInfo; + + private final StringJoiner errorBodyJoiner = new StringJoiner("\n"); + + ResponseLineSubscriber( + FluxSink sink, java.net.http.HttpResponse.ResponseInfo responseInfo) { + this.sink = Objects.requireNonNull(sink, "sink must not be null"); + this.responseInfo = responseInfo; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + if (subscriptionRef.compareAndSet(null, subscription)) { + subscription.request(Long.MAX_VALUE); + } else { + subscription.cancel(); + } + } + + @Override + public void onNext(String item) { + if (is2xxSuccessful()) { + sink.next(item); + } else { + errorBodyJoiner.add(item); + } + } + + @Override + public void onError(Throwable throwable) { + sink.error(throwable); + } + + @Override + public void onComplete() { + if (is2xxSuccessful()) { + sink.complete(); + } else { + int statusCode = responseInfo.statusCode(); + HttpTransportException ex = + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBodyJoiner, + statusCode, + errorBodyJoiner.toString()); + sink.error(ex); + } + } + + public String getErrorMessage() { + return errorBodyJoiner.toString(); + } + + private boolean is2xxSuccessful() { + return responseInfo.statusCode() >= 200 && responseInfo.statusCode() < 300; + } + } }