# Spaces: Sleeping
import openai | |
import numpy as np | |
from tempfile import NamedTemporaryFile | |
import copy | |
import shapely | |
from shapely.geometry import * | |
from shapely.affinity import * | |
from omegaconf import OmegaConf | |
from moviepy.editor import ImageSequenceClip | |
import gradio as gr | |
from lmp import LMP, LMPFGen | |
from sim import PickPlaceEnv, LMP_wrapper, VoxPoserWrapper, AgibotWrapper | |
from consts import ALL_BLOCKS, ALL_BOWLS | |
from md_logger import MarkdownLogger | |
from utils import get_geoip | |
class DemoRunner:
    """Builds the planner LMP for a chosen configuration and runs instructions.

    The simulated environment is currently disabled: ``self._env`` is created
    as ``None`` and handed to the wrapper unchanged, and no image or video is
    produced by :meth:`setup` or :meth:`run`.
    """

    def __init__(self, config_file: str = 'cfg.yaml'):
        """Load the configuration and create the Markdown logger.

        Args:
            config_file: Path of the config file loaded through OmegaConf.
        """
        self._cfg = OmegaConf.to_container(OmegaConf.load(config_file), resolve=True)
        self._env = None
        # Stays None until setup() has run; run() checks this.
        self._lmp_planner = None
        self._md_logger = MarkdownLogger()

    def make_LMP(self, env, cfg_choice):
        """Create the high-level planner LMP for ``cfg_choice``.

        Args:
            env: Environment object passed to the wrapper (may be ``None``).
            cfg_choice: Either ``"voxposer"`` or ``"agibot"``.

        Returns:
            The ``LMP`` instance created under the name ``'planner'``.

        Raises:
            ValueError: If ``cfg_choice`` is not one of the supported values.
        """
        # Work on a private copy so the loaded config is never mutated.
        cfg = copy.deepcopy(self._cfg)

        # LMP env wrapper
        if cfg_choice == "voxposer":
            LMP_env = VoxPoserWrapper(env, cfg)
        elif cfg_choice == "agibot":
            LMP_env = AgibotWrapper(env, cfg)
        else:
            raise ValueError(
                f'Unknown configuration {cfg_choice!r}; expected "voxposer" or "agibot".'
            )

        # Names that are always available to generated code: numpy plus every
        # public name exported by shapely.geometry and shapely.affinity.
        # Looked up with getattr on the modules rather than eval() on the
        # star-imported globals; affinity is applied last, as in import order.
        fixed_vars = {'np': np}
        for module in (shapely.geometry, shapely.affinity):
            fixed_vars.update({name: getattr(module, name) for name in module.__all__})

        # Every callable, non-dunder attribute of the wrapper becomes an API
        # the LMPs can call.
        variable_vars = {
            k: getattr(LMP_env, k)
            for k in dir(LMP_env)
            if not k.startswith("__") and callable(getattr(LMP_env, k))
        }
        variable_vars['say'] = lambda msg: self._md_logger.log_text(f'Robot says: "{msg}"')

        # creating the function-generating LMP
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)

        # creating other low-level LMPs; each receives the same variable_vars
        # dict object, which is then updated in place with the new LMPs.
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
            for k in cfg['lmps'].keys() if k != 'fgen'
        })

        # creating the LMP that deals w/ high-level language commands
        lmp_planner = LMP(
            'planner', cfg['lmps']['planner'], lmp_fgen, fixed_vars, variable_vars, self._md_logger
        )
        return lmp_planner

    def setup(self, api_key, n_blocks, n_bowls, cfg_choice):
        """Store the OpenAI key and build the planner.

        Args:
            api_key: Value assigned to ``openai.api_key``.
            n_blocks: Currently unused (environment creation is disabled).
            n_bowls: Currently unused (environment creation is disabled).
            cfg_choice: Configuration name forwarded to :meth:`make_LMP`.

        Returns:
            ``(info, img)``; both are ``None`` while the simulation is disabled.

        Raises:
            ValueError: Propagated from :meth:`make_LMP` for an unknown
                ``cfg_choice``.
        """
        openai.api_key = api_key
        self._lmp_planner = self.make_LMP(self._env, cfg_choice)
        info, img = None, None
        return info, img

    def run(self, instruction):
        """Execute ``instruction`` through the planner and return its log.

        Args:
            instruction: Instruction text passed to the planner LMP.

        Returns:
            ``(log, image, video_file_name)``. ``image`` and
            ``video_file_name`` are always ``None`` at present. If
            :meth:`setup` has not been called, the first element is the
            message ``'Please run setup first!'``.
        """
        if self._lmp_planner is None:
            return 'Please run setup first!', None, None

        self._md_logger.clear()
        self._lmp_planner(instruction)
        # No video is rendered while the simulation is disabled.
        video_file_name = None
        return self._md_logger.get_log(), None, video_file_name
