|
| 1 | +import os |
| 2 | +import fcntl |
| 3 | +import ctypes |
| 4 | + |
| 5 | +# I2C constants from /usr/include/linux/i2c-dev.h |
| 6 | +I2C_SLAVE = 0x0703 |
| 7 | +I2C_SLAVE_FORCE = 0x0706 |
| 8 | +I2C_SMBUS = 0x0720 |
| 9 | + |
| 10 | +# SMBus transfer types |
| 11 | +I2C_SMBUS_READ = 1 |
| 12 | +I2C_SMBUS_WRITE = 0 |
| 13 | +I2C_SMBUS_BYTE_DATA = 2 |
| 14 | +I2C_SMBUS_I2C_BLOCK_DATA = 8 |
| 15 | + |
| 16 | +I2C_SMBUS_BLOCK_MAX = 32 |
| 17 | + |
| 18 | + |
| 19 | +class _I2cSmbusData(ctypes.Union): |
| 20 | + _fields_ = [ |
| 21 | + ("byte", ctypes.c_uint8), |
| 22 | + ("word", ctypes.c_uint16), |
| 23 | + ("block", ctypes.c_uint8 * (I2C_SMBUS_BLOCK_MAX + 2)), |
| 24 | + ] |
| 25 | + |
| 26 | + |
| 27 | +class _I2cSmbusIoctlData(ctypes.Structure): |
| 28 | + _fields_ = [ |
| 29 | + ("read_write", ctypes.c_uint8), |
| 30 | + ("command", ctypes.c_uint8), |
| 31 | + ("size", ctypes.c_uint32), |
| 32 | + ("data", ctypes.POINTER(_I2cSmbusData)), |
| 33 | + ] |
| 34 | + |
| 35 | + |
| 36 | +class SMBus: |
| 37 | + def __init__(self, bus: int): |
| 38 | + self._fd = os.open(f'/dev/i2c-{bus}', os.O_RDWR) |
| 39 | + |
| 40 | + def __enter__(self) -> 'SMBus': |
| 41 | + return self |
| 42 | + |
| 43 | + def __exit__(self, *args) -> None: |
| 44 | + self.close() |
| 45 | + |
| 46 | + def close(self) -> None: |
| 47 | + if hasattr(self, '_fd') and self._fd >= 0: |
| 48 | + os.close(self._fd) |
| 49 | + self._fd = -1 |
| 50 | + |
| 51 | + def _set_address(self, addr: int, force: bool = False) -> None: |
| 52 | + ioctl_arg = I2C_SLAVE_FORCE if force else I2C_SLAVE |
| 53 | + fcntl.ioctl(self._fd, ioctl_arg, addr) |
| 54 | + |
| 55 | + def _smbus_access(self, read_write: int, command: int, size: int, data: _I2cSmbusData) -> None: |
| 56 | + ioctl_data = _I2cSmbusIoctlData(read_write, command, size, ctypes.pointer(data)) |
| 57 | + fcntl.ioctl(self._fd, I2C_SMBUS, ioctl_data) |
| 58 | + |
| 59 | + def read_byte_data(self, addr: int, register: int, force: bool = False) -> int: |
| 60 | + self._set_address(addr, force) |
| 61 | + data = _I2cSmbusData() |
| 62 | + self._smbus_access(I2C_SMBUS_READ, register, I2C_SMBUS_BYTE_DATA, data) |
| 63 | + return int(data.byte) |
| 64 | + |
| 65 | + def write_byte_data(self, addr: int, register: int, value: int, force: bool = False) -> None: |
| 66 | + self._set_address(addr, force) |
| 67 | + data = _I2cSmbusData() |
| 68 | + data.byte = value & 0xFF |
| 69 | + self._smbus_access(I2C_SMBUS_WRITE, register, I2C_SMBUS_BYTE_DATA, data) |
| 70 | + |
| 71 | + def read_i2c_block_data(self, addr: int, register: int, length: int, force: bool = False) -> list[int]: |
| 72 | + self._set_address(addr, force) |
| 73 | + if not (0 <= length <= I2C_SMBUS_BLOCK_MAX): |
| 74 | + raise ValueError(f"length must be 0..{I2C_SMBUS_BLOCK_MAX}") |
| 75 | + |
| 76 | + data = _I2cSmbusData() |
| 77 | + data.block[0] = length |
| 78 | + self._smbus_access(I2C_SMBUS_READ, register, I2C_SMBUS_I2C_BLOCK_DATA, data) |
| 79 | + read_len = int(data.block[0]) or length |
| 80 | + read_len = min(read_len, length) |
| 81 | + return [int(b) for b in data.block[1 : read_len + 1]] |
0 commit comments