# OpenCUA GUI grounding demo — Hugging Face Space app.
| import gradio as gr | |
| import re | |
| import base64 | |
| import os | |
| from PIL import Image, ImageDraw | |
| from io import BytesIO | |
| from img_utils import smart_resize | |
| import backoff | |
| import httpx | |
| from loguru import logger | |
| import time | |
| from typing import List, Optional | |
| import traceback | |
def image_to_base64(image):
    """Encode a PIL image as a base64 PNG string (no data-URL prefix)."""
    buffer = BytesIO()
    image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode("utf-8")
def create_grounding_messages(image, instruction: str):
    """Create chat messages for GUI grounding task.

    Builds an OpenAI-style message list: a system prompt describing the GUI
    agent role, plus a user turn carrying the screenshot (as a base64 PNG
    data URL) and the textual instruction.

    Args:
        image: PIL image of the screenshot (passed to ``image_to_base64``;
            the original ``image: str`` annotation was incorrect).
        instruction: natural-language task for the model.

    Returns:
        list[dict]: messages suitable for a chat-completions payload.
    """
    # Fix: the docstring used to sit AFTER this call (so it was a no-op
    # statement), and the variable was misleadingly named ``image_path``
    # although it holds the base64 payload, not a filesystem path.
    image_b64 = image_to_base64(image)
    system_prompt = (
        "You are a GUI agent. You are given a task and a screenshot of the screen. "
        "You need to perform a series of pyautogui actions to complete the task."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
                {"type": "text", "text": instruction}
            ],
        },
    ]
    return messages
def draw_circle_at_point(image, x, y, outline_color="#FF3366", line_width=5, radius=15):
    """Draw a circle outline centered at (x, y) and return the mutated image."""
    bbox = (x - radius, y - radius, x + radius, y + radius)
    ImageDraw.Draw(image).ellipse(bbox, outline=outline_color, width=line_width)
    return image
def draw_bounding_boxes(image, bounding_boxes, outline_color="#FF3366", line_width=3):
    """Draw a rounded-corner rectangle outline for each box on the image.

    Each box is (xmin, ymin, xmax, ymax) in pixel coordinates; the image is
    mutated in place and also returned.
    """
    draw = ImageDraw.Draw(image)
    corner = 10  # corner radius in pixels
    for xmin, ymin, xmax, ymax in bounding_boxes:
        d = 2 * corner
        # Corner arcs: (arc bounding box, start angle, end angle).
        arcs = (
            ((xmin, ymin, xmin + d, ymin + d), 180, 270),  # top-left
            ((xmax - d, ymin, xmax, ymin + d), 270, 0),    # top-right
            ((xmax - d, ymax - d, xmax, ymax), 0, 90),     # bottom-right
            ((xmin, ymax - d, xmin + d, ymax), 90, 180),   # bottom-left
        )
        for arc_box, start, end in arcs:
            draw.arc(arc_box, start, end, fill=outline_color, width=line_width)
        # Straight edges connecting the arcs.
        edges = (
            (xmin + corner, ymin, xmax - corner, ymin),  # top
            (xmax, ymin + corner, xmax, ymax - corner),  # right
            (xmin + corner, ymax, xmax - corner, ymax),  # bottom
            (xmin, ymin + corner, xmin, ymax - corner),  # left
        )
        for edge in edges:
            draw.line(edge, fill=outline_color, width=line_width)
    return image
| # Import parsing and evaluation functions from the provided code | |
def extract_coordinates(code):
    """Extract [x1, y1, x2, y2] from a pyautogui command string.

    Keyword coordinates (x=..., y=...) are read from the first line; when the
    snippet has exactly two lines the second line supplies (x2, y2),
    otherwise (x2, y2) duplicates (x1, y1). Returns None when no keyword
    coordinates can be extracted.
    """
    if "x=" not in code or "y=" not in code:
        return None

    def _xy(cmd):
        mx = re.search(r"x=([\d.]+)", cmd)
        my = re.search(r"y=([\d.]+)", cmd)
        if mx and my:
            return float(mx.group(1)), float(my.group(1))
        return None

    lines = code.split("\n")
    first = _xy(lines[0])
    if first is None:
        print("Failed to extract coordinates")
        return None
    x1, y1 = first
    x2, y2 = x1, y1
    if len(lines) == 2:
        second = _xy(lines[1])
        if second is not None:
            x2, y2 = second
    return [x1, y1, x2, y2]
