import machine from RP2040_Slave import i2c_slave # === Packet Definition === class Packet: """Represents an 8-byte I2C packet.""" LENGTH = 8 def __init__(self, data=None): if data is None: data = [0] * self.LENGTH if len(data) != self.LENGTH: raise ValueError(f"Packet must be {self.LENGTH} bytes") self.bytes = data @classmethod def from_bytes(cls, byte_list): """Create Packet from list of bytes.""" return cls(byte_list[:cls.LENGTH]) def __getitem__(self, idx): return self.bytes[idx] def __setitem__(self, idx, value): self.bytes[idx] = value def __repr__(self): fields = [f"Byte {i}: 0x{b:02X}" for i, b in enumerate(self.bytes)] return "\n".join(fields) def as_list(self): return self.bytes def is_valid(self): """Verify checksum (XOR of bytes 0-6 equals byte 7).""" cs = 0 for b in self.bytes[:-1]: cs ^= b return cs == self.bytes[-1] # === I2C Slave Handler === class I2CSlaveHandler: def __init__(self, i2c_id, sda, scl, slave_addr): self.s_i2c = i2c_slave(i2c_id, sda=sda, scl=scl, slaveAddress=slave_addr) self.packet_buf = [] self.response_buf = None self.response_idx = 0 def update(self): pass def handle(self): state = self.s_i2c.I2CStateMachine.I2C_IDLE while True: state = self.s_i2c.handle_event() if state == self.s_i2c.I2CStateMachine.I2C_RECEIVE: while self.s_i2c.Available(): byte = self.s_i2c.Read_Data_Received() self.packet_buf.append(byte) #print(f"Received byte: 0x{byte:02X}") # Only process when full packet is received if len(self.packet_buf) == Packet.LENGTH: packet = Packet.from_bytes(self.packet_buf) print("Received 8-byte packet:") print(packet) print("Packet valid:", packet.is_valid()) self.process_packet(packet) self.packet_buf = [] # Clear buffer after processing if state == self.s_i2c.I2CStateMachine.I2C_FINISH: if self.packet_buf and len(self.packet_buf) < Packet.LENGTH: #print(f"Incomplete packet ({len(self.packet_buf)} bytes): {[f'0x{b:02X}' for b in self.packet_buf]}") print(f"Incomplete packet ({len(self.packet_buf)} bytes): {self.packet_buf}") self.packet_buf = [] # Clear buffer on incomplete packet state = self.s_i2c.I2CStateMachine.I2C_IDLE if state == self.s_i2c.I2CStateMachine.I2C_REQUEST: print("Master requested data, sending response packet:") #self.s_i2c.Slave_Write_Data(self.process_request().as_list()) # Prepare response buffer if not set if self.response_buf is None: self.response_buf = self.process_request().as_list() self.response_idx = 0 print("Prepared response packet:", self.response_buf) # Send next byte if self.response_idx < Packet.LENGTH: self.s_i2c.Slave_Write_Data(self.response_buf[self.response_idx]) print("write") self.response_idx += 1 else: # Reset response for next transaction self.response_buf = None self.response_idx = 0 self.update() def process_packet(self, packet): """Override this method to process packets.""" # Example: Print command/type and target/channel cmd = packet[0] target = packet[1] print(f"Command/Type: 0x{cmd:02X}, Target/Channel: 0x{target:02X}") # Add custom logic for your robot here def process_request(self): """Override this method to process requests""" return Packet.from_bytes([0x12, 0, 0, 0, 0, 0, 0, 0]) def deinit(self): self.s_i2c.deinit() # === Example usage for RP2040 === def main(): # Change slaveAddress as needed for each RP2040 SLAVE_ADDR = 0x41 handler = I2CSlaveHandler(i2c_id=0, sda=0, scl=1, slave_addr=SLAVE_ADDR) print(f"I2C Slave started at address 0x{SLAVE_ADDR:02X}") try: handler.handle() except KeyboardInterrupt: handler.deinit() print("I2C slave stopped") if __name__ == "__main__": main()