helva-robot/drive/opt/drive-ctl/server.py

92 lines
2.0 KiB
Python

import asyncio
import json
import time
import serial
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import time
import logging
log = logging.getLogger("drive")
logging.basicConfig(level=logging.INFO)
_last_send_t = None
def log_send_timing(note: str = ""):
global _last_send_t
now = time.perf_counter() # monotonic + hochauflösend
if _last_send_t is None:
_last_send_t = now
return
dt_ms = (now - _last_send_t) * 1000.0
_last_send_t = now
# Nur loggen wenn "auffällig"
if dt_ms > 120:
log.warning("SERIAL SEND GAP %.0f ms %s", dt_ms, note)
SERIAL_PORT = "/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0" # ggf. /dev/ttyUSB0
BAUD = 115200
# Safety: if websocket stops sending, we also stop.
STOP_AFTER_SEC = 0.5
app = FastAPI()
ser = serial.Serial(SERIAL_PORT, BAUD, timeout=0)
last_msg_ts = 0.0
def clamp255(v: int) -> int:
return max(-255, min(255, int(v)))
def send_lr(l: int, r: int):
l = clamp255(l)
r = clamp255(r)
line = f"L {l} R {r}\n".encode("ascii")
log_send_timing("(before write)")
ser.write(line)
@app.get("/health")
def health():
return {"ok": True, "serial": SERIAL_PORT}
@app.websocket("/ws")
async def ws(websocket: WebSocket):
global last_msg_ts
await websocket.accept()
last_msg_ts = time.time()
send_lr(0, 0)
async def watchdog():
while True:
await asyncio.sleep(0.05)
if time.time() - last_msg_ts > STOP_AFTER_SEC:
send_lr(0, 0)
wd_task = asyncio.create_task(watchdog())
try:
while True:
msg = await websocket.receive_text()
last_msg_ts = time.time()
try:
data = json.loads(msg)
l = clamp255(data.get("l", 0))
r = clamp255(data.get("r", 0))
send_lr(l, r)
except Exception:
# On parse errors: stop
send_lr(0, 0)
await websocket.send_text("ok")
except WebSocketDisconnect:
send_lr(0, 0)
finally:
wd_task.cancel()