def split_args(args_str: str) -> List[str]:
    """Split a comma-separated argument string, ignoring commas inside quotes.

    Tracks single/double quoting (with backslash-escaped quote characters);
    separator commas are dropped while surrounding whitespace is preserved.
    """
    pieces: List[str] = []
    buf = ""
    in_quote = False
    quote_char = ""
    last = ""
    for ch in args_str:
        if ch in ('"', "'"):
            if not in_quote:
                in_quote, quote_char = True, ch
            elif ch == quote_char and last != "\\":
                in_quote = False
        if not in_quote and ch == ",":
            pieces.append(buf)
            buf = ""
        else:
            buf += ch
        last = ch
    if buf:
        pieces.append(buf)
    return pieces
def correct_pyautogui_arguments(code: str) -> str:
    """Normalize wrongly-named keyword arguments in pyautogui calls.

    Models sometimes emit e.g. ``pyautogui.write(text=...)`` or
    ``pyautogui.press(key=...)``; real pyautogui expects ``write(message=...)``
    and positional keys for ``press``/``hotkey``. Lines that are not pyautogui
    calls, or whose function is not in the fix-up table, pass through
    (stripped) unchanged.
    """
    # func -> (kwarg names to rewrite, replacement keyword or None => positional)
    fixups = {
        "write": (["text", "content"], "message"),
        "press": (["key", "button"], None),
        "hotkey": (["key1", "key2", "keys"], None),
    }
    out = []
    for raw in code.strip().split("\n"):
        line = raw.strip()
        call = re.match(r"(pyautogui\.(\w+))\((.*)\)", line)
        if call is None or call.group(2) not in fixups:
            out.append(line)
            continue
        prefix = call.group(1)
        args_str = call.group(3)
        bad_names, replacement = fixups[call.group(2)]
        fixed = []
        for piece in split_args(args_str):
            piece = piece.strip()
            kw = re.match(r"(\w+)\s*=\s*(.*)", piece)
            if kw is None:
                fixed.append(piece)
            elif kw.group(1) in bad_names:
                # Rename the keyword, or drop it to make the value positional.
                if replacement:
                    fixed.append(f"{replacement}={kw.group(2)}")
                else:
                    fixed.append(kw.group(2))
            else:
                fixed.append(f"{kw.group(1)}={kw.group(2)}")
        out.append(f"{prefix}({', '.join(fixed)})")
    return "\n".join(out)
def transform_agnet_action_to_code_block(action):
    """Wrap an agent action in a fenced markdown block.

    Special computer./browser. commands get a ```code fence; everything else
    (pyautogui snippets) gets a ```python fence. (Note: "agnet" in the name
    is a historical typo kept for caller compatibility.)
    """
    special_markers = (
        "computer.terminate",
        "computer.wait",
        "browser.select_option",
        "browser.clear",
    )
    fence = "code" if any(marker in action for marker in special_markers) else "python"
    return f"```{fence}\n{action}\n```"
