Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package io.agentscope.core.model.transport;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.Proxy;
Expand All @@ -28,7 +25,9 @@
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
Expand All @@ -38,17 +37,19 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Sinks;

/**
* Pure JDK implementation of the HttpTransport interface.
Expand Down Expand Up @@ -195,86 +196,89 @@ public Flux<String> stream(HttpRequest request) {

var jdkRequest = buildJdkRequest(request);

// Check status code and read error body immediately when CompletableFuture completes
// to avoid stream being closed before we can read it
CompletableFuture<java.net.http.HttpResponse<InputStream>> future =
client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
.thenApply(
response -> {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
// Read error body immediately while stream is still open
String errorBody = readInputStream(response.body());
log.warn(
"HTTP request failed. URL: {} | Status: {} | Error:"
+ " {}",
request.getUrl(),
statusCode,
errorBody);
throw new CompletionException(
new HttpTransportException(
"HTTP request failed with status "
+ statusCode
+ " | "
+ errorBody,
statusCode,
errorBody));
}
return response;
});

return Mono.fromCompletionStage(future)
.flatMapMany(response -> processStreamResponse(response, request))
.publishOn(Schedulers.boundedElastic())
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
java.net.http.HttpResponse.BodyHandler<String> responseBodyHandler =
responseInfo ->
java.net.http.HttpResponse.BodySubscribers.fromLineSubscriber(new ResponseLineSubscriber(sink, responseInfo),
ResponseLineSubscriber::getErrorMessage,
charsetFrom(responseInfo.headers()),
null);
client.sendAsync(jdkRequest, responseBodyHandler)
.whenComplete((response, throwable) -> {
if (throwable != null) {
//e.g. java.util.concurrent.CompletionException: java.net.ConnectException
sink.tryEmitError(throwable);
} else {
int statusCode = response.statusCode();
log.debug("Received HTTP response with status code {}", statusCode);
if (statusCode >= 200 && statusCode < 300) {
//ignore successful
return;
}
String errorBody = response.body();
log.warn("HTTP request failed. URL: {} | Status: {} | Error: {}",
request.getUrl(),
statusCode,
errorBody);

}
});
Flux<String> lineFlux = sink.asFlux()
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The streaming request isn’t tied to downstream cancellation: if a subscriber cancels the returned Flux, the JDK sendAsync/Flow pipeline will keep reading the response and buffering into the sink until completion/error. This can waste network/CPU and potentially grow the backpressure buffer. Consider wiring cancellation to cancel the CompletableFuture and/or the Flow.Subscription (and prefer a bounded buffer strategy).

Copilot uses AI. Check for mistakes.
.onErrorMap(
e -> !(e instanceof HttpTransportException),
e -> {
Throwable cause = e instanceof CompletionException ? e.getCause() : e;
if (cause instanceof HttpTransportException) {
return (HttpTransportException) cause;
if (cause instanceof HttpTransportException hte) {
return hte;
}
return new HttpTransportException(
"SSE/NDJSON stream failed: " + e.getMessage(), e);
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the onErrorMap branch, CompletionException is unwrapped into cause, but the new HttpTransportException message/cause still use the wrapper e. This can produce messages like ...failed: null and hide the real cause. Prefer using cause.getMessage() (fall back to cause.toString() if null) and pass cause as the exception cause.

Suggested change
return new HttpTransportException(
"SSE/NDJSON stream failed: " + e.getMessage(), e);
String causeMessage;
if (cause != null && cause.getMessage() != null) {
causeMessage = cause.getMessage();
} else if (cause != null) {
causeMessage = cause.toString();
} else if (e.getMessage() != null) {
causeMessage = e.getMessage();
} else {
causeMessage = e.toString();
}
return new HttpTransportException(
"SSE/NDJSON stream failed: " + causeMessage, cause);

Copilot uses AI. Check for mistakes.
})
.subscribeOn(Schedulers.boundedElastic());
}

private Flux<String> processStreamResponse(
java.net.http.HttpResponse<InputStream> response, HttpRequest request) {
InputStream inputStream = response.body();
if (inputStream == null) {
return Flux.empty();
}

// Check if the request has the NDJSON format header
});
boolean isNdjson =
TransportConstants.STREAM_FORMAT_NDJSON.equals(
request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER));

// Use Flux.using to manage resource lifecycle
return Flux.using(
() ->
new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8)),
reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader),
this::closeQuietly);
}

private Flux<String> readSseLines(BufferedReader reader) {
return Flux.fromStream(reader.lines())
.filter(line -> line.startsWith(SSE_DATA_PREFIX))
TransportConstants.STREAM_FORMAT_NDJSON.equals(request.getHeaders().get(TransportConstants.STREAM_FORMAT_HEADER));
if (isNdjson) {
return lineFlux
.doOnNext(line -> log.debug("Received NDJSON line:{}", line))
.filter(line -> !line.isEmpty());
}
return lineFlux.filter(line -> line.startsWith(SSE_DATA_PREFIX))
.map(line -> line.substring(SSE_DATA_PREFIX.length()).trim())
.takeWhile(data -> !SSE_DONE_MARKER.equals(data))
.doOnNext(data -> log.debug("Received SSE data chunk"))
.doOnNext(data -> log.debug("Received SSE data chunk:{}", data))
.filter(data -> !data.isEmpty());
}

private Flux<String> readNdJsonLines(BufferedReader reader) {
return Flux.fromStream(reader.lines())
.doOnNext(line -> log.debug("Received NDJSON line"))
.filter(line -> !line.isEmpty());
private static Charset charsetFrom(HttpHeaders headers) {
String type = headers.firstValue("Content-Type").orElse("text/html; charset=utf-8");
int i = type.indexOf(';');
if (i >= 0) {
type = type.substring(i + 1);
}
try {
String value = null;
for (String param : type.split(";")) {
String[] keyValue = param.split("=", 2);
if (keyValue.length != 2) {
continue;
}
if ("charset".equalsIgnoreCase(keyValue[0].trim())) {
value = keyValue[1].trim();
if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
value = value.substring(1, value.length() - 1).trim();
}
break;
}
}
if (value == null || value.isEmpty()) {
return StandardCharsets.UTF_8;
}
return Charset.forName(value);
} catch (Exception x) {
return StandardCharsets.UTF_8;
}
}


@Override
public void close() {
closed.set(true);
Expand Down Expand Up @@ -370,28 +374,6 @@ private HttpResponse buildHttpResponse(java.net.http.HttpResponse<String> respon
return builder.build();
}

private String readInputStream(InputStream inputStream) {
if (inputStream == null) {
return null;
}
try (inputStream) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
log.warn("Failed to read response body: {}", e.getMessage());
return null;
}
}

private void closeQuietly(AutoCloseable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
log.debug("Error closing resource: {}", e.getMessage());
}
}
}

/**
* Create a new builder for JdkHttpTransport.
*
Expand Down Expand Up @@ -497,4 +479,69 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
log.warn("Proxy connection failed: uri={}, address={}", uri, sa, ioe);
}
}

private static class ResponseLineSubscriber implements Flow.Subscriber<String> {

private final AtomicReference<Flow.Subscription> subscriptionRef = new AtomicReference<>();

private final Sinks.Many<String> sink;

private final java.net.http.HttpResponse.ResponseInfo responseInfo;

private final StringJoiner errorBodyJoiner = new StringJoiner("\n");

ResponseLineSubscriber(Sinks.Many<String> sink,
java.net.http.HttpResponse.ResponseInfo responseInfo) {
this.sink = Objects.requireNonNull(sink, "sink must not be null");
this.responseInfo = responseInfo;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (subscriptionRef.compareAndSet(null, subscription)) {
subscription.request(Long.MAX_VALUE);
} else {
subscription.cancel();
}
}

@Override
public void onNext(String item) {
if (is2xxSuccessful()) {
sink.tryEmitNext(item);
} else {
errorBodyJoiner.add(item);
}
}

@Override
public void onError(Throwable throwable) {
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
if (is2xxSuccessful()) {
sink.tryEmitComplete();
} else {
int statusCode = responseInfo.statusCode();
HttpTransportException ex = new HttpTransportException(
"HTTP request failed with status "
+ statusCode
+ " | "
+ errorBodyJoiner,
statusCode,
errorBodyJoiner.toString());
sink.tryEmitError(ex);
}
}

public String getErrorMessage() {
return errorBodyJoiner.toString();
}

private boolean is2xxSuccessful() {
return responseInfo.statusCode() >= 200 && responseInfo.statusCode() < 300;
}
}
}
Loading