@@ -21,9 +21,6 @@ class Server extends Observable
2121
2222 protected $ port ;
2323
24- /** @var Subject */
25- protected $ connectionSubject ;
26-
2724 /** @var bool */
2825 private $ useMessageObject ;
2926
@@ -42,12 +39,10 @@ public function __construct($bindAddress, $port, $useMessageObject = false, arra
4239 $ this ->bindAddress = $ bindAddress ;
4340 $ this ->port = $ port ;
4441 $ this ->useMessageObject = $ useMessageObject ;
45-
46- $ this ->connectionSubject = new Subject ();
47- $ this ->subProtocols = $ subProtocols ;
42+ $ this ->subProtocols = $ subProtocols ;
4843 }
4944
50- private function startServer ( )
45+ public function subscribe ( ObserverInterface $ observer , $ scheduler = null )
5146 {
5247 $ socket = new \React \Socket \Server (\EventLoop \getLoop ());
5348
@@ -57,7 +52,7 @@ private function startServer()
5752 }
5853
5954 $ http = new \React \Http \Server ($ socket );
60- $ http ->on ('request ' , function (Request $ request , Response $ response ) use ($ negotiator ) {
55+ $ http ->on ('request ' , function (Request $ request , Response $ response ) use ($ negotiator, $ observer , & $ outStream ) {
6156 $ uri = new Uri ($ request ->getPath ());
6257 if (count ($ request ->getQuery ()) > 0 ) {
6358 $ uri = $ uri ->withQuery (\GuzzleHttp \Psr7 \build_query ($ request ->getQuery ()));
@@ -134,7 +129,7 @@ function () use ($response) {
134129 $ negotiatorResponse
135130 );
136131
137- $ this -> connectionSubject ->onNext ($ connection );
132+ $ observer ->onNext ($ connection );
138133 });
139134
140135 $ socket ->listen ($ this ->port , $ this ->bindAddress );
@@ -143,17 +138,12 @@ function () use ($response) {
143138// $http->on('data', function () {});
144139// $http->on('pause', function () {});
145140// $http->on('resume', function () {});
146- }
147141
148- public function subscribe (ObserverInterface $ observer , $ scheduler = null )
149- {
150- if (!$ this ->started ) {
151- $ this ->started = true ;
152-
153- $ this ->startServer ();
154- }
142+ $ this ->started = true ;
155143
156- return $ this ->connectionSubject ->subscribe ($ observer , $ scheduler );
144+ return new CallbackDisposable (function () use ($ socket ) {
145+ $ socket ->shutdown ();
146+ });
157147 }
158148
159149 protected function doStart ($ scheduler )
0 commit comments