Skip to content

Commit 1e257b8

Browse files
committed
fix: client logger timeout
1 parent eb55005 commit 1e257b8

File tree

2 files changed

+78
-3
lines changed

2 files changed

+78
-3
lines changed

src/McpServerManager.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@
66
use Mcp\Server\Session\Psr16StoreSession;
77
use Mcp\Server\Transport\CallbackStream;
88
use Mcp\Server\Transport\StdioTransport;
9-
use Mcp\Server\Transport\StreamableHttpTransport;
9+
use Luoyue\WebmanMcp\Server\StreamableHttpTransport;
1010
use Mcp\Server\Session\InMemorySessionStore;
1111
use Nyholm\Psr7\ServerRequest;
1212
use Psr\Http\Message\ResponseInterface;
13+
use Psr\Http\Message\StreamInterface;
1314
use Psr\Log\LoggerInterface;
1415
use Psr\Log\NullLogger;
1516
use support\Cache;
1617
use support\Container;
1718
use support\Log;
1819
use Webman\Http\Response;
1920
use Workerman\Connection\TcpConnection;
21+
use Workerman\Coroutine;
2022
use Workerman\Worker;
2123
use Generator;
2224
use function request;
@@ -130,8 +132,16 @@ private function handleHttpRequest(Server $server, string $serviceName): Respons
130132
/** @var ResponseInterface $response */
131133
$response = $server->run($transport);
132134

133-
$body = $response->getBody() instanceof CallbackStream ? "\r\n" : $response->getBody()->getContents();
134-
return response($body, $response->getStatusCode(), array_map('current', $response->getHeaders()));
135+
return response($this->getResponseBody($response->getBody()), $response->getStatusCode(), array_map('current', $response->getHeaders()));
136+
}
137+
138+
private function getResponseBody(StreamInterface $body): string
139+
{
140+
if($body instanceof CallbackStream) {
141+
Coroutine::defer($body->getContents(...));
142+
return "\r\n";
143+
}
144+
return $body->getContents();
135145
}
136146

137147
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Luoyue\WebmanMcp\Server;
4+
5+
use \Mcp\Server\Transport\StreamableHttpTransport as BaseStreamableHttpTransport;
6+
use Psr\Http\Message\ResponseFactoryInterface;
7+
use Psr\Http\Message\ServerRequestInterface;
8+
use Psr\Http\Message\StreamFactoryInterface;
9+
use Psr\Log\LoggerInterface;
10+
use Psr\Log\NullLogger;
11+
use Symfony\Component\Uid\Uuid;
12+
use Workerman\Connection\TcpConnection;
13+
use Workerman\Protocols\Http\ServerSentEvents;
14+
15+
class StreamableHttpTransport extends BaseStreamableHttpTransport
16+
{
17+
18+
private readonly TcpConnection $connection;
19+
20+
/**
21+
* @param array<string, string> $corsHeaders
22+
*/
23+
public function __construct(
24+
private readonly ServerRequestInterface $request,
25+
?ResponseFactoryInterface $responseFactory = null,
26+
?StreamFactoryInterface $streamFactory = null,
27+
array $corsHeaders = [],
28+
LoggerInterface $logger = new NullLogger(),
29+
) {
30+
$this->connection = $request->getAttribute(TcpConnection::class);
31+
parent::__construct($request, $responseFactory, $streamFactory, $corsHeaders, $logger);
32+
}
33+
34+
protected function handleFiberTermination(): void
35+
{
36+
$finalResult = $this->sessionFiber->getReturn();
37+
38+
if (null !== $finalResult) {
39+
try {
40+
$encoded = json_encode($finalResult, \JSON_THROW_ON_ERROR);
41+
$this->connection->send(new ServerSentEvents([
42+
'event' => 'message',
43+
'data' => $encoded,
44+
]));
45+
} catch (\JsonException $e) {
46+
$this->logger->error('SSE: Failed to encode final Fiber result.', ['exception' => $e]);
47+
}
48+
}
49+
50+
$this->sessionFiber = null;
51+
}
52+
53+
protected function flushOutgoingMessages(?Uuid $sessionId): void
54+
{
55+
$messages = $this->getOutgoingMessages($sessionId);
56+
57+
foreach ($messages as $message) {
58+
$this->connection->send(new ServerSentEvents([
59+
'event' => 'message',
60+
'data' => $message['message'],
61+
]));
62+
}
63+
}
64+
65+
}

0 commit comments

Comments
 (0)