diff --git a/operate/config.py b/operate/config.py
index 92f800ff..e4df0381 100644
--- a/operate/config.py
+++ b/operate/config.py
@@ -147,7 +147,8 @@ def validation(self, model, voice_mode):
         self.require_api_key(
             "ANTHROPIC_API_KEY", "Anthropic API key", model == "claude-3"
         )
-        self.require_api_key("QWEN_API_KEY", "Qwen API key", model == "qwen-vl")
+        self.require_api_key("QWEN_API_KEY", "Qwen API key", model == "qwen-vl"
+                             or model == "qwen-vl-with-omniparser")
 
     def require_api_key(self, key_name, key_description, is_required):
         key_exists = bool(os.environ.get(key_name))
diff --git a/operate/models/apis.py b/operate/models/apis.py
index 14e55310..9f29c014 100644
--- a/operate/models/apis.py
+++ b/operate/models/apis.py
@@ -24,6 +24,7 @@
     get_label_coordinates,
 )
 from operate.utils.ocr import get_text_coordinates, get_text_element
+from operate.utils.omniparser import OmniParserClient
 from operate.utils.screenshot import capture_screen_with_cursor, compress_screenshot
 from operate.utils.style import ANSI_BRIGHT_MAGENTA, ANSI_GREEN, ANSI_RED, ANSI_RESET
 
@@ -40,6 +41,9 @@ async def get_next_action(model, messages, objective, session_id):
     if model == "qwen-vl":
         operation = await call_qwen_vl_with_ocr(messages, objective, model)
         return operation, None
+    if model == "qwen-vl-with-omniparser":
+        operation = await call_qwen_vl_with_ominiparser(messages, objective, model)
+        return operation, None
     if model == "gpt-4-with-som":
         operation = await call_gpt_4o_labeled(messages, objective, model)
         return operation, None
@@ -678,6 +682,116 @@ async def call_gpt_4o_labeled(messages, objective, model):
         return call_gpt_4o(messages)
 
 
+async def call_qwen_vl_with_ominiparser(messages, objective, model):
+    if config.verbose:
+        print("[call_qwen_vl_with_ominiparser]")
+
+    try:
+        time.sleep(1)
+        client = config.initialize_qwen()
+
+        confirm_system_prompt(messages, objective, model)
+        screenshots_dir = "screenshots"
+        if not os.path.exists(screenshots_dir):
+            os.makedirs(screenshots_dir)
+
+        # Call the function to capture the screen with the cursor
+        raw_screenshot_filename = os.path.join(screenshots_dir, "raw_screenshot.png")
+        capture_screen_with_cursor(raw_screenshot_filename)
+
+        # Use OmniParser to parse the image
+        som_screenshot_filename = os.path.join(screenshots_dir, "som_screenshot.jpeg")
+        omni_parser_client = OmniParserClient(os.getenv("OMNIPARSER_BASE_URL", "http://localhost:8000"))
+        parsed_res = omni_parser_client.parse_screenshot(raw_screenshot_filename, som_screenshot_filename)
+
+        if len(messages) == 1:
+            user_prompt = get_user_first_message_prompt()
+        else:
+            user_prompt = get_user_prompt()
+
+        if config.verbose:
+            print(
+                "[call_qwen_vl_with_ominiparser] user_prompt",
+                user_prompt,
+            )
+
+        vision_message = {
+            "role": "user",
+            "content": [
+                {"type": "text", "text": user_prompt},
+                {
+                    "type": "image_url",
+                    "image_url": {
+                        "url": f"data:image/jpeg;base64,{parsed_res['som_image_base64']}"
+                    },
+                },
+                {"type": "text", "text": parsed_res["screen_info"]},
+            ],
+        }
+        messages.append(vision_message)
+
+        response = client.chat.completions.create(
+            model="qwen2.5-vl-72b-instruct",
+            messages=messages,
+        )
+
+        content = response.choices[0].message.content
+
+        content = clean_json(content)
+        if config.verbose:
+            print(
+                "[call_qwen_vl_with_ominiparser] content",
+                content,
+            )
+
+        assistant_message = {"role": "assistant", "content": content}
+        messages.append(assistant_message)
+        content = json.loads(content)
+        processed_content = []
+
+        for operation in content:
+            print(
"[call_qwen_vl_with_ominiparser] for operation in content", + operation, + ) + if operation.get("operation") == "click": + box_id = operation.get("id") + if config.verbose: + print( + "[Self Operating Computer][call_gpt_4_vision_preview_labeled] box_id", + box_id, + ) + + x_percent, y_percent = OmniParserClient.get_click_position(int(box_id), parsed_res["parsed_content_list"]) + operation["x"] = x_percent + operation["y"] = y_percent + if config.verbose: + print( + "[Self Operating Computer][call_qwen_vl_with_ominiparser] new click operation", + operation, + ) + processed_content.append(operation) + else: + if config.verbose: + print( + "[Self Operating Computer][call_qwen_vl_with_ominiparser] .append none click operation", + operation, + ) + + processed_content.append(operation) + + return processed_content + + except Exception as e: + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BRIGHT_MAGENTA}[{model}] That did not work. Trying another method {ANSI_RESET}" + ) + if config.verbose: + print("[Self-Operating Computer][Operate] error", e) + traceback.print_exc() + return call_gpt_4o(messages) + + def call_ollama_llava(messages): if config.verbose: print("[call_ollama_llava]") diff --git a/operate/models/prompts.py b/operate/models/prompts.py index 1acbe14f..57096ccc 100644 --- a/operate/models/prompts.py +++ b/operate/models/prompts.py @@ -128,6 +128,68 @@ """ +SYSTEM_PROMPT_OMNIPARSER = """ +You are operating a {operating_system} computer, using the same operating system as a human. + +From looking at the screen, the objective, and your previous actions, take the next best series of action. + +You have 4 possible operation actions available to you. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. + +1. click - Move mouse and click - We labeled the clickable elements with colored bounding boxes and IDs. Label IDs are in the following format with `x` being a number: `~x` +``` +[{{ "thought": "write a thought here", "operation": "click", "id": "x (e.g. 10)" }}] # 'id' refers to the ID of the colored box +``` +2. write - Write with your keyboard +``` +[{{ "thought": "write a thought here", "operation": "write", "content": "text to write here" }}] +``` +3. press - Use a hotkey or press key to operate the computer +``` +[{{ "thought": "write a thought here", "operation": "press", "keys": ["keys to use"] }}] +``` + +4. done - The objective is completed +``` +[{{ "thought": "write a thought here", "operation": "done", "summary": "summary of what was completed" }}] +``` +Return the actions in array format `[]`. You can take just one action or multiple actions. + +Here a helpful example: + +Example 1: Searches for Google Chrome on the OS and opens it +``` +[ + {{ "thought": "Searching the operating system to find Google Chrome because it appears I am currently in terminal", "operation": "press", "keys": {os_search_str} }}, + {{ "thought": "Now I need to write 'Google Chrome' as a next step", "operation": "write", "content": "Google Chrome" }}, +] +``` + +Example 2: Focuses on the address bar in a browser before typing a website +``` +[ + {{ "thought": "I'll focus on the address bar in the browser. 
+    {{ "thought": "Now that the address bar is in focus I can type the URL", "operation": "write", "content": "https://news.ycombinator.com/" }},
+    {{ "thought": "I'll need to press enter to go to the URL now", "operation": "press", "keys": ["enter"] }}
+]
+```
+
+Example 3: Send a "Hello World" message in the chat
+```
+[
+    {{ "thought": "I see a message field on this page near the button. It looks like it has an ID", "operation": "click", "id": "34" }},
+    {{ "thought": "Now that I am focused on the message field, I'll go ahead and write ", "operation": "write", "content": "Hello World" }},
+]
+```
+
+A few important notes:
+
+- Go to Google Docs and Google Sheets by typing in the Chrome Address bar
+- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user.
+
+Objective: {objective}
+"""
+
+
 # TODO: Add an example or instruction about `Action: press ['pagedown']` to scroll
 SYSTEM_PROMPT_OCR = """
 You are operating a {operating_system} computer, using the same operating system as a human.
@@ -232,6 +294,13 @@ def get_system_prompt(model, objective):
             os_search_str=os_search_str,
             operating_system=operating_system,
         )
+    elif model == "qwen-vl-with-omniparser":
+        prompt = SYSTEM_PROMPT_OMNIPARSER.format(
+            objective=objective,
+            cmd_string=cmd_string,
+            os_search_str=os_search_str,
+            operating_system=operating_system,
+        )
     elif model == "gpt-4-with-ocr" or model == "o1-with-ocr" or model == "claude-3" or model == "qwen-vl":
 
         prompt = SYSTEM_PROMPT_OCR.format(
diff --git a/operate/utils/omniparser.py b/operate/utils/omniparser.py
new file mode 100644
index 00000000..238b9d3c
--- /dev/null
+++ b/operate/utils/omniparser.py
@@ -0,0 +1,68 @@
+import requests
+import base64
+
+
+def reformat_messages(response_json: dict):
+    """
+    example of a screen_info:
+    ID: 1, Text: xlt
+    ID: 2, Text: 4t8
+    ID: 3, Text: Rt
+    ID: 4, Text: BA
+    ID: 5, Text: #B
+    ID: 6, Text: 16.04
+    ID: 7, Text: YouTube
+    ID: 8, Text: youtube.com
+    """
+    screen_info = ""
+    for idx, element in enumerate(response_json["parsed_content_list"]):
+        element['idx'] = idx
+        if element['type'] == 'text':
+            screen_info += f'ID: {idx}, Text: {element["content"]}\n'
+        elif element['type'] == 'icon':
+            screen_info += f'ID: {idx}, Icon: {element["content"]}\n'
+    response_json['screen_info'] = screen_info
+    return response_json
+
+
+class OmniParserClient:
+    def __init__(self, url: str) -> None:
+        self.url = url
+
+    def parse_screenshot(self, raw_screenshot_filename: str, som_screenshot_filename: str):
+        with open(raw_screenshot_filename, "rb") as image_file:
+            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
+        response = requests.post(f"{self.url}/parse/", json={"base64_image": image_base64})
+        response.raise_for_status()
+        response_json = response.json()
+        print('omniparser latency:', response_json['latency'])
+
+        som_image_data = base64.b64decode(response_json['som_image_base64'])
+        with open(som_screenshot_filename, "wb") as f:
+            f.write(som_image_data)
+
+        response_json['raw_screenshot_base64'] = image_base64
+        response_json = reformat_messages(response_json)
+        return response_json
+
+    @staticmethod
+    def get_click_position(box_id, parsed_contents: list[dict]) -> tuple[str, str]:
+        """
+        example of a parsed content:
+        {
+            "type": "text",
+            "bbox": [
+                0.01778179593384266,   // min_x
+                0.024020226672291756,  // min_y
+                0.3725135624408722,    // max_x
+                0.06510745733976364    // max_y
+            ],
+            "interactivity": false,
+            "content": "OmniParser for Pure Vision Based General GUI Agent",
+            "source": "box_ocr_content_ocr"
+        }
+        """
+        bbox = parsed_contents[box_id]["bbox"]
+        x = (bbox[0] + bbox[2]) / 2
+        y = (bbox[1] + bbox[3]) / 2
+        return f"{x:.2f}", f"{y:.2f}"
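
A quick way to exercise the new client outside the agent loop is sketched below. This is illustrative and not part of the patch: it assumes an OmniParser server is already running and reachable at `OMNIPARSER_BASE_URL` (defaulting to `http://localhost:8000`, as in `call_qwen_vl_with_ominiparser`), and it only uses the interface added above (`parse_screenshot`, `get_click_position`) plus the existing `capture_screen_with_cursor` helper; the screenshot paths are arbitrary examples.

```python
# Minimal smoke-test sketch for operate/utils/omniparser.py (illustrative only).
import os

from operate.utils.omniparser import OmniParserClient
from operate.utils.screenshot import capture_screen_with_cursor

# Grab a screenshot the same way the new call path does.
os.makedirs("screenshots", exist_ok=True)
capture_screen_with_cursor("screenshots/raw_screenshot.png")

# Assumes an OmniParser server is listening at OMNIPARSER_BASE_URL/parse/.
client = OmniParserClient(os.getenv("OMNIPARSER_BASE_URL", "http://localhost:8000"))
parsed = client.parse_screenshot(
    "screenshots/raw_screenshot.png", "screenshots/som_screenshot.jpeg"
)

# screen_info is the "ID: <n>, Text/Icon: ..." listing that gets sent to the model.
print(parsed["screen_info"])

# Map a box ID (as the model would return it) to the centre of its bounding box,
# expressed as fractions of the screen width/height.
if parsed["parsed_content_list"]:
    x, y = OmniParserClient.get_click_position(0, parsed["parsed_content_list"])
    print("element 0 centre:", x, y)
```

Note that `get_click_position` averages `bbox[0]`/`bbox[2]` and `bbox[1]`/`bbox[3]`, so it expects OmniParser's `[min_x, min_y, max_x, max_y]` ratio format; for the example bbox in the docstring this works out to roughly `("0.20", "0.04")`. End to end, the new path is presumably selected like the other vision models, by passing `qwen-vl-with-omniparser` as the model name (e.g. `operate -m qwen-vl-with-omniparser`) with `QWEN_API_KEY` set, per the config.py change above.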