helva-robot/drive/opt/drive-ctl/server.py

66 lines
1.4 KiB
Python

import asyncio
import json
import time
import serial
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
SERIAL_PORT = "/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0" # ggf. /dev/ttyUSB0
BAUD = 115200
# Safety: if websocket stops sending, we also stop.
STOP_AFTER_SEC = 0.35
app = FastAPI()
ser = serial.Serial(SERIAL_PORT, BAUD, timeout=0)
last_msg_ts = 0.0
def clamp255(v: int) -> int:
return max(-255, min(255, int(v)))
def send_lr(l: int, r: int):
l = clamp255(l)
r = clamp255(r)
line = f"L {l} R {r}\n".encode("ascii")
ser.write(line)
@app.get("/health")
def health():
return {"ok": True, "serial": SERIAL_PORT}
@app.websocket("/drive-ws")
async def ws(websocket: WebSocket):
global last_msg_ts
await websocket.accept()
async def watchdog():
while True:
await asyncio.sleep(0.05)
if time.time() - last_msg_ts > STOP_AFTER_SEC:
send_lr(0, 0)
wd_task = asyncio.create_task(watchdog())
try:
while True:
msg = await websocket.receive_text()
last_msg_ts = time.time()
try:
data = json.loads(msg)
l = clamp255(data.get("l", 0))
r = clamp255(data.get("r", 0))
send_lr(l, r)
except Exception:
# On parse errors: stop
send_lr(0, 0)
await websocket.send_text("ok")
except WebSocketDisconnect:
send_lr(0, 0)
finally:
wd_task.cancel()