Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I2C writeto_then_readfrom in a single transaction on u2if #888

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
57 changes: 57 additions & 0 deletions src/adafruit_blinka/microcontroller/rp2040_u2if/rp2040_u2if.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ class RP2040_u2if:
I2C0_WRITE = 0x82
I2C0_READ = 0x83
I2C0_WRITE_FROM_UART = 0x84
I2C0_WRITE_THEN_READ = 0x85
I2C1_INIT = I2C0_INIT + 0x10
I2C1_DEINIT = I2C0_DEINIT + 0x10
I2C1_WRITE = I2C0_WRITE + 0x10
I2C1_READ = I2C0_READ + 0x10
I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10
I2C1_WRITE_THEN_READ = I2C0_WRITE_THEN_READ + 0x10

# SPI
SPI0_INIT = 0x60
Expand Down Expand Up @@ -81,6 +83,8 @@ def __init__(self):
self._neopixel_initialized = False
self._uart_rx_buffer = None

self.FLAG_I2C_NO_WRITE_THEN_READ_AVAILABLE = False

def _hid_xfer(self, report, response=True):
"""Perform HID Transfer"""
# first byte is report ID, which =0
Expand Down Expand Up @@ -259,6 +263,38 @@ def _i2c_read(self, address, buffer, start=0, end=None):
for i in range(read_size):
buffer[start + i] = resp[i + 2]

def _i2c_write_then_read(
self, address, buffer_w, buffer_r, start_w=0, end_w=None, start_r=0, end_r=None
):
"""Write data to an address, then read data back and into the buffer"""
if self._i2c_index is None:
raise RuntimeError("I2C bus not initialized.")

end_w = end_w if end_w else len(buffer_w)
end_r = end_r if end_r else len(buffer_r)

write_then_read_cmd = (
self.I2C0_WRITE_THEN_READ
if self._i2c_index == 0
else self.I2C1_WRITE_THEN_READ
)

stop_flag = 0x01 # always stop

write_size = end_w - start_w
read_size = end_r - start_r

resp = self._hid_xfer(
bytes([write_then_read_cmd, address, stop_flag, write_size, read_size])
+ buffer_w,
True,
)
if resp[1] != self.RESP_OK:
raise RuntimeError("I2C write error")
# move into buffer
for i in range(read_size):
buffer_r[start_r + i] = resp[i + 2]

def i2c_writeto(self, address, buffer, *, start=0, end=None):
"""Write data from the buffer to an address"""
self._i2c_write(address, buffer, start, end)
Expand All @@ -281,6 +317,27 @@ def i2c_writeto_then_readfrom(
"""Write data from buffer_out to an address and then
read data from an address and into buffer_in
"""
if self._i2c_index is None:
raise RuntimeError("I2C bus not initialized.")

if not self.FLAG_I2C_NO_WRITE_THEN_READ_AVAILABLE:
try:
self._i2c_write_then_read(
address,
out_buffer,
in_buffer,
start_w=out_start,
end_w=out_end,
start_r=in_start,
end_r=in_end,
)
return
except RuntimeError:
# print("DEBUG: SENDING I2C WRITE THEN READ FAILED. SETTING FLAG.")
self.FLAG_I2C_NO_WRITE_THEN_READ_AVAILABLE = True
# print(
# "DEBUG: NO I2C WRITE THEN READ AVAILABLE FLAG SET. SENDING SEPARATELY."
# )
self._i2c_write(address, out_buffer, out_start, out_end, False)
self._i2c_read(address, in_buffer, in_start, in_end)

Expand Down
Loading