robot_agent — FastAPI Runtime

Runtime · FastAPI · ROS2 Humble · async + threads

The runtime tier. A single Python package (~4.2 K LOC across 26 files) that takes any ROS2 robot and exposes it as a network-addressable agent — same behaviour whether driven from the UI, the CLI, or a Python REPL.

What’s inside

robot_agent/robot_agent/
├── app_factory.py          FastAPI factory · CORS · NumpyJSONResponse · lifespan
├── runtime.py              bootstrap(): single init path for UI / CLI / Python
├── state.py                AgentState singleton (DeviceManager, SkillRegistry,
│                           UnifiedAgent, ConfigManager)
├── cli.py                  Console-script entry: `<robot_pkg> find::apple`
├── api/                    FastAPI routers (30+ endpoints)
│   ├── skills.py           registry CRUD · hot reload · /skill/<name> dispatch
│   ├── connects.py         device CRUD · status pings
│   ├── camera.py           WebSocket RGB + depth streamer (~20 fps)
│   ├── agent.py            WebSocket streaming plan execution
│   ├── diagnostics.py      boot errors · skill importability · env snapshot
│   └── ros.py · buttons.py · skill_configs.py · llm_config.py
└── core/
    ├── skill_registry.py   dual-mode: internal (importlib) + external (HTTP)
    ├── device_manager.py   ROS pub/sub/service/action · WebRTC · TCP · LLM
    ├── unified_agent.py    plan parsing · streaming step events · log capture
    ├── config_manager.py   per-skill overrides · atomic persistence
    └── button_manager.py   quick-action server-side storage

Three modes from one core

runtime.py
def bootstrap(robot_pkg: str, *, node_name: str | None = None) -> AgentState:
    """Idempotent. Builds the singleton AgentState exactly once per process."""

UI / HTTP

uvicorn <pkg>.main:app --port 8001 → multi-user dashboard; every endpoint is served over REST or WebSocket.

CLI

kcare_robot find::apple — registered as a console-script; auto-suffixes the rclpy node with _<pid> to avoid clashing with a running UI.

Python API

from kcare_robot.skills.pick import pick; pick('apple'). auto_wrap_skills() injects the ROS node on first call.
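A minimal sketch of how an idempotent bootstrap() and the CLI's PID suffix could fit together. The AgentState stand-in, the module-level lock, the default node name, and the cli_node_name helper are assumptions made for illustration, not the package's actual code.

```python
import os
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentState:
    """Stand-in for the real AgentState; holds only what this sketch needs."""
    robot_pkg: str
    node_name: str


_STATE: Optional[AgentState] = None
_STATE_LOCK = threading.Lock()


def bootstrap(robot_pkg: str, *, node_name: Optional[str] = None) -> AgentState:
    """Build the process-wide AgentState on the first call; return it afterwards."""
    global _STATE
    with _STATE_LOCK:
        if _STATE is None:
            # Assumed default node name; the real default is not documented here.
            _STATE = AgentState(robot_pkg, node_name or f"{robot_pkg}_agent")
        return _STATE


def cli_node_name(base: str) -> str:
    """Hypothetical helper: suffix the node name with _<pid> for CLI runs."""
    return f"{base}_{os.getpid()}"


# The CLI path bootstraps with a PID-suffixed node name; later calls reuse the state.
state = bootstrap("kcare_robot", node_name=cli_node_name("kcare_robot"))
same_state = bootstrap("kcare_robot")
```

Because later calls return the existing object, arguments passed after the first call are ignored in this sketch; whichever entry point runs first decides the node name.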

Skill registry — internal + external in one table

core/skill_registry.py
SkillDef(name='find', type='internal',
         module='kcare_robot.skills.recognition', func='find')
SkillDef(name='detect_face', type='external',
         url='http://gpu-box:9000/detect',
         method='POST', timeout=15)

POST /skill/<name> dispatches identically for both. Internal skills are imported lazily via importlib; external skills round-trip over HTTP.

Heavy vision models live on a GPU host. Light skills live on the robot. Same contract either way.
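The single dispatch path described above can be sketched roughly as follows. The SkillDef fields mirror the two entries shown earlier; the dispatch function, the JSON request body, and the JSON response handling are assumptions, since the real wire format is not documented here.

```python
import importlib
import json
import urllib.request
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SkillDef:
    name: str
    type: str                      # 'internal' or 'external'
    module: Optional[str] = None   # internal: dotted module path
    func: Optional[str] = None     # internal: function name inside the module
    url: Optional[str] = None      # external: HTTP endpoint
    method: str = "POST"
    timeout: float = 15


def dispatch(skill: SkillDef, **kwargs: Any) -> Any:
    """Run a skill through one code path, whatever its type."""
    if skill.type == "internal":
        # Lazy import: the module is loaded only when the skill is first called.
        module = importlib.import_module(skill.module)
        return getattr(module, skill.func)(**kwargs)
    if skill.type == "external":
        # Assumed wire format: keyword arguments as a JSON body, JSON reply.
        request = urllib.request.Request(
            skill.url,
            data=json.dumps(kwargs).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method=skill.method,
        )
        with urllib.request.urlopen(request, timeout=skill.timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    raise ValueError(f"unknown skill type: {skill.type!r}")


# Demo with a standard-library function standing in for a robot skill.
basename = SkillDef(name="basename", type="internal", module="os.path", func="basename")
result = dispatch(basename, p="/tmp/apple.png")
```

The caller never branches on where a skill runs; only the registry entry changes when a skill moves from the robot to the GPU host.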

Device manager — six transport types

ConnectType = Literal['ros_service', 'ros_topic', 'ros_action',
                      'webrtc', 'tcp', 'llm']

Encode/decode functions are stored as Python source strings in the device config and exec()d at registration time. Adding a new ROS service from the dashboard is one HTTP POST — no code change, no restart.
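As an illustration of the exec()-at-registration idea, here is a small sketch. The config keys ("encode", "decode"), the function names inside the source strings, and the compile_codec helper are hypothetical; the real device config schema may differ.

```python
from typing import Any, Callable, Dict


def compile_codec(source: str, func_name: str) -> Callable[..., Any]:
    """exec() a trusted source string and return the function it defines."""
    namespace: Dict[str, Any] = {}
    exec(source, namespace)  # only safe for source from trusted operators
    func = namespace.get(func_name)
    if not callable(func):
        raise ValueError(f"source does not define a callable named {func_name!r}")
    return func


# Hypothetical device config entry, as it might arrive in an HTTP POST body.
device_config = {
    "name": "gripper",
    "type": "ros_service",
    "encode": "def encode(payload):\n    return {'data': bool(payload['open'])}\n",
    "decode": "def decode(response):\n    return {'ok': response['success']}\n",
}

# Compiled once at registration time, then reused for every call.
encode = compile_codec(device_config["encode"], "encode")
decode = compile_codec(device_config["decode"], "decode")
```

Compiling at registration rather than per call means a syntax error in the source surfaces when the device is added, not on the first message.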

  • Thread-safe registry (threading.Lock over the connection dict)
  • Atomic persistence — write to .tmp, rotate .bak, replace original
  • Lazy ROS init — one shared CustomNode with 4 callback groups, spun in a daemon thread
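The atomic-persistence bullet can be sketched with the standard library alone. The atomic_save name, the JSON format, and the use of a module-level lock are assumptions; only the .tmp / .bak / replace sequence comes from the description above.

```python
import json
import os
import shutil
import threading
from pathlib import Path
from typing import Any

_SAVE_LOCK = threading.Lock()


def atomic_save(path: Path, data: Any) -> None:
    """Write to .tmp, rotate the previous file to .bak, then replace the original."""
    tmp = path.with_name(path.name + ".tmp")
    bak = path.with_name(path.name + ".bak")
    with _SAVE_LOCK:
        with open(tmp, "w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
            handle.flush()
            os.fsync(handle.fileno())  # make sure the bytes reach the disk
        if path.exists():
            shutil.copy2(path, bak)    # keep the last good version as .bak
        os.replace(tmp, path)          # atomic swap on the same filesystem
```

A reader of the target file sees either the old contents or the new contents, never a half-written file, because the final step is a single rename.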

HTTP surface

| Group | Endpoints |
|---|---|
| Skills | GET /skills · GET /skills/status · POST /skill/{name} · POST /skills · PUT /skills/{name} · DELETE /skills/{name} · POST /skills/reload |
| Skill configs | GET /skill-configs · GET /skill-configs/{name} · PUT /skill-configs/{name} |
| Devices | GET /connects · GET /connects/status · POST /connects · PUT /connects/{id} · DELETE /connects/{id} · POST /connects/{id}/set_active |
| Direct ROS dispatch | POST /agent/{name}/send |
| ROS discovery | GET /ros/scan |
| Streaming | WS /ws/camera/{id} · WS /ws/agent |
| Agent / LLM | POST /agent/llm-config · POST /agent/api-key · GET /agent/api-keys |
| Quick buttons | GET /buttons · POST /buttons · PUT /buttons/{id} · DELETE /buttons/{id} · POST /buttons/reorder · POST /buttons/bulk |
| Diagnostics | GET /diagnostics · GET /diagnostics/boot |

Swagger / OpenAPI live at http://<host>:8001/docs.
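For a sense of what a client call against this surface might look like, here is a hedged sketch that builds a POST /skill/{name} request with the standard library. The host name robot.local and the JSON body shape are assumptions; the Swagger page is the authority on the actual request schema.

```python
import json
import urllib.request
from typing import Any, Dict


def build_skill_request(host: str, name: str, args: Dict[str, Any]) -> urllib.request.Request:
    """Build (but do not send) a POST /skill/{name} request."""
    return urllib.request.Request(
        f"http://{host}:8001/skill/{name}",
        data=json.dumps(args).encode("utf-8"),  # assumed: arguments as a JSON object
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_skill_request("robot.local", "find", {"target": "apple"})

# To actually send it against a running agent:
# with urllib.request.urlopen(request, timeout=15) as response:
#     print(json.loads(response.read()))
```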

Design choices worth defending in an interview

  • No decorators for skill registration: SKILL_CONFIGS is a plain dict[str, tuple[module, func_name]]; static, greppable, no import-time magic, no metaclasses.
  • AgentState is a singleton, not a global — passed explicitly via app.state.agent_state; a process-local _CURRENT ref exists only for callbacks that run off-request (e.g. ROS subscriptions).
  • Encode/decode as code, not config: exec() of trusted source strings is the right primitive for transforming ROS messages without inventing a per-message schema language.
  • Atomic persistence everywhere: .tmp → .bak → final, on every save of skills.json, connects.json, buttons.json, skill_configs.json.
  • One ROS node per process with four callback groups, instead of a node per client — discovers everything, spins once, cleans up in FastAPI’s lifespan shutdown.