"""Gradio demo that drives a language-model-program (LMP) planner."""

import openai
import numpy as np
from itertools import islice
from tempfile import NamedTemporaryFile
import copy
import shapely
# The star imports are kept: besides exporting the names, they make sure the
# ``shapely.geometry`` and ``shapely.affinity`` submodules are loaded before
# ``DemoRunner.make_LMP`` reads their ``__all__`` lists.
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr

from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper, VoxPoserWrapper, AgibotWrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
from utils import get_geoip


# Configuration name (as offered by the UI dropdown) -> YAML config file.
_CONFIG_FILES = {
    'voxposer': 'cfg_voxposer.yaml',
    'agibot': 'cfg_agibot.yaml',
}

# Configuration name -> environment wrapper class exposed to the LMPs.
_ENV_WRAPPERS = {
    'voxposer': VoxPoserWrapper,
    'agibot': AgibotWrapper,
}


class DemoRunner:
    """Builds the LMP stack from a config file and runs instructions on it."""

    def __init__(self, config_file: str = 'cfg.yaml'):
        """Load ``config_file`` with OmegaConf and create the markdown logger."""
        self._cfg = OmegaConf.to_container(OmegaConf.load(config_file), resolve=True)
        self._env = None
        # Set by ``setup``; ``run`` refuses to work until it is populated.
        self._lmp_planner = None
        self._md_logger = MarkdownLogger()

    def make_LMP(self, env, cfg_choice):
        """Create and return the high-level planner LMP.

        Args:
            env: Environment object handed to the wrapper class.
            cfg_choice: ``'voxposer'`` or ``'agibot'``; selects the wrapper.

        Returns:
            The ``LMP`` instance named ``'planner'``.

        Raises:
            ValueError: If ``cfg_choice`` is not a known configuration.
        """
        # LMP env wrapper
        cfg = copy.deepcopy(self._cfg)
        # cfg['env'] = {
        #     'init_objs': list(env.obj_name_to_id.keys()),
        #     'coords': cfg['tabletop_coords']
        # }
        wrapper_cls = _ENV_WRAPPERS.get(cfg_choice)
        if wrapper_cls is None:
            raise ValueError(
                f'Unknown configuration {cfg_choice!r}; '
                f'expected one of {sorted(_ENV_WRAPPERS)}'
            )
        LMP_env = wrapper_cls(env, cfg)

        # creating APIs that the LMPs can interact with
        fixed_vars = {'np': np}
        # Resolve the shapely names on their modules directly instead of
        # eval()-ing them out of this module's star-imported globals.
        fixed_vars.update({
            name: getattr(shapely.geometry, name)
            for name in shapely.geometry.__all__
        })
        fixed_vars.update({
            name: getattr(shapely.affinity, name)
            for name in shapely.affinity.__all__
        })

        # Every callable, non-dunder attribute of the wrapper becomes an API.
        variable_vars = {}
        for attr_name in dir(LMP_env):
            if attr_name.startswith("__"):
                continue
            attr = getattr(LMP_env, attr_name)
            if callable(attr):
                variable_vars[attr_name] = attr
        variable_vars['say'] = lambda msg: self._md_logger.log_text(f'Robot says: "{msg}"')

        # creating the function-generating LMP
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)

        # creating other low-level LMPs
        # (all of them are constructed before variable_vars is updated)
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
            for k in cfg['lmps'].keys() if k != 'fgen'
        })

        # creating the LMP that deals w/ high-level language commands
        lmp_planner = LMP(
            'planner', cfg['lmps']['planner'], lmp_fgen, fixed_vars, variable_vars, self._md_logger
        )

        return lmp_planner

    def setup(self, api_key, n_blocks, n_bowls, proxy, cfg_choice):
        """Store the OpenAI key and build the planner for ``cfg_choice``.

        ``n_blocks``, ``n_bowls`` and ``proxy`` are accepted but unused while
        the simulator path below is disabled.

        Returns:
            ``(info, img)``, both currently ``None``.
        """
        openai.api_key = api_key

        # Simulator path, currently disabled:
        # self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)
        # list_idxs = np.random.choice(len(ALL_BLOCKS), size=max(n_blocks, n_bowls), replace=False)
        # block_list = [ALL_BLOCKS[i] for i in list_idxs[:n_blocks]]
        # bowl_list = [ALL_BOWLS[i] for i in list_idxs[:n_bowls]]
        # obj_list = block_list + bowl_list
        # self._env.reset(obj_list)

        self._lmp_planner = self.make_LMP(self._env, cfg_choice)

        # info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
        # img = self._env.get_camera_image()
        info, img = None, None

        return info, img

    def run(self, instruction):
        """Run ``instruction`` through the planner and return the log.

        Returns:
            ``(markdown_log, None, None)`` after a run, or
            ``('Please run setup first!', None, None)`` if ``setup`` has not
            built a planner yet.
        """
        if self._lmp_planner is None:
            return 'Please run setup first!', None, None
        # if self._env is None:
        #     return 'Please run setup first!', None, None

        # self._env.cache_video = []
        self._md_logger.clear()

        self._lmp_planner(instruction)
        # try:
        #     self._lmp_planner(instruction, f'objects = {self._env.object_list}')
        # except Exception as e:
        #     return f'Error: {e}', None, None

        # video_file_name = None
        # if self._env.cache_video:
        #     rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
        #     video_file_name = NamedTemporaryFile(suffix='.mp4').name
        #     rendered_clip.write_videofile(video_file_name, fps=25)
        video_file_name = None

        # return self._md_logger.get_log(), self._env.get_camera_image(), video_file_name
        return self._md_logger.get_log(), None, video_file_name


