wip
This commit is contained in:
parent
3679e3e8e8
commit
65ed4dff1d
2 changed files with 143 additions and 17 deletions
18
main.py
18
main.py
|
@ -40,24 +40,8 @@ def listen_i2c(bus_number, address, register=0x00):
|
||||||
print("Stopped listening.")
|
print("Stopped listening.")
|
||||||
finally:
|
finally:
|
||||||
bus.close()
|
bus.close()
|
||||||
|
print("closed bus")
|
||||||
|
|
||||||
def create_led_color_packet(r, g, b):
|
|
||||||
"""
|
|
||||||
Create an 8-byte packet to set LED color for I2C transfer.
|
|
||||||
Packet format:
|
|
||||||
[0x10, 0x00, R, G, B, 0x00, 0x00, checksum]
|
|
||||||
- 0x10: Command for LED color
|
|
||||||
- 0x00: Target/channel (set as needed)
|
|
||||||
- R, G, B: Color bytes (0-255)
|
|
||||||
- 0x00, 0x00: Unused
|
|
||||||
- checksum: XOR of bytes 0-6
|
|
||||||
"""
|
|
||||||
packet = [0x10, 0x00, r & 0xFF, g & 0xFF, b & 0xFF, 0x00, 0x00]
|
|
||||||
checksum = 0
|
|
||||||
for b in packet:
|
|
||||||
checksum ^= b
|
|
||||||
packet.append(checksum)
|
|
||||||
return bytes(packet)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(sys.argv) < 3:
|
if len(sys.argv) < 3:
|
||||||
|
|
142
newmain.py
Executable file
142
newmain.py
Executable file
|
@ -0,0 +1,142 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys
|
||||||
|
from smbus2 import SMBus, i2c_msg
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import RPi.GPIO as GPIO
|
||||||
|
|
||||||
|
GPIO.setmode(GPIO.BCM)
|
||||||
|
GPIO.setup(17, GPIO.OUT)
|
||||||
|
|
||||||
|
# --- I2C Packet Creator Functions ---
|
||||||
|
class Packet:
|
||||||
|
LENGTH = 8
|
||||||
|
def __init__(self, data=None):
|
||||||
|
if data is None:
|
||||||
|
data = [0] * self.LENGTH
|
||||||
|
if len(data) != self.LENGTH:
|
||||||
|
raise ValueError(f"Packet must be {self.LENGTH} bytes")
|
||||||
|
self.bytes = list(data)
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, byte_list):
|
||||||
|
return cls(list(byte_list[:cls.LENGTH]))
|
||||||
|
def as_list(self):
|
||||||
|
return self.bytes
|
||||||
|
|
||||||
|
def create_motor_state_packet(fl=None, fr=None, bl=None, br=None):
|
||||||
|
def getByteForState(state):
|
||||||
|
if state is None: return 0x00
|
||||||
|
return 0x01 if state else 0x02
|
||||||
|
packet = [
|
||||||
|
0x01, 0x00,
|
||||||
|
getByteForState(fl),
|
||||||
|
getByteForState(fr),
|
||||||
|
getByteForState(bl),
|
||||||
|
getByteForState(br),
|
||||||
|
0x00
|
||||||
|
]
|
||||||
|
checksum = 0
|
||||||
|
for b in packet:
|
||||||
|
checksum ^= b
|
||||||
|
packet.append(checksum)
|
||||||
|
return Packet.from_bytes(packet)
|
||||||
|
|
||||||
|
def create_tail_wagging_packet(wagging=False):
|
||||||
|
packet = [0x02, 0x00, 0x01 if wagging else 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||||
|
checksum = 0
|
||||||
|
for b in packet:
|
||||||
|
checksum ^= b
|
||||||
|
packet.append(checksum)
|
||||||
|
return Packet.from_bytes(packet)
|
||||||
|
|
||||||
|
def create_set_led_mode(mode=0):
|
||||||
|
packet = [0x02, 0x00, mode & 0xFF, 0x00, 0x00, 0x00, 0x00]
|
||||||
|
checksum = 0
|
||||||
|
for b in packet:
|
||||||
|
checksum ^= b
|
||||||
|
packet.append(checksum)
|
||||||
|
return Packet.from_bytes(packet)
|
||||||
|
|
||||||
|
# --- MQTT & I2C Integration ---
|
||||||
|
MQTT_BROKER = "192.168.188.30"
|
||||||
|
MQTT_PORT = 1883
|
||||||
|
I2C_BUS = 1
|
||||||
|
|
||||||
|
BUTTON_TOPICS = {
|
||||||
|
"drive_forward": "home/buttons/drive_forward",
|
||||||
|
"drive_backward": "home/buttons/drive_backward",
|
||||||
|
"turn_left": "home/buttons/turn_left",
|
||||||
|
"turn_right": "home/buttons/turn_right"
|
||||||
|
}
|
||||||
|
SWITCH_TOPIC = "home/switch/wag_tail"
|
||||||
|
SELECT_TOPIC = "home/select/led_mode"
|
||||||
|
|
||||||
|
LED_MODE_MAP = {
|
||||||
|
"standard": 0,
|
||||||
|
"rot": 1,
|
||||||
|
"rainbow": 2
|
||||||
|
}
|
||||||
|
|
||||||
|
def send_i2c_packet(packet: Packet, address):
|
||||||
|
with SMBus(I2C_BUS) as bus:
|
||||||
|
msg = i2c_msg.write(address, packet.as_list())
|
||||||
|
bus.i2c_rdwr(msg)
|
||||||
|
print(f"Sent I2C packet: {packet.as_list()}")
|
||||||
|
|
||||||
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
print("Connected to MQTT broker")
|
||||||
|
for topic in BUTTON_TOPICS.values():
|
||||||
|
client.subscribe(topic)
|
||||||
|
print(f"Subscribed to {topic}")
|
||||||
|
client.subscribe(SWITCH_TOPIC)
|
||||||
|
print(f"Subscribed to {SWITCH_TOPIC}")
|
||||||
|
client.subscribe(SELECT_TOPIC)
|
||||||
|
print(f"Subscribed to {SELECT_TOPIC}")
|
||||||
|
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
print(f"MQTT: {msg.topic} - {msg.payload}")
|
||||||
|
payload = msg.payload.decode().lower()
|
||||||
|
# Handle buttons
|
||||||
|
if msg.topic == BUTTON_TOPICS["drive_forward"] and payload == "on":
|
||||||
|
packet = create_motor_state_packet(fl=True, fr=True, bl=True, br=True)
|
||||||
|
send_i2c_packet(packet, 0x41)
|
||||||
|
elif msg.topic == BUTTON_TOPICS["drive_backward"] and payload == "on":
|
||||||
|
packet = create_motor_state_packet(fl=False, fr=False, bl=False, br=False)
|
||||||
|
send_i2c_packet(packet, 0x41)
|
||||||
|
elif msg.topic == BUTTON_TOPICS["turn_left"] and payload == "on":
|
||||||
|
packet = create_motor_state_packet(fl=False, fr=True, bl=False, br=True)
|
||||||
|
send_i2c_packet(packet, 0x41)
|
||||||
|
elif msg.topic == BUTTON_TOPICS["turn_right"] and payload == "on":
|
||||||
|
packet = create_motor_state_packet(fl=True, fr=False, bl=True, br=False)
|
||||||
|
send_i2c_packet(packet, 0x41)
|
||||||
|
elif msg.topic in BUTTON_TOPICS.values() and payload == "off":
|
||||||
|
send_i2c_packet(create_motor_state_packet(), 0x41)
|
||||||
|
# Handle wag tail switch: expects "ON" or "OFF"
|
||||||
|
elif msg.topic == SWITCH_TOPIC:
|
||||||
|
# wag = payload == "on"
|
||||||
|
# if wag:
|
||||||
|
# GPIO.output(17, True)
|
||||||
|
# else:
|
||||||
|
# GPIO.output(17, False)
|
||||||
|
packet = create_tail_wagging_packet(payload == "on")
|
||||||
|
send_i2c_packet(packet, 0x43)
|
||||||
|
# Handle led mode select: expects "standard", "rot", "rainbow"
|
||||||
|
elif msg.topic == SELECT_TOPIC:
|
||||||
|
if payload in LED_MODE_MAP:
|
||||||
|
mode = LED_MODE_MAP[payload]
|
||||||
|
packet = create_set_led_mode(mode)
|
||||||
|
send_i2c_packet(packet, 0x03)
|
||||||
|
print(f"LED mode set to {payload} ({mode})")
|
||||||
|
else:
|
||||||
|
print(f"Unknown LED mode: {payload}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
client = mqtt.Client()
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
client.username_pw_set("mqtt", "mqtt")
|
||||||
|
client.connect(MQTT_BROKER, MQTT_PORT, 60)
|
||||||
|
print("MQTT client started. Waiting for button/switch/select events...")
|
||||||
|
client.loop_forever()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Add table
Add a link
Reference in a new issue