Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions backend/models/platform_insights_monitoring_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,43 @@ class PlatformInsightsExecutionLog(Base):
def __repr__(self):
return f"<PlatformInsightsExecutionLog(id={self.id}, task_id={self.task_id}, status={self.status}, execution_date={self.execution_date})>"


class PlatformInsightDeltaEvent(Base):
"""Persisted delta events generated from comparable platform-insights windows."""
__tablename__ = "platform_insight_delta_events"

id = Column(Integer, primary_key=True, index=True)

# Ownership and source
user_id = Column(String(255), nullable=False, index=True)
platform = Column(String(50), nullable=False, index=True) # 'gsc' / 'bing'
site_url = Column(String(500), nullable=True, index=True)
task_id = Column(Integer, ForeignKey("platform_insights_tasks.id"), nullable=True, index=True)

# Event classification
event_type = Column(String(50), nullable=False, index=True) # 'decline' | 'rise' | 'opportunity'
entity_type = Column(String(50), nullable=False, index=True) # 'page' | 'query'
entity_key = Column(Text, nullable=False, index=True) # URL for page, text for query

# Comparable windows
current_start_date = Column(String(20), nullable=False)
current_end_date = Column(String(20), nullable=False)
prior_start_date = Column(String(20), nullable=False)
prior_end_date = Column(String(20), nullable=False)

# Delta payload and metadata
details = Column(JSON, nullable=False) # metric deltas + labels + thresholds used
severity = Column(String(20), nullable=True, index=True) # low | medium | high

created_at = Column(DateTime, default=datetime.utcnow, index=True)

__table_args__ = (
Index('idx_platform_delta_events_user_platform_created', 'user_id', 'platform', 'created_at'),
Index('idx_platform_delta_events_user_type_created', 'user_id', 'event_type', 'created_at'),
)

def __repr__(self):
return (
f"<PlatformInsightDeltaEvent(id={self.id}, user_id={self.user_id}, platform={self.platform}, "
f"event_type={self.event_type}, entity_type={self.entity_type})>"
)
62 changes: 62 additions & 0 deletions backend/services/analytics/opportunity_alerts_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Service for retrieving persisted platform opportunity/alert events."""

from typing import Dict, Any, List, Optional

from loguru import logger

from models.platform_insights_monitoring_models import PlatformInsightDeltaEvent
from services.database import get_session_for_user


class OpportunityAlertsService:
"""Read optimized access to latest persisted opportunities/alerts."""

def get_latest_events(
self,
user_id: str,
platform: str = 'gsc',
site_url: Optional[str] = None,
event_types: Optional[List[str]] = None,
limit: int = 25,
) -> List[Dict[str, Any]]:
db = None
try:
db = get_session_for_user(user_id)
query = db.query(PlatformInsightDeltaEvent).filter(
PlatformInsightDeltaEvent.user_id == user_id,
PlatformInsightDeltaEvent.platform == platform,
)
if site_url:
query = query.filter(PlatformInsightDeltaEvent.site_url == site_url)
if event_types:
query = query.filter(PlatformInsightDeltaEvent.event_type.in_(event_types))

rows = query.order_by(PlatformInsightDeltaEvent.created_at.desc()).limit(limit).all()
return [
{
'id': row.id,
'platform': row.platform,
'site_url': row.site_url,
'event_type': row.event_type,
'entity_type': row.entity_type,
'entity_key': row.entity_key,
'severity': row.severity,
'current_window': {
'start': row.current_start_date,
'end': row.current_end_date,
},
'prior_window': {
'start': row.prior_start_date,
'end': row.prior_end_date,
},
'details': row.details,
'created_at': row.created_at.isoformat() if row.created_at else None,
}
for row in rows
]
except Exception as e:
logger.error(f"Failed to load latest opportunity/alert events for user {user_id}: {e}")
return []
finally:
if db:
db.close()
24 changes: 24 additions & 0 deletions backend/services/analytics/platform_analytics_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .connection_manager import PlatformConnectionManager
from .summary_generator import AnalyticsSummaryGenerator
from .cache_manager import AnalyticsCacheManager
from .opportunity_alerts_service import OpportunityAlertsService


class PlatformAnalyticsService:
Expand All @@ -41,6 +42,7 @@ def __init__(self):
self.connection_manager = PlatformConnectionManager()
self.summary_generator = AnalyticsSummaryGenerator()
self.cache_manager = AnalyticsCacheManager()
self.opportunity_alerts_service = OpportunityAlertsService()

async def get_comprehensive_analytics(self, user_id: str, platforms: List[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, AnalyticsData]:
"""
Expand Down Expand Up @@ -118,6 +120,28 @@ async def get_comprehensive_analytics(self, user_id: str, platforms: List[str] =

return analytics_data

def get_latest_opportunities_alerts(
self,
user_id: str,
platform: str = 'gsc',
site_url: Optional[str] = None,
event_types: Optional[List[str]] = None,
limit: int = 25
) -> Dict[str, Any]:
"""Return latest persisted opportunities/alerts without recomputing deltas."""
events = self.opportunity_alerts_service.get_latest_events(
user_id=user_id,
platform=platform,
site_url=site_url,
event_types=event_types or ['opportunity', 'decline', 'rise'],
limit=limit,
)
return {
'platform': platform,
'count': len(events),
'events': events,
}

async def get_platform_connection_status(self, user_id: str) -> Dict[str, Dict[str, Any]]:
"""
Check connection status for all platforms
Expand Down
Loading