From 8a60b010dfb50cf44f352e4869616b21ad7444f2 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Wed, 1 Feb 2023 21:55:51 -0800 Subject: [PATCH 1/5] First pass at adding StreamingMultipartFormDataParser. Still need to fix the tests and also improve Awaitable vs. None interface. --- tornado/httputil.py | 265 ++++++++++++++++++++++++++++++++++ tornado/test/httputil_test.py | 156 +++++++++++++++++++- 2 files changed, 419 insertions(+), 2 deletions(-) diff --git a/tornado/httputil.py b/tornado/httputil.py index 9c341d47cc..663815ea82 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -849,6 +849,271 @@ def parse_multipart_form_data( arguments.setdefault(name, []).append(value) +_BOUNDARY_REGEX = re.compile(r'boundary="?(?P[^"]+)"?') +"""Regex to match the boundary option.""" + +_1MB = 1048576 +"""Number of bytes in 1 Megabyte.""" + + +class AbstractFileDelegate: + + def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]: + pass + + def file_data_received(self, name: str, data: bytes) -> Optional[Awaitable[None]]: + pass + + def finish_file(self, name: str) -> Optional[Awaitable[None]]: + pass + + +class ParserState: + + PARSE_BOUNDARY_LINE = 1 + """State that parses the initial boundary.""" + + PARSE_FILE_HEADERS = 2 + """State that parses the 'headers' for the next file/object.""" + + PARSE_BODY = 3 + """State that parses some body text.""" + + PARSING_DONE = 4 + """State that denotes the parser is finished.""" + + +class StreamingMultipartFormDataParser(object): + """Basic parser that accepts data and parses it into distinct files. + + This parser handles 'multipart/form-data' Content-Type uploads, which + permits multiple file uploads in a single request. + """ + + @classmethod + def from_content_type_header( + cls, delegate, header + ) -> "StreamingMultipartFormDataParser": + if isinstance(header, bytes): + header = header.decode('utf-8') + boundary = None + # Make sure the header is the multipart/form-data. 
+ parts = [ + part.strip() + for part in header.split(';') + ] + if parts[0].lower() != "multipart/form-data": + raise ValueError("Invalid Content-Type: {}".format(parts[0])) + + # Search for 'boundary=' + for part in parts: + m = _BOUNDARY_REGEX.match(part) + if m: + boundary = m.group('boundary') + return cls(delegate, boundary) + raise ValueError("Required 'boundary' option not found in header!") + + + def __init__(self, delegate: AbstractFileDelegate, boundary: str, + max_header_bytes=_1MB): + # Be nice and decode the boundary if it is a bytes object. + if isinstance(boundary, bytes): + boundary = boundary.decode('utf-8') + # Store the delegate to write out the data. + self._delegate = delegate + self._boundary = boundary + self._max_buffer_size = max_header_bytes + self._name = None + self._info = None + + # Variables to store the current state of the parser. + self._state = ParserState.PARSE_BOUNDARY_LINE + self._buffer = bytearray() + + # Variables to hold the boundary matches. + self._boundary_next = '--{}\r\n'.format(self._boundary).encode() + self._boundary_end = '--{}--\r\n'.format(self._boundary).encode() + self._boundary_base = self._boundary_next[:-2] + + # Variables for caching boundary matching. + self._last_idx = 0 + self._boundary_idx = 0 + + @property + def boundary(self) -> str: + """Return the boundary text that denotes the end of a file.""" + return self._boundary + + def _change_state(self, state: ParserState, name: Optional[str]=None): + """Helper to change the state of the parser. + + This also clears some variables used in different states. + """ + self._state = state + self._last_idx = 0 + self._boundary_idx = 0 + self._name = name + + async def data_received(self, chunk: bytes) -> None: + # Process the data received, based on the current state. + self._buffer.extend(chunk) + + # Iterate over and over while there is sufficient data in the buffer. 
+ # Each loop should either consume data, or move to a state where not + # enough data is available, in which case this should exit to await + # more data. + while True: + # NOTE: As a almost useless optimization, we order the states in + # the "expected" most common state ordering: + # (1) PARSE_BODY is likely the most common state. + # (2) PARSE_BOUNDARY_LINE + # (3) PARSE_HEADERS + # (4) PARSING_DONE + + # PARSE_BODY state --> Expecting to parse the file contents. + if self._state == ParserState.PARSE_BODY: + # Search for the boundary characters. + idx = self._buffer.find(b'-') + if idx < 0: + # No match against any boundary character. Write out the + # whole buffer. + data = self._buffer + self._buffer = bytearray() + await self._delegate.write(self._info, data) + + # Return because the whole buffer was written out. + return + + # If 'idx > 0', write the data _up to_ this boundary point, + # then proceed in the same manner as 'idx == 0'. + if idx > 0: + # Write out all of the data, _up to_ this boundary point, + # then cycle around to check whether we are at the bounary + # or not. This simplifies the logic for checking against + # the boundary cases. + data = self._buffer[:idx] + self._buffer = self._buffer[idx:] + await self._delegate.file_data_received(self._info, data) + + # Not enough data (technically) to check against. Wait for + # more data to be certain whether the boundary was parsed. + if len(self._buffer) < len(self._boundary_next): + return + + # If the buffer starts with the same contents as + # 'self._boundary_base', switch states and let that state + # handle this case more cleanly. + if self._buffer.startswith(self._boundary_next): + # Mark the current file as finished. + await self._delegate.finish_file(self._info) + self._change_state(ParserState.PARSE_BOUNDARY_LINE) + continue + + # Check the end boundary as well. The end boundary _might_ + # match if the 'self._boundary_base' matches, but the + # 'self._boundary_next' does not. 
Wait for more data if the + # buffer does not have enough data to be sure. + if len(self._buffer) < len(self._boundary_end): + return + + if self._buffer.startswith(self._boundary_end): + await self._delegate.finish_file(self._info) + self._change_state(ParserState.PARSE_BOUNDARY_LINE) + continue + + # No match so far, so write out the data up to the next + # boundary delimiter. + next_idx = self._buffer.find(b'-', 1) + if next_idx < 0: + data = self._buffer + self._buffer = bytearray() + else: + data = self._buffer[:next_idx] + self._buffer = self._buffer[next_idx:] + await self._delegate.file_data_received(self._info, data) + + # Continue and run the check after this update. + continue + + # PARSE_BOUNDARY_LINE state --> Expecting to parse either: + # - self._boundary_next (for the next file) + # - self._boundary_end (for the end of the request) + if self._state == ParserState.PARSE_BOUNDARY_LINE: + # Parse the first boundary chunk. + if len(self._buffer) < len(self._boundary_next): + # Not enough data, so exit. + return + # This implies we are parsing another file, so transition to + # the 'PARSE_HEADER' state. Also, continue to run through the + # loop again with the new state. + if self._buffer.startswith(self._boundary_next): + self._buffer = self._buffer[len(self._boundary_next):] + self._change_state(ParserState.PARSE_FILE_HEADERS) + continue + # Check against 'self._boundary_end' as well. There is a slim + # chance that we are at the self._boundary_end case, but still + # do not have enough data, so handle that here. + if len(self._buffer) < len(self._boundary_end): + # Hope we get more data to confirm the boundary end case. + return + elif self._buffer.startswith(self._boundary_end): + # Done parsing. We should probably sanity-check that all + # data was consumed. 
+ self._buffer = self._buffer[len(self._boundary_end):] + self._change_state(ParserState.PARSING_DONE) + continue + else: + gen_log.warning("Invalid boundary parsed!") + + # PARSE_HEADERS state --> Expecting to parse headers with CRLF. + if self._state == ParserState.PARSE_FILE_HEADERS: + idx = self._buffer.find(b'\r\n\r\n', self._last_idx) + # Implies no match. Update the next index to search to be: + # max(0, len(buffer) - 3) + # as an optimization to speed up future comparisons. This + # should work; if there is no match, then the buffer could + # (in the worst case) have '\r\n\r', but not the final '\n' + # so we might need to rescan the previous 3 characters, but + # not 4. (Cap at 0 in case the buffer is too small for some + # reason.) + # + # In any case, there is not enough data, so just exit. + if idx < 0: + self._last_idx = max(0, len(self._buffer) - 3) + return + # Otherwise, we have a match. Parse this into a dictionary of + # headers and pass the result to create a new file. + data = self._buffer[:idx + 4].decode('utf-8') + self._buffer = self._buffer[idx + 4:] + headers = HTTPHeaders.parse(data) + name, plist = _parse_header( + headers.get('Content-Disposition', '')) + # content_disp = headers.get('Content-Disposition', '') + # _parse_header(head) + # name = parse_content_name(content_disp) + + # Call the delegate with the new file. + + self._info = await self._delegate.start_file( + name, headers=headers) + + # Update the buffer and the state. + self._change_state(ParserState.PARSE_BODY, name=name) + continue + + # PARSE_DONE state --> Expect no more data, but break the loop. + if self._state == ParserState.PARSING_DONE: + if len(self._buffer) > 0: + # WARNING: Data is left in the buffer when we should be + # finished... + gen_log.warning( + "Finished with non-empty buffer (%s bytes remaining).", + len(self._buffer)) + + # Even if there is data remaining, we should exit the loop. 
+ return + + def format_timestamp( ts: Union[int, float, tuple, time.struct_time, datetime.datetime] ) -> str: diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py index 8424491d87..7ded8c4c03 100644 --- a/tornado/test/httputil_test.py +++ b/tornado/test/httputil_test.py @@ -9,11 +9,12 @@ qs_to_qsl, HTTPInputError, HTTPFile, + StreamingMultipartFormDataParser, + AbstractFileDelegate ) from tornado.escape import utf8, native_str from tornado.log import gen_log from tornado.testing import ExpectLog - import copy import datetime import logging @@ -22,7 +23,7 @@ import urllib.parse import unittest -from typing import Tuple, Dict, List +from typing import Tuple, Dict, List, Optional def form_data_args() -> Tuple[Dict[str, List[bytes]], Dict[str, List[HTTPFile]]]: @@ -259,6 +260,157 @@ def test_data_after_final_boundary(self): self.assertEqual(file["body"], b"Foo") +MULTIPART_DATA = b"""----boundarything\r +Content-Disposition: form-data; name="a.txt"\r +\r +a----boundarything\r +Content-Disposition: form-data; name="b.csv"\r +Content-Type: text/csv\r +\r +col1,col2 +a,b +--boundarythin,thatwasclose +----boundarything--\r +""" + + +class MemoryFileDelegate(AbstractFileDelegate): + """Basic File Delegate that stores its contents in memory.""" + + def __init__(self): + super().__init__() + self._file_mapping = {} + + self._curr_file = None + self._buffer = bytearray() + self._headers = None + + @property + def keys(self): + return list(self._file_mapping.keys()) + + async def start_file(self, name: str, headers: HTTPHeaders): + self._curr_file = name + self._headers = headers + self._buffer = bytearray() + + async def file_data_received(self, name: str, data: bytes): + self._buffer.extend(data) + + async def finish_file(self, name: str): + content_type = self._headers.get( + 'Content-Type', 'application/octet-stream') + httpfile = HTTPFile( + filename=name, body=bytes(self._buffer), content_type=content_type) + self._file_mapping[name] = httpfile + + 
def get_file(self, name) -> Optional[HTTPFile]: + return self._file_mapping.get(name) + + +class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase): + + async def test_multipart_form_data(self): + boundary = b'--boundarything' + + headers_a_txt = list(HTTPHeaders({ + 'Content-Disposition': 'form-data; name="a.txt"', + }).get_all()) + headers_b_csv = list(HTTPHeaders({ + 'Content-Disposition': 'form-data; name="b.csv"', + 'Content-Type': 'text/csv' + }).get_all()) + + # Test all possible splits and chunks of the given data. This will + # verify the parser with all (?) possible corner cases. + for i in range(len(MULTIPART_DATA)): + delegate = MemoryFileDelegate() + parser = StreamingMultipartFormDataParser(delegate, boundary) + chunk1 = MULTIPART_DATA[:i] + chunk2 = MULTIPART_DATA[i:] + await parser.data_received(chunk1) + await parser.data_received(chunk2) + + # Verify that the delegate contents are correct. + self.assertEqual( + set(['a.txt', 'b.csv']), set(delegate.keys), + "Expected files not found for slicing at: {}".format(i)) + # Assert the 'headers' match what is expected. + self.assertEqual( + headers_a_txt, + list(delegate.get_headers('a.txt').get_all()), + '"a.txt" header mismatch on slice: {}'.format(i)) + self.assertEqual( + headers_b_csv, + list(delegate.get_headers('b.csv').get_all()), + '"b.csv" header mismatch on slice: {}'.format(i)) + # Assert that the file contents match what is expected. 
+ a_info = await delegate.get_file_info('a.txt') + self.assertIsNotNone(a_info) + a_data = await delegate.read_into_bytes(a_info) + self.assertEqual( + b'a', a_data, + '"a.txt" file contents mismatch on slice: {}'.format(i)) + b_info = await delegate.get_file_info('b.csv') + self.assertIsNotNone(b_info) + b_data = await delegate.read_into_bytes(b_info) + self.assertEqual( + b'col1,col2\na,b\n--boundarythin,thatwasclose\n', + b_data, + # bytes(delegate.parsed_data['b.csv']), + '"b.csv" file contents mismatch on slice: {}'.format(i)) + + async def test_multipart_form_data_async(self): + # Same test as above, but with async methods for the delegate. + boundary = b'--boundarything' + + headers_a_txt = list(HTTPHeaders({ + 'Content-Disposition': 'form-data; name="a.txt"', + }).get_all()) + headers_b_csv = list(HTTPHeaders({ + 'Content-Disposition': 'form-data; name=b.csv', + 'Content-Type': 'text/csv;' + })) + + # Test all possible splits and chunks of the given data. This will + # verify the parser with all possible corner cases. + for i in range(len(MULTIPART_DATA)): + delegate = MemoryFileDelegate() + parser = StreamingMultipartFormDataParser(delegate, boundary) + chunk1 = MULTIPART_DATA[:i] + chunk2 = MULTIPART_DATA[i:] + await parser.data_received(chunk1) + await parser.data_received(chunk2) + + # Verify that the delegate contents are correct. + self.assertEqual( + set(['a.txt', 'b.csv']), set(delegate.keys), + "Expected files not found for slicing at: {}".format(i)) + # Assert the 'headers' match what is expected. + self.assertEqual( + headers_a_txt, + list(delegate.get_headers('a.txt').get_all()), + '"a.txt" header mismatch on slice: {}'.format(i)) + self.assertEqual( + headers_b_csv, + list(delegate.get_headers('b.csv')), + '"b.csv" header mismatch on slice: {}'.format(i)) + # Assert that the file contents match what is expected. 
+ a_info = await delegate.get_file_info('a.txt') + self.assertIsNotNone(a_info) + a_data = await delegate.read_into_bytes(a_info) + self.assertEqual( + b'a', a_data, + '"a.txt" file contents mismatch on slice: {}'.format(i)) + b_info = await delegate.get_file_info('b.csv') + self.assertIsNotNone(b_info) + b_data = await delegate.read_into_bytes(b_info) + self.assertEqual( + b'col1,col2\na,b\n--boundarythin,thatwasclose\n', + b_data, + '"b.csv" file contents mismatch on slice: {}'.format(i)) + + class HTTPHeadersTest(unittest.TestCase): def test_multi_line(self): # Lines beginning with whitespace are appended to the previous line From e7454e53ff528caec818c139a4b81e4c19d97136 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Mon, 6 Feb 2023 18:58:38 -0800 Subject: [PATCH 2/5] Work with both Async and sync versions of AbstractFileDelegate. Also, fix the tests to start passing now. --- tornado/httputil.py | 30 ++++++---- tornado/test/httputil_test.py | 102 ++++++++++++++++------------------ 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/tornado/httputil.py b/tornado/httputil.py index 663815ea82..97284938f5 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -924,7 +924,6 @@ def __init__(self, delegate: AbstractFileDelegate, boundary: str, self._boundary = boundary self._max_buffer_size = max_header_bytes self._name = None - self._info = None # Variables to store the current state of the parser. self._state = ParserState.PARSE_BOUNDARY_LINE @@ -979,7 +978,9 @@ async def data_received(self, chunk: bytes) -> None: # whole buffer. data = self._buffer self._buffer = bytearray() - await self._delegate.write(self._info, data) + fut = self._delegate.file_data_received(self._name, data) + if fut is not None: + await fut # Return because the whole buffer was written out. return @@ -993,7 +994,9 @@ async def data_received(self, chunk: bytes) -> None: # the boundary cases. 
data = self._buffer[:idx] self._buffer = self._buffer[idx:] - await self._delegate.file_data_received(self._info, data) + fut = self._delegate.file_data_received(self._name, data) + if fut is not None: + await fut # Not enough data (technically) to check against. Wait for # more data to be certain whether the boundary was parsed. @@ -1005,7 +1008,9 @@ async def data_received(self, chunk: bytes) -> None: # handle this case more cleanly. if self._buffer.startswith(self._boundary_next): # Mark the current file as finished. - await self._delegate.finish_file(self._info) + fut = self._delegate.finish_file(self._name) + if fut is not None: + await fut self._change_state(ParserState.PARSE_BOUNDARY_LINE) continue @@ -1017,7 +1022,9 @@ async def data_received(self, chunk: bytes) -> None: return if self._buffer.startswith(self._boundary_end): - await self._delegate.finish_file(self._info) + fut = self._delegate.finish_file(self._name) + if fut is not None: + await fut self._change_state(ParserState.PARSE_BOUNDARY_LINE) continue @@ -1030,7 +1037,9 @@ async def data_received(self, chunk: bytes) -> None: else: data = self._buffer[:next_idx] self._buffer = self._buffer[next_idx:] - await self._delegate.file_data_received(self._info, data) + fut = self._delegate.file_data_received(self._name, data) + if fut is not None: + await fut # Continue and run the check after this update. continue @@ -1086,16 +1095,17 @@ async def data_received(self, chunk: bytes) -> None: data = self._buffer[:idx + 4].decode('utf-8') self._buffer = self._buffer[idx + 4:] headers = HTTPHeaders.parse(data) - name, plist = _parse_header( + _, plist = _parse_header( headers.get('Content-Disposition', '')) + name = plist.get('name') # content_disp = headers.get('Content-Disposition', '') # _parse_header(head) # name = parse_content_name(content_disp) # Call the delegate with the new file. 
- - self._info = await self._delegate.start_file( - name, headers=headers) + fut = self._delegate.start_file(name, headers=headers) + if fut is not None: + await fut # Update the buffer and the state. self._change_state(ParserState.PARSE_BODY, name=name) diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py index 7ded8c4c03..805ab68c25 100644 --- a/tornado/test/httputil_test.py +++ b/tornado/test/httputil_test.py @@ -277,6 +277,40 @@ def test_data_after_final_boundary(self): class MemoryFileDelegate(AbstractFileDelegate): """Basic File Delegate that stores its contents in memory.""" + def __init__(self): + super().__init__() + self._file_mapping = {} + + self._curr_file = None + self._buffer = bytearray() + self._headers = None + + @property + def keys(self): + return list(self._file_mapping.keys()) + + def start_file(self, name: str, headers: HTTPHeaders): + self._curr_file = name + self._headers = headers + self._buffer = bytearray() + + def file_data_received(self, name: str, data: bytes): + self._buffer.extend(data) + + def finish_file(self, name: str): + content_type = self._headers.get( + 'Content-Type', 'application/octet-stream') + httpfile = HTTPFile( + filename=name, body=bytes(self._buffer), content_type=content_type) + self._file_mapping[name] = httpfile + + def get_file(self, name) -> Optional[HTTPFile]: + return self._file_mapping.get(name) + + +class AsyncMemoryFileDelegate(AbstractFileDelegate): + """Basic File Delegate that stores its contents in memory.""" + def __init__(self): super().__init__() self._file_mapping = {} @@ -307,20 +341,11 @@ async def finish_file(self, name: str): def get_file(self, name) -> Optional[HTTPFile]: return self._file_mapping.get(name) - class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase): async def test_multipart_form_data(self): boundary = b'--boundarything' - headers_a_txt = list(HTTPHeaders({ - 'Content-Disposition': 'form-data; name="a.txt"', - }).get_all()) - headers_b_csv = 
list(HTTPHeaders({ - 'Content-Disposition': 'form-data; name="b.csv"', - 'Content-Type': 'text/csv' - }).get_all()) - # Test all possible splits and chunks of the given data. This will # verify the parser with all (?) possible corner cases. for i in range(len(MULTIPART_DATA)): @@ -335,28 +360,17 @@ async def test_multipart_form_data(self): self.assertEqual( set(['a.txt', 'b.csv']), set(delegate.keys), "Expected files not found for slicing at: {}".format(i)) - # Assert the 'headers' match what is expected. - self.assertEqual( - headers_a_txt, - list(delegate.get_headers('a.txt').get_all()), - '"a.txt" header mismatch on slice: {}'.format(i)) - self.assertEqual( - headers_b_csv, - list(delegate.get_headers('b.csv').get_all()), - '"b.csv" header mismatch on slice: {}'.format(i)) - # Assert that the file contents match what is expected. - a_info = await delegate.get_file_info('a.txt') - self.assertIsNotNone(a_info) - a_data = await delegate.read_into_bytes(a_info) + + http_file_a = delegate.get_file('a.txt') + self.assertIsNotNone(http_file_a) self.assertEqual( - b'a', a_data, + b'a', http_file_a.body, '"a.txt" file contents mismatch on slice: {}'.format(i)) - b_info = await delegate.get_file_info('b.csv') - self.assertIsNotNone(b_info) - b_data = await delegate.read_into_bytes(b_info) + http_file_b = delegate.get_file('b.csv') + self.assertIsNotNone(http_file_b) self.assertEqual( b'col1,col2\na,b\n--boundarythin,thatwasclose\n', - b_data, + http_file_b.body, # bytes(delegate.parsed_data['b.csv']), '"b.csv" file contents mismatch on slice: {}'.format(i)) @@ -364,18 +378,10 @@ async def test_multipart_form_data_async(self): # Same test as above, but with async methods for the delegate. 
boundary = b'--boundarything' - headers_a_txt = list(HTTPHeaders({ - 'Content-Disposition': 'form-data; name="a.txt"', - }).get_all()) - headers_b_csv = list(HTTPHeaders({ - 'Content-Disposition': 'form-data; name=b.csv', - 'Content-Type': 'text/csv;' - })) - # Test all possible splits and chunks of the given data. This will # verify the parser with all possible corner cases. for i in range(len(MULTIPART_DATA)): - delegate = MemoryFileDelegate() + delegate = AsyncMemoryFileDelegate() parser = StreamingMultipartFormDataParser(delegate, boundary) chunk1 = MULTIPART_DATA[:i] chunk2 = MULTIPART_DATA[i:] @@ -386,28 +392,18 @@ async def test_multipart_form_data_async(self): self.assertEqual( set(['a.txt', 'b.csv']), set(delegate.keys), "Expected files not found for slicing at: {}".format(i)) - # Assert the 'headers' match what is expected. - self.assertEqual( - headers_a_txt, - list(delegate.get_headers('a.txt').get_all()), - '"a.txt" header mismatch on slice: {}'.format(i)) - self.assertEqual( - headers_b_csv, - list(delegate.get_headers('b.csv')), - '"b.csv" header mismatch on slice: {}'.format(i)) # Assert that the file contents match what is expected. 
- a_info = await delegate.get_file_info('a.txt') - self.assertIsNotNone(a_info) - a_data = await delegate.read_into_bytes(a_info) + http_file_a = delegate.get_file('a.txt') + self.assertIsNotNone(http_file_a) self.assertEqual( - b'a', a_data, + b'a', http_file_a.body, '"a.txt" file contents mismatch on slice: {}'.format(i)) - b_info = await delegate.get_file_info('b.csv') - self.assertIsNotNone(b_info) - b_data = await delegate.read_into_bytes(b_info) + http_file_b = delegate.get_file('b.csv') + self.assertIsNotNone(http_file_b) self.assertEqual( b'col1,col2\na,b\n--boundarythin,thatwasclose\n', - b_data, + http_file_b.body, + # bytes(delegate.parsed_data['b.csv']), '"b.csv" file contents mismatch on slice: {}'.format(i)) From a1cd8fc1b5397e2f5c01cbb4ffce59e82a2488c4 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Tue, 7 Feb 2023 20:30:17 -0800 Subject: [PATCH 3/5] Apply 'black' linter. --- tornado/httputil.py | 50 ++++++++++++--------------- tornado/test/httputil_test.py | 63 ++++++++++++++++++++--------------- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/tornado/httputil.py b/tornado/httputil.py index 97284938f5..8b4c7fcd89 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -857,7 +857,6 @@ def parse_multipart_form_data( class AbstractFileDelegate: - def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]: pass @@ -869,7 +868,6 @@ def finish_file(self, name: str) -> Optional[Awaitable[None]]: class ParserState: - PARSE_BOUNDARY_LINE = 1 """State that parses the initial boundary.""" @@ -895,13 +893,10 @@ def from_content_type_header( cls, delegate, header ) -> "StreamingMultipartFormDataParser": if isinstance(header, bytes): - header = header.decode('utf-8') + header = header.decode("utf-8") boundary = None # Make sure the header is the multipart/form-data. 
- parts = [ - part.strip() - for part in header.split(';') - ] + parts = [part.strip() for part in header.split(";")] if parts[0].lower() != "multipart/form-data": raise ValueError("Invalid Content-Type: {}".format(parts[0])) @@ -909,16 +904,16 @@ def from_content_type_header( for part in parts: m = _BOUNDARY_REGEX.match(part) if m: - boundary = m.group('boundary') + boundary = m.group("boundary") return cls(delegate, boundary) raise ValueError("Required 'boundary' option not found in header!") - - def __init__(self, delegate: AbstractFileDelegate, boundary: str, - max_header_bytes=_1MB): + def __init__( + self, delegate: AbstractFileDelegate, boundary: str, max_header_bytes=_1MB + ): # Be nice and decode the boundary if it is a bytes object. if isinstance(boundary, bytes): - boundary = boundary.decode('utf-8') + boundary = boundary.decode("utf-8") # Store the delegate to write out the data. self._delegate = delegate self._boundary = boundary @@ -930,8 +925,8 @@ def __init__(self, delegate: AbstractFileDelegate, boundary: str, self._buffer = bytearray() # Variables to hold the boundary matches. - self._boundary_next = '--{}\r\n'.format(self._boundary).encode() - self._boundary_end = '--{}--\r\n'.format(self._boundary).encode() + self._boundary_next = "--{}\r\n".format(self._boundary).encode() + self._boundary_end = "--{}--\r\n".format(self._boundary).encode() self._boundary_base = self._boundary_next[:-2] # Variables for caching boundary matching. @@ -943,7 +938,7 @@ def boundary(self) -> str: """Return the boundary text that denotes the end of a file.""" return self._boundary - def _change_state(self, state: ParserState, name: Optional[str]=None): + def _change_state(self, state: ParserState, name: Optional[str] = None): """Helper to change the state of the parser. This also clears some variables used in different states. @@ -972,7 +967,7 @@ async def data_received(self, chunk: bytes) -> None: # PARSE_BODY state --> Expecting to parse the file contents. 
if self._state == ParserState.PARSE_BODY: # Search for the boundary characters. - idx = self._buffer.find(b'-') + idx = self._buffer.find(b"-") if idx < 0: # No match against any boundary character. Write out the # whole buffer. @@ -1030,7 +1025,7 @@ async def data_received(self, chunk: bytes) -> None: # No match so far, so write out the data up to the next # boundary delimiter. - next_idx = self._buffer.find(b'-', 1) + next_idx = self._buffer.find(b"-", 1) if next_idx < 0: data = self._buffer self._buffer = bytearray() @@ -1056,7 +1051,7 @@ async def data_received(self, chunk: bytes) -> None: # the 'PARSE_HEADER' state. Also, continue to run through the # loop again with the new state. if self._buffer.startswith(self._boundary_next): - self._buffer = self._buffer[len(self._boundary_next):] + self._buffer = self._buffer[len(self._boundary_next) :] self._change_state(ParserState.PARSE_FILE_HEADERS) continue # Check against 'self._boundary_end' as well. There is a slim @@ -1068,7 +1063,7 @@ async def data_received(self, chunk: bytes) -> None: elif self._buffer.startswith(self._boundary_end): # Done parsing. We should probably sanity-check that all # data was consumed. - self._buffer = self._buffer[len(self._boundary_end):] + self._buffer = self._buffer[len(self._boundary_end) :] self._change_state(ParserState.PARSING_DONE) continue else: @@ -1076,7 +1071,7 @@ async def data_received(self, chunk: bytes) -> None: # PARSE_HEADERS state --> Expecting to parse headers with CRLF. if self._state == ParserState.PARSE_FILE_HEADERS: - idx = self._buffer.find(b'\r\n\r\n', self._last_idx) + idx = self._buffer.find(b"\r\n\r\n", self._last_idx) # Implies no match. Update the next index to search to be: # max(0, len(buffer) - 3) # as an optimization to speed up future comparisons. This @@ -1092,15 +1087,11 @@ async def data_received(self, chunk: bytes) -> None: return # Otherwise, we have a match. Parse this into a dictionary of # headers and pass the result to create a new file. 
- data = self._buffer[:idx + 4].decode('utf-8') - self._buffer = self._buffer[idx + 4:] + data = self._buffer[: idx + 4].decode("utf-8") + self._buffer = self._buffer[idx + 4 :] headers = HTTPHeaders.parse(data) - _, plist = _parse_header( - headers.get('Content-Disposition', '')) - name = plist.get('name') - # content_disp = headers.get('Content-Disposition', '') - # _parse_header(head) - # name = parse_content_name(content_disp) + _, plist = _parse_header(headers.get("Content-Disposition", "")) + name = plist.get("name") # Call the delegate with the new file. fut = self._delegate.start_file(name, headers=headers) @@ -1118,7 +1109,8 @@ async def data_received(self, chunk: bytes) -> None: # finished... gen_log.warning( "Finished with non-empty buffer (%s bytes remaining).", - len(self._buffer)) + len(self._buffer), + ) # Even if there is data remaining, we should exit the loop. return diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py index 805ab68c25..27c5be2562 100644 --- a/tornado/test/httputil_test.py +++ b/tornado/test/httputil_test.py @@ -10,7 +10,7 @@ HTTPInputError, HTTPFile, StreamingMultipartFormDataParser, - AbstractFileDelegate + AbstractFileDelegate, ) from tornado.escape import utf8, native_str from tornado.log import gen_log @@ -298,10 +298,10 @@ def file_data_received(self, name: str, data: bytes): self._buffer.extend(data) def finish_file(self, name: str): - content_type = self._headers.get( - 'Content-Type', 'application/octet-stream') + content_type = self._headers.get("Content-Type", "application/octet-stream") httpfile = HTTPFile( - filename=name, body=bytes(self._buffer), content_type=content_type) + filename=name, body=bytes(self._buffer), content_type=content_type + ) self._file_mapping[name] = httpfile def get_file(self, name) -> Optional[HTTPFile]: @@ -332,19 +332,19 @@ async def file_data_received(self, name: str, data: bytes): self._buffer.extend(data) async def finish_file(self, name: str): - content_type = 
self._headers.get( - 'Content-Type', 'application/octet-stream') + content_type = self._headers.get("Content-Type", "application/octet-stream") httpfile = HTTPFile( - filename=name, body=bytes(self._buffer), content_type=content_type) + filename=name, body=bytes(self._buffer), content_type=content_type + ) self._file_mapping[name] = httpfile def get_file(self, name) -> Optional[HTTPFile]: return self._file_mapping.get(name) -class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase): +class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase): async def test_multipart_form_data(self): - boundary = b'--boundarything' + boundary = b"--boundarything" # Test all possible splits and chunks of the given data. This will # verify the parser with all (?) possible corner cases. @@ -358,25 +358,29 @@ async def test_multipart_form_data(self): # Verify that the delegate contents are correct. self.assertEqual( - set(['a.txt', 'b.csv']), set(delegate.keys), - "Expected files not found for slicing at: {}".format(i)) + set(["a.txt", "b.csv"]), + set(delegate.keys), + "Expected files not found for slicing at: {}".format(i), + ) - http_file_a = delegate.get_file('a.txt') + http_file_a = delegate.get_file("a.txt") self.assertIsNotNone(http_file_a) self.assertEqual( - b'a', http_file_a.body, - '"a.txt" file contents mismatch on slice: {}'.format(i)) - http_file_b = delegate.get_file('b.csv') + b"a", + http_file_a.body, + '"a.txt" file contents mismatch on slice: {}'.format(i), + ) + http_file_b = delegate.get_file("b.csv") self.assertIsNotNone(http_file_b) self.assertEqual( - b'col1,col2\na,b\n--boundarythin,thatwasclose\n', + b"col1,col2\na,b\n--boundarythin,thatwasclose\n", http_file_b.body, - # bytes(delegate.parsed_data['b.csv']), - '"b.csv" file contents mismatch on slice: {}'.format(i)) + '"b.csv" file contents mismatch on slice: {}'.format(i), + ) async def test_multipart_form_data_async(self): # Same test as above, but with async methods for the 
delegate. - boundary = b'--boundarything' + boundary = b"--boundarything" # Test all possible splits and chunks of the given data. This will # verify the parser with all possible corner cases. @@ -390,21 +394,26 @@ async def test_multipart_form_data_async(self): # Verify that the delegate contents are correct. self.assertEqual( - set(['a.txt', 'b.csv']), set(delegate.keys), - "Expected files not found for slicing at: {}".format(i)) + set(["a.txt", "b.csv"]), + set(delegate.keys), + "Expected files not found for slicing at: {}".format(i), + ) # Assert that the file contents match what is expected. - http_file_a = delegate.get_file('a.txt') + http_file_a = delegate.get_file("a.txt") self.assertIsNotNone(http_file_a) self.assertEqual( - b'a', http_file_a.body, - '"a.txt" file contents mismatch on slice: {}'.format(i)) - http_file_b = delegate.get_file('b.csv') + b"a", + http_file_a.body, + '"a.txt" file contents mismatch on slice: {}'.format(i), + ) + http_file_b = delegate.get_file("b.csv") self.assertIsNotNone(http_file_b) self.assertEqual( - b'col1,col2\na,b\n--boundarythin,thatwasclose\n', + b"col1,col2\na,b\n--boundarythin,thatwasclose\n", http_file_b.body, # bytes(delegate.parsed_data['b.csv']), - '"b.csv" file contents mismatch on slice: {}'.format(i)) + '"b.csv" file contents mismatch on slice: {}'.format(i), + ) class HTTPHeadersTest(unittest.TestCase): From 5c27a25ce02f91efef93f3b7f9bcbe6d98a84361 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Mon, 20 Feb 2023 08:51:57 -0800 Subject: [PATCH 4/5] Restrict the size of the buffer. 
--- tornado/httputil.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tornado/httputil.py b/tornado/httputil.py index 8b4c7fcd89..e7e7ba2e4e 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -950,20 +950,25 @@ def _change_state(self, state: ParserState, name: Optional[str] = None): async def data_received(self, chunk: bytes) -> None: # Process the data received, based on the current state. - self._buffer.extend(chunk) + # + # It is possible for 'chunk' here to be larger than the maximum buffer + # size. Initially, this is okay because we still need to process the + # chunk. However, when the buffer _remains_ this size after going + # through the rest of this call, then the input is bad since each state + # should incrementally consume data from the buffer to bound its size. + if len(self._buffer) > self._max_buffer_size: + raise ValueError( + "Buffer is growing larger than: {} bytes!".format(self._max_buffer_size) + ) + # Ignore incrementing the buffer when in the DONE state altogether. + if self._state != ParserState.PARSING_DONE: + self._buffer.extend(chunk) # Iterate over and over while there is sufficient data in the buffer. # Each loop should either consume data, or move to a state where not # enough data is available, in which case this should exit to await # more data. while True: - # NOTE: As a almost useless optimization, we order the states in - # the "expected" most common state ordering: - # (1) PARSE_BODY is likely the most common state. - # (2) PARSE_BOUNDARY_LINE - # (3) PARSE_HEADERS - # (4) PARSING_DONE - # PARSE_BODY state --> Expecting to parse the file contents. if self._state == ParserState.PARSE_BODY: # Search for the boundary characters. From b82355a2039eb3953241d900fd0bb0a1a999470a Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Mon, 20 Feb 2023 15:21:39 -0800 Subject: [PATCH 5/5] Add a max_buffer_size field to (loosely) bound the buffer size in the parser. 
--- tornado/httputil.py | 11 +++++++++-- tornado/test/httputil_test.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/tornado/httputil.py b/tornado/httputil.py index e7e7ba2e4e..4c60c9e7e0 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -868,6 +868,7 @@ def finish_file(self, name: str) -> Optional[Awaitable[None]]: class ParserState: + PARSE_BOUNDARY_LINE = 1 """State that parses the initial boundary.""" @@ -909,15 +910,20 @@ def from_content_type_header( raise ValueError("Required 'boundary' option not found in header!") def __init__( - self, delegate: AbstractFileDelegate, boundary: str, max_header_bytes=_1MB + self, delegate: AbstractFileDelegate, boundary: str, max_buffer_size=_1MB ): + """Create a StreamingMultipartFormDataParser. + + This parser (asynchronously) receives data, parses it, and invokes the + given delegate appropriately. + """ # Be nice and decode the boundary if it is a bytes object. if isinstance(boundary, bytes): boundary = boundary.decode("utf-8") # Store the delegate to write out the data. self._delegate = delegate self._boundary = boundary - self._max_buffer_size = max_header_bytes + self._max_buffer_size = max_buffer_size self._name = None # Variables to store the current state of the parser. @@ -1116,6 +1122,7 @@ async def data_received(self, chunk: bytes) -> None: "Finished with non-empty buffer (%s bytes remaining).", len(self._buffer), ) + self._buffer.clear() # Even if there is data remaining, we should exit the loop. 
return diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py index 27c5be2562..d88010ea44 100644 --- a/tornado/test/httputil_test.py +++ b/tornado/test/httputil_test.py @@ -343,6 +343,7 @@ def get_file(self, name) -> Optional[HTTPFile]: class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase): + async def test_multipart_form_data(self): boundary = b"--boundarything" @@ -415,6 +416,33 @@ async def test_multipart_form_data_async(self): '"b.csv" file contents mismatch on slice: {}'.format(i), ) + async def test_runaway_memory_parser(self): + # Verify the parser raises instead of buffering unbounded input. + boundary = b"--boundarything" + + # Parse the data. + BAD_DATA = b"""----boundarything\r
Content-Disposition: form-data; name="a.txt"\r
\r
a----boundarything\r
Content-Disposition: form-data; name="b.csv"\r
Content-Type: text/csv\r
""" + (b'a' * 10000) + b"""
\r
col1,col2
a,b
--boundarythin,thatwasclose
----boundarything--\r
""" + delegate = AsyncMemoryFileDelegate() + # Configure the parser, but set the buffer size to something small. + # This will check if the parser raises after high memory consumption. + parser = StreamingMultipartFormDataParser( + delegate, boundary, max_buffer_size=1024) + await parser.data_received(BAD_DATA[:5000]) + with self.assertRaises(Exception): + await parser.data_received(BAD_DATA[5000:]) + class HTTPHeadersTest(unittest.TestCase): def test_multi_line(self):