diff --git a/backend/api/onboarding_utils/step_management_service.py b/backend/api/onboarding_utils/step_management_service.py index dc1726b6..da4c5213 100644 --- a/backend/api/onboarding_utils/step_management_service.py +++ b/backend/api/onboarding_utils/step_management_service.py @@ -13,6 +13,7 @@ from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService from services.database import get_db from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences, PersonaData, CompetitorAnalysis +from services.intelligence.agent_flat_context import AgentFlatContextStore class StepManagementService: """Service for handling onboarding step management.""" @@ -62,6 +63,7 @@ def _save_api_key(self, user_id: str, provider: str, api_key: str, db: Session) db.add(new_key) db.commit() + return True except Exception as e: logger.error(f"Error saving API key for user {user_id}: {e}") @@ -139,6 +141,39 @@ def _save_website_analysis(self, user_id: str, analysis_data: Dict[str, Any], db db.add(new_analysis) db.commit() + + # Persist Step 2 snapshot to agent flat-file context for ultra-fast reads + try: + flat_store = AgentFlatContextStore(user_id) + canonical_payload = { + "website_url": filtered_data.get("website_url") or incoming.get("website") or incoming.get("website_url"), + "analysis_date": datetime.utcnow().isoformat(), + "status": (nested or incoming).get("status") or "completed", + "error_message": (nested or incoming).get("error_message"), + "warning_message": (nested or incoming).get("warning_message"), + "writing_style": filtered_data.get("writing_style"), + "content_characteristics": filtered_data.get("content_characteristics"), + "target_audience": filtered_data.get("target_audience"), + "content_type": filtered_data.get("content_type"), + "recommended_settings": filtered_data.get("recommended_settings"), + "brand_analysis": filtered_data.get("brand_analysis"), + "content_strategy_insights": 
filtered_data.get("content_strategy_insights"), + "social_media_presence": filtered_data.get("social_media_presence"), + "style_patterns": filtered_data.get("style_patterns"), + "style_guidelines": filtered_data.get("style_guidelines"), + "seo_audit": filtered_data.get("seo_audit"), + "strategic_insights_history": (nested or incoming).get("strategic_insights_history"), + "crawl_result": filtered_data.get("crawl_result"), + "meta_info": meta_info, + "sitemap_analysis": sitemap_analysis, + "raw_step2_payload": incoming, + "raw_analysis_payload": nested or incoming, + "saved_at": datetime.utcnow().isoformat(), + } + flat_store.save_step2_website_analysis(canonical_payload, source="onboarding_step2") + except Exception as flat_err: + logger.warning(f"Failed to persist step 2 flat context for user {user_id}: {flat_err}") + return True except Exception as e: logger.error(f"Error saving website analysis for user {user_id}: {e}") @@ -193,6 +228,28 @@ def _save_research_preferences(self, user_id: str, research_data: Dict[str, Any] db.add(new_prefs) db.commit() + + # Persist Step 3 snapshot to agent flat-file context + try: + flat_store = AgentFlatContextStore(user_id) + canonical_payload = { + "research_depth": research_data.get("research_depth"), + "content_types": research_data.get("content_types") or [], + "auto_research": research_data.get("auto_research", True), + "factual_content": research_data.get("factual_content", True), + "writing_style": research_data.get("writing_style") or {}, + "content_characteristics": research_data.get("content_characteristics") or {}, + "target_audience": research_data.get("target_audience") or {}, + "recommended_settings": research_data.get("recommended_settings") or {}, + "industry_context": research_data.get("industry_context") or research_data.get("industryContext"), + "competitors": research_data.get("competitors") if isinstance(research_data.get("competitors"), list) else [], + "saved_at": datetime.utcnow().isoformat(), + 
"source_payload": research_data, + } + flat_store.save_step3_research_preferences(canonical_payload, source="onboarding_step3") + except Exception as flat_err: + logger.warning(f"Failed to persist step 3 flat context for user {user_id}: {flat_err}") + return True except Exception as e: logger.error(f"Error saving research preferences for user {user_id}: {e}") @@ -268,6 +325,22 @@ def _save_competitor_analysis(self, user_id: str, competitors: List[Dict[str, An db.commit() logger.info(f"✅ Saved {saved_count} competitors ({failed_count} failed)") + + # Refresh Step 3 flat context with competitor details saved by this flow + try: + flat_store = AgentFlatContextStore(user_id) + existing_doc = flat_store.load_step3_context_document() or {} + existing_data = existing_doc.get("data") if isinstance(existing_doc, dict) and isinstance(existing_doc.get("data"), dict) else {} + merged_payload = { + **existing_data, + "competitors": competitors, + "industry_context": industry_context or existing_data.get("industry_context"), + "competitors_saved_at": datetime.utcnow().isoformat(), + } + flat_store.save_step3_research_preferences(merged_payload, source="onboarding_step3_competitors") + except Exception as flat_err: + logger.warning(f"Failed to refresh step 3 competitor flat context for user {user_id}: {flat_err}") + return True except Exception as e: logger.error(f"Error saving competitor analysis for user {user_id}: {e}") @@ -275,6 +348,25 @@ def _save_competitor_analysis(self, user_id: str, competitors: List[Dict[str, An raise e + + def _save_step5_integrations_context(self, user_id: str, step5_data: Dict[str, Any]) -> bool: + """Persist Step 5 integrations context to flat-file store.""" + try: + flat_store = AgentFlatContextStore(user_id) + canonical_payload = { + "integrations": step5_data.get("integrations") if isinstance(step5_data.get("integrations"), dict) else {}, + "providers": step5_data.get("providers") if isinstance(step5_data.get("providers"), list) else [], + 
"connected_accounts": step5_data.get("connectedAccounts") if isinstance(step5_data.get("connectedAccounts"), list) else [], + "integration_status": step5_data.get("status") or step5_data.get("integrationStatus"), + "notes": step5_data.get("notes") or step5_data.get("integrationNotes"), + "saved_at": datetime.utcnow().isoformat(), + "source_payload": step5_data, + } + return flat_store.save_step5_integrations(canonical_payload, source="onboarding_step5") + except Exception as e: + logger.warning(f"Failed to save Step 5 integrations context for user {user_id}: {e}") + return False + def _save_persona_data(self, user_id: str, persona_data: Dict[str, Any], db: Session) -> bool: """Save persona data directly to database.""" try: @@ -301,6 +393,24 @@ def _save_persona_data(self, user_id: str, persona_data: Dict[str, Any], db: Ses db.add(persona) db.commit() + + # Persist Step 4 snapshot to agent flat-file context + try: + flat_store = AgentFlatContextStore(user_id) + canonical_payload = { + "core_persona": persona_data.get("corePersona") or {}, + "platform_personas": persona_data.get("platformPersonas") or {}, + "quality_metrics": persona_data.get("qualityMetrics") or {}, + "selected_platforms": persona_data.get("selectedPlatforms", []), + "research_persona": persona_data.get("researchPersona") or persona_data.get("research_persona"), + "persona_generation_notes": persona_data.get("personaGenerationNotes") or persona_data.get("persona_generation_notes"), + "saved_at": datetime.utcnow().isoformat(), + "source_payload": persona_data, + } + flat_store.save_step4_persona_data(canonical_payload, source="onboarding_step4") + except Exception as flat_err: + logger.warning(f"Failed to persist step 4 flat context for user {user_id}: {flat_err}") + return True except Exception as e: logger.error(f"Error saving persona data for user {user_id}: {e}") @@ -635,6 +745,19 @@ async def complete_step(self, step_number: int, request_data: Dict[str, Any], cu detail="Failed to save persona 
data. Onboarding cannot proceed until this is resolved." ) from e + + # Step 5: Save integrations data to flat context + elif step_number == 5 and request_data: + step5_data = request_data.get('data') or request_data + logger.info(f"🔍 Step 5: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}") + logger.info(f"🔍 Step 5: Extracted step5_data keys: {list(step5_data.keys()) if step5_data else 'None'}") + if step5_data: + saved = self._save_step5_integrations_context(user_id, step5_data) + if saved: + logger.info(f"✅ Saved Step 5 integrations context for user {user_id}") + else: + logger.warning(f"⚠️ Step 5 integrations context not persisted for user {user_id}") + # Persist current step and progress in DB from services.onboarding.progress_service import OnboardingProgressService progress_service = OnboardingProgressService() diff --git a/backend/services/intelligence/agent_flat_context.py b/backend/services/intelligence/agent_flat_context.py new file mode 100644 index 00000000..ad81a91b --- /dev/null +++ b/backend/services/intelligence/agent_flat_context.py @@ -0,0 +1,528 @@ +"""Flat-file context storage for AI agents. + +Stores onboarding context in per-user workspace files, optimized for fast agent reads. +Includes minimal security hardening, context-size controls, and internal document linking. 
class AgentFlatContextStore:
    """Read/write agent-only flat-file context in a per-user workspace.

    Each onboarding step (2-5) is stored as one JSON document under
    ``<root>/workspace/workspace_<safe_user_id>/agent_context/`` together with
    a manifest listing the documents written so far.  A document carries the
    redacted, size-capped step payload under ``data`` and a compact, redacted
    ``agent_summary`` derived from it.

    All ``save_*`` methods return ``True``/``False`` instead of raising; all
    ``load_*`` methods return ``None`` when the document is missing, unreadable
    or belongs to a different ``user_id``.
    """

    CONTEXT_DIRNAME = "agent_context"
    STEP2_FILENAME = "step2_website_analysis.json"
    STEP3_FILENAME = "step3_research_preferences.json"
    STEP4_FILENAME = "step4_persona_data.json"
    STEP5_FILENAME = "step5_integrations.json"
    MANIFEST_FILENAME = "context_manifest.json"

    SCHEMA_VERSION = "1.3"
    DEFAULT_MAX_BYTES = 300_000
    SUMMARY_TEXT_LIMIT = 800

    # Context-type identifiers written into documents and the manifest.
    _STEP2_TYPE = "onboarding_step2_website_analysis"
    _STEP3_TYPE = "onboarding_step3_research_preferences"
    _STEP4_TYPE = "onboarding_step4_persona_data"
    _STEP5_TYPE = "onboarding_step5_integrations"

    # A key is redacted when its lower-cased name contains any of these.
    _SENSITIVE_TOKENS = ("api_key", "token", "secret", "password", "authorization", "cookie")

    # Payload fields dropped (in this order) when a document exceeds the budget.
    _TRIM_CANDIDATES = (
        "raw_step2_payload",
        "raw_analysis_payload",
        "source_payload",
        "crawl_result",
        "competitors",
        "strategic_insights_history",
        "seo_audit",
    )
    _MAX_COMPETITORS = 20

    def __init__(self, user_id: str, *, root_dir: Optional[Path] = None):
        """Bind the store to one user.

        Args:
            user_id: Identifier of the workspace owner.
            root_dir: Directory that contains the ``workspace`` folder.  When
                omitted, the directory three levels above this module is used.
        """
        self.user_id = user_id
        self.safe_user_id = self._sanitize_user_id(user_id)
        self._root_dir = Path(root_dir) if root_dir is not None else None

    # ------------------------------------------------------------------ paths

    @staticmethod
    def _sanitize_user_id(user_id: str) -> str:
        """Keep only alphanumerics, ``-`` and ``_`` so the id is path-safe."""
        # NOTE(review): distinct ids can collapse to the same safe id
        # (e.g. "a.b" and "ab"); reads are protected by the user_id check in
        # _load_context_document, writes are not.
        safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
        return safe or "unknown_user"

    def _workspace_dir(self) -> Path:
        """Return the per-user workspace directory."""
        root_dir = getattr(self, "_root_dir", None)
        if root_dir is None:
            root_dir = Path(__file__).resolve().parents[3]
        return root_dir / "workspace" / f"workspace_{self.safe_user_id}"

    def _context_dir(self) -> Path:
        """Return the directory holding this user's context documents."""
        return self._workspace_dir() / self.CONTEXT_DIRNAME

    def _context_file(self, filename: str) -> Path:
        """Return the full path of one context document."""
        return self._context_dir() / filename

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _estimate_size_bytes(value: Any) -> int:
        """Return the UTF-8 size of ``value`` as JSON, or 0 if not serialisable."""
        try:
            return len(json.dumps(value, ensure_ascii=False).encode("utf-8"))
        except Exception:
            return 0

    @staticmethod
    def _to_context_list(value: Any) -> Any:
        """Coerce ``value`` to a list: None -> [], dict -> its keys, scalar -> [str]."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            return list(value.keys())
        return [str(value)]

    @staticmethod
    def _truncate_text(value: Any, max_chars: int = SUMMARY_TEXT_LIMIT) -> str:
        """Return ``value`` cut to ``max_chars`` plus ``...``; non-strings become ""."""
        text = value if isinstance(value, str) else ""
        if len(text) <= max_chars:
            return text
        return f"{text[:max_chars]}..."

    @staticmethod
    def _dict_field(payload: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``payload[key]`` when it is a dict, otherwise an empty dict."""
        value = payload.get(key)
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _list_field(payload: Dict[str, Any], key: str) -> list:
        """Return ``payload[key]`` when it is a list, otherwise an empty list."""
        value = payload.get(key)
        return value if isinstance(value, list) else []

    @staticmethod
    def _redact_sensitive(data: Any) -> Any:
        """Minimal recursive redaction for sensitive-like keys in payload snapshots.

        Returns a copy of ``data`` in which the value of every dict key whose
        lower-cased name contains a sensitive token is replaced by
        ``"[REDACTED]"``.  Lists and nested dicts are walked recursively.
        """
        if isinstance(data, dict):
            redacted = {}
            for k, v in data.items():
                key_lower = str(k).lower()
                if any(token in key_lower for token in AgentFlatContextStore._SENSITIVE_TOKENS):
                    redacted[k] = "[REDACTED]"
                else:
                    redacted[k] = AgentFlatContextStore._redact_sensitive(v)
            return redacted
        if isinstance(data, list):
            return [AgentFlatContextStore._redact_sensitive(v) for v in data]
        return data

    # ------------------------------------------------------ document metadata

    def _related_documents(self, context_type: str) -> list:
        """Return link records to the sibling step documents of ``context_type``."""
        step2 = (self._STEP2_TYPE, self.STEP2_FILENAME)
        step3 = (self._STEP3_TYPE, self.STEP3_FILENAME)
        step4 = (self._STEP4_TYPE, self.STEP4_FILENAME)
        step5 = (self._STEP5_TYPE, self.STEP5_FILENAME)
        links = {
            self._STEP2_TYPE: [(step3, "next_step"), (step4, "future_dependency"), (step5, "future_dependency")],
            self._STEP3_TYPE: [(step2, "previous_step"), (step4, "next_step"), (step5, "future_dependency")],
            self._STEP4_TYPE: [(step3, "previous_step"), (step2, "upstream_context"), (step5, "next_step")],
            self._STEP5_TYPE: [(step4, "previous_step"), (step3, "upstream_context")],
        }
        return [
            {"type": doc_type, "path": path, "relationship": relationship}
            for (doc_type, path), relationship in links.get(context_type, [])
        ]

    def _build_document_context(
        self,
        *,
        context_type: str,
        source: str,
        journey_stage: str,
        fallback_order: list,
        payload_size: int,
        summary_size: int,
        payload_within_budget: bool,
    ) -> Dict[str, Any]:
        """Build the ``document_context`` header that tells agents how to use a doc."""
        total_size = payload_size + summary_size
        return {
            "audience": "ai_agents",
            "purpose": "fast_context_retrieval",
            "context_type": context_type,
            "source": source,
            "tenant": {"user_id_safe": self.safe_user_id, "isolation_scope": "workspace_user"},
            "journey": {
                "stage": journey_stage,
                "user_action": "onboarding",
                "agent_expectation": "read_summary_first_then_expand",
            },
            "retrieval_contract": {
                "preferred": "flat_file",
                "fallback_order": fallback_order,
            },
            "context_window_guidance": {
                "max_raw_bytes": self.DEFAULT_MAX_BYTES,
                "total_bytes": total_size,
                "raw_document_within_budget": payload_within_budget,
                "agent_policy": "Use agent_summary first; open full data only for specialist tasks",
            },
            "related_documents": self._related_documents(context_type),
        }

    # -------------------------------------------------------------- summaries

    def _build_step2_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise a step 2 (website analysis) payload for quick agent reads."""
        seo_audit = self._dict_field(payload, "seo_audit")
        brand = self._dict_field(payload, "brand_analysis")
        rec_settings = self._dict_field(payload, "recommended_settings")
        target_audience = self._dict_field(payload, "target_audience")
        social = self._dict_field(payload, "social_media_presence")
        content_type = self._dict_field(payload, "content_type")

        technical_issues = self._to_context_list(seo_audit.get("technical_issues"))
        recommendations = self._to_context_list(seo_audit.get("recommendations"))

        quick_facts = {
            "website_url": payload.get("website_url") or "",
            "brand_voice": brand.get("brand_voice") or "",
            "industry": brand.get("industry") or "",
            "target_segment": target_audience.get("primary_audience") or target_audience.get("audience_type") or "",
            "writing_tone": rec_settings.get("writing_tone") or "",
            # Default to "" like every other quick fact instead of leaking None.
            "primary_content_type": content_type.get("primary_type") or "",
            "social_platforms": sorted(social),
            "seo_issue_count": len(technical_issues),
            "seo_recommendation_count": len(recommendations),
        }

        signal_keys = ("brand_voice", "industry", "writing_tone", "primary_content_type")
        return {
            "quick_facts": quick_facts,
            "retrieval_hints": {
                "high_signal_terms": [quick_facts[k] for k in signal_keys if quick_facts[k]],
                "agent_queries": [
                    "brand voice guidelines",
                    "website style patterns",
                    "seo technical issues",
                    "content strategy opportunities",
                    "target audience profile",
                ],
            },
            "profile": {
                "writing_style": payload.get("writing_style") or {},
                "style_patterns": payload.get("style_patterns") or {},
                "style_guidelines": payload.get("style_guidelines") or {},
                "recommended_settings": rec_settings,
                "target_audience": target_audience,
            },
            "seo_focus": {
                "technical_issues": technical_issues,
                "recommendations": recommendations,
            },
        }

    def _build_step3_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise a step 3 (research preferences / competitors) payload."""
        competitors = self._list_field(payload, "competitors")
        domains = []
        for comp in competitors[: self._MAX_COMPETITORS]:
            if isinstance(comp, dict):
                dom = comp.get("domain") or comp.get("url")
                if dom:
                    domains.append(str(dom))

        research_depth = payload.get("research_depth") or ""
        content_types = self._list_field(payload, "content_types")
        industry_context = self._truncate_text(
            payload.get("industry_context") or payload.get("industryContext") or "", 500
        )

        return {
            "quick_facts": {
                "research_depth": research_depth,
                "content_types": content_types,
                "auto_research": bool(payload.get("auto_research", True)),
                "factual_content": bool(payload.get("factual_content", True)),
                "competitor_count": len(competitors),
            },
            "retrieval_hints": {
                # Drop empty terms, matching the step 2 summary.
                "high_signal_terms": [t for t in (research_depth, *content_types[:5]) if t],
                "agent_queries": [
                    "competitor landscape summary",
                    "content opportunities by competitor",
                    "research depth preferences",
                    "factual content constraints",
                ],
            },
            "competitor_focus": {
                "top_competitor_domains": domains[:10],
                "industry_context": industry_context,
            },
        }

    def _build_step4_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise a step 4 (persona data) payload."""
        core_persona = self._dict_field(payload, "core_persona")
        platform_personas = self._dict_field(payload, "platform_personas")
        quality_metrics = self._dict_field(payload, "quality_metrics")
        selected_platforms = self._list_field(payload, "selected_platforms")

        persona_name = core_persona.get("name") or core_persona.get("persona_name") or ""
        primary_goal = self._truncate_text(core_persona.get("primary_goal") or core_persona.get("goal") or "", 250)

        return {
            "quick_facts": {
                "persona_name": persona_name,
                "selected_platforms": selected_platforms,
                "platform_persona_count": len(platform_personas),
                "has_research_persona": bool(payload.get("research_persona")),
            },
            "retrieval_hints": {
                # Drop empty terms, matching the step 2 summary.
                "high_signal_terms": [t for t in (persona_name, *selected_platforms[:5]) if t],
                "agent_queries": [
                    "core persona profile",
                    "platform persona adaptations",
                    "persona quality metrics",
                    "research persona defaults",
                ],
            },
            "persona_focus": {
                "primary_goal": primary_goal,
                "core_persona": core_persona,
                "quality_metrics": quality_metrics,
            },
        }

    def _build_step5_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise a step 5 (integrations) payload."""
        integrations = self._dict_field(payload, "integrations")
        providers = self._list_field(payload, "providers")
        connected = [name for name, state in integrations.items() if state]
        notes = self._truncate_text(payload.get("notes") or payload.get("integration_notes") or "", 300)

        return {
            "quick_facts": {
                "connected_integrations_count": len(connected),
                "connected_integrations": connected[:20],
                "providers_count": len(providers),
            },
            "retrieval_hints": {
                "high_signal_terms": connected[:5],
                "agent_queries": [
                    "integration readiness",
                    "connected providers summary",
                    "missing integration dependencies",
                ],
            },
            "integration_focus": {
                "notes": notes,
                "integrations": integrations,
            },
        }

    # ------------------------------------------------------------ persistence

    def _shrink_payload_if_needed(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Keep payload under budget by trimming heavy optional sections first.

        The payload is always redacted.  When its JSON size exceeds
        ``DEFAULT_MAX_BYTES`` the candidate fields are replaced one by one, in
        priority order, until the payload fits or the candidates run out.

        Returns:
            ``(payload, trim_info)`` where ``trim_info`` records whether
            anything was trimmed, the original size and the affected fields.
        """
        payload = self._redact_sensitive(payload if isinstance(payload, dict) else {})
        original_size = self._estimate_size_bytes(payload)
        trim_info = {"trimmed": False, "original_size_bytes": original_size, "trimmed_fields": []}

        if original_size <= self.DEFAULT_MAX_BYTES:
            return payload, trim_info

        mutable = dict(payload)
        for field in self._TRIM_CANDIDATES:
            if self._estimate_size_bytes(mutable) <= self.DEFAULT_MAX_BYTES:
                break
            value = mutable.get(field)
            if value is None:
                # Absent or empty: nothing to reclaim, so do not report a trim.
                continue
            if field == "competitors" and isinstance(value, list):
                if len(value) <= self._MAX_COMPETITORS:
                    # Already within the cap; recording it would be misleading.
                    continue
                mutable[field] = value[: self._MAX_COMPETITORS]
            elif isinstance(value, (dict, list)):
                mutable[field] = {"omitted": True, "reason": "size_budget", "original_type": type(value).__name__}
            elif isinstance(value, str):
                mutable[field] = self._truncate_text(value, 500)
            else:
                mutable[field] = "[OMITTED:size_budget]"
            trim_info["trimmed_fields"].append(field)

        final_size = self._estimate_size_bytes(mutable)
        trim_info["trimmed"] = final_size < original_size
        trim_info["final_size_bytes"] = final_size
        return mutable, trim_info

    def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
        """Write ``data`` as compact JSON via a temp file and ``os.replace``.

        Raises:
            Exception: whatever the write/replace raised, after the temp file
                has been removed.
        """
        target_file.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, target_file)
            try:
                # Best effort: owner-only permissions (may be unsupported).
                os.chmod(target_file, 0o600)
            except Exception:
                pass
        except Exception:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
            raise

    def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
        """Insert or replace the manifest entry for ``context_type``."""
        manifest_file = self._context_file(self.MANIFEST_FILENAME)
        existing: Dict[str, Any] = {}
        if manifest_file.exists():
            try:
                with open(manifest_file, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                # A manifest holding valid JSON that is not an object (list,
                # string, ...) is treated as corrupt and rebuilt rather than
                # failing the whole save.
                if isinstance(loaded, dict):
                    existing = loaded
            except (OSError, ValueError):
                existing = {}

        previous = existing.get("documents")
        items = [
            item
            for item in (previous if isinstance(previous, list) else [])
            if not (isinstance(item, dict) and item.get("type") == context_type)
        ]
        meta = doc.get("meta") or {}
        items.append(
            {
                "type": context_type,
                "path": filename,
                "updated_at": doc.get("updated_at"),
                "size_bytes": meta.get("data_size_bytes", 0) + meta.get("summary_size_bytes", 0),
                "related_documents": (doc.get("document_context") or {}).get("related_documents", []),
            }
        )

        manifest = {
            "schema_version": self.SCHEMA_VERSION,
            "user_id": str(self.user_id),
            "updated_at": datetime.utcnow().isoformat(),
            "documents": items,
        }
        self._atomic_write_json(manifest_file, manifest)

    def _compose_context_document(
        self,
        *,
        context_type: str,
        payload: Dict[str, Any],
        summary: Dict[str, Any],
        source: str,
        journey_stage: str,
    ) -> Dict[str, Any]:
        """Assemble the on-disk document (no I/O) from a payload and its summary."""
        payload = payload if isinstance(payload, dict) else {}
        # The summary is derived from the raw payload, so it must be redacted
        # too; otherwise secrets nested in e.g. ``integrations`` reach disk.
        summary = self._redact_sensitive(summary if isinstance(summary, dict) else {})

        compact_payload, trim_info = self._shrink_payload_if_needed(payload)
        payload_size = self._estimate_size_bytes(compact_payload)
        summary_size = self._estimate_size_bytes(summary)

        return {
            "schema_version": self.SCHEMA_VERSION,
            "context_type": context_type,
            "user_id": str(self.user_id),
            "updated_at": datetime.utcnow().isoformat(),
            "source": source,
            "document_context": self._build_document_context(
                context_type=context_type,
                source=source,
                journey_stage=journey_stage,
                fallback_order=["flat_file", "database", "sif_semantic"],
                payload_size=payload_size,
                summary_size=summary_size,
                payload_within_budget=payload_size <= self.DEFAULT_MAX_BYTES,
            ),
            "data": compact_payload,
            "agent_summary": summary,
            "meta": {
                "data_size_bytes": payload_size,
                "summary_size_bytes": summary_size,
                "trim": trim_info,
            },
        }

    def _save_context_document(
        self,
        *,
        filename: str,
        context_type: str,
        payload: Dict[str, Any],
        summary: Dict[str, Any],
        source: str,
        journey_stage: str,
    ) -> bool:
        """Write one context document and refresh the manifest.

        Returns:
            ``True`` on success; ``False`` (after logging) on any failure.
        """
        try:
            context_doc = self._compose_context_document(
                context_type=context_type,
                payload=payload,
                summary=summary,
                source=source,
                journey_stage=journey_stage,
            )
            self._atomic_write_json(self._context_file(filename), context_doc)
            self._update_manifest(context_type, filename, context_doc)
            return True
        except Exception as exc:
            logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
            return False

    def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
        """Persist the step 2 website-analysis snapshot."""
        return self._save_context_document(
            filename=self.STEP2_FILENAME,
            context_type=self._STEP2_TYPE,
            payload=payload,
            summary=self._build_step2_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_2",
        )

    def save_step3_research_preferences(self, payload: Dict[str, Any], *, source: str = "onboarding_step3") -> bool:
        """Persist the step 3 research-preferences snapshot."""
        return self._save_context_document(
            filename=self.STEP3_FILENAME,
            context_type=self._STEP3_TYPE,
            payload=payload,
            summary=self._build_step3_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_3",
        )

    def save_step4_persona_data(self, payload: Dict[str, Any], *, source: str = "onboarding_step4") -> bool:
        """Persist the step 4 persona snapshot."""
        return self._save_context_document(
            filename=self.STEP4_FILENAME,
            context_type=self._STEP4_TYPE,
            payload=payload,
            summary=self._build_step4_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_4",
        )

    def save_step5_integrations(self, payload: Dict[str, Any], *, source: str = "onboarding_step5") -> bool:
        """Persist the step 5 integrations snapshot."""
        return self._save_context_document(
            filename=self.STEP5_FILENAME,
            context_type=self._STEP5_TYPE,
            payload=payload,
            summary=self._build_step5_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_5",
        )

    # ---------------------------------------------------------------- loading

    def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
        """Load one document; ``None`` if missing, invalid or owned by another user."""
        try:
            target_file = self._context_file(filename)
            if not target_file.exists():
                return None
            with open(target_file, "r", encoding="utf-8") as f:
                doc = json.load(f)
            if not isinstance(doc, dict):
                return None
            if str(doc.get("user_id")) != str(self.user_id):
                logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
                return None
            return doc
        except Exception as exc:
            logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
            return None

    def _load_context_data(self, filename: str) -> Optional[Dict[str, Any]]:
        """Return only the ``data`` section of a document, or ``None``."""
        doc = self._load_context_document(filename)
        data = doc.get("data") if isinstance(doc, dict) else None
        return data if isinstance(data, dict) else None

    def load_context_manifest(self) -> Optional[Dict[str, Any]]:
        """Load the manifest of documents written for this user."""
        return self._load_context_document(self.MANIFEST_FILENAME)

    def load_step2_context_document(self) -> Optional[Dict[str, Any]]:
        """Load the full step 2 document (header, data and summary)."""
        return self._load_context_document(self.STEP2_FILENAME)

    def load_step2_website_analysis(self) -> Optional[Dict[str, Any]]:
        """Load only the step 2 payload."""
        return self._load_context_data(self.STEP2_FILENAME)

    def load_step3_context_document(self) -> Optional[Dict[str, Any]]:
        """Load the full step 3 document (header, data and summary)."""
        return self._load_context_document(self.STEP3_FILENAME)

    def load_step3_research_preferences(self) -> Optional[Dict[str, Any]]:
        """Load only the step 3 payload."""
        return self._load_context_data(self.STEP3_FILENAME)

    def load_step4_context_document(self) -> Optional[Dict[str, Any]]:
        """Load the full step 4 document (header, data and summary)."""
        return self._load_context_document(self.STEP4_FILENAME)

    def load_step4_persona_data(self) -> Optional[Dict[str, Any]]:
        """Load only the step 4 payload."""
        return self._load_context_data(self.STEP4_FILENAME)

    def load_step5_context_document(self) -> Optional[Dict[str, Any]]:
        """Load the full step 5 document (header, data and summary)."""
        return self._load_context_document(self.STEP5_FILENAME)

    def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
        """Load only the step 5 payload."""
        return self._load_context_data(self.STEP5_FILENAME)
directly, but we can expose - capabilities via a standard agent interface if available. + Expose a txtai Agent interface with flat-file context tools. + Tools are scoped to the current user workspace via AgentFlatContextStore. """ if not TXTAI_AVAILABLE or Agent is None: raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available") @@ -180,11 +181,103 @@ def _create_txtai_agent(self): _llm_for_agent = self.llm for _ in range(3): _llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent) - return Agent(llm=_llm_for_agent, tools=[]) + + return Agent( + llm=_llm_for_agent, + tools=[ + { + "name": "flat_context_manifest", + "description": "Returns manifest of available onboarding flat-context documents for this user", + "target": self._tool_flat_context_manifest, + }, + { + "name": "flat_context_read", + "description": "Read a flat-context document by logical name: step2|step3|step4|step5|manifest", + "target": self._tool_flat_context_read, + }, + { + "name": "flat_context_write_note", + "description": "Write lightweight agent notes/updates to a specific flat-context document", + "target": self._tool_flat_context_write_note, + }, + ], + ) except Exception as e: logger.error(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}") raise + def _tool_flat_context_manifest(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Tool: list available flat-context docs and links.""" + try: + store = AgentFlatContextStore(self.user_id) + manifest = store.load_context_manifest() or {"documents": []} + return {"ok": True, "manifest": manifest} + except Exception as e: + return {"ok": False, "error": str(e)} + + def _tool_flat_context_read(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Tool: read one user-scoped context doc.""" + try: + key = str((context or {}).get("document") or "").strip().lower() + store = AgentFlatContextStore(self.user_id) + mapping = { + "step2": store.load_step2_context_document, + "step3": 
store.load_step3_context_document, + "step4": store.load_step4_context_document, + "step5": store.load_step5_context_document, + "manifest": store.load_context_manifest, + } + if key not in mapping: + return {"ok": False, "error": "Invalid document. Use step2|step3|step4|step5|manifest"} + data = mapping[key]() + return {"ok": True, "document": key, "data": data or {}} + except Exception as e: + return {"ok": False, "error": str(e)} + + def _tool_flat_context_write_note(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Tool: append agent note/update to step context by re-saving payload.""" + try: + key = str((context or {}).get("document") or "").strip().lower() + note = str((context or {}).get("note") or "").strip() + if not note: + return {"ok": False, "error": "note is required"} + + store = AgentFlatContextStore(self.user_id) + if key == "step2": + doc = store.load_step2_context_document() or {} + payload = doc.get("data") if isinstance(doc.get("data"), dict) else {} + notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else [] + notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()}) + payload["agent_notes"] = notes[-50:] + ok = store.save_step2_website_analysis(payload, source="agent_note") + elif key == "step3": + doc = store.load_step3_context_document() or {} + payload = doc.get("data") if isinstance(doc.get("data"), dict) else {} + notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else [] + notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()}) + payload["agent_notes"] = notes[-50:] + ok = store.save_step3_research_preferences(payload, source="agent_note") + elif key == "step4": + doc = store.load_step4_context_document() or {} + payload = doc.get("data") if isinstance(doc.get("data"), dict) else {} + notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else [] + notes.append({"note": 
note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()}) + payload["agent_notes"] = notes[-50:] + ok = store.save_step4_persona_data(payload, source="agent_note") + elif key == "step5": + doc = store.load_step5_context_document() or {} + payload = doc.get("data") if isinstance(doc.get("data"), dict) else {} + notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else [] + notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()}) + payload["agent_notes"] = notes[-50:] + ok = store.save_step5_integrations(payload, source="agent_note") + else: + return {"ok": False, "error": "Invalid document. Use step2|step3|step4|step5"} + + return {"ok": bool(ok), "document": key} + except Exception as e: + return {"ok": False, "error": str(e)} + class StrategyArchitectAgent(SIFBaseAgent): """Agent for discovering content pillars and identifying strategic gaps.""" @@ -686,7 +779,25 @@ async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, A if not text: return {"compliance_score": 0.0, "issues": ["No text provided"]} - # 1. Fetch Style Guidelines from SIF if not provided + guidelines_source = "provided" if style_guidelines else "none" + + # 1. 
Fetch Style Guidelines from flat-file context first, then SIF fallback + if not style_guidelines: + try: + flat_doc = AgentFlatContextStore(self.user_id).load_step2_context_document() + flat_data = (flat_doc or {}).get("data") if isinstance(flat_doc, dict) else None + if isinstance(flat_data, dict): + style_guidelines = { + "tone": (flat_data.get("brand_analysis") or {}).get("brand_voice", "neutral"), + "style_patterns": flat_data.get("style_patterns", {}), + "writing_style": flat_data.get("writing_style", {}), + "style_guidelines": flat_data.get("style_guidelines", {}), + } + guidelines_source = "flat_file" + logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from flat context") + except Exception as e: + logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines from flat context: {e}") + if not style_guidelines and self.sif_service: try: # Search for website analysis to get brand voice/style @@ -697,7 +808,7 @@ async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, A res = results[0] metadata_str = res.get('object') metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or res) - + if metadata.get('type') == 'website_analysis': report = metadata.get('full_report', {}) style_guidelines = { @@ -705,6 +816,7 @@ async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, A "style_patterns": report.get('style_patterns', {}), "writing_style": report.get('writing_style', {}) } + guidelines_source = "sif_index" logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from SIF: {style_guidelines.get('tone')}") except Exception as e: logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines from SIF: {e}") @@ -735,7 +847,7 @@ async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, A "compliance_score": max(0.0, score), "issues": issues, "is_compliant": score > 0.8, - "guidelines_source": "sif_index" 
if not style_guidelines and self.sif_service else "provided" + "guidelines_source": guidelines_source } except Exception as e: diff --git a/backend/services/intelligence/sif_integration.py b/backend/services/intelligence/sif_integration.py index 110c41aa..7f25d407 100644 --- a/backend/services/intelligence/sif_integration.py +++ b/backend/services/intelligence/sif_integration.py @@ -13,12 +13,13 @@ import json from services.database import get_session_for_user -from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis +from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis, ResearchPreferences, PersonaData # Import existing SIF components from .txtai_service import TxtaiIntelligenceService from .semantic_cache import semantic_cache_manager, SemanticCacheStats from services.intelligence.harvester import SemanticHarvesterService +from services.intelligence.agent_flat_context import AgentFlatContextStore class SIFIntegrationService: @@ -61,6 +62,284 @@ def get_trend_surfer_agent(self): ) return self.trend_surfer_agent + + async def get_step2_website_context(self) -> Dict[str, Any]: + """ + Retrieve onboarding step 2 website context with a strict fallback chain: + flat file -> database -> SIF semantic index. 
+ """ + # 1) Fastest: flat-file agent context + try: + flat_doc = AgentFlatContextStore(self.user_id).load_step2_context_document() + if flat_doc: + return { + "source": "flat_file", + "data": flat_doc.get("data") or {}, + "agent_summary": flat_doc.get("agent_summary") or {}, + "document_context": flat_doc.get("document_context") or {}, + "meta": flat_doc.get("meta") or {}, + "updated_at": flat_doc.get("updated_at"), + } + except Exception as e: + logger.warning(f"Flat context lookup failed for user {self.user_id}: {e}") + + # 2) Database fallback + db = None + try: + db = get_session_for_user(self.user_id) + if db: + stmt = ( + select(WebsiteAnalysis) + .join(OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id) + .where(OnboardingSession.user_id == self.user_id) + .order_by(desc(WebsiteAnalysis.updated_at)) + ) + row = db.execute(stmt).scalars().first() + if row: + payload = row.to_dict() if hasattr(row, "to_dict") else {} + return { + "source": "database", + "data": payload, + "agent_summary": { + "quick_facts": { + "website_url": payload.get("website_url"), + "brand_voice": (payload.get("brand_analysis") or {}).get("brand_voice") if isinstance(payload.get("brand_analysis"), dict) else "", + } + }, + } + except Exception as e: + logger.warning(f"Database fallback failed for user {self.user_id}: {e}") + finally: + if db: + db.close() + + # 3) Semantic fallback + try: + results = await self.intelligence_service.search("website analysis brand voice style", limit=1) + if results: + top = results[0] + metadata = top.get("object") if isinstance(top, dict) else None + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + if isinstance(metadata, dict): + report = metadata.get("full_report") if isinstance(metadata.get("full_report"), dict) else metadata + return { + "source": "sif_semantic", + "data": report, + "agent_summary": { + "quick_facts": { + "website_url": report.get("website_url") if 
isinstance(report, dict) else None, + } + }, + } + except Exception as e: + logger.warning(f"SIF semantic fallback failed for user {self.user_id}: {e}") + + return {"source": "none", "data": {}} + + async def get_step3_research_context(self) -> Dict[str, Any]: + """ + Retrieve onboarding step 3 research context with fallback chain: + flat file -> database -> SIF semantic index. + """ + try: + flat_doc = AgentFlatContextStore(self.user_id).load_step3_context_document() + if flat_doc: + return { + "source": "flat_file", + "data": flat_doc.get("data") or {}, + "agent_summary": flat_doc.get("agent_summary") or {}, + "document_context": flat_doc.get("document_context") or {}, + "meta": flat_doc.get("meta") or {}, + "updated_at": flat_doc.get("updated_at"), + } + except Exception as e: + logger.warning(f"Step 3 flat context lookup failed for user {self.user_id}: {e}") + + db = None + try: + db = get_session_for_user(self.user_id) + if db: + stmt = ( + select(ResearchPreferences) + .join(OnboardingSession, ResearchPreferences.session_id == OnboardingSession.id) + .where(OnboardingSession.user_id == self.user_id) + .order_by(desc(ResearchPreferences.updated_at)) + ) + prefs = db.execute(stmt).scalars().first() + if prefs: + payload = prefs.to_dict() if hasattr(prefs, "to_dict") else {} + return { + "source": "database", + "data": payload, + "agent_summary": { + "quick_facts": { + "research_depth": payload.get("research_depth"), + "content_types_count": len(payload.get("content_types") or []), + } + }, + } + except Exception as e: + logger.warning(f"Step 3 database fallback failed for user {self.user_id}: {e}") + finally: + if db: + db.close() + + try: + results = await self.intelligence_service.search("research preferences competitors onboarding step 3", limit=1) + if results: + top = results[0] + metadata = top.get("object") if isinstance(top, dict) else None + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + 
report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {}) + return { + "source": "sif_semantic", + "data": report, + "agent_summary": { + "quick_facts": { + "research_depth": report.get("research_depth") if isinstance(report, dict) else None, + } + }, + } + except Exception as e: + logger.warning(f"Step 3 semantic fallback failed for user {self.user_id}: {e}") + + return {"source": "none", "data": {}} + + async def get_step4_persona_context(self) -> Dict[str, Any]: + """ + Retrieve onboarding step 4 persona context with fallback chain: + flat file -> database -> SIF semantic index. + """ + try: + flat_doc = AgentFlatContextStore(self.user_id).load_step4_context_document() + if flat_doc: + return { + "source": "flat_file", + "data": flat_doc.get("data") or {}, + "agent_summary": flat_doc.get("agent_summary") or {}, + "document_context": flat_doc.get("document_context") or {}, + "meta": flat_doc.get("meta") or {}, + "updated_at": flat_doc.get("updated_at"), + } + except Exception as e: + logger.warning(f"Step 4 flat context lookup failed for user {self.user_id}: {e}") + + db = None + try: + db = get_session_for_user(self.user_id) + if db: + stmt = ( + select(PersonaData) + .join(OnboardingSession, PersonaData.session_id == OnboardingSession.id) + .where(OnboardingSession.user_id == self.user_id) + .order_by(desc(PersonaData.updated_at)) + ) + persona = db.execute(stmt).scalars().first() + if persona: + payload = persona.to_dict() if hasattr(persona, "to_dict") else {} + return { + "source": "database", + "data": payload, + "agent_summary": { + "quick_facts": { + "selected_platforms_count": len(payload.get("selected_platforms") or []), + "has_core_persona": bool(payload.get("core_persona")), + } + }, + } + except Exception as e: + logger.warning(f"Step 4 database fallback failed for user {self.user_id}: {e}") + finally: + if db: + db.close() + + try: + 
results = await self.intelligence_service.search("persona platform personas onboarding step 4", limit=1) + if results: + top = results[0] + metadata = top.get("object") if isinstance(top, dict) else None + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {}) + return { + "source": "sif_semantic", + "data": report, + "agent_summary": { + "quick_facts": { + "has_core_persona": bool(report.get("core_persona")) if isinstance(report, dict) else False, + } + }, + } + except Exception as e: + logger.warning(f"Step 4 semantic fallback failed for user {self.user_id}: {e}") + + return {"source": "none", "data": {}} + + async def get_step5_integrations_context(self) -> Dict[str, Any]: + """ + Retrieve onboarding step 5 integrations context with fallback chain: + flat file -> SIF semantic index. 
+ """ + try: + flat_doc = AgentFlatContextStore(self.user_id).load_step5_context_document() + if flat_doc: + return { + "source": "flat_file", + "data": flat_doc.get("data") or {}, + "agent_summary": flat_doc.get("agent_summary") or {}, + "document_context": flat_doc.get("document_context") or {}, + "meta": flat_doc.get("meta") or {}, + "updated_at": flat_doc.get("updated_at"), + } + except Exception as e: + logger.warning(f"Step 5 flat context lookup failed for user {self.user_id}: {e}") + + try: + results = await self.intelligence_service.search("integrations onboarding step 5 connected providers", limit=1) + if results: + top = results[0] + metadata = top.get("object") if isinstance(top, dict) else None + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {}) + return { + "source": "sif_semantic", + "data": report, + "agent_summary": { + "quick_facts": { + "connected_integrations_count": len((report.get("integrations") or {})) if isinstance(report, dict) and isinstance(report.get("integrations"), dict) else None, + } + }, + } + except Exception as e: + logger.warning(f"Step 5 semantic fallback failed for user {self.user_id}: {e}") + + return {"source": "none", "data": {}} + + async def get_flat_context_manifest(self) -> Dict[str, Any]: + """Return lightweight manifest of available flat context documents for this user.""" + try: + manifest = AgentFlatContextStore(self.user_id).load_context_manifest() + if manifest: + return {"source": "flat_file", "data": manifest} + except Exception as e: + logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}") + return {"source": "none", "data": {"documents": []}} + async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool: try: latest_id = 
f"market_trends_latest:{self.user_id}" diff --git a/docs/SIF/SIF_AGENTS_TEAM_ARCHITECTURE.md b/docs/SIF/SIF_AGENTS_TEAM_ARCHITECTURE.md index a4e8fb75..2b37add9 100644 --- a/docs/SIF/SIF_AGENTS_TEAM_ARCHITECTURE.md +++ b/docs/SIF/SIF_AGENTS_TEAM_ARCHITECTURE.md @@ -189,3 +189,20 @@ All orchestration updates are emitted as typed records under a shared schema: * **Inter-Agent Chat**: Allow agents to debate strategy (e.g., SEO Agent vs. Creative Agent). * **Auto-Execution**: Allow agents to *perform* tasks (e.g., fix a broken link) with user approval. * **Voice Interface**: Daily standup meeting via voice. + + +## ⚡ Agent Fast-Context Layer (Onboarding Step 2) + +To reduce latency for repetitive agent reads, Step 2 website analysis is now persisted to a per-user flat file in workspace: + +- `workspace/workspace_/agent_context/step2_website_analysis.json` + +**Read order for agents:** +1. Flat-file context (agent-only, fastest) +2. Relational database (`website_analyses`) +3. SIF semantic index retrieval + +This preserves SIF intelligence workflows while giving agents deterministic, low-latency access to core onboarding context. +It also stores agent-optimized `quick_facts`, `retrieval_hints`, and full-fidelity raw payload blocks so both fast inference and deep-dive reasoning are supported. + +Reference design docs: `docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md`, `docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md`, and `docs/flat_file_context/FLAT_FILE_CONTEXT_PROGRESS_AND_QUICK_WINS.md`. 
diff --git a/docs/flat_file_context/FLAT_FILE_CONTEXT_ENHANCEMENTS_BACKLOG.md b/docs/flat_file_context/FLAT_FILE_CONTEXT_ENHANCEMENTS_BACKLOG.md new file mode 100644 index 00000000..d3dfcaf7 --- /dev/null +++ b/docs/flat_file_context/FLAT_FILE_CONTEXT_ENHANCEMENTS_BACKLOG.md @@ -0,0 +1,69 @@ +# Flat File Context Enhancements Backlog + +This document tracks next-phase implementation items for the flat-file context framework. + +## 1) TTL/Refresh Hints + Freshness Policy +### Objective +Prevent stale agent decisions by adding explicit freshness semantics. + +### Proposed additions +- Add `m.ttl_s` (seconds) and `m.stale_after` (timestamp) to context envelope. +- Add `m.refresh_recommended` boolean. +- Define per-context defaults (Step 2 likely long TTL, but still bounded). + +### Acceptance criteria +- Reader utility can classify context as `fresh|stale|expired`. +- Fallback to DB/SIF triggered automatically when stale policy requires. + +--- + +## 2) Optional `.json.gz` Companion for Large Payloads +### Objective +Reduce disk footprint and IO for large context payloads. + +### Proposed additions +- Write primary `.json` always. +- If payload exceeds threshold (e.g., >256 KB), write `.json.gz` companion. +- Add pointer metadata (`m.gz=true`, `m.gz_path`). + +### Acceptance criteria +- Reader transparently supports JSON + GZIP variants. +- No regression for small payloads. + +--- + +## 3) Section Checksums for Drift Detection +### Objective +Detect inconsistencies between flat-file context and database state. + +### Proposed additions +- Add checksums per section (`d.brand`, `d.seo`, `d.audience`, etc.) under `m.chk`. +- Persist DB-row reference (`m.db_ref`) with latest row id/timestamp. +- Add `verify_drift()` utility. + +### Acceptance criteria +- Drift check can flag `in_sync|partial_drift|out_of_sync`. +- On drift, reader suggests refresh + fallback path. 
+ +--- + +## 4) Extend Pattern to Step 3 and Step 4 +### Objective +Standardize agent context retrieval across onboarding steps. + +### Proposed additions +- `step3_research_context.json` +- `step4_persona_context.json` +- Shared envelope with step-specific `d/s` contracts. + +### Acceptance criteria +- Same fallback chain works for step-specific readers. +- SIF agents can consume common interface across Step 2/3/4. + +--- + +## Suggested implementation order +1. TTL/freshness +2. Checksums/drift detection +3. Step 3/4 expansion +4. Optional gzip optimization diff --git a/docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md b/docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md new file mode 100644 index 00000000..3eb75149 --- /dev/null +++ b/docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md @@ -0,0 +1,140 @@ +# Flat File Context Framework Design (Agent-Optimized) + +## Purpose +Design a **compact, machine-first flat-file framework** for ALwrity AI agents. + +This framework is optimized for: +- deterministic structure, +- minimal token footprint, +- fast parsing, +- high-signal retrieval, +- robust fallback behavior. + +## Core Principles +1. **Agent-first, not human-first** + - Keys are short and stable. + - Avoid verbose prose in payloads. + - Include only fields needed for reasoning and tool actions. + +2. **Compact + predictable schema** + - Fixed top-level keys in strict order. + - Canonical value types (no shape drift). + - Avoid polymorphic fields when possible. + +3. **Dual-layer context** + - `d` (full normalized data for deep reasoning). + - `s` (summary/high-signal fast path for most agent reads). + +4. **Fallback-safe design** + - Every context doc includes source + freshness metadata. + - If missing/stale, consumers fall back to DB then SIF semantic. + +5. **Multi-tenant isolation** + - Per-user file under `workspace/workspace_<user_id>/agent_context/`. 
+ +--- + +## Canonical Context Envelope (compact) +```json +{ + "v": "1.0", + "t": "onboarding.step2.website_analysis", + "u": "<user_id>", + "ts": "<iso8601_utc_timestamp>", + "src": "onboarding_step2", + "d": {}, + "s": {}, + "m": { + "db": 0, + "sb": 0, + "q": [] + } +} +``` + +### Field map +- `v`: schema version +- `t`: context type +- `u`: user id +- `ts`: updated timestamp +- `src`: source writer +- `d`: canonical normalized data +- `s`: high-signal summary for quick agent use +- `m`: meta (`db`=data bytes, `sb`=summary bytes, `q`=query hints) + +--- + +## Agent Readability Best Practices +- Prefer enums/controlled vocab over free text. +- Use compact keys and arrays for repetitive entities. +- Truncate long textual blobs unless explicitly required. +- Keep “quick facts” flattened. +- Separate operational metadata from semantic content. +- Include retrieval hints (`q`) for consistent query drafting. + +--- + +## Write Pipeline Pattern +1. Normalize incoming source payload. +2. Derive compact summary (`s`) from normalized data. +3. Compute lightweight metadata (`m`). +4. Atomic write JSON file. +5. Emit writer version + timestamp. + +## Read Pipeline Pattern +1. Attempt flat-file load. +2. Validate minimum envelope fields (`v,t,u,ts,d`). +3. Prefer `s` for quick tasks; use `d` for deeper reasoning. +4. If invalid/missing/stale: fallback DB -> SIF semantic. + +--- + +## Scope Expansion Pattern +Apply same envelope for: +- Step 2: website analysis +- Step 3: research preferences + competitor snapshots +- Step 4: persona profile + platform personas + +Only `t`, `d`, and `s` payload contracts should vary. + +--- + +## Governance +- Schema changes require version bump (`v`). +- Backward compatibility policy: readers support N and N-1. +- Drift checks should compare canonical hash/checksum vs DB latest row. 
+ + +## Document Context + End-User Journey Metadata +Each context file should carry explicit machine-oriented document metadata so agents understand *what this file is* before reading full payloads. + +Suggested `document_context` fields: +- `audience`: `ai_agents` +- `purpose`: `fast_context_retrieval` +- `context_type`: step-scoped type identifier +- `journey`: stage/action/agent expectation +- `retrieval_contract`: preferred source + fallback order +- `context_window_guidance`: byte budget and summary-first policy + +This block is intentionally compact and deterministic to reduce wasted token usage for agent planning. + +## Context Window and Length Policy +- Keep combined `data + summary` under a defined byte budget where practical. +- Enforce summary-first reads in agent consumers. +- Truncate long textual fields in summaries; keep full text only in `data` when needed. +- Flag oversize docs in metadata so readers can skip low-priority sections. +- Prefer short, stable keys in machine envelopes and avoid natural-language verbosity. + + +## Implemented baseline controls +- Atomic file writes to avoid partial documents. +- Best-effort restricted file permissions (`0600`). +- Recursive sensitive-key redaction for payload snapshots. +- Payload size budget enforcement with deterministic trimming metadata. +- Internal document linking via `related_documents` and manifest index. 
+ + +Security and isolation details: `docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md` + + +Step docs: `docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md` diff --git a/docs/flat_file_context/FLAT_FILE_CONTEXT_PROGRESS_AND_QUICK_WINS.md b/docs/flat_file_context/FLAT_FILE_CONTEXT_PROGRESS_AND_QUICK_WINS.md new file mode 100644 index 00000000..2438bf39 --- /dev/null +++ b/docs/flat_file_context/FLAT_FILE_CONTEXT_PROGRESS_AND_QUICK_WINS.md @@ -0,0 +1,26 @@ +# Flat File Context Progress Review and Quick Wins + +## Progress so far +- Step 2 context: implemented (website analysis fast path + fallback). +- Step 3 context: implemented (research preferences + competitors fast path + fallback). +- Step 4 context: implemented (persona data fast path + fallback). +- Step 5 context: implemented (integrations fast path + fallback). +- Security baseline: user isolation checks, redaction, atomic writes, file-permission hardening. +- Size governance: payload budget + deterministic trimming + trim metadata. +- Internal linking: related-document links + manifest index. + +## Quick-win improvements (next 1-2 sprints) +1. Add explicit TTL/staleness fields and auto-refresh hints per step. +2. Add lightweight checksums per section to detect DB drift quickly. +3. Add optional `.json.gz` companion for oversized archives. +4. Add shared reader utility for summary-first + selective field loading. +5. Add minimal unit tests for: + - redaction + - trimming behavior + - manifest linking + - cross-user load rejection +6. Add agent telemetry: record which sections are actually read to optimize summaries. + + +## Newly added agent tooling +- txtai agent tools for flat-file context manifest/read/write-note operations were added to SIF base agent to support file operations in agent workflows. 
diff --git a/docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md b/docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md new file mode 100644 index 00000000..a74255f5 --- /dev/null +++ b/docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md @@ -0,0 +1,39 @@ +# Flat File Context Security, Isolation, and Size Controls + +## Objective +Provide minimal but practical security for agent flat-file context with strong end-user isolation and bounded document growth. + +## Isolation model +- Per-user namespace: `workspace/workspace_<user_id>/agent_context/` +- Sanitized user IDs only (`[a-zA-Z0-9_-]`) to prevent path traversal. +- Reader-side user check: loaded document `user_id` must match requesting user context. + +## Minimal security controls implemented +1. **Atomic writes** + - Context files are written via temporary file + `os.replace`. + - Prevents partial/corrupt files under concurrent writes. +2. **File permissions** + - Context files are best-effort set to `0600`. +3. **Sensitive key redaction** + - Recursive redaction for key patterns like `api_key`, `token`, `secret`, `password`, `authorization`, `cookie`. +4. **Manifest index** + - `context_manifest.json` gives agents a controlled map of available docs and relationships. + +## Size and context-window controls +- Byte budget for raw document payloads (`DEFAULT_MAX_BYTES`). +- If oversize, low-priority/heavy sections are trimmed first (`raw_*`, large snapshots, heavy arrays). +- Trim metadata is preserved under `meta.trim` for traceability. +- Agent policy remains summary-first (`agent_summary` before `data`). + +## Internal document linking +- Each context file includes `document_context.related_documents`. +- Manifest includes per-document `related_documents` links. +- This enables agents to: + 1. read one document, + 2. discover related context files, + 3. fetch only relevant next documents. 
+ +## Recommended next steps +- Add optional file-level signatures/HMAC for tamper evidence. +- Add checksum per section to detect DB drift. +- Add staleness policy (`ttl_s`, `stale_after`) and auto-refresh triggers. diff --git a/docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md b/docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md new file mode 100644 index 00000000..823bb21c --- /dev/null +++ b/docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md @@ -0,0 +1,54 @@ +# Step 2 Flat File Context Design (Website Analysis) + +## Intent +Step 2 context must be optimized for **AI-agent retrieval speed and token efficiency**, not human readability. + +## Current storage location +- `workspace/workspace_<user_id>/agent_context/step2_website_analysis.json` + +## Current retrieval chain +1. Flat file (fastest) +2. DB (`website_analyses`) +3. SIF semantic fallback + +## Compactness strategy +For implementation, keep two logical layers: +- **`d` equivalent (full canonical data)** for deep reasoning. +- **`s` equivalent (high-signal summary)** for fast agent prompts and most decisions. +- **`document_context`** for machine-readable orientation (purpose, journey stage, fallback contract, context-window guidance). + +Agents should default to summary-first reads and only open full data when needed. + +## Step 2 coverage requirements +The Step 2 context should preserve these semantic groups: +- identity/state: website url, timestamps, status/error/warning +- brand/style: writing style, style patterns/guidelines, brand analysis +- audience/content: target audience, content type, recommended settings, characteristics +- strategy/seo: strategy insights, SEO audit, strategic history +- crawl/discovery: crawl output, meta info, sitemap analysis +- traceability: raw inbound payload snapshots + +## Agent-readability best practices +- Keep keys stable and deterministic. +- Prefer arrays/enums over long free text. +- Keep summary fields flattened and high signal. 
+- Avoid duplicate verbose nested structures unless required for correctness. +- Include retrieval hints for consistent downstream querying. + +## Practical guidance for consumers +- Use summary/high-signal fields first for routing and lightweight reasoning. +- Pull deep fields only for specialist tasks (SEO, persona fidelity, editorial style checks). +- If flat-file missing/stale: auto-fallback to DB then SIF. + +## Note +A generalized compact framework is documented in: +- `docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md` + +Future enhancements are tracked in: +- `docs/flat_file_context/FLAT_FILE_CONTEXT_ENHANCEMENTS_BACKLOG.md` + + +## Context window guidance +- Keep summary compact and deterministic. +- Add byte-size metadata to help agents decide whether to expand into full data. +- Prefer short keys and avoid verbose natural language in machine envelopes. diff --git a/docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md b/docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md new file mode 100644 index 00000000..9e5b5fee --- /dev/null +++ b/docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md @@ -0,0 +1,39 @@ +# Step 3 Flat File Context Design (Research Preferences + Competitors) + +## Intent +Provide agent-ready Step 3 context with compact summaries for routing plus full payload for deep analysis. + +## Storage location +- `workspace/workspace_<user_id>/agent_context/step3_research_preferences.json` + +## Why this matters for agents +Step 3 is the bridge from website understanding (Step 2) to competitive strategy and research execution. Agents need this file to understand: +- depth and quality preference constraints, +- factuality constraints, +- content-type priorities, +- competitor landscape and industry context. 
+ +## Document-context block +Every context file should include machine-readable document metadata to orient agents quickly: +- audience (`ai_agents`) +- purpose (`fast_context_retrieval`) +- journey stage (`onboarding_step_3`) +- retrieval contract and fallback order +- context-window guidance (size budget + summary-first policy) + +## Minimal Step 3 data groups +- research config: depth/content types/auto/factual +- inherited style profile (if present): writing style, target audience, recommended settings +- competitors: domain/url/title/relevance highlights +- industry context: compact market framing text +- traceability: source payload and timestamps + +## Agent usage policy +1. Start with `agent_summary.quick_facts` and `retrieval_hints`. +2. Use competitor summary before opening full competitor objects. +3. Read full `data` only for tasks requiring strict evidence/fields. +4. Fall back to DB, then SIF semantic if missing or stale. + + +## Related-document navigation +Agents can consult `context_manifest.json` to discover linked context files and traverse only the required documents for the task. diff --git a/docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md b/docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md new file mode 100644 index 00000000..32fad458 --- /dev/null +++ b/docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md @@ -0,0 +1,25 @@ +# Step 4 Flat File Context Design (Persona Data) + +## Intent +Capture onboarding Step 4 persona outputs in an agent-first flat file so agents can quickly personalize strategy, content, and platform execution. 
+ +## Storage location +- `workspace/workspace_<user_id>/agent_context/step4_persona_data.json` + +## Required Step 4 coverage +- core persona profile (`core_persona`) +- platform personas (`platform_personas`) +- quality metrics (`quality_metrics`) +- selected platforms (`selected_platforms`) +- research persona/notes when available +- source payload + timestamps for traceability + +## Agent summary expectations +- quick facts: selected platform count, persona availability flags +- retrieval hints: persona/profile adaptation queries +- persona focus: compact actionable slice of core persona + quality constraints + +## Usage policy +1. Start with `agent_summary`. +2. Expand into `data` only when a task needs full fidelity. +3. Use `document_context.related_documents` to fetch upstream Step 2/Step 3 context as needed. diff --git a/docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md b/docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md new file mode 100644 index 00000000..b1edf753 --- /dev/null +++ b/docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md @@ -0,0 +1,22 @@ +# Step 5 Flat File Context Design (Integrations) + +## Intent +Capture onboarding Step 5 integration configuration in a compact agent-readable context so agents can reason about connected services and execution constraints. + +## Storage location +- `workspace/workspace_<user_id>/agent_context/step5_integrations.json` + +## Required Step 5 coverage +- integration map (`integrations`) +- provider list (`providers`) +- connected account references (`connected_accounts`) +- integration status and notes +- source payload and timestamps + +## Agent summary expectations +- connected integration count/list +- provider count +- retrieval hints for integration readiness checks + +## Linked traversal +Use `document_context.related_documents` and `context_manifest.json` to navigate Step 2/3/4 upstream dependencies when deciding tool execution paths.