Architecture

The system is intentionally layered, not monolithic. Each layer can be swapped — a new dashboard, a new robot, a new perception backend — without touching the others.

The four layers

flowchart TB
  subgraph Browser["🌐 Browser · Cloudflare Pages"]
    UI["robotapp<br/>Next.js 14 · TypeScript · Tailwind"]
  end
  subgraph Robot["🤖 Robot Host · port 8001"]
    direction TB
    AGENT["robot_agent · FastAPI runtime"]
    REG["SkillRegistry<br/>(internal · external)"]
    DM["DeviceManager<br/>(6 transports)"]
    UA["UnifiedAgent<br/>(plan stream)"]
    AGENT --- REG
    AGENT --- DM
    AGENT --- UA
  end
  subgraph Skills["🛠️ Skill Package"]
    KCARE["kcare_robot<br/>23 production skills"]
    TPL["robot_template<br/>cookiecutter scaffold"]
    TPL -.->|"generates"| KCARE
  end
  subgraph Hardware["⚙️ ROS2 + Hardware"]
    ROS["ROS2 Humble · rclpy"]
    ARM["KAAIR 6-DOF cobot"]
    CAM["RealSense D405<br/>Femto Bolt RGB-D"]
    BASE["Nav2 mobile base"]
    VLM["TCP VLM service<br/>GroundingDINO · GroundedSAM"]
  end
  UI -->|"REST + WebSocket"| AGENT
  REG -->|"importlib"| KCARE
  DM <-->|"actions · topics · services"| ROS
  ROS --> ARM
  ROS --> CAM
  ROS --> BASE
  DM <-->|"TCP"| VLM
  classDef ui fill:#eff6ff,stroke:#2563eb,color:#0f172a;
  classDef rt fill:#ecfdf5,stroke:#059669,color:#0f172a;
  classDef sk fill:#fef3c7,stroke:#d97706,color:#0f172a;
  classDef hw fill:#fce7f3,stroke:#db2777,color:#0f172a;
  class UI ui;
  class AGENT,REG,DM,UA rt;
  class KCARE,TPL sk;
  class ROS,ARM,CAM,BASE,VLM hw;

Why this composition

  • Browser → FastAPI over WebSocket + REST. Anyone can drive a robot from anywhere with internet access. The frontend is static — no server runtime to manage.
  • FastAPI → ROS2 via one shared node. DeviceManager lazily initialises a single CustomNode with 4 callback groups and spins it in a daemon thread. The node discovers everything, spins once, and is cleaned up in FastAPI’s lifespan handler.
  • Skills live in a separate package. robot_agent ships zero hardware-specific code. The contract is one dict:
    SKILL_CONFIGS: dict[str, tuple[module_path, func_name]]
  • Vision is plug-in. Heavy models (GroundingDINO, SAM, mask2grasps) live on a GPU host and are reached over TCP. Light skills live on the robot. The registry treats them identically.
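
To make the one-dict contract concrete, here is a minimal sketch of how a registry might resolve a SKILL_CONFIGS entry with importlib. The helper name resolve_skill and the two entries are hypothetical; real entries would point at skill modules such as kcare_robot.skills.pick, and stdlib targets are used here only so the sketch runs without the robot package installed.

```python
import importlib
from typing import Any, Callable

# Hypothetical registry contents: name -> (module_path, func_name).
# Stdlib targets stand in for real skill modules so the sketch is runnable.
SKILL_CONFIGS: dict[str, tuple[str, str]] = {
    "sqrt": ("math", "sqrt"),
    "join_path": ("posixpath", "join"),
}


def resolve_skill(name: str, configs: dict[str, tuple[str, str]]) -> Callable[..., Any]:
    """Look up a skill by name and import its function on demand."""
    try:
        module_path, func_name = configs[name]
    except KeyError:
        raise KeyError(f"unknown skill: {name!r}") from None
    # The module is imported only when the skill is first requested.
    module = importlib.import_module(module_path)
    func = getattr(module, func_name)
    if not callable(func):
        raise TypeError(f"{module_path}.{func_name} is not callable")
    return func


if __name__ == "__main__":
    print(resolve_skill("sqrt", SKILL_CONFIGS)(9.0))
```

Because the runtime only ever sees module paths and function names as strings, it needs no compile-time dependency on any particular skill package, which is consistent with robot_agent shipping zero hardware-specific code.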

Three execution modes from one core

robot_agent/runtime.py

def bootstrap(robot_pkg: str, *, node_name: str | None = None) -> AgentState:
    """Idempotent. Builds the singleton AgentState exactly once per process."""

flowchart LR
  subgraph Modes
    UI["UI<br/>uvicorn ... :8001"]
    CLI["CLI<br/>kcare_robot find::apple"]
    PY["Python API<br/>from kcare_robot.skills.pick import pick"]
  end
  BS["bootstrap(robot_pkg)<br/>idempotent · singleton"]
  STATE["AgentState<br/>(DeviceManager, SkillRegistry,<br/>UnifiedAgent, ConfigManager)"]
  UI --> BS
  CLI --> BS
  PY --> BS
  BS --> STATE
  STATE --> SK["SKILL_CONFIGS"]
  STATE --> DEV["devices.json"]
  classDef m fill:#eff6ff,stroke:#2563eb;
  classDef core fill:#ecfdf5,stroke:#059669;
  class UI,CLI,PY m;
  class BS,STATE core;

CLI auto-suffixes the rclpy node name with _<pid> so it won’t collide with a running UI on the same host. (A single physical robot still accepts commands from any caller — operators must coordinate.)
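
The idempotent bootstrap and the PID-suffixed node name can be illustrated with a small sketch. This is not the actual robot_agent/runtime.py: the AgentState stand-in, the module-level lock, the cli_node_name helper, and the default node name "robot_agent" are all assumptions made for illustration.

```python
import os
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentState:
    """Stand-in for the real AgentState; these fields are illustrative only."""
    robot_pkg: str
    node_name: str


_STATE: Optional[AgentState] = None
_LOCK = threading.Lock()


def cli_node_name(base: str) -> str:
    """Suffix the node name with the current PID, as the CLI is described to do."""
    return f"{base}_{os.getpid()}"


def bootstrap(robot_pkg: str, *, node_name: Optional[str] = None) -> AgentState:
    """Build the process-wide state on the first call; return it unchanged afterwards."""
    global _STATE
    with _LOCK:  # guards against two threads racing through the first call
        if _STATE is None:
            # "robot_agent" is a hypothetical default node name.
            _STATE = AgentState(robot_pkg, node_name or "robot_agent")
        return _STATE


if __name__ == "__main__":
    first = bootstrap("kcare_robot", node_name=cli_node_name("robot_agent"))
    second = bootstrap("kcare_robot")
    print(first is second, first.node_name)
```

In this sketch, arguments passed to later calls are ignored once the state exists, which is what lets the UI, CLI, and Python API entry points call bootstrap without coordinating with each other.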

End-to-end request — “pick the apple”

sequenceDiagram
  autonumber
  participant U as User
  participant FE as robotapp (browser)
  participant API as robot_agent (FastAPI)
  participant UA as UnifiedAgent
  participant SR as SkillRegistry
  participant SK as kcare_robot.skills.pick
  participant ROS as ROS2 node
  participant VLM as TCP VLM service
  participant ARM as KAAIR cobot
  U->>FE: types "pick the apple"
  FE->>API: WS /ws/agent { prompt, lang: "en" }
  API->>UA: run(prompt)
  UA-->>FE: { type: "plan", plan: [pick::apple] }
  UA->>SR: execute("pick", { inputs: "apple" })
  SR->>SK: pick(node, inputs="apple")
  SK->>VLM: GroundingDINO("apple")
  VLM-->>SK: bbox + mask
  SK->>SK: attach_3d_features() → XYZ
  SK->>ROS: /kaair_worker/arm_moveT (approach pose)
  SK-->>UA: log_image(detection)
  UA-->>FE: { type: "step_log", image: <base64> }
  ROS->>ARM: actuate
  ARM-->>ROS: feedback
  SK->>ROS: gripper_cmd(close)
  SK->>SK: grasp_succeed() depth-verify
  SK-->>UA: { isdone: true }
  UA-->>FE: { type: "step_done", result: {...} }
  UA-->>FE: { type: "done" }

The whole flow runs over a single WebSocket connection per request. The frontend’s PlanPanel.tsx renders each event as it arrives: step status icons, inline detection frames, expandable JSON results.
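
As a rough illustration of the event stream, the sketch below folds a sequence of JSON text frames into a summary by dispatching on the type field. The event types (plan, step_log, step_done, done) and their fields are taken from the sequence diagram; the function names, the assumption that plan items arrive as "skill::inputs" strings, and the shape of the summary are hypothetical.

```python
import json
from typing import Any, Iterable


def parse_step(token: str) -> tuple[str, dict[str, str]]:
    """Split a 'skill::inputs' token such as 'pick::apple' (format assumed from the diagram)."""
    skill, sep, inputs = token.partition("::")
    return skill, ({"inputs": inputs} if sep else {})


def consume(messages: Iterable[str]) -> dict[str, Any]:
    """Fold a stream of JSON text frames into a small summary dict."""
    summary: dict[str, Any] = {"plan": [], "images": 0, "results": [], "done": False}
    for raw in messages:
        event = json.loads(raw)
        kind = event.get("type")
        if kind == "plan":
            summary["plan"] = [parse_step(t) for t in event["plan"]]
        elif kind == "step_log":
            if "image" in event:  # base64-encoded detection frame
                summary["images"] += 1
        elif kind == "step_done":
            summary["results"].append(event.get("result"))
        elif kind == "done":
            summary["done"] = True
            break  # the request is finished; stop reading
        # Unknown event types are ignored so newer servers do not break the client.
    return summary


if __name__ == "__main__":
    frames = [
        json.dumps({"type": "plan", "plan": ["pick::apple"]}),
        json.dumps({"type": "step_done", "result": {"isdone": True}}),
        json.dumps({"type": "done"}),
    ]
    print(consume(frames))
```

A real client would read the frames from the /ws/agent socket rather than a list, but the dispatch logic would look much the same.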

Layered responsibilities, in one table

| Layer | Owns | Doesn’t own |
| --- | --- | --- |
| robotapp | UI state, WebSocket decode, depth colormap, multi-robot registry | Skills, ROS, hardware |
| robot_agent | Skill dispatch, device transport, plan execution, streaming, persistence | Skill bodies, hardware drivers |
| kcare_robot / template | Skill implementations, ROS message shapes, calibration | Transport, registry, UI |
| ROS2 + drivers | Hardware I/O, low-level control loops, sensor publishing | Application logic |

Persistence model

Every stateful artefact uses the same atomic-write pattern in robot_agent/core/*:

write(.tmp) → rotate(existing → .bak) → rename(.tmp → final)

Files:

  • skills.json — SkillRegistry CRUD
  • connects.json — DeviceManager device registry
  • buttons.json — quick-action shortcuts
  • skill_configs.json — per-skill overrides

No DB, no migrations. Restart-safe by construction.
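
The write → rotate → rename pattern described above could be sketched as follows. This is an illustration under stated assumptions, not the code in robot_agent/core/*: the function names, the JSON encoding, the fsync call, and the fallback to .bak on load are all hypothetical choices.

```python
import json
import os
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, data: Any) -> None:
    """Persist data using write(.tmp) -> rotate(existing -> .bak) -> rename(.tmp -> final)."""
    tmp = path.with_name(path.name + ".tmp")
    bak = path.with_name(path.name + ".bak")
    # 1. Write the new content to a temporary sibling and flush it to disk.
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
        fh.flush()
        os.fsync(fh.fileno())
    # 2. Rotate the current file, if any, to .bak.
    if path.exists():
        os.replace(path, bak)
    # 3. Move the temporary file into place.
    os.replace(tmp, path)


def load_json(path: Path) -> Any:
    """Read the final file, falling back to .bak if a crash hit between steps 2 and 3."""
    bak = path.with_name(path.name + ".bak")
    for candidate in (path, bak):
        if candidate.exists():
            with open(candidate, encoding="utf-8") as fh:
                return json.load(fh)
    return None
```

Each step leaves either the final file or its .bak copy fully written on disk, so a reader that checks both never sees a half-written file.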