| def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type): | |
| if coordinate_type == "relative": | |
| return int(round(x * screen_width)), int(round(y * screen_height)) | |
| elif coordinate_type == "absolute": | |
| return x, y | |
| elif coordinate_type == "qwen25": | |
| if 0 <= x <= 1 and 0 <= y <= 1: | |
| # If already normalized, treat like "relative" | |
| return int(round(x * screen_width)), int(round(y * screen_height)) | |
| height, width = smart_resize( | |
| height=screen_height, | |
| width=screen_width, | |
| factor=28, | |
| min_pixels=3136, | |
| max_pixels=12845056 | |
| ) | |
| return int(x / width * screen_width), int(y / height * screen_height) | |
| elif coordinate_type == "relative1000": | |
| if screen_width == 0 or screen_height == 0: | |
| raise ValueError("Screen width and height must be greater than zero for relative1000 coordinates.") | |
| x_abs = int(round(x * screen_width / 1000)) | |
| y_abs = int(round(y * screen_height / 1000)) | |
| return x_abs, y_abs | |
| else: | |
| raise ValueError(f"Unsupported coordinate type: {coordinate_type}") | |
def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="qwen25"):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.

    Scans every ``pyautogui.<func>(...)`` call in the string, parses its
    arguments with ``ast``, projects any (x, y) or (xOffset, yOffset) pair
    through ``_coordinate_projection``, and rewrites the call in place.
    On any argument-parse failure the ORIGINAL string is returned unchanged
    (projection is all-or-nothing).

    Args:
        pyautogui_code_relative_coordinates: pyautogui code emitted by the model.
        screen_width / screen_height: logical screen size in pixels.
        coordinate_type: one of 'relative', 'relative1000', 'absolute', 'qwen25'.

    Returns:
        str: the code with coordinates rewritten to absolute pixels.

    Raises:
        ValueError: if coordinate_type is not a supported value.
    """
    import re
    import ast
    if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
        raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")
    # Matches a complete call with no nested parentheses inside the args.
    pattern = r'(pyautogui\.\w+\([^\)]*\))'
    matches = re.findall(pattern, pyautogui_code_relative_coordinates)
    new_code = pyautogui_code_relative_coordinates
    for full_call in matches:
        func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
        func_match = re.match(func_name_pattern, full_call, re.DOTALL)
        if not func_match:
            continue
        func_name = func_match.group(1)
        args_str = func_match.group(2)
        try:
            # Parse the arguments by wrapping them in a dummy call expression.
            parsed = ast.parse(f"func({args_str})").body[0].value
            parsed_args = parsed.args
            parsed_keywords = parsed.keywords
        except SyntaxError:
            # Unparseable arguments: give up on the whole snippet.
            return pyautogui_code_relative_coordinates
        # Positional-parameter order for the pyautogui functions we project.
        function_parameters = {
            'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
            'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
            'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
            'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
            'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
            'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
        }
        func_base_name = func_name.split('.')[-1]
        param_names = function_parameters.get(func_base_name, [])
        # Map positional args onto their parameter names (literals only).
        args = {}
        for idx, arg in enumerate(parsed_args):
            if idx < len(param_names):
                param_name = param_names[idx]
                arg_value = ast.literal_eval(arg)
                args[param_name] = arg_value
        try:
            for kw in parsed_keywords:
                param_name = kw.arg
                arg_value = ast.literal_eval(kw.value)
                args[param_name] = arg_value
        except Exception as e:
            logger.error(f"Error parsing keyword arguments: {e}")
            return pyautogui_code_relative_coordinates
        updated = False
        if 'x' in args and 'y' in args:
            try:
                x_rel = float(args['x'])
                y_rel = float(args['y'])
                x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
                logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
                args['x'] = x_abs
                args['y'] = y_abs
                updated = True
            except ValueError:
                # Non-numeric x/y — leave the call untouched.
                pass
        if 'xOffset' in args and 'yOffset' in args:
            try:
                x_rel = float(args['xOffset'])
                y_rel = float(args['yOffset'])
                x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
                args['xOffset'] = x_abs
                args['yOffset'] = y_abs
                updated = True
            except ValueError:
                pass
        if updated:
            # Rebuild positional args in declaration order, stopping at the
            # first gap (pyautogui would not accept a skipped positional).
            reconstructed_args = []
            for idx, param_name in enumerate(param_names):
                if param_name in args:
                    arg_value = args[param_name]
                    if isinstance(arg_value, str):
                        arg_repr = f"'{arg_value}'"
                    else:
                        arg_repr = str(arg_value)
                    reconstructed_args.append(arg_repr)
                else:
                    break
            # Re-emit remaining keyword args that were not consumed positionally.
            used_params = set(param_names[:len(reconstructed_args)])
            for kw in parsed_keywords:
                if kw.arg not in used_params:
                    arg_value = args[kw.arg]
                    if isinstance(arg_value, str):
                        arg_repr = f"{kw.arg}='{arg_value}'"
                    else:
                        arg_repr = f"{kw.arg}={arg_value}"
                    reconstructed_args.append(arg_repr)
            new_args_str = ', '.join(reconstructed_args)
            new_full_call = f"{func_name}({new_args_str})"
            # NOTE(review): str.replace substitutes EVERY occurrence of this
            # call text, so identical repeated calls are all rewritten at
            # once — appears intentional, but confirm.
            new_code = new_code.replace(full_call, new_full_call)
    return new_code
def parse_response_to_cot_and_action(input_string, screen_width, screen_height, coordinate_type="qwen25") -> Optional[str]:
    """Parse a model response into Observation / Thought / Action sections plus
    an executable code payload.

    Returns a dict with optional keys 'observation', 'thought', 'action',
    'original_code' (the raw action re-fenced as markdown) and 'code', where
    'code' is one of the sentinels "WAIT" / "FAIL" / "DONE" or pyautogui code
    with coordinates projected to absolute pixels.

    NOTE(review): the annotated return type Optional[str] is inherited from
    the original signature, but the function actually returns a dict;
    callers index it like one.
    """
    sections = {}
    # Each "## <Header>:" section runs up to the next "##" header or EOF.
    obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
    if obs_match:
        sections['observation'] = obs_match.group(1).strip()
    thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
    if thought_match:
        sections['thought'] = thought_match.group(1).strip()
    action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, re.DOTALL | re.MULTILINE)
    if action_match:
        action = action_match.group(1).strip()
        sections['action'] = action.strip()
    # Sentinel actions short-circuit before any coordinate projection.
    if "computer.wait" in input_string.lower():
        code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
        if code_blocks:
            code = code_blocks[-1].strip()
            sections['original_code'] = transform_agnet_action_to_code_block(code)
        sections["code"] = "WAIT"
        return sections
    elif "computer.terminate" in input_string.lower():
        # Look for code blocks that might contain terminate command
        code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
        if code_blocks:
            last_code = code_blocks[-1].strip().lower()
            if "fail" in last_code:
                sections['code'] = "FAIL"
                return sections
            elif "success" in last_code:
                sections['code'] = "DONE"
                return sections
    # Regular pyautogui action: take the LAST ```python fenced block.
    code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL)
    if code_blocks:
        code = code_blocks[-1].strip()
        sections['original_code'] = transform_agnet_action_to_code_block(code)
        corrected_code = correct_pyautogui_arguments(code)
        # Fix: the original assigned corrected_code to sections['code'] and
        # then immediately overwrote it — only the projected code is kept.
        sections['code'] = project_coordinate_to_absolute_scale(corrected_code, screen_width=screen_width, screen_height=screen_height, coordinate_type=coordinate_type)
    if 'code' not in sections:
        logger.error("Missing required action or code section")
        sections['code'] = "FAIL"
        return sections
    return sections
def rescale_bounding_boxes(bounding_boxes, original_width, original_height, scaled_width=1000, scaled_height=1000):
    """Map (xmin, ymin, xmax, ymax) boxes from a scaled grid back to image pixels.

    Boxes are assumed to live on a scaled_width x scaled_height coordinate
    grid (default 1000x1000) and are scaled to the original image size.
    """
    sx = original_width / scaled_width
    sy = original_height / scaled_height
    return [[x0 * sx, y0 * sy, x1 * sx, y1 * sy] for x0, y0, x1, y1 in bounding_boxes]
def parse_coordinates(code):
    """
    Parse the first (x, y) pair from pyautogui code.
    Supports: click, moveTo, dragTo, doubleClick, middleClick, rightClick, tripleClick
    Returns: [x, y] or None if no coordinates found
    """
    if not code or code in ("WAIT", "FAIL", "DONE"):
        return None
    # pyautogui functions that accept x, y coordinates.
    funcs = (
        'click', 'moveTo', 'dragTo', 'doubleClick',
        'middleClick', 'rightClick', 'tripleClick'
    )
    call_pattern = r'pyautogui\.(' + '|'.join(funcs) + r')\s*\([^)]*\)'
    found = re.findall(call_pattern, code)
    if not found:
        return None
    # Re-locate the first matching call to capture its argument list.
    call = re.search(rf'pyautogui\.{found[0]}\s*\(([^)]*)\)', code)
    if call is None:
        return None
    arg_text = call.group(1)
    # Preferred: explicit keyword coordinates x=..., y=...
    kx = re.search(r'x\s*=\s*([\d.]+)', arg_text)
    ky = re.search(r'y\s*=\s*([\d.]+)', arg_text)
    if kx and ky:
        try:
            return [float(kx.group(1)), float(ky.group(1))]
        except ValueError:
            pass
    # Fallback: first two positional numbers, after stripping keyword args.
    positional_only = re.sub(r'\w+\s*=\s*[^,]+', '', arg_text)
    nums = re.findall(r'([\d.]+)', positional_only)
    if len(nums) >= 2:
        try:
            return [float(nums[0]), float(nums[1])]
        except ValueError:
            pass
    return None
def call_llm(payload):
    """Call the LLM API.

    POSTs `payload` to the endpoint in $OPENCUA_URL with the bearer token
    from $OPENCUA_API_KEY, retrying up to 30 times on HTTP errors or on
    completions that did not stop naturally. Returns the completion text,
    or None (implicitly) if every attempt fails.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}"
    }
    response = None
    for _ in range(30):
        # NOTE(review): verify=False disables TLS certificate verification —
        # acceptable only for a trusted internal endpoint; confirm.
        response = httpx.post(
            os.environ['OPENCUA_URL'],
            headers=headers,
            json=payload,
            timeout=500,
            verify=False
        )
        if response.status_code != 200:
            logger.error("Failed to call LLM: " + response.text)
            logger.error("Retrying...")
            time.sleep(5)
        else:
            response = response.json()
            finish_reason = response["choices"][0].get("finish_reason")
            # Only accept completions that stopped naturally; anything else
            # (e.g. "length" truncation) is retried.
            if finish_reason is not None and finish_reason == "stop": # for most of the time, length will not exceed max_tokens
                return response['choices'][0]['message']['content']
            else:
                logger.error("LLM did not finish properly, retrying...")
                time.sleep(5)
