Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 112 additions & 29 deletions jmeter_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import subprocess
from pathlib import Path
from mcp.server.fastmcp import FastMCP
#from fastapi.middleware.cors import CORSMiddleware
#from starlette.middleware.base import BaseHTTPMiddleware
import os
import datetime
import uuid
import logging
import logging
import shutil
from dotenv import load_dotenv

# Configure logging
Expand All @@ -16,8 +19,10 @@
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()
# Load environment variables from .env and jmeter.env
#load_dotenv()
load_dotenv('jmeter.env', override=True)


# Initialize MCP server
mcp = FastMCP("jmeter")
Expand Down Expand Up @@ -47,40 +52,64 @@ async def run_jmeter(test_file: str, non_gui: bool = True, properties: dict = No
return f"Error: Invalid file type. Expected .jmx file: {test_file}"

# Get JMeter binary path from environment
jmeter_bin = os.getenv('JMETER_BIN', 'jmeter')
jmeter_bin_env = os.getenv('JMETER_BIN', 'jmeter')
jmeter_bin_env = jmeter_bin_env.strip('"').rstrip("\\/") # clean quotes and slashes
java_opts = os.getenv('JMETER_JAVA_OPTS', '')

# Log the JMeter binary path and Java options
logger.info(f"JMeter binary path: {jmeter_bin}")
# Determine the actual JMeter binary path
jmeter_path = Path(jmeter_bin_env)

def get_jmeter_cmd_path(jmeter_path: Path):
if jmeter_path.is_file():
# JMETER_BIN points directly to the batch/script
return jmeter_path

elif jmeter_path.is_dir():
exe_name = "jmeter.bat" if os.name == 'nt' else "jmeter.sh"

# If path ends with \bin, don't add another \bin
if jmeter_path.name.lower() == "bin":
return jmeter_path / exe_name
else:
return jmeter_path / "bin" / exe_name
else:
logger.error(f"Invalid JMETER_BIN path: {jmeter_path}")
return f"Error: Invalid JMETER_BIN path: {jmeter_path}"

jmeter_cmd_path = get_jmeter_cmd_path(jmeter_path)

# Log paths
logger.info(f"JMeter binary env: {jmeter_bin_env}")
logger.info(f"JMeter resolved path: {jmeter_cmd_path}")
logger.debug(f"Java options: {java_opts}")

# Build command
cmd = [str(Path(jmeter_bin).resolve())]
# Build the command
cmd = [str(jmeter_cmd_path.resolve())]

if non_gui:
cmd.extend(['-n'])
cmd.extend(['-t', str(test_file_path)])
# Add JMeter properties if provided

# Add JMeter properties if provided
if properties:
for prop_name, prop_value in properties.items():
cmd.extend([f'-J{prop_name}={prop_value}'])
logger.debug(f"Adding property: -J{prop_name}={prop_value}")

# Add report generation options if requested
if generate_report and non_gui:
if log_file is None:
# Generate unique log file name if not specified
unique_id = generate_unique_id()
log_file = f"{test_file_path.stem}_{unique_id}_results.jtl"
logger.debug(f"Using generated unique log file: {log_file}")

cmd.extend(['-l', log_file])
cmd.extend(['-e'])

# Always ensure report_output_dir is unique
unique_id = unique_id if 'unique_id' in locals() else generate_unique_id()

if report_output_dir:
# Append unique identifier to user-provided report directory
original_dir = report_output_dir
Expand All @@ -90,26 +119,61 @@ async def run_jmeter(test_file: str, non_gui: bool = True, properties: dict = No
# Generate unique report output directory if not specified
report_output_dir = f"{test_file_path.stem}_{unique_id}_report"
logger.debug(f"Using generated unique report output directory: {report_output_dir}")

cmd.extend(['-o', report_output_dir])

# Log the full command for debugging
logger.debug(f"Executing command: {' '.join(cmd)}")

logger.info(f"DEBUG CMD REPR: {repr(cmd)}")
logger.info(f"DEBUG CMD TYPE: {type(cmd)} {[type(x) for x in cmd]}")
logger.info(f"DEBUG CMD+SHELL: {repr(['cmd', '/c'] + cmd)}")
#logger.info(f"DEBUG CWD: {repr(str(Path(jmeter_bin_dir).resolve().parent))}")
logger.info(f"DEBUG CWD: {repr(str(jmeter_cmd_path.resolve().parent))}")

if non_gui:
# For non-GUI mode, capture output
result = subprocess.run(cmd, capture_output=True, text=True)

# Log output for debugging
logger.debug("Command output:")
logger.debug(f"Return code: {result.returncode}")
logger.debug(f"Stdout: {result.stdout}")
logger.debug(f"Stderr: {result.stderr}")

if result.returncode != 0:
return f"Error executing JMeter test:\n{result.stderr}"

return result.stdout
# result = subprocess.run(' '.join(cmd), capture_output=True, text=True, shell=True)
import platform
logger.info(f"About to run JMeter command: {' '.join(cmd)}")
try:
if os.name == "nt":
proc_cmd = ["cmd", "/c"] + cmd
else:
# On Unix, ensure script is executable
jmeter_exec = str(jmeter_cmd_path.resolve())
if not os.access(jmeter_exec, os.X_OK):
import stat
os.chmod(jmeter_exec, os.stat(jmeter_exec).st_mode | stat.S_IEXEC)
proc_cmd = cmd

result = subprocess.run(
proc_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=120,
cwd=str(jmeter_cmd_path.resolve().parent)
)
logger.info("JMeter subprocess finished.")

logger.debug("Command output:")
logger.debug(f"Return code: {result.returncode}")
logger.debug(f"Stdout: {result.stdout}")
logger.debug(f"Stderr: {result.stderr}")

if result.returncode != 0:
logger.error(f"JMeter execution failed with return code {result.returncode}")
logger.error(f"Stderr:\n{result.stderr}")
return f"JMeter execution failed!\n\nStderr:\n{result.stderr}\n\nStdout:\n{result.stdout}"

logger.info("JMeter executed successfully.")
logger.info(f"Stdout:\n{result.stdout}")
return f"JMeter test completed successfully.\n\nStdout:\n{result.stdout}"

except Exception as e:
logger.error(f"Subprocess error: {e}")
return f"Subprocess error: {e}"

else:
# For GUI mode, start process without capturing output
subprocess.Popen(cmd)
Expand Down Expand Up @@ -590,6 +654,25 @@ def generate_unique_id():
random_id = str(uuid.uuid4())[:8] # Use first 8 chars of UUID for brevity
return f"{timestamp}_{random_id}"

# Add CSP & CORS middleware
#mcp.app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"], # Adjust to match your use case
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
#)

#class CSPMiddleware(BaseHTTPMiddleware):
# async def dispatch(self, request, call_next):
# response = await call_next(request)
# response.headers["Content-Security-Policy"] = (
# "default-src 'self'; "
# "connect-src 'self' http://localhost:8000 ws://localhost:8000;"
# )
# return response

#mcp.app.add_middleware(CSPMiddleware)

if __name__ == "__main__":
mcp.run(transport='stdio')
mcp.run(transport='stdio')