Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 53 additions & 136 deletions src/odoo_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,59 @@ def search_records_resource(model_name: str, domain: str) -> str:
return json.dumps({"error": str(e)}, indent=2)


@mcp.resource(
"odoo://agent/{model}/{fields}/{domain}/{limit}",
description="Agent endpoint: lists models, fields, or records depending on parameters. Use 'none' for optional parameters."
)
def odoo_agent_resource(
model: str = "none",
fields: str = "none",
domain: str = "none",
limit: str = "10",
) -> str:
"""
Agent endpoint for Odoo:
- If no model: list all models
- If model but no fields: list fields for the model
- If model and fields: search and return records
Query params:
model: model name (optional)
fields: comma-separated field names (optional)
domain: JSON string (optional)
limit: int (optional, default 10)
"""
odoo_client = get_odoo_client()
try:
# Convert "none" to None for optional parameters
model = None if model == "none" else model
fields = None if fields == "none" else fields
domain = None if domain == "none" else domain
limit = int(limit) if limit.isdigit() else 10

if not model:
# Step 1: List all models
models = odoo_client.get_models()
return json.dumps({"step": "choose_model", "models": models}, indent=2)
if model and not fields:
# Step 2: List fields for the model
model_fields = odoo_client.get_model_fields(model)
return json.dumps({"step": "choose_fields", "fields": model_fields}, indent=2)
if model and fields:
# Step 3: Query records
field_list = [f.strip() for f in fields.split(",") if f.strip()]
domain_list = []
if domain:
try:
domain_list = json.loads(domain)
except Exception:
domain_list = []
results = odoo_client.search_read(model, domain_list, fields=field_list, limit=limit)
return json.dumps({"step": "results", "results": results}, indent=2)
return json.dumps({"error": "Invalid parameters"}, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)


# ----- Pydantic models for type safety -----


Expand Down Expand Up @@ -168,46 +221,6 @@ def to_domain_list(self) -> List[List]:
return [condition.to_tuple() for condition in self.conditions]


class EmployeeSearchResult(BaseModel):
"""Represents a single employee search result."""

id: int = Field(description="Employee ID")
name: str = Field(description="Employee name")


class SearchEmployeeResponse(BaseModel):
"""Response model for the search_employee tool."""

success: bool = Field(description="Indicates if the search was successful")
result: Optional[List[EmployeeSearchResult]] = Field(
default=None, description="List of employee search results"
)
error: Optional[str] = Field(default=None, description="Error message, if any")


class Holiday(BaseModel):
"""Represents a single holiday."""

display_name: str = Field(description="Display name of the holiday")
start_datetime: str = Field(description="Start date and time of the holiday")
stop_datetime: str = Field(description="End date and time of the holiday")
employee_id: List[Union[int, str]] = Field(
description="Employee ID associated with the holiday"
)
name: str = Field(description="Name of the holiday")
state: str = Field(description="State of the holiday")


class SearchHolidaysResponse(BaseModel):
"""Response model for the search_holidays tool."""

success: bool = Field(description="Indicates if the search was successful")
result: Optional[List[Holiday]] = Field(
default=None, description="List of holidays found"
)
error: Optional[str] = Field(default=None, description="Error message, if any")


# ----- MCP Tools -----


Expand Down Expand Up @@ -346,99 +359,3 @@ def execute_method(
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}


@mcp.tool(description="Search for employees by name")
def search_employee(
ctx: Context,
name: str,
limit: int = 20,
) -> SearchEmployeeResponse:
"""
Search for employees by name using Odoo's name_search method.

Parameters:
name: The name (or part of the name) to search for.
limit: The maximum number of results to return (default 20).

Returns:
SearchEmployeeResponse containing results or error information.
"""
odoo = ctx.request_context.lifespan_context.odoo
model = "hr.employee"
method = "name_search"

args = []
kwargs = {"name": name, "limit": limit}

try:
result = odoo.execute_method(model, method, *args, **kwargs)
parsed_result = [
EmployeeSearchResult(id=item[0], name=item[1]) for item in result
]
return SearchEmployeeResponse(success=True, result=parsed_result)
except Exception as e:
return SearchEmployeeResponse(success=False, error=str(e))


@mcp.tool(description="Search for holidays within a date range")
def search_holidays(
ctx: Context,
start_date: str,
end_date: str,
employee_id: Optional[int] = None,
) -> SearchHolidaysResponse:
"""
Searches for holidays within a specified date range.

Parameters:
start_date: Start date in YYYY-MM-DD format.
end_date: End date in YYYY-MM-DD format.
employee_id: Optional employee ID to filter holidays.

Returns:
SearchHolidaysResponse: Object containing the search results.
"""
odoo = ctx.request_context.lifespan_context.odoo

# Validate date format using datetime
try:
datetime.strptime(start_date, "%Y-%m-%d")
except ValueError:
return SearchHolidaysResponse(
success=False, error="Invalid start_date format. Use YYYY-MM-DD."
)
try:
datetime.strptime(end_date, "%Y-%m-%d")
except ValueError:
return SearchHolidaysResponse(
success=False, error="Invalid end_date format. Use YYYY-MM-DD."
)

# Calculate adjusted start_date (subtract one day)
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
adjusted_start_date_dt = start_date_dt - timedelta(days=1)
adjusted_start_date = adjusted_start_date_dt.strftime("%Y-%m-%d")

# Build the domain
domain = [
"&",
["start_datetime", "<=", f"{end_date} 22:59:59"],
# Use adjusted date
["stop_datetime", ">=", f"{adjusted_start_date} 23:00:00"],
]
if employee_id:
domain.append(
["employee_id", "=", employee_id],
)

try:
holidays = odoo.search_read(
model_name="hr.leave.report.calendar",
domain=domain,
)
parsed_holidays = [Holiday(**holiday) for holiday in holidays]
return SearchHolidaysResponse(success=True, result=parsed_holidays)

except Exception as e:
return SearchHolidaysResponse(success=False, error=str(e))