diff --git a/.env.example b/.env.example index dee118aa9f..7bcee9826d 100644 --- a/.env.example +++ b/.env.example @@ -1,13 +1,899 @@ -# Deployment Environment: -NODE_ENV=development +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- -# Next Auth config: -# Generate a secret with `openssl rand -base64 32` -NEXTAUTH_SECRET=changeme -NEXTAUTH_URL=http://localhost:3000 +""" +Rule Tuner — rewritten single-file GUI and Gemini-aware client. -# Prisma -DATABASE_URL=file:./db.sqlite +Features: +- UI per screenshot (fields for Rule, FP JSONL, FN JSONL, Env JSON, API URL, API Key) +- Uses existing input spec & FORCE_JSON constraints when building prompts +- Calls Gemini (or any JSON-compatible endpoint) and extracts explanation and JSON code blocks +- Formats results as Markdown: FP analysis, FN analysis, risk analysis, diff, rule_after +- Outputs human-friendly Markdown -# External APIs: -OPENAI_API_KEY=changeme +Notes: +- This file is intentionally self-contained and designed to be easy to extend. +""" + +from __future__ import annotations + +import json +import os +import re +import tkinter as tk +from typing import Any, Dict, List, Optional +from tkinter import ttk, filedialog, messagebox +from tkinter.scrolledtext import ScrolledText + +try: + import requests +except Exception: + requests = None + +# --------------------- Prompt spec --------------------- + +BASE_SPEC_JSON = """指示に厳密に従い、以下のキーを持つJSONオブジェクトのみを生成せよ。 +- false_positive_patterns: 文字列の配列 +- false_negative_patterns: 文字列の配列 +- rule_diff: オブジェクトまたは文字列 +- risk_impact_analysis: オブジェクトまたは文字列 +- rule_after: 文字列 +""" + +FORCE_JSON = """応答は必ず以下の形式のJSONコードブロックのみとすること。 + +json +{ + "false_positive_patterns": [ ... ], + "false_negative_patterns": [ ... ], + "rule_diff": "...", + "risk_impact_analysis": "...", + "rule_after": "..." +} + +挨拶、説明、要約など、このJSONコードブロック以外のテキストは一切含めてはならない。 +""" + + +# --------------------- Utilities --------------------- + +def read_text_file(path: str) -> str: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return f.read() + return path + + +def read_json_file(path: str) -> Dict[str, Any]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + return json.loads(path) + + +def read_jsonl(path: str) -> List[Dict[str, Any]]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + lines = [l.strip() for l in f if l.strip()] + else: + lines = [l.strip() for l in path.splitlines() if l.strip()] + out: List[Dict[str, Any]] = [] + for ln in lines: + try: + out.append(json.loads(ln)) + except Exception: + # ignore malformed lines + continue + return out + + +# --------------------- Gemini-aware parsing --------------------- + +EXPECTED_KEYS = {"false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after"} + + +def extract_text_from_gemini(obj: Any) -> str: + """Extract readable narrative text from Gemini / LLM-style response. + + Gemini often returns a list of candidates with content.parts[].text -- collect those. 
+ """ + if isinstance(obj, str): + return obj + + if isinstance(obj, dict): + # if it already has expected keys, return empty narrative + if any(k in obj for k in EXPECTED_KEYS): + return "" + for key in ("content", "message", "text", "output"): + val = obj.get(key) + if isinstance(val, str): + return val + if isinstance(val, dict) or isinstance(val, list): + t = extract_text_from_gemini(val) + if t: + return t + + if isinstance(obj, list): + # candidates style + parts: List[str] = [] + for item in obj: + if isinstance(item, dict): + # Gemini: item['candidates'][i]['content']['parts'][j]['text'] + cands = item.get('candidates') or item.get('outputs') or item.get('choices') + if isinstance(cands, list): + for cand in cands: + content = cand.get('content') if isinstance(cand, dict) else None + if isinstance(content, dict): + parts_list = content.get('parts') or [] + for p in parts_list: + if isinstance(p, dict) and isinstance(p.get('text'), str): + parts.append(p.get('text')) + if isinstance(content, str): + parts.append(content) + # fallback for simple dicts with 'text' + if 'text' in item and isinstance(item['text'], str): + parts.append(item['text']) + elif isinstance(item, str): + parts.append(item) + return "\n\n".join(parts) + + return "" + + +def find_json_in_text(text: str) -> Optional[Dict[str, Any]]: + """Try to find a JSON object in text. Prefer codeblock JSON or first {...} with expected keys.""" + # 1. fenced codeblock JSON + m = re.search(r"```(?:json|JSON)?\s*\n([\s\S]*?)\n```", text, flags=re.S) + if m: + block = m.group(1) + try: + return json.loads(block) + except Exception: + # Try a tolerant repair: collapse accidental newlines between word characters, + # remove trailing commas before closing braces/brackets, and collapse multiple + # blank-lines. These heuristics fix common model formatting glitches. + attempt = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', block) + attempt = re.sub(r',\s*([}\]])', r'\1', attempt) + attempt = re.sub(r'\n{2,}', '\n', attempt) + try: + return json.loads(attempt) + except Exception: + # Last-ditch: try to extract the largest {...} substring and repair it + first = block.find('{') + last = block.rfind('}') + if first != -1 and last != -1 and last > first: + candidate = block[first:last+1] + candidate = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', candidate) + candidate = re.sub(r',\s*([}\]])', r'\1', candidate) + try: + return json.loads(candidate) + except Exception: + pass + + # 2. try to parse entire text + try: + parsed = json.loads(text) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + pass + + # 3. 
Find braces sections and try to load + for match in re.finditer(r"\{[\s\S]*?\}", text): + s = match.group(0) + try: + parsed = json.loads(s) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + # Try repairing this fragment similar to codeblock repair + s2 = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', s) + s2 = re.sub(r',\s*([}\]])', r'\1', s2) + try: + parsed = json.loads(s2) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + continue + + return None + + +# ------------------ AI client ------------------ + +class GeminiClient: + def __init__(self, api_url: str, api_key: Optional[str] = None, timeout: int = 600): + self.api_url = api_url + self.api_key = api_key + self.timeout = timeout + + def call(self, prompt: str) -> Any: + if requests is None: + raise RuntimeError("requests must be installed to call the API") + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + if self.api_key: + headers[os.environ.get('API_KEY_HEADER', 'api-key')] = self.api_key + + payload = { + "contents": [{"role": "user", "parts": [{"text": prompt}]}], + "generationConfig": {"temperature": 0.0, "maxOutputTokens": 3000}, + } + resp = requests.post(self.api_url, headers=headers, json=payload, timeout=self.timeout) + try: + return resp.json() + except Exception: + return resp.text + + +# ------------------ Markdown formatting ------------------ + +def format_as_markdown(parsed: Any) -> str: + """Take parsed AI output (dict or str) and render a human-friendly Markdown string.""" + # If string -> assume it's already a narrative markdown + if isinstance(parsed, str): + # Clean up extra JSON noise if any + parsed = parsed.strip() + # If the string is a long JSON, try to decode + maybe = find_json_in_text(parsed) + if maybe: + parsed = maybe + else: + return parsed + + # If dict with expected keys, render sections + if isinstance(parsed, dict): + parts: List[str] = ["# AIによるルール修正案", "---", ""] + + # FP + fp = parsed.get("false_positive_patterns") + if fp: + parts.append("## 現状分析 (FP: 誤検知)") + if isinstance(fp, (list, tuple)): + parts.extend([f"- {i}" for i in fp]) + else: + parts.append(str(fp)) + parts.append("") + + # FN + fn = parsed.get("false_negative_patterns") + if fn: + parts.append("## 現状分析 (FN: 検知漏れ)") + if isinstance(fn, (list, tuple)): + parts.extend([f"- {i}" for i in fn]) + else: + parts.append(str(fn)) + parts.append("") + + # Risk analysis + ra = parsed.get("risk_impact_analysis") or parsed.get("risk") + if ra: + parts.append("## リスクと影響の分析") + parts.append(str(ra) if not isinstance(ra, (dict, list)) else json.dumps(ra, ensure_ascii=False, indent=2)) + parts.append("") + + # Rule diff + rd = parsed.get("rule_diff") + if rd: + parts.append("## 修正方針 (差分)") + parts.append("```text") + parts.append(str(rd) if not isinstance(rd, (dict, list)) else json.dumps(rd, ensure_ascii=False, indent=2)) + parts.append("```") + parts.append("") + + # Rule after + raft = parsed.get("rule_after") or parsed.get("rule_after_rule") + if raft: + parts.append("## 修正後のルール") + parts.append("```text") + parts.append(str(raft)) + parts.append("```") + parts.append("") + + return "\n".join(parts) + + # Fall back to JSON block + try: + return "```json\n" + json.dumps(parsed, ensure_ascii=False, indent=2) + "\n```" + except Exception: + return str(parsed) + + +# ------------------ UI ------------------ + +class RuleTunerApp(tk.Tk): + def __init__(self): + super().__init__() + self.title('Rule 
Tuner - Markdown (Gemini)') + self.geometry('1100x760') + + self.rule_path = tk.StringVar() + self.fp_path = tk.StringVar() + self.fn_path = tk.StringVar() + self.env_path = tk.StringVar() + self.api_url = tk.StringVar() + self.api_key = tk.StringVar() + + self._build_gui() + + def _build_gui(self): + top = ttk.Frame(self) + top.pack(fill=tk.X, padx=8, pady=6) + + def add_row(label, var): + row = ttk.Frame(top) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text=label, width=12).pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Button(row, text='Browse', command=lambda v=var: self._browse(v)).pack(side=tk.LEFT) + + add_row('Rule', self.rule_path) + add_row('FP JSONL', self.fp_path) + add_row('FN JSONL', self.fn_path) + add_row('Env JSON', self.env_path) + + api_row = ttk.Frame(top) + api_row.pack(fill=tk.X, pady=2) + ttk.Label(api_row, text='API URL', width=12).pack(side=tk.LEFT) + ttk.Entry(api_row, textvariable=self.api_url).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Label(api_row, text='API Key', width=8).pack(side=tk.LEFT) + ttk.Entry(api_row, textvariable=self.api_key, show='*').pack(side=tk.LEFT) + + btn_row = ttk.Frame(top) + btn_row.pack(fill=tk.X, pady=6) + ttk.Button(btn_row, text='Build Prompt', command=self.build_prompt).pack(side=tk.LEFT) + ttk.Button(btn_row, text='Send to API', command=self.send_to_api).pack(side=tk.LEFT) + + mid = ttk.PanedWindow(self, orient=tk.HORIZONTAL) + mid.pack(fill=tk.BOTH, expand=True, padx=8, pady=6) + + left = ttk.Frame(mid) + right = ttk.Frame(mid) + mid.add(left, weight=1) + mid.add(right, weight=1) + + ttk.Label(left, text='Prompt Preview').pack(anchor=tk.W) + self.prompt_view = ScrolledText(left, height=24) + self.prompt_view.pack(fill=tk.BOTH, expand=True) + + ttk.Label(right, text='出力: 分析 / 変更案 (Markdown)').pack(anchor=tk.W) + self.output_view = ScrolledText(right, height=24) + self.output_view.pack(fill=tk.BOTH, expand=True) + + def _browse(self, var: tk.StringVar): + p = filedialog.askopenfilename() + if p: + var.set(p) + + def build_prompt(self): + try: + rule_text = read_text_file(self.rule_path.get().strip()) + fp = read_jsonl(self.fp_path.get().strip()) + fn = read_jsonl(self.fn_path.get().strip()) + env = read_json_file(self.env_path.get().strip()) + prompt = self._compose_prompt(env, rule_text, fp, fn) + self.prompt_view.delete('1.0', tk.END) + self.prompt_view.insert(tk.END, prompt) + messagebox.showinfo('OK', 'Prompt built') + except Exception as e: + messagebox.showerror('Error', f'Failed to build prompt: {e}') + + def _compose_prompt(self, env: Dict[str, Any], rule_text: str, fp_head: List[Dict[str, Any]], fn_head: List[Dict[str, Any]]) -> str: + parts = [BASE_SPEC_JSON, FORCE_JSON, '\n【既存ルール(全文)】\n', rule_text, '\n【環境制約】\n', json.dumps(env, ensure_ascii=False, indent=2), '\n【FP】\n', json.dumps(fp_head[:5], ensure_ascii=False, indent=2), '\n【FN】\n', json.dumps(fn_head[:5], ensure_ascii=False, indent=2)] + return '\n'.join(parts) + + def send_to_api(self): + prompt = self.prompt_view.get('1.0', tk.END).strip() + if not prompt: + messagebox.showwarning('Empty prompt', 'Please build the prompt first.') + return + url = self.api_url.get().strip() or os.environ.get('DEFAULT_API_URL') + if not url: + messagebox.showwarning('API URL missing', 'Please enter API URL or set DEFAULT_API_URL') + return + client = GeminiClient(url, self.api_key.get().strip() or None) + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, 'Calling API...') + self.update_idletasks() 
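+        # Note: client.call() performs a synchronous HTTP request on the Tk main
+        # thread, so the window freezes until the response arrives; the
+        # update_idletasks() above only ensures "Calling API..." is painted first.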
+ + try: + raw = client.call(prompt) + # Extract narrative and JSON + narrative = extract_text_from_gemini(raw) + parsed = None + if isinstance(raw, dict) and any(k in raw for k in EXPECTED_KEYS): + parsed = raw + else: + # try to locate JSON in narrative + parsed = find_json_in_text(narrative) + # If parsed is None, the narrative may be the only thing + md = format_as_markdown(parsed if parsed else narrative) + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, md) + except Exception as e: + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, f'API call failed: {e}') + + +if __name__ == '__main__': + RuleTunerApp().mainloop() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Single-file Rule Tuner UI + AI client that always displays AI outputs as Markdown +- Keeps the original features but converts structured AI responses to Markdown +- No automatic JSON debug output (only if SHOW_DEBUG environment var is set) +""" + +import json +import os +import re +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +from tkinter.scrolledtext import ScrolledText +from typing import Any, Dict, List, Optional + +try: + import requests +except Exception: + requests = None # we can still run an offline flow + +# ---------- utilities ---------- + +def read_text_file(path: str) -> str: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return f.read() + return path + + +def read_json_file(path: str) -> Dict[str, Any]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + return json.loads(path) + + +def read_jsonl(path: str) -> List[Dict[str, Any]]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + lines = [l.strip() for l in f if l.strip()] + else: + lines = [l.strip() for l in path.splitlines() if l.strip()] + out = [] + for ln in lines: + try: + out.append(json.loads(ln)) + except Exception: + continue + return out + + +# ---------- normalization (keeps behavior) ---------- + +def normalize_response(data: Any) -> Any: + expected_keys = {"false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after"} + + def is_expected(obj: Any) -> bool: + return isinstance(obj, dict) and any(k in obj for k in expected_keys) + + # Already a desired structure + if is_expected(data): + return data + + # If the service wrapped JSON in codeblock, try to strip it. + # Prefer narrative text outside codeblocks so users see plain Markdown rather than raw JSON. + if isinstance(data, str): + # regex matching fenced codeblocks like ```json ... ``` + codeblock_re = re.compile(r"```(?:json|JSON|markdown)?\s*\n([\s\S]*?)\n```", flags=re.S) + + # If there is text outside of fenced codeblocks, prefer that. 
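+        # Hypothetical example: splitting "intro\n```json\n{...}\n```\noutro"
+        # gives ["intro\n", "{...}", "\noutro"]; because the pattern has one
+        # capturing group, even indices hold the text outside the fences and
+        # odd indices hold the captured block contents.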
+ split_parts = codeblock_re.split(data) + if split_parts: + # parts at even indices are the outside-of-code content + outside = "".join(split_parts[i] for i in range(0, len(split_parts), 2)).strip() + if outside: + # Also remove embedded JSON-like fragments inside the narrative + return _remove_json_fragments(outside) + + # Otherwise, try to parse the JSON from the first codeblock if present + m = codeblock_re.search(data) + if m: + try: + return normalize_response(json.loads(m.group(1))) + except Exception: + pass + try: + parsed = json.loads(data) + return normalize_response(parsed) + except Exception: + return data + + # If list with Gemini style content, pull text + if isinstance(data, list) and len(data) > 0: + if isinstance(data[0], dict) and 'content' in data[0]: + try: + text_content = data[0]['content']['parts'][0]['text'] + return normalize_response(text_content) + except Exception: + pass + + # join textual parts + texts: List[str] = [] + for item in data: + if isinstance(item, str): + texts.append(item) + elif isinstance(item, dict): + if is_expected(item): + return item + # try to get text + text = item.get('text') or item.get('content') or None + if isinstance(text, str): + texts.append(text) + elif isinstance(text, dict): + # try deeper + parts = text.get('parts') or [] + for p in parts: + if isinstance(p, dict) and isinstance(p.get('text'), str): + texts.append(p.get('text')) + if texts: + return normalize_response('\n'.join(texts)) + + # dict wrapper + if isinstance(data, dict): + for key in ('text', 'content', 'message', 'output'): + val = data.get(key) + if val is not None: + return normalize_response(val) + # arrays inside + for key in ('outputs', 'choices', 'results', 'candidates'): + val = data.get(key) + if isinstance(val, list): + return normalize_response(val) + + return data + + +def _remove_json_fragments(text: str) -> str: + """ + Remove JSON-like blocks and standalone JSON key/value lines from ``text``. + We only remove blocks that look like they contain the expected rule keys + (false_positive_patterns, rule_diff, etc.) to avoid stripping natural text. 
+ """ + expected_keys = ("false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after") + + # Remove fenced codeblocks first (already handled upstream but keep for safety) + text = re.sub(r"```(?:json|JSON|markdown)?\s*\n[\s\S]*?\n```", "", text, flags=re.S) + + # Remove balanced JSON objects that include any expected key + out = [] + i = 0 + n = len(text) + while i < n: + if text[i] == '{': + depth = 1 + j = i + 1 + while j < n and depth > 0: + if text[j] == '{': + depth += 1 + elif text[j] == '}': + depth -= 1 + j += 1 + candidate = text[i:j] + if any(k in candidate for k in expected_keys): + # skip this fragment + i = j + continue + else: + out.append(candidate) + i = j + continue + else: + out.append(text[i]) + i += 1 + cleaned = ''.join(out) + + # Remove JSON style key lines such as " "rule_after": "...", + cleaned = re.sub(r"^\s*\"[a-zA-Z0-9_]+\"\s*:\s*[^\n]*\n?", "", cleaned, flags=re.M) + + # Trim extraneous punctuation or leftover commas / brackets + cleaned = re.sub(r"[,\[\]]{2,}", "", cleaned) + # Collapse multiple blank lines + cleaned = re.sub(r"\n{2,}", "\n\n", cleaned) + + return cleaned.strip() + + +# ---------- AI client ---------- + +class SimpleAIClient: + def __init__(self, api_url: str, api_key: Optional[str] = None, timeout: int = 600): + self.api_url = api_url + self.api_key = api_key + self.timeout = timeout + + def generate(self, prompt: str) -> Any: + if requests is None: + raise RuntimeError("requests module is required for API calls") + + headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} + api_key_header = os.environ.get('API_KEY_HEADER', 'api-key') + if self.api_key: + if api_key_header.lower() == 'authorization': + headers['Authorization'] = f'Bearer {self.api_key}' + else: + headers[api_key_header] = self.api_key + + payload = { + 'contents': [{'role': 'user', 'parts': [{'text': prompt}]}], + 'generationConfig': { + 'temperature': 0.2, + 'maxOutputTokens': 10000, + } + } + + resp = requests.post(self.api_url, headers=headers, json=payload, timeout=self.timeout) + try: + return normalize_response(resp.json()) + except Exception: + return normalize_response(resp.text) + + +# ---------- prompt builder (kept minimal) ---------- + +BASE_SPEC_JSON = """指示に厳密に従い、以下のキーを持つJSONオブジェクトのみを生成せよ。 +- false_positive_patterns: 文字列の配列 +- false_negative_patterns: 文字列の配列 +- rule_diff: オブジェクトまたは文字列 +- risk_impact_analysis: オブジェクトまたは文字列 +- rule_after: 文字列 +""" + +FORCE_JSON = """応答は必ず以下の形式のJSONコードブロックのみとすること。 + +json +{ +"false_positive_patterns": [ ... ], +"false_negative_patterns": [ ... ], +"rule_diff": "...", +"risk_impact_analysis": "...", +"rule_after": "..." 
+} + +挨拶、説明、要約など、このJSONコードブロック以外のテキストは一切含めてはならない。 +""" + + +def build_prompt(env: Dict[str, Any], rule_text: str, fp_head: List[Dict[str, Any]], fn_head: List[Dict[str, Any]], extra: str = '') -> str: + parts = [BASE_SPEC_JSON, FORCE_JSON] + parts.append('\n【既存ルール(全文)】\n') + parts.append(rule_text) + parts.append('\n【環境制約】\n') + parts.append(json.dumps(env, ensure_ascii=False, indent=2)) + parts.append('\n【FP】\n') + parts.append(json.dumps(fp_head[:5], ensure_ascii=False, indent=2)) + parts.append('\n【FN】\n') + parts.append(json.dumps(fn_head[:5], ensure_ascii=False, indent=2)) + if extra: + parts.append('\n【追加指示】') + parts.append(extra) + return '\n'.join(parts) + + +# ---------- Markdown conversion ---------- + +def _render_section_list(title: str, items: Any) -> List[str]: + md: List[str] = [f"## {title}", ""] + if isinstance(items, list): + for it in items: + md.append(f"- {it}") + else: + md.append(str(items)) + md.append("") + return md + + +def _render_section_text(title: str, text: Any, code: bool = False) -> List[str]: + md: List[str] = [f"## {title}", ""] + if isinstance(text, (dict, list)): + try: + pretty = json.dumps(text, ensure_ascii=False, indent=2) + if code: + md.append("```text") + md.append(pretty) + md.append("```") + else: + md.append(pretty) + except Exception: + md.append(str(text)) + else: + if isinstance(text, str): + md.append(text.replace("\\n", "\n")) + else: + md.append(str(text)) + md.append("") + return md + + +def format_thinking(ai_result: Any) -> str: + """ + Convert the AI result into Markdown. + If it's a string -> assume it's already Markdown and return it. + If it's a dict with the expected keys -> format the sections nicely. + Otherwise, convert to a short JSON code block. + + Note: No raw debug JSON is appended by default. Set env SHOW_DEBUG=1 to enable raw dump. 
+ """ + if isinstance(ai_result, str): + return ai_result + + if isinstance(ai_result, dict): + sections_map = [ + ("現状分析 (FP: 誤検知)", "false_positive_patterns", True), + ("現状分析 (FN: 検知漏れ)", "false_negative_patterns", True), + ("リスクと影響の分析", "risk_impact_analysis", False), + ("修正方針 (差分)", "rule_diff", True), + ("修正後のルール", "rule_after", True), + ] + + md: List[str] = ["# AIによるルール修正案", "---", ""] + + for title, key, is_list in sections_map: + content = ai_result.get(key) + if content is None or content == "": + continue + + # If the item is a list and maps to bullet style + if is_list and isinstance(content, (list, tuple)): + md.extend(_render_section_list(title, content)) + elif is_list and isinstance(content, str) and ('\n' in content or content.strip().startswith('[')): + # try parsing bracketed lists in strings + try: + parsed = json.loads(content) + md.extend(_render_section_list(title, parsed)) + except Exception: + md.extend(_render_section_text(title, content, code=False)) + elif key in ("rule_diff", "rule_after"): + # code block display for these keys + md.extend(_render_section_text(title, content, code=True)) + else: + md.extend(_render_section_text(title, content, code=False)) + + # Optionally add raw JSON if user wants it + if os.environ.get('SHOW_DEBUG'): + md.append("---") + md.append("## DEBUG: Raw AI response (JSON)") + md.append("```json") + try: + md.append(json.dumps(ai_result, ensure_ascii=False, indent=2)) + except Exception: + md.append(str(ai_result)) + md.append("```") + + return "\n".join(md) + + # fallback: print a concise json block as markdown + try: + return "```json\n" + json.dumps(ai_result, ensure_ascii=False, indent=2) + "\n```" + except Exception: + return str(ai_result) + + +# ---------- minimal GUI ---------- + +class App(tk.Tk): + def __init__(self): + super().__init__() + self.title('Rule Tuner - Markdown') + self.geometry('1000x720') + + self.rule_path = tk.StringVar() + self.fp_path = tk.StringVar() + self.fn_path = tk.StringVar() + self.env_path = tk.StringVar() + self.api_url = tk.StringVar() + self.api_key = tk.StringVar() + + self.prompt_text: Optional[str] = None + self.ai_result: Optional[Any] = None + + self._build_widgets() + + def _build_widgets(self): + pad = {'padx': 6, 'pady': 4} + top = ttk.Frame(self) + top.pack(fill=tk.X, **pad) + + def add_row(label, var): + row = ttk.Frame(top) + row.pack(fill=tk.X, **pad) + ttk.Label(row, text=label, width=12).pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Button(row, text='Browse', command=lambda: self._browse(var)).pack(side=tk.LEFT) + + add_row('Rule', self.rule_path) + add_row('FP JSONL', self.fp_path) + add_row('FN JSONL', self.fn_path) + add_row('Env JSON', self.env_path) + + api_frame = ttk.Frame(self) + api_frame.pack(fill=tk.X, **pad) + ttk.Label(api_frame, text='API URL', width=12).pack(side=tk.LEFT) + ttk.Entry(api_frame, textvariable=self.api_url).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Label(api_frame, text='API Key', width=8).pack(side=tk.LEFT) + ttk.Entry(api_frame, textvariable=self.api_key, show='*').pack(side=tk.LEFT) + + btn_frame = ttk.Frame(self) + btn_frame.pack(fill=tk.X, **pad) + ttk.Button(btn_frame, text='Build Prompt', command=self.on_build_prompt).pack(side=tk.LEFT) + ttk.Button(btn_frame, text='Send to API', command=self.on_send_api).pack(side=tk.LEFT) + + mid = ttk.PanedWindow(self, orient=tk.HORIZONTAL) + mid.pack(fill=tk.BOTH, expand=True, **pad) + + left = ttk.Frame(mid) + right = ttk.Frame(mid) + 
mid.add(left, weight=1) + mid.add(right, weight=1) + + ttk.Label(left, text='Prompt Preview').pack(anchor=tk.W) + self.prompt_view = ScrolledText(left, height=20) + self.prompt_view.pack(fill=tk.BOTH, expand=True) + + ttk.Label(right, text='思考プロセス / 現状分析 / 修正方針 (マークダウン)').pack(anchor=tk.W) + self.output_container = ttk.Frame(right) + self.output_container.pack(fill=tk.BOTH, expand=True) + + self.output_widget = ScrolledText(self.output_container, height=20) + self.output_widget.pack(fill=tk.BOTH, expand=True) + + def _browse(self, var: tk.StringVar): + p = filedialog.askopenfilename() + if p: + var.set(p) + + def on_build_prompt(self): + try: + rule_text = read_text_file(self.rule_path.get().strip()) + fp = read_jsonl(self.fp_path.get().strip()) + fn = read_jsonl(self.fn_path.get().strip()) + env = read_json_file(self.env_path.get().strip()) + prompt = build_prompt(env, rule_text, fp, fn, extra='') + self.prompt_text = prompt + self.prompt_view.delete('1.0', tk.END) + self.prompt_view.insert(tk.END, prompt) + messagebox.showinfo('OK', 'Prompt built') + except Exception as e: + messagebox.showerror('Error', f'Build prompt failed: {e}') + + def on_send_api(self): + if not self.prompt_text: + messagebox.showwarning('未作成', '先に Build Prompt を実行してください') + return + api_url = self.api_url.get().strip() or os.environ.get('DEFAULT_API_URL') + if not api_url: + messagebox.showwarning('API未設定', 'API URL を入力するか DEFAULT_API_URL 環境変数を設定してください') + return + client = SimpleAIClient(api_url, self.api_key.get().strip() or None) + self.show_output('Calling API...') + self.update_idletasks() + + try: + result = client.generate(self.prompt_text) + self.ai_result = result + display = format_thinking(result) + self.show_output(display) + except Exception as e: + self.show_output(f'API call failed:\n{e}\n') + + def show_output(self, md_text: str): + txt_widget = self.output_widget + txt_widget.configure(state='normal') + txt_widget.delete('1.0', tk.END) + txt_widget.insert(tk.END, md_text) + txt_widget.configure(state='disabled') + + +if __name__ == '__main__': + App().mainloop() diff --git "a/AI\346\264\273\347\224\250" "b/AI\346\264\273\347\224\250" new file mode 100644 index 0000000000..7bcee9826d --- /dev/null +++ "b/AI\346\264\273\347\224\250" @@ -0,0 +1,899 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Rule Tuner — rewritten single-file GUI and Gemini-aware client. + +Features: +- UI per screenshot (fields for Rule, FP JSONL, FN JSONL, Env JSON, API URL, API Key) +- Uses existing input spec & FORCE_JSON constraints when building prompts +- Calls Gemini (or any JSON-compatible endpoint) and extracts explanation and JSON code blocks +- Formats results as Markdown: FP analysis, FN analysis, risk analysis, diff, rule_after +- Outputs human-friendly Markdown + +Notes: +- This file is intentionally self-contained and designed to be easy to extend. 
+""" + +from __future__ import annotations + +import json +import os +import re +import tkinter as tk +from typing import Any, Dict, List, Optional +from tkinter import ttk, filedialog, messagebox +from tkinter.scrolledtext import ScrolledText + +try: + import requests +except Exception: + requests = None + +# --------------------- Prompt spec --------------------- + +BASE_SPEC_JSON = """指示に厳密に従い、以下のキーを持つJSONオブジェクトのみを生成せよ。 +- false_positive_patterns: 文字列の配列 +- false_negative_patterns: 文字列の配列 +- rule_diff: オブジェクトまたは文字列 +- risk_impact_analysis: オブジェクトまたは文字列 +- rule_after: 文字列 +""" + +FORCE_JSON = """応答は必ず以下の形式のJSONコードブロックのみとすること。 + +json +{ + "false_positive_patterns": [ ... ], + "false_negative_patterns": [ ... ], + "rule_diff": "...", + "risk_impact_analysis": "...", + "rule_after": "..." +} + +挨拶、説明、要約など、このJSONコードブロック以外のテキストは一切含めてはならない。 +""" + + +# --------------------- Utilities --------------------- + +def read_text_file(path: str) -> str: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return f.read() + return path + + +def read_json_file(path: str) -> Dict[str, Any]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + return json.loads(path) + + +def read_jsonl(path: str) -> List[Dict[str, Any]]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + lines = [l.strip() for l in f if l.strip()] + else: + lines = [l.strip() for l in path.splitlines() if l.strip()] + out: List[Dict[str, Any]] = [] + for ln in lines: + try: + out.append(json.loads(ln)) + except Exception: + # ignore malformed lines + continue + return out + + +# --------------------- Gemini-aware parsing --------------------- + +EXPECTED_KEYS = {"false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after"} + + +def extract_text_from_gemini(obj: Any) -> str: + """Extract readable narrative text from Gemini / LLM-style response. + + Gemini often returns a list of candidates with content.parts[].text -- collect those. + """ + if isinstance(obj, str): + return obj + + if isinstance(obj, dict): + # if it already has expected keys, return empty narrative + if any(k in obj for k in EXPECTED_KEYS): + return "" + for key in ("content", "message", "text", "output"): + val = obj.get(key) + if isinstance(val, str): + return val + if isinstance(val, dict) or isinstance(val, list): + t = extract_text_from_gemini(val) + if t: + return t + + if isinstance(obj, list): + # candidates style + parts: List[str] = [] + for item in obj: + if isinstance(item, dict): + # Gemini: item['candidates'][i]['content']['parts'][j]['text'] + cands = item.get('candidates') or item.get('outputs') or item.get('choices') + if isinstance(cands, list): + for cand in cands: + content = cand.get('content') if isinstance(cand, dict) else None + if isinstance(content, dict): + parts_list = content.get('parts') or [] + for p in parts_list: + if isinstance(p, dict) and isinstance(p.get('text'), str): + parts.append(p.get('text')) + if isinstance(content, str): + parts.append(content) + # fallback for simple dicts with 'text' + if 'text' in item and isinstance(item['text'], str): + parts.append(item['text']) + elif isinstance(item, str): + parts.append(item) + return "\n\n".join(parts) + + return "" + + +def find_json_in_text(text: str) -> Optional[Dict[str, Any]]: + """Try to find a JSON object in text. Prefer codeblock JSON or first {...} with expected keys.""" + # 1. 
fenced codeblock JSON + m = re.search(r"```(?:json|JSON)?\s*\n([\s\S]*?)\n```", text, flags=re.S) + if m: + block = m.group(1) + try: + return json.loads(block) + except Exception: + # Try a tolerant repair: collapse accidental newlines between word characters, + # remove trailing commas before closing braces/brackets, and collapse multiple + # blank-lines. These heuristics fix common model formatting glitches. + attempt = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', block) + attempt = re.sub(r',\s*([}\]])', r'\1', attempt) + attempt = re.sub(r'\n{2,}', '\n', attempt) + try: + return json.loads(attempt) + except Exception: + # Last-ditch: try to extract the largest {...} substring and repair it + first = block.find('{') + last = block.rfind('}') + if first != -1 and last != -1 and last > first: + candidate = block[first:last+1] + candidate = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', candidate) + candidate = re.sub(r',\s*([}\]])', r'\1', candidate) + try: + return json.loads(candidate) + except Exception: + pass + + # 2. try to parse entire text + try: + parsed = json.loads(text) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + pass + + # 3. Find braces sections and try to load + for match in re.finditer(r"\{[\s\S]*?\}", text): + s = match.group(0) + try: + parsed = json.loads(s) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + # Try repairing this fragment similar to codeblock repair + s2 = re.sub(r'(?<=\w)\s*\n\s*(?=\w)', ' ', s) + s2 = re.sub(r',\s*([}\]])', r'\1', s2) + try: + parsed = json.loads(s2) + if isinstance(parsed, dict) and any(k in parsed for k in EXPECTED_KEYS): + return parsed + except Exception: + continue + + return None + + +# ------------------ AI client ------------------ + +class GeminiClient: + def __init__(self, api_url: str, api_key: Optional[str] = None, timeout: int = 600): + self.api_url = api_url + self.api_key = api_key + self.timeout = timeout + + def call(self, prompt: str) -> Any: + if requests is None: + raise RuntimeError("requests must be installed to call the API") + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + if self.api_key: + headers[os.environ.get('API_KEY_HEADER', 'api-key')] = self.api_key + + payload = { + "contents": [{"role": "user", "parts": [{"text": prompt}]}], + "generationConfig": {"temperature": 0.0, "maxOutputTokens": 3000}, + } + resp = requests.post(self.api_url, headers=headers, json=payload, timeout=self.timeout) + try: + return resp.json() + except Exception: + return resp.text + + +# ------------------ Markdown formatting ------------------ + +def format_as_markdown(parsed: Any) -> str: + """Take parsed AI output (dict or str) and render a human-friendly Markdown string.""" + # If string -> assume it's already a narrative markdown + if isinstance(parsed, str): + # Clean up extra JSON noise if any + parsed = parsed.strip() + # If the string is a long JSON, try to decode + maybe = find_json_in_text(parsed) + if maybe: + parsed = maybe + else: + return parsed + + # If dict with expected keys, render sections + if isinstance(parsed, dict): + parts: List[str] = ["# AIによるルール修正案", "---", ""] + + # FP + fp = parsed.get("false_positive_patterns") + if fp: + parts.append("## 現状分析 (FP: 誤検知)") + if isinstance(fp, (list, tuple)): + parts.extend([f"- {i}" for i in fp]) + else: + parts.append(str(fp)) + parts.append("") + + # FN + fn = parsed.get("false_negative_patterns") + if fn: + 
parts.append("## 現状分析 (FN: 検知漏れ)") + if isinstance(fn, (list, tuple)): + parts.extend([f"- {i}" for i in fn]) + else: + parts.append(str(fn)) + parts.append("") + + # Risk analysis + ra = parsed.get("risk_impact_analysis") or parsed.get("risk") + if ra: + parts.append("## リスクと影響の分析") + parts.append(str(ra) if not isinstance(ra, (dict, list)) else json.dumps(ra, ensure_ascii=False, indent=2)) + parts.append("") + + # Rule diff + rd = parsed.get("rule_diff") + if rd: + parts.append("## 修正方針 (差分)") + parts.append("```text") + parts.append(str(rd) if not isinstance(rd, (dict, list)) else json.dumps(rd, ensure_ascii=False, indent=2)) + parts.append("```") + parts.append("") + + # Rule after + raft = parsed.get("rule_after") or parsed.get("rule_after_rule") + if raft: + parts.append("## 修正後のルール") + parts.append("```text") + parts.append(str(raft)) + parts.append("```") + parts.append("") + + return "\n".join(parts) + + # Fall back to JSON block + try: + return "```json\n" + json.dumps(parsed, ensure_ascii=False, indent=2) + "\n```" + except Exception: + return str(parsed) + + +# ------------------ UI ------------------ + +class RuleTunerApp(tk.Tk): + def __init__(self): + super().__init__() + self.title('Rule Tuner - Markdown (Gemini)') + self.geometry('1100x760') + + self.rule_path = tk.StringVar() + self.fp_path = tk.StringVar() + self.fn_path = tk.StringVar() + self.env_path = tk.StringVar() + self.api_url = tk.StringVar() + self.api_key = tk.StringVar() + + self._build_gui() + + def _build_gui(self): + top = ttk.Frame(self) + top.pack(fill=tk.X, padx=8, pady=6) + + def add_row(label, var): + row = ttk.Frame(top) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text=label, width=12).pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Button(row, text='Browse', command=lambda v=var: self._browse(v)).pack(side=tk.LEFT) + + add_row('Rule', self.rule_path) + add_row('FP JSONL', self.fp_path) + add_row('FN JSONL', self.fn_path) + add_row('Env JSON', self.env_path) + + api_row = ttk.Frame(top) + api_row.pack(fill=tk.X, pady=2) + ttk.Label(api_row, text='API URL', width=12).pack(side=tk.LEFT) + ttk.Entry(api_row, textvariable=self.api_url).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Label(api_row, text='API Key', width=8).pack(side=tk.LEFT) + ttk.Entry(api_row, textvariable=self.api_key, show='*').pack(side=tk.LEFT) + + btn_row = ttk.Frame(top) + btn_row.pack(fill=tk.X, pady=6) + ttk.Button(btn_row, text='Build Prompt', command=self.build_prompt).pack(side=tk.LEFT) + ttk.Button(btn_row, text='Send to API', command=self.send_to_api).pack(side=tk.LEFT) + + mid = ttk.PanedWindow(self, orient=tk.HORIZONTAL) + mid.pack(fill=tk.BOTH, expand=True, padx=8, pady=6) + + left = ttk.Frame(mid) + right = ttk.Frame(mid) + mid.add(left, weight=1) + mid.add(right, weight=1) + + ttk.Label(left, text='Prompt Preview').pack(anchor=tk.W) + self.prompt_view = ScrolledText(left, height=24) + self.prompt_view.pack(fill=tk.BOTH, expand=True) + + ttk.Label(right, text='出力: 分析 / 変更案 (Markdown)').pack(anchor=tk.W) + self.output_view = ScrolledText(right, height=24) + self.output_view.pack(fill=tk.BOTH, expand=True) + + def _browse(self, var: tk.StringVar): + p = filedialog.askopenfilename() + if p: + var.set(p) + + def build_prompt(self): + try: + rule_text = read_text_file(self.rule_path.get().strip()) + fp = read_jsonl(self.fp_path.get().strip()) + fn = read_jsonl(self.fn_path.get().strip()) + env = read_json_file(self.env_path.get().strip()) + prompt = 
self._compose_prompt(env, rule_text, fp, fn) + self.prompt_view.delete('1.0', tk.END) + self.prompt_view.insert(tk.END, prompt) + messagebox.showinfo('OK', 'Prompt built') + except Exception as e: + messagebox.showerror('Error', f'Failed to build prompt: {e}') + + def _compose_prompt(self, env: Dict[str, Any], rule_text: str, fp_head: List[Dict[str, Any]], fn_head: List[Dict[str, Any]]) -> str: + parts = [BASE_SPEC_JSON, FORCE_JSON, '\n【既存ルール(全文)】\n', rule_text, '\n【環境制約】\n', json.dumps(env, ensure_ascii=False, indent=2), '\n【FP】\n', json.dumps(fp_head[:5], ensure_ascii=False, indent=2), '\n【FN】\n', json.dumps(fn_head[:5], ensure_ascii=False, indent=2)] + return '\n'.join(parts) + + def send_to_api(self): + prompt = self.prompt_view.get('1.0', tk.END).strip() + if not prompt: + messagebox.showwarning('Empty prompt', 'Please build the prompt first.') + return + url = self.api_url.get().strip() or os.environ.get('DEFAULT_API_URL') + if not url: + messagebox.showwarning('API URL missing', 'Please enter API URL or set DEFAULT_API_URL') + return + client = GeminiClient(url, self.api_key.get().strip() or None) + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, 'Calling API...') + self.update_idletasks() + + try: + raw = client.call(prompt) + # Extract narrative and JSON + narrative = extract_text_from_gemini(raw) + parsed = None + if isinstance(raw, dict) and any(k in raw for k in EXPECTED_KEYS): + parsed = raw + else: + # try to locate JSON in narrative + parsed = find_json_in_text(narrative) + # If parsed is None, the narrative may be the only thing + md = format_as_markdown(parsed if parsed else narrative) + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, md) + except Exception as e: + self.output_view.delete('1.0', tk.END) + self.output_view.insert(tk.END, f'API call failed: {e}') + + +if __name__ == '__main__': + RuleTunerApp().mainloop() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Single-file Rule Tuner UI + AI client that always displays AI outputs as Markdown +- Keeps the original features but converts structured AI responses to Markdown +- No automatic JSON debug output (only if SHOW_DEBUG environment var is set) +""" + +import json +import os +import re +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +from tkinter.scrolledtext import ScrolledText +from typing import Any, Dict, List, Optional + +try: + import requests +except Exception: + requests = None # we can still run an offline flow + +# ---------- utilities ---------- + +def read_text_file(path: str) -> str: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return f.read() + return path + + +def read_json_file(path: str) -> Dict[str, Any]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + return json.loads(path) + + +def read_jsonl(path: str) -> List[Dict[str, Any]]: + if os.path.isfile(path): + with open(path, 'r', encoding='utf-8') as f: + lines = [l.strip() for l in f if l.strip()] + else: + lines = [l.strip() for l in path.splitlines() if l.strip()] + out = [] + for ln in lines: + try: + out.append(json.loads(ln)) + except Exception: + continue + return out + + +# ---------- normalization (keeps behavior) ---------- + +def normalize_response(data: Any) -> Any: + expected_keys = {"false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after"} + + def is_expected(obj: Any) -> bool: + return isinstance(obj, dict) and any(k in obj 
for k in expected_keys) + + # Already a desired structure + if is_expected(data): + return data + + # If the service wrapped JSON in codeblock, try to strip it. + # Prefer narrative text outside codeblocks so users see plain Markdown rather than raw JSON. + if isinstance(data, str): + # regex matching fenced codeblocks like ```json ... ``` + codeblock_re = re.compile(r"```(?:json|JSON|markdown)?\s*\n([\s\S]*?)\n```", flags=re.S) + + # If there is text outside of fenced codeblocks, prefer that. + split_parts = codeblock_re.split(data) + if split_parts: + # parts at even indices are the outside-of-code content + outside = "".join(split_parts[i] for i in range(0, len(split_parts), 2)).strip() + if outside: + # Also remove embedded JSON-like fragments inside the narrative + return _remove_json_fragments(outside) + + # Otherwise, try to parse the JSON from the first codeblock if present + m = codeblock_re.search(data) + if m: + try: + return normalize_response(json.loads(m.group(1))) + except Exception: + pass + try: + parsed = json.loads(data) + return normalize_response(parsed) + except Exception: + return data + + # If list with Gemini style content, pull text + if isinstance(data, list) and len(data) > 0: + if isinstance(data[0], dict) and 'content' in data[0]: + try: + text_content = data[0]['content']['parts'][0]['text'] + return normalize_response(text_content) + except Exception: + pass + + # join textual parts + texts: List[str] = [] + for item in data: + if isinstance(item, str): + texts.append(item) + elif isinstance(item, dict): + if is_expected(item): + return item + # try to get text + text = item.get('text') or item.get('content') or None + if isinstance(text, str): + texts.append(text) + elif isinstance(text, dict): + # try deeper + parts = text.get('parts') or [] + for p in parts: + if isinstance(p, dict) and isinstance(p.get('text'), str): + texts.append(p.get('text')) + if texts: + return normalize_response('\n'.join(texts)) + + # dict wrapper + if isinstance(data, dict): + for key in ('text', 'content', 'message', 'output'): + val = data.get(key) + if val is not None: + return normalize_response(val) + # arrays inside + for key in ('outputs', 'choices', 'results', 'candidates'): + val = data.get(key) + if isinstance(val, list): + return normalize_response(val) + + return data + + +def _remove_json_fragments(text: str) -> str: + """ + Remove JSON-like blocks and standalone JSON key/value lines from ``text``. + We only remove blocks that look like they contain the expected rule keys + (false_positive_patterns, rule_diff, etc.) to avoid stripping natural text. 
+ """ + expected_keys = ("false_positive_patterns", "false_negative_patterns", "rule_diff", "risk_impact_analysis", "rule_after") + + # Remove fenced codeblocks first (already handled upstream but keep for safety) + text = re.sub(r"```(?:json|JSON|markdown)?\s*\n[\s\S]*?\n```", "", text, flags=re.S) + + # Remove balanced JSON objects that include any expected key + out = [] + i = 0 + n = len(text) + while i < n: + if text[i] == '{': + depth = 1 + j = i + 1 + while j < n and depth > 0: + if text[j] == '{': + depth += 1 + elif text[j] == '}': + depth -= 1 + j += 1 + candidate = text[i:j] + if any(k in candidate for k in expected_keys): + # skip this fragment + i = j + continue + else: + out.append(candidate) + i = j + continue + else: + out.append(text[i]) + i += 1 + cleaned = ''.join(out) + + # Remove JSON style key lines such as " "rule_after": "...", + cleaned = re.sub(r"^\s*\"[a-zA-Z0-9_]+\"\s*:\s*[^\n]*\n?", "", cleaned, flags=re.M) + + # Trim extraneous punctuation or leftover commas / brackets + cleaned = re.sub(r"[,\[\]]{2,}", "", cleaned) + # Collapse multiple blank lines + cleaned = re.sub(r"\n{2,}", "\n\n", cleaned) + + return cleaned.strip() + + +# ---------- AI client ---------- + +class SimpleAIClient: + def __init__(self, api_url: str, api_key: Optional[str] = None, timeout: int = 600): + self.api_url = api_url + self.api_key = api_key + self.timeout = timeout + + def generate(self, prompt: str) -> Any: + if requests is None: + raise RuntimeError("requests module is required for API calls") + + headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} + api_key_header = os.environ.get('API_KEY_HEADER', 'api-key') + if self.api_key: + if api_key_header.lower() == 'authorization': + headers['Authorization'] = f'Bearer {self.api_key}' + else: + headers[api_key_header] = self.api_key + + payload = { + 'contents': [{'role': 'user', 'parts': [{'text': prompt}]}], + 'generationConfig': { + 'temperature': 0.2, + 'maxOutputTokens': 10000, + } + } + + resp = requests.post(self.api_url, headers=headers, json=payload, timeout=self.timeout) + try: + return normalize_response(resp.json()) + except Exception: + return normalize_response(resp.text) + + +# ---------- prompt builder (kept minimal) ---------- + +BASE_SPEC_JSON = """指示に厳密に従い、以下のキーを持つJSONオブジェクトのみを生成せよ。 +- false_positive_patterns: 文字列の配列 +- false_negative_patterns: 文字列の配列 +- rule_diff: オブジェクトまたは文字列 +- risk_impact_analysis: オブジェクトまたは文字列 +- rule_after: 文字列 +""" + +FORCE_JSON = """応答は必ず以下の形式のJSONコードブロックのみとすること。 + +json +{ +"false_positive_patterns": [ ... ], +"false_negative_patterns": [ ... ], +"rule_diff": "...", +"risk_impact_analysis": "...", +"rule_after": "..." 
+} + +挨拶、説明、要約など、このJSONコードブロック以外のテキストは一切含めてはならない。 +""" + + +def build_prompt(env: Dict[str, Any], rule_text: str, fp_head: List[Dict[str, Any]], fn_head: List[Dict[str, Any]], extra: str = '') -> str: + parts = [BASE_SPEC_JSON, FORCE_JSON] + parts.append('\n【既存ルール(全文)】\n') + parts.append(rule_text) + parts.append('\n【環境制約】\n') + parts.append(json.dumps(env, ensure_ascii=False, indent=2)) + parts.append('\n【FP】\n') + parts.append(json.dumps(fp_head[:5], ensure_ascii=False, indent=2)) + parts.append('\n【FN】\n') + parts.append(json.dumps(fn_head[:5], ensure_ascii=False, indent=2)) + if extra: + parts.append('\n【追加指示】') + parts.append(extra) + return '\n'.join(parts) + + +# ---------- Markdown conversion ---------- + +def _render_section_list(title: str, items: Any) -> List[str]: + md: List[str] = [f"## {title}", ""] + if isinstance(items, list): + for it in items: + md.append(f"- {it}") + else: + md.append(str(items)) + md.append("") + return md + + +def _render_section_text(title: str, text: Any, code: bool = False) -> List[str]: + md: List[str] = [f"## {title}", ""] + if isinstance(text, (dict, list)): + try: + pretty = json.dumps(text, ensure_ascii=False, indent=2) + if code: + md.append("```text") + md.append(pretty) + md.append("```") + else: + md.append(pretty) + except Exception: + md.append(str(text)) + else: + if isinstance(text, str): + md.append(text.replace("\\n", "\n")) + else: + md.append(str(text)) + md.append("") + return md + + +def format_thinking(ai_result: Any) -> str: + """ + Convert the AI result into Markdown. + If it's a string -> assume it's already Markdown and return it. + If it's a dict with the expected keys -> format the sections nicely. + Otherwise, convert to a short JSON code block. + + Note: No raw debug JSON is appended by default. Set env SHOW_DEBUG=1 to enable raw dump. 
+ """ + if isinstance(ai_result, str): + return ai_result + + if isinstance(ai_result, dict): + sections_map = [ + ("現状分析 (FP: 誤検知)", "false_positive_patterns", True), + ("現状分析 (FN: 検知漏れ)", "false_negative_patterns", True), + ("リスクと影響の分析", "risk_impact_analysis", False), + ("修正方針 (差分)", "rule_diff", True), + ("修正後のルール", "rule_after", True), + ] + + md: List[str] = ["# AIによるルール修正案", "---", ""] + + for title, key, is_list in sections_map: + content = ai_result.get(key) + if content is None or content == "": + continue + + # If the item is a list and maps to bullet style + if is_list and isinstance(content, (list, tuple)): + md.extend(_render_section_list(title, content)) + elif is_list and isinstance(content, str) and ('\n' in content or content.strip().startswith('[')): + # try parsing bracketed lists in strings + try: + parsed = json.loads(content) + md.extend(_render_section_list(title, parsed)) + except Exception: + md.extend(_render_section_text(title, content, code=False)) + elif key in ("rule_diff", "rule_after"): + # code block display for these keys + md.extend(_render_section_text(title, content, code=True)) + else: + md.extend(_render_section_text(title, content, code=False)) + + # Optionally add raw JSON if user wants it + if os.environ.get('SHOW_DEBUG'): + md.append("---") + md.append("## DEBUG: Raw AI response (JSON)") + md.append("```json") + try: + md.append(json.dumps(ai_result, ensure_ascii=False, indent=2)) + except Exception: + md.append(str(ai_result)) + md.append("```") + + return "\n".join(md) + + # fallback: print a concise json block as markdown + try: + return "```json\n" + json.dumps(ai_result, ensure_ascii=False, indent=2) + "\n```" + except Exception: + return str(ai_result) + + +# ---------- minimal GUI ---------- + +class App(tk.Tk): + def __init__(self): + super().__init__() + self.title('Rule Tuner - Markdown') + self.geometry('1000x720') + + self.rule_path = tk.StringVar() + self.fp_path = tk.StringVar() + self.fn_path = tk.StringVar() + self.env_path = tk.StringVar() + self.api_url = tk.StringVar() + self.api_key = tk.StringVar() + + self.prompt_text: Optional[str] = None + self.ai_result: Optional[Any] = None + + self._build_widgets() + + def _build_widgets(self): + pad = {'padx': 6, 'pady': 4} + top = ttk.Frame(self) + top.pack(fill=tk.X, **pad) + + def add_row(label, var): + row = ttk.Frame(top) + row.pack(fill=tk.X, **pad) + ttk.Label(row, text=label, width=12).pack(side=tk.LEFT) + ttk.Entry(row, textvariable=var).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Button(row, text='Browse', command=lambda: self._browse(var)).pack(side=tk.LEFT) + + add_row('Rule', self.rule_path) + add_row('FP JSONL', self.fp_path) + add_row('FN JSONL', self.fn_path) + add_row('Env JSON', self.env_path) + + api_frame = ttk.Frame(self) + api_frame.pack(fill=tk.X, **pad) + ttk.Label(api_frame, text='API URL', width=12).pack(side=tk.LEFT) + ttk.Entry(api_frame, textvariable=self.api_url).pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Label(api_frame, text='API Key', width=8).pack(side=tk.LEFT) + ttk.Entry(api_frame, textvariable=self.api_key, show='*').pack(side=tk.LEFT) + + btn_frame = ttk.Frame(self) + btn_frame.pack(fill=tk.X, **pad) + ttk.Button(btn_frame, text='Build Prompt', command=self.on_build_prompt).pack(side=tk.LEFT) + ttk.Button(btn_frame, text='Send to API', command=self.on_send_api).pack(side=tk.LEFT) + + mid = ttk.PanedWindow(self, orient=tk.HORIZONTAL) + mid.pack(fill=tk.BOTH, expand=True, **pad) + + left = ttk.Frame(mid) + right = ttk.Frame(mid) + 
mid.add(left, weight=1) + mid.add(right, weight=1) + + ttk.Label(left, text='Prompt Preview').pack(anchor=tk.W) + self.prompt_view = ScrolledText(left, height=20) + self.prompt_view.pack(fill=tk.BOTH, expand=True) + + ttk.Label(right, text='思考プロセス / 現状分析 / 修正方針 (マークダウン)').pack(anchor=tk.W) + self.output_container = ttk.Frame(right) + self.output_container.pack(fill=tk.BOTH, expand=True) + + self.output_widget = ScrolledText(self.output_container, height=20) + self.output_widget.pack(fill=tk.BOTH, expand=True) + + def _browse(self, var: tk.StringVar): + p = filedialog.askopenfilename() + if p: + var.set(p) + + def on_build_prompt(self): + try: + rule_text = read_text_file(self.rule_path.get().strip()) + fp = read_jsonl(self.fp_path.get().strip()) + fn = read_jsonl(self.fn_path.get().strip()) + env = read_json_file(self.env_path.get().strip()) + prompt = build_prompt(env, rule_text, fp, fn, extra='') + self.prompt_text = prompt + self.prompt_view.delete('1.0', tk.END) + self.prompt_view.insert(tk.END, prompt) + messagebox.showinfo('OK', 'Prompt built') + except Exception as e: + messagebox.showerror('Error', f'Build prompt failed: {e}') + + def on_send_api(self): + if not self.prompt_text: + messagebox.showwarning('未作成', '先に Build Prompt を実行してください') + return + api_url = self.api_url.get().strip() or os.environ.get('DEFAULT_API_URL') + if not api_url: + messagebox.showwarning('API未設定', 'API URL を入力するか DEFAULT_API_URL 環境変数を設定してください') + return + client = SimpleAIClient(api_url, self.api_key.get().strip() or None) + self.show_output('Calling API...') + self.update_idletasks() + + try: + result = client.generate(self.prompt_text) + self.ai_result = result + display = format_thinking(result) + self.show_output(display) + except Exception as e: + self.show_output(f'API call failed:\n{e}\n') + + def show_output(self, md_text: str): + txt_widget = self.output_widget + txt_widget.configure(state='normal') + txt_widget.delete('1.0', tk.END) + txt_widget.insert(tk.END, md_text) + txt_widget.configure(state='disabled') + + +if __name__ == '__main__': + App().mainloop() diff --git a/src/pages/python-playground.tsx b/src/pages/python-playground.tsx new file mode 100644 index 0000000000..1e4984f0f5 --- /dev/null +++ b/src/pages/python-playground.tsx @@ -0,0 +1,269 @@ +import { type NextPage } from "next"; +import Head from "next/head"; +import { useCallback, useEffect, useMemo, useState } from "react"; +import Button from "../components/Button"; +import DefaultLayout from "../layout/default"; + +const PYODIDE_VERSION = "0.24.1"; +const DEFAULT_CODE = `import numpy as np +import matplotlib.pyplot as plt + +x = np.linspace(0, 2 * np.pi, 200) +y = np.sin(x) + +plt.figure(figsize=(6, 3)) +plt.plot(x, y) +plt.title("Sine wave") +plt.xlabel("x") +plt.ylabel("sin(x)") +plt.grid(True) +plt.show() +`; + +declare global { + interface Window { + loadPyodide?: (options: { indexURL: string }) => Promise; + } +} + +const PythonPlaygroundPage: NextPage = () => { + const [pyodide, setPyodide] = useState(null); + const [code, setCode] = useState(DEFAULT_CODE); + const [output, setOutput] = useState(""); + const [images, setImages] = useState([]); + const [isLoading, setIsLoading] = useState(true); + const [statusMessage, setStatusMessage] = useState("Python環境を読み込み中..."); + const [isRunning, setIsRunning] = useState(false); + const [errorMessage, setErrorMessage] = useState(null); + + useEffect(() => { + let cancelled = false; + + const loadPyodideRuntime = async () => { + if (typeof window === "undefined") { + return; + } + 
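+      // The block below injects the Pyodide script from the jsDelivr CDN only
+      // when window.loadPyodide is not already defined, so remounting the page
+      // reuses the already-loaded runtime instead of appending the script again.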
+ try { + setStatusMessage("Pyodideをダウンロード中..."); + if (!window.loadPyodide) { + await new Promise((resolve, reject) => { + const script = document.createElement("script"); + script.src = `https://cdn.jsdelivr.net/pyodide/v${PYODIDE_VERSION}/full/pyodide.js`; + script.onload = () => resolve(); + script.onerror = () => reject(new Error("Pyodideの読み込みに失敗しました")); + document.body.appendChild(script); + }); + } + + if (!window.loadPyodide) { + throw new Error("Pyodideが利用できません"); + } + + setStatusMessage("Python環境を初期化しています..."); + const instance = await window.loadPyodide({ + indexURL: `https://cdn.jsdelivr.net/pyodide/v${PYODIDE_VERSION}/full/`, + }); + + if (cancelled) { + return; + } + + setStatusMessage("科学計算ライブラリを読み込み中..."); + await instance.loadPackage(["numpy", "matplotlib"]); + + if (cancelled) { + return; + } + + setPyodide(instance); + setIsLoading(false); + setStatusMessage("Python環境の準備が整いました!"); + } catch (error) { + if (!cancelled) { + const message = + error instanceof Error ? error.message : "不明なエラーが発生しました"; + setErrorMessage(message); + setIsLoading(false); + } + } + }; + + void loadPyodideRuntime(); + + return () => { + cancelled = true; + }; + }, []); + + const runCode = useCallback(async () => { + if (!pyodide) { + return; + } + + setIsRunning(true); + setImages([]); + setOutput(""); + setErrorMessage(null); + + const pushFigure = (img: unknown) => { + if (typeof img === "string") { + setImages((prev) => [...prev, img]); + } + }; + + pyodide.globals.set("send_figure", pushFigure); + + try { + const result = await pyodide.runPythonAsync(` +import sys +import io +import base64 + +from js import send_figure + +_stdout = sys.stdout +_stderr = sys.stderr +_buffer = io.StringIO() +sys.stdout = _buffer +sys.stderr = _buffer + +import matplotlib +matplotlib.use("AGG") +from matplotlib import pyplot as plt + +import io as _img_io + +def _flush_figures(): + figs = plt.get_fignums() + for fig in figs: + plt.figure(fig) + buf = _img_io.BytesIO() + plt.savefig(buf, format="png", bbox_inches="tight") + buf.seek(0) + send_figure("data:image/png;base64," + base64.b64encode(buf.read()).decode("ascii")) + buf.close() + plt.close(fig) + +def show(*args, **kwargs): + _flush_figures() + +plt.show = show + +${code} + +_flush_figures() +sys.stdout = _stdout +sys.stderr = _stderr +_buffer.getvalue() +`); + + const textOutput = + typeof result === "string" + ? result + : typeof result?.toString === "function" + ? result.toString() + : ""; + + setOutput(textOutput); + + if (result && typeof (result as { destroy?: () => void }).destroy === "function") { + (result as { destroy: () => void }).destroy(); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + setErrorMessage(message); + } finally { + setIsRunning(false); + } + }, [code, pyodide]); + + const pageTitle = useMemo( + () => "Python Playground | AgentGPT", + [] + ); + + return ( + + + {pageTitle} + +
+      <div>
+        <h1>Pythonプレイグラウンド</h1>
+        <p>
+          ブラウザ上でPythonコードを実行して、その場でグラフを確認できます。NumPyとMatplotlibは既にインストール済みです。
+        </p>
+        {/* 以下: コードエディタ、実行ボタン、ステータス表示、出力テキストと生成された図の表示 */}