picoslave/i2c_handler.py

124 lines
4.5 KiB
Python

import machine
from RP2040_Slave import i2c_slave
# === Packet Definition ===
class Packet:
"""Represents an 8-byte I2C packet."""
LENGTH = 8
def __init__(self, data=None):
if data is None:
data = [0] * self.LENGTH
if len(data) != self.LENGTH:
raise ValueError(f"Packet must be {self.LENGTH} bytes")
self.bytes = data
@classmethod
def from_bytes(cls, byte_list):
"""Create Packet from list of bytes."""
return cls(byte_list[:cls.LENGTH])
def __getitem__(self, idx):
return self.bytes[idx]
def __setitem__(self, idx, value):
self.bytes[idx] = value
def __repr__(self):
fields = [f"Byte {i}: 0x{b:02X}" for i, b in enumerate(self.bytes)]
return "\n".join(fields)
def as_list(self):
return self.bytes
def is_valid(self):
"""Verify checksum (XOR of bytes 0-6 equals byte 7)."""
cs = 0
for b in self.bytes[:-1]:
cs ^= b
return cs == self.bytes[-1]
# === I2C Slave Handler ===
class I2CSlaveHandler:
def __init__(self, i2c_id, sda, scl, slave_addr):
self.s_i2c = i2c_slave(i2c_id, sda=sda, scl=scl, slaveAddress=slave_addr)
self.packet_buf = []
self.response_buf = None
self.response_idx = 0
def handle(self):
state = self.s_i2c.I2CStateMachine.I2C_IDLE
while True:
state = self.s_i2c.handle_event()
if state == self.s_i2c.I2CStateMachine.I2C_RECEIVE:
while self.s_i2c.Available():
byte = self.s_i2c.Read_Data_Received()
self.packet_buf.append(byte)
#print(f"Received byte: 0x{byte:02X}")
# Only process when full packet is received
if len(self.packet_buf) == Packet.LENGTH:
packet = Packet.from_bytes(self.packet_buf)
print("Received 8-byte packet:")
print(packet)
print("Packet valid:", packet.is_valid())
self.process_packet(packet)
self.packet_buf = [] # Clear buffer after processing
if state == self.s_i2c.I2CStateMachine.I2C_FINISH:
if self.packet_buf and len(self.packet_buf) < Packet.LENGTH:
#print(f"Incomplete packet ({len(self.packet_buf)} bytes): {[f'0x{b:02X}' for b in self.packet_buf]}")
print(f"Incomplete packet ({len(self.packet_buf)} bytes): {self.packet_buf}")
self.packet_buf = [] # Clear buffer on incomplete packet
state = self.s_i2c.I2CStateMachine.I2C_IDLE
if state == self.s_i2c.I2CStateMachine.I2C_REQUEST:
print("Master requested data, sending response packet:")
#self.s_i2c.Slave_Write_Data(self.process_request().as_list())
# Prepare response buffer if not set
if self.response_buf is None:
self.response_buf = self.process_request().as_list()
self.response_idx = 0
print("Prepared response packet:", self.response_buf)
# Send next byte
if self.response_idx < Packet.LENGTH:
self.s_i2c.Slave_Write_Data(self.response_buf[self.response_idx])
print("write")
self.response_idx += 1
else:
# Reset response for next transaction
self.response_buf = None
self.response_idx = 0
def process_packet(self, packet):
"""Override this method to process packets."""
# Example: Print command/type and target/channel
cmd = packet[0]
target = packet[1]
print(f"Command/Type: 0x{cmd:02X}, Target/Channel: 0x{target:02X}")
# Add custom logic for your robot here
def process_request(self):
"""Override this method to process requests"""
return Packet.from_bytes([0x12, 0, 0, 0, 0, 0, 0, 0])
def deinit(self):
self.s_i2c.deinit()
# === Example usage for RP2040 ===
def main():
# Change slaveAddress as needed for each RP2040
SLAVE_ADDR = 0x41
handler = I2CSlaveHandler(i2c_id=0, sda=0, scl=1, slave_addr=SLAVE_ADDR)
print(f"I2C Slave started at address 0x{SLAVE_ADDR:02X}")
try:
handler.handle()
except KeyboardInterrupt:
handler.deinit()
print("I2C slave stopped")
if __name__ == "__main__":
main()