From b538581d111912e7942ad4cfa48bd4af263f95f1 Mon Sep 17 00:00:00 2001 From: akrommicc Date: Sun, 6 Jul 2025 19:27:37 -0400 Subject: [PATCH] add smart agent endpoint for dynamic model/field/record --- src/odoo_mcp/server.py | 189 ++++++++++++----------------------------- 1 file changed, 53 insertions(+), 136 deletions(-) diff --git a/src/odoo_mcp/server.py b/src/odoo_mcp/server.py index e924bf0..6207866 100644 --- a/src/odoo_mcp/server.py +++ b/src/odoo_mcp/server.py @@ -138,6 +138,59 @@ def search_records_resource(model_name: str, domain: str) -> str: return json.dumps({"error": str(e)}, indent=2) +@mcp.resource( + "odoo://agent/{model}/{fields}/{domain}/{limit}", + description="Agent endpoint: lists models, fields, or records depending on parameters. Use 'none' for optional parameters." +) +def odoo_agent_resource( + model: str = "none", + fields: str = "none", + domain: str = "none", + limit: str = "10", +) -> str: + """ + Agent endpoint for Odoo: + - If no model: list all models + - If model but no fields: list fields for the model + - If model and fields: search and return records + Query params: + model: model name (optional) + fields: comma-separated field names (optional) + domain: JSON string (optional) + limit: int (optional, default 10) + """ + odoo_client = get_odoo_client() + try: + # Convert "none" to None for optional parameters + model = None if model == "none" else model + fields = None if fields == "none" else fields + domain = None if domain == "none" else domain + limit = int(limit) if limit.isdigit() else 10 + + if not model: + # Step 1: List all models + models = odoo_client.get_models() + return json.dumps({"step": "choose_model", "models": models}, indent=2) + if model and not fields: + # Step 2: List fields for the model + model_fields = odoo_client.get_model_fields(model) + return json.dumps({"step": "choose_fields", "fields": model_fields}, indent=2) + if model and fields: + # Step 3: Query records + field_list = [f.strip() for f in fields.split(",") if f.strip()] + domain_list = [] + if domain: + try: + domain_list = json.loads(domain) + except Exception: + domain_list = [] + results = odoo_client.search_read(model, domain_list, fields=field_list, limit=limit) + return json.dumps({"step": "results", "results": results}, indent=2) + return json.dumps({"error": "Invalid parameters"}, indent=2) + except Exception as e: + return json.dumps({"error": str(e)}, indent=2) + + # ----- Pydantic models for type safety ----- @@ -168,46 +221,6 @@ def to_domain_list(self) -> List[List]: return [condition.to_tuple() for condition in self.conditions] -class EmployeeSearchResult(BaseModel): - """Represents a single employee search result.""" - - id: int = Field(description="Employee ID") - name: str = Field(description="Employee name") - - -class SearchEmployeeResponse(BaseModel): - """Response model for the search_employee tool.""" - - success: bool = Field(description="Indicates if the search was successful") - result: Optional[List[EmployeeSearchResult]] = Field( - default=None, description="List of employee search results" - ) - error: Optional[str] = Field(default=None, description="Error message, if any") - - -class Holiday(BaseModel): - """Represents a single holiday.""" - - display_name: str = Field(description="Display name of the holiday") - start_datetime: str = Field(description="Start date and time of the holiday") - stop_datetime: str = Field(description="End date and time of the holiday") - employee_id: List[Union[int, str]] = Field( - description="Employee ID associated with the holiday" - ) - name: str = Field(description="Name of the holiday") - state: str = Field(description="State of the holiday") - - -class SearchHolidaysResponse(BaseModel): - """Response model for the search_holidays tool.""" - - success: bool = Field(description="Indicates if the search was successful") - result: Optional[List[Holiday]] = Field( - default=None, description="List of holidays found" - ) - error: Optional[str] = Field(default=None, description="Error message, if any") - - # ----- MCP Tools ----- @@ -346,99 +359,3 @@ def execute_method( return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} - - -@mcp.tool(description="Search for employees by name") -def search_employee( - ctx: Context, - name: str, - limit: int = 20, -) -> SearchEmployeeResponse: - """ - Search for employees by name using Odoo's name_search method. - - Parameters: - name: The name (or part of the name) to search for. - limit: The maximum number of results to return (default 20). - - Returns: - SearchEmployeeResponse containing results or error information. - """ - odoo = ctx.request_context.lifespan_context.odoo - model = "hr.employee" - method = "name_search" - - args = [] - kwargs = {"name": name, "limit": limit} - - try: - result = odoo.execute_method(model, method, *args, **kwargs) - parsed_result = [ - EmployeeSearchResult(id=item[0], name=item[1]) for item in result - ] - return SearchEmployeeResponse(success=True, result=parsed_result) - except Exception as e: - return SearchEmployeeResponse(success=False, error=str(e)) - - -@mcp.tool(description="Search for holidays within a date range") -def search_holidays( - ctx: Context, - start_date: str, - end_date: str, - employee_id: Optional[int] = None, -) -> SearchHolidaysResponse: - """ - Searches for holidays within a specified date range. - - Parameters: - start_date: Start date in YYYY-MM-DD format. - end_date: End date in YYYY-MM-DD format. - employee_id: Optional employee ID to filter holidays. - - Returns: - SearchHolidaysResponse: Object containing the search results. - """ - odoo = ctx.request_context.lifespan_context.odoo - - # Validate date format using datetime - try: - datetime.strptime(start_date, "%Y-%m-%d") - except ValueError: - return SearchHolidaysResponse( - success=False, error="Invalid start_date format. Use YYYY-MM-DD." - ) - try: - datetime.strptime(end_date, "%Y-%m-%d") - except ValueError: - return SearchHolidaysResponse( - success=False, error="Invalid end_date format. Use YYYY-MM-DD." - ) - - # Calculate adjusted start_date (subtract one day) - start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") - adjusted_start_date_dt = start_date_dt - timedelta(days=1) - adjusted_start_date = adjusted_start_date_dt.strftime("%Y-%m-%d") - - # Build the domain - domain = [ - "&", - ["start_datetime", "<=", f"{end_date} 22:59:59"], - # Use adjusted date - ["stop_datetime", ">=", f"{adjusted_start_date} 23:00:00"], - ] - if employee_id: - domain.append( - ["employee_id", "=", employee_id], - ) - - try: - holidays = odoo.search_read( - model_name="hr.leave.report.calendar", - domain=domain, - ) - parsed_holidays = [Holiday(**holiday) for holiday in holidays] - return SearchHolidaysResponse(success=True, result=parsed_holidays) - - except Exception as e: - return SearchHolidaysResponse(success=False, error=str(e))