22
33namespace Rx \Websocket ;
44
5- use Exception ;
6- use Rx \ Websocket \RFC6455 \Handshake \ClientNegotiator ;
5+ use GuzzleHttp \ Psr7 \ Uri ;
6+ use Ratchet \RFC6455 \Handshake \ClientNegotiator ;
77use React \Dns \Resolver \Factory ;
88use React \HttpClient \Request ;
99use React \HttpClient \Response ;
1010use Rx \Disposable \CallbackDisposable ;
11+ use Rx \Observable ;
1112use Rx \Observable \AnonymousObservable ;
1213use Rx \Observer \CallbackObserver ;
1314use Rx \ObserverInterface ;
14- use Rx \Subject \ Subject ;
15+ use Rx \SchedulerInterface ;
1516
16- class Client extends Subject
17+ class Client extends Observable
1718{
1819 /** @var string */
1920 protected $ url ;
@@ -32,40 +33,46 @@ class Client extends Subject
3233 */
3334 public function __construct ($ url , $ useMessageObject = false , array $ subProtocols = [])
3435 {
35- $ this ->url = $ url ;
36+ $ this ->url = $ url ;
3637 $ this ->useMessageObject = $ useMessageObject ;
37- $ this ->subProtocols = $ subProtocols ;
38+ $ this ->subProtocols = $ subProtocols ;
3839 }
3940
40- private function startConnection ( )
41+ public function subscribe ( ObserverInterface $ clientObserver , $ scheduler = null )
4142 {
4243 $ loop = \EventLoop \getLoop ();
4344
4445 $ dnsResolverFactory = new Factory ();
46+
4547 $ dnsResolver = $ dnsResolverFactory ->createCached ('8.8.8.8 ' , $ loop );
4648
4749 $ factory = new \React \HttpClient \Factory ();
48- $ client = $ factory ->create ($ loop , $ dnsResolver );
50+ $ client = $ factory ->create ($ loop , $ dnsResolver );
51+
52+ $ cNegotiator = new ClientNegotiator ();
4953
50- $ cNegotiator = new ClientNegotiator ($ this ->url , $ this ->subProtocols );
54+ /** @var \GuzzleHttp\Psr7\Request $nRequest */
55+ $ nRequest = $ cNegotiator ->generateRequest (new Uri ($ this ->url ));
56+
57+ if (!empty ($ this ->subProtocols )) {
58+ $ nRequest = $ nRequest
59+ ->withoutHeader ('Sec-WebSocket-Protocol ' )
60+ ->withHeader ('Sec-WebSocket-Protocol ' , $ this ->subProtocols );
61+ }
5162
52- $ headers = $ cNegotiator -> getRequest () ->getHeaders ();
63+ $ headers = $ nRequest ->getHeaders ();
5364
5465 $ flatHeaders = [];
5566 foreach ($ headers as $ k => $ v ) {
5667 $ flatHeaders [$ k ] = $ v [0 ];
5768 }
5869
59- $ request = $ client ->request (" GET " , $ this ->url , $ flatHeaders , '1.1 ' );
70+ $ request = $ client ->request (' GET ' , $ this ->url , $ flatHeaders , '1.1 ' );
6071
61- $ request ->on ('response ' , function (Response $ response , Request $ request ) use ($ cNegotiator ) {
72+ $ request ->on ('response ' , function (Response $ response , Request $ request ) use ($ flatHeaders , $ cNegotiator, $ nRequest , $ clientObserver ) {
6273 if ($ response ->getCode () !== 101 ) {
63- throw new \Exception (" Unexpected response code " . $ response ->getCode ());
74+ throw new \Exception (' Unexpected response code ' . $ response ->getCode ());
6475 }
65- // TODO: Should validate response
66- //$cNegotiator->validateResponse($response);
67-
68- $ subprotoHeader = "" ;
6976
7077 $ psr7Response = new \GuzzleHttp \Psr7 \Response (
7178 $ response ->getCode (),
@@ -74,12 +81,17 @@ private function startConnection()
7481 $ response ->getVersion ()
7582 );
7683
77- if (count ($ psr7Response ->getHeader ('Sec-WebSocket-Protocol ' )) == 1 ) {
78- $ subprotoHeader = $ psr7Response ->getHeader ('Sec-WebSocket-Protocol ' )[0 ];
84+ $ psr7Request = new \GuzzleHttp \Psr7 \Request ('GET ' , $ this ->url , $ flatHeaders );
85+
86+ if (!$ cNegotiator ->validateResponse ($ psr7Request , $ psr7Response )) {
87+ throw new \Exception ('Invalid response ' );
7988 }
8089
81- parent ::onNext (new MessageSubject (
82- new AnonymousObservable (function (ObserverInterface $ observer ) use ($ response ) {
90+ $ subprotoHeader = $ psr7Response ->getHeader ('Sec-WebSocket-Protocol ' );
91+
92+ $ clientObserver ->onNext (new MessageSubject (
93+ new AnonymousObservable (function (ObserverInterface $ observer , SchedulerInterface $ scheduler ) use ($ response , $ clientObserver ) {
94+
8395 $ response ->on ('data ' , function ($ data ) use ($ observer ) {
8496 $ observer ->onNext ($ data );
8597 });
@@ -92,18 +104,19 @@ private function startConnection()
92104 $ observer ->onCompleted ();
93105 });
94106
95- $ response ->on ('end ' , function () use ($ observer ) {
107+ $ response ->on ('end ' , function () use ($ observer, $ clientObserver ) {
96108 $ observer ->onCompleted ();
97109
98110 // complete the parent observer - we only do 1 connection
99- parent :: onCompleted ();
111+ $ clientObserver -> onCompleted ();
100112 });
101113
102114
103115 return new CallbackDisposable (function () use ($ response ) {
104116 // commented this out because disposal was causing the other
105117 // end (the request) to close also - which causes the pending messages
106118 // to get tossed
119+ // maybe try with $response->end()?
107120 //$response->close();
108121 });
109122 }),
@@ -121,41 +134,15 @@ function () use ($request) {
121134 true ,
122135 $ this ->useMessageObject ,
123136 $ subprotoHeader ,
124- $ cNegotiator -> getRequest () ,
137+ $ nRequest ,
125138 $ psr7Response
126139 ));
127140 });
128141
129142 $ request ->writeHead ();
130- }
131-
132- public function subscribe (ObserverInterface $ observer , $ scheduler = null )
133- {
134- if (!$ this ->isStopped ) {
135- $ this ->startConnection ();
136- }
137-
138- return parent ::subscribe ($ observer , $ scheduler );
139- }
140-
141- public function send ($ value )
142- {
143- $ this ->onNext ($ value );
144- }
145-
146- // Not sure we need this object to be a subject - just being an observer should be good enough I think
147- public function onNext ($ value )
148- {
149-
150- }
151-
152- public function onError (Exception $ exception )
153- {
154-
155- }
156-
157- public function onCompleted ()
158- {
159143
144+ return new CallbackDisposable (function () use ($ request ) {
145+ $ request ->close ();
146+ });
160147 }
161148}
0 commit comments