diff --git a/.gitignore b/.gitignore
index e275bff..7de14f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,6 +23,7 @@ var/
 *.egg-info/
 .installed.cfg
 *.egg
+.venv/
 
 # PyInstaller
 # Usually these files are written by a python script from a template
@@ -67,3 +68,6 @@ target/
 
 # pyenv
 .python-version
+
+# PyCharm
+.idea/
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..5ff0969
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,3 @@
+from kaitaistruct import KaitaiStruct, KaitaiStream, ReadWriteKaitaiStruct
+__version__ = '0.11.dev1'
+__all__ = ['KaitaiStruct', 'KaitaiStream', 'ReadWriteKaitaiStruct']
diff --git a/experimental/exp_class.py b/experimental/exp_class.py
new file mode 100644
index 0000000..6c2ded6
--- /dev/null
+++ b/experimental/exp_class.py
@@ -0,0 +1,247 @@
+"""
+This test is a proof of concept of lazy parsing. It provides the same functionality as the original implementation, but it is much faster.
+I've added the test file to the repository, so you can run it yourself.
+"""
+
+from kaitaistruct import KaitaiStruct
+import cProfile
+import tracemalloc
+
+
+class AnftOrig(KaitaiStruct):
+    def __init__(self, _io, _parent = None, _root = None):
+        self._io = _io
+        self._parent = _parent
+        self._root = _root if _root else self
+        self._read()
+
+    def _read(self):
+        self.magic_bytes = self._io.read_bytes(4)
+        self.unknown1 = self._io.read_u1()
+        self.num_records = self._io.read_u4le()
+        self.records = []
+        for i in range(self.num_records):
+            self.records.append(Anft.Records(self._io, self, self._root))
+
+        self.footer = self._io.read_bytes(4)
+
+    class Records(KaitaiStruct):
+        def __init__(self, _io, _parent = None, _root = None):
+            self._io = _io
+            self._parent = _parent
+            self._root = _root if _root else self
+            self._read()
+
+        def _read(self):
+            self.first = self._io.read_u8be()
+            self.second = self._io.read_u8be()
+
+
+def old():
+    tracemalloc.start()
+    cProfile.run("AnftOrig.from_file('test.anft')", sort = "cumtime")
+    cur, peak = tracemalloc.get_traced_memory()
+    print(f"cur: {cur / 1024 / 1024} MB, peak: {peak / 1024 / 1024} MB")
+    snapshot = tracemalloc.take_snapshot()
+    top_stats = snapshot.statistics('lineno')
+
+    print("[ Top 10 ]")
+    for stat in top_stats[:10]:
+        print(stat)
+
+
+class Anft(KaitaiStruct):
+    def __init__(self, _io, _parent = None, _root = None):
+        self._io = _io
+        self._parent = _parent
+        self._root = _root if _root else self
+
+    @property
+    def magic_bytes(self):
+        if hasattr(self, "_m_magic_bytes"):
+            return self._m_magic_bytes
+
+        self._m_magic_bytes = self._io.read_bytes(4).decode("UTF-8")
+        return self._m_magic_bytes
+
+    @property
+    def unknown1(self):
+        if hasattr(self, "_m_unknown1"):
+            return self._m_unknown1
+
+        self._m_unknown1 = self._io.read_u1()
+        return self._m_unknown1
+
+    @property
+    def num_records(self):
+        if hasattr(self, "_m_num_records"):
+            return self._m_num_records
+
+        self._m_num_records = self._io.read_u4le()
+        return self._m_num_records
+
+    @property
+    def records(self):
+        if hasattr(self, "_m_records"):
+            return self._m_records
+
+        self._m_records = []
+        for i in range(self.num_records):
+            self._m_records.append(self._root.Records(self._io, self, self._root))
+
+        return self._m_records
+
+    class Records(KaitaiStruct):
+        def __init__(self, _io, _parent = None, _root = None):
+            self._io = _io
+            self._parent = _parent
+            self._root = _root if _root else self
+
+        @property
+        def first(self):
+            if hasattr(self, "_m_first"):
+                return self._m_first
+
+            self._m_first = self._io.read_u8be()
+            return self._m_first
+
+        @property
+        def 
second(self): + if hasattr(self, "_m_second"): + return self._m_second + + self._m_second = self._io.read_u8be() + return self._m_second + + +def experimental(): + tracemalloc.start() + cProfile.run("Anft.from_file('test.anft')", sort = "cumtime") + cur, peak = tracemalloc.get_traced_memory() + print(f"cur: {cur / 1024 / 1024} MB, peak: {peak / 1024 / 1024} MB") + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics('lineno') + + print("[ Top 10 ]") + for stat in top_stats[:10]: + print(stat) + + +# noinspection PyStatementEffect +def experimental2(): + tracemalloc.start() + cProfile.run("Anft.from_file('test.anft')", sort = "cumtime") + a = Anft.from_file('test.anft') + print(a.magic_bytes) + print(a.unknown1) + print(a.num_records) + for i in a.records: + i.first # no print to not spam the console + i.second # no print to not spam the console + + cur, peak = tracemalloc.get_traced_memory() + print(f"cur: {cur / 1024 / 1024} MB, peak: {peak / 1024 / 1024} MB") + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics('lineno') + + print("[ Top 10 ]") + for stat in top_stats[:10]: + print(stat) + + +if __name__ == '__main__': + """ + The following test is a lazy parsing. It has the same benefit as the original one, but it is much faster. + It also caches the results of the parsing, so it is even faster on the second run. + """ + experimental() + # cur: 1.0425701141357422 MB, peak: 1.0906896591186523 MB + + # 15 function calls in 0.000 seconds + # + # Ordered by: cumulative time + # + # ncalls tottime percall cumtime percall filename:lineno(function) + # 1 0.000 0.000 0.000 0.000 {built-in method builtins.exec} + # 1 0.000 0.000 0.000 0.000 :1() + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:43(from_file) + # 1 0.000 0.000 0.000 0.000 {built-in method io.open} + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:73(__init__) + # 1 0.000 0.000 0.000 0.000 contextlib.py:347(__exit__) + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:143(size) + # 1 0.000 0.000 0.000 0.000 {method 'close' of '_io.BufferedReader' objects} + # 1 0.000 0.000 0.000 0.000 {method 'size' of 'mmap.mmap' objects} + # 1 0.000 0.000 0.000 0.000 exp_class.py:49(__init__) + # 1 0.000 0.000 0.000 0.000 {method 'fileno' of '_io.BufferedReader' objects} + # 1 0.000 0.000 0.000 0.000 contextlib.py:343(__init__) + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:272(align_to_byte) + # 1 0.000 0.000 0.000 0.000 contextlib.py:345(__enter__) + # 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects} + # + # + # [ Top 10 ] + # :729: size=682 KiB, count=5624, average=124 B + # C:\Python311\Lib\collections\__init__.py:503: size=17.1 KiB, count=90, average=195 B + # C:\Python311\Lib\opcode.py:36: size=10026 B, count=148, average=68 B + # C:\Python311\Lib\dataclasses.py:433: size=8524 B, count=100, average=85 B + # C:\Python311\Lib\ast.py:636: size=7936 B, count=9, average=882 B + # :241: size=7597 B, count=76, average=100 B + # C:\Python311\Lib\dis.py:49: size=6512 B, count=1, average=6512 B + # :106: size=6358 B, count=20, average=318 B + # C:\Python311\Lib\ast.py:683: size=5462 B, count=10, average=546 B + # C:\Python311\Lib\collections\__init__.py:436: size=5169 B, count=58, average=89 B + print("----------------------------------------") + """ + Next is the comparison of the original with the experimental version. 
+ """ + # cur: 3.9979352951049805 MB, peak: 4.018841743469238 MB + old() + # 57414 function calls in 0.039 seconds + # + # Ordered by: cumulative time + # + # ncalls tottime percall cumtime percall filename:lineno(function) + # 1 0.000 0.000 0.039 0.039 {built-in method builtins.exec} + # 1 0.000 0.000 0.039 0.039 :1() + # 1 0.000 0.000 0.039 0.039 kaitaistruct.py:43(from_file) + # 1 0.000 0.000 0.039 0.039 exp_class.py:9(__init__) + # 1 0.034 0.034 0.039 0.039 exp_class.py:15(_read) + # 28689 0.003 0.000 0.003 0.000 exp_class.py:90(__init__) + # 28689 0.001 0.000 0.001 0.000 {method 'append' of 'list' objects} + # 1 0.000 0.000 0.000 0.000 {built-in method io.open} + # 4 0.000 0.000 0.000 0.000 kaitaistruct.py:332(read_bytes) + # 4 0.000 0.000 0.000 0.000 kaitaistruct.py:337(_read_bytes_not_aligned) + # 1 0.000 0.000 0.000 0.000 contextlib.py:347(__exit__) + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:73(__init__) + # 1 0.000 0.000 0.000 0.000 {method 'close' of '_io.BufferedReader' objects} + # 4 0.000 0.000 0.000 0.000 {method 'read' of 'mmap.mmap' objects} + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:212(read_u1) + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:143(size) + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:234(read_u4le) + # 1 0.000 0.000 0.000 0.000 {method 'size' of 'mmap.mmap' objects} + # 2 0.000 0.000 0.000 0.000 {method 'unpack' of '_struct.Struct' objects} + # 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects} + # 4 0.000 0.000 0.000 0.000 {built-in method builtins.len} + # 1 0.000 0.000 0.000 0.000 contextlib.py:343(__init__) + # 1 0.000 0.000 0.000 0.000 {method 'fileno' of '_io.BufferedReader' objects} + # 1 0.000 0.000 0.000 0.000 kaitaistruct.py:272(align_to_byte) + # 1 0.000 0.000 0.000 0.000 contextlib.py:345(__enter__) + # + # + # [ Top 10 ] + # D:\Dev\kaitai_struct_python_runtime\experimental\exp_class.py:21: size=2933 KiB, count=57380, average=52 B + # :729: size=681 KiB, count=5612, average=124 B + # C:\Python311\Lib\tracemalloc.py:67: size=46.7 KiB, count=747, average=64 B + # C:\Python311\Lib\tracemalloc.py:193: size=39.0 KiB, count=832, average=48 B + # C:\Python311\Lib\collections\__init__.py:503: size=17.1 KiB, count=90, average=195 B + # C:\Python311\Lib\opcode.py:36: size=10026 B, count=148, average=68 B + # C:\Python311\Lib\ast.py:636: size=7936 B, count=9, average=882 B + # :241: size=7650 B, count=77, average=99 B + # C:\Python311\Lib\dataclasses.py:433: size=6820 B, count=69, average=99 B + # C:\Python311\Lib\dis.py:49: size=6512 B, count=1, average=6512 B + + print("----------------------------------------") + """ + Bonus + """ + experimental2() diff --git a/experimental/test.anft b/experimental/test.anft new file mode 100644 index 0000000..b29e963 Binary files /dev/null and b/experimental/test.anft differ diff --git a/kaitaistruct.py b/kaitaistruct.py index ef36bba..998658c 100644 --- a/kaitaistruct.py +++ b/kaitaistruct.py @@ -1,9 +1,11 @@ import itertools -import sys import struct -from io import open, BytesIO, SEEK_CUR, SEEK_END # noqa - -PY2 = sys.version_info[0] == 2 +from contextlib import closing +from enum import Enum +from io import BytesIO, SEEK_CUR, open +from mmap import ACCESS_READ, mmap +from typing import Any, Callable, List, Optional, Type, TypeVar, Union +API_VERSION = (0, 11) # Kaitai Struct runtime version, in the format defined by PEP 440. 
# Used by our setup.cfg to set the version number in @@ -13,75 +15,75 @@ # Since ksc 0.10, the compatibility check instead uses the API_VERSION constant, # so that the version string does not need to be parsed at runtime # (see https://github.com/kaitai-io/kaitai_struct/issues/804). -__version__ = '0.11.dev1' + # Kaitai Struct runtime API version, as a tuple of ints. # Used in generated Python code (since ksc 0.10) to check that the imported # runtime is compatible with the generated code. -API_VERSION = (0, 11) + # pylint: disable=invalid-name,missing-docstring,too-many-public-methods # pylint: disable=useless-object-inheritance,super-with-arguments,consider-using-f-string +KTStruct = TypeVar("KTStruct", bound = "KaitaiStruct") +KTStream = TypeVar("KTStream", bound = "KaitaiStream") -class KaitaiStruct(object): - def __init__(self, stream): - self._io = stream +class KaitaiStruct: + def __init__(self, stream: KTStream) -> None: + self._io: KTStream = stream - def __enter__(self): + def __enter__(self) -> KTStruct: return self - def __exit__(self, *args, **kwargs): + def __exit__(self, *args, **kwargs) -> None: self.close() - def close(self): + def close(self) -> None: self._io.close() @classmethod - def from_file(cls, filename): - f = open(filename, 'rb') - try: - return cls(KaitaiStream(f)) - except Exception: - # close file descriptor, then reraise the exception - f.close() - raise + def from_file(cls: Type[KTStruct], filename: str) -> KTStruct: + with closing(open(filename, "rb")) as f: + m: mmap = mmap(f.fileno(), 0, access = ACCESS_READ) + return cls(KaitaiStream(m)) @classmethod - def from_bytes(cls, buf): - return cls(KaitaiStream(BytesIO(buf))) + def from_bytes(cls: Type[KTStruct], buf: mmap) -> KTStruct: + return cls(KaitaiStream(mmap(-1, len(buf), access = ACCESS_READ))) @classmethod - def from_io(cls, io): + def from_io(cls, io: Union[BytesIO, mmap]) -> KTStruct: return cls(KaitaiStream(io)) class ReadWriteKaitaiStruct(KaitaiStruct): - def _fetch_instances(self): + def _fetch_instances(self) -> None: raise NotImplementedError() - def _write(self, io=None): + def _write(self, io = None) -> None: self._write__seq(io) self._fetch_instances() self._io.write_back_child_streams() - def _write__seq(self, io): + def _write__seq(self, io) -> None: if io is not None: self._io = io class KaitaiStream(object): - def __init__(self, io): - self._io = io + def __init__(self, io: mmap) -> None: + self._io: mmap = io + self.bits: int = 0 + self.bits_left: int = 0 self.align_to_byte() - self.bits_le = False - self.bits_write_mode = False + self.bits_le: bool = False + self.bits_write_mode: bool = False - self.write_back_handler = None - self.child_streams = [] + self.write_back_handler: Optional[callable] = None + self.child_streams: List[KTStream] = [] try: - self._size = self.size() + self._size: int = self.size() # IOError is for Python 2 (IOError also exists in Python 3, but it has # become just an alias for OSError). 
# @@ -98,24 +100,24 @@ def __init__(self, io): # _write_bytes_not_aligned()) pass - def __enter__(self): + def __enter__(self: KTStream) -> KTStream: return self - def __exit__(self, *args, **kwargs): + def __exit__(self, *args, **kwargs) -> None: self.close() - def close(self): + def close(self) -> None: try: if self.bits_write_mode: self.write_align_to_byte() - else: + elif self.bits_left > 0: self.align_to_byte() finally: self._io.close() # region Stream positioning - def is_eof(self): + def is_eof(self) -> bool: if not self.bits_write_mode and self.bits_left > 0: return False @@ -126,67 +128,46 @@ def is_eof(self): # . return self._io.tell() >= self.size() - def seek(self, n): + def seek(self, n: int) -> None: if self.bits_write_mode: self.write_align_to_byte() else: - self.align_to_byte() - + if self.bits_left > 0: + self.align_to_byte() self._io.seek(n) - def pos(self): - return self._io.tell() + (1 if self.bits_write_mode and self.bits_left > 0 else 0) - - def size(self): - # Python has no internal File object API function to get - # current file / StringIO size, thus we use the following - # trick. - io = self._io - # Remember our current position - cur_pos = io.tell() - # Seek to the end of the stream and remember the full length - full_size = io.seek(0, SEEK_END) - - if full_size is None: - # In Python 2, the seek() method of 'file' objects (created by the - # built-in open() function) has no return value, so we have to call - # tell() ourselves to get the new absolute position - see - # . - # - # In Python 3, seek() methods of all - # streams return the new - # position already, so this won't be needed once we drop support for - # Python 2. - full_size = io.tell() + def pos(self) -> int: + return self._io.tell() + ( + 1 if self.bits_write_mode and self.bits_left > 0 else 0 + ) - # Seek back to the current position - io.seek(cur_pos) - return full_size + def size(self) -> int: + return self._io.size() # endregion # region Structs for numeric types - packer_s1 = struct.Struct('b') - packer_s2be = struct.Struct('>h') - packer_s4be = struct.Struct('>i') - packer_s8be = struct.Struct('>q') - packer_s2le = struct.Struct('H') - packer_u4be = struct.Struct('>I') - packer_u8be = struct.Struct('>Q') - packer_u2le = struct.Struct('f') - packer_f8be = struct.Struct('>d') - packer_f4le = struct.Struct('h") + packer_s4be: struct.Struct = struct.Struct(">i") + packer_s8be: struct.Struct = struct.Struct(">q") + packer_s2le: struct.Struct = struct.Struct("H") + packer_u4be: struct.Struct = struct.Struct(">I") + packer_u8be: struct.Struct = struct.Struct(">Q") + packer_u2le: struct.Struct = struct.Struct("f") + packer_f8be: struct.Struct = struct.Struct(">d") + packer_f4le: struct.Struct = struct.Struct(" int: + return self.packer_s1.unpack(self.read_bytes(1))[0] # region Big-endian - def read_s2be(self): - return KaitaiStream.packer_s2be.unpack(self.read_bytes(2))[0] + def read_s2be(self) -> int: + return self.packer_s2be.unpack(self.read_bytes(2))[0] - def read_s4be(self): - return KaitaiStream.packer_s4be.unpack(self.read_bytes(4))[0] + def read_s4be(self) -> int: + return self.packer_s4be.unpack(self.read_bytes(4))[0] - def read_s8be(self): - return KaitaiStream.packer_s8be.unpack(self.read_bytes(8))[0] + def read_s8be(self) -> int: + return self.packer_s8be.unpack(self.read_bytes(8))[0] # endregion # region Little-endian - def read_s2le(self): - return KaitaiStream.packer_s2le.unpack(self.read_bytes(2))[0] + def read_s2le(self) -> int: + return self.packer_s2le.unpack(self.read_bytes(2))[0] 
- def read_s4le(self): - return KaitaiStream.packer_s4le.unpack(self.read_bytes(4))[0] + def read_s4le(self) -> int: + return self.packer_s4le.unpack(self.read_bytes(4))[0] - def read_s8le(self): - return KaitaiStream.packer_s8le.unpack(self.read_bytes(8))[0] + def read_s8le(self) -> int: + return self.packer_s8le.unpack(self.read_bytes(8))[0] # endregion @@ -229,32 +210,33 @@ def read_s8le(self): # region Unsigned - def read_u1(self): - return KaitaiStream.packer_u1.unpack(self.read_bytes(1))[0] + def read_u1(self) -> int: + return self.packer_u1.unpack(self.read_bytes(1))[0] - # region Big-endian + def read_u2be(self) -> int: + return self.packer_u2be.unpack(self.read_bytes(2))[0] - def read_u2be(self): - return KaitaiStream.packer_u2be.unpack(self.read_bytes(2))[0] + def read_u4be(self) -> int: + return self.packer_u4be.unpack(self.read_bytes(4))[0] - def read_u4be(self): - return KaitaiStream.packer_u4be.unpack(self.read_bytes(4))[0] + # def read_u8be(self) -> int: + # return self.packer_u8be.unpack_from(self.read_bytes(8))[0] - def read_u8be(self): - return KaitaiStream.packer_u8be.unpack(self.read_bytes(8))[0] + def read_u8be(self) -> int: + return self.packer_u8be.unpack(self.read_bytes(8))[0] # endregion # region Little-endian - def read_u2le(self): - return KaitaiStream.packer_u2le.unpack(self.read_bytes(2))[0] + def read_u2le(self) -> int: + return self.packer_u2le.unpack(self.read_bytes(2))[0] - def read_u4le(self): - return KaitaiStream.packer_u4le.unpack(self.read_bytes(4))[0] + def read_u4le(self) -> int: + return self.packer_u4le.unpack(self.read_bytes(4))[0] - def read_u8le(self): - return KaitaiStream.packer_u8le.unpack(self.read_bytes(8))[0] + def read_u8le(self) -> int: + return self.packer_u8le.unpack(self.read_bytes(8))[0] # endregion @@ -266,21 +248,21 @@ def read_u8le(self): # region Big-endian - def read_f4be(self): - return KaitaiStream.packer_f4be.unpack(self.read_bytes(4))[0] + def read_f4be(self) -> float: + return self.packer_f4be.unpack(self.read_bytes(4))[0] - def read_f8be(self): - return KaitaiStream.packer_f8be.unpack(self.read_bytes(8))[0] + def read_f8be(self) -> float: + return self.packer_f8be.unpack(self.read_bytes(8))[0] # endregion # region Little-endian - def read_f4le(self): - return KaitaiStream.packer_f4le.unpack(self.read_bytes(4))[0] + def read_f4le(self) -> float: + return self.packer_f4le.unpack(self.read_bytes(4))[0] - def read_f8le(self): - return KaitaiStream.packer_f8le.unpack(self.read_bytes(8))[0] + def read_f8le(self) -> float: + return self.packer_f8le.unpack(self.read_bytes(8))[0] # endregion @@ -288,137 +270,130 @@ def read_f8le(self): # region Unaligned bit values - def align_to_byte(self): - self.bits_left = 0 + def align_to_byte(self) -> None: + if self.bits_left == 0: + return self.bits = 0 + self.bits_left = 0 - def read_bits_int_be(self, n): + def read_bits_int_be(self, n: int) -> int: self.bits_write_mode = False - res = 0 - bits_needed = n - self.bits_left self.bits_left = -bits_needed % 8 if bits_needed > 0: - # 1 bit => 1 byte - # 8 bits => 1 byte - # 9 bits => 2 bytes - bytes_needed = ((bits_needed - 1) // 8) + 1 # `ceil(bits_needed / 8)` + bytes_needed = -(-bits_needed // 8) # Equivalent to `ceil(bits_needed / 8)` buf = self._read_bytes_not_aligned(bytes_needed) - if PY2: - buf = bytearray(buf) for byte in buf: - res = res << 8 | byte + res = (res << 8) | byte new_bits = res - res = res >> self.bits_left | self.bits << bits_needed - self.bits = new_bits # will be masked at the end of the function - else: - res = 
self.bits >> -bits_needed # shift unneeded bits out + res = (res >> self.bits_left) | (self.bits << bits_needed) + self.bits = new_bits - mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 - self.bits &= mask + else: + res = self.bits >> -bits_needed + self.bits &= (1 << self.bits_left) - 1 return res # Unused since Kaitai Struct Compiler v0.9+ - compatibility with # older versions. - def read_bits_int(self, n): + def read_bits_int(self, n: int) -> int: # For backward compatibility return self.read_bits_int_be(n) - def read_bits_int_le(self, n): + def read_bits_int_le(self, n: int) -> int: self.bits_write_mode = False - res = 0 bits_needed = n - self.bits_left if bits_needed > 0: - # 1 bit => 1 byte - # 8 bits => 1 byte - # 9 bits => 2 bytes - bytes_needed = ((bits_needed - 1) // 8) + 1 # `ceil(bits_needed / 8)` + bytes_needed = -(-bits_needed // 8) buf = self._read_bytes_not_aligned(bytes_needed) - if PY2: - buf = bytearray(buf) for i, byte in enumerate(buf): res |= byte << (i * 8) new_bits = res >> bits_needed - res = res << self.bits_left | self.bits + res = (res << self.bits_left) | self.bits self.bits = new_bits + else: res = self.bits self.bits >>= n self.bits_left = -bits_needed % 8 - - mask = (1 << n) - 1 # no problem with this in Python (arbitrary precision integers) - res &= mask + res &= (1 << n) - 1 return res # endregion # region Byte arrays - def read_bytes(self, n): - self.align_to_byte() + def read_bytes(self, n: int) -> bytes: + if self.bits_left > 0: + self.align_to_byte() return self._read_bytes_not_aligned(n) - def _read_bytes_not_aligned(self, n): + def _read_bytes_not_aligned(self, n: int) -> bytes: if n < 0: - raise ValueError( - "requested invalid %d amount of bytes" % - (n,) - ) + raise ValueError(f"requested invalid {n} amount of bytes") - is_satisfiable = True - # When a large number of bytes is requested, try to check first - # that there is indeed enough data left in the stream. - # This avoids reading large amounts of data only to notice afterwards - # that it's not long enough. For smaller amounts of data, it's faster to - # first read the data unconditionally and check the length afterwards. - if ( - n >= 8*1024*1024 # = 8 MiB - # in Python 2, there is a common error ['file' object has no - # attribute 'seekable'], so we need to make sure that seekable() exists - and callable(getattr(self._io, 'seekable', None)) - and self._io.seekable() - ): - num_bytes_available = self.size() - self.pos() - is_satisfiable = (n <= num_bytes_available) - - if is_satisfiable: - r = self._io.read(n) - num_bytes_available = len(r) - is_satisfiable = (n <= num_bytes_available) - - if not is_satisfiable: - # noinspection PyUnboundLocalVariable - raise EOFError( - "requested %d bytes, but only %d bytes available" % - (n, num_bytes_available) - ) + if n >= 8 * 1024 * 1024: + if callable(getattr(self._io, "seekable", None)) and self._io.seekable(): + if n > self.size() - self.pos(): + raise EOFError(f"requested {n} bytes, exceeds available size") + + r: Optional[bytes] = self._io.read(n) + if r is None or len(r) < n: + raise EOFError(f"requested {n} bytes, but only {len(r)} bytes available") - # noinspection PyUnboundLocalVariable return r + # def _read_bytes_not_aligned(self, n: int) -> bytes: + # if n < 0: + # raise ValueError("requested invalid %d amount of bytes" % (n,)) + # + # is_satisfiable = True + # # When a large number of bytes is requested, try to check first + # # that there is indeed enough data left in the stream. 
+ # # This avoids reading large amounts of data only to notice afterwards + # # that it's not long enough. For smaller amounts of data, it's faster to + # # first read the data unconditionally and check the length afterwards. + # if ( + # n >= 8 * 1024 * 1024 + # and callable(getattr(self._io, "seekable", None)) + # and self._io.seekable() + # ): + # is_satisfiable = n <= self.size() - self.pos() + # + # if is_satisfiable: + # r = self._io.read(n) + # is_satisfiable = n <= len(r) + # + # if not is_satisfiable: + # raise EOFError(f"requested {n} bytes, but only {len(r)} bytes available") + # + # # noinspection PyUnboundLocalVariable + # return r + def read_bytes_full(self): - self.align_to_byte() + if self.bits_left > 0: + self.align_to_byte() return self._io.read() - def read_bytes_term(self, term, include_term, consume_term, eos_error): + def read_bytes_term( + self, term: int, include_term: bool, consume_term: bool, eos_error: bool + ) -> bytes: self.align_to_byte() - r = b'' + r = b"" while True: c = self._io.read(1) - if c == b'': + if not c: if eos_error: - raise Exception( - "end of stream reached, but no terminator %d found" % - (term,) + raise EOFError( + f"end of stream reached, but no terminator {term} found" ) - return r if ord(c) == term: @@ -427,36 +402,33 @@ def read_bytes_term(self, term, include_term, consume_term, eos_error): if not consume_term: self._io.seek(-1, SEEK_CUR) return r - r += c - def ensure_fixed_contents(self, expected): + def ensure_fixed_contents(self, expected: bytes) -> bytes: actual = self._io.read(len(expected)) if actual != expected: - raise Exception( - "unexpected fixed contents: got %r, was waiting for %r" % - (actual, expected) + raise ValueError( + f"unexpected fixed contents: got {actual}, was waiting for {expected}" ) return actual @staticmethod - def bytes_strip_right(data, pad_byte): - return data.rstrip(KaitaiStream.byte_from_int(pad_byte)) + def bytes_strip_right(data: bytes, pad_byte: int) -> bytes: + return data.rstrip(bytes([pad_byte])) @staticmethod - def bytes_terminate(data, term, include_term): - term_index = KaitaiStream.byte_array_index_of(data, term) + def bytes_terminate(data: bytes, term: int, include_term: bool) -> bytes: + term_index = data.find(bytes([term])) if term_index == -1: return data[:] - return data[:term_index + (1 if include_term else 0)] + return data[: term_index + (1 if include_term else 0)] # endregion # endregion # region Writing - - def _ensure_bytes_left_to_write(self, n, pos): + def _ensure_bytes_left_to_write(self, n: int, pos: int) -> None: try: full_size = self._size except AttributeError: @@ -465,40 +437,39 @@ def _ensure_bytes_left_to_write(self, n, pos): num_bytes_left = full_size - pos if n > num_bytes_left: raise EOFError( - "requested to write %d bytes, but only %d bytes left in the stream" % - (n, num_bytes_left) + f"requested to write {n} bytes, but only {num_bytes_left} bytes left in the stream" ) # region Integer numbers # region Signed - def write_s1(self, v): - self.write_bytes(KaitaiStream.packer_s1.pack(v)) + def write_s1(self, v: int) -> None: + self.write_bytes(self.packer_s1.pack(v)) # region Big-endian - def write_s2be(self, v): - self.write_bytes(KaitaiStream.packer_s2be.pack(v)) + def write_s2be(self, v: int) -> None: + self.write_bytes(self.packer_s2be.pack(v)) - def write_s4be(self, v): - self.write_bytes(KaitaiStream.packer_s4be.pack(v)) + def write_s4be(self, v: int) -> None: + self.write_bytes(self.packer_s4be.pack(v)) - def write_s8be(self, v): - 
self.write_bytes(KaitaiStream.packer_s8be.pack(v)) + def write_s8be(self, v: int) -> None: + self.write_bytes(self.packer_s8be.pack(v)) # endregion # region Little-endian - def write_s2le(self, v): - self.write_bytes(KaitaiStream.packer_s2le.pack(v)) + def write_s2le(self, v: int) -> None: + self.write_bytes(self.packer_s2le.pack(v)) - def write_s4le(self, v): - self.write_bytes(KaitaiStream.packer_s4le.pack(v)) + def write_s4le(self, v: int) -> None: + self.write_bytes(self.packer_s4le.pack(v)) - def write_s8le(self, v): - self.write_bytes(KaitaiStream.packer_s8le.pack(v)) + def write_s8le(self, v: int) -> None: + self.write_bytes(self.packer_s8le.pack(v)) # endregion @@ -506,32 +477,32 @@ def write_s8le(self, v): # region Unsigned - def write_u1(self, v): - self.write_bytes(KaitaiStream.packer_u1.pack(v)) + def write_u1(self, v: int) -> None: + self.write_bytes(self.packer_u1.pack(v)) # region Big-endian - def write_u2be(self, v): - self.write_bytes(KaitaiStream.packer_u2be.pack(v)) + def write_u2be(self, v: int) -> None: + self.write_bytes(self.packer_u2be.pack(v)) - def write_u4be(self, v): - self.write_bytes(KaitaiStream.packer_u4be.pack(v)) + def write_u4be(self, v: int) -> None: + self.write_bytes(self.packer_u4be.pack(v)) - def write_u8be(self, v): - self.write_bytes(KaitaiStream.packer_u8be.pack(v)) + def write_u8be(self, v: int) -> None: + self.write_bytes(self.packer_u8be.pack(v)) # endregion # region Little-endian - def write_u2le(self, v): - self.write_bytes(KaitaiStream.packer_u2le.pack(v)) + def write_u2le(self, v: int) -> None: + self.write_bytes(self.packer_u2le.pack(v)) - def write_u4le(self, v): - self.write_bytes(KaitaiStream.packer_u4le.pack(v)) + def write_u4le(self, v: int) -> None: + self.write_bytes(self.packer_u4le.pack(v)) - def write_u8le(self, v): - self.write_bytes(KaitaiStream.packer_u8le.pack(v)) + def write_u8le(self, v: int) -> None: + self.write_bytes(self.packer_u8le.pack(v)) # endregion @@ -543,21 +514,21 @@ def write_u8le(self, v): # region Big-endian - def write_f4be(self, v): - self.write_bytes(KaitaiStream.packer_f4be.pack(v)) + def write_f4be(self, v: float) -> None: + self.write_bytes(self.packer_f4be.pack(v)) - def write_f8be(self, v): + def write_f8be(self, v: float) -> None: self.write_bytes(KaitaiStream.packer_f8be.pack(v)) # endregion # region Little-endian - def write_f4le(self, v): - self.write_bytes(KaitaiStream.packer_f4le.pack(v)) + def write_f4le(self, v: float) -> None: + self.write_bytes(self.packer_f4le.pack(v)) - def write_f8le(self, v): - self.write_bytes(KaitaiStream.packer_f8le.pack(v)) + def write_f8le(self, v: float) -> None: + self.write_bytes(self.packer_f8le.pack(v)) # endregion @@ -565,11 +536,9 @@ def write_f8le(self, v): # region Unaligned bit values - def write_align_to_byte(self): + def write_align_to_byte(self) -> None: if self.bits_left > 0: - b = self.bits - if not self.bits_le: - b <<= 8 - self.bits_left + b = self.bits << (8 - self.bits_left) if not self.bits_le else self.bits # We clear the `bits_left` and `bits` fields using align_to_byte() # before writing the byte in the stream so that it happens even in @@ -595,54 +564,25 @@ def write_align_to_byte(self): # failure of the "align to byte" operation, but the writing of some # bits to the stream that was requested earlier. 
self.align_to_byte() - self._write_bytes_not_aligned(KaitaiStream.byte_from_int(b)) + self._write_bytes_not_aligned(bytes([b])) - def write_bits_int_be(self, n, val): + def write_bits_int_be(self, n: int, val: int) -> None: self.bits_le = False - self.bits_write_mode = True - - mask = (1 << n) - 1 # no problem with this in Python (arbitrary precision integers) - val &= mask - - bits_to_write = self.bits_left + n - bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)` - - # Unlike self._io.tell(), pos() respects the `bits_left` field (it - # returns the stream position as if it were already aligned on a byte - # boundary), which ensures that we report the same numbers of bytes here - # as read_bits_int_*() methods would. - self._ensure_bytes_left_to_write(bytes_needed - (1 if self.bits_left > 0 else 0), self.pos()) - - bytes_to_write = bits_to_write // 8 - self.bits_left = bits_to_write % 8 - - if bytes_to_write > 0: - buf = bytearray(bytes_to_write) - - mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 - new_bits = val & mask - val = val >> self.bits_left | self.bits << (n - self.bits_left) - self.bits = new_bits - - for i in range(bytes_to_write - 1, -1, -1): - buf[i] = val & 0xff - val >>= 8 - self._write_bytes_not_aligned(buf) - else: - self.bits = self.bits << n | val + self._write_bits_int(n, val) - def write_bits_int_le(self, n, val): + def write_bits_int_le(self, n: int, val: int) -> None: self.bits_le = True + self._write_bits_int(n, val) + + def _write_bits_int(self, n: int, val: int) -> None: self.bits_write_mode = True + val &= (1 << n) - 1 bits_to_write = self.bits_left + n - bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)` - - # Unlike self._io.tell(), pos() respects the `bits_left` field (it - # returns the stream position as if it were already aligned on a byte - # boundary), which ensures that we report the same numbers of bytes here - # as read_bits_int_*() methods would. 
- self._ensure_bytes_left_to_write(bytes_needed - (1 if self.bits_left > 0 else 0), self.pos()) + bytes_needed = -(-bits_to_write // 8) + self._ensure_bytes_left_to_write( + bytes_needed - (1 if self.bits_left > 0 else 0), self.pos() + ) bytes_to_write = bits_to_write // 8 old_bits_left = self.bits_left @@ -650,44 +590,42 @@ def write_bits_int_le(self, n, val): if bytes_to_write > 0: buf = bytearray(bytes_to_write) - - new_bits = val >> (n - self.bits_left) # no problem with this in Python (arbitrary precision integers) - val = val << old_bits_left | self.bits + new_bits = val >> (n - self.bits_left) + val = (val << old_bits_left) | self.bits self.bits = new_bits for i in range(bytes_to_write): - buf[i] = val & 0xff + buf[i] = val & 0xFF val >>= 8 self._write_bytes_not_aligned(buf) else: self.bits |= val << old_bits_left - mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7 - self.bits &= mask + self.bits &= (1 << self.bits_left) - 1 # endregion # region Byte arrays - def write_bytes(self, buf): + def write_bytes(self, buf: Union[bytes, bytearray]) -> None: self.write_align_to_byte() self._write_bytes_not_aligned(buf) - def _write_bytes_not_aligned(self, buf): + def _write_bytes_not_aligned(self, buf: Union[bytes, bytearray]) -> None: n = len(buf) self._ensure_bytes_left_to_write(n, self._io.tell()) self._io.write(buf) - def write_bytes_limit(self, buf, size, term, pad_byte): + def write_bytes_limit( + self, buf: Union[bytes, bytearray], size: int, term: int, pad_byte: int + ) -> None: n = len(buf) self.write_bytes(buf) if n < size: self.write_u1(term) - pad_len = size - n - 1 - for _ in range(pad_len): - self.write_u1(pad_byte) + self._write_bytes_not_aligned(bytes([pad_byte] * (size - n - 1))) elif n > size: - raise ValueError("writing %d bytes, but %d bytes were given" % (size, n)) + raise ValueError(f"writing {size} bytes, but {n} bytes were given") # endregion @@ -696,64 +634,55 @@ def write_bytes_limit(self, buf, size, term, pad_byte): # region Byte array processing @staticmethod - def process_xor_one(data, key): - if PY2: - return bytes(bytearray(v ^ key for v in bytearray(data))) - + def process_xor_one(data: Union[bytes, bytearray], key: int) -> bytes: return bytes(v ^ key for v in data) @staticmethod - def process_xor_many(data, key): - if PY2: - return bytes(bytearray(a ^ b for a, b in zip(bytearray(data), itertools.cycle(bytearray(key))))) - + def process_xor_many( + data: Union[bytes, bytearray], key: Union[bytes, bytearray] + ) -> bytes: return bytes(a ^ b for a, b in zip(data, itertools.cycle(key))) @staticmethod - def process_rotate_left(data, amount, group_size): + def process_rotate_left( + data: Union[bytes, bytearray], amount: int, group_size: int + ) -> bytes: if group_size != 1: - raise Exception( - "unable to rotate group of %d bytes yet" % - (group_size,) - ) + raise ValueError(f"unable to rotate group of {group_size} bytes yet") - anti_amount = -amount % (group_size * 8) - - r = bytearray(data) - for i, byte in enumerate(r): - r[i] = (byte << amount) & 0xff | (byte >> anti_amount) - return bytes(r) + anti_amount = -amount % 8 + return bytes((byte << amount) & 0xFF | (byte >> anti_amount) for byte in data) # endregion # region Misc runtime operations @staticmethod - def int_from_byte(v): - return ord(v) if PY2 else v + def int_from_byte(v: Union[bytes, bytearray]) -> int: + return v[0] @staticmethod - def byte_from_int(i): - return chr(i) if PY2 else bytes((i,)) + def byte_from_int(i: int) -> bytes: + return bytes([i]) @staticmethod - def 
byte_array_index(data, i): - return KaitaiStream.int_from_byte(data[i]) + def byte_array_index(data: Union[bytes, bytearray], i: int) -> int: + return data[i] @staticmethod - def byte_array_min(b): - return KaitaiStream.int_from_byte(min(b)) + def byte_array_min(b: Union[bytes, bytearray]) -> int: + return min(b) @staticmethod - def byte_array_max(b): - return KaitaiStream.int_from_byte(max(b)) + def byte_array_max(b: Union[bytes, bytearray]) -> int: + return max(b) @staticmethod - def byte_array_index_of(data, b): - return data.find(KaitaiStream.byte_from_int(b)) + def byte_array_index_of(data: Union[bytes, bytearray], b: int) -> int: + return data.find(bytes([b])) @staticmethod - def resolve_enum(enum_obj, value): + def resolve_enum(enum_obj: Type[Enum], value: int) -> Any: """Resolves value using enum: if the value is not found in the map, we'll just use literal value per se. Works around problem with Python enums throwing an exception when encountering unknown value. @@ -765,7 +694,7 @@ def resolve_enum(enum_obj, value): # endregion - def to_byte_array(self): + def to_byte_array(self) -> bytes: pos = self.pos() self.seek(0) r = self.read_bytes_full() @@ -773,18 +702,18 @@ def to_byte_array(self): return r class WriteBackHandler(object): - def __init__(self, pos, handler): + def __init__(self, pos: int, handler: Callable[[KTStream], None]) -> None: self.pos = pos self.handler = handler - def write_back(self, parent): + def write_back(self, parent: KTStream) -> None: parent.seek(self.pos) self.handler(parent) - def add_child_stream(self, child): + def add_child_stream(self, child: WriteBackHandler) -> None: self.child_streams.append(child) - def write_back_child_streams(self, parent=None): + def write_back_child_streams(self, parent: Union[KTStream, None] = None) -> None: _pos = self.pos() for child in self.child_streams: child.write_back_child_streams(self) @@ -797,7 +726,7 @@ def write_back_child_streams(self, parent=None): if parent is not None: self._write_back(parent) - def _write_back(self, parent): + def _write_back(self, parent: KTStream) -> None: self.write_back_handler.write_back(parent) @@ -806,6 +735,7 @@ class KaitaiStructError(Exception): Stores KSY source path, pointing to an element supposedly guilty of an error. """ + def __init__(self, msg, src_path): super(KaitaiStructError, self).__init__("%s: %s" % (src_path, msg)) self.src_path = src_path @@ -816,16 +746,25 @@ class UndecidedEndiannessError(KaitaiStructError): switch, but nothing matches (although using endianness expression implies that there should be some positive result). """ + def __init__(self, src_path): - super(UndecidedEndiannessError, self).__init__("unable to decide on endianness for a type", src_path) + super(UndecidedEndiannessError, self).__init__( + "unable to decide on endianness for a type", src_path + ) class ValidationFailedError(KaitaiStructError): """Common ancestor for all validation failures. Stores pointer to KaitaiStream IO object which was involved in an error. 
""" + def __init__(self, msg, io, src_path): - super(ValidationFailedError, self).__init__(("" if io is None else "at pos %d: " % (io.pos(),)) + "validation failed: " + msg, src_path) + super(ValidationFailedError, self).__init__( + ("" if io is None else "at pos %d: " % (io.pos(),)) + + "validation failed: " + + msg, + src_path, + ) self.io = io @@ -833,8 +772,13 @@ class ValidationNotEqualError(ValidationFailedError): """Signals validation failure: we required "actual" value to be equal to "expected", but it turned out that it's not. """ + def __init__(self, expected, actual, io, src_path): - super(ValidationNotEqualError, self).__init__("not equal, expected %s, but got %s" % (repr(expected), repr(actual)), io, src_path) + super(ValidationNotEqualError, self).__init__( + "not equal, expected %s, but got %s" % (repr(expected), repr(actual)), + io, + src_path, + ) self.expected = expected self.actual = actual @@ -843,8 +787,13 @@ class ValidationLessThanError(ValidationFailedError): """Signals validation failure: we required "actual" value to be greater than or equal to "min", but it turned out that it's not. """ + def __init__(self, min_bound, actual, io, src_path): - super(ValidationLessThanError, self).__init__("not in range, min %s, but got %s" % (repr(min_bound), repr(actual)), io, src_path) + super(ValidationLessThanError, self).__init__( + "not in range, min %s, but got %s" % (repr(min_bound), repr(actual)), + io, + src_path, + ) self.min = min_bound self.actual = actual @@ -853,8 +802,13 @@ class ValidationGreaterThanError(ValidationFailedError): """Signals validation failure: we required "actual" value to be less than or equal to "max", but it turned out that it's not. """ + def __init__(self, max_bound, actual, io, src_path): - super(ValidationGreaterThanError, self).__init__("not in range, max %s, but got %s" % (repr(max_bound), repr(actual)), io, src_path) + super(ValidationGreaterThanError, self).__init__( + "not in range, max %s, but got %s" % (repr(max_bound), repr(actual)), + io, + src_path, + ) self.max = max_bound self.actual = actual @@ -863,8 +817,11 @@ class ValidationNotAnyOfError(ValidationFailedError): """Signals validation failure: we required "actual" value to be from the list, but it turned out that it's not. """ + def __init__(self, actual, io, src_path): - super(ValidationNotAnyOfError, self).__init__("not any of the list, got %s" % (repr(actual)), io, src_path) + super(ValidationNotAnyOfError, self).__init__( + "not any of the list, got %s" % (repr(actual)), io, src_path + ) self.actual = actual @@ -872,14 +829,24 @@ class ValidationExprError(ValidationFailedError): """Signals validation failure: we required "actual" value to match the expression, but it turned out that it doesn't. 
""" + def __init__(self, actual, io, src_path): - super(ValidationExprError, self).__init__("not matching the expression, got %s" % (repr(actual)), io, src_path) + super(ValidationExprError, self).__init__( + "not matching the expression, got %s" % (repr(actual)), io, src_path + ) self.actual = actual class ConsistencyError(Exception): def __init__(self, attr_id, actual, expected): - super(ConsistencyError, self).__init__("Check failed: %s, expected: %s, actual: %s" % (attr_id, repr(expected), repr(actual))) + super(ConsistencyError, self).__init__( + "Check failed: %s, expected: %s, actual: %s" + % (attr_id, repr(expected), repr(actual)) + ) self.id = attr_id self.actual = actual self.expected = expected + + +if __name__ == "__main__": + pass diff --git a/kaitaistruct.pyi b/kaitaistruct.pyi new file mode 100644 index 0000000..b30bfff --- /dev/null +++ b/kaitaistruct.pyi @@ -0,0 +1,902 @@ +import struct +from _typeshed import Incomplete +from enum import Enum +from io import BytesIO +from mmap import mmap +from typing import Any, Callable, List, Optional, Type, TypeVar, Union +API_VERSION = (0, 11) + +KTStruct = TypeVar("KTStruct", bound="KaitaiStruct") +KTStream = TypeVar("KTStream", bound="KaitaiStream") + +class KaitaiStruct: + """ + The KaitaiStruct class represents a Kaitai Struct binary format parser. + + Parameters: + - stream: A KTStream object representing the input stream from which the binary data will be parsed. + + Methods: + - __init__(self, stream: KTStream) -> None: + Initializes a new KaitaiStruct object. + + - __enter__(self) -> KTStruct: + Context manager method that allows the KaitaiStruct object to be used in a "with" statement. + + - __exit__(self, *args, **kwargs) -> None: + Context manager method that is called when exiting a "with" statement. + + - close(self) -> None: + Closes the KaitaiStruct object and releases any resources associated with it. + + - from_file(cls, filename: str) -> KTStruct: + Creates a new KaitaiStruct object from a file specified by the filename parameter. + + - from_bytes(cls, buf: mmap) -> KTStruct: + Creates a new KaitaiStruct object from a byte buffer specified by the buf parameter. + + - from_io(cls, io: Union[BytesIO, mmap]) -> KTStruct: + Creates a new KaitaiStruct object from an input stream specified by the io parameter. + """ + + def __init__(self, stream: KTStream) -> None: + """ + :param stream: A Kaitai stream object representing the input stream to parse. + :type stream: KTStream + + Initializes a KaitaiStruct object with the given stream. + """ + self._io: KTStream = ... + + ... + def __enter__(self) -> KTStruct: + """ + Enter the context manager. + + :returns: An instance of KTStruct representing the entered context. + :rtype: KTStruct + """ + ... + def __exit__(self, *args, **kwargs) -> None: + """ + Context management method that is called when exiting the 'with' statement block. + + :param args: The positional arguments passed to the method. + :param kwargs: The keyword arguments passed to the method. + :return: None + """ + ... + def close(self) -> None: + """ + Close the KaitaiStruct instance and release any associated resources. + + :return: None + """ + ... + @classmethod + def from_file(cls, filename: str) -> KTStruct: + """ + :param filename: The name of the file to read from. + :return: An instance of KaitaiStruct with a KaiStream object representing the contents of the file. + + This method reads the contents of a file and returns an instance of the KaitaiStruct subclass that represents the data in the file. 
+ """ + ... + @classmethod + def from_bytes(cls, buf: mmap) -> KTStruct: + """ + Converts a memory-mapped file, represented by `buf`, into an instance of `KTStruct`. + + :param buf: The memory-mapped file to convert. + :return: An instance of `KTStruct` representing the contents of the memory-mapped file. + """ + ... + @classmethod + def from_io(cls, io: Union[BytesIO, mmap]) -> KTStruct: + """ """ + ... + +class ReadWriteKaitaiStruct(KaitaiStruct): + """A class to read and write data using the Kaitai Struct format. + + This class extends the base KaitaiStruct class to add functionality for writing data. + + Attributes: + See the base KaitaiStruct class for the list of inherited attributes. + + Methods: + write_: Write a value of the specified data type to the underlying data stream. + """ + + ... + +class KaitaiStream: + """ """ + + + def __init__(self, io: mmap) -> None: + """ + __init__(io: mmap) + + Initialize a new instance of KaitaiStream. + + :param io: A memory-mappable file object to read from. It must be an instance of mmap. + """ + self._io: mmap = ... + self.bits: int = ... + self.bits_left: int = ... + + self.bits_le: bool = ... + self.bits_write_mode: bool = ... + + self.write_back_handler: Optional[callable] = ... + self.child_streams: List[KTStream] = ... + self._size: Optional[int] = ... + ... + def __enter__(self) -> KTStream: + """ + Subclass of BytesIO that implements __enter__ and __exit__ methods for use as a context manager. + + Usage: + with KaitaiStream(data) as ks: + # Perform operations on ks + + :return: A KaitaiStream object. + :rtype: KaitaiStream + """ + ... + def __exit__(self, *args, **kwargs) -> None: + """ + :param args: Tuple of positional arguments passed to the method. + :param kwargs: Dictionary of keyword arguments passed to the method. + :return: None + + The __exit__ method is a special method in the `KaitaiStream` class. It is called when exiting a context managed by a `with` statement. This method allows for any necessary cleanup or finalization operations to be performed. + + Usage example: + ``` + with KaitaiStream() as stream: + # Code within the context managed by the `with` statement + # ... + + # After the code block completes or raises an exception, + # the __exit__ method is automatically called. + ``` + """ + ... + def close(self) -> None: + """ + Close the KaitaiStream object. + + :return: None + """ + ... + def is_eof(self) -> bool: + """ + Check if the stream is at the end of the file. + + Returns: + True if the stream is at the end of the file, False otherwise. + """ + ... + def seek(self, n: int) -> None: + """ + Seek to a position in the stream. + + Args: + n: Position to seek to. + """ + ... + def pos(self) -> int: + """ + Get the current position in the stream. + + Returns: + Current position in the stream. + """ + ... + def size(self) -> int: + """ + Get the size of the stream. + + Returns: + Size of the stream. + """ + ... + packer_s1: struct.Struct + packer_s2be: struct.Struct + packer_s4be: struct.Struct + packer_s8be: struct.Struct + packer_s2le: struct.Struct + packer_s4le: struct.Struct + packer_s8le: struct.Struct + packer_u1: struct.Struct + packer_u2be: struct.Struct + packer_u4be: struct.Struct + packer_u8be: struct.Struct + packer_u2le: struct.Struct + packer_u4le: struct.Struct + packer_u8le: struct.Struct + packer_f4be: struct.Struct + packer_f8be: struct.Struct + packer_f4le: struct.Struct + packer_f8le: struct.Struct + def read_s1(self) -> int: + """ + Read a signed 1-byte integer from the stream. 
+
+        :return: The read value as an integer.
+        :rtype: int
+        """
+        ...
+    def read_s2be(self) -> int:
+        """
+        Read a signed 2-byte big-endian integer from the stream.
+
+        :return: The read value as an integer.
+        :rtype: int
+        """
+        ...
+    def read_s4be(self) -> int:
+        """
+        Read a signed 4-byte big-endian integer from the stream.
+
+        :return: The read value as an integer.
+        """
+        ...
+    def read_s8be(self) -> int:
+        """
+        Reads a signed 8-byte (64-bit) integer in big-endian format from the stream.
+
+        :return: The signed 8-byte integer value read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_s2le(self) -> int:
+        """Reads a signed 2-byte little-endian integer from the stream.
+
+        :return: The signed 2-byte little-endian integer read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_s4le(self) -> int:
+        """Read a 4-byte signed integer in little-endian format.
+
+        :return: The 4-byte signed integer read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_s8le(self) -> int:
+        """
+        Read a signed 8-byte (64-bit) integer in little-endian format from the stream.
+
+        :return: The signed 8-byte integer read from the stream as an integer value.
+        """
+        ...
+    def read_u1(self) -> int:
+        """
+        Reads and returns an unsigned 1-byte integer from the stream.
+
+        :return: The read unsigned 1-byte integer.
+        """
+        ...
+    def read_u2be(self) -> int:
+        """
+        Reads and interprets a 2-byte unsigned big-endian integer from the stream.
+
+        :return: The 2-byte unsigned big-endian integer read from the stream.
+        """
+        ...
+    def read_u4be(self) -> int:
+        """
+        Reads a 4-byte big-endian unsigned integer from the stream.
+
+        :return: The value of the 4-byte big-endian unsigned integer read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_u8be(self) -> int:
+        """
+        Reads an unsigned 8-byte (64-bit) integer in big-endian format from the underlying stream.
+
+        :return: The value of the unsigned 8-byte integer.
+        :rtype: int
+        """
+        ...
+    def read_u2le(self) -> int:
+        """
+        Reads a 2-byte unsigned integer value in little-endian format from the current position of the stream.
+
+        :return: The 2-byte unsigned integer value read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_u4le(self) -> int:
+        """
+        Reads a 4-byte unsigned integer in little-endian format from the stream.
+
+        :return: The 4-byte unsigned integer read from the stream.
+        :rtype: int
+        """
+        ...
+    def read_u8le(self) -> int:
+        """
+        Reads an unsigned 8-byte (64-bit) integer from the stream in little-endian byte order.
+
+        :return: The unsigned 8-byte integer.
+        """
+        ...
+    def read_f4be(self) -> float:
+        """
+        Reads and returns a 4-byte big-endian floating point number from the current position in the KaitaiStream.
+
+        :return: A 4-byte big-endian floating point number read from the stream.
+        :rtype: float
+        """
+        ...
+    def read_f8be(self) -> float:
+        """
+        Reads an 8-byte floating point number in big-endian format from the KaitaiStream.
+
+        :return: The 8-byte floating point number read from the KaitaiStream.
+        :rtype: float
+        """
+        ...
+    def read_f4le(self) -> float:
+        """
+        Reads a 4-byte floating-point value from the KaitaiStream.
+
+        :return: A floating-point value read from the stream.
+        """
+        ...
+    def read_f8le(self) -> float:
+        """
+        Reads an 8-byte floating-point value from the stream in little-endian format.
+
+        :return: The read floating-point value.
+        :rtype: float
+        """
+        ...
+    def align_to_byte(self) -> None:
+        """
+        Aligns the current reading position of the KaitaiStream object to the next byte boundary by discarding any buffered bits.
+
+        :return: None
+        """
+        ...
+ def read_bits_int_be(self, n: int) -> int: + """ + Reads and returns an integer value from the stream by reading `n` bits in big-endian format. + + :param n: Number of bits to read + :return: Integer value read from the stream + """ + ... + def read_bits_int(self, n: int) -> int: + """ + Reads an integer value from the stream as a specified number of bits. + + :param n: The number of bits to read. + :return: The integer value read from the stream. + """ + ... + def read_bits_int_le(self, n: int) -> int: + """ + Reads an unsigned integer of length `n` bits from the stream in little-endian format. + + :param n: The number of bits to read. + :return: The unsigned integer value. + + """ + ... + def read_bytes(self, n: int) -> bytes: + """ + :param n: The number of bytes to read from the stream. + :return: The bytes read from the stream. + + """ + ... + + def _read_bytes_not_aligned(self, n: int) -> bytes: + """ + Reads a specified number of bytes from the stream. + + :param n: The number of bytes to read. + :return: The bytes read from the stream. + """ + ... + def read_bytes_full(self): + """ + Reads and returns all available bytes in the stream. + + :return: All available bytes in the stream. + :rtype: bytes + """ + ... + def read_bytes_term( + self, term: int, include_term: bool, consume_term: bool, eos_error: bool + ) -> bytes: + """ + Reads bytes from the stream until a specific termination condition is met. + + :param term: The termination condition (byte value or ordinal). + :param include_term: If True, includes the termination byte in the returned bytes. Otherwise, excludes it. + :param consume_term: If True, consumes the termination byte from the stream. Otherwise, keeps it in the stream. + :param eos_error: If True and end of stream is encountered before the termination condition, raises an error. Otherwise, returns the bytes read until that point. + :return: The bytes read from the stream. + + :rtype: bytes + """ + ... + def ensure_fixed_contents(self, expected: bytes) -> bytes: + """ + Ensures that the given `expected` bytes are present in the stream. + + :param expected: The bytes to be checked in the stream. + :type expected: bytes + :return: The bytes if found in the stream. + :rtype: bytes + """ + ... + @staticmethod + def bytes_strip_right(data: bytes, pad_byte: int) -> bytes: + """ + Strip trailing padding bytes from the given data. + + :param data: The input bytes to be stripped. + :param pad_byte: The byte value used for padding. + :return: The data with trailing padding bytes stripped. + """ + ... + @staticmethod + def bytes_terminate(data: bytes, term: int, include_term: bool) -> bytes: + """ + :param data: The input bytes data that needs to be terminated. + :param term: The termination character or byte value. + :param include_term: A boolean flag indicating whether to include the termination character in the result. + :return: The terminated bytes data. + + This method `bytes_terminate` takes in a `data` parameter which is a bytes object representing the input data that needs to be terminated. The `term` parameter defines the termination character or byte value. The `include_term` parameter is a boolean flag indicating whether the termination character should be included in the result or not. + + The return value of this method is a bytes object that represents the terminated data. If `include_term` is `True`, the termination character will be included in the result; otherwise, it will be excluded. + + Example usage: + + ```python + data = b'Hello, world!' 
+        term = 0x00
+        include_term = True
+        result = KaitaiStream.bytes_terminate(data, term, include_term)
+        print(result)  # Output: b'Hello, world!' (no 0x00 terminator found, so the data is returned unchanged)
+        ```
+        """
+        ...
+    def write_s1(self, v: int) -> None:
+        """
+        Write a single signed 1-byte integer value to the output stream.
+
+        :param v: The value to be written.
+        :return: None
+
+        """
+        ...
+    def write_s2be(self, v: int) -> None:
+        """
+        Write the given 16-bit signed integer in big-endian byte order to the
+        output stream.
+
+        :param v: The 16-bit signed integer to write.
+        :type v: int
+        :return: None
+        """
+        ...
+    def write_s4be(self, v: int) -> None:
+        """
+        Write an int value in signed 4-byte big-endian encoding to the output stream.
+
+        :param v: The int value to be written.
+        :return: None
+        """
+        ...
+    def write_s8be(self, v: int) -> None:
+        """
+        Writes a signed 8-byte (64-bit) integer in big-endian format to the stream.
+
+        :param v: The signed 8-byte integer value to write.
+        :type v: int
+        :return: None
+        """
+        ...
+    def write_s2le(self, v: int) -> None:
+        """
+        Writes a signed 2-byte integer value (`v`) to the stream in little-endian format.
+
+        :param v: The value to be written.
+        :return: None
+        """
+        ...
+    def write_s4le(self, v: int) -> None:
+        """
+        Writes a 32-bit signed integer in little-endian byte order to the underlying stream.
+
+        :param v: The value to write.
+        :type v: int
+        :return: None
+        :rtype: None
+
+        """
+        ...
+    def write_s8le(self, v: int) -> None:
+        """
+        Writes a signed 8-byte (64-bit) integer in little-endian format to the stream.
+
+        :param v: The signed 8-byte integer value to write.
+        :type v: int
+
+        :return: None
+        :rtype: None
+
+        """
+        ...
+    def write_u1(self, v: int) -> None:
+        """
+        Writes a 1-byte unsigned integer to the KaitaiStream.
+
+        :param v: The value to be written as a 1-byte unsigned integer.
+        :return: None
+        """
+        ...
+    def write_u2be(self, v: int) -> None:
+        """
+        Write a 2-byte unsigned integer in big-endian byte order to the stream.
+
+        :param v: The value to write.
+        :return: None
+
+        """
+        ...
+    def write_u4be(self, v: int) -> None:
+        """
+        Write a 4-byte unsigned integer in big-endian byte order to the underlying stream.
+
+        :param v: The value to be written.
+        :return: None
+        """
+        ...
+    def write_u8be(self, v: int) -> None:
+        """
+        Writes an unsigned 8-byte (64-bit) integer in big-endian byte order to the underlying stream.
+
+        :param v: The value to be written.
+        :type v: int
+        :return: None
+        """
+        ...
+    def write_u2le(self, v: int) -> None:
+        """
+
+        .. method:: KaitaiStream.write_u2le(v: int) -> None
+
+        This method writes an unsigned 2-byte integer in little-endian format to the stream.
+
+        :param v: The value to be written.
+        :type v: int
+        :return: None
+
+        """
+        ...
+    def write_u4le(self, v: int) -> None:
+        """
+        Write a 4-byte unsigned integer (little-endian) to the output stream.
+
+        :param v: The value to write.
+        :return: None
+
+        """
+        ...
+    def write_u8le(self, v: int) -> None:
+        """
+        Write an unsigned 8-byte (64-bit) integer in little-endian byte order to the underlying stream.
+
+        :param v: The value to write, must be within the range [0, 2**64 - 1].
+        :return: None
+        """
+        ...
+    def write_f4be(self, v: float) -> None:
+        """
+        Writes a 4-byte floating point value in big endian format to the output stream.
+
+        :param v: The 4-byte floating point value to write.
+        :type v: float
+        :return: None
+        """
+        ...
+    def write_f8be(self, v: float) -> None:
+        """
+        Write a float value in 8-byte big-endian format to the current position in the output stream.
+
+        :param v: The float value to write
+        :return: None
+
+        """
+        ...
+    def write_f4le(self, v: float) -> None:
+        """
+        Write a 4-byte little-endian (f4le) floating-point value to the output stream.
+
+        :param v: The floating-point value to be written to the stream.
+        :type v: float
+        :return: None
+        """
+        ...
+    def write_f8le(self, v: float) -> None:
+        """
+        Write an 8-byte floating-point value in little-endian byte order to the output stream.
+
+        :param v: The float value to write.
+        :type v: float
+        :return: None
+        """
+        ...
+    def write_align_to_byte(self) -> None:
+        """
+        Align the write position to the next byte boundary: if any bits are still
+        pending from a bit-level write, flush them, padding the remainder of the
+        current byte with zero bits.
+
+        :return: None
+        """
+        ...
+
+    def _write_bytes_not_aligned(self, buf: Union[bytes, bytearray]) -> None:
+        """
+        Write the given bytes to the stream without first aligning to a byte boundary.
+
+        :param buf: The bytes to write.
+        :return: None
+        """
+        ...
+
+    def _write_bits_int(self, n: int, val: int) -> None:
+        """
+        Write an integer value to the stream as the specified number of bits.
+
+        :param n: The number of bits to write.
+        :param val: The integer value to write.
+        :return: None
+        """
+        ...
+
+    def _ensure_bytes_left_to_write(self, n: int, pos: int) -> None:
+        """
+        Ensure that the specified number of bytes is still available for writing
+        at the given position in the stream.
+
+        :param n: The number of bytes to write.
+        :param pos: The current position in the stream.
+        :return: None
+        """
+        ...
+    def write_bits_int_be(self, n: int, val: int) -> None:
+        """
+        Write the given number of bits of an integer value in big-endian order to the output stream.
+
+        :param n: Number of bits to write.
+        :param val: Integer value to write.
+        :return: None
+
+        Example usage:
+            stream = KaitaiStream(io.BytesIO(bytes(1)))
+            stream.write_bits_int_be(8, 255)
+        """
+        ...
+    def write_bits_int_le(self, n: int, val: int) -> None:
+        """
+        Write an `n`-bit integer value in little-endian order to the stream.
+
+        :param n: The number of bits to write.
+        :type n: int
+        :param val: The integer value to write.
+        :type val: int
+        :return: None
+        """
+        ...
+    def write_bytes(self, buf: Union[bytes, bytearray]) -> None:
+        """
+        Write bytes to the underlying binary stream.
+
+        :param buf: The bytes to be written.
+        :type buf: Union[bytes, bytearray]
+        :return: None
+        """
+        ...
+    def write_bytes_limit(
+        self, buf: Union[bytes, bytearray], size: int, term: int, pad_byte: int
+    ) -> None:
+        """
+        Write the given buffer into a fixed-size region of `size` bytes; if the
+        buffer is shorter than `size`, a terminator byte and padding fill the
+        remaining space.
+
+        :param buf: The buffer containing the bytes to write.
+        :type buf: Union[bytes, bytearray]
+        :param size: The total number of bytes the field occupies in the stream.
+        :type size: int
+        :param term: The terminator byte written after the buffer contents if space remains.
+        :type term: int
+        :param pad_byte: The byte value used to pad the remaining space up to `size`.
+        :type pad_byte: int
+        :return: None
+        """
+        ...
+    @staticmethod
+    def process_xor_one(data: Union[bytes, bytearray], key: int) -> bytes:
+        """
+        XOR every byte of the input data with a single-byte integer key.
+
+        :param data: The input data to be XORed.
+        :param key: The single-byte key used for the XOR operation.
+        :return: The result of the XOR operation as a bytes object.
+        """
+        ...
+    @staticmethod
+    def process_xor_many(
+        data: Union[bytes, bytearray], key: Union[bytes, bytearray]
+    ) -> bytes:
+        """
+        XOR every byte of the input data with the corresponding byte of the key,
+        repeating the key cyclically if it is shorter than the data.
+
+        :param data: The input data to be processed.
+        :param key: The multi-byte XOR key.
+        :return: The result of the XOR operation as a bytes object.
+        """
+        ...
+    @staticmethod
+    def process_rotate_left(
+        data: Union[bytes, bytearray], amount: int, group_size: int
+    ) -> bytes:
+        """
+        Rotate the bits of the input data left by the given amount, processing the
+        data in groups of `group_size` bytes.
+
+        :param data: The input data to rotate left.
+        :type data: Union[bytes, bytearray]
+        :param amount: The number of bit positions to rotate each group left.
+        :type amount: int
+        :param group_size: The size of each rotated group, in bytes.
+        :type group_size: int
+        :return: The rotated data as a bytes object.
+        """
+        ...
+    @staticmethod
+    def int_from_byte(v: Union[bytes, bytearray]) -> int:
+        """
+        Convert a single byte (given as a bytes or bytearray value) to its integer value.
+
+        :param v: The byte value to convert.
+        :return: The converted integer value.
+        """
+        ...
+    @staticmethod
+    def byte_from_int(i: int) -> bytes:
+        """
+        :param i: An integer value representing a single byte.
+        :return: A bytes object containing the input value as a single byte.
+        """
+        ...
+    @staticmethod
+    def byte_array_index(data: Union[bytes, bytearray], i: int) -> int:
+        """
+        :param data: The byte array or byte string to read from.
+        :param i: The index of the byte to return.
+        :return: The byte at index `i` of `data`, as an integer.
+        """
+        ...
+    @staticmethod
+    def byte_array_min(b: Union[bytes, bytearray]) -> int:
+        """
+        :param b: The byte array to find the minimum value in.
+        :type b: Union[bytes, bytearray]
+        :return: The minimum byte value in the byte array.
+        :rtype: int
+        """
+        ...
+    @staticmethod
+    def byte_array_max(b: Union[bytes, bytearray]) -> int:
+        """
+        :param b: A byte array.
+        :return: The maximum byte value in the byte array.
+        """
+        ...
+    @staticmethod
+    def byte_array_index_of(data: Union[bytes, bytearray], b: int) -> int:
+        """
+        :param data: The byte array in which to search for the specified byte.
+        :type data: Union[bytes, bytearray]
+        :param b: The byte to search for in the byte array.
+        :type b: int
+        :return: The index of the first occurrence of the specified byte in the byte array, or -1 if the byte is not found.
+        :rtype: int
+        """
+        ...
+    @staticmethod
+    def resolve_enum(enum_obj: Type[Enum], value: int) -> Any:
+        """
+        Resolve an enum member from its integer value; if the value is not defined
+        in the enum, the raw integer is returned as-is.
+
+        :param enum_obj: The enum class.
+        :type enum_obj: Type[Enum]
+        :param value: The integer value of the enum member.
+        :type value: int
+        :return: The enum member corresponding to the provided integer value, or the integer itself.
+        :rtype: Any
+        """
+        ...
+    def to_byte_array(self) -> bytes:
+        """
+        Return the full contents of the underlying stream as a byte array.
+
+        :return: A byte array representation of the KaitaiStream contents.
+        :rtype: bytes
+        """
+        ...
+
+    class WriteBackHandler:
+        """ """
+
+        pos: Incomplete
+        handler: Incomplete
+        def __init__(self, pos: int, handler: Callable[[KTStream], None]) -> None: ...
+        def write_back(self, parent: KTStream) -> None: ...
+
+    def add_child_stream(self, child: WriteBackHandler) -> None:
+        """
+        :param child: A child stream to be added to the current stream.
+        :return: None
+        """
+        ...
+    def write_back_child_streams(self, parent: Union[KTStream, None] = ...) -> None:
+        """
+        Write back all child streams into their parent streams.
+
+        :param parent: The parent stream object to write back to.
+        :return: None
+        """
+        ...
+
+    def _write_back(self, parent: KTStream) -> None:
+        """
+        Write the contents of the current stream back into the parent stream.
+
+        :param parent: The parent stream object to write back to.
+        :return: None
+        """
+        ...
+
+class KaitaiStructError(Exception): ...
+
+class UndecidedEndiannessError(KaitaiStructError): ...
+
+class ValidationFailedError(KaitaiStructError): ...
+
+class ValidationNotEqualError(ValidationFailedError): ...
+
+class ValidationLessThanError(ValidationFailedError): ...
+
+class ValidationGreaterThanError(ValidationFailedError): ...
+
+class ValidationNotAnyOfError(ValidationFailedError): ...
+
+class ValidationExprError(ValidationFailedError): ...
+
+class ConsistencyError(Exception): ...
diff --git a/py.typed b/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/pyproject.toml b/pyproject.toml
index c6d7bce..fed528d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,3 @@
 [build-system]
-requires = ["setuptools >= 38.6.0"]
+requires = ["setuptools"]
 build-backend = "setuptools.build_meta"
diff --git a/setup.cfg b/setup.cfg
index 455b64b..0d9e2ee 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,5 @@
 [metadata]
 name = kaitaistruct
-version = attr: kaitaistruct.__version__
 author = Kaitai Project
 author_email = greycat@kaitai.io
 url = https://kaitai.io
@@ -19,12 +18,6 @@ classifiers =
     Intended Audience :: Developers
     Topic :: Software Development :: Build Tools
     License :: OSI Approved :: MIT License
-    Programming Language :: Python :: 2
-    Programming Language :: Python :: 2.7
-    Programming Language :: Python :: 3
-    Programming Language :: Python :: 3.4
-    Programming Language :: Python :: 3.5
-    Programming Language :: Python :: 3.6
     Programming Language :: Python :: 3.7
     Programming Language :: Python :: 3.8
     Programming Language :: Python :: 3.9
@@ -36,19 +29,16 @@ classifiers =
 [options]
 zip_safe = True
 include_package_data = True
-py_modules = kaitaistruct
-python_requires = >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
-install_requires =
-    enum34; python_version < "3.4"
-setup_requires =
-    setuptools >= 38.6.0
+python_requires = >=3.7
+
 [bdist_wheel]
 # This flag says that the code is written to work on both Python 2 and Python
 # 3. If at all possible, it is good practice to do this. If you cannot, you
 # will need to generate wheels for each Python version that you support.
-universal=1
+universal=0

 [pycodestyle]
 max-line-length = 140
 statistics = True
+
diff --git a/setup.py b/setup.py
old mode 100755
new mode 100644
index 6068493..cb88312
--- a/setup.py
+++ b/setup.py
@@ -1,3 +1,9 @@
+
+from importlib import metadata
 from setuptools import setup

-setup()
+setup(
+    name="kaitaistruct",
+    version=metadata.version("kaitaistruct"),
+)
+
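The write-oriented methods declared in the stub above mirror the serialization API of the 0.11 runtime. As a quick illustration of how they fit together, here is a minimal sketch (not part of the patch): it assumes the runtime implements the stubbed `write_u4le`, `write_s2be`, `write_bytes` and `to_byte_array` methods as their names suggest, and that the backing buffer must be pre-allocated to its final size, as `_ensure_bytes_left_to_write` implies.

```python
import io
import struct

from kaitaistruct import KaitaiStream

# Pre-allocate a 10-byte buffer: 4 (u4le) + 2 (s2be) + 4 (raw bytes).
ks = KaitaiStream(io.BytesIO(bytes(10)))

ks.write_u4le(0xDEADBEEF)            # 4-byte unsigned, little-endian
ks.write_s2be(-2)                    # 2-byte signed, big-endian
ks.write_bytes(b"\x01\x02\x03\x04")  # raw byte run

# to_byte_array() should return the whole underlying buffer.
expected = struct.pack("<I", 0xDEADBEEF) + struct.pack(">h", -2) + b"\x01\x02\x03\x04"
assert ks.to_byte_array() == expected
```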
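The `process_*` and `byte_array_*` static helpers are pure functions over byte strings, so they can be exercised without a stream at all. The concrete values below are derived from the usual XOR/rotate semantics rather than taken from the patch, so treat them as a sketch of the intended behaviour.

```python
from kaitaistruct import KaitaiStream

data = bytes([0x0F, 0xF0, 0x55])

# XOR every byte with a single-byte key.
assert KaitaiStream.process_xor_one(data, 0xFF) == bytes([0xF0, 0x0F, 0xAA])

# XOR with a multi-byte key; the key repeats cyclically over the input.
assert KaitaiStream.process_xor_many(data, b"\x01\x02") == bytes([0x0E, 0xF2, 0x54])

# Rotate the bits of each single-byte group left by 4 positions.
assert KaitaiStream.process_rotate_left(bytes([0x0F]), 4, 1) == bytes([0xF0])

# byte_array_min/max return the smallest/largest byte value as an int.
assert KaitaiStream.byte_array_min(data) == 0x0F
assert KaitaiStream.byte_array_max(data) == 0xF0

# byte_array_index_of returns the index of the first occurrence, or -1.
assert KaitaiStream.byte_array_index_of(data, 0x55) == 2
assert KaitaiStream.byte_array_index_of(data, 0xAA) == -1
```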