Teleoperation & Demonstration Guide

Complete guide to building a teleoperation system for collecting high-quality VLA training data

Overview

Teleoperation is the foundation of data collection for Vision-Language-Action (VLA) models. Before your robot can learn to perform tasks autonomously, you need high-quality demonstrations showing how to do them.

This guide covers multiple approaches to teleoperation, from simple keyboard control to professional gamepads, so you can start collecting demonstrations regardless of your budget.

By the end of this guide, you'll have:

  • ✅ A working teleoperation system for your robot
  • ✅ Keyboard and/or gamepad control configured
  • ✅ Synchronized data recording (camera + actions)
  • ✅ Data exported in LeRobot-compatible format

System Architecture

A complete teleoperation data collection system consists of these core components:

Input Device

Keyboard, gamepad, or joystick for human control

Teleop Server

Reads inputs and computes target joint positions

Robot Driver

Executes motion commands on hardware

Camera System

Captures synchronized visual observations

Data Recorder

Saves episodes to dataset format

Monitoring

Real-time visualization and debugging
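These components meet in a single control loop: read input, command the robot, record the observation/action pair. A minimal sketch of that loop, with hypothetical `TeleopInput`, `Robot`, and `Recorder` stand-ins for the concrete classes built later in this guide:

```python
import time

# Hypothetical stand-ins for the concrete components built later in this guide
class TeleopInput:
    def get_command(self):
        return (0.1, 0.0)  # (linear m/s, angular rad/s)

class Robot:
    def send_velocity(self, linear, angular):
        pass  # would forward the command to the hardware driver

class Recorder:
    def __init__(self):
        self.frames = []
    def record(self, timestamp, action):
        self.frames.append({"t": timestamp, "action": action})

def run_loop(steps=5, hz=30.0):
    """One tick: read input -> command robot -> record the pair."""
    teleop, robot, recorder = TeleopInput(), Robot(), Recorder()
    for _ in range(steps):
        linear, angular = teleop.get_command()
        robot.send_velocity(linear, angular)
        recorder.record(time.time(), (linear, angular))
        time.sleep(1.0 / hz)
    return recorder.frames

if __name__ == "__main__":
    print(len(run_loop()))  # 5 recorded frames
```

The rest of this guide fills in each stand-in with a real implementation.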

Recommended Hardware

| Component    | Recommendation                              | Price Range       |
|--------------|---------------------------------------------|-------------------|
| Input Device | Xbox One Controller / PlayStation DualSense | $50-$70           |
| Backup       | Keyboard (any)                              | $0 (already have) |
| Computer     | Jetson Orin Nano 8GB                        | $300-$400         |
| Cameras      | 2-3 x Logitech C920/C930e                   | $60-$100 each     |

Option 1: Keyboard Teleoperation

Keyboard teleoperation is the easiest way to get started: you probably already have a keyboard. It's great for:

  • Beginner-friendly quick tests
  • Simple navigation tasks (differential drive)
  • When you don't have a gamepad handy

Implementation with Pynput

import threading
import time
from dataclasses import dataclass
from pynput import keyboard

@dataclass
class VelocityCommand:
    linear_x: float  # forward/backward [-1, 1]
    angular_z: float  # rotation [-1, 1]

class KeyboardTeleop:
    def __init__(self, linear_speed=0.5, angular_speed=1.0):
        self.linear_speed = linear_speed
        self.angular_speed = angular_speed
        self.cmd = VelocityCommand(0.0, 0.0)
        self.running = True

        # Track key states
        self.pressed_keys = set()

        # Start listener in separate thread
        self.listener = keyboard.Listener(
            on_press=self.on_press,
            on_release=self.on_release
        )
        self.listener.start()

        # Update thread
        self.update_thread = threading.Thread(target=self.update_loop)
        self.update_thread.daemon = True
        self.update_thread.start()

    def on_press(self, key):
        try:
            self.pressed_keys.add(key.char)
        except AttributeError:
            self.pressed_keys.add(key)

    def on_release(self, key):
        try:
            if key.char in self.pressed_keys:
                self.pressed_keys.remove(key.char)
        except AttributeError:
            if key in self.pressed_keys:
                self.pressed_keys.remove(key)

        # Exit on ESC
        if key == keyboard.Key.esc:
            self.running = False
            return False

    def update_loop(self):
        while self.running:
            linear = 0.0
            angular = 0.0

            # WASD controls
            if 'w' in self.pressed_keys:
                linear += self.linear_speed
            if 's' in self.pressed_keys:
                linear -= self.linear_speed
            if 'a' in self.pressed_keys:
                angular += self.angular_speed
            if 'd' in self.pressed_keys:
                angular -= self.angular_speed

            self.cmd = VelocityCommand(linear, angular)
            time.sleep(0.01)

    def get_command(self) -> VelocityCommand:
        return self.cmd

    def stop(self):
        self.running = False
        self.listener.stop()
        self.update_thread.join()

# Usage example
if __name__ == "__main__":
    teleop = KeyboardTeleop()

    try:
        while teleop.running:
            cmd = teleop.get_command()
            print(f"Linear: {cmd.linear_x:.2f}, Angular: {cmd.angular_z:.2f}")
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        teleop.stop()

💡 Controls: Use W forward, S backward, A turn left, D turn right. Press ESC to exit.

Option 2: Gamepad Teleoperation

Gamepads provide far more ergonomic control for long data collection sessions. We recommend the `inputs` library, which works with most gamepads on Linux.

Setup

pip install inputs

# Check if your gamepad is detected
ls -la /dev/input/
# Should show js0 (joystick device)

Implementation

from inputs import get_gamepad
import threading
import time
from dataclasses import dataclass

@dataclass
class GamepadCommand:
    left_stick_x: float  # [-1, 1]
    left_stick_y: float  # [-1, 1]
    right_stick_x: float  # [-1, 1]
    right_stick_y: float  # [-1, 1]
    buttons: dict[str, bool]

class GamepadTeleop:
    def __init__(self, deadzone=0.1):
        self.deadzone = deadzone
        self.left_x = 0.0
        self.left_y = 0.0
        self.right_x = 0.0
        self.right_y = 0.0
        self.buttons = {
            'A': False, 'B': False, 'X': False, 'Y': False,
            'LB': False, 'RB': False, 'START': False, 'BACK': False
        }
        self.running = True
        self.cmd_lock = threading.Lock()

        # Start reading thread
        self.read_thread = threading.Thread(target=self.read_loop)
        self.read_thread.daemon = True
        self.read_thread.start()

    def apply_deadzone(self, value):
        """Filter out small joystick drift"""
        if abs(value) < self.deadzone:
            return 0.0
        return value

    def read_loop(self):
        """Background thread that continuously reads gamepad events"""
        while self.running:
            try:
                events = get_gamepad()
                with self.cmd_lock:
                    for event in events:
                        if event.code == 'ABS_X':
                            self.left_x = self.apply_deadzone(event.state / 32768.0)
                        elif event.code == 'ABS_Y':
                            self.left_y = self.apply_deadzone(event.state / 32768.0)
                        elif event.code == 'ABS_RX':
                            self.right_x = self.apply_deadzone(event.state / 32768.0)
                        elif event.code == 'ABS_RY':
                            self.right_y = self.apply_deadzone(event.state / 32768.0)
                        # NOTE: BTN_NORTH/BTN_WEST labeling varies by
                        # driver; verify the X/Y mapping on your controller
                        elif event.code == 'BTN_SOUTH':
                            self.buttons['A'] = event.state == 1
                        elif event.code == 'BTN_EAST':
                            self.buttons['B'] = event.state == 1
                        elif event.code == 'BTN_NORTH':
                            self.buttons['Y'] = event.state == 1
                        elif event.code == 'BTN_WEST':
                            self.buttons['X'] = event.state == 1
            except Exception:
                # Gamepad disconnected
                time.sleep(0.1)

    def get_velocity_command(self) -> tuple[float, float]:
        """Get differential drive velocity command from left stick"""
        with self.cmd_lock:
            # Left stick Y -> linear velocity (pushing the stick up gives negative raw values)
            linear = -self.left_y
            # Left stick X -> angular velocity
            angular = -self.left_x

            # Scale to reasonable speeds
            linear = linear * 0.5   # max 0.5 m/s
            angular = angular * 1.5  # max 1.5 rad/s

            return linear, angular

    def get_command(self) -> GamepadCommand:
        """Get full gamepad state"""
        with self.cmd_lock:
            return GamepadCommand(
                left_stick_x=self.left_x,
                left_stick_y=self.left_y,
                right_stick_x=self.right_x,
                right_stick_y=self.right_y,
                buttons=self.buttons.copy()
            )

    def stop(self):
        self.running = False
        # get_gamepad() blocks until an event arrives, so don't wait forever
        self.read_thread.join(timeout=1.0)

# Usage example
if __name__ == "__main__":
    teleop = GamepadTeleop()

    try:
        while True:
            linear, angular = teleop.get_velocity_command()
            cmd = teleop.get_command()
            print(f"Linear: {linear:.2f}, Angular: {angular:.2f}, A: {cmd.buttons['A']}")
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        teleop.stop()

Recommended Control Mapping (Differential Drive)

  • Left Stick Forward/Backward: Linear velocity
  • Left Stick Left/Right: Angular velocity (steering)
  • Right Stick: Optional - camera pan/tilt if you have a gimbal
  • A Button: Start recording episode
  • B Button: Stop recording episode
  • X Button: Save/discard current episode

💡 Pro Tip: For manipulation tasks, use the left stick for X/Y translation and the right stick for Z translation (vertical axis) and yaw rotation (horizontal axis).
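For manipulation-style control, the mapping in the tip above can be sketched as a small function, assuming normalized stick values in [-1, 1] (the scale parameters are illustrative, not tuned values):

```python
def manipulation_command(left_x, left_y, right_x, right_y,
                         xy_scale=0.1, z_scale=0.1, yaw_scale=0.5):
    """Map normalized stick values [-1, 1] to an end-effector delta.

    Left stick    -> X/Y translation in the horizontal plane
    Right stick Y -> Z translation (up/down)
    Right stick X -> yaw rotation
    Scales are illustrative; tune them for your arm.
    """
    dx = left_x * xy_scale      # m per step
    dy = -left_y * xy_scale     # stick up gives negative raw Y
    dz = -right_y * z_scale     # m per step
    dyaw = right_x * yaw_scale  # rad per step
    return dx, dy, dz, dyaw
```

Feed the resulting delta into your inverse-kinematics or Cartesian controller each tick.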

Data Recording

Once you have teleoperation working, the next step is to record synchronized demonstrations. Each demonstration is called an "episode": typically one complete attempt at a task.

What to Record

For VLA training, you need to record at minimum:

  • Images from each camera (synchronized)
  • Action (joint positions / velocities) at each timestep
  • Metadata: episode index, task description, success/failure

Implementation

import json
import os
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List

import h5py
import numpy as np

from teleop import GamepadTeleop
from cameras import SyncedMultiCamera

@dataclass
class EpisodeFrame:
    timestamp: float
    images: dict[str, np.ndarray]  # camera_name -> image
    actions: dict[str, np.ndarray]  # joint/velocity actions
    feedback: dict  # robot state feedback

class DataRecorder:
    def __init__(self, output_dir: str, task_name: str):
        self.output_dir = output_dir
        self.task_name = task_name
        self.current_episode: List[EpisodeFrame] = []
        self.is_recording = False
        # Create the output directory before scanning it for existing episodes
        os.makedirs(output_dir, exist_ok=True)
        self.episode_idx = self._get_next_episode_idx()

    def _get_next_episode_idx(self) -> int:
        """Find next available episode index"""
        existing = [int(f.split('_')[1].split('.')[0])
                    for f in os.listdir(self.output_dir)
                    if f.startswith('episode_') and f.endswith('.hdf5')]
        if not existing:
            return 0
        return max(existing) + 1

    def start_episode(self):
        """Start a new demonstration episode"""
        self.current_episode = []
        self.is_recording = True
        print(f"Starting episode {self.episode_idx}")

    def record_frame(self, frame: EpisodeFrame):
        """Add a frame to current episode if recording"""
        if self.is_recording:
            self.current_episode.append(frame)

    def end_episode(self, success: bool = True):
        """Save current episode to disk"""
        if not self.is_recording:
            return

        self.is_recording = False

        if len(self.current_episode) == 0:
            print("Empty episode, skipping...")
            return

        # Save episode to HDF5
        filename = f"episode_{self.episode_idx:04d}.hdf5"
        filepath = os.path.join(self.output_dir, filename)

        with h5py.File(filepath, 'w') as f:
            # Store metadata
            f.attrs['episode_idx'] = self.episode_idx
            f.attrs['task_name'] = self.task_name
            f.attrs['success'] = success
            f.attrs['num_frames'] = len(self.current_episode)
            f.attrs['timestamp'] = datetime.now().isoformat()

            # Store each frame
            for i, frame in enumerate(self.current_episode):
                group = f.create_group(f'frame_{i:04d}')
                group.attrs['timestamp'] = frame.timestamp

                # Save images
                img_group = group.create_group('images')
                for cam_name, img in frame.images.items():
                    img_group.create_dataset(
                        cam_name,
                        data=img,
                        compression='gzip',
                        compression_opts=9
                    )

                # Save actions
                action_group = group.create_group('actions')
                for act_name, act in frame.actions.items():
                    action_group.create_dataset(act_name, data=act)

                # Save feedback
                fb_group = group.create_group('feedback')
                for fb_name, fb_val in frame.feedback.items():
                    fb_group.attrs[fb_name] = fb_val

        # Save metadata JSON
        meta_path = os.path.join(self.output_dir, f"episode_{self.episode_idx:04d}.json")
        meta = {
            'episode_idx': self.episode_idx,
            'task_name': self.task_name,
            'success': success,
            'num_frames': len(self.current_episode),
            'duration_seconds': self.current_episode[-1].timestamp - self.current_episode[0].timestamp,
            'created_at': datetime.now().isoformat()
        }

        with open(meta_path, 'w') as f:
            json.dump(meta, f, indent=2)

        print(f"Saved episode {self.episode_idx} with {len(self.current_episode)} frames")
        self.episode_idx += 1
        self.current_episode = []

    def stop(self):
        if self.is_recording:
            self.end_episode(success=False)

# Main recording loop example
if __name__ == "__main__":
    # Initialize components
    teleop = GamepadTeleop()
    cameras = SyncedMultiCamera()
    recorder = DataRecorder(
        output_dir="./data/my_first_task",
        task_name="pick_and_place_box"
    )

    # Initialize your robot driver here...
    # robot = RobotDriver()

    try:
        recording = False

        while True:
            cmd = teleop.get_command()
            linear, angular = teleop.get_velocity_command()

            # Check buttons for recording control
            if cmd.buttons['A'] and not recording:
                recorder.start_episode()
                recording = True
                time.sleep(0.3)  # Debounce

            if cmd.buttons['B'] and recording:
                recorder.end_episode(success=True)
                recording = False
                time.sleep(0.3)  # Debounce

            # Get synchronized camera frames
            synced_frames = cameras.capture_synced()

            # Get current robot action/state
            # current_joints = robot.get_joint_positions()

            # Create frame for recording
            frame = EpisodeFrame(
                timestamp=time.time(),
                images={f"cam_{i}": img for i, img in enumerate(synced_frames.frames.values())},
                actions={'linear_vel': np.array([linear]), 'angular_vel': np.array([angular])},
                feedback={}  # Add joint positions, etc. here
            )

            # Record if active
            if recording:
                recorder.record_frame(frame)

            # Send command to robot
            # robot.send_velocity(linear, angular)

            time.sleep(1.0 / 30.0)  # 30 FPS

    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        recorder.stop()
        teleop.stop()
        cameras.stop()

LeRobot Framework Integration

XRollout is fully compatible with the LeRobot framework from Hugging Face. Converting your recorded data to LeRobot format is straightforward:

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import h5py
import numpy as np
import os
import cv2

# Create LeRobot dataset
dataset = LeRobotDataset.create(
    repo_id="xrollout/my_pick_and_place",
    root="./data/lerobot_format",
    fps=30,
    robot_type="my_robot",
    features={
        "image_front": {
            "dtype": "image",
            "shape": (480, 640, 3),
        },
        "image_top": {
            "dtype": "image",
            "shape": (480, 640, 3),
        },
        "action": {
            "dtype": "float32",
            "shape": (2,),  # linear, angular
        },
        "state": {
            "dtype": "float32",
            "shape": (2,),  # wheel positions
        }
    },
)

# Convert each episode
data_dir = "./data/my_first_task"
episode_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.hdf5')])

for episode_file in episode_files:
    print(f"Converting {episode_file}...")

    with h5py.File(os.path.join(data_dir, episode_file), 'r') as f:
        num_frames = f.attrs['num_frames']
        success = f.attrs['success']

        if not success:
            continue  # Skip failed episodes

        # Create new episode in LeRobot format
        episode = []
        for i in range(num_frames):
            frame_group = f[f'frame_{i:04d}']

            # Get images (LeRobot handles saving to disk)
            images = {}
            for cam_name in frame_group['images'].keys():
                img = frame_group['images'][cam_name][:]
                # Convert BGR to RGB if needed
                if img.shape[2] == 3:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                images[f"image_{cam_name}"] = img

            # Get actions
            actions = []
            for act_name in frame_group['actions'].keys():
                actions.append(frame_group['actions'][act_name][()])
            action = np.concatenate(actions)

            # Add frame to episode
            frame_dict = {
                **images,
                "action": action,
                "state": action,  # For diffusion inference, we need current state
            }
            episode.append(frame_dict)

        # Add entire episode to dataset (method names vary across LeRobot
        # versions; newer releases use add_frame() plus save_episode())
        if len(episode) > 0:
            dataset.add_episode(episode)

# Save dataset info
dataset.save()
print(f"Converted dataset saved to {dataset.root}")

# Optionally push to Hugging Face Hub
# dataset.push_to_hub()

📚 Learn more: Check out the LeRobot documentation for advanced features like visual prompting and model training.

Best Practices for High-Quality Data

1. Lighting is Everything

Good lighting dramatically improves model performance. Use diffuse, even lighting - avoid harsh shadows and glare on reflective surfaces. If possible, add a ring light around your primary camera.

2. Record Both Successful and Failed Attempts

Don't just throw away failed demonstrations. Including failures can improve robustness by teaching the model what not to do. Mark them as failures in the metadata so you can filter or weight them during training.

3. Vary Your Demonstrations

Don't demonstrate the exact same motion every time. Vary:

  • Starting positions
  • Object locations
  • Speed of execution
  • Minor corrections along the way

This gives the model better generalization.

4. How Many Episodes Do You Need?

| Task Complexity              | Minimum Episodes | Good Results At |
|------------------------------|------------------|-----------------|
| Simple navigation            | 20-50            | 100             |
| Pick & place (single object) | 50-100           | 200             |
| Complex manipulation         | 100-200          | 500+            |

5. Review Your Data Regularly

Every 50 episodes, spot-check a few to make sure:

  • Images are correctly exposed and in focus
  • Actions are being recorded correctly
  • Timestamps are synchronized within acceptable tolerance

⚠️ Common Pitfall: Unsynchronized data where the image doesn't match the action is the #1 cause of poor model performance. Always double-check synchronization.
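A quick way to catch synchronization problems is to script the check. A sketch that reads an episode file written by the DataRecorder above and verifies that timestamps are monotonic and close to the target rate (the tolerance is an arbitrary choice):

```python
import h5py
import numpy as np

def check_episode_sync(path, expected_fps=30.0, tolerance=0.5):
    """Spot-check a recorded episode: timestamps must be strictly
    increasing and the mean frame rate close to the target."""
    with h5py.File(path, "r") as f:
        n = int(f.attrs["num_frames"])
        ts = np.array([f[f"frame_{i:04d}"].attrs["timestamp"] for i in range(n)])
    dt = np.diff(ts)
    assert np.all(dt > 0), "timestamps are not monotonically increasing"
    mean_fps = 1.0 / dt.mean()
    if abs(mean_fps - expected_fps) > tolerance * expected_fps:
        print(f"WARNING: mean rate {mean_fps:.1f} FPS, expected {expected_fps}")
    return mean_fps
```

Run it over a handful of recent episodes whenever you pause a collection session.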

Troubleshooting

Gamepad not detected on Linux

# Check if kernel sees the device
dmesg | grep -i "input\|joy"

# Check permissions
ls -la /dev/input/js0

# Add your user to input group
sudo usermod -aG input $USER
# Log out and back in for the group change to take effect

Input lag

  • Make sure you're running on the robot's onboard computer (not remote control over WiFi)
  • Use lower resolution cameras (640x480 is enough for most tasks)
  • Check that compression isn't causing CPU bottlenecks
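To find where the lag comes from, it can help to time each pipeline stage in isolation. A tiny sketch (the `cameras` object in the usage comment is a hypothetical stand-in for your capture class):

```python
import time

def profile_step(fn, n=100):
    """Average wall-clock time of one pipeline stage, in milliseconds.
    Run it on capture, encoding, and recording separately to find
    the bottleneck."""
    t0 = time.perf_counter()
    for _ in range(n):
        fn()
    return (time.perf_counter() - t0) / n * 1000.0

# Usage (hypothetical): profile_step(lambda: cameras.capture_synced())
```

Anything slower than your loop period (33 ms at 30 FPS) is the culprit.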

Jittery motion

  • Increase deadzone to filter out joystick drift
  • Add a simple moving average filter to commands
  • Check servo voltage - brownouts cause unstable motion
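The moving-average filter mentioned above can be as simple as a sliding-window mean over recent commands:

```python
from collections import deque

class MovingAverageFilter:
    """Smooth teleop commands with a short sliding-window average.
    A window of 3-5 samples at 30 Hz removes most jitter without
    adding noticeable lag."""
    def __init__(self, window=5):
        self.buf = deque(maxlen=window)

    def update(self, value):
        self.buf.append(value)
        return sum(self.buf) / len(self.buf)
```

Wrap each command channel (linear and angular) in its own filter instance so they smooth independently.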

💡 Final Tip: Start small - collect 10 episodes, train a tiny model, see if it works. Then scale up. It's much better to iterate quickly than collect 500 episodes before discovering an issue.

Summary

Teleoperation data collection is the foundation of any VLA project. With the setup described in this guide, you can:

  • ✅ Control your robot with keyboard or gamepad
  • ✅ Record synchronized camera images and actions
  • ✅ Convert data to LeRobot format for training
  • ✅ Collect high-quality demonstrations that result in better models