"""WebSocket-to-serial bridge.

Accepts JSON messages of the form ``{"l": <int>, "r": <int>}`` on the ``/ws``
endpoint and forwards them to the serial port as ``L <l> R <r>`` lines.  A
watchdog sends a stop command (``L 0 R 0``) whenever no message has arrived
for more than ``STOP_AFTER_SEC`` seconds.
"""

import asyncio
import json
import time

import serial
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

SERIAL_PORT = "/dev/ttyACM0"  # or /dev/ttyUSB0 if applicable
BAUD = 115200

# Safety: if websocket stops sending, we also stop.
STOP_AFTER_SEC = 0.35

app = FastAPI()
ser = serial.Serial(SERIAL_PORT, BAUD, timeout=0)

# Timestamp (time.monotonic()) of the most recent websocket message.
# Starts at -inf so the watchdog treats "no message yet" as timed out; the
# reference point of the monotonic clock is undefined, so 0.0 is not a safe
# "long ago" value.
last_msg_ts = float("-inf")


def clamp255(v: int) -> int:
    """Convert *v* to ``int`` and clamp it to the range [-255, 255].

    Raises whatever ``int(v)`` raises for values that cannot be converted
    (e.g. ``TypeError`` for ``None``, ``ValueError`` for non-numeric strings).
    """
    return max(-255, min(255, int(v)))


def send_lr(l: int, r: int):
    """Write one ``L <l> R <r>`` command line to the serial port.

    Both values are clamped to [-255, 255] before being sent.
    """
    l = clamp255(l)
    r = clamp255(r)
    line = f"L {l} R {r}\n".encode("ascii")
    ser.write(line)


@app.get("/health")
def health():
    """Return a static liveness payload including the configured serial port."""
    return {"ok": True, "serial": SERIAL_PORT}


@app.websocket("/ws")
async def ws(websocket: WebSocket):
    """Relay ``{"l": ..., "r": ...}`` websocket messages to the serial port.

    Every received message is answered with ``"ok"``.  Messages that cannot be
    parsed result in a stop command.  A background watchdog keeps sending stop
    commands while no message has been received for ``STOP_AFTER_SEC`` seconds,
    and a final stop command is always sent when the handler exits.
    """
    global last_msg_ts
    await websocket.accept()

    async def watchdog():
        # Polls every 50 ms; uses the monotonic clock so that wall-clock
        # adjustments (NTP steps, manual changes) cannot delay or suppress
        # the safety stop.
        while True:
            await asyncio.sleep(0.05)
            if time.monotonic() - last_msg_ts > STOP_AFTER_SEC:
                send_lr(0, 0)

    wd_task = asyncio.create_task(watchdog())
    try:
        while True:
            msg = await websocket.receive_text()
            last_msg_ts = time.monotonic()
            try:
                data = json.loads(msg)
                left = clamp255(data.get("l", 0))
                right = clamp255(data.get("r", 0))
            except (ValueError, TypeError, AttributeError, OverflowError):
                # On parse errors: stop.  Covers invalid JSON, non-object
                # payloads, and values that cannot be converted to int.
                left, right = 0, 0
            # Kept outside the try so serial I/O failures are not mistaken
            # for parse errors.
            send_lr(left, right)
            await websocket.send_text("ok")
    except WebSocketDisconnect:
        # Normal client disconnect; the stop command is sent in `finally`.
        pass
    finally:
        # Cancel first so the watchdog is always torn down, then stop
        # regardless of how the loop ended (disconnect or any other error).
        wd_task.cancel()
        send_lr(0, 0)