def _normalize_proxy_address(proxy_addr):
    """Return ``proxy_addr`` as a URL with a scheme, or ``None`` if blank.

    Addresses that already start with ``http://`` or ``https://`` are returned
    unchanged (after stripping whitespace); anything else gets ``http://``
    prepended.
    """
    if proxy_addr is None:
        return None
    addr = proxy_addr.strip()
    if not addr:
        return None
    if addr.startswith(('http://', 'https://')):
        return addr
    return f"http://{addr}"


def setup(api_key, n_blocks, n_bowls, proxy_addr, cfg_choice):
    """Validate the UI inputs and create a configured ``DemoRunner``.

    Returns:
        ``(info, img, demo_runner)`` on success, or
        ``(message, None, None)`` when an input is missing or invalid.
    """
    if not api_key:
        return 'Please enter your OpenAI API key!', None, None

    if n_blocks + n_bowls == 0:
        return 'Please select at least one object!', None, None

    # A dropdown with nothing selected (or an unexpected value) has no config.
    cfg_file = _CONFIG_FILES.get(cfg_choice)
    if cfg_file is None:
        return 'Please select a configuration!', None, None

    # A blank textbox clears the proxy instead of installing a bare "http://".
    if proxy_addr is not None:
        openai.proxy = _normalize_proxy_address(proxy_addr)

    # ip_status, ip_info = get_geoip(openai.proxy)
    # if ip_status == -1:
    #     return ip_info, None, None
    # elif ip_status == 0:
    #     pressed_key = input('Continue with current ip location? (y/n)')
    #     if pressed_key.lower() != 'y':
    #         return ip_info, None, None
    # else:
    #     print(f'{ip_info} IP location check passed.')

    demo_runner = DemoRunner(cfg_file)

    info, img = demo_runner.setup(api_key, n_blocks, n_bowls, proxy_addr, cfg_choice)
    return info, img, demo_runner


def run(instruction, demo_runner):
    """Run ``instruction`` on ``demo_runner``, or ask the user to set up first."""
    if demo_runner is None:
        return 'Please run setup first!', None, None
    return demo_runner.run(instruction)


def _load_readme_text(path='README.md', skip_lines=12):
    """Return the text of ``path`` after its first ``skip_lines`` lines.

    Returns an empty string when the file does not exist or is shorter than
    ``skip_lines``. Only needed by the README banner, which is disabled below.
    """
    try:
        # NOTE(review): assumes the README is UTF-8 encoded — confirm.
        with open(path, 'r', encoding='utf-8') as f:
            return ''.join(islice(f, skip_lines, None))
    except FileNotFoundError:
        return ''


def _build_demo():
    """Assemble the Gradio UI and wire the buttons to ``setup`` and ``run``."""
    # NOTE(review): the nesting of the Row/Column containers was reconstructed
    # from a whitespace-flattened source — confirm the intended layout.
    with gr.Blocks() as demo:
        state = gr.State(None)

        # gr.Markdown(_load_readme_text())
        gr.Markdown('# Interactive Demo')
        with gr.Row():
            with gr.Column():
                with gr.Column():
                    inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
                    inp_proxy_addr = gr.Textbox(label='Your local proxy address', lines=1)
                    inp_cfg = gr.Dropdown(label='Configuration', choices=['voxposer', 'agibot'])
                    with gr.Row():
                        inp_n_blocks = gr.Slider(label='Number of Blocks', minimum=0, maximum=4, value=3, step=1)
                        inp_n_bowls = gr.Slider(label='Number of Bowls', minimum=0, maximum=4, value=3, step=1)

                btn_setup = gr.Button("Setup/Reset Simulation")
                info_setup = gr.Markdown(label='Setup Info')
            with gr.Column():
                img_setup = gr.Image(label='Current Simulation')

        with gr.Row():
            with gr.Column():
                inp_instruction = gr.Textbox(label='Instruction', lines=1)
                btn_run = gr.Button("Run (this may take 30+ seconds)")
                info_run = gr.Markdown(label='Generated Code')
            with gr.Column():
                video_run = gr.Video(label='Video of Last Instruction')

        btn_setup.click(
            setup,
            inputs=[inp_api_key, inp_n_blocks, inp_n_bowls, inp_proxy_addr, inp_cfg],
            outputs=[info_setup, img_setup, state]
        )
        btn_run.click(
            run,
            inputs=[inp_instruction, state],
            outputs=[info_run, img_setup, video_run]
        )

    return demo


if __name__ == '__main__':
    _build_demo().launch()