diff --git a/backend/models/platform_insights_monitoring_models.py b/backend/models/platform_insights_monitoring_models.py
index c2ee77da..5647e67a 100644
--- a/backend/models/platform_insights_monitoring_models.py
+++ b/backend/models/platform_insights_monitoring_models.py
@@ -102,3 +102,43 @@ class PlatformInsightsExecutionLog(Base):
 
     def __repr__(self):
         return f"<PlatformInsightsExecutionLog(id={self.id}, task_id={self.task_id})>"
+
+class PlatformInsightDeltaEvent(Base):
+    """Persisted delta events generated from comparable platform-insights windows."""
+    __tablename__ = "platform_insight_delta_events"
+
+    id = Column(Integer, primary_key=True, index=True)
+
+    # Ownership and source
+    user_id = Column(String(255), nullable=False, index=True)
+    platform = Column(String(50), nullable=False, index=True)  # 'gsc' / 'bing'
+    site_url = Column(String(500), nullable=True, index=True)
+    task_id = Column(Integer, ForeignKey("platform_insights_tasks.id"), nullable=True, index=True)
+
+    # Event classification
+    event_type = Column(String(50), nullable=False, index=True)  # 'decline' | 'rise' | 'opportunity'
+    entity_type = Column(String(50), nullable=False, index=True)  # 'page' | 'query'
+    entity_key = Column(Text, nullable=False, index=True)  # URL for page, text for query
+
+    # Comparable windows
+    current_start_date = Column(String(20), nullable=False)
+    current_end_date = Column(String(20), nullable=False)
+    prior_start_date = Column(String(20), nullable=False)
+    prior_end_date = Column(String(20), nullable=False)
+
+    # Delta payload and metadata
+    details = Column(JSON, nullable=False)  # metric deltas + labels + thresholds used
+    severity = Column(String(20), nullable=True, index=True)  # low | medium | high
+
+    created_at = Column(DateTime, default=datetime.utcnow, index=True)
+
+    __table_args__ = (
+        Index('idx_platform_delta_events_user_platform_created', 'user_id', 'platform', 'created_at'),
+        Index('idx_platform_delta_events_user_type_created', 'user_id', 'event_type', 'created_at'),
+    )
+
+    def __repr__(self):
+        return (
+            f"<PlatformInsightDeltaEvent(id={self.id}, platform='{self.platform}', event_type='{self.event_type}', "
+            f"entity='{self.entity_type}:{self.entity_key}', severity='{self.severity}')>"
+        )
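Note: for a quick local sanity check of the new table and its composite indexes, a minimal sketch (the scratch SQLite URL is illustrative, and it assumes the module's declarative `Base` is importable as shown):

```python
# Minimal sketch: materialize the new model against a scratch SQLite engine
# so the generated schema and indexes can be inspected. The URL is illustrative.
from sqlalchemy import create_engine

from models.platform_insights_monitoring_models import Base  # assumes Base is defined/exported here

engine = create_engine("sqlite:///scratch_insights.db")
Base.metadata.create_all(engine)  # creates platform_insight_delta_events plus both composite indexes
```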
diff --git a/backend/services/analytics/opportunity_alerts_service.py b/backend/services/analytics/opportunity_alerts_service.py
new file mode 100644
index 00000000..7495e085
--- /dev/null
+++ b/backend/services/analytics/opportunity_alerts_service.py
@@ -0,0 +1,62 @@
+"""Service for retrieving persisted platform opportunity/alert events."""
+
+from typing import Dict, Any, List, Optional
+
+from loguru import logger
+
+from models.platform_insights_monitoring_models import PlatformInsightDeltaEvent
+from services.database import get_session_for_user
+
+
+class OpportunityAlertsService:
+    """Read-optimized access to the latest persisted opportunities/alerts."""
+
+    def get_latest_events(
+        self,
+        user_id: str,
+        platform: str = 'gsc',
+        site_url: Optional[str] = None,
+        event_types: Optional[List[str]] = None,
+        limit: int = 25,
+    ) -> List[Dict[str, Any]]:
+        db = None
+        try:
+            db = get_session_for_user(user_id)
+            query = db.query(PlatformInsightDeltaEvent).filter(
+                PlatformInsightDeltaEvent.user_id == user_id,
+                PlatformInsightDeltaEvent.platform == platform,
+            )
+            if site_url:
+                query = query.filter(PlatformInsightDeltaEvent.site_url == site_url)
+            if event_types:
+                query = query.filter(PlatformInsightDeltaEvent.event_type.in_(event_types))
+
+            rows = query.order_by(PlatformInsightDeltaEvent.created_at.desc()).limit(limit).all()
+            return [
+                {
+                    'id': row.id,
+                    'platform': row.platform,
+                    'site_url': row.site_url,
+                    'event_type': row.event_type,
+                    'entity_type': row.entity_type,
+                    'entity_key': row.entity_key,
+                    'severity': row.severity,
+                    'current_window': {
+                        'start': row.current_start_date,
+                        'end': row.current_end_date,
+                    },
+                    'prior_window': {
+                        'start': row.prior_start_date,
+                        'end': row.prior_end_date,
+                    },
+                    'details': row.details,
+                    'created_at': row.created_at.isoformat() if row.created_at else None,
+                }
+                for row in rows
+            ]
+        except Exception as e:
+            logger.error(f"Failed to load latest opportunity/alert events for user {user_id}: {e}")
+            return []
+        finally:
+            if db:
+                db.close()
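A usage sketch for the new read path (the user id and filters are illustrative; it assumes the executor below has already persisted events for this user):

```python
# Usage sketch: fetch the ten most recent CTR opportunities for one user.
from services.analytics.opportunity_alerts_service import OpportunityAlertsService

service = OpportunityAlertsService()
events = service.get_latest_events(
    user_id="user-123",           # illustrative id
    platform="gsc",
    event_types=["opportunity"],  # narrow to low-CTR opportunities only
    limit=10,
)
for event in events:
    print(event["severity"], event["entity_type"], event["entity_key"])
```

Because the service closes its session in `finally` and returns plain dicts, callers never touch ORM objects after the session is gone.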
diff --git a/backend/services/analytics/platform_analytics_service.py b/backend/services/analytics/platform_analytics_service.py
index 23da5488..a094f47e 100644
--- a/backend/services/analytics/platform_analytics_service.py
+++ b/backend/services/analytics/platform_analytics_service.py
@@ -18,6 +18,7 @@
 from .connection_manager import PlatformConnectionManager
 from .summary_generator import AnalyticsSummaryGenerator
 from .cache_manager import AnalyticsCacheManager
+from .opportunity_alerts_service import OpportunityAlertsService
 
 
 class PlatformAnalyticsService:
@@ -41,6 +42,7 @@ def __init__(self):
         self.connection_manager = PlatformConnectionManager()
         self.summary_generator = AnalyticsSummaryGenerator()
         self.cache_manager = AnalyticsCacheManager()
+        self.opportunity_alerts_service = OpportunityAlertsService()
 
     async def get_comprehensive_analytics(self, user_id: str, platforms: List[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, AnalyticsData]:
         """
@@ -118,6 +120,28 @@ async def get_comprehensive_analytics(self, user_id: str, platforms: List[str] =
 
         return analytics_data
 
+    def get_latest_opportunities_alerts(
+        self,
+        user_id: str,
+        platform: str = 'gsc',
+        site_url: Optional[str] = None,
+        event_types: Optional[List[str]] = None,
+        limit: int = 25
+    ) -> Dict[str, Any]:
+        """Return latest persisted opportunities/alerts without recomputing deltas."""
+        events = self.opportunity_alerts_service.get_latest_events(
+            user_id=user_id,
+            platform=platform,
+            site_url=site_url,
+            event_types=event_types or ['opportunity', 'decline', 'rise'],
+            limit=limit,
+        )
+        return {
+            'platform': platform,
+            'count': len(events),
+            'events': events,
+        }
+
     async def get_platform_connection_status(self, user_id: str) -> Dict[str, Dict[str, Any]]:
         """
         Check connection status for all platforms
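For reference, `get_latest_opportunities_alerts` wraps the event list in a small envelope; an illustrative return value (shape taken from the code above, all data invented):

```python
# Illustrative envelope; 'details' holds the full event payload and is abbreviated here.
example_response = {
    'platform': 'gsc',
    'count': 1,
    'events': [
        {
            'id': 41,
            'platform': 'gsc',
            'site_url': 'https://example.com/',
            'event_type': 'decline',
            'entity_type': 'page',
            'entity_key': 'https://example.com/pricing',
            'severity': 'high',
            'current_window': {'start': '2024-05-02', 'end': '2024-06-01'},
            'prior_window': {'start': '2024-04-01', 'end': '2024-05-01'},
            'details': {'reasons': ['clicks_drop', 'ctr_drop']},
            'created_at': '2024-06-01T06:00:00',
        },
    ],
}
```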
diff --git a/backend/services/scheduler/executors/gsc_insights_executor.py b/backend/services/scheduler/executors/gsc_insights_executor.py
index 3ae1e875..1d5b4853 100644
--- a/backend/services/scheduler/executors/gsc_insights_executor.py
+++ b/backend/services/scheduler/executors/gsc_insights_executor.py
@@ -8,13 +8,17 @@
 import time
 import json
 from datetime import datetime, timedelta
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List, Tuple
 from sqlalchemy.orm import Session
 import sqlite3
 
 from ..core.executor_interface import TaskExecutor, TaskExecutionResult
 from ..core.exception_handler import TaskExecutionError, DatabaseError, SchedulerExceptionHandler
-from models.platform_insights_monitoring_models import PlatformInsightsTask, PlatformInsightsExecutionLog
+from models.platform_insights_monitoring_models import (
+    PlatformInsightsTask,
+    PlatformInsightsExecutionLog,
+    PlatformInsightDeltaEvent,
+)
 from services.gsc_service import GSCService
 from utils.logger_utils import get_service_logger
@@ -215,11 +219,11 @@ async def _fetch_insights(self, task: PlatformInsightsTask, db: Session) -> Task
             else:
                 # No cached data - try to fetch from API
                 self.logger.info(f"No cached data found, fetching from GSC API")
-                return await self._fetch_fresh_data(user_id, site_url)
+                return await self._fetch_fresh_data(task, db, user_id, site_url)
         else:
             # Subsequent run: Always fetch fresh data
             self.logger.info(f"Subsequent run for GSC insights task {task.id} - fetching fresh data")
-            return await self._fetch_fresh_data(user_id, site_url)
+            return await self._fetch_fresh_data(task, db, user_id, site_url)
 
         except Exception as e:
             self.logger.error(f"Error fetching GSC insights for user {user_id}: {e}", exc_info=True)
@@ -273,10 +277,9 @@ def _load_cached_data(self, user_id: str, site_url: Optional[str]) -> Optional[D
             self.logger.warning(f"Error loading cached GSC data: {e}")
             return None
 
-    async def _fetch_fresh_data(self, user_id: str, site_url: Optional[str]) -> TaskExecutionResult:
-        """Fetch fresh GSC insights from API."""
+    async def _fetch_fresh_data(self, task: PlatformInsightsTask, db: Session, user_id: str, site_url: Optional[str]) -> TaskExecutionResult:
+        """Fetch fresh GSC insights from API and persist comparable-window delta events."""
         try:
-            # If no site_url, get first site
             if not site_url:
                 sites = self.gsc_service.get_site_list(user_id)
                 if not sites:
@@ -286,51 +289,92 @@ async def _fetch_fresh_data(self, user_id: str, site_url: Optional[str]) -> Task
                         result_data={'error': 'No sites found'}
                     )
                 site_url = sites[0]['siteUrl']
-
-            # Get analytics for last 30 days
-            end_date = datetime.now().strftime('%Y-%m-%d')
-            start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
-
-            # Fetch search analytics
-            search_analytics = self.gsc_service.get_search_analytics(
+
+            window_days = 30
+            end_dt = datetime.now()
+            start_dt = end_dt - timedelta(days=window_days)
+            prior_end_dt = start_dt - timedelta(days=1)
+            prior_start_dt = prior_end_dt - timedelta(days=window_days)
+
+            end_date = end_dt.strftime('%Y-%m-%d')
+            start_date = start_dt.strftime('%Y-%m-%d')
+            prior_end_date = prior_end_dt.strftime('%Y-%m-%d')
+            prior_start_date = prior_start_dt.strftime('%Y-%m-%d')
+
+            current_analytics = self.gsc_service.get_search_analytics(
                 user_id=user_id,
                 site_url=site_url,
                 start_date=start_date,
                 end_date=end_date
             )
-
-            if 'error' in search_analytics:
+            if 'error' in current_analytics:
                 return TaskExecutionResult(
                     success=False,
-                    error_message=search_analytics.get('error', 'Unknown error'),
-                    result_data=search_analytics
+                    error_message=current_analytics.get('error', 'Unknown error'),
+                    result_data=current_analytics
                 )
-
-            # Format insights data
+
+            prior_analytics = self.gsc_service.get_search_analytics(
+                user_id=user_id,
+                site_url=site_url,
+                start_date=prior_start_date,
+                end_date=prior_end_date
+            )
+            if 'error' in prior_analytics:
+                self.logger.warning(
+                    f"Prior comparable window unavailable for user={user_id}, site={site_url}: "
+                    f"{prior_analytics.get('error')}"
+                )
+                prior_analytics = {'query_data': {'rows': []}, 'page_data': {'rows': []}}
+
+            delta_events = self._compute_delta_events(
+                current_analytics=current_analytics,
+                prior_analytics=prior_analytics,
+                current_window=(start_date, end_date),
+                prior_window=(prior_start_date, prior_end_date),
+            )
+            persisted_events = self._persist_delta_events(
+                db=db,
+                task=task,
+                user_id=user_id,
+                site_url=site_url,
+                delta_events=delta_events,
+                current_window=(start_date, end_date),
+                prior_window=(prior_start_date, prior_end_date),
+            )
+
             insights_data = {
                 'site_url': site_url,
                 'date_range': {
                     'start': start_date,
                     'end': end_date
                 },
-                'overall_metrics': search_analytics.get('overall_metrics', {}),
-                'query_data': search_analytics.get('query_data', {}),
+                'prior_date_range': {
+                    'start': prior_start_date,
+                    'end': prior_end_date
+                },
+                'overall_metrics': current_analytics.get('overall_metrics', {}),
+                'query_data': current_analytics.get('query_data', {}),
+                'page_data': current_analytics.get('page_data', {}),
+                'delta_events_generated': len(delta_events),
+                'delta_events_persisted': persisted_events,
                 'fetched_at': datetime.utcnow().isoformat()
             }
-
+
             self.logger.info(
-                f"Successfully fetched GSC insights for user {user_id}, site {site_url}"
+                f"Successfully fetched GSC insights and persisted {persisted_events} delta events "
+                f"for user {user_id}, site {site_url}"
             )
-
+
             return TaskExecutionResult(
                 success=True,
                 result_data={
                     'data_source': 'api',
                     'insights': insights_data,
-                    'message': 'Fetched fresh data from GSC API'
+                    'message': 'Fetched fresh data from GSC API and persisted delta events'
                 }
             )
-
+
         except Exception as e:
             self.logger.error(f"Error fetching fresh GSC data: {e}", exc_info=True)
             return TaskExecutionResult(
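The window arithmetic in the hunk above is meant to produce two adjacent, equal-length ranges; a standalone check of that property (the anchor date is illustrative):

```python
from datetime import datetime, timedelta

window_days = 30
end_dt = datetime(2024, 6, 1)  # illustrative stand-in for datetime.now()
start_dt = end_dt - timedelta(days=window_days)
prior_end_dt = start_dt - timedelta(days=1)
prior_start_dt = prior_end_dt - timedelta(days=window_days)

assert (end_dt - start_dt).days == window_days
assert (start_dt - prior_end_dt).days == 1  # windows touch but do not overlap
print(prior_start_dt.date(), prior_end_dt.date(), start_dt.date(), end_dt.date())
# 2024-04-01 2024-05-01 2024-05-02 2024-06-01
```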
@@ -338,7 +382,186 @@ async def _fetch_fresh_data(self, user_id: str, site_url: Optional[str]) -> Task
                 success=False,
                 error_message=f"API fetch failed: {str(e)}",
                 result_data={'error': str(e)}
             )
-
+
+    def _extract_dimension_map(self, analytics_data: Dict[str, Any], entity_type: str) -> Dict[str, Dict[str, float]]:
+        """Build normalized metric map keyed by query/page."""
+        container_key = 'page_data' if entity_type == 'page' else 'query_data'
+        rows = analytics_data.get(container_key, {}).get('rows', []) if isinstance(analytics_data, dict) else []
+
+        mapped: Dict[str, Dict[str, float]] = {}
+        for row in rows:
+            keys = row.get('keys', [])
+            if not keys:
+                continue
+            entity_key = str(keys[0]).strip()
+            if not entity_key:
+                continue
+
+            clicks = float(row.get('clicks', 0) or 0)
+            impressions = float(row.get('impressions', 0) or 0)
+            ctr = float(row.get('ctr', 0) or 0) * 100.0
+            position = float(row.get('position', 0) or 0)
+
+            mapped[entity_key] = {
+                'clicks': clicks,
+                'impressions': impressions,
+                'ctr': ctr,
+                'position': position,
+            }
+
+        return mapped
+
+    def _compute_delta_events(
+        self,
+        current_analytics: Dict[str, Any],
+        prior_analytics: Dict[str, Any],
+        current_window: Tuple[str, str],
+        prior_window: Tuple[str, str],
+    ) -> List[Dict[str, Any]]:
+        """Generate labeled decline/rise/opportunity events from comparable windows."""
+        thresholds = {
+            'clicks_pct_change': 20.0,
+            'impressions_pct_change': 25.0,
+            'ctr_abs_pp_change': 1.5,
+            'position_abs_change': 2.0,
+            'opportunity_min_impressions': 100.0,
+            'opportunity_max_ctr': 2.0,
+            'opportunity_min_position': 3.0,
+            'opportunity_max_position': 20.0,
+        }
+
+        events: List[Dict[str, Any]] = []
+        for entity_type in ('page', 'query'):
+            current_map = self._extract_dimension_map(current_analytics, entity_type)
+            prior_map = self._extract_dimension_map(prior_analytics, entity_type)
+            all_keys = set(current_map.keys()) | set(prior_map.keys())
+
+            for entity_key in all_keys:
+                curr = current_map.get(entity_key, {'clicks': 0.0, 'impressions': 0.0, 'ctr': 0.0, 'position': 0.0})
+                prev = prior_map.get(entity_key, {'clicks': 0.0, 'impressions': 0.0, 'ctr': 0.0, 'position': 0.0})
+
+                delta_clicks = curr['clicks'] - prev['clicks']
+                delta_impressions = curr['impressions'] - prev['impressions']
+                delta_ctr = curr['ctr'] - prev['ctr']
+                delta_position = curr['position'] - prev['position']
+
+                clicks_pct = ((delta_clicks / prev['clicks']) * 100.0) if prev['clicks'] > 0 else None
+                impressions_pct = ((delta_impressions / prev['impressions']) * 100.0) if prev['impressions'] > 0 else None
+
+                trigger_type = None
+                reasons: List[str] = []
+
+                if clicks_pct is not None and clicks_pct <= -thresholds['clicks_pct_change']:
+                    trigger_type = 'decline'
+                    reasons.append('clicks_drop')
+                if impressions_pct is not None and impressions_pct <= -thresholds['impressions_pct_change']:
+                    trigger_type = trigger_type or 'decline'
+                    reasons.append('impressions_drop')
+                if delta_ctr <= -thresholds['ctr_abs_pp_change']:
+                    trigger_type = trigger_type or 'decline'
+                    reasons.append('ctr_drop')
+                if delta_position >= thresholds['position_abs_change']:
+                    trigger_type = trigger_type or 'decline'
+                    reasons.append('position_drop')
+
+                if trigger_type is None:
+                    if clicks_pct is not None and clicks_pct >= thresholds['clicks_pct_change']:
+                        trigger_type = 'rise'
+                        reasons.append('clicks_rise')
+                    if impressions_pct is not None and impressions_pct >= thresholds['impressions_pct_change']:
+                        trigger_type = trigger_type or 'rise'
+                        reasons.append('impressions_rise')
+                    if delta_ctr >= thresholds['ctr_abs_pp_change']:
+                        trigger_type = trigger_type or 'rise'
+                        reasons.append('ctr_rise')
+                    if delta_position <= -thresholds['position_abs_change']:
+                        trigger_type = trigger_type or 'rise'
+                        reasons.append('position_rise')
+
+                if trigger_type is None:
+                    is_opportunity = (
+                        curr['impressions'] >= thresholds['opportunity_min_impressions']
+                        and curr['ctr'] <= thresholds['opportunity_max_ctr']
+                        and thresholds['opportunity_min_position'] <= curr['position'] <= thresholds['opportunity_max_position']
+                    )
+                    if is_opportunity:
+                        trigger_type = 'opportunity'
+                        reasons.append('high_impressions_low_ctr')
+
+                if not trigger_type:
+                    continue
+
+                severity = 'low'
+                if trigger_type == 'decline':
+                    severity = 'high' if len(reasons) >= 2 else 'medium'
+                elif trigger_type == 'opportunity':
+                    severity = 'medium'
+
+                events.append({
+                    'event_type': trigger_type,
+                    'entity_type': entity_type,
+                    'entity_key': entity_key,
+                    'severity': severity,
+                    'reasons': reasons,
+                    'metrics': {
+                        'current': curr,
+                        'prior': prev,
+                        'delta': {
+                            'clicks': delta_clicks,
+                            'impressions': delta_impressions,
+                            'ctr': delta_ctr,
+                            'position': delta_position,
+                            'clicks_pct': clicks_pct,
+                            'impressions_pct': impressions_pct,
+                        },
+                    },
+                    'thresholds': thresholds,
+                    'current_window': {'start': current_window[0], 'end': current_window[1]},
+                    'prior_window': {'start': prior_window[0], 'end': prior_window[1]},
+                })
+
+        return events
+
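To make the trigger thresholds concrete, a hand-computed case (all numbers invented): a query falling from 100 to 70 clicks moves -30%, which breaches the 20% decline threshold; with only one breached metric the severity rule resolves to 'medium':

```python
# Hand-check of the decline trigger using the thresholds defined above.
prev_clicks, curr_clicks = 100.0, 70.0
clicks_pct = (curr_clicks - prev_clicks) / prev_clicks * 100.0  # -30.0

assert clicks_pct <= -20.0  # thresholds['clicks_pct_change'] -> 'decline', reason 'clicks_drop'
reasons = ['clicks_drop']
severity = 'high' if len(reasons) >= 2 else 'medium'
assert severity == 'medium'  # a second breached metric would escalate to 'high'
```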
+    def _persist_delta_events(
+        self,
+        db: Session,
+        task: PlatformInsightsTask,
+        user_id: str,
+        site_url: str,
+        delta_events: List[Dict[str, Any]],
+        current_window: Tuple[str, str],
+        prior_window: Tuple[str, str],
+    ) -> int:
+        """Persist generated delta events for downstream dashboard/content-strategy features."""
+        # Remove previously stored events for the same window to keep retrieval deterministic
+        db.query(PlatformInsightDeltaEvent).filter(
+            PlatformInsightDeltaEvent.user_id == user_id,
+            PlatformInsightDeltaEvent.platform == 'gsc',
+            PlatformInsightDeltaEvent.site_url == site_url,
+            PlatformInsightDeltaEvent.current_start_date == current_window[0],
+            PlatformInsightDeltaEvent.current_end_date == current_window[1],
+        ).delete(synchronize_session=False)
+
+        for event in delta_events:
+            db.add(PlatformInsightDeltaEvent(
+                user_id=user_id,
+                platform='gsc',
+                site_url=site_url,
+                task_id=task.id,
+                event_type=event['event_type'],
+                entity_type=event['entity_type'],
+                entity_key=event['entity_key'],
+                current_start_date=current_window[0],
+                current_end_date=current_window[1],
+                prior_start_date=prior_window[0],
+                prior_end_date=prior_window[1],
+                details=event,
+                severity=event.get('severity'),
+            ))
+
+        db.flush()
+        return len(delta_events)
+
     def calculate_next_execution(
         self,
         task: PlatformInsightsTask,