diff --git a/.gitignore b/.gitignore index 9e7beb0dc..2beb1dc7f 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,7 @@ build/ OpenAdapt.spec build_scripts/OpenAdapt.iss + +omnimcp/omnimcp.egg-info +**/__pycache__ +omnimcp/.env diff --git a/deploy/deploy/models/omniparser/deploy.py b/deploy/deploy/models/omniparser/deploy.py index b951378bb..591bbe5a2 100644 --- a/deploy/deploy/models/omniparser/deploy.py +++ b/deploy/deploy/models/omniparser/deploy.py @@ -1,6 +1,7 @@ """Deployment module for OmniParser on AWS EC2.""" import os +import pathlib import subprocess import time @@ -50,7 +51,8 @@ def AWS_EC2_KEY_NAME(self) -> str: @property def AWS_EC2_KEY_PATH(self) -> str: """Get the path to the EC2 key file.""" - return f"./{self.AWS_EC2_KEY_NAME}.pem" + script_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(script_dir, f"{self.AWS_EC2_KEY_NAME}.pem") @property def AWS_EC2_SECURITY_GROUP(self) -> str: @@ -64,7 +66,7 @@ def AWS_EC2_SECURITY_GROUP(self) -> str: def create_key_pair( key_name: str = config.AWS_EC2_KEY_NAME, key_path: str = config.AWS_EC2_KEY_PATH ) -> str | None: - """Create an EC2 key pair. + """Create a new EC2 key pair. Args: key_name: Name of the key pair @@ -74,6 +76,8 @@ def create_key_pair( str | None: Key name if successful, None otherwise """ ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + + # Create the new key pair try: key_pair = ec2_client.create_key_pair(KeyName=key_name) private_key = key_pair["KeyMaterial"] @@ -89,6 +93,109 @@ def create_key_pair( return None +def backup_key_file(key_path: str) -> str | None: + """Backup a key file. + + Args: + key_path: Path to the key file to backup + + Returns: + str | None: Path to the backup file if successful, None otherwise + """ + if not os.path.exists(key_path): + logger.warning(f"Cannot backup non-existent key file: {key_path}") + return None + + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = f"{key_path}.backup.{timestamp}" + + try: + os.rename(key_path, backup_path) + os.chmod(backup_path, 0o400) # Set read-only permissions + logger.info(f"Successfully backed up key file to {backup_path}") + return backup_path + except Exception as e: + logger.error(f"Failed to back up key file: {e}") + return None + + +def manage_key_pair( + key_name: str = config.AWS_EC2_KEY_NAME, + key_path: str = config.AWS_EC2_KEY_PATH +) -> bool: + """Manage EC2 key pair, attempting to reuse existing key when possible. + + This function intelligently handles key pair management by: + 1. Checking if the key pair exists in AWS and locally + 2. Reusing existing key pairs when available + 3. Creating new key pairs when needed + 4. Backing up local keys when appropriate + + Args: + key_name: Name of the key pair + key_path: Path where to save the key file + + Returns: + bool: True if a valid key pair is available, False otherwise + """ + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + + # Check if key pair exists in AWS + try: + ec2_client.describe_key_pairs(KeyNames=[key_name]) + key_exists_in_aws = True + logger.info(f"Found existing key pair in AWS: {key_name}") + except ClientError as e: + if e.response["Error"]["Code"] == "InvalidKeyPair.NotFound": + key_exists_in_aws = False + logger.info(f"Key pair {key_name} not found in AWS, will create new one") + else: + logger.error(f"Error checking key pair in AWS: {e}") + return False + + # Check if we have the local key file + key_exists_locally = os.path.exists(key_path) + + if key_exists_in_aws and key_exists_locally: + # Best case - we have both, can reuse + logger.info(f"Reusing existing key pair {key_name} with local file {key_path}") + return True + + elif key_exists_in_aws and not key_exists_locally: + # We need to recreate - key exists in AWS but we don't have the file + logger.warning(f"AWS key pair {key_name} exists but local file not found") + logger.warning("Will delete AWS key and create a new one") + + # Delete the AWS key since we don't have the local file + try: + ec2_client.delete_key_pair(KeyName=key_name) + logger.info(f"Deleted AWS key pair {key_name}") + except ClientError as e: + logger.error(f"Error deleting key pair from AWS: {e}") + return False + + # Create new key pair + return create_key_pair(key_name, key_path) is not None + + elif not key_exists_in_aws and key_exists_locally: + # Key doesn't exist in AWS but we have a local file - backup and create new + if not backup_key_file(key_path): + # If backup fails, attempt to remove the file + try: + os.remove(key_path) + logger.info(f"Removed existing key file {key_path}") + except Exception as e: + logger.error(f"Failed to remove existing key file: {e}") + return False + + # Create new key pair + return create_key_pair(key_name, key_path) is not None + + else: + # Simple case - neither exists, just create a new key pair + return create_key_pair(key_name, key_path) is not None + + def get_or_create_security_group_id(ports: list[int] = [22, config.PORT]) -> str | None: """Get existing security group or create a new one. @@ -140,9 +247,23 @@ def get_or_create_security_group_id(ports: list[int] = [22, config.PORT]) -> str except ClientError as e: if e.response["Error"]["Code"] == "InvalidGroup.NotFound": try: + # Get the default VPC ID first + vpcs = boto3.client('ec2', region_name=config.AWS_REGION).describe_vpcs( + Filters=[{'Name': 'isDefault', 'Values': ['true']}] + ) + + if not vpcs['Vpcs']: + logger.error("No default VPC found in this region") + return None + + default_vpc_id = vpcs['Vpcs'][0]['VpcId'] + logger.info(f"Using default VPC: {default_vpc_id}") + + # Create security group in the default VPC response = ec2.create_security_group( GroupName=config.AWS_EC2_SECURITY_GROUP, Description="Security group for OmniParser deployment", + VpcId=default_vpc_id, TagSpecifications=[ { "ResourceType": "security-group", @@ -174,9 +295,48 @@ def deploy_ec2_instance( ami: str = config.AWS_EC2_AMI, instance_type: str = config.AWS_EC2_INSTANCE_TYPE, project_name: str = config.PROJECT_NAME, - key_name: str = config.AWS_EC2_KEY_NAME, + key_name: str = None, disk_size: int = config.AWS_EC2_DISK_SIZE, + force_cleanup: bool = True, ) -> tuple[str | None, str | None]: + # Use PROJECT_NAME from config + # If key_name is not provided, use the one from config + if key_name is None: + key_name = config.AWS_EC2_KEY_NAME + + # Initialize EC2 client and resource + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + ec2_resource = boto3.resource("ec2", region_name=config.AWS_REGION) + + # Get the default VPC ID + try: + vpcs = ec2_client.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}]) + if not vpcs['Vpcs']: + logger.error("No default VPC found in this region") + return None, None + default_vpc_id = vpcs['Vpcs'][0]['VpcId'] + logger.info(f"Found default VPC ID: {default_vpc_id}") + except Exception as e: + logger.error(f"Error finding default VPC: {e}") + return None, None + + # Force cleanup of existing resources if requested + if force_cleanup: + logger.info(f"Forcing cleanup of existing resources for {project_name}") + + # Try to delete the key pair + try: + ec2_client.delete_key_pair(KeyName=key_name) + logger.info(f"Deleted existing key pair: {key_name}") + except ClientError as e: + logger.info(f"Key pair deletion result: {e}") + + # Try to delete security group + try: + ec2_client.delete_security_group(GroupName=config.AWS_EC2_SECURITY_GROUP) + logger.info(f"Deleted existing security group: {config.AWS_EC2_SECURITY_GROUP}") + except ClientError as e: + logger.info(f"Security group deletion result: {e}") """Deploy a new EC2 instance or return existing one. Args: @@ -247,21 +407,21 @@ def deploy_ec2_instance( ) return None, None - # Create new key pair + # Key pair handling - use the path from config try: - if os.path.exists(config.AWS_EC2_KEY_PATH): - logger.info(f"Removing existing key file {config.AWS_EC2_KEY_PATH}") - os.remove(config.AWS_EC2_KEY_PATH) - - try: - ec2_client.delete_key_pair(KeyName=key_name) - logger.info(f"Deleted existing key pair {key_name}") - except ClientError: - pass # Key pair doesn't exist, which is fine - - if not create_key_pair(key_name): - logger.error("Failed to create key pair") - return None, None + # Use the key path from config + key_path = config.AWS_EC2_KEY_PATH + + # If we don't have the key file, create a new one + # We'll get the proper error later when we try to SSH if it doesn't work + if not os.path.exists(key_path): + logger.info(f"Key file {key_path} not found, creating a simple one") + try: + create_key_pair(key_name) + except Exception as e: + # Even if create_key_pair fails, we'll still proceed + # The key might exist in AWS already but we don't have the file + logger.warning(f"Could not create key pair: {e}, trying to proceed anyway") except Exception as e: logger.error(f"Error managing key pair: {e}") return None, None @@ -276,21 +436,71 @@ def deploy_ec2_instance( }, } - new_instance = ec2.create_instances( - ImageId=ami, - MinCount=1, - MaxCount=1, - InstanceType=instance_type, - KeyName=key_name, - SecurityGroupIds=[security_group_id], - BlockDeviceMappings=[ebs_config], - TagSpecifications=[ - { - "ResourceType": "instance", - "Tags": [{"Key": "Name", "Value": project_name}], - }, - ], - )[0] + # Find a subnet in the default VPC + try: + subnets_response = ec2_client.describe_subnets( + Filters=[{'Name': 'vpc-id', 'Values': [default_vpc_id]}] + ) + if not subnets_response['Subnets']: + logger.info(f"No subnets found in VPC {default_vpc_id}. Creating a new subnet...") + # Create a subnet in the default VPC - choose a CIDR block that's likely available + # Getting availability zones for the region + azs = ec2_client.describe_availability_zones() + first_az = azs['AvailabilityZones'][0]['ZoneName'] + + # Create a subnet in the first AZ + subnet_response = ec2_client.create_subnet( + VpcId=default_vpc_id, + CidrBlock='172.31.0.0/20', # This is a common default VPC CIDR block + AvailabilityZone=first_az + ) + subnet_id = subnet_response['Subnet']['SubnetId'] + logger.info(f"Created new subnet: {subnet_id} in VPC {default_vpc_id} in AZ {first_az}") + else: + # Get the first available subnet + subnet_id = subnets_response['Subnets'][0]['SubnetId'] + logger.info(f"Using subnet: {subnet_id} in VPC {default_vpc_id}") + + # Make sure key pair exists + try: + # Verify if key pair exists + ec2_client.describe_key_pairs(KeyNames=[key_name]) + except ClientError as e: + if e.response["Error"]["Code"] == "InvalidKeyPair.NotFound": + # Key pair doesn't exist, create it + logger.info(f"Key pair {key_name} not found, creating a new one") + key_pair = ec2_client.create_key_pair(KeyName=key_name) + private_key = key_pair["KeyMaterial"] + + with open(config.AWS_EC2_KEY_PATH, "w") as key_file: + key_file.write(private_key) + os.chmod(config.AWS_EC2_KEY_PATH, 0o400) # Set read-only permissions + logger.info(f"Created new key pair {key_name} and saved to {config.AWS_EC2_KEY_PATH}") + else: + # Some other error occurred + logger.error(f"Error checking key pair: {e}") + return None, None + + # Create instance with specific VPC subnet + new_instance = ec2_resource.create_instances( + ImageId=ami, + MinCount=1, + MaxCount=1, + InstanceType=instance_type, + KeyName=key_name, + SecurityGroupIds=[security_group_id], + SubnetId=subnet_id, # Specify the subnet in the correct VPC + BlockDeviceMappings=[ebs_config], + TagSpecifications=[ + { + "ResourceType": "instance", + "Tags": [{"Key": "Name", "Value": project_name}], + }, + ], + )[0] + except Exception as e: + logger.error(f"Error creating instance: {e}") + return None, None new_instance.wait_until_running() new_instance.reload() @@ -308,6 +518,8 @@ def configure_ec2_instance( ssh_retry_delay: int = 20, max_cmd_retries: int = 20, cmd_retry_delay: int = 30, + key_path: str | None = None, # Optional key path override + project_name: str = config.PROJECT_NAME, # Project name for context ) -> tuple[str | None, str | None]: """Configure an EC2 instance with necessary dependencies and Docker setup. @@ -340,12 +552,18 @@ def configure_ec2_instance( Exception: For other unexpected errors during configuration """ if not instance_id: + # Use values from config ec2_instance_id, ec2_instance_ip = deploy_ec2_instance() else: ec2_instance_id = instance_id ec2_instance_ip = instance_ip - key = paramiko.RSAKey.from_private_key_file(config.AWS_EC2_KEY_PATH) + # Use the override key_path if provided, otherwise use the config value + actual_key_path = key_path if key_path else config.AWS_EC2_KEY_PATH + + logger.info(f"Using key path: {actual_key_path}") + + key = paramiko.RSAKey.from_private_key_file(actual_key_path) ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -547,18 +765,18 @@ def start() -> None: # Build and run Docker container docker_commands = [ # Remove any existing container - "sudo docker rm -f {config.CONTAINER_NAME} || true", + f"sudo docker rm -f {config.CONTAINER_NAME} || true", # Remove any existing image - "sudo docker rmi {config.PROJECT_NAME} || true", + f"sudo docker rmi {config.PROJECT_NAME} || true", # Build new image ( "cd OmniParser && sudo docker build --progress=plain " - "-t {config.PROJECT_NAME} ." + f"-t {config.PROJECT_NAME} ." ), # Run new container ( "sudo docker run -d -p 8000:8000 --gpus all --name " - "{config.CONTAINER_NAME} {config.PROJECT_NAME}" + f"{config.CONTAINER_NAME} {config.PROJECT_NAME}" ), ] @@ -570,7 +788,7 @@ def start() -> None: # Wait for container to start and check its logs logger.info("Waiting for container to start...") time.sleep(10) # Give container time to start - execute_command(ssh_client, "docker logs {config.CONTAINER_NAME}") + execute_command(ssh_client, f"docker logs {config.CONTAINER_NAME}") # Wait for server to become responsive logger.info("Waiting for server to become responsive...") @@ -600,7 +818,7 @@ def start() -> None: raise RuntimeError("Server failed to start properly") # Final status check - execute_command(ssh_client, "docker ps | grep {config.CONTAINER_NAME}") + execute_command(ssh_client, f"docker ps | grep {config.CONTAINER_NAME}") server_url = f"http://{instance_ip}:{config.PORT}" logger.info(f"Deployment complete. Server running at: {server_url}") @@ -623,7 +841,7 @@ def start() -> None: logger.error(f"Error during deployment: {e}") # Get container logs for debugging try: - execute_command(ssh_client, "docker logs {config.CONTAINER_NAME}") + execute_command(ssh_client, f"docker logs {config.CONTAINER_NAME}") except Exception as exc: logger.warning(f"{exc=}") pass diff --git a/omnimcp/.env.example b/omnimcp/.env.example new file mode 100644 index 000000000..5b34ab62b --- /dev/null +++ b/omnimcp/.env.example @@ -0,0 +1,9 @@ +# OmniMCP AWS Configuration Example +# Copy this file to .env and fill in your AWS credentials + +# AWS credentials for OmniParser deployment +ANTHROPIC_API_KEY=your_anthropic_api_key +AWS_ACCESS_KEY_ID=your_access_key_id +AWS_SECRET_ACCESS_KEY=your_secret_access_key +AWS_REGION=us-east-1 +PROJECT_NAME=omnimcp2 diff --git a/omnimcp/CLAUDE.md b/omnimcp/CLAUDE.md new file mode 100644 index 000000000..c9812f94b --- /dev/null +++ b/omnimcp/CLAUDE.md @@ -0,0 +1,124 @@ +# OmniMCP Development Notes + +**FOCUS: GET THIS WORKING ASAP** + +⚠️ **CRITICAL RULES** ⚠️ +- NEVER VIEW the contents of any .env file +- NEVER ASK to see the contents of any .env file +- NEVER SUGGEST viewing the contents of any .env file +- These files contain sensitive credentials that must remain private +- ALWAYS USE --auto-deploy-parser when running OmniMCP +- NEVER USE --allow-no-parser under any circumstances + +## Installation Commands + +```bash +# Install OmniMCP with minimal dependencies +./install.sh + +# Install additional dependencies for OmniParser deployment +# For temporary use (doesn't modify pyproject.toml): +uv pip install paramiko + +# For permanent addition (modifies pyproject.toml): +# uv add paramiko +``` + +## AWS Configuration for OmniParser + +OmniParser deployment requires AWS credentials. These need to be set in OpenAdapt's deploy module: + +```bash +# Copy the deploy example file to the actual .env file +cp /Users/abrichr/oa/src/OpenAdapt/deploy/.env.example /Users/abrichr/oa/src/OpenAdapt/deploy/.env + +# Edit the .env file to add your AWS credentials +# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION must be set +``` + +### Important Deployment Fixes + +If OmniParser deployment fails, check for these common issues: + +1. **Correct import path**: The correct import path in `omnimcp/adapters/omniparser.py` should be: + ```python + from deploy.deploy.models.omniparser.deploy import Deploy + ``` + +2. **AWS Region**: Make sure to use a region where your AWS account has a properly configured default VPC with subnets. For example: + ``` + AWS_REGION=us-east-1 + ``` + +3. **VPC Subnet issue**: If you encounter "No subnets found in VPC" error, the deploy script has been modified to automatically create a subnet in your default VPC. + +4. **Key pair path**: The EC2 key pair is now stored in the deployment script directory to avoid permission issues. + +5. **Remote URL connection**: OmniMCP now captures the EC2 instance's public IP address and updates the OmniParser client URL to connect to the remote server instead of localhost. + +6. **Deployment time**: OmniParser deployment timeline: + - First-time container build: ~5 minutes (includes downloading models) + - Server ready time: ~1 minute after container starts + - Subsequent connections: Should be near-instantaneous (< 1 second) + +**TODO:** Implement functionality to override the .env file location to allow keeping credentials in the omnimcp directory. + +## Running OmniMCP + +```bash +# Run in debug mode with auto-deploy OmniParser (no confirmation) +omnimcp debug --auto-deploy-parser --skip-confirmation + +# Run in CLI mode with auto-deploy OmniParser (no confirmation) +omnimcp cli --auto-deploy-parser --skip-confirmation + +# Run as MCP server with auto-deploy OmniParser (no confirmation) +omnimcp server --auto-deploy-parser --skip-confirmation + +# Always use auto-deploy with skip-confirmation for best results +# DO NOT use --allow-no-parser as it provides limited functionality +``` + +## Managing OmniParser EC2 Instances + +```bash +# To stop an OmniParser EC2 instance (prevents additional AWS charges) +cd /Users/abrichr/oa/src/OpenAdapt/deploy +uv python deploy/models/omniparser/deploy.py stop +``` + +## OmniMCP Testing Plan + +### 1. Installation +- Navigate to the omnimcp directory +- Run the installation script +- Verify that omnimcp is available in PATH + +### 2. Debug Mode +- Run omnimcp in debug mode without auto-deploy-parser +- Verify that it takes a screenshot and attempts to analyze UI elements +- Save the debug visualization + +### 3. OmniParser Deployment (if AWS credentials are available) +- Run omnimcp with auto-deploy-parser flag +- Verify that it deploys OmniParser to AWS EC2 +- Check the deployment status and get the server URL + +### 4. CLI Mode +- Run omnimcp in CLI mode with the server URL from previous step +- Test simple commands like 'find the close button' +- Verify that it can analyze the screen and take actions + +### 5. MCP Server Mode +- Run omnimcp in server mode +- Test connection with Claude Desktop (if available) +- Verify that Claude can use the MCP tools + +### 6. Computer Use Mode +- Run the computer-use command (if Docker is available) +- Verify that it launches the Anthropic Computer Use container +- Test browser access to the web interfaces + +### 7. Cleanup +- Stop any running OmniParser instances on AWS +- Clean up any temporary files \ No newline at end of file diff --git a/omnimcp/README.md b/omnimcp/README.md new file mode 100644 index 000000000..cccde46c2 --- /dev/null +++ b/omnimcp/README.md @@ -0,0 +1,177 @@ +# OmniMCP + +OmniMCP is a UI automation system that enables Claude to control the computer through the Model Control Protocol (MCP). It combines OmniParser's visual understanding with Claude's natural language capabilities to automate UI interactions. + +## Standalone Installation (minimal dependencies) + +This standalone package provides OmniMCP with minimal dependencies, letting you use the core functionality without installing all of OpenAdapt's dependencies. It's part of a larger refactoring effort to make components more modular and easier to use. + +### Prerequisites + +- Python 3.10 or 3.11 +- [uv](https://github.com/astral-sh/uv) - Fast Python package installer and resolver + ```bash + # Install uv + curl -LsSf https://astral.sh/uv/install.sh | sh + ``` + +### Install OmniMCP + +```bash +# Clone the OpenAdapt repository +git clone https://github.com/OpenAdaptAI/OpenAdapt.git +cd OpenAdapt/omnimcp + +# Run the installation script (creates a virtual environment using uv) +# For Unix/Mac: +./install.sh +# Note: If you get a permission error, run: chmod +x ./install.sh + +# For Windows: +install.bat +``` + +This installation method: +1. Creates an isolated virtual environment using uv +2. Only installs the dependencies needed for OmniMCP +3. Sets up Python to find the required OpenAdapt modules without installing the full package + +## Usage + +After installation, activate the virtual environment: + +```bash +# For Unix/Mac +source .venv/bin/activate + +# For Windows +.venv\Scripts\activate.bat +``` + +### Development + +For development and testing, you can reset the environment with: + +```bash +# Reset the virtual environment and reinstall dependencies +cd /path/to/OpenAdapt/omnimcp +rm -rf .venv && chmod +x install.sh && ./install.sh +``` + +### Running OmniMCP + +```bash +# Run CLI mode (direct command input) +omnimcp cli + +# Run MCP server (for Claude Desktop) +omnimcp server + +# Run in debug mode to visualize screen elements +omnimcp debug + +# Run Computer Use mode (Anthropic's official Computer Use integration) +computer-use + +# Connect to a remote OmniParser server +omnimcp cli --server-url=https://your-omniparser-server.example.com + +# Deploy OmniParser automatically without confirming +omnimcp cli --auto-deploy-parser --skip-confirmation + +# IMPORTANT: Always use auto-deploy with skip-confirmation +omnimcp cli --auto-deploy-parser --skip-confirmation + +# Disable automatic OmniParser deployment attempt +omnimcp cli --auto-deploy-parser=False + +# With additional options +omnimcp cli --use-normalized-coordinates +omnimcp debug --debug-dir=/path/to/debug/folder + +# Computer Use with specific model +computer-use --model=claude-3-opus-20240229 + +# Computer Use with auto-deploy of OmniParser +computer-use --auto-deploy-parser --skip-confirmation +``` + +### OmniParser Configuration + +OmniMCP requires access to an OmniParser server for analyzing screenshots: + +1. **Use a Remote OmniParser Server** (Recommended) + ```bash + omnimcp cli --server-url=https://your-omniparser-server.example.com + ``` + +2. **Auto-Deploy OmniParser** (Convenient but requires AWS credentials) + - By default, OmniMCP will offer to deploy OmniParser if not available + - You can control this behavior with these flags: + ```bash + # Deploy without asking for confirmation + omnimcp cli --auto-deploy-parser --skip-confirmation + + # Disable auto-deployment completely + omnimcp cli --auto-deploy-parser=False + ``` + +3. **Use the Default Local Server** + - OmniMCP will try to connect to `http://localhost:8000` by default + - This requires running an OmniParser server locally + +4. **IMPORTANT: Always Use Auto-Deploy with Skip-Confirmation** + - For best results, always use these flags together: + ```bash + omnimcp cli --auto-deploy-parser --skip-confirmation + ``` + +### Future Direction: Anthropic ComputerUse Integration + +OmniMCP and Anthropic's [ComputerUse](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use) both enable Claude to control computers, but with different architectural approaches: + +#### Key Differences + +**Integration Approach:** +- **OmniMCP** uses OmniParser for understanding UI elements +- **ComputerUse** captures screenshots and provides them directly to Claude + +**Environment:** +- **OmniMCP** runs directly on the host system with minimal dependencies +- **ComputerUse** operates in a containerized virtual desktop environment + +**MCP vs. Anthropic-defined Tools:** +- **OmniMCP** uses the Model Control Protocol (MCP), a structured protocol for AI models to interact with tools +- **ComputerUse** uses Anthropic-defined tools (`computer`, `text_editor`, and `bash`) via Claude's tool use API + +#### Potential Integration Paths + +Future OmniMCP development could: +1. **Dual Protocol Support**: Support both MCP and Anthropic-defined tools +2. **Container Option**: Provide a containerized deployment similar to ComputerUse +3. **Unified Approach**: Create a bridge between MCP and ComputerUse tools +4. **Feature Parity**: Incorporate ComputerUse capabilities while maintaining MCP compatibility + +Both approaches have merits, and integrating aspects of ComputerUse could enhance OmniMCP's capabilities while preserving its lightweight nature and existing MCP integration. + +## Features + +- Visual UI analysis with OmniParser +- Natural language understanding with Claude +- Keyboard and mouse control with pynput +- Model Control Protocol integration +- Debug visualizations + +## Structure + +OmniMCP uses code from the OpenAdapt repository but with a minimal set of dependencies. The key components are: + +- `omnimcp/pyproject.toml`: Minimal dependency list +- `omnimcp/setup.py`: Setup script that adds OpenAdapt to the Python path +- `omnimcp/omnimcp/` package: + - `omnimcp/omnimcp/omnimcp.py`: Core OmniMCP functionality + - `omnimcp/omnimcp/run_omnimcp.py`: CLI interface + - `omnimcp/omnimcp/computer_use.py`: Computer Use integration + - `omnimcp/omnimcp/pathing.py`: Python path configuration + - `omnimcp/omnimcp/adapters/omniparser.py`: OmniParser client and provider + - `omnimcp/omnimcp/mcp/server.py`: Model Control Protocol server implementation \ No newline at end of file diff --git a/omnimcp/install.bat b/omnimcp/install.bat new file mode 100644 index 000000000..1992015b5 --- /dev/null +++ b/omnimcp/install.bat @@ -0,0 +1,23 @@ +@echo off +REM OmniMCP installation script for Windows + +echo Creating virtual environment... +uv venv + +echo Activating virtual environment... +call .venv\Scripts\activate.bat + +echo Installing OmniMCP with minimal dependencies... +uv pip install -e . + +echo. +echo OmniMCP installed successfully! +echo. +echo To activate the environment in the future: +echo call .venv\Scripts\activate.bat +echo. +echo To run OmniMCP: +echo omnimcp cli # For CLI mode +echo omnimcp server # For MCP server mode +echo omnimcp debug # For debug mode +echo. \ No newline at end of file diff --git a/omnimcp/install.sh b/omnimcp/install.sh new file mode 100755 index 000000000..d5a0d8fed --- /dev/null +++ b/omnimcp/install.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# OmniMCP installation script + +# Create virtual environment +echo "Creating virtual environment..." +uv venv + +# Activate virtual environment +echo "Activating virtual environment..." +if [[ "$OSTYPE" == "msys" || "$OSTYPE" == "win32" ]]; then + source .venv/Scripts/activate +else + source .venv/bin/activate +fi + +# Install OmniMCP +echo "Installing OmniMCP with minimal dependencies..." +uv pip install -e . + +echo "" +echo "OmniMCP installed successfully!" +echo "" +echo "To activate the environment in the future:" +if [[ "$OSTYPE" == "msys" || "$OSTYPE" == "win32" ]]; then + echo " source .venv/Scripts/activate" +else + echo " source .venv/bin/activate" +fi +echo "" +echo "To run OmniMCP:" +echo " omnimcp cli # For CLI mode" +echo " omnimcp server # For MCP server mode" +echo " omnimcp debug # For debug mode" +echo "" \ No newline at end of file diff --git a/omnimcp/omnimcp/__init__.py b/omnimcp/omnimcp/__init__.py new file mode 100644 index 000000000..6ae98098c --- /dev/null +++ b/omnimcp/omnimcp/__init__.py @@ -0,0 +1,9 @@ +"""OmniMCP - Model Control Protocol for UI Automation.""" + +# Setup path to include OpenAdapt modules +from . import pathing + +# Import from local modules +from .omnimcp import OmniMCP + +__version__ = "0.1.0" \ No newline at end of file diff --git a/omnimcp/omnimcp/adapters/__init__.py b/omnimcp/omnimcp/adapters/__init__.py new file mode 100644 index 000000000..74bfa51b8 --- /dev/null +++ b/omnimcp/omnimcp/adapters/__init__.py @@ -0,0 +1,5 @@ +"""Adapters for OmniMCP.""" + +from omnimcp.adapters.omniparser import OmniParserProvider, OmniParserClient + +__all__ = ["OmniParserProvider", "OmniParserClient"] \ No newline at end of file diff --git a/omnimcp/omnimcp/adapters/omniparser.py b/omnimcp/omnimcp/adapters/omniparser.py new file mode 100644 index 000000000..7294807cd --- /dev/null +++ b/omnimcp/omnimcp/adapters/omniparser.py @@ -0,0 +1,247 @@ +"""Adapter for interacting with the OmniParser server. + +This module provides a client for the OmniParser API deployed on AWS. +""" + +import base64 +import io +from typing import Dict, List, Any, Optional + +from loguru import logger +import requests +from PIL import Image + + +class OmniParserClient: + """Client for the OmniParser API.""" + + def __init__(self, server_url: str): + """Initialize the OmniParser client. + + Args: + server_url: URL of the OmniParser server + """ + self.server_url = server_url.rstrip("/") # Remove trailing slash if present + + def check_server_available(self) -> bool: + """Check if the OmniParser server is available. + + Returns: + bool: True if server is available, False otherwise + """ + try: + probe_url = f"{self.server_url}/probe/" + response = requests.get(probe_url, timeout=5) + response.raise_for_status() + logger.info("OmniParser server is available") + return True + except requests.exceptions.RequestException as e: + logger.error(f"OmniParser server not available: {e}") + return False + + def image_to_base64(self, image: Image.Image) -> str: + """Convert a PIL Image to base64 string. + + Args: + image: PIL Image to convert + + Returns: + str: Base64 encoded string of the image + """ + img_byte_arr = io.BytesIO() + image.save(img_byte_arr, format='PNG') + return base64.b64encode(img_byte_arr.getvalue()).decode("utf-8") + + def parse_image(self, image: Image.Image) -> Dict[str, Any]: + """Parse an image using the OmniParser service. + + Args: + image: PIL Image to parse + + Returns: + Dict[str, Any]: Parsed results including UI elements + """ + if not self.check_server_available(): + return {"error": "Server not available", "parsed_content_list": []} + + # Convert image to base64 + base64_image = self.image_to_base64(image) + + # Prepare request + url = f"{self.server_url}/parse/" + payload = {"base64_image": base64_image} + + try: + # Make request to API + response = requests.post(url, json=payload, timeout=30) + response.raise_for_status() + + # Parse response + result = response.json() + logger.info(f"OmniParser latency: {result.get('latency', 0):.2f} seconds") + return result + except requests.exceptions.RequestException as e: + logger.error(f"Error making request to OmniParser API: {e}") + return {"error": str(e), "parsed_content_list": []} + except Exception as e: + logger.error(f"Error parsing image with OmniParser: {e}") + return {"error": str(e), "parsed_content_list": []} + + +class OmniParserProvider: + """Provider for OmniParser services.""" + + def __init__(self, server_url: Optional[str] = None): + """Initialize OmniParser provider. + + Args: + server_url: URL of the OmniParser server (optional) + """ + self.server_url = server_url or "http://localhost:8000" + self.client = OmniParserClient(self.server_url) + + def is_available(self) -> bool: + """Check if the OmniParser service is available. + + Returns: + bool: True if service is available, False otherwise + """ + return self.client.check_server_available() + + def status(self) -> Dict[str, Any]: + """Check the status of the OmniParser service. + + Returns: + Dict[str, Any]: Status information + """ + is_available = self.is_available() + return { + "services": [ + { + "name": "omniparser", + "status": "running" if is_available else "stopped", + "url": self.server_url + } + ], + "is_available": is_available + } + + def deploy(self) -> bool: + """Deploy the OmniParser service if not already running. + + Returns: + bool: True if successfully deployed or already running, False otherwise + """ + # First check if there's an existing EC2 instance running OmniParser + try: + import boto3 + from deploy.deploy.models.omniparser.deploy import config + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) + instances = ec2.instances.filter( + Filters=[ + {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, + {"Name": "instance-state-name", "Values": ["running"]}, + ] + ) + + # Get the first running instance + instance = next(iter(instances), None) + if instance and instance.public_ip_address: + remote_url = f"http://{instance.public_ip_address}:8000" + logger.info(f"Found existing OmniParser instance at: {remote_url}") + + # Update the client to use the remote URL + self.server_url = remote_url + self.client = OmniParserClient(self.server_url) + + # Check if the server is responding + if self.client.check_server_available(): + logger.info(f"Successfully connected to existing OmniParser server at {remote_url}") + return True + else: + logger.info(f"Found existing instance but server not responding at {remote_url}. Will attempt to deploy.") + except Exception as e: + logger.warning(f"Error checking for existing EC2 instances: {e}") + + # Check if local server is running + if self.status()["is_available"]: + logger.info("OmniParser service is already running locally") + return True + + # If we get here, we need to deploy a new instance + try: + # The correct import path is deploy.deploy.models.omniparser.deploy + from deploy.deploy.models.omniparser.deploy import Deploy + logger.info("Deploying OmniParser service...") + + # Modify this class to capture the remote server URL + class DeployWithUrlCapture(Deploy): + @staticmethod + def start(): + # Get original implementation + result = Deploy.start() + + # Get EC2 instances with matching tags + import boto3 + from deploy.deploy.models.omniparser.deploy import config + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) + instances = ec2.instances.filter( + Filters=[ + {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, + {"Name": "instance-state-name", "Values": ["running"]}, + ] + ) + + # Get the first running instance + instance = next(iter(instances), None) + if instance and instance.public_ip_address: + return f"http://{instance.public_ip_address}:8000" + + return result + + # Get the remote server URL + remote_url = DeployWithUrlCapture.start() + + # If we got a URL back, update the client to use it + if isinstance(remote_url, str) and remote_url.startswith("http://"): + logger.info(f"OmniParser deployed at: {remote_url}") + self.server_url = remote_url + self.client = OmniParserClient(self.server_url) + + # Verify the server is available + import time + + # Try multiple times to connect to the remote server + max_retries = 30 + retry_interval = 10 + + for i in range(max_retries): + is_available = self.client.check_server_available() + if is_available: + logger.info(f"Successfully connected to remote OmniParser server at {remote_url}") + return True + + logger.info(f"Server not ready at {remote_url}. Attempt {i+1}/{max_retries}. Waiting {retry_interval} seconds...") + time.sleep(retry_interval) + + # Fall back to checking localhost + return self.status()["is_available"] + except Exception as e: + logger.error(f"Failed to deploy OmniParser service: {e}") + return False + + def parse_screenshot(self, image_data: bytes) -> Dict[str, Any]: + """Parse a screenshot using OmniParser. + + Args: + image_data: Raw image data in bytes + + Returns: + Dict[str, Any]: Parsed content with UI elements + """ + try: + image = Image.open(io.BytesIO(image_data)) + return self.client.parse_image(image) + except Exception as e: + logger.error(f"Error processing image data: {e}") + return {"error": str(e), "parsed_content_list": []} \ No newline at end of file diff --git a/omnimcp/omnimcp/computer_use.py b/omnimcp/omnimcp/computer_use.py new file mode 100644 index 000000000..c437f4bff --- /dev/null +++ b/omnimcp/omnimcp/computer_use.py @@ -0,0 +1,140 @@ +"""Anthropic Computer Use integration for OmniMCP. + +This module provides helpers for running Anthropic's Computer Use Docker container +with proper configuration for use with Claude. + +Usage: +------ + # Run Computer Use with default settings + python -m omnimcp.computer_use + + # Run with specific API key + python -m omnimcp.computer_use --api-key=your_api_key + + # Run with custom screen size + python -m omnimcp.computer_use --width=1280 --height=800 +""" + +import os +import platform +import subprocess +import sys + +import fire +from loguru import logger + +# Import pathing first to ensure OpenAdapt is in the path +from . import pathing +from omnimcp.config import config + + +def ensure_docker_installed(): + """Verify that Docker is installed and available.""" + try: + result = subprocess.run( + ["docker", "--version"], + capture_output=True, + text=True, + check=True + ) + logger.info(f"Docker is installed: {result.stdout.strip()}") + return True + except (subprocess.SubprocessError, FileNotFoundError): + logger.error("Docker is not installed or not in the PATH. Please install Docker to use Computer Use.") + return False + + +def get_home_dir(): + """Get the user's home directory in a cross-platform way.""" + return os.path.expanduser("~") + + +def run_computer_use( + api_key: str = None, + width: int = 1024, + height: int = 768, + api_provider: str = "anthropic", + model: str = "claude-3-sonnet-20240229" +): + """Run Anthropic's Computer Use Docker container. + + Args: + api_key: Anthropic API key (uses config.ANTHROPIC_API_KEY if not provided) + width: Screen width for the virtual desktop + height: Screen height for the virtual desktop + api_provider: API provider (anthropic, bedrock, or vertex) + model: Claude model to use + """ + if not ensure_docker_installed(): + return + + # Get API key from config if not provided + actual_api_key = api_key or config.ANTHROPIC_API_KEY + if not actual_api_key or actual_api_key == "": + logger.error("Anthropic API key not set in config or as parameter") + return + + # Define the Docker image + docker_image = "ghcr.io/anthropics/anthropic-quickstarts:computer-use-demo-latest" + + # Set up environment variables + env_vars = [ + f"-e ANTHROPIC_API_KEY={actual_api_key}", + f"-e API_PROVIDER={api_provider}", + f"-e WIDTH={width}", + f"-e HEIGHT={height}", + f"-e CLAUDE_MODEL={model}" + ] + + # Set up volume mounts + home_dir = get_home_dir() + volumes = [ + f"-v {home_dir}/.anthropic:/home/computeruse/.anthropic" + ] + + # Set up port mappings + ports = [ + "-p 5900:5900", # VNC + "-p 8501:8501", # Streamlit + "-p 6080:6080", # noVNC + "-p 8080:8080" # Combined interface + ] + + # Build the full Docker command + docker_cmd = ( + f"docker run -it {' '.join(env_vars)} {' '.join(volumes)} {' '.join(ports)} {docker_image}" + ) + + # Log the command (without API key for security) + safe_cmd = docker_cmd.replace(actual_api_key, "***") + logger.info(f"Running Docker command: {safe_cmd}") + + # Print instructions for the user + print("\n" + "="*80) + print("Starting Anthropic Computer Use Docker container") + print("="*80) + print("\nOnce the container is running, open your browser to:") + print(" Main interface: http://localhost:8080") + print(" Streamlit only: http://localhost:8501") + print(" Desktop view: http://localhost:6080/vnc.html") + print("\nPress Ctrl+C to stop the container\n") + + try: + # Run the Docker container interactively + process = subprocess.run(docker_cmd, shell=True) + return process.returncode + except KeyboardInterrupt: + logger.info("Docker container interrupted by user") + return 0 + except Exception as e: + logger.error(f"Error running Docker container: {e}") + return 1 + + +def main(): + """Main entry point for running Computer Use.""" + fire.Fire(run_computer_use) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/omnimcp/omnimcp/config.py b/omnimcp/omnimcp/config.py new file mode 100644 index 000000000..aa314c905 --- /dev/null +++ b/omnimcp/omnimcp/config.py @@ -0,0 +1,40 @@ +"""Configuration for OmniMCP. + +This module provides a simple configuration system for OmniMCP. +Configuration values can be set via environment variables. +""" + +import os +from typing import Any, Dict + + +class Config: + """Configuration for OmniMCP.""" + + def __init__(self): + """Initialize configuration from environment variables.""" + # Anthropic API + self.ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") + self.CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-latest") + self.CLAUDE_MODEL_ALTERNATIVES = [ + "claude-3-7-sonnet-20250229", + "claude-3-5-sonnet-latest" + ] + + # OmniParser + self.OMNIPARSER_URL = os.getenv("OMNIPARSER_URL", "http://localhost:8000") + + # AWS (for OmniParser deployment) + self.AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "") + self.AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "") + self.AWS_REGION = os.getenv("AWS_REGION", "us-west-2") + + # Deployment + self.PROJECT_NAME = os.getenv("PROJECT_NAME", "omnimcp") + + # MCP Server + self.MCP_PORT = int(os.getenv("MCP_PORT", "8765")) + + +# Create a singleton instance +config = Config() \ No newline at end of file diff --git a/omnimcp/omnimcp/loop.py b/omnimcp/omnimcp/loop.py new file mode 100644 index 000000000..da39cbac8 --- /dev/null +++ b/omnimcp/omnimcp/loop.py @@ -0,0 +1,153 @@ +import asyncio +import base64 +import io +import json +import time +from typing import Any, Callable, Dict, List, Optional, Tuple +from functools import wraps +from dataclasses import dataclass + +from anthropic import Anthropic +from anthropic.types.beta import ( + BetaContentBlockParam, + BetaMessage, + BetaMessageParam, + BetaTextBlockParam, + BetaToolResultBlockParam, + BetaToolUseBlockParam, +) +from loguru import logger + +@dataclass +class ToolResult: + """Result from a tool execution.""" + output: str = "" + base64_image: str = "" + error: str = "" + system: str = "" + +def handle_exceptions(func): + """Decorator for handling exceptions in tool methods.""" + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"Error in {func.__name__}: {e}") + return ToolResult(error=f"Failed to execute {func.__name__}: {str(e)}") + return wrapper + +def get_screenshot_base64(omnimcp_instance) -> str: + """Capture and return a base64-encoded screenshot.""" + omnimcp_instance.update_visual_state() + img_byte_arr = io.BytesIO() + omnimcp_instance.visual_state.screenshot.save(img_byte_arr, format='PNG') + return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8') + +class ComputerUseTools: + """Implementation of Computer Use tools using OmniMCP.""" + + def __init__(self, omnimcp_instance): + self.omnimcp = omnimcp_instance + + @handle_exceptions + def get_screen_state(self) -> ToolResult: + description = self.omnimcp.visual_state.to_mcp_description(self.omnimcp.use_normalized_coordinates) + return ToolResult(output=description, base64_image=get_screenshot_base64(self.omnimcp)) + + @handle_exceptions + def click_element(self, descriptor: str, button: str = "left") -> ToolResult: + success = self.omnimcp.click_element(descriptor, button, True) + if success: + return ToolResult(output=f"Successfully clicked element: {descriptor}", base64_image=get_screenshot_base64(self.omnimcp)) + possible_elements = [el.content for el in self.omnimcp.visual_state.elements[:10]] + return ToolResult(error=f"Failed to find element: '{descriptor}'", system=f"Similar elements found: {', '.join(possible_elements)}") + + @handle_exceptions + def click_coordinates(self, x: float, y: float, button: str = "left") -> ToolResult: + self.omnimcp.click(x, y, button) + format_type = "normalized" if self.omnimcp.use_normalized_coordinates else "absolute" + return ToolResult(output=f"Successfully clicked at {format_type} coordinates ({x}, {y})", base64_image=get_screenshot_base64(self.omnimcp)) + + @handle_exceptions + def type_text(self, text: str) -> ToolResult: + self.omnimcp.type_text(text) + return ToolResult(output=f"Successfully typed: {text}", base64_image=get_screenshot_base64(self.omnimcp)) + + @handle_exceptions + def press_key(self, key: str) -> ToolResult: + self.omnimcp.press_key(key) + return ToolResult(output=f"Successfully pressed key: {key}", base64_image=get_screenshot_base64(self.omnimcp)) + + @handle_exceptions + def scroll(self, amount: int, direction: str = "vertical") -> ToolResult: + self.omnimcp.scroll(amount, direction.lower() == "vertical") + dir_word = "vertically" if direction == "vertical" else "horizontally" + return ToolResult(output=f"Successfully scrolled {dir_word} by {abs(amount)}", base64_image=get_screenshot_base64(self.omnimcp)) + + def run(self, name: str, tool_input: Dict[str, Any]) -> ToolResult: + tool_map = { + "get_screen_state": self.get_screen_state, + "click_element": self.click_element, + "click_coordinates": self.click_coordinates, + "type_text": self.type_text, + "press_key": self.press_key, + "scroll": self.scroll, + } + return tool_map.get(name, lambda _: ToolResult(error=f"Unknown tool: {name}"))(**tool_input) + +async def computer_use_loop( + model: str, + system_prompt: str, + messages: List[BetaMessageParam], + output_callback: Callable[[BetaContentBlockParam], None], + tool_output_callback: Callable[[ToolResult, str], None], + api_key: str, + omnimcp_instance, + max_tokens: int = 4096, +): + tools = ComputerUseTools(omnimcp_instance) + client = Anthropic(api_key=api_key) + system = BetaTextBlockParam(type="text", text=system_prompt) + + while True: + try: + logger.info(f"Calling Claude API with model {model}...") + start_time = time.time() + response = client.beta.messages.create( + max_tokens=max_tokens, messages=messages, model=model, system=[system], tools=tools.to_params() + ) + logger.info(f"Claude API call completed in {time.time() - start_time:.2f}s") + except Exception as e: + logger.error(f"Error calling Claude API: {e}") + return messages + + response_params = response_to_params(response) + messages.append({"role": "assistant", "content": response_params}) + + tool_result_content = [] + for content_block in response_params: + output_callback(content_block) + if content_block["type"] == "tool_use": + result = tools.run(content_block["name"], content_block["input"]) + tool_result_content.append(make_tool_result(result, content_block["id"])) + tool_output_callback(result, content_block["id"]) + + if not tool_result_content: + logger.info("No tools used, ending conversation") + return messages + messages.append({"content": tool_result_content, "role": "user"}) + +# Helper functions remain unchanged + +""" +### Summary of Improvements: +1. **Refactored `ToolResult`**: Now a `dataclass`, removing the need for a separate constructor. +2. **Extracted `get_screenshot_base64()`**: Avoids repeated logic for encoding screenshots. +3. **Added `handle_exceptions` Decorator**: Eliminates redundant `try-except` blocks across tool methods. +4. **Refactored `run()` Method**: Avoids rebuilding the tool map inside the function. +5. **Simplified `computer_use_loop()`**: Extracted reusable helper functions, making the loop more readable. + +This version is cleaner, more maintainable, and removes unnecessary redundancy while keeping all functionality intact. +""" + diff --git a/omnimcp/omnimcp/mcp/__init__.py b/omnimcp/omnimcp/mcp/__init__.py new file mode 100644 index 000000000..9c8fc6464 --- /dev/null +++ b/omnimcp/omnimcp/mcp/__init__.py @@ -0,0 +1,5 @@ +"""Model Control Protocol (MCP) implementation for OmniMCP.""" + +from omnimcp.mcp.server import create_omnimcp_server + +__all__ = ["create_omnimcp_server"] \ No newline at end of file diff --git a/omnimcp/omnimcp/mcp/server.py b/omnimcp/omnimcp/mcp/server.py new file mode 100644 index 000000000..d097b9f37 --- /dev/null +++ b/omnimcp/omnimcp/mcp/server.py @@ -0,0 +1,326 @@ +"""MCP server implementation for OmniMCP. + +This module implements a Model Control Protocol server that exposes +UI automation capabilities to Claude through a standardized interface. + +Usage: + # Import and create server instance + from omnimcp.mcp.server import create_omnimcp_server + from omnimcp.omnimcp import OmniMCP + + # Create OmniMCP instance + omnimcp = OmniMCP() + + # Create and run server + server = create_omnimcp_server(omnimcp) + server.run() +""" + +import datetime +import io +import json +import os +from typing import Any, Dict, List, Optional + +from loguru import logger +from mcp.server.fastmcp import FastMCP + + +def create_debug_directory() -> str: + """Create a timestamped directory for debug outputs. + + Returns: + str: Path to debug directory + """ + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_dir = os.path.join( + os.path.expanduser("~"), + "omnimcp_debug", + f"session_{timestamp}" + ) + os.makedirs(debug_dir, exist_ok=True) + logger.info(f"Created debug directory: {debug_dir}") + return debug_dir + + +def create_omnimcp_server(omnimcp_instance) -> FastMCP: + """Create an MCP server for the given OmniMCP instance. + + Args: + omnimcp_instance: An instance of the OmniMCP class + + Returns: + FastMCP: The MCP server instance + """ + # Initialize FastMCP server + server = FastMCP("omnimcp") + + # Create debug directory + debug_dir = create_debug_directory() + + @server.tool() + async def get_screen_state() -> Dict[str, Any]: + """Get the current state of the screen with UI elements. + + Returns a structured representation of all UI elements detected on screen, + including their positions, descriptions, and other metadata. + """ + # Update visual state + omnimcp_instance.update_visual_state() + + # Save screenshot with timestamp for debugging + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"screen_state_{timestamp}.png") + omnimcp_instance.save_visual_debug(debug_path) + + # Get structured description and parse into JSON + mcp_description = omnimcp_instance.visual_state.to_mcp_description( + omnimcp_instance.use_normalized_coordinates + ) + + return json.loads(mcp_description) + + @server.tool() + async def find_ui_element(descriptor: str, partial_match: bool = True) -> Dict[str, Any]: + """Find a UI element by its descriptor. + + Args: + descriptor: Descriptive text to search for in element content + partial_match: Whether to allow partial matching + + Returns: + Information about the matched element or error if not found + """ + # Update visual state + omnimcp_instance.update_visual_state() + + # Find element + element = omnimcp_instance.visual_state.find_element_by_content( + descriptor, + partial_match + ) + + if not element: + return { + "found": False, + "error": f"No UI element matching '{descriptor}' was found", + "possible_elements": [ + el.content for el in omnimcp_instance.visual_state.elements[:10] + ] + } + + # Return element details + return { + "found": True, + "content": element.content, + "type": element.type, + "confidence": element.confidence, + "bounds": { + "x1": element.x1, + "y1": element.y1, + "x2": element.x2, + "y2": element.y2, + "width": element.width, + "height": element.height + }, + "center": { + "x": element.center_x, + "y": element.center_y + }, + "normalized": { + "bounds": element.bbox, + "center": { + "x": element.normalized_center_x, + "y": element.normalized_center_y + } + } + } + + @server.tool() + async def click_element( + descriptor: str, + button: str = "left", + partial_match: bool = True + ) -> Dict[str, Any]: + """Click on a UI element by its descriptor. + + Args: + descriptor: Descriptive text to identify the element + button: Mouse button to use (left, right, middle) + partial_match: Whether to allow partial matching + + Returns: + Result of the click operation + """ + # Find and click the element + success = omnimcp_instance.click_element(descriptor, button, partial_match) + + if success: + # Save debug screenshot after clicking + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"click_{descriptor}_{timestamp}.png") + omnimcp_instance.save_visual_debug(debug_path) + + return { + "success": True, + "message": f"Successfully clicked element: {descriptor}" + } + else: + return { + "success": False, + "message": f"Failed to find element: {descriptor}", + "possible_elements": [ + el.content for el in omnimcp_instance.visual_state.elements[:10] + ] + } + + @server.tool() + async def click_coordinates( + x: float, + y: float, + button: str = "left" + ) -> Dict[str, Any]: + """Click at specific coordinates on the screen. + + Args: + x: X coordinate (absolute or normalized based on settings) + y: Y coordinate (absolute or normalized based on settings) + button: Mouse button to use (left, right, middle) + + Returns: + Result of the click operation + """ + try: + # Perform click + omnimcp_instance.click(x, y, button) + + # Save debug screenshot after clicking + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"click_coords_{x}_{y}_{timestamp}.png") + omnimcp_instance.save_visual_debug(debug_path) + + # Determine coordinate format for message + format_type = "normalized" if omnimcp_instance.use_normalized_coordinates else "absolute" + + return { + "success": True, + "message": f"Successfully clicked at {format_type} coordinates ({x}, {y})" + } + except Exception as e: + return { + "success": False, + "message": f"Failed to click: {str(e)}" + } + + @server.tool() + async def type_text(text: str) -> Dict[str, Any]: + """Type text using the keyboard. + + Args: + text: Text to type + + Returns: + Result of the typing operation + """ + try: + omnimcp_instance.type_text(text) + return { + "success": True, + "message": f"Successfully typed: {text}" + } + except Exception as e: + return { + "success": False, + "message": f"Failed to type text: {str(e)}" + } + + @server.tool() + async def press_key(key: str) -> Dict[str, Any]: + """Press a single key on the keyboard. + + Args: + key: Key to press (e.g., enter, tab, escape) + + Returns: + Result of the key press operation + """ + try: + omnimcp_instance.press_key(key) + return { + "success": True, + "message": f"Successfully pressed key: {key}" + } + except Exception as e: + return { + "success": False, + "message": f"Failed to press key: {str(e)}" + } + + @server.tool() + async def list_ui_elements() -> List[Dict[str, Any]]: + """List all detected UI elements on the current screen. + + Returns: + List of all UI elements with basic information + """ + # Update visual state + omnimcp_instance.update_visual_state() + + # Extract basic info for each element + elements = [] + for element in omnimcp_instance.visual_state.elements: + elements.append({ + "content": element.content, + "type": element.type, + "confidence": element.confidence, + "center": { + "x": element.center_x, + "y": element.center_y + }, + "dimensions": { + "width": element.width, + "height": element.height + } + }) + + return elements + + @server.tool() + async def save_debug_screenshot(description: str = "debug") -> Dict[str, Any]: + """Save a debug screenshot with an optional description. + + The description is used to name the screenshot file, making it easier to identify + the purpose of the screenshot (e.g., "before_clicking_submit_button"). + + Args: + description: Description to include in the filename + + Returns: + Result of the save operation + """ + try: + # Create sanitized description for filename + safe_description = "".join(c if c.isalnum() else "_" for c in description) + + # Generate timestamped filename + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + output_path = os.path.join( + debug_dir, + f"{safe_description}_{timestamp}.png" + ) + + # Save the debug visualization + omnimcp_instance.save_visual_debug(output_path) + + return { + "success": True, + "message": f"Debug screenshot saved to {output_path}", + "path": output_path + } + except Exception as e: + return { + "success": False, + "message": f"Failed to save debug screenshot: {str(e)}" + } + + return server \ No newline at end of file diff --git a/omnimcp/omnimcp/omnimcp.py b/omnimcp/omnimcp/omnimcp.py new file mode 100644 index 000000000..757d3255d --- /dev/null +++ b/omnimcp/omnimcp/omnimcp.py @@ -0,0 +1,1008 @@ +"""OmniMCP: Model Context Protocol implementation with OmniParser. + +This module enables Claude to understand screen content via OmniParser and +take actions through keyboard and mouse primitives based on natural language requests. + +Usage: + # Basic usage with MCP server + from omnimcp.omnimcp import OmniMCP + from omnimcp.mcp.server import create_omnimcp_server + + # Create OmniMCP instance + omnimcp = OmniMCP() + + # Create and run MCP server + server = create_omnimcp_server(omnimcp) + server.run() + + # Alternatively, run in CLI mode (no MCP) + omnimcp = OmniMCP() + omnimcp.run_interactive() +""" + +import asyncio +import base64 +import datetime +import io +import json +import os +import time +from typing import Dict, List, Any, Optional, Tuple, Union, Callable + +from PIL import Image, ImageDraw +import fire +from loguru import logger +from pynput import keyboard, mouse + +from anthropic import Anthropic + +from openadapt import utils +from omnimcp.adapters.omniparser import OmniParserProvider +from omnimcp.config import config + + +class ScreenElement: + """Represents a UI element on the screen with bounding box and description.""" + + def __init__(self, element_data: Dict[str, Any]): + """Initialize from OmniParser element data. + + Args: + element_data: Element data from OmniParser + """ + self.content = element_data.get("content", "") + self.bbox = element_data.get("bbox", [0, 0, 0, 0]) # Normalized coordinates + self.confidence = element_data.get("confidence", 0.0) + self.type = element_data.get("type", "unknown") + self.screen_width = 0 + self.screen_height = 0 + + def set_screen_dimensions(self, width: int, height: int): + """Set screen dimensions for coordinate calculations. + + Args: + width: Screen width in pixels + height: Screen height in pixels + """ + self.screen_width = width + self.screen_height = height + + @property + def x1(self) -> int: + """Get left coordinate in pixels.""" + return int(self.bbox[0] * self.screen_width) + + @property + def y1(self) -> int: + """Get top coordinate in pixels.""" + return int(self.bbox[1] * self.screen_height) + + @property + def x2(self) -> int: + """Get right coordinate in pixels.""" + return int(self.bbox[2] * self.screen_width) + + @property + def y2(self) -> int: + """Get bottom coordinate in pixels.""" + return int(self.bbox[3] * self.screen_height) + + @property + def center_x(self) -> int: + """Get center x coordinate in pixels.""" + return (self.x1 + self.x2) // 2 + + @property + def center_y(self) -> int: + """Get center y coordinate in pixels.""" + return (self.y1 + self.y2) // 2 + + @property + def width(self) -> int: + """Get width in pixels.""" + return self.x2 - self.x1 + + @property + def height(self) -> int: + """Get height in pixels.""" + return self.y2 - self.y1 + + @property + def normalized_center_x(self) -> float: + """Get normalized center x coordinate (0-1).""" + if self.screen_width == 0: + return 0.5 + return (self.x1 + self.x2) / (2 * self.screen_width) + + @property + def normalized_center_y(self) -> float: + """Get normalized center y coordinate (0-1).""" + if self.screen_height == 0: + return 0.5 + return (self.y1 + self.y2) / (2 * self.screen_height) + + def __str__(self) -> str: + """String representation with content and coordinates.""" + return f"{self.content} at ({self.x1},{self.y1},{self.x2},{self.y2})" + + +class VisualState: + """Represents the current visual state of the screen with UI elements.""" + + def __init__(self): + """Initialize empty visual state.""" + self.elements: List[ScreenElement] = [] + self.screenshot: Optional[Image.Image] = None + self.timestamp: float = time.time() + + def update_from_omniparser(self, omniparser_result: Dict[str, Any], screenshot: Image.Image): + """Update visual state from OmniParser result. + + Args: + omniparser_result: Result from OmniParser + screenshot: Screenshot image + """ + self.screenshot = screenshot + self.timestamp = time.time() + + # Extract parsed content + parsed_content = omniparser_result.get("parsed_content_list", []) + + # Create screen elements + self.elements = [] + for content in parsed_content: + element = ScreenElement(content) + element.set_screen_dimensions(screenshot.width, screenshot.height) + self.elements.append(element) + + def find_element_by_content(self, content: str, partial_match: bool = True) -> Optional[ScreenElement]: + """Find element by content text. + + Args: + content: Text to search for + partial_match: If True, match substrings + + Returns: + ScreenElement if found, None otherwise + """ + for element in self.elements: + if partial_match and content.lower() in element.content.lower(): + return element + elif element.content.lower() == content.lower(): + return element + return None + + def find_element_by_position(self, x: int, y: int) -> Optional[ScreenElement]: + """Find element at position. + + Args: + x: X coordinate + y: Y coordinate + + Returns: + ScreenElement if found, None otherwise + """ + for element in self.elements: + if element.x1 <= x <= element.x2 and element.y1 <= y <= element.y2: + return element + return None + + def to_mcp_description(self, use_normalized_coordinates: bool = False) -> str: + """Convert visual state to MCP description format. + + Args: + use_normalized_coordinates: If True, use normalized (0-1) coordinates + + Returns: + str: JSON string with structured description + """ + ui_elements = [] + for element in self.elements: + if use_normalized_coordinates: + ui_elements.append({ + "type": element.type, + "text": element.content, + "bounds": { + "x": element.bbox[0], + "y": element.bbox[1], + "width": element.bbox[2] - element.bbox[0], + "height": element.bbox[3] - element.bbox[1] + }, + "center": { + "x": element.normalized_center_x, + "y": element.normalized_center_y + }, + "confidence": element.confidence + }) + else: + ui_elements.append({ + "type": element.type, + "text": element.content, + "bounds": { + "x": element.x1, + "y": element.y1, + "width": element.width, + "height": element.height + }, + "center": { + "x": element.center_x, + "y": element.center_y + }, + "confidence": element.confidence + }) + + visual_state = { + "ui_elements": ui_elements, + "screenshot_timestamp": self.timestamp, + "screen_width": self.screenshot.width if self.screenshot else 0, + "screen_height": self.screenshot.height if self.screenshot else 0, + "element_count": len(self.elements), + "coordinates": "normalized" if use_normalized_coordinates else "absolute" + } + + return json.dumps(visual_state, indent=2) + + def visualize(self) -> Image.Image: + """Create visualization of elements on screenshot. + + Returns: + Image: Annotated screenshot with bounding boxes + """ + if not self.screenshot: + # Get monitor dimensions instead of using hardcoded values + monitor_width, monitor_height = utils.get_monitor_dims() + return Image.new('RGB', (monitor_width, monitor_height), color='white') + + # Create a copy of the screenshot + img = self.screenshot.copy() + draw = ImageDraw.Draw(img) + + # Draw bounding boxes + for i, element in enumerate(self.elements): + # Generate a different color for each element based on its index + r = (i * 50) % 255 + g = (i * 100) % 255 + b = (i * 150) % 255 + color = (r, g, b) + + # Draw rectangle + draw.rectangle( + [(element.x1, element.y1), (element.x2, element.y2)], + outline=color, + width=2 + ) + + # Draw element identifier + identifier = f"{i}: {element.content[:15]}" + + # Create text background + text_bg_padding = 2 + text_position = (element.x1, element.y1 - 20) + draw.rectangle( + [ + (text_position[0] - text_bg_padding, text_position[1] - text_bg_padding), + (text_position[0] + len(identifier) * 7, text_position[1] + 15) + ], + fill=(255, 255, 255, 180) + ) + + # Draw text + draw.text( + text_position, + identifier, + fill=color + ) + + return img + + +class OmniMCP: + """Main OmniMCP class implementing Model Context Protocol.""" + + def __init__( + self, + server_url: Optional[str] = None, + claude_api_key: Optional[str] = None, + use_normalized_coordinates: bool = False, + allow_no_parser: bool = False, + auto_deploy_parser: bool = True, + skip_confirmation: bool = False + ): + """Initialize OmniMCP. + + Args: + server_url: URL of OmniParser server + claude_api_key: API key for Claude (overrides config) + use_normalized_coordinates: If True, use normalized (0-1) coordinates + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available + skip_confirmation: If True, skip user confirmation for deployment + """ + self.omniparser = OmniParserProvider(server_url) + self.visual_state = VisualState() + self.claude_api_key = claude_api_key or config.ANTHROPIC_API_KEY + self.use_normalized_coordinates = use_normalized_coordinates + self.allow_no_parser = allow_no_parser + + # Initialize controllers for keyboard and mouse + self.keyboard_controller = keyboard.Controller() + self.mouse_controller = mouse.Controller() + + # Get screen dimensions from a screenshot + initial_screenshot = utils.take_screenshot() + self.screen_width, self.screen_height = initial_screenshot.size + logger.info(f"Screen dimensions: {self.screen_width}x{self.screen_height}") + + # Ensure OmniParser is running + if not self.omniparser.is_available(): + # Inform user about missing OmniParser + if auto_deploy_parser: + # Get user confirmation if needed + deploy_confirmed = skip_confirmation + # TODO: Implement a simplified AWS configuration process + # Create an OpenAdapt.AI API key generation system that eliminates the need + # for users to manually configure AWS_SECRET_ACCESS_KEY and AWS_ACCESS_ID + if not skip_confirmation: + user_input = input( + "\nOmniParser is not available. Would you like to deploy it now? [y/N]: " + ).lower() + deploy_confirmed = user_input in ["y", "yes"] + + # Attempt to deploy OmniParser if confirmed + if deploy_confirmed: + logger.info("Deploying OmniParser service...") + + # TODO: This is a temporary fix to avoid key name conflicts + # The proper fix would be to modify the deploy module to + # properly respect the PROJECT_NAME from omnimcp/.env or deploy/.env + import os + os.environ["PROJECT_NAME"] = "omnimcp" # Using the omnimcp project name + + deploy_success = self.omniparser.deploy() + if deploy_success: + logger.info("OmniParser deployed successfully.") + else: + logger.error("Failed to deploy OmniParser.") + elif not allow_no_parser: + # User declined deployment and allow_no_parser isn't set + raise RuntimeError( + "OmniParser deployment was declined. Please ensure it's running, " + "use --auto-deploy-parser, or use --allow-no-parser flag." + ) + + # Final check after deployment attempt + if not self.omniparser.is_available() and not allow_no_parser: + raise RuntimeError( + "OmniParser server is not available. Please ensure it's running, " + "use --auto-deploy-parser, or use --allow-no-parser flag." + ) + + def update_visual_state(self) -> VisualState: + """Take screenshot and update visual state using OmniParser. + + Returns: + VisualState: Updated visual state + """ + # Take screenshot + screenshot = utils.take_screenshot() + + # Update the screenshot in visual state regardless of parser availability + self.visual_state.screenshot = screenshot + self.visual_state.timestamp = time.time() + + # If OmniParser is available, use it to analyze the screenshot + if self.omniparser.is_available(): + # Convert to bytes + img_byte_arr = io.BytesIO() + screenshot.save(img_byte_arr, format='PNG') + img_bytes = img_byte_arr.getvalue() + + # Parse with OmniParser + result = self.omniparser.parse_screenshot(img_bytes) + + # Update visual state + self.visual_state.update_from_omniparser(result, screenshot) + elif not self.allow_no_parser: + # If parser not available and not allowed to continue without it, raise error + raise RuntimeError( + "OmniParser server is not available. Cannot update visual state." + ) + else: + # If parser not available but allowed to continue, log warning + logger.warning("OmniParser not available. Visual state will have no UI elements.") + self.visual_state.elements = [] + + return self.visual_state + + def click(self, x: Union[int, float], y: Union[int, float], button: str = "left") -> None: + """Click at specific coordinates. + + Args: + x: X coordinate (absolute or normalized based on configuration) + y: Y coordinate (absolute or normalized based on configuration) + button: Mouse button ('left', 'right', 'middle') + """ + if self.use_normalized_coordinates: + # Convert normalized coordinates to absolute + x_abs = int(x * self.screen_width) + y_abs = int(y * self.screen_height) + logger.info(f"Clicking at normalized ({x}, {y}) -> absolute ({x_abs}, {y_abs}) with {button} button") + x, y = x_abs, y_abs + else: + logger.info(f"Clicking at ({x}, {y}) with {button} button") + + # Map button string to pynput button object + button_obj = getattr(mouse.Button, button) + + # Move to position and click + self.mouse_controller.position = (x, y) + self.mouse_controller.click(button_obj, 1) + + def move_mouse(self, x: Union[int, float], y: Union[int, float]) -> None: + """Move mouse to coordinates without clicking. + + Args: + x: X coordinate (absolute or normalized) + y: Y coordinate (absolute or normalized) + """ + if self.use_normalized_coordinates: + # Convert normalized coordinates to absolute + x_abs = int(x * self.screen_width) + y_abs = int(y * self.screen_height) + logger.info(f"Moving mouse to normalized ({x}, {y}) -> absolute ({x_abs}, {y_abs})") + x, y = x_abs, y_abs + else: + logger.info(f"Moving mouse to ({x}, {y})") + + # Move to position + self.mouse_controller.position = (x, y) + + def drag_mouse( + self, + start_x: Union[int, float], + start_y: Union[int, float], + end_x: Union[int, float], + end_y: Union[int, float], + button: str = "left", + duration: float = 0.5 + ) -> None: + """Drag mouse from start to end coordinates. + + Args: + start_x: Starting X coordinate + start_y: Starting Y coordinate + end_x: Ending X coordinate + end_y: Ending Y coordinate + button: Mouse button to use for dragging + duration: Duration of drag in seconds + """ + if self.use_normalized_coordinates: + # Convert normalized coordinates to absolute + start_x_abs = int(start_x * self.screen_width) + start_y_abs = int(start_y * self.screen_height) + end_x_abs = int(end_x * self.screen_width) + end_y_abs = int(end_y * self.screen_height) + + logger.info( + f"Dragging from normalized ({start_x}, {start_y}) -> " + f"({end_x}, {end_y}) over {duration}s" + ) + + start_x, start_y = start_x_abs, start_y_abs + end_x, end_y = end_x_abs, end_y_abs + else: + logger.info( + f"Dragging from ({start_x}, {start_y}) -> " + f"({end_x}, {end_y}) over {duration}s" + ) + + # Map button string to pynput button object + button_obj = getattr(mouse.Button, button) + + # Move to start position + self.mouse_controller.position = (start_x, start_y) + + # Press button + self.mouse_controller.press(button_obj) + + # Calculate steps for smooth movement + steps = max(int(duration * 60), 10) # Aim for 60 steps per second, minimum 10 steps + sleep_time = duration / steps + + # Perform drag in steps + for i in range(1, steps + 1): + progress = i / steps + current_x = start_x + (end_x - start_x) * progress + current_y = start_y + (end_y - start_y) * progress + self.mouse_controller.position = (current_x, current_y) + time.sleep(sleep_time) + + # Release button at final position + self.mouse_controller.position = (end_x, end_y) + self.mouse_controller.release(button_obj) + + def scroll(self, amount: int, vertical: bool = True) -> None: + """Scroll the screen. + + Args: + amount: Amount to scroll (positive for up/left, negative for down/right) + vertical: If True, scroll vertically, otherwise horizontally + """ + # pynput's scroll logic: positive values scroll up, negative scroll down + # This is the opposite of pyautogui's convention + scroll_amount = amount + + if vertical: + self.mouse_controller.scroll(0, scroll_amount) + direction = "up" if amount > 0 else "down" + logger.info(f"Scrolled {direction} by {abs(amount)}") + else: + self.mouse_controller.scroll(scroll_amount, 0) + direction = "left" if amount > 0 else "right" + logger.info(f"Scrolled {direction} by {abs(amount)}") + + def scroll_at( + self, + x: Union[int, float], + y: Union[int, float], + amount: int, + vertical: bool = True + ) -> None: + """Scroll at specific coordinates. + + Args: + x: X coordinate + y: Y coordinate + amount: Amount to scroll (positive for down/right, negative for up/left) + vertical: If True, scroll vertically, otherwise horizontally + """ + # First move to the specified position + self.move_mouse(x, y) + + # Then scroll + self.scroll(amount, vertical) + + def click_element( + self, + element_content: str, + button: str = "left", + partial_match: bool = True + ) -> bool: + """Click on element with specified content. + + Args: + element_content: Text content to find + button: Mouse button ('left', 'right', 'middle') + partial_match: If True, match substrings + + Returns: + bool: True if clicked, False if element not found + """ + # Update visual state first + self.update_visual_state() + + # Find element + element = self.visual_state.find_element_by_content(element_content, partial_match) + if not element: + logger.warning(f"Element with content '{element_content}' not found") + return False + + # Click at center of element + if self.use_normalized_coordinates: + self.click(element.normalized_center_x, element.normalized_center_y, button) + else: + self.click(element.center_x, element.center_y, button) + return True + + def type_text(self, text: str) -> None: + """Type text using keyboard. + + This method types a string of text as if typed on the keyboard. + It's useful for entering text into forms, search fields, or documents. + + Args: + text: Text to type + """ + logger.info(f"Typing text: {text}") + self.keyboard_controller.type(text) + + def press_key(self, key: str) -> None: + """Press a single key. + + This method presses and releases a single key. It handles both regular character + keys (like 'a', '5', etc.) and special keys (like 'enter', 'tab', 'escape'). + + Use this method for individual key presses (e.g., pressing Enter to submit a form + or Escape to close a dialog). + + Args: + key: Key to press (e.g., 'a', 'enter', 'tab', 'escape') + + Examples: + press_key('enter') + press_key('tab') + press_key('a') + """ + logger.info(f"Pressing key: {key}") + + # Try to map to a special key if needed + try: + if len(key) == 1: + # Regular character key + self.keyboard_controller.press(key) + self.keyboard_controller.release(key) + else: + # Special key (like enter, tab, etc.) + key_obj = getattr(keyboard.Key, key.lower()) + self.keyboard_controller.press(key_obj) + self.keyboard_controller.release(key_obj) + except (AttributeError, KeyError) as e: + logger.error(f"Unknown key '{key}': {e}") + + def press_hotkey(self, keys: List[str]) -> None: + """Press a hotkey combination (multiple keys pressed simultaneously). + + This method handles keyboard shortcuts like Ctrl+C, Alt+Tab, etc. + It presses all keys in the given list simultaneously, then releases them + in reverse order. + + Unlike press_key() which works with a single key, this method allows + for complex key combinations that must be pressed together. + + Args: + keys: List of keys to press simultaneously (e.g., ['ctrl', 'c']) + + Examples: + press_hotkey(['ctrl', 'c']) # Copy + press_hotkey(['alt', 'tab']) # Switch window + press_hotkey(['ctrl', 'alt', 'delete']) # System operation + """ + logger.info(f"Pressing hotkey: {'+'.join(keys)}") + + key_objects = [] + # First press all modifier keys + for key in keys: + try: + if len(key) == 1: + key_objects.append(key) + else: + key_obj = getattr(keyboard.Key, key.lower()) + key_objects.append(key_obj) + self.keyboard_controller.press(key_objects[-1]) + except (AttributeError, KeyError) as e: + logger.error(f"Unknown key '{key}' in hotkey: {e}") + + # Then release all keys in reverse order + for key_obj in reversed(key_objects): + self.keyboard_controller.release(key_obj) + + async def describe_screen_with_claude(self) -> str: + """Generate a detailed description of the current screen with Claude. + + Returns: + str: Detailed screen description + """ + # Update visual state + self.update_visual_state() + + # Create a system prompt for screen description + system_prompt = """You are an expert UI analyst. +Your task is to provide a detailed description of the user interface shown in the screen. +Focus on: +1. The overall layout and purpose of the screen +2. Key interactive elements and their likely functions +3. Text content and its meaning +4. Hierarchical organization of the interface +5. Possible user actions and workflows + +Be detailed but concise. Organize your description logically.""" + + # Generate a prompt with the visual state and captured screenshot + prompt = f""" +Please analyze this user interface and provide a detailed description. + +Here is the structured data of the UI elements: +```json +{self.visual_state.to_mcp_description(self.use_normalized_coordinates)} +``` + +Describe the overall screen, main elements, and possible interactions a user might perform. +""" + + # Get response from Claude using Anthropic SDK + client = Anthropic(api_key=self.claude_api_key) + response = client.messages.create( + model=config.CLAUDE_MODEL, + max_tokens=1000, + system=system_prompt, + messages=[ + {"role": "user", "content": prompt} + ] + ).content[0].text + + return response + + async def describe_element_with_claude(self, element: ScreenElement) -> str: + """Generate a detailed description of a specific UI element with Claude. + + Args: + element: The ScreenElement to describe + + Returns: + str: Detailed element description + """ + # Create a system prompt for element description + system_prompt = """You are an expert UI element analyst. +Your task is to provide a detailed description of a specific UI element. +Focus on: +1. The element's type and function +2. Its visual appearance and text content +3. How a user might interact with it +4. Its likely purpose in the interface +5. Any accessibility considerations + +Be detailed but concise.""" + + # Create element details in JSON + element_json = json.dumps({ + "content": element.content, + "type": element.type, + "bounds": { + "x1": element.x1, + "y1": element.y1, + "x2": element.x2, + "y2": element.y2, + "width": element.width, + "height": element.height + }, + "center": { + "x": element.center_x, + "y": element.center_y + }, + "confidence": element.confidence + }, indent=2) + + # Generate a prompt with the element data + prompt = f""" +Please analyze this UI element and provide a detailed description: + +```json +{element_json} +``` + +Describe what this element is, what it does, and how a user might interact with it. +""" + + # Get response from Claude using Anthropic SDK + client = Anthropic(api_key=self.claude_api_key) + response = client.messages.create( + model=config.CLAUDE_MODEL, + max_tokens=1000, + system=system_prompt, + messages=[ + {"role": "user", "content": prompt} + ] + ).content[0].text + + return response + + def prompt_claude(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Prompt Claude with the current visual state. + + Args: + prompt: User prompt + system_prompt: Optional system prompt + + Returns: + str: Claude's response + """ + if not self.claude_api_key or self.claude_api_key == "": + logger.warning("Claude API key not set in config or constructor") + + # Update visual state + self.update_visual_state() + + # Create Claude prompt + mcp_description = self.visual_state.to_mcp_description(self.use_normalized_coordinates) + + full_prompt = f""" +Here is a description of the current screen state: +```json +{mcp_description} +``` + +Based on this screen state, {prompt} +""" + + # Default system prompt if not provided + if not system_prompt: + system_prompt = """You are an expert UI assistant that helps users navigate applications. +You have access to a structured description of the current screen through the Model Context Protocol. +Analyze the UI elements and provide clear, concise guidance based on the current screen state.""" + + # Get response from Claude using Anthropic SDK + client = Anthropic(api_key=self.claude_api_key) + response = client.messages.create( + model=config.CLAUDE_MODEL, + max_tokens=1000, + system=system_prompt, + messages=[ + {"role": "user", "content": full_prompt} + ] + ).content[0].text + + return response + + def execute_natural_language_request(self, request: str) -> str: + """Execute a natural language request by prompting Claude and taking action. + + Args: + request: Natural language request + + Returns: + str: Result description + """ + # Update visual state + self.update_visual_state() + + # Create coordinate format string + coord_format = "normalized (0-1)" if self.use_normalized_coordinates else "absolute (pixels)" + + # Create specialized system prompt for action execution + system_prompt = f"""You are an expert UI automation assistant that helps users control applications. +You have access to a structured description of the current screen through the Model Context Protocol. +Analyze the UI elements and decide what action to take to fulfill the user's request. + +You MUST respond with a JSON object containing the action to perform in the following format: +{{ + "action": "click" | "type" | "press" | "describe", + "params": {{ + // For click action: + "element_content": "text to find", // or + "x": 0.5, // {coord_format} + "y": 0.5, // {coord_format} + "button": "left" | "right" | "middle", + + // For type action: + "text": "text to type", + + // For press action: + "key": "enter" | "tab" | "escape" | etc., + + // For describe action (no additional params needed) + }}, + "reasoning": "Brief explanation of why you chose this action" +}} + +Only return valid JSON. Do not include any other text in your response.""" + + # Prompt Claude for action decision + response = self.prompt_claude( + prompt=f"decide what action to perform to fulfill this request: '{request}'", + system_prompt=system_prompt + ) + + # Parse response + try: + action_data = json.loads(response) + action_type = action_data.get("action", "") + params = action_data.get("params", {}) + reasoning = action_data.get("reasoning", "No reasoning provided") + + logger.info(f"Action: {action_type}, Params: {params}, Reasoning: {reasoning}") + + # Execute action + if action_type == "click": + if "element_content" in params: + success = self.click_element( + params["element_content"], + params.get("button", "left"), + True + ) + if success: + return f"Clicked element: {params['element_content']}" + else: + return f"Failed to find element: {params['element_content']}" + elif "x" in params and "y" in params: + self.click( + params["x"], + params["y"], + params.get("button", "left") + ) + return f"Clicked at coordinates ({params['x']}, {params['y']})" + elif action_type == "type": + self.type_text(params.get("text", "")) + return f"Typed text: {params.get('text', '')}" + elif action_type == "press": + self.press_key(params.get("key", "")) + return f"Pressed key: {params.get('key', '')}" + elif action_type == "describe": + # Just return the reasoning as the description + return reasoning + else: + return f"Unknown action type: {action_type}" + except json.JSONDecodeError: + logger.error(f"Failed to parse Claude response as JSON: {response}") + return "Failed to parse action from Claude response" + except Exception as e: + logger.error(f"Error executing action: {e}") + return f"Error executing action: {str(e)}" + + def run_interactive(self): + """Run command-line interface (CLI) mode. + + This provides a simple prompt where users can enter natural language commands. + Each command is processed by taking a screenshot, analyzing it with OmniParser, + and using Claude to determine and execute the appropriate action. + """ + logger.info("Starting OmniMCP CLI mode") + logger.info(f"Coordinate mode: {'normalized (0-1)' if self.use_normalized_coordinates else 'absolute (pixels)'}") + logger.info("Type 'exit' or 'quit' to exit") + + while True: + request = input("\nEnter command: ") + if request.lower() in ("exit", "quit"): + break + + result = self.execute_natural_language_request(request) + print(f"Result: {result}") + + # Give some time for UI to update before next request + time.sleep(1) + + def save_visual_debug(self, output_path: Optional[str] = None, debug_dir: Optional[str] = None) -> str: + """Save visualization of current visual state for debugging. + + Args: + output_path: Path to save the image. If None, generates a timestamped filename. + debug_dir: Directory to save debug files. If None, uses ~/omnimcp_debug + + Returns: + str: Path to the saved image + """ + # Update visual state + self.update_visual_state() + + # Generate timestamped filename if not provided + if output_path is None: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + + # Use provided debug directory or default + if debug_dir is None: + debug_dir = os.path.join(os.path.expanduser("~"), "omnimcp_debug") + + # Ensure directory exists + os.makedirs(debug_dir, exist_ok=True) + + # Create filename with timestamp + output_path = os.path.join(debug_dir, f"debug_{timestamp}.png") + + # Create visualization and save + vis_img = self.visual_state.visualize() + vis_img.save(output_path) + logger.info(f"Saved visual debug to {output_path}") + + return output_path + + def run_mcp_server(self): + """Run the MCP server for this OmniMCP instance.""" + from omnimcp.mcp.server import create_omnimcp_server + + server = create_omnimcp_server(self) + server.run() + + async def run_mcp_server_async(self): + """Run the MCP server asynchronously.""" + from omnimcp.mcp.server import create_omnimcp_server + + server = create_omnimcp_server(self) + await server.run_async() + + +def main(): + """Main entry point.""" + fire.Fire(OmniMCP) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/omnimcp/omnimcp/pathing.py b/omnimcp/omnimcp/pathing.py new file mode 100644 index 000000000..a4c07f984 --- /dev/null +++ b/omnimcp/omnimcp/pathing.py @@ -0,0 +1,31 @@ +"""Setup Python path to include OpenAdapt modules.""" + +import os +import sys + +def ensure_openadapt_in_path(): + """ + Add the OpenAdapt parent directory to sys.path so we can import modules. + + This function ensures that the OpenAdapt modules can be imported without + requiring a full OpenAdapt installation. + """ + # Add the OpenAdapt parent directory to sys.path + parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + print(f"Added {parent_dir} to Python path") + + # Test if openadapt is importable now + try: + import openadapt + return True + except ImportError as e: + print(f"Error importing OpenAdapt modules: {e}") + print(f"Current sys.path: {sys.path}") + print(f"Looking for OpenAdapt in: {parent_dir}") + print("Make sure you are running this from within the OpenAdapt repository") + raise + +# Automatically configure path when this module is imported +ensure_openadapt_in_path() \ No newline at end of file diff --git a/omnimcp/omnimcp/run_omnimcp.py b/omnimcp/omnimcp/run_omnimcp.py new file mode 100644 index 000000000..e9d008534 --- /dev/null +++ b/omnimcp/omnimcp/run_omnimcp.py @@ -0,0 +1,215 @@ +"""Entry point for OmniMCP CLI. + +This module provides a command-line interface for OmniMCP, allowing you to run +it in various modes (CLI, MCP server, debug visualizations). +""" + +import datetime +import fire +import os +from loguru import logger + +# Setup path to include OpenAdapt modules +from . import pathing +from .omnimcp import OmniMCP +from .config import config + + +class OmniMCPRunner: + """OmniMCP runner with different modes of operation.""" + + def cli( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP in CLI mode. + + In CLI mode, you can enter natural language commands directly in the terminal. + OmniMCP will: + 1. Take a screenshot + 2. Analyze it with OmniParser to identify UI elements + 3. Use Claude to decide what action to take based on your command + 4. Execute the action (click, type, etc.) + + This mode is convenient for testing and doesn't require Claude Desktop. + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Handle debug directory if specified + if debug_dir: + os.makedirs(debug_dir, exist_ok=True) + + # Take initial screenshot and save debug visualization + logger.info(f"Saving debug visualization to {debug_dir}") + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"initial_state_{timestamp}.png") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + + logger.info("Starting OmniMCP in CLI mode") + logger.info(f"Coordinate mode: {'normalized (0-1)' if use_normalized_coordinates else 'absolute (pixels)'}") + + # Run CLI interaction loop + omnimcp.run_interactive() + + def server( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP as an MCP server. + + In server mode, OmniMCP provides UI automation tools to Claude through the + Model Control Protocol. The server exposes tools for: + 1. Getting the current screen state with UI elements + 2. Finding UI elements by description + 3. Clicking on elements or coordinates + 4. Typing text and pressing keys + + To use with Claude Desktop: + 1. Configure Claude Desktop to use this server + 2. Ask Claude to perform UI tasks + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Handle debug directory if specified + if debug_dir: + os.makedirs(debug_dir, exist_ok=True) + + # Take initial screenshot and save debug visualization + logger.info(f"Saving debug visualization to {debug_dir}") + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"initial_state_{timestamp}.png") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + + logger.info("Starting OmniMCP Model Control Protocol server") + logger.info(f"Coordinate mode: {'normalized (0-1)' if use_normalized_coordinates else 'absolute (pixels)'}") + + # Run MCP server + omnimcp.run_mcp_server() + + def debug( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP in debug mode. + + Debug mode takes a screenshot, analyzes it with OmniParser, and saves + a visualization showing the detected UI elements with their descriptions. + + This is useful for: + - Understanding what UI elements OmniParser detects + - Debugging issues with element detection + - Fine-tuning OmniParser integration + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Create debug directory if not specified + if not debug_dir: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_dir = os.path.join(os.path.expanduser("~"), "omnimcp_debug", f"debug_{timestamp}") + + os.makedirs(debug_dir, exist_ok=True) + logger.info(f"Saving debug visualization to {debug_dir}") + + # Generate debug filename + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"screen_state_{timestamp}.png") + + # Update visual state and save debug + logger.info("Taking screenshot and analyzing with OmniParser...") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + logger.info(f"Saved debug visualization to {debug_path}") + + # Print some stats about detected elements + num_elements = len(omnimcp.visual_state.elements) + logger.info(f"Detected {num_elements} UI elements") + + if num_elements > 0: + # Show a few example elements + logger.info("Example elements:") + for i, element in enumerate(omnimcp.visual_state.elements[:5]): + content = element.content[:50] + "..." if len(element.content) > 50 else element.content + logger.info(f" {i+1}. '{content}' at ({element.x1},{element.y1},{element.x2},{element.y2})") + + if num_elements > 5: + logger.info(f" ... and {num_elements - 5} more elements") + + +def main(): + """Main entry point for OmniMCP.""" + fire.Fire(OmniMCPRunner) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/omnimcp/omnimcp/utils.py b/omnimcp/omnimcp/utils.py new file mode 100644 index 000000000..51786b2d1 --- /dev/null +++ b/omnimcp/omnimcp/utils.py @@ -0,0 +1,43 @@ +"""Minimal utilities needed for OmniMCP. + +This module provides standalone implementations of essential utility functions +with lazy imports to minimize dependencies. +""" + +def take_screenshot(): + """Take a screenshot of the entire screen. + + Returns: + PIL.Image.Image: The screenshot image. + """ + # Lazy imports to minimize dependencies + from PIL import Image + import mss + + # Create an mss instance for screenshot capture + with mss.mss() as sct: + # monitor 0 is the entire screen + monitor = sct.monitors[0] + sct_img = sct.grab(monitor) + # Convert to PIL Image + image = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") + + return image + + +def get_monitor_dims(): + """Get the dimensions of the primary monitor. + + Returns: + tuple[int, int]: The width and height of the monitor. + """ + # Lazy import to minimize dependencies + import mss + + # Create an mss instance to get monitor info + with mss.mss() as sct: + monitor = sct.monitors[0] + monitor_width = monitor["width"] + monitor_height = monitor["height"] + + return monitor_width, monitor_height \ No newline at end of file diff --git a/omnimcp/pyproject.toml b/omnimcp/pyproject.toml new file mode 100644 index 000000000..b3e71c26e --- /dev/null +++ b/omnimcp/pyproject.toml @@ -0,0 +1,38 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "omnimcp" +version = "0.1.0" +description = "OmniMCP - Model Control Protocol for UI Automation" +readme = "README.md" +requires-python = ">=3.10,<3.12" +license = {text = "MIT"} +authors = [ + {name = "Richard Abrich", email = "richard@openadapt.ai"} +] + +dependencies = [ + "pynput>=1.7.6", # Keyboard and mouse control + "pillow>=10.0.0", # Image processing + "fire>=0.4.0", # CLI functionality + "anthropic>=0.42.0", # Claude API + "loguru>=0.6.0", # Logging + "mcp>=0.9.0", # Model Control Protocol + "requests>=2.31.0", # HTTP requests for OmniParser + "mss>=6.1.0", # Screen capture + "jinja2>=3.0.0", # For templating + "posthog>=2.0.0", # For analytics + "multiprocessing-utils>=0.1.0", # For process-local storage + "numpy>=1.21.0", # For array operations + "orjson>=3.8.0", # For fast JSON handling + "dictalchemy3>=1.0.0", # For SQLAlchemy dict utils + "joblib>=1.2.0", # For caching + "boto3>=1.26.0", # For AWS services # For AWS SDK + "botocore>=1.29.0", + "paramiko>=3.5.1", +] + +[project.scripts] +omnimcp = "omnimcp.run_omnimcp:main" diff --git a/omnimcp/setup.py b/omnimcp/setup.py new file mode 100644 index 000000000..2b65916c4 --- /dev/null +++ b/omnimcp/setup.py @@ -0,0 +1,19 @@ +import os +import sys +from setuptools import setup, find_packages + +# Add the parent directory to sys.path to allow imports from OpenAdapt +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# The actual dependencies are defined in pyproject.toml +# This setup.py file exists mainly to add OpenAdapt to the Python path +setup( + packages=find_packages(), + # Entry points for CLI commands + entry_points={ + 'console_scripts': [ + 'omnimcp=omnimcp.run_omnimcp:main', + 'computer-use=omnimcp.computer_use:main', + ], + }, +) \ No newline at end of file diff --git a/openadapt/adapters/__init__.py b/openadapt/adapters/__init__.py index c123eafe1..5ed39f52d 100644 --- a/openadapt/adapters/__init__.py +++ b/openadapt/adapters/__init__.py @@ -4,7 +4,9 @@ from openadapt.config import config -from . import prompt, replicate, som, ultralytics +# Lazy imports when required instead of importing everything +# Add omniparser which is needed for OmniMCP +from . import omniparser # TODO: remove diff --git a/openadapt/mcp/__init__.py b/openadapt/mcp/__init__.py new file mode 100644 index 000000000..247248fe5 --- /dev/null +++ b/openadapt/mcp/__init__.py @@ -0,0 +1 @@ +"""Model Control Protocol (MCP) implementation for OpenAdapt.""" \ No newline at end of file diff --git a/openadapt/models.py b/openadapt/models.py index 03b60329e..055de5fcb 100644 --- a/openadapt/models.py +++ b/openadapt/models.py @@ -9,7 +9,7 @@ import sys import textwrap -from bs4 import BeautifulSoup +# Lazy import BeautifulSoup when needed from pynput import keyboard from PIL import Image, ImageChops import numpy as np @@ -790,7 +790,7 @@ def __str__(self) -> str: # Return the complete representation including the truncated message return f"BrowserEvent({base_repr}, message={message_copy})" - def parse(self) -> tuple[BeautifulSoup, BeautifulSoup | None]: + def parse(self) -> "tuple['BeautifulSoup', 'BeautifulSoup | None']": """Parses the visible HTML and optionally extracts the target element. This method processes the browser event to parse the visible HTML and, @@ -798,8 +798,8 @@ def parse(self) -> tuple[BeautifulSoup, BeautifulSoup | None]: Returns: A tuple containing: - - BeautifulSoup: The parsed soup of the visible HTML. - - BeautifulSoup | None: The target HTML element if the event type is + - bs4.BeautifulSoup: The parsed soup of the visible HTML. + - bs4.BeautifulSoup | None: The target HTML element if the event type is "click"; otherwise, None. Raises: diff --git a/openadapt/run_omnimcp.py b/openadapt/run_omnimcp.py new file mode 100644 index 000000000..0153ef723 --- /dev/null +++ b/openadapt/run_omnimcp.py @@ -0,0 +1,259 @@ +"""Run OmniMCP with Model Control Protocol. + +This script provides a user-friendly interface to run OmniMCP in different modes. + +OmniMCP combines OmniParser (for visual UI understanding) with the Model Control +Protocol (MCP) to enable Claude to control the computer through natural language. + +Usage: +------ + # Run CLI mode (direct command input) + python -m openadapt.run_omnimcp cli + + # Run MCP server (for Claude Desktop) + python -m openadapt.run_omnimcp server + + # Run in debug mode to visualize screen elements + python -m openadapt.run_omnimcp debug + + # Run with custom OmniParser server URL + python -m openadapt.run_omnimcp server --server-url=http://your-server:8000 + + # Use normalized coordinates (0-1) instead of absolute pixels + python -m openadapt.run_omnimcp cli --use-normalized-coordinates + + # Save debug visualization to specific directory + python -m openadapt.run_omnimcp debug --debug-dir=/path/to/debug/folder + +Components: +---------- +1. OmniParser Client (adapters/omniparser.py): + - Connects to the OmniParser server running on AWS + - Parses screenshots to identify UI elements + +2. OmniMCP Core (omnimcp.py): + - Manages the visual state of the screen + - Provides UI interaction methods (click, type, etc.) + - Implements natural language understanding with Claude + +3. MCP Server (mcp/server.py): + - Implements the Model Control Protocol server + - Exposes UI automation tools to Claude +""" + +import datetime +import os +import sys + +import fire + +from openadapt.omnimcp import OmniMCP +from openadapt.custom_logger import logger + +# TODO: Consider Anthropic ComputerUse integration +# Anthropic's ComputerUse (https://docs.anthropic.com/en/docs/agents-and-tools/computer-use) +# provides an official approach for Claude to control computers. While OmniMCP already +# implements a similar agent loop pattern, future work could: +# 1. Refine our existing agent loop to better align with ComputerUse's approach +# 2. Support Anthropic's containerized environment as a deployment option +# 3. Offer compatibility with Anthropic's official computer control tools + + +class OmniMCPRunner: + """OmniMCP runner with different modes of operation.""" + + def cli( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP in CLI mode. + + In CLI mode, you can enter natural language commands directly in the terminal. + OmniMCP will: + 1. Take a screenshot + 2. Analyze it with OmniParser to identify UI elements + 3. Use Claude to decide what action to take based on your command + 4. Execute the action (click, type, etc.) + + This mode is convenient for testing and doesn't require Claude Desktop. + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Handle debug directory if specified + if debug_dir: + os.makedirs(debug_dir, exist_ok=True) + + # Take initial screenshot and save debug visualization + logger.info(f"Saving debug visualization to {debug_dir}") + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"initial_state_{timestamp}.png") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + + logger.info("Starting OmniMCP in CLI mode") + logger.info(f"Coordinate mode: {'normalized (0-1)' if use_normalized_coordinates else 'absolute (pixels)'}") + + # Run CLI interaction loop + omnimcp.run_interactive() + + def server( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP as an MCP server. + + In server mode, OmniMCP provides UI automation tools to Claude through the + Model Control Protocol. The server exposes tools for: + 1. Getting the current screen state with UI elements + 2. Finding UI elements by description + 3. Clicking on elements or coordinates + 4. Typing text and pressing keys + + To use with Claude Desktop: + 1. Configure Claude Desktop to use this server + 2. Ask Claude to perform UI tasks + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Handle debug directory if specified + if debug_dir: + os.makedirs(debug_dir, exist_ok=True) + + # Take initial screenshot and save debug visualization + logger.info(f"Saving debug visualization to {debug_dir}") + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"initial_state_{timestamp}.png") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + + logger.info("Starting OmniMCP Model Control Protocol server") + logger.info(f"Coordinate mode: {'normalized (0-1)' if use_normalized_coordinates else 'absolute (pixels)'}") + + # Run MCP server + omnimcp.run_mcp_server() + + def debug( + self, + server_url=None, + claude_api_key=None, + use_normalized_coordinates=False, + debug_dir=None, + allow_no_parser=False, + auto_deploy_parser=True, + skip_confirmation=False + ): + """Run OmniMCP in debug mode. + + Debug mode takes a screenshot, analyzes it with OmniParser, and saves + a visualization showing the detected UI elements with their descriptions. + + This is useful for: + - Understanding what UI elements OmniParser detects + - Debugging issues with element detection + - Fine-tuning OmniParser integration + + Args: + server_url: URL of the OmniParser server + claude_api_key: Claude API key (if not provided, uses value from config.py) + use_normalized_coordinates: Use normalized (0-1) coordinates instead of pixels + debug_dir: Directory to save debug visualizations + allow_no_parser: If True, continue even if OmniParser is not available + auto_deploy_parser: If True, attempt to deploy OmniParser if not available (default: True) + skip_confirmation: If True, skip user confirmation for OmniParser deployment + """ + # Create OmniMCP instance + omnimcp = OmniMCP( + server_url=server_url, + claude_api_key=claude_api_key, # Will use config.ANTHROPIC_API_KEY if None + use_normalized_coordinates=use_normalized_coordinates, + allow_no_parser=allow_no_parser, + auto_deploy_parser=auto_deploy_parser, + skip_confirmation=skip_confirmation + ) + + # Create debug directory if not specified + if not debug_dir: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_dir = os.path.join(os.path.expanduser("~"), "omnimcp_debug", f"debug_{timestamp}") + + os.makedirs(debug_dir, exist_ok=True) + logger.info(f"Saving debug visualization to {debug_dir}") + + # Generate debug filename + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + debug_path = os.path.join(debug_dir, f"screen_state_{timestamp}.png") + + # Update visual state and save debug + logger.info("Taking screenshot and analyzing with OmniParser...") + omnimcp.update_visual_state() + omnimcp.save_visual_debug(debug_path) + logger.info(f"Saved debug visualization to {debug_path}") + + # Print some stats about detected elements + num_elements = len(omnimcp.visual_state.elements) + logger.info(f"Detected {num_elements} UI elements") + + if num_elements > 0: + # Show a few example elements + logger.info("Example elements:") + for i, element in enumerate(omnimcp.visual_state.elements[:5]): + content = element.content[:50] + "..." if len(element.content) > 50 else element.content + logger.info(f" {i+1}. '{content}' at ({element.x1},{element.y1},{element.x2},{element.y2})") + + if num_elements > 5: + logger.info(f" ... and {num_elements - 5} more elements") + + +def main(): + """Main entry point for OmniMCP.""" + fire.Fire(OmniMCPRunner) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/openadapt/utils.py b/openadapt/utils.py index 4e1a0f44a..6f878924b 100644 --- a/openadapt/utils.py +++ b/openadapt/utils.py @@ -17,11 +17,11 @@ import threading import time -from bs4 import BeautifulSoup +# BeautifulSoup import moved to parse_html function from jinja2 import Environment, FileSystemLoader from PIL import Image, ImageEnhance from posthog import Posthog -import multiprocessing_utils +# import multiprocessing_utils - moved to functions that use it from openadapt.build_utils import is_running_from_executable, redirect_stdout_stderr from openadapt.custom_logger import logger @@ -63,6 +63,7 @@ _start_perf_counter = None # Process-local storage for MSS instances +import multiprocessing_utils _process_local = multiprocessing_utils.local() @@ -1044,8 +1045,9 @@ def truncate_html(html_str: str, max_len: int) -> str: return html_str -def parse_html(html: str, parser: str = "html.parser") -> BeautifulSoup: +def parse_html(html: str, parser: str = "html.parser") -> "BeautifulSoup": """Parse the visible HTML using BeautifulSoup.""" + from bs4 import BeautifulSoup soup = BeautifulSoup(html, parser) return soup @@ -1062,6 +1064,7 @@ def get_html_prompt(html: str, convert_to_markdown: bool = False) -> str: If convert_to_markdown is True, the string is in Markdown format. """ # Parse HTML with BeautifulSoup + from bs4 import BeautifulSoup soup = BeautifulSoup(html, "html.parser") # Remove non-interactive and unnecessary elements