LeRobot Arena Python Client
Python client library for the LeRobot Arena robotics API with separate Producer and Consumer classes.
Installation
pip install -e .
Or with development dependencies:
pip install -e ".[dev]"
Basic Usage
Producer (Controller) Example
import asyncio
from lerobot_arena_client import RoboticsProducer

async def main():
    # Create producer client
    producer = RoboticsProducer('http://localhost:8000')

    # Register the error handler first so later failures are reported
    producer.on_error(lambda err: print(f'Error: {err}'))

    # List available rooms
    rooms = await producer.list_rooms()
    print('Available rooms:', rooms)

    # Create new room and connect
    room_id = await producer.create_room()
    await producer.connect(room_id)

    # Send initial state
    await producer.send_state_sync({
        'shoulder': 45.0,
        'elbow': -20.0
    })

    # Send joint updates (only changed values will be forwarded!)
    await producer.send_joint_update([
        {'name': 'shoulder', 'value': 45.0},
        {'name': 'elbow', 'value': -20.0}
    ])

    # Disconnect
    await producer.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
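The comment about forwarding only changed values can be illustrated with a small standalone sketch. This is not the server's actual implementation: `filter_changed_joints` and its `tolerance` parameter are hypothetical names, used only to show the idea of dropping joint values that already match the last known state.

```python
import math


def filter_changed_joints(last_state, joints, tolerance=1e-6):
    """Return only the joint updates whose value differs from last_state.

    last_state: dict mapping joint name -> last known value (updated in place).
    joints: list of {'name': str, 'value': float} dicts, the same shape that
            send_joint_update receives in the example above.
    """
    changed = []
    for joint in joints:
        name, value = joint['name'], joint['value']
        previous = last_state.get(name)
        if previous is None or not math.isclose(previous, value, abs_tol=tolerance):
            changed.append({'name': name, 'value': value})
            last_state[name] = value
    return changed


state = {'shoulder': 45.0, 'elbow': -20.0}
update = [
    {'name': 'shoulder', 'value': 45.0},  # unchanged, dropped
    {'name': 'elbow', 'value': -25.0},    # changed, kept
    {'name': 'wrist', 'value': 10.0},     # new joint, kept
]
print(filter_changed_joints(state, update))
```

Under this assumption, the joint update in the example above would forward nothing, because both values match the state that was just synchronized.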
Consumer (Robot Executor) Example
import asyncio
from lerobot_arena_client import RoboticsConsumer

async def main():
    consumer = RoboticsConsumer('http://localhost:8000')

    # Connect to existing room
    room_id = "your-room-id"
    await consumer.connect(room_id)

    # Get initial state
    initial_state = await consumer.get_state_sync()
    print('Initial state:', initial_state)

    # Set up event handlers
    def on_state_sync(state):
        print('State sync:', state)

    def on_joint_update(joints):
        print('Execute joints:', joints)
        # Execute on actual robot hardware
        for joint in joints:
            print(f"Moving {joint['name']} to {joint['value']}")

    def on_error(error):
        print(f'Error: {error}')

    # Register callbacks
    consumer.on_state_sync(on_state_sync)
    consumer.on_joint_update(on_joint_update)
    consumer.on_error(on_error)

    # Keep running
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        await consumer.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
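Before values from `on_joint_update` reach real hardware, it is usually worth validating them. The sketch below clamps each target to a per-joint range. `JOINT_LIMITS`, its values, and `clamp_joint_targets` are assumptions made for illustration and are not part of the client library.

```python
# Hypothetical per-joint limits in degrees; replace with your robot's real limits.
JOINT_LIMITS = {
    'shoulder': (-90.0, 90.0),
    'elbow': (-120.0, 0.0),
}


def clamp_joint_targets(joints, limits=JOINT_LIMITS):
    """Clamp each joint value to its allowed range and skip unknown joints."""
    safe = []
    for joint in joints:
        name = joint['name']
        if name not in limits:
            continue  # ignore joints this robot does not have
        low, high = limits[name]
        value = min(max(float(joint['value']), low), high)
        safe.append({'name': name, 'value': value})
    return safe


def on_joint_update(joints):
    # Same callback shape as in the consumer example, with clamping added
    for joint in clamp_joint_targets(joints):
        print(f"Moving {joint['name']} to {joint['value']}")


on_joint_update([
    {'name': 'shoulder', 'value': 135.0},  # clamped to 90.0
    {'name': 'elbow', 'value': -20.0},     # within range, unchanged
    {'name': 'gripper', 'value': 5.0},     # unknown joint, skipped
])
```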
Factory Function Usage
import asyncio
from lerobot_arena_client import (
    RoboticsProducer,
    create_client,
    create_producer_client,
    create_consumer_client,
)

async def main():
    # Create clients using factory function
    producer = create_client("producer", "http://localhost:8000")
    consumer = create_client("consumer", "http://localhost:8000")

    # Or use convenience functions
    # Quick producer setup (auto-creates room and connects)
    producer = await create_producer_client('http://localhost:8000')
    print(f"Producer connected to room: {producer.room_id}")

    # Quick consumer setup (connects to existing room)
    consumer = await create_consumer_client(producer.room_id, 'http://localhost:8000')

    # Clients created this way are already connected, so disconnect them when done
    await consumer.disconnect()
    await producer.disconnect()

    # Use context managers for automatic cleanup
    async with RoboticsProducer('http://localhost:8000') as producer:
        room_id = await producer.create_room()
        await producer.connect(room_id)
        await producer.send_state_sync({'joint1': 10.0})

if __name__ == "__main__":
    asyncio.run(main())
Advanced Example: Producer-Consumer Pair
import asyncio
from lerobot_arena_client import RoboticsProducer, RoboticsConsumer

async def run_producer(room_id: str):
    async with RoboticsProducer() as producer:
        await producer.connect(room_id)

        # Simulate sending commands
        for i in range(10):
            await producer.send_state_sync({
                'joint1': i * 10.0,
                'joint2': i * -5.0
            })
            await asyncio.sleep(1)

async def run_consumer(room_id: str):
    async with RoboticsConsumer() as consumer:
        await consumer.connect(room_id)

        def handle_joint_update(joints):
            print(f"🤖 Executing: {joints}")
            # Your robot control code here

        consumer.on_joint_update(handle_joint_update)

        # Keep listening
        await asyncio.sleep(15)

async def main():
    # Create room
    producer = RoboticsProducer()
    room_id = await producer.create_room()

    # Run producer and consumer concurrently
    await asyncio.gather(
        run_producer(room_id),
        run_consumer(room_id)
    )

if __name__ == "__main__":
    asyncio.run(main())
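When a producer and a consumer start concurrently, the producer may begin sending before the consumer has registered its callback. One way to avoid losing early messages is to gate the producer on an `asyncio.Event`. The sketch below shows that pattern with an in-memory stand-in. `FakeRoom` is hypothetical and does not use the client library or its server.

```python
import asyncio


class FakeRoom:
    """In-memory stand-in for a room: messages go only to current listeners."""

    def __init__(self):
        self.listeners = []

    def publish(self, message):
        for listener in self.listeners:
            listener(message)


async def run_consumer(room, ready, received, listen_for):
    room.listeners.append(received.append)  # register the callback first
    ready.set()                             # then signal the producer
    await asyncio.sleep(listen_for)         # keep listening for a while


async def run_producer(room, ready, count):
    await ready.wait()  # do not send until the consumer is listening
    for i in range(count):
        room.publish({'joint1': i * 10.0})
        await asyncio.sleep(0)  # yield control, as a real network send would


async def main():
    room = FakeRoom()
    ready = asyncio.Event()
    received = []
    await asyncio.gather(
        run_producer(room, ready, count=3),
        run_consumer(room, ready, received, listen_for=0.05),
    )
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the real client, the same event could be set right after the consumer's callbacks are registered and awaited before the producer's first send.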
API Reference
RoboticsProducer
Connection Methods:
- connect(room_id, participant_id=None) - Connect as producer to room
Control Methods:
- send_joint_update(joints) - Send joint updates
- send_state_sync(state) - Send state synchronization (dict format)
- send_emergency_stop(reason) - Send emergency stop
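In the usage examples, the two control methods take different shapes: `send_state_sync` receives a dict of joint name to value, while `send_joint_update` receives a list of name/value dicts. A minimal sketch of converting between the two follows. The helper names `state_to_joints` and `joints_to_state` are hypothetical and not part of the library.

```python
def state_to_joints(state):
    """Convert {'shoulder': 45.0} into [{'name': 'shoulder', 'value': 45.0}]."""
    return [{'name': name, 'value': value} for name, value in state.items()]


def joints_to_state(joints):
    """Convert a joint list back into a name -> value dict (last value wins)."""
    return {joint['name']: joint['value'] for joint in joints}


state = {'shoulder': 45.0, 'elbow': -20.0}
joints = state_to_joints(state)
print(joints)
print(joints_to_state(joints) == state)
```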
Event Callbacks:
- on_error(callback) - Set error callback
- on_connected(callback) - Set connection callback
- on_disconnected(callback) - Set disconnection callback
RoboticsConsumer
Connection Methods:
- connect(room_id, participant_id=None) - Connect as consumer to room
State Methods:
- get_state_sync() - Get the current synchronized state (awaited in the usage examples)
Event Callbacks:
- on_state_sync(callback) - Set state sync callback
- on_joint_update(callback) - Set joint update callback
- on_error(callback) - Set error callback
- on_connected(callback) - Set connection callback
- on_disconnected(callback) - Set disconnection callback
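If a callback does slow work, such as hardware I/O, one common pattern is to have it only enqueue the update and let a separate task process the queue. The sketch below assumes callbacks are plain synchronous functions invoked on the event loop, as in the consumer example. `make_queue_callback` and `worker` are hypothetical helpers, not library functions.

```python
import asyncio


def make_queue_callback(queue):
    """Return a callback that enqueues each joint update without blocking."""
    def on_joint_update(joints):
        queue.put_nowait(joints)
    return on_joint_update


async def worker(queue, executed):
    """Process queued updates one at a time; None is the stop signal."""
    while True:
        joints = await queue.get()
        if joints is None:
            break
        await asyncio.sleep(0.01)  # stand-in for slow hardware I/O
        executed.append(joints)


async def demo():
    queue = asyncio.Queue()
    executed = []
    # With the real client, this callback would be passed to on_joint_update
    callback = make_queue_callback(queue)
    task = asyncio.create_task(worker(queue, executed))
    callback([{'name': 'shoulder', 'value': 45.0}])
    callback([{'name': 'elbow', 'value': -20.0}])
    queue.put_nowait(None)  # ask the worker to stop after draining the queue
    await task
    return executed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```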
RoboticsClientCore (Base Class)
REST API Methods:
- list_rooms() - List all available rooms
- create_room(room_id=None) - Create a new room
- delete_room(room_id) - Delete a room
- get_room_state(room_id) - Get current room state
- get_room_info(room_id) - Get basic room information
Utility Methods:
- send_heartbeat() - Send heartbeat to server
- is_connected() - Check connection status
- get_connection_info() - Get connection details
- disconnect() - Disconnect from room
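The heartbeat and connection-status methods can be combined into a periodic keep-alive loop. The sketch below assumes that `send_heartbeat()` is a coroutine and that `is_connected()` returns a bool; neither is confirmed by this reference. `heartbeat_loop` and `FakeClient` are hypothetical and exist only to make the example runnable without a server.

```python
import asyncio


async def heartbeat_loop(client, interval=5.0):
    """Call client.send_heartbeat() every `interval` seconds while connected.

    Assumes send_heartbeat() is a coroutine and is_connected() returns a bool.
    Returns the number of heartbeats sent.
    """
    sent = 0
    while client.is_connected():
        await client.send_heartbeat()
        sent += 1
        await asyncio.sleep(interval)
    return sent


class FakeClient:
    """Stand-in that reports 'connected' for a fixed number of heartbeats."""

    def __init__(self, beats_before_disconnect):
        self.remaining = beats_before_disconnect

    def is_connected(self):
        return self.remaining > 0

    async def send_heartbeat(self):
        self.remaining -= 1


if __name__ == "__main__":
    print(asyncio.run(heartbeat_loop(FakeClient(3), interval=0.01)))
```

In a real program the loop would typically run as a background task created with `asyncio.create_task` and be cancelled before disconnecting.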
Factory Functions
- create_client(role, base_url) - Create client by role ("producer" or "consumer")
- create_producer_client(base_url, room_id=None) - Create connected producer
- create_consumer_client(room_id, base_url) - Create connected consumer
Requirements
- Python 3.12+
- aiohttp>=3.9.0
- websockets>=12.0
Migration from v1
The old RoboticsClient remains available for backward compatibility, but it is now an alias for RoboticsClientCore. In new code, use the role-specific RoboticsProducer or RoboticsConsumer classes for better type safety and a cleaner API.