def setup(api_key, n_blocks, n_bowls, cfg_choice): | |
if not api_key: | |
return 'Please enter your OpenAI API key!', None, None | |
if n_blocks + n_bowls == 0: | |
return 'Please select at least one object!', None, None | |
# ip_status, ip_info = get_geoip(openai.proxy) | |
# if ip_status == -1: | |
# return ip_info, None, None | |
# elif ip_status == 0: | |
# pressed_key = input('Continue with current ip location? (y/n)') | |
# if pressed_key.lower() != 'y': | |
# return ip_info, None, None | |
# else: | |
# print(f'{ip_info} IP location check passed.') | |
if cfg_choice == "voxposer": | |
cfg_file = 'cfg_voxposer.yaml' | |
elif cfg_choice == "agibot": | |
cfg_file = 'cfg_agibot.yaml' | |
demo_runner = DemoRunner(cfg_file) | |
info, img = demo_runner.setup(api_key, n_blocks, n_bowls, cfg_choice) | |
return info, img, demo_runner | |
def run(instruction, demo_runner):
    """Forward ``instruction`` to ``demo_runner`` if one exists.

    Args:
        instruction: Instruction text to execute.
        demo_runner: Object exposing ``run(instruction)``, or ``None`` when
            setup has not been performed yet.

    Returns:
        Whatever ``demo_runner.run(instruction)`` returns, or the triple
        ``('Please run setup first!', None, None)`` when there is no runner.
    """
    if demo_runner is not None:
        return demo_runner.run(instruction)
    return 'Please run setup first!', None, None
if __name__ == '__main__':
    # Keep everything after the first 12 lines of the README (presumably the
    # header/front matter - confirm). Slicing instead of calling next() twelve
    # times means a shorter README yields an empty string rather than raising
    # StopIteration. The encoding is explicit so decoding does not depend on
    # the platform locale.
    with open('README.md', 'r', encoding='utf-8') as f:
        readme_text = ''.join(f.readlines()[12:])

    with gr.Blocks() as demo:
        # Holds the DemoRunner returned by setup(); None until setup succeeds.
        state = gr.State(None)

        # readme_text is currently not rendered:
        # gr.Markdown(readme_text)
        gr.Markdown('# Interactive Demo')

        # Top row: setup controls on the left, simulation image on the right.
        with gr.Row():
            with gr.Column():
                with gr.Column():
                    inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
                    # inp_proxy_addr = gr.Textbox(label='Your local proxy address', lines=1)
                    inp_cfg = gr.Dropdown(label='Configuration', choices=['voxposer', 'agibot'])
                with gr.Row():
                    inp_n_blocks = gr.Slider(label='Number of Blocks', minimum=0, maximum=4, value=3, step=1)
                    inp_n_bowls = gr.Slider(label='Number of Bowls', minimum=0, maximum=4, value=3, step=1)
                btn_setup = gr.Button("Setup/Reset Simulation")
                info_setup = gr.Markdown(label='Setup Info')
            with gr.Column():
                img_setup = gr.Image(label='Current Simulation')

        # Bottom row: instruction input and generated code on the left, video
        # on the right.
        with gr.Row():
            with gr.Column():
                inp_instruction = gr.Textbox(label='Instruction', lines=1)
                btn_run = gr.Button("Run (this may take 30+ seconds)")
                info_run = gr.Markdown(label='Generated Code')
            with gr.Column():
                video_run = gr.Video(label='Video of Last Instruction')

        # setup() returns (info, img, demo_runner); the runner is kept in state.
        btn_setup.click(
            setup,
            inputs=[inp_api_key, inp_n_blocks, inp_n_bowls, inp_cfg],
            outputs=[info_setup, img_setup, state]
        )
        # run() returns (log, image, video) for the stored runner.
        btn_run.click(
            run,
            inputs=[inp_instruction, state],
            outputs=[info_run, img_setup, video_run]
        )

    demo.launch()