Streaming Protocol

Two WebSocket endpoints carry the real-time surface of the system:

| Endpoint | Direction | Carries |
|---|---|---|
| WS /ws/camera/{connect_id} | bidirectional | live RGB-D frames + per-stream depth settings |
| WS /ws/agent | server → client (after one client prompt) | task plan + step lifecycle events |

Camera stream — /ws/camera/{id}

Source: robot_agent/api/camera.py (~250 lines, and the file most worth opening).

Frame lifecycle

```mermaid
sequenceDiagram
    autonumber
    participant FE as Browser (CameraFeed.tsx)
    participant WS as FastAPI WS
    participant ST as Stream thread
    participant ROS as ROS subscriber
    participant CAM as RealSense / Femto Bolt
    FE->>WS: open WS /ws/camera/head_rgb
    WS->>ST: start thread (Event + Thread)
    ST->>ROS: subscribe topic
    CAM-->>ROS: image_raw / compressedDepth
    ROS-->>ST: latest frame
    ST->>WS: asyncio.run_coroutine_threadsafe(send)
    WS-->>FE: { mode: "rgb"|"raw", w, h, raw: <base64> }
    Note over FE: depth: zlib-inflate uint16 → JET colormap → canvas
    FE->>WS: { depth_mode: "colored", dmin: 200, dmax: 1500 }
    Note over ST: mutates per-stream settings, read on next emit
    FE->>WS: close
    WS->>ST: signal Event → join
```
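The thread lifecycle in the diagram (start a Thread, read the per-stream settings on every emit, stop by setting an Event and joining) can be sketched with the standard library alone. `StreamWorker`, `DepthSettings`, `get_frame` and `emit` are hypothetical names, not the contents of camera.py; in the real endpoint `emit` would wrap the `asyncio.run_coroutine_threadsafe` call and `get_frame` would return the latest ROS frame.

```python
import dataclasses
import threading
from typing import Any, Callable, Optional


@dataclasses.dataclass(frozen=True)
class DepthSettings:
    # Hypothetical settings object; fields mirror the client message
    # { depth_mode, dmin, dmax } shown in the diagram.
    depth_mode: str = "colored"
    dmin: Optional[int] = None
    dmax: Optional[int] = None


class StreamWorker:
    """Hypothetical per-connection stream thread (not the actual camera.py code)."""

    def __init__(self, get_frame: Callable[[], Any],
                 emit: Callable[[Any, DepthSettings], None],
                 period_s: float = 0.01) -> None:
        self._get_frame = get_frame  # returns the latest frame, or None
        self._emit = emit            # e.g. wraps run_coroutine_threadsafe(send)
        self._period_s = period_s
        self._settings = DepthSettings()
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def update_settings(self, **changes: Any) -> None:
        # Called from the WS handler when the client sends new depth settings.
        # dataclasses.replace raises TypeError for unknown field names.
        with self._lock:
            self._settings = dataclasses.replace(self._settings, **changes)

    def _run(self) -> None:
        while not self._stop.is_set():
            frame = self._get_frame()
            if frame is not None:
                with self._lock:
                    settings = self._settings  # one consistent snapshot per emit
                self._emit(frame, settings)
            self._stop.wait(self._period_s)  # sleep, but wake at once on stop()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Mirrors "signal Event → join" when the WebSocket closes.
        self._stop.set()
        self._thread.join()

    @property
    def running(self) -> bool:
        return self._thread.is_alive()
```

Swapping in a new frozen settings object under a lock, instead of mutating fields in place, means each emit sees a consistent `depth_mode`/`dmin`/`dmax` triple. Waiting on the Event instead of calling `time.sleep` lets `stop()` return promptly.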

Why two depth modes?

| Mode | Wire format | Use case |
|---|---|---|
| colored | TURBO-colormap JPEG | dashboards and demos: small and pretty, but lossy |
| raw | zlib(uint16) | pixel-accurate mm hover, frame analysis, regions of interest (ROIs) |
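A minimal sketch of the `raw` wire format, assuming the payload is little-endian uint16 millimetres, zlib-compressed, then base64-encoded into the `raw` field next to `w` and `h` (the keys of the frame message in the diagram). The function names are hypothetical; the browser performs the inverse in TypeScript, but decoding is shown here in Python as a round-trip check.

```python
import base64
import zlib

import numpy as np


def encode_raw_depth(depth_mm: np.ndarray) -> dict:
    """Pack a (h, w) uint16 depth image in millimetres for the 'raw' mode."""
    h, w = depth_mm.shape
    packed = zlib.compress(depth_mm.astype("<u2").tobytes())  # little-endian uint16
    return {"mode": "raw", "w": w, "h": h,
            "raw": base64.b64encode(packed).decode("ascii")}


def decode_raw_depth(msg: dict) -> np.ndarray:
    """Inverse of encode_raw_depth: base64 → zlib-inflate → (h, w) uint16."""
    packed = zlib.decompress(base64.b64decode(msg["raw"]))
    return np.frombuffer(packed, dtype="<u2").reshape(msg["h"], msg["w"])
```

Because zlib is lossless, the decoded array equals the input bit for bit, which is what makes exact per-pixel millimetre readouts possible; a JPEG of the same frame cannot guarantee that.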

The backend auto-ranges depth via 2nd/98th percentile clipping so the colormap stays useful regardless of scene depth.
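The 2nd/98th percentile auto-ranging could look like the sketch below. It assumes that a depth of 0 marks a missing reading (common on depth cameras, but an assumption here) and leaves those pixels out of the percentile computation; `autorange_to_u8` is a hypothetical name.

```python
import numpy as np


def autorange_to_u8(depth_mm: np.ndarray, lo_pct: float = 2.0,
                    hi_pct: float = 98.0) -> np.ndarray:
    """Clip depth to its [lo_pct, hi_pct] percentiles and stretch to 0..255."""
    valid = depth_mm > 0  # assumption: 0 means "no reading"
    if not valid.any():
        return np.zeros(depth_mm.shape, dtype=np.uint8)
    lo, hi = np.percentile(depth_mm[valid], [lo_pct, hi_pct])
    if hi <= lo:  # flat scene: avoid dividing by zero
        hi = lo + 1.0
    scaled = (np.clip(depth_mm.astype(np.float32), lo, hi) - lo) / (hi - lo)
    out = np.round(scaled * 255.0).astype(np.uint8)
    out[~valid] = 0  # keep missing pixels at the bottom of the colormap
    return out
```

The 8-bit result is what a colormap step consumes; with OpenCV that would be `cv2.applyColorMap(img, cv2.COLORMAP_TURBO)` followed by JPEG encoding. Clipping at percentiles rather than at the min/max keeps a handful of distant or noisy pixels from squeezing the rest of the scene into a narrow band of colours.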

Polymorphic frame handling

The same endpoint accepts:

  • ROS sensor_msgs/Image with 16UC1 or 32FC1 encoding
  • ROS sensor_msgs/CompressedImage, with or without the 12-byte compressedDepth header
  • raw numpy arrays (for WebRTC ingest paths)
  • dicts of {rgb, depth} (for skills emitting log_image)

All four input types funnel through _encode_frame() in camera.py.
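The "with or without the header" case can be detected by looking for the PNG signature instead of trusting the format string. This sketch assumes the compressedDepth payload is PNG (the usual codec for that transport) and pairs it with a duck-typed dispatch over the four input kinds listed above; `strip_compressed_depth_header` and `classify_frame` are hypothetical helpers, not `_encode_frame()` itself.

```python
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
COMPRESSED_DEPTH_HEADER_LEN = 12  # the compressedDepth header size noted above


def strip_compressed_depth_header(data: bytes) -> bytes:
    """Return the PNG payload of a CompressedImage, whether or not it has the header."""
    if data.startswith(PNG_SIGNATURE):
        return data  # no header: plain PNG
    body = data[COMPRESSED_DEPTH_HEADER_LEN:]
    if body.startswith(PNG_SIGNATURE):
        return body  # header present: drop the first 12 bytes
    raise ValueError("payload is not PNG, with or without a compressedDepth header")


def classify_frame(frame) -> str:
    """Duck-typed routing for the four input kinds (a sketch)."""
    if isinstance(frame, dict) and ("rgb" in frame or "depth" in frame):
        return "rgbd_dict"   # e.g. emitted by a skill via log_image
    if hasattr(frame, "format") and hasattr(frame, "data"):
        return "compressed"  # sensor_msgs/CompressedImage
    if hasattr(frame, "encoding") and hasattr(frame, "data"):
        return "image"       # sensor_msgs/Image (16UC1, 32FC1, ...)
    if hasattr(frame, "shape") and hasattr(frame, "dtype"):
        return "array"       # numpy array, e.g. from WebRTC ingest
    raise TypeError(f"unsupported frame type: {type(frame).__name__}")
```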

Plan stream — /ws/agent

The agent endpoint accepts one JSON message:

```json
{
  "prompt": "pick the apple",
  "lang": "en",
  "direct": false
}
```

…and emits a typed event stream:

```mermaid
sequenceDiagram
    participant FE as Browser
    participant WS as FastAPI WS
    participant UA as UnifiedAgent
    participant LLM as LLM planner
    participant SK as Skill thread
    FE->>WS: { prompt, lang, direct }
    WS-->>FE: { type: "start" }
    WS-->>FE: { type: "status", msg: "thinking..." }
    WS->>LLM: prompt + guide
    LLM-->>WS: plan (raw text)
    WS-->>FE: { type: "plan_raw", text: "..." }
    WS-->>FE: { type: "plan", steps: [...] }
    loop for each step
        WS->>SK: skill(node, **params)
        WS-->>FE: { type: "step_start", idx, name, params }
        SK-->>WS: log_image bytes
        WS-->>FE: { type: "step_log", image: <base64 JPEG> }
        SK-->>WS: { isdone: true, ... }
        WS-->>FE: { type: "step_done", idx, result }
    end
    WS-->>FE: { type: "done" }
```
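On the server, the step loop in this diagram reduces to: emit `plan`, then for each step emit `step_start`, run the skill, emit `step_done`, and finish with `done` (or `error` on any exception). The sketch assumes the plan is already parsed, omits the planning events (`start`, `status`, `plan_raw`) and `step_log`, and runs each blocking skill with `asyncio.to_thread`; `run_plan`, `skills` and `send` are hypothetical, and the real skills also take a ROS node as their first argument.

```python
import asyncio
import traceback
from typing import Any, Awaitable, Callable, Dict, List

Send = Callable[[Dict[str, Any]], Awaitable[None]]


async def run_plan(steps: List[Dict[str, Any]],
                   skills: Dict[str, Callable[..., Any]],
                   send: Send) -> None:
    """Emit plan → step_start/step_done per step → done, or error on failure."""
    try:
        await send({"type": "plan", "steps": steps})
        for idx, step in enumerate(steps):
            name, params = step["name"], step.get("params", {})
            await send({"type": "step_start", "idx": idx, "name": name, "params": params})
            # Skills block while the robot moves, so keep them off the event loop.
            result = await asyncio.to_thread(skills[name], **params)
            await send({"type": "step_done", "idx": idx, "result": result})
        await send({"type": "done"})
    except Exception as exc:
        await send({"type": "error", "msg": str(exc),
                    "traceback": traceback.format_exc()})
```

In a FastAPI handler, `send` would simply be `websocket.send_json`.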

Event types

| Type | When | Payload |
|---|---|---|
| start | WS opened | {} |
| status | progress messages while planning | { msg } |
| translated | non-English prompt translated to English | { original, translated } |
| plan_raw | raw LLM output | { text } |
| plan | parsed plan | { steps: [{name, params}, ...] } |
| step_start | a step begins | { idx, name, params } |
| step_log | log image emitted mid-skill | { idx, image } |
| step_done | a step ends | { idx, result } |
| done | plan complete | {} |
| error | any failure | { msg, traceback? } (traceback optional) |

Why a typed event stream instead of polling?

  • Low latency: events are pushed as they happen, no fixed tick.
  • Cheap rendering: the browser only invalidates the affected step row.
  • No state drift: the server is authoritative and the client only mirrors what it receives.
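"Only the affected step row" can be made concrete with a reducer that folds each event into a view model and copies just the row that the event's `idx` points at. The sketch is in Python for consistency with the rest of this page (the real client is TypeScript); the state shape and `apply_event` are assumptions, and `translated`/`plan_raw` are ignored for brevity.

```python
from typing import Any, Dict


def apply_event(state: Dict[str, Any], event: Dict[str, Any]) -> Dict[str, Any]:
    """Fold one server event into a client view model, returning a new state."""
    kind = event["type"]
    if kind == "start":
        return {"status": "running", "steps": [], "error": None}
    state = dict(state)  # shallow copy; unchanged step rows keep their identity
    if kind == "status":
        state["message"] = event["msg"]
    elif kind == "plan":
        state["steps"] = [
            {"name": s["name"], "params": s.get("params", {}), "state": "pending"}
            for s in event["steps"]
        ]
    elif kind in ("step_start", "step_log", "step_done"):
        idx = event["idx"]
        steps = list(state["steps"])
        row = dict(steps[idx])  # copy only the row this event targets
        if kind == "step_start":
            row["state"] = "running"
        elif kind == "step_log":
            row["last_image"] = event["image"]
        else:
            row["state"] = "done"
            row["result"] = event["result"]
        steps[idx] = row
        state["steps"] = steps
    elif kind == "done":
        state["status"] = "done"
    elif kind == "error":
        state["status"] = "error"
        state["error"] = event["msg"]
    # Other types (translated, plan_raw) are ignored in this sketch.
    return state
```

Because untouched rows are the same objects before and after an event, a UI layer that compares rows by identity re-renders only the one that changed.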

Bridging sync and async

The hardest part of building the streaming layer is that ROS callbacks fire on background threads while FastAPI WebSocket sends are async. robot_agent handles this with a single idiom, used by both the camera and agent endpoints:

```python
import asyncio

# Inside the background skill / stream thread.
# main_loop was captured at WS-open time, e.g. main_loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(
    websocket.send_json(event),
    main_loop,
)
```

This is the bridge between the rclpy executor's thread pool and the FastAPI event loop: no loop.run_in_executor ping-pong, no hand-rolled queue, no lost frames.
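The idiom can be exercised without ROS or FastAPI. In this sketch a plain thread stands in for a ROS callback or skill thread and an ordinary coroutine stands in for `websocket.send_json`; the stand-ins are hypothetical, while `run_coroutine_threadsafe`, `get_running_loop` and `to_thread` are the real `asyncio` API.

```python
import asyncio
import threading


async def main() -> list:
    received = []
    loop = asyncio.get_running_loop()  # captured "at WS-open time"

    async def send_json(event):  # stand-in for websocket.send_json
        received.append(event)

    def producer():  # plays the role of a ROS callback / skill thread
        for i in range(3):
            fut = asyncio.run_coroutine_threadsafe(send_json({"seq": i}), loop)
            fut.result(timeout=5)  # optional: wait until sent, re-raising send errors

    t = threading.Thread(target=producer)
    t.start()
    await asyncio.to_thread(t.join)  # keep the loop free while the thread works
    return received
```

Waiting on the returned `concurrent.futures.Future` is optional: it blocks the producer until the send completes, which gives backpressure, keeps events in order, and re-raises send errors in the producing thread. The fire-and-forget form avoids blocking the thread, but errors then surface only if something inspects the future. Never call `.result()` from the event loop's own thread, because that deadlocks.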

JSON serialisation — the corner everyone hits

You can’t json.dumps a numpy array. You also can’t json.dumps a PIL Image, an np.float32(NaN), or a bytes buffer, and even a plain float('nan') serialises to a bare NaN token that is not valid JSON. robot_agent handles all of these at two choke points:

  • NumpyJSONResponse (app_factory.py): a custom FastAPI response class used by every REST endpoint; it converts numpy scalars with .item(), arrays with .tolist(), and decodes bytes as UTF-8
  • _serialize_result() (core/unified_agent.py): flattens skill return values for the WebSocket stream, making scalars, NaN/Inf, PIL Images and oversized arrays JSON-safe
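A sketch of the kind of flattening `_serialize_result()` performs, written against the standard library only: numpy values are handled by duck typing (`.tolist()`, `.item()`), NaN/Inf become `null`, and oversized arrays are replaced by a shape/dtype summary. `to_json_safe`, the size cap, and the choice to base64-encode bytes (rather than decode them as UTF-8, which fails on arbitrary binary such as JPEG data) are assumptions, not robot_agent's code.

```python
import base64
import json
import math
from typing import Any

MAX_ARRAY_SIZE = 10_000  # hypothetical cap for "oversized arrays"


def to_json_safe(obj: Any) -> Any:
    """Recursively convert obj into something json.dumps(..., allow_nan=False) accepts."""
    if obj is None or isinstance(obj, (bool, int, str)):
        return obj
    if isinstance(obj, float):
        return obj if math.isfinite(obj) else None  # NaN/Inf are not valid JSON
    if isinstance(obj, (bytes, bytearray)):
        return base64.b64encode(bytes(obj)).decode("ascii")
    if isinstance(obj, dict):
        return {str(k): to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [to_json_safe(v) for v in obj]
    if hasattr(obj, "shape") and hasattr(obj, "tolist"):  # numpy arrays and scalars
        if getattr(obj, "size", 0) > MAX_ARRAY_SIZE:
            return {"shape": list(obj.shape), "dtype": str(obj.dtype), "truncated": True}
        return to_json_safe(obj.tolist())
    if hasattr(obj, "item"):  # other scalar wrappers exposing .item()
        return to_json_safe(obj.item())
    return repr(obj)  # last resort, e.g. a PIL Image; real code may encode it instead
```

Passing `allow_nan=False` to `json.dumps` turns any NaN that slips through into an error on the server, instead of a `NaN` token that `JSON.parse` rejects in the browser.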