Overview
Teleoperation is the foundation of data collection for Vision-Language-Action (VLA) models. Before your robot can learn to perform tasks autonomously, you need high-quality demonstrations showing how to do them.
This guide covers multiple approaches to teleoperation, from simple keyboard control to professional gamepads, so you can start collecting demonstrations regardless of your budget.
By the end of this guide, you'll have:
- ✅ A working teleoperation system for your robot
- ✅ Keyboard and/or gamepad control configured
- ✅ Synchronized data recording (camera + actions)
- ✅ Data exported in LeRobot-compatible format
System Architecture
A complete teleoperation data collection system consists of these core components:
Input Device
Keyboard, gamepad, or joystick for human control
Teleop Server
Reads inputs and computes target joint positions
Robot Driver
Executes motion commands on hardware
Camera System
Captures synchronized visual observations
Data Recorder
Saves episodes to dataset format
Monitoring
Real-time visualization and debugging
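Tying these together, the core of any teleop stack is a fixed-rate loop that reads the input device, grabs synchronized frames, drives the robot, and logs the pair. A minimal sketch (the component objects and their method names here are placeholders, not a fixed API):

```python
import time
from dataclasses import dataclass


@dataclass
class TeleopPipeline:
    """Minimal sketch of the control/recording loop wiring the components above.

    input_device, robot, cameras, and recorder are hypothetical objects
    exposing the methods shown; substitute your own implementations.
    """
    input_device: object
    robot: object
    cameras: object
    recorder: object
    rate_hz: float = 30.0

    def step(self):
        cmd = self.input_device.get_command()       # 1. read human input
        frames = self.cameras.capture_synced()      # 2. grab synced images
        self.robot.send_command(cmd)                # 3. drive the hardware
        self.recorder.record(time.time(), frames, cmd)  # 4. log the pair
```

The rest of this guide fills in each of these slots with a concrete implementation.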
Recommended Hardware
| Component | Recommendation | Price Range |
|---|---|---|
| Input Device | Xbox One Controller / PlayStation DualSense | $50-$70 |
| Backup | Keyboard (any) | $0 (already have) |
| Computer | Jetson Orin Nano 8GB | $300-$400 |
| Cameras | 2-3 x Logitech C920/C930e | $60-$100 each |
Option 1: Keyboard Teleoperation
Keyboard teleoperation is the easiest way to get started: you probably already have a keyboard. It's great for:
- Beginner-friendly quick tests
- Simple navigation tasks (differential drive)
- When you don't have a gamepad handy
Implementation with Pynput
import threading
import time
from dataclasses import dataclass
from pynput import keyboard
@dataclass
class VelocityCommand:
linear_x: float # forward/backward [-1, 1]
angular_z: float # rotation [-1, 1]
class KeyboardTeleop:
def __init__(self, linear_speed=0.5, angular_speed=1.0):
self.linear_speed = linear_speed
self.angular_speed = angular_speed
self.cmd = VelocityCommand(0.0, 0.0)
self.running = True
# Track key states
self.pressed_keys = set()
# Start listener in separate thread
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release
)
self.listener.start()
# Update thread
self.update_thread = threading.Thread(target=self.update_loop)
self.update_thread.daemon = True
self.update_thread.start()
def on_press(self, key):
try:
self.pressed_keys.add(key.char)
except AttributeError:
self.pressed_keys.add(key)
def on_release(self, key):
try:
if key.char in self.pressed_keys:
self.pressed_keys.remove(key.char)
except AttributeError:
if key in self.pressed_keys:
self.pressed_keys.remove(key)
# Exit on ESC
if key == keyboard.Key.esc:
self.running = False
return False
def update_loop(self):
while self.running:
linear = 0.0
angular = 0.0
# WASD controls
if 'w' in self.pressed_keys:
linear += self.linear_speed
if 's' in self.pressed_keys:
linear -= self.linear_speed
if 'a' in self.pressed_keys:
angular += self.angular_speed
if 'd' in self.pressed_keys:
angular -= self.angular_speed
self.cmd = VelocityCommand(linear, angular)
time.sleep(0.01)
def get_command(self) -> VelocityCommand:
return self.cmd
def stop(self):
self.running = False
self.listener.stop()
self.update_thread.join()
# Usage example
if __name__ == "__main__":
teleop = KeyboardTeleop()
try:
while teleop.running:
cmd = teleop.get_command()
print(f"Linear: {cmd.linear_x:.2f}, Angular: {cmd.angular_z:.2f}")
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
teleop.stop()
💡 Controls: Use W forward, S backward, A turn left, D turn right. Press ESC to exit.
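Before the `VelocityCommand` above reaches the motors of a differential-drive base, it has to be converted into per-wheel speeds. A sketch of the standard inverse kinematics (the `wheel_base` and `wheel_radius` defaults are placeholders; measure your robot):

```python
def diff_drive_wheel_speeds(linear_x: float, angular_z: float,
                            wheel_base: float = 0.3,
                            wheel_radius: float = 0.05) -> tuple[float, float]:
    """Convert a (linear, angular) body velocity into left/right wheel
    angular speeds (rad/s) for a differential-drive base.

    wheel_base and wheel_radius are example values -- use your robot's.
    """
    # Standard differential-drive inverse kinematics:
    # v_left = v - w * b/2, v_right = v + w * b/2 (linear wheel speeds)
    v_left = linear_x - angular_z * wheel_base / 2.0
    v_right = linear_x + angular_z * wheel_base / 2.0
    # Divide by wheel radius to get angular wheel speed
    return v_left / wheel_radius, v_right / wheel_radius
```

With the defaults, driving straight at 0.5 m/s spins both wheels at 10 rad/s, while a pure rotation spins them in opposite directions.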
Option 2: Gamepad Teleoperation
Gamepads provide much better ergonomic control for longer data collection sessions. We recommend the `inputs` library, which works with most gamepads on Linux. Note that on some Xbox-style controllers `inputs` reports BTN_NORTH for the X button and BTN_WEST for Y; print raw events to verify the mapping on your hardware.
Setup
pip install inputs
# Check if your gamepad is detected
ls -la /dev/input/
# Should show js0 (joystick device)
Implementation
from inputs import get_gamepad
import threading
import time
from dataclasses import dataclass
@dataclass
class GamepadCommand:
left_stick_x: float # [-1, 1]
left_stick_y: float # [-1, 1]
right_stick_x: float # [-1, 1]
right_stick_y: float # [-1, 1]
buttons: dict[str, bool]
class GamepadTeleop:
def __init__(self, deadzone=0.1):
self.deadzone = deadzone
self.left_x = 0.0
self.left_y = 0.0
self.right_x = 0.0
self.right_y = 0.0
self.buttons = {
'A': False, 'B': False, 'X': False, 'Y': False,
'LB': False, 'RB': False, 'START': False, 'BACK': False
}
self.running = True
self.cmd_lock = threading.Lock()
# Start reading thread
self.read_thread = threading.Thread(target=self.read_loop)
self.read_thread.daemon = True
self.read_thread.start()
def apply_deadzone(self, value):
"""Filter out small joystick drift"""
if abs(value) < self.deadzone:
return 0.0
return value
def read_loop(self):
"""Background thread that continuously reads gamepad events"""
while self.running:
try:
events = get_gamepad()
with self.cmd_lock:
for event in events:
if event.code == 'ABS_X':
self.left_x = self.apply_deadzone(event.state / 32768.0)
elif event.code == 'ABS_Y':
self.left_y = self.apply_deadzone(event.state / 32768.0)
elif event.code == 'ABS_RX':
self.right_x = self.apply_deadzone(event.state / 32768.0)
elif event.code == 'ABS_RY':
self.right_y = self.apply_deadzone(event.state / 32768.0)
elif event.code == 'BTN_SOUTH':
self.buttons['A'] = event.state == 1
elif event.code == 'BTN_EAST':
self.buttons['B'] = event.state == 1
elif event.code == 'BTN_NORTH':
self.buttons['Y'] = event.state == 1
elif event.code == 'BTN_WEST':
self.buttons['X'] = event.state == 1
except Exception:
# Gamepad disconnected
time.sleep(0.1)
def get_velocity_command(self) -> tuple[float, float]:
"""Get differential drive velocity command from left stick"""
with self.cmd_lock:
# Left stick Y -> linear velocity
linear = -self.left_y # Negative because Y is up=negative
# Left stick X -> angular velocity
angular = -self.left_x
# Scale to reasonable speeds
linear = linear * 0.5 # max 0.5 m/s
angular = angular * 1.5 # max 1.5 rad/s
return linear, angular
def get_command(self) -> GamepadCommand:
"""Get full gamepad state"""
with self.cmd_lock:
return GamepadCommand(
left_stick_x=self.left_x,
left_stick_y=self.left_y,
right_stick_x=self.right_x,
right_stick_y=self.right_y,
buttons=self.buttons.copy()
)
def stop(self):
self.running = False
self.read_thread.join()
# Usage example
if __name__ == "__main__":
teleop = GamepadTeleop()
try:
while True:
linear, angular = teleop.get_velocity_command()
cmd = teleop.get_command()
print(f"Linear: {linear:.2f}, Angular: {angular:.2f}, A: {cmd.buttons['A']}")
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
teleop.stop()
Recommended Control Mapping (Differential Drive)
- Left Stick Forward/Backward: Linear velocity
- Left Stick Left/Right: Angular velocity (steering)
- Right Stick: Optional - camera pan/tilt if you have a gimbal
- A Button: Start recording episode
- B Button: Stop recording episode
- X Button: Save/discard current episode
💡 Pro Tip: For manipulation tasks, map the left stick to X/Y translation and the right stick's horizontal axis to Z rotation (yaw), leaving the right stick's vertical axis for Z translation.
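That manipulation mapping can be sketched as a small helper that turns stick deflections into an end-effector delta (the scale factors here are illustrative, not tuned values):

```python
import numpy as np


def sticks_to_ee_delta(left_x: float, left_y: float,
                       right_x: float, right_y: float,
                       xy_scale: float = 0.05, z_scale: float = 0.05,
                       yaw_scale: float = 0.2) -> np.ndarray:
    """Map stick deflections in [-1, 1] to a Cartesian end-effector delta
    [dx, dy, dz, dyaw], following the mapping in the tip above.

    Scales are placeholder metres / radians per control tick.
    """
    dx = -left_y * xy_scale    # push up = move forward (+X)
    dy = -left_x * xy_scale    # push left = move left (+Y)
    dz = -right_y * z_scale    # push right stick up = move up (+Z)
    dyaw = -right_x * yaw_scale  # right stick left/right = yaw
    return np.array([dx, dy, dz, dyaw], dtype=np.float32)
```

The sign flips match the convention used in `get_velocity_command` above, where pushing a stick up reports a negative value.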
Data Recording
Once you have teleoperation working, the next step is to record synchronized demonstrations. Each demonstration is called an "episode" - typically one complete attempt at a task.
What to Record
For VLA training, you need to record at minimum:
- Images from each camera (synchronized)
- Action (joint positions / velocities) at each timestep
- Metadata: episode index, task description, success/failure
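Concretely, a single recorded timestep for the two-camera differential-drive setup used in this guide might look like this (shapes and key names are illustrative, not a fixed schema):

```python
import numpy as np

# Illustrative per-timestep record: two synchronized camera images,
# a 2-D action vector, and episode-level metadata.
timestep = {
    "timestamp": 1712345678.901,  # seconds since epoch
    "images": {
        "front": np.zeros((480, 640, 3), dtype=np.uint8),
        "top": np.zeros((480, 640, 3), dtype=np.uint8),
    },
    "action": np.array([0.3, -0.5], dtype=np.float32),  # [linear, angular]
    "meta": {"episode_idx": 0, "task": "pick_and_place_box"},
}
```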
Implementation
import cv2
import time
import numpy as np
import json
import h5py
import os
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List
from teleop import GamepadTeleop
from cameras import SyncedMultiCamera
@dataclass
class EpisodeFrame:
timestamp: float
images: dict[str, np.ndarray] # camera_name -> image
actions: dict[str, np.ndarray] # joint/velocity actions
feedback: dict # robot state feedback
class DataRecorder:
def __init__(self, output_dir: str, task_name: str):
self.output_dir = output_dir
self.task_name = task_name
self.current_episode: List[EpisodeFrame] = []
self.is_recording = False
        # Create output directory before scanning it for existing episodes
        os.makedirs(output_dir, exist_ok=True)
        self.episode_idx = self._get_next_episode_idx()
def _get_next_episode_idx(self) -> int:
"""Find next available episode index"""
        existing = [int(f.split('_')[1].split('.')[0]) for f in os.listdir(self.output_dir)
                    if f.startswith('episode_') and f.endswith('.hdf5')]
if not existing:
return 0
return max(existing) + 1
def start_episode(self):
"""Start a new demonstration episode"""
self.current_episode = []
self.is_recording = True
print(f"Starting episode {self.episode_idx}")
def record_frame(self, frame: EpisodeFrame):
"""Add a frame to current episode if recording"""
if self.is_recording:
self.current_episode.append(frame)
def end_episode(self, success: bool = True):
"""Save current episode to disk"""
if not self.is_recording:
return
self.is_recording = False
if len(self.current_episode) == 0:
print("Empty episode, skipping...")
return
# Save episode to HDF5
filename = f"episode_{self.episode_idx:04d}.hdf5"
filepath = os.path.join(self.output_dir, filename)
with h5py.File(filepath, 'w') as f:
# Store metadata
f.attrs['episode_idx'] = self.episode_idx
f.attrs['task_name'] = self.task_name
f.attrs['success'] = success
f.attrs['num_frames'] = len(self.current_episode)
f.attrs['timestamp'] = datetime.now().isoformat()
# Store each frame
for i, frame in enumerate(self.current_episode):
group = f.create_group(f'frame_{i:04d}')
group.attrs['timestamp'] = frame.timestamp
# Save images
img_group = group.create_group('images')
for cam_name, img in frame.images.items():
img_group.create_dataset(
cam_name,
data=img,
compression='gzip',
compression_opts=9
)
# Save actions
action_group = group.create_group('actions')
for act_name, act in frame.actions.items():
action_group.create_dataset(act_name, data=act)
# Save feedback
fb_group = group.create_group('feedback')
for fb_name, fb_val in frame.feedback.items():
fb_group.attrs[fb_name] = fb_val
# Save metadata JSON
meta_path = os.path.join(self.output_dir, f"episode_{self.episode_idx:04d}.json")
meta = {
'episode_idx': self.episode_idx,
'task_name': self.task_name,
'success': success,
'num_frames': len(self.current_episode),
'duration_seconds': self.current_episode[-1].timestamp - self.current_episode[0].timestamp,
'created_at': datetime.now().isoformat()
}
with open(meta_path, 'w') as f:
json.dump(meta, f, indent=2)
print(f"Saved episode {self.episode_idx} with {len(self.current_episode)} frames")
self.episode_idx += 1
self.current_episode = []
def stop(self):
if self.is_recording:
self.end_episode(success=False)
# Main recording loop example
if __name__ == "__main__":
# Initialize components
teleop = GamepadTeleop()
cameras = SyncedMultiCamera()
recorder = DataRecorder(
output_dir="./data/my_first_task",
task_name="pick_and_place_box"
)
# Initialize your robot driver here...
# robot = RobotDriver()
try:
recording = False
while True:
cmd = teleop.get_command()
linear, angular = teleop.get_velocity_command()
# Check buttons for recording control
if cmd.buttons['A'] and not recording:
recorder.start_episode()
recording = True
time.sleep(0.3) # Debounce
if cmd.buttons['B'] and recording:
recorder.end_episode(success=True)
recording = False
time.sleep(0.3) # Debounce
# Get synchronized camera frames
synced_frames = cameras.capture_synced()
# Get current robot action/state
# current_joints = robot.get_joint_positions()
# Create frame for recording
frame = EpisodeFrame(
timestamp=time.time(),
                images={name: img for name, img in synced_frames.frames.items()},
actions={'linear_vel': np.array([linear]), 'angular_vel': np.array([angular])},
feedback={} # Add joint positions, etc. here
)
# Record if active
if recording:
recorder.record_frame(frame)
# Send command to robot
# robot.send_velocity(linear, angular)
time.sleep(1.0 / 30.0) # 30 FPS
except KeyboardInterrupt:
print("\nShutting down...")
finally:
recorder.stop()
teleop.stop()
cameras.stop()
LeRobot Framework Integration
XRollout is fully compatible with the LeRobot framework from Hugging Face. Converting your recorded data to LeRobot format is straightforward:
import lerobot
import numpy as np
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import h5py
import os
import cv2
# Create LeRobot dataset
dataset = LeRobotDataset.create(
repo_id="xrollout/my_pick_and_place",
root="./data/lerobot_format",
fps=30,
robot_type="my_robot",
features={
"image_front": {
"dtype": "image",
"shape": (480, 640, 3),
},
"image_top": {
"dtype": "image",
"shape": (480, 640, 3),
},
"action": {
"dtype": "float32",
"shape": (2,), # linear, angular
},
"state": {
"dtype": "float32",
"shape": (2,), # wheel positions
}
},
)
# Convert each episode
data_dir = "./data/my_first_task"
episode_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.hdf5')])
for episode_file in episode_files:
print(f"Converting {episode_file}...")
with h5py.File(os.path.join(data_dir, episode_file), 'r') as f:
num_frames = f.attrs['num_frames']
success = f.attrs['success']
if not success:
continue # Skip failed episodes
        # Add each frame to the dataset. Note: the LeRobotDataset API has
        # changed between releases -- recent versions take frames one at a
        # time with add_frame() and close each episode with save_episode();
        # check the API of the version you have installed.
        for i in range(num_frames):
            frame_group = f[f'frame_{i:04d}']
            # Get images (LeRobot handles saving to disk)
            images = {}
            for cam_name in frame_group['images'].keys():
                img = frame_group['images'][cam_name][:]
                # Convert BGR to RGB if needed
                if img.shape[2] == 3:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                images[f"image_{cam_name}"] = img
            # Concatenate all action arrays into one vector
            actions = []
            for act_name in frame_group['actions'].keys():
                actions.append(np.atleast_1d(frame_group['actions'][act_name][()]))
            action = np.concatenate(actions)
            dataset.add_frame({
                **images,
                "action": action,
                # Placeholder: log real robot state feedback here if you
                # recorded it; copying the action is only a stand-in.
                "state": action,
            })
        # Finalize this episode (recent versions take the task string here)
        dataset.save_episode(task="pick_and_place_box")
print(f"Converted dataset saved to {dataset.root}")
# Optionally push to Hugging Face Hub
# dataset.push_to_hub()
📚 Learn more: Check out the LeRobot documentation for advanced features like visual prompting and model training.
Best Practices for High-Quality Data
1. Lighting is Everything
Good lighting dramatically improves model performance. Use diffuse, even lighting - avoid harsh shadows and glare on reflective surfaces. If possible, add a ring light around your primary camera.
2. Record Both Successful and Failed Attempts
Don't just throw away failed demonstrations. Including failures helps the model learn what not to do, improving robustness. Mark them as failures in the metadata so your training pipeline can filter or weight them appropriately.
3. Vary Your Demonstrations
Don't demonstrate the exact same motion every time. Vary:
- Starting positions
- Object locations
- Speed of execution
- Minor corrections along the way
This gives the model better generalization.
4. How Many Episodes Do You Need?
| Task Complexity | Minimum Episodes | Good Results At |
|---|---|---|
| Simple navigation | 20-50 | 100 |
| Pick & place (single object) | 50-100 | 200 |
| Complex manipulation | 100-200 | 500+ |
5. Review Your Data Regularly
Every 50 episodes, spot-check a few to make sure:
- Images are correctly exposed and in focus
- Actions are being recorded correctly
- Timestamps are synchronized within acceptable tolerance
⚠️ Common Pitfall: Unsynchronized data where the image doesn't match the action is the #1 cause of poor model performance. Always double-check synchronization.
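A quick way to catch timing problems early is to check the inter-frame intervals of each episode against the expected frame period. A minimal sketch, operating on the per-frame timestamps you recorded (e.g. the `timestamp` attribute of each HDF5 frame group):

```python
import numpy as np


def timing_ok(timestamps, expected_fps: float = 30.0, tol: float = 0.5) -> bool:
    """Return True if every inter-frame interval is within tol (as a
    fraction of the expected period, here 50%) of 1/expected_fps.

    Feed it the list of per-frame timestamps from one episode.
    """
    dt = np.diff(np.asarray(timestamps, dtype=float))
    period = 1.0 / expected_fps
    return bool(np.all(np.abs(dt - period) <= tol * period))
```

Run this over every episode after a recording session; a single dropped or duplicated frame will show up as an out-of-tolerance interval.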
Troubleshooting
Gamepad not detected on Linux
# Check if kernel sees the device
dmesg | grep -i "input\|joy"
# Check permissions
ls -la /dev/input/js0
# Add your user to input group
sudo usermod -aG input $USER
# Log out and back in for the group change to take effect
Input lag
- Make sure you're running on the robot's onboard computer (not remote control over WiFi)
- Use lower resolution cameras (640x480 is enough for most tasks)
- Check that compression isn't causing CPU bottlenecks
Jittery motion
- Increase deadzone to filter out joystick drift
- Add a simple moving average filter to commands
- Check servo voltage - brownouts cause unstable motion
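The moving-average filter mentioned above takes only a few lines. A sketch (a window of 3-5 samples at 30 Hz removes most stick jitter at the cost of roughly 100 ms of lag; tune the window for your robot):

```python
from collections import deque


class MovingAverageFilter:
    """Smooth a stream of velocity commands with a moving average.

    Keep one filter instance per axis (e.g. one for linear, one for
    angular) and call it with each new raw command.
    """
    def __init__(self, window: int = 5):
        self.buf = deque(maxlen=window)

    def __call__(self, value: float) -> float:
        self.buf.append(value)
        return sum(self.buf) / len(self.buf)
```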
💡 Final Tip: Start small - collect 10 episodes, train a tiny model, see if it works. Then scale up. It's much better to iterate quickly than collect 500 episodes before discovering an issue.
Summary
Teleoperation data collection is the foundation of any VLA project. With the setup described in this guide, you can:
- ✅ Control your robot with keyboard or gamepad
- ✅ Record synchronized camera images and actions
- ✅ Convert data to LeRobot format for training
- ✅ Collect high-quality demonstrations that result in better models