def run_inference(image, text_input):
    """Gradio handler: ground `text_input` on `image` via the OpenCUA model.

    Returns a 3-tuple for the UI: (raw model output, coordinate/status text,
    annotated image — or the original image when parsing fails).
    """
    if image is None:
        return "Please upload an image", "", None
    if not text_input:
        text_input = "Describe this image in detail"
    # The model emits coordinates on the smart_resize'd grid; remember that
    # grid size so qwen25 projection can map them back to this image.
    resized_height, resized_width = smart_resize(image.height, image.width, max_pixels=12845056)
    messages = create_grounding_messages(image, instruction = text_input)
    output_text = call_llm({
        "model": "opencua",
        "messages": messages,
        "max_tokens": 2000,
        "top_p": 0.9,
        "temperature": 0
    })
    print(output_text)
    try:
        sections = parse_response_to_cot_and_action(output_text, resized_width, resized_height, coordinate_type="qwen25")
        # Parse coordinates from the code
        coordinates = parse_coordinates(sections.get('code', ''))
        if coordinates is None:
            # No coordinates found, return original image
            return output_text, "No coordinates found", image
        # Extract x, y from coordinates
        x, y = coordinates
        # Draw a red circle at the parsed coordinates (on a copy, so the
        # uploaded image itself is not mutated).
        annotated_image = draw_circle_at_point(image.copy(), x, y)
        return output_text, f"x: {x}, y: {y}", annotated_image
    except Exception as e:
        # Capture the full traceback so the UI can show where parsing failed.
        tb_str = traceback.format_exc()
        logger.error(f"Error in run_inference: {e}\nTraceback:\n{tb_str}")
        return output_text, f"Error: {str(e)}\n{tb_str}", image
# Load example images
example_images = [
    # "assets/images/example_0.png",
    "assets/images/example_1.jpg",
    "assets/images/example_2.png"
]
example_prompts = [
    # "Select the C9 cell",
    "Close the file explorer",
    "Click on the word 'underserved'"
]
# Pair each example screenshot with its instruction; images are opened
# eagerly at import time, so the asset files must exist on disk.
examples = [[Image.open(img), prompt] for img, prompt in zip(example_images, example_prompts)]
# Fixed-height, scrollable output panel.
css = """
#output {
    height: 500px;
    overflow: auto;
    border: 1px solid #ccc;
}
"""
# Build the Gradio UI: screenshot + instruction on the left; model output,
# parsed coordinates and the annotated screenshot on the right.
with gr.Blocks(css=css) as demo:
    gr.Markdown(
        """
# OpenCUA GUI Grounding Demo
Upload a screenshot and provide a description of an element. In the demo, we use the OpenCUA-32B model for demostration.
""")
    with gr.Row():
        with gr.Column():
            input_img = gr.Image(label="Input Image", type="pil")
            text_input = gr.Textbox(label="Instruction")
            submit_btn = gr.Button(value="Submit")
        with gr.Column():
            model_output_text = gr.Textbox(label="Model Output", lines=5)
            model_output_box = gr.Textbox(label="Coordinates", lines=2)
            annotated_image = gr.Image(label="Annotated Image")
    submit_btn.click(run_inference, [input_img, text_input], [model_output_text, model_output_box, annotated_image])
    # Add examples
    gr.Examples(
        examples=examples,
        inputs=[input_img, text_input],
        outputs=[model_output_text, model_output_box, annotated_image],
        fn=run_inference,
        cache_examples=True,
    )
demo.launch(debug=True)