-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
313 lines (251 loc) · 9.95 KB
/
core.py
File metadata and controls
313 lines (251 loc) · 9.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""
Shutter Core — the screen context engine.
Captures screenshots, runs a local vision model (Qwen3-VL-8B via Apple MLX),
maintains session memory, and sanitizes output. This module is the shared
foundation for both the HTTP API and MCP server.
Three public functions:
get_screen_context() -> structured text describing what's on screen
get_screenshot_bytes() -> base64-encoded PNG of the screen
run_text() -> text-only model inference (no image)
"""
import os
# Force standard PIL image processor instead of Pillow-SIMD.
# Required for compatibility with Qwen3-VL model on MLX.
os.environ["TRANSFORMERS_NO_FAST_IMAGE_PROCESSOR"] = "1"
import base64
import subprocess
import tempfile
import time as _time
import re
import logging
import psutil
import Quartz
log = logging.getLogger("shutter")
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
# Model identifier handed to mlx_vlm.load() by load_model().
MODEL_ID = "mlx-community/Qwen3-VL-8B-Instruct-4bit"
# has_headroom() refuses inference when less than this much RAM (in GiB,
# i.e. bytes / 1024**3) is reported as available.
MIN_RAM_AVAILABLE_GB = 1.0
# remember_screen() drops the oldest entry once the session log grows past
# this many descriptions.
MAX_SESSION_LOG = 5
# Image-level PII redaction (Apple Vision OCR).
# Disable with SHUTTER_REDACT=0 environment variable.
# Any other value, or leaving the variable unset, keeps redaction enabled.
REDACT_SCREENSHOTS = os.environ.get("SHUTTER_REDACT", "1") != "0"
# ---------------------------------------------------------------------------
# MODEL — loads once, stays warm
# ---------------------------------------------------------------------------
# Populated by load_model() on first use; None means "not loaded yet".
_model = None
_processor = None
# In-memory session history, oldest entry first. Mutated in place by
# _prune_session() and remember_screen().
_session_log = [] # list of (timestamp, description)
# Maximum entry age in seconds, compared against time.time() timestamps.
SESSION_TTL = 3600 # expire entries older than 1 hour
def load_model():
    """Make sure the vision model and its processor are loaded.

    The loaded objects are cached in the module-level ``_model`` and
    ``_processor`` globals, so only the first call does any work; later
    calls return immediately.
    """
    global _model, _processor
    if _model is None:
        log.info(f"Loading model {MODEL_ID} (first run downloads ~5GB)...")
        # Imported here rather than at module import time, so mlx_vlm is
        # only pulled in once a model is actually needed.
        from mlx_vlm import load
        loaded_model, loaded_processor = load(MODEL_ID)
        _model = loaded_model
        _processor = loaded_processor
        log.info("Model loaded and warm.")
def run_vision(image_path, prompt):
    """Describe an image with the vision model and return the reply text.

    Args:
        image_path: Path of the image file handed to the model.
        prompt: Instruction text; it is wrapped with the model's chat
            template (for a single image) before generation.

    Returns:
        The generated text with surrounding whitespace stripped.
    """
    load_model()
    from mlx_vlm import generate
    from mlx_vlm.prompt_utils import apply_chat_template

    chat_prompt = apply_chat_template(
        _processor, _model.config, prompt, num_images=1,
    )
    result = generate(
        _model, _processor, chat_prompt, [image_path],
        max_tokens=300, temperature=0.3, repetition_penalty=1.2, verbose=False,
    )

    # generate() may hand back a plain string or an object carrying the
    # text in a ``.text`` attribute; normalise every case to a string.
    if isinstance(result, str):
        reply = result
    elif hasattr(result, 'text'):
        reply = result.text
    else:
        reply = str(result)
    return reply.strip()
def run_text(prompt):
    """Run text-only inference (no image) and return the reply text.

    Args:
        prompt: Text passed to the model exactly as given.

    Returns:
        The generated text with surrounding whitespace stripped.
    """
    load_model()
    from mlx_vlm import generate

    # NOTE(review): unlike run_vision(), no chat template is applied to the
    # prompt here -- confirm that this is intentional.
    result = generate(
        _model, _processor, prompt, [],
        max_tokens=200, temperature=0.2, repetition_penalty=1.2, verbose=False,
    )

    # generate() may hand back a plain string or an object carrying the
    # text in a ``.text`` attribute; normalise every case to a string.
    if isinstance(result, str):
        reply = result
    elif hasattr(result, 'text'):
        reply = result.text
    else:
        reply = str(result)
    return reply.strip()
# ---------------------------------------------------------------------------
# SESSION MEMORY
# ---------------------------------------------------------------------------
def _prune_session():
    """Drop the leading session entries that are older than SESSION_TTL.

    Entries are appended in chronological order, so only the front of the
    log is examined; scanning stops at the first entry that is still fresh.
    The list is modified in place.
    """
    oldest_allowed = _time.time() - SESSION_TTL
    expired = 0
    for stamp, _ in _session_log:
        if not stamp < oldest_allowed:
            break
        expired += 1
    del _session_log[:expired]
def remember_screen(description):
    """Append a screen description to the session log.

    Expired entries are pruned first. After appending, the oldest entry is
    discarded if the log has grown beyond MAX_SESSION_LOG.
    """
    _prune_session()
    entry = (_time.time(), description)
    _session_log.append(entry)
    over_capacity = len(_session_log) > MAX_SESSION_LOG
    if over_capacity:
        del _session_log[0]
def get_session_context():
    """Render the unexpired session log as a prompt-ready context string.

    Returns:
        An empty string when there is no history; otherwise a
        "Recent activity:" header followed by one bullet per entry, each
        description cut to its first 100 characters.
    """
    _prune_session()
    if _session_log:
        bullets = [f"- {text[:100]}" for _, text in _session_log]
        return "Recent activity:\n" + "\n".join(bullets)
    return ""
def get_session_history():
    """Return the unexpired session descriptions, oldest first.

    Timestamps are left out, and the result is a new list, so callers can
    modify it without touching the session log.
    """
    _prune_session()
    return [entry[1] for entry in _session_log]
# ---------------------------------------------------------------------------
# SYSTEM CHECKS
# ---------------------------------------------------------------------------
def has_headroom():
    """Check if there's enough free RAM and CPU to run inference.

    Returns:
        True when at least MIN_RAM_AVAILABLE_GB of memory is available and
        CPU utilisation, sampled over half a second, is at most 80%;
        False otherwise. The reason for a False result is logged.
    """
    available_gb = psutil.virtual_memory().available / (1024 ** 3)
    if available_gb < MIN_RAM_AVAILABLE_GB:
        # Bail out before sampling the CPU: cpu_percent(interval=0.5) blocks
        # for half a second, which is wasted when RAM already rules out
        # inference.
        log.info(f"Low RAM: {available_gb:.1f}GB available, need {MIN_RAM_AVAILABLE_GB}GB.")
        return False

    cpu_pct = psutil.cpu_percent(interval=0.5)
    if cpu_pct > 80:
        log.info(f"CPU hot: {cpu_pct}%.")
        return False
    return True
def get_idle_seconds():
    """Return the seconds elapsed since the last keyboard/mouse input event."""
    # Event-type argument 0xFFFFFFFF -- presumably Quartz's "any input
    # event" value (kCGAnyInputEventType); confirm against the Quartz docs.
    event_type = int(0xFFFFFFFF)
    source_state = Quartz.kCGEventSourceStateHIDSystemState
    return Quartz.CGEventSourceSecondsSinceLastEventType(source_state, event_type)
# ---------------------------------------------------------------------------
# SCREENSHOT
# ---------------------------------------------------------------------------
def take_screenshot():
    """Silent screenshot, no shutter sound. Returns temp file path or None.

    The capture is written to a fresh ``shutter_*.png`` temporary file and
    the caller is responsible for deleting it. Every failure mode is logged,
    the temporary file is removed, and None is returned:

    * ``screencapture`` exits with a non-zero status,
    * ``screencapture`` cannot be launched at all (e.g. not installed),
    * the command reports success but leaves an empty or missing file.
    """
    # Use NamedTemporaryFile to avoid a TOCTOU race on the file name.
    # The file is created atomically; screencapture then overwrites it.
    with tempfile.NamedTemporaryFile(
        suffix=".png", prefix="shutter_", delete=False,
    ) as tmp:
        path = tmp.name

    try:
        # -x: do not play the shutter sound; -C: include the cursor.
        subprocess.run(
            ["screencapture", "-x", "-C", path],
            check=True, capture_output=True,
        )
        # The file already existed before the capture, so its existence
        # proves nothing; only a non-empty file means an image was written.
        if os.path.getsize(path) > 0:
            return path
        log.error("Screenshot failed: screencapture produced an empty file")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing/unlaunchable screencapture binary and a
        # capture file that vanished before it could be inspected.
        log.error(f"Screenshot failed: {e}")

    # Failure path: do not leave the placeholder file behind.
    try:
        os.remove(path)
    except OSError:
        pass
    return None
# ---------------------------------------------------------------------------
# SANITIZE
# ---------------------------------------------------------------------------
# Ordered redaction rules used by sanitize_text(): (compiled pattern,
# replacement) pairs applied top to bottom. The ORDER IS SIGNIFICANT:
#
# * The structured patterns (cards, SSNs, e-mails, UUIDs) run first. The
#   credential rule only swallows the first whitespace-delimited chunk of a
#   value, so running it earlier could split e.g. a space-separated card
#   number before the card rule sees it.
# * The credential-keyword rule runs BEFORE the long-token and path rules.
#   Otherwise a long variable name such as AWS_SECRET_ACCESS_KEY (or a
#   keyword sitting at the end of a path) is replaced by a placeholder
#   first, the keyword disappears, and the value after "=" / ":" leaks.
_SANITIZE_RULES = (
    # Credit card patterns (4 groups of 4 digits)
    (re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'), '[REDACTED]'),
    # Social Security Numbers
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[REDACTED]'),
    # Email addresses
    (re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'), '[REDACTED]'),
    # UUIDs
    (
        re.compile(
            r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b',
            re.IGNORECASE,
        ),
        '[REDACTED]',
    ),
    # Credential keywords followed by values. The leading [\w\-]* swallows
    # the whole identifier the keyword is embedded in (DB_PASSWORD, ...),
    # so no fragment of the name is left behind.
    (
        re.compile(
            r'[\w\-]*(?:key|token|secret|password|api_key|bearer|auth|credential|'
            r'private_key|access_token|refresh_token|apikey|api[_\-]secret)'
            r'\s*[=:]\s*\S+',
            re.IGNORECASE,
        ),
        '[REDACTED]',
    ),
    # Long alphanumeric tokens (potential secrets, 15+ chars)
    (re.compile(r'\b[A-Za-z0-9_\-]{15,}\b'), '[REDACTED]'),
    # Unix file paths
    (re.compile(r'(/[\w.\-]+)+'), '[PATH]'),
    # Windows file paths
    (re.compile(r'[A-Za-z]:\\[\w.\-\\]+'), '[PATH]'),
)


def sanitize_text(text):
    """Strip anything that looks like a secret or PII from text.

    Catches: long tokens, file paths, API keys, credit cards, SSNs,
    email addresses, UUIDs, and credential keywords.

    Args:
        text: The string to clean.

    Returns:
        The text with sensitive spans replaced by ``[REDACTED]`` or
        ``[PATH]``, runs of whitespace collapsed to single spaces, and
        leading/trailing whitespace removed.
    """
    for pattern, placeholder in _SANITIZE_RULES:
        text = pattern.sub(placeholder, text)
    # Collapse whitespace
    return re.sub(r'\s+', ' ', text).strip()
# ---------------------------------------------------------------------------
# PUBLIC API
# ---------------------------------------------------------------------------
# Instruction text that get_screen_context() sends to the vision model with
# every screenshot. When session history exists, the "Recent activity"
# context is appended after this text. The final sentences ask the model not
# to transcribe sensitive data; sanitize_text() is still applied to the reply.
VISION_PROMPT = (
"Describe what this person is doing in 2-3 sentences. "
"What application are they using, what task are they doing, "
"and what might they be stuck on? "
"Be factual and concise. "
"IMPORTANT: Do NOT describe any passwords, API keys, tokens, file paths, "
"or other sensitive information visible on screen. If you see private data, "
"just note that sensitive content is visible without describing it."
)
def get_screen_context(include_history=True):
    """
    Take a screenshot, analyze it with the vision model, return structured text.

    Args:
        include_history: If True, include session_history in response.
            Set to False for external API responses to limit data exposure.

    Returns:
        dict: {"description": "...", "session_history": [...]}. The
        description has been passed through sanitize_text() and is also
        recorded in the session log.

    Raises:
        RuntimeError: if the system has no headroom for inference or the
            screenshot capture fails.
    """
    if not has_headroom():
        raise RuntimeError("System resources too low for inference")
    captured = take_screenshot()
    if not captured or not os.path.exists(captured):
        raise RuntimeError("Screenshot capture failed")

    img = captured
    try:
        # Redact PII from image before the vision model sees it
        if REDACT_SCREENSHOTS:
            from redact import redact_image
            img = redact_image(captured)

        session_ctx = get_session_context()
        prompt = VISION_PROMPT
        if session_ctx:
            prompt += f"\n\n{session_ctx}"

        description = sanitize_text(run_vision(img, prompt))
        remember_screen(description)
        return {
            "description": description,
            "session_history": get_session_history() if include_history else [],
        }
    finally:
        # redact_image() lives outside this module, so it is not known here
        # whether it rewrites the file in place or returns a new path.
        # Remove both, so the unredacted capture can never be left on disk.
        leftovers = [captured]
        if img and img != captured:
            leftovers.append(img)
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass  # already gone, nothing to clean up
            except OSError as e:
                log.warning(f"Could not remove temporary screenshot {leftover}: {e}")
def get_screenshot_bytes():
    """
    Take a screenshot and return it as base64-encoded PNG.

    Returns:
        dict: {"image_base64": "...", "content_type": "image/png"}

    Raises:
        RuntimeError: if the screenshot capture fails.
    """
    captured = take_screenshot()
    if not captured or not os.path.exists(captured):
        raise RuntimeError("Screenshot capture failed")

    img = captured
    try:
        # Redact PII from image before encoding
        if REDACT_SCREENSHOTS:
            from redact import redact_image
            img = redact_image(captured)

        with open(img, "rb") as f:
            image_data = base64.b64encode(f.read()).decode("utf-8")
        return {
            "image_base64": image_data,
            "content_type": "image/png",
        }
    finally:
        # redact_image() lives outside this module, so it is not known here
        # whether it rewrites the file in place or returns a new path.
        # Remove both, so the unredacted capture can never be left on disk.
        leftovers = [captured]
        if img and img != captured:
            leftovers.append(img)
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass  # already gone, nothing to clean up
            except OSError as e:
                log.warning(f"Could not remove temporary screenshot {leftover}: {e}")