diff --git a/hyper/common/connection.py b/hyper/common/connection.py index dee18d68..507a8ad7 100644 --- a/hyper/common/connection.py +++ b/hyper/common/connection.py @@ -62,7 +62,8 @@ def __init__(self, self._port = port self._h1_kwargs = { 'secure': secure, 'ssl_context': ssl_context, - 'proxy_host': proxy_host, 'proxy_port': proxy_port + 'proxy_host': proxy_host, 'proxy_port': proxy_port, + 'enable_push': enable_push } self._h2_kwargs = { 'window_manager': window_manager, 'enable_push': enable_push, diff --git a/hyper/http11/connection.py b/hyper/http11/connection.py index 48bde2f0..8025c740 100644 --- a/hyper/http11/connection.py +++ b/hyper/http11/connection.py @@ -78,6 +78,7 @@ def __init__(self, host, port=None, secure=None, ssl_context=None, # only send http upgrade headers for non-secure connection self._send_http_upgrade = not self.secure + self._enable_push = kwargs.get('enable_push') self.ssl_context = ssl_context self._sock = None @@ -276,6 +277,10 @@ def _add_upgrade_headers(self, headers): # Settings header. http2_settings = SettingsFrame(0) http2_settings.settings[SettingsFrame.INITIAL_WINDOW_SIZE] = 65535 + if self._enable_push is not None: + http2_settings.settings[SettingsFrame.ENABLE_PUSH] = ( + int(self._enable_push) + ) encoded_settings = base64.urlsafe_b64encode( http2_settings.serialize_body() ) diff --git a/test/test_abstraction.py b/test/test_abstraction.py index cd0e0645..7c2cad1a 100644 --- a/test/test_abstraction.py +++ b/test/test_abstraction.py @@ -19,6 +19,7 @@ def test_h1_kwargs(self): 'proxy_host': False, 'proxy_port': False, 'other_kwarg': True, + 'enable_push': True, } def test_h2_kwargs(self): diff --git a/test/test_hyper.py b/test/test_hyper.py index 6a18d592..66ff6f42 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -9,6 +9,7 @@ PingFrame, FRAME_MAX_ALLOWED_LEN ) from hpack.hpack_compat import Encoder +from hyper.common.connection import HTTPConnection from hyper.http20.connection import HTTP20Connection from hyper.http20.response import HTTP20Response, HTTP20Push from hyper.http20.exceptions import ConnectionError, StreamResetError @@ -699,7 +700,7 @@ def test_incrementing_window_after_close(self): assert len(originally_sent_data) + 1 == len(c._sock.queue) -class TestServerPush(object): +class FrameEncoderMixin(object): def setup_method(self, method): self.frames = [] self.encoder = Encoder() @@ -731,8 +732,10 @@ def add_data_frame(self, stream_id, data, end_stream=False): frame.flags.add('END_STREAM') self.frames.append(frame) - def request(self): - self.conn = HTTP20Connection('www.google.com', enable_push=True) + +class TestServerPush(FrameEncoderMixin): + def request(self, enable_push=True): + self.conn = HTTP20Connection('www.google.com', enable_push=enable_push) self.conn._sock = DummySocket() self.conn._sock.buffer = BytesIO( b''.join([frame.serialize() for frame in self.frames]) @@ -934,8 +937,7 @@ def test_reset_pushed_streams_when_push_disabled(self): 1, [(':status', '200'), ('content-type', 'text/html')] ) - self.request() - self.conn._enable_push = False + self.request(False) self.conn.get_response() f = RstStreamFrame(2) @@ -1303,6 +1305,158 @@ def test_resetting_streams_after_close(self): c._single_read() +class TestUpgradingPush(FrameEncoderMixin): + http101 = (b"HTTP/1.1 101 Switching Protocols\r\n" + b"Connection: upgrade\r\n" + b"Upgrade: h2c\r\n" + b"\r\n") + + def request(self, enable_push=True): + self.frames = [SettingsFrame(0)] + self.frames # Server side preface + self.conn = HTTPConnection('www.google.com', enable_push=enable_push) + self.conn._conn._sock = DummySocket() + self.conn._conn._sock.buffer = BytesIO( + self.http101 + b''.join([frame.serialize() + for frame in self.frames]) + ) + self.conn.request('GET', '/') + + def assert_response(self): + self.response = self.conn.get_response() + assert self.response.status == 200 + assert dict(self.response.headers) == {b'content-type': [b'text/html']} + + def assert_pushes(self): + self.pushes = list(self.conn.get_pushes()) + assert len(self.pushes) == 1 + assert self.pushes[0].method == b'GET' + assert self.pushes[0].scheme == b'http' + assert self.pushes[0].authority == b'www.google.com' + assert self.pushes[0].path == b'/' + expected_headers = {b'accept-encoding': [b'gzip']} + assert dict(self.pushes[0].request_headers) == expected_headers + + def assert_push_response(self): + push_response = self.pushes[0].get_response() + assert push_response.status == 200 + assert dict(push_response.headers) == { + b'content-type': [b'application/javascript'] + } + assert push_response.read() == b'bar' + + def test_promise_before_headers(self): + # Current implementation only support get_pushes call + # after get_response + pass + + def test_promise_after_headers(self): + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_data_frame(1, b'foo', end_stream=True) + self.add_headers_frame( + 2, [(':status', '200'), ('content-type', 'application/javascript')] + ) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + self.assert_response() + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_promise_after_data(self): + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + self.add_data_frame(1, b'fo') + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_data_frame(1, b'o', end_stream=True) + self.add_headers_frame( + 2, [(':status', '200'), ('content-type', 'application/javascript')] + ) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + self.assert_response() + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_capture_all_promises(self): + # Current implementation does not support capture_all + # for h2c upgrading connection. + pass + + def test_cancel_push(self): + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + + self.request() + self.conn.get_response() + list(self.conn.get_pushes())[0].cancel() + + f = RstStreamFrame(2) + f.error_code = 8 + assert self.conn._sock.queue[-1] == f.serialize() + + def test_reset_pushed_streams_when_push_disabled(self): + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + + self.request(False) + self.conn.get_response() + + f = RstStreamFrame(2) + f.error_code = 7 + assert self.conn._sock.queue[-1].endswith(f.serialize()) + + # Some utility classes for the tests. class NullEncoder(object): @staticmethod diff --git a/test/test_integration.py b/test/test_integration.py index 96a8af76..72da6156 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,6 +12,7 @@ import hyper import hyper.http11.connection import pytest +from contextlib import contextmanager from mock import patch from h2.frame_buffer import FrameBuffer from hyper.compat import ssl @@ -64,17 +65,31 @@ def frame_buffer(): return buffer +@contextmanager +def reusable_frame_buffer(buffer): + # FrameBuffer does not return new iterator for iteration. + data = buffer.data + yield buffer + buffer.data = data + + def receive_preamble(sock): # Receive the HTTP/2 'preamble'. - first = sock.recv(65535) + client_preface = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' + got = b'' + while len(got) < len(client_preface): + tmp = sock.recv(len(client_preface) - len(got)) + assert len(tmp) > 0, "unexpected EOF" + got += tmp + + assert got == client_preface, "client preface mismatch" - # Work around some bugs: if the first message received was only the PRI - # string, aim to receive a settings frame as well. - if len(first) <= len(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'): - sock.recv(65535) + # Send server side HTTP/2 preface sock.send(SettingsFrame(0).serialize()) - sock.recv(65535) - return + # Drain to let the client proceed. + # Note that in the lower socket level, this method is not + # just doing "receive". + return sock.recv(65535) @patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS) @@ -138,7 +153,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.connect() - send_event.wait() + send_event.wait(5) # Get the chunk of data after the preamble and decode it into frames. # We actually expect two, but only the second one contains ENABLE_PUSH. @@ -241,9 +256,9 @@ def socket_handler(listener): # We need to send back a SettingsFrame. f = SettingsFrame(0) sock.send(f.serialize()) - - send_event.wait() sock.recv(65535) + + send_event.wait(5) sock.close() self._start_server(socket_handler) @@ -260,6 +275,7 @@ def socket_handler(listener): def test_closed_responses_remove_their_streams_from_conn(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -270,24 +286,27 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Close the response. resp.close() - recv_event.wait(5) + recv_event.set() assert not conn.streams @@ -296,6 +315,7 @@ def socket_handler(listener): def test_receiving_responses_with_no_body(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -306,6 +326,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has no body f = build_headers_frame( [(':status', '204'), ('content-length', '0')] @@ -315,12 +337,13 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -331,13 +354,13 @@ def socket_handler(listener): assert resp._stream._in_window_manager.document_size == 0 # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_receiving_trailers(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -350,6 +373,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame( [(':status', '200'), ('content-length', '14')], @@ -372,12 +397,13 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -396,15 +422,15 @@ def socket_handler(listener): assert len(resp.trailers) == 2 # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_receiving_trailers_before_reading(self): self.set_up() - recv_event = threading.Event() + req_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -416,6 +442,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame( [(':status', '200'), ('content-length', '14')], @@ -443,12 +471,13 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -470,8 +499,7 @@ def socket_handler(listener): assert resp._stream._in_window_manager.document_size == 14 # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_clean_shut_down(self): @@ -492,7 +520,7 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -503,8 +531,7 @@ def socket_handler(listener): assert conn._sock is None # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_unexpected_shut_down(self): @@ -525,8 +552,8 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. + recv_event.wait(5) sock.close() - recv_event.set() self._start_server(socket_handler) conn = self.get_connection() @@ -538,15 +565,15 @@ def socket_handler(listener): assert conn._sock is None # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_insecure_connection(self): self.set_up(secure=False) data = [] - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -554,7 +581,7 @@ def socket_handler(listener): receive_preamble(sock) data.append(sock.recv(65535)) - send_event.wait(5) + req_event.wait(5) h = HeadersFrame(1) h.data = self.get_encoder().encode( @@ -573,12 +600,13 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) + recv_event.wait(5) sock.close() self._start_server(socket_handler) c = self.get_connection() c.request('GET', '/') - send_event.set() + req_event.set() r = c.get_response() assert r.status == 200 @@ -589,13 +617,15 @@ def socket_handler(listener): assert r.read() == b'nsaislistening' + recv_event.set() self.tear_down() def test_proxy_connection(self): self.set_up(proxy=True) data = [] - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -603,7 +633,7 @@ def socket_handler(listener): receive_preamble(sock) data.append(sock.recv(65535)) - send_event.wait(5) + req_event.wait(5) h = HeadersFrame(1) h.data = self.get_encoder().encode( @@ -622,12 +652,13 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) + recv_event.wait(5) sock.close() self._start_server(socket_handler) c = self.get_connection() c.request('GET', '/') - send_event.set() + req_event.set() r = c.get_response() assert r.status == 200 @@ -638,6 +669,7 @@ def socket_handler(listener): assert r.read() == b'thisisaproxy' + recv_event.set() self.tear_down() def test_resetting_stream_with_frames_in_flight(self): @@ -647,6 +679,7 @@ def test_resetting_stream_with_frames_in_flight(self): """ self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -657,6 +690,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has no # body. f = build_headers_frame( @@ -673,6 +708,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() stream_id = conn.request('GET', '/') + req_event.set() # Now, trigger the RST_STREAM frame by closing the stream. conn._send_rst_frame(stream_id, 0) @@ -686,7 +722,6 @@ def socket_handler(listener): # Awesome, we're done now. recv_event.set() - self.tear_down() def test_stream_can_be_reset_multiple_times(self): @@ -696,6 +731,7 @@ def test_stream_can_be_reset_multiple_times(self): """ self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -706,6 +742,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send two RST_STREAM frames. for _ in range(0, 2): f = RstStreamFrame(1) @@ -718,6 +756,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() # Now, eat the Rst frames. These should not cause an exception. conn._single_read() @@ -737,6 +776,7 @@ def socket_handler(listener): def test_read_chunked_http2(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() wait_event = threading.Event() @@ -748,6 +788,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has a body. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 @@ -771,12 +813,13 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -798,15 +841,16 @@ def socket_handler(listener): assert third_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) + recv_event.set() self.tear_down() def test_read_delayed(self): self.set_up() - recv_event = threading.Event() + req_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -816,6 +860,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has a body. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 @@ -839,12 +885,13 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -858,15 +905,14 @@ def socket_handler(listener): assert second_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_upgrade(self): self.set_up(secure=False) - recv_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -915,7 +961,7 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -934,8 +980,7 @@ def socket_handler(listener): assert second_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_version_after_tls_upgrade(self, monkeypatch): @@ -951,13 +996,16 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] receive_preamble(sock) + # Wait for the request + req_event.wait(5) # Send the headers for the response. This response has no body. f = build_headers_frame( [(':status', '200'), ('content-length', '0')] @@ -966,8 +1014,8 @@ def socket_handler(listener): f.stream_id = 1 sock.sendall(f.serialize()) - # Wait for the message from the main thread. - send_event.wait() + # wait for the message from the main thread + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -976,16 +1024,18 @@ def socket_handler(listener): assert c.version is HTTPVersion.http11 assert c.version is not HTTPVersion.http20 c.request('GET', '/') - send_event.set() + req_event.set() assert c.version is HTTPVersion.http20 + recv_event.set() self.tear_down() def test_version_after_http_upgrade(self): self.set_up() self.secure = False - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -996,7 +1046,7 @@ def socket_handler(listener): data += sock.recv(65535) assert b'upgrade: h2c\r\n' in data - send_event.wait() + req_event.wait(5) # We need to send back a response. resp = ( @@ -1020,15 +1070,22 @@ def socket_handler(listener): f.flags.add('END_STREAM') sock.sendall(f.serialize()) + # keep the socket open for clean shutdown + recv_event.wait(5) + sock.close() + self._start_server(socket_handler) c = hyper.HTTPConnection(self.host, self.port) assert c.version is HTTPVersion.http11 + c.request('GET', '/') - send_event.set() + req_event.set() + resp = c.get_response() assert c.version is HTTPVersion.http20 assert resp.version is HTTPVersion.http20 + recv_event.set() self.tear_down() @@ -1038,7 +1095,7 @@ class TestRequestsAdapter(SocketLevelTest): # This uses HTTP/2. h2 = True - def test_adapter_received_values(self, monkeypatch): + def test_adapter_received_values(self, monkeypatch, frame_buffer): self.set_up() # We need to patch the ssl_wrap_socket method to ensure that we @@ -1051,17 +1108,22 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - data = [] - send_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] # Do the handshake: conn header, settings, send settings, recv ack. - receive_preamble(sock) + frame_buffer.add_data(receive_preamble(sock)) # Now expect some data. One headers frame. - data.append(sock.recv(65535)) + req_wait = True + while req_wait: + frame_buffer.add_data(sock.recv(65535)) + with reusable_frame_buffer(frame_buffer) as fr: + for f in fr: + if isinstance(f, HeadersFrame): + req_wait = False # Respond! h = HeadersFrame(1) @@ -1079,7 +1141,8 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) - send_event.wait(5) + # keep the socket open for clean shutdown + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1093,11 +1156,10 @@ def socket_handler(listener): assert r.headers[b'Content-Type'] == b'not/real' assert r.content == b'1234567890' * 2 - send_event.set() - + recv_event.set() self.tear_down() - def test_adapter_sending_values(self, monkeypatch): + def test_adapter_sending_values(self, monkeypatch, frame_buffer): self.set_up() # We need to patch the ssl_wrap_socket method to ensure that we @@ -1110,17 +1172,22 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - data = [] + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] # Do the handshake: conn header, settings, send settings, recv ack. - receive_preamble(sock) + frame_buffer.add_data(receive_preamble(sock)) # Now expect some data. One headers frame and one data frame. - data.append(sock.recv(65535)) - data.append(sock.recv(65535)) + req_wait = True + while req_wait: + frame_buffer.add_data(sock.recv(65535)) + with reusable_frame_buffer(frame_buffer) as fr: + for f in fr: + if isinstance(f, DataFrame): + req_wait = False # Respond! h = HeadersFrame(1) @@ -1138,6 +1205,8 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) + # keep the socket open for clean shutdown + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1152,11 +1221,11 @@ def socket_handler(listener): # Assert about the sent values. assert r.status_code == 200 - f = decode_frame(data[0]) - assert isinstance(f, HeadersFrame) + frames = list(frame_buffer) + assert isinstance(frames[-2], HeadersFrame) - f = decode_frame(data[1]) - assert isinstance(f, DataFrame) - assert f.data == b'hi there' + assert isinstance(frames[-1], DataFrame) + assert frames[-1].data == b'hi there' + recv_event.set() self.tear_down()