-
Notifications
You must be signed in to change notification settings - Fork 785
Expand file tree
/
Copy pathfind_in_file.py
More file actions
181 lines (150 loc) · 6.37 KB
/
find_in_file.py
File metadata and controls
181 lines (150 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import base64
import os
import re
from typing import Annotated, Any
from urllib.parse import unquote, urlparse
from fastmcp import Context
from mcp.types import ToolAnnotations
from services.registry import mcp_for_unity_tool
from services.tools import get_unity_instance_from_context
from transport.unity_transport import send_with_unity_instance
from transport.legacy.unity_connection import async_send_command_with_retry
def _split_uri(uri: str) -> tuple[str, str]:
"""Split an incoming URI or path into (name, directory) suitable for Unity.
Rules:
- unity://path/Assets/... → keep as Assets-relative (after decode/normalize)
- file://... → percent-decode, normalize, strip host and leading slashes,
then, if any 'Assets' segment exists, return path relative to that 'Assets' root.
Otherwise, fall back to original name/dir behavior.
- plain paths → decode/normalize separators; if they contain an 'Assets' segment,
return relative to 'Assets'.
"""
raw_path: str
if uri.startswith("unity://path/"):
raw_path = uri[len("unity://path/"):]
elif uri.startswith("file://"):
parsed = urlparse(uri)
host = (parsed.netloc or "").strip()
p = parsed.path or ""
# UNC: file://server/share/... -> //server/share/...
if host and host.lower() != "localhost":
p = f"//{host}{p}"
# Use percent-decoded path, preserving leading slashes
raw_path = unquote(p)
else:
raw_path = uri
# Percent-decode any residual encodings and normalize separators
raw_path = unquote(raw_path).replace("\\", "/")
# Strip leading slash only for Windows drive-letter forms like "/C:/..."
if os.name == "nt" and len(raw_path) >= 3 and raw_path[0] == "/" and raw_path[2] == ":":
raw_path = raw_path[1:]
# Normalize path (collapse ../, ./)
norm = os.path.normpath(raw_path).replace("\\", "/")
# If an 'Assets' segment exists, compute path relative to it (case-insensitive)
parts = [p for p in norm.split("/") if p not in ("", ".")]
idx = next((i for i, seg in enumerate(parts)
if seg.lower() == "assets"), None)
assets_rel = "/".join(parts[idx:]) if idx is not None else None
effective_path = assets_rel if assets_rel else norm
# For POSIX absolute paths outside Assets, drop the leading '/'
# to return a clean relative-like directory (e.g., '/tmp' -> 'tmp').
if effective_path.startswith("/"):
effective_path = effective_path[1:]
name = os.path.splitext(os.path.basename(effective_path))[0]
directory = os.path.dirname(effective_path)
return name, directory
@mcp_for_unity_tool(
description="Searches a file with a regex pattern and returns line numbers and excerpts.",
annotations=ToolAnnotations(
title="Find in File",
readOnlyHint=True,
),
)
async def find_in_file(
ctx: Context,
uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"],
pattern: Annotated[str, "The regex pattern to search for"],
project_root: Annotated[str | None, "Optional project root path"] = None,
max_results: Annotated[int, "Cap results to avoid huge payloads"] = 200,
ignore_case: Annotated[bool | str | None, "Case insensitive search"] = True,
) -> dict[str, Any]:
# project_root is currently unused but kept for interface consistency
unity_instance = get_unity_instance_from_context(ctx)
await ctx.info(
f"Processing find_in_file: {uri} (unity_instance={unity_instance or 'default'})")
name, directory = _split_uri(uri)
# 1. Read file content via Unity
read_resp = await send_with_unity_instance(
async_send_command_with_retry,
unity_instance,
"manage_script",
{
"action": "read",
"name": name,
"path": directory,
},
)
if not isinstance(read_resp, dict) or not read_resp.get("success"):
return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)}
data = read_resp.get("data", {})
contents = data.get("contents")
if not contents and data.get("contentsEncoded") and data.get("encodedContents"):
try:
contents = base64.b64decode(data.get("encodedContents", "").encode(
"utf-8")).decode("utf-8", "replace")
except (ValueError, TypeError, base64.binascii.Error):
contents = contents or ""
if contents is None:
return {"success": False, "message": "Could not read file content."}
# 2. Perform regex search
flags = re.MULTILINE
# Handle ignore_case which can be boolean or string from some clients
ic = ignore_case
if isinstance(ic, str):
ic = ic.lower() in ("true", "1", "yes")
if ic:
flags |= re.IGNORECASE
try:
regex = re.compile(pattern, flags)
except re.error as e:
return {"success": False, "message": f"Invalid regex pattern: {e}"}
# If the regex is not multiline specific (doesn't contain \n literal match logic),
# we could iterate lines. But users might use multiline regexes.
# Let's search the whole content and map back to lines.
found = list(regex.finditer(contents))
results = []
count = 0
for m in found:
if count >= max_results:
break
start_idx = m.start()
end_idx = m.end()
# Calculate line number
# Count newlines up to start_idx
line_num = contents.count('\n', 0, start_idx) + 1
# Get line content for excerpt
# Find start of line
line_start = contents.rfind('\n', 0, start_idx) + 1
# Find end of line
line_end = contents.find('\n', start_idx)
if line_end == -1:
line_end = len(contents)
line_content = contents[line_start:line_end]
# Create excerpt
# We can just return the line content as excerpt
results.append({
"line": line_num,
"content": line_content.strip(), # detailed match info?
"match": m.group(0),
"start": start_idx,
"end": end_idx
})
count += 1
return {
"success": True,
"data": {
"matches": results,
"count": len(results),
"total_matches": len(found)
}
}