Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
DeepCompetitorAnalysisExecutionLog
)
import os
from services.analytics.opportunity_scorer import categorize_opportunities

logger = get_service_logger("onboarding.data_integration")

Expand Down Expand Up @@ -1035,6 +1036,77 @@ def _get_deep_competitor_analysis(self, user_id: str, db: Session) -> Dict[str,
logger.error(f"Error getting deep competitor analysis for user {user_id}: {str(e)}")
return {}


def _normalize_gsc_rows_for_opportunities(self, rows: List[Dict[str, Any]], dimension: str) -> Dict[str, Dict[str, Any]]:
"""Normalize query/page rows to scorer schema."""
normalized: Dict[str, Dict[str, Any]] = {}
for row in rows or []:
keys = row.get('keys', [])
raw_key = str(keys[0]) if keys else ''
if not raw_key:
continue

if dimension == 'query':
entry_id = f"q:{raw_key}"
query = raw_key
page_url = None
else:
entry_id = f"p:{raw_key}"
query = None
page_url = raw_key

clicks = float(row.get('clicks', 0) or 0)
impressions = float(row.get('impressions', 0) or 0)
raw_ctr = row.get('ctr')
ctr = round(float(raw_ctr) * 100, 2) if raw_ctr is not None else (round((clicks / impressions) * 100, 2) if impressions > 0 else 0.0)

normalized[raw_key] = {
'id': entry_id,
'query': query,
'page_url': page_url,
'current_metrics': {
'clicks': clicks,
'impressions': impressions,
'ctr': ctr,
'position': round(float(row.get('position', 0) or 0), 2),
},
}

return normalized

def _merge_current_previous_rows(self, current_rows: List[Dict[str, Any]], previous_rows: List[Dict[str, Any]], dimension: str) -> List[Dict[str, Any]]:
    """Join current- and previous-period GSC rows on their query/page key.

    A key present in only one period receives zeroed metrics for the other
    period, so every merged row carries both 'current_metrics' and
    'previous_metrics' in a stable shape for the scorer.
    """
    zero_metrics = {'clicks': 0.0, 'impressions': 0.0, 'ctr': 0.0, 'position': 0.0}
    id_prefix = 'q' if dimension == 'query' else 'p'

    now_by_key = self._normalize_gsc_rows_for_opportunities(current_rows, dimension)
    then_by_key = self._normalize_gsc_rows_for_opportunities(previous_rows, dimension)

    merged: List[Dict[str, Any]] = []
    for key in now_by_key.keys() | then_by_key.keys():
        now = now_by_key.get(key, {})
        then = then_by_key.get(key, {})
        merged.append({
            'id': now.get('id') or then.get('id') or f"{id_prefix}:{key}",
            'query': now.get('query') or then.get('query'),
            'page_url': now.get('page_url') or then.get('page_url'),
            'current_metrics': now.get('current_metrics', dict(zero_metrics)),
            'previous_metrics': then.get('current_metrics', dict(zero_metrics)),
        })
    return merged

def _extract_gsc_opportunities(self, gsc_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build ranked scorer opportunities from a raw GSC analytics payload.

    Args:
        gsc_data: Raw GSC analytics dict; may contain 'query_data',
            'page_data' and a nested 'previous_period' with the same shape.

    Returns:
        Ranked opportunity dicts from categorize_opportunities, or [] for
        non-dict input.
    """
    if not isinstance(gsc_data, dict):
        return []

    def _rows(container: Any, key: str) -> List[Dict[str, Any]]:
        # Defensive extraction: the section or its 'rows' may be missing,
        # None, or a non-dict (the original only guarded previous_period,
        # so e.g. {'query_data': None} raised AttributeError).
        section = container.get(key) if isinstance(container, dict) else None
        rows = section.get('rows') if isinstance(section, dict) else None
        return rows if isinstance(rows, list) else []

    previous = gsc_data.get('previous_period')

    query_rows = self._merge_current_previous_rows(
        _rows(gsc_data, 'query_data'), _rows(previous, 'query_data'), 'query')
    page_rows = self._merge_current_previous_rows(
        _rows(gsc_data, 'page_data'), _rows(previous, 'page_data'), 'page')
    return categorize_opportunities(query_rows, page_rows)

async def _get_gsc_analytics(self, user_id: str) -> Dict[str, Any]:
"""Get Google Search Console analytics data for the user."""
try:
Expand All @@ -1050,10 +1122,12 @@ async def _get_gsc_analytics(self, user_id: str) -> Dict[str, Any]:

if gsc_data and gsc_data.get('status') != 'disconnected' and not gsc_data.get('error'):
logger.info(f"Retrieved GSC analytics for user {user_id}")
opportunities = self._extract_gsc_opportunities(gsc_data)
return {
'data': gsc_data.get('data', {}),
'metrics': gsc_data.get('metrics', {}),
'date_range': gsc_data.get('date_range', {}),
'opportunities': opportunities,
'data_freshness': 1.0, # GSC data is typically fresh
'confidence_level': 0.9
}
Expand Down
88 changes: 87 additions & 1 deletion backend/services/analytics/handlers/gsc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ..models.analytics_data import AnalyticsData
from ..models.platform_types import PlatformType
from .base_handler import BaseAnalyticsHandler
from ..opportunity_scorer import categorize_opportunities


class GSCAnalyticsHandler(BaseAnalyticsHandler):
Expand Down Expand Up @@ -372,6 +373,15 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, An
except Exception as e:
logger.warning(f"Failed computing cannibalization: {e}")

current_query_rows = self._extract_period_rows(search_analytics, 'query', 'current')
previous_query_rows = self._extract_period_rows(search_analytics, 'query', 'previous')
current_page_rows = self._extract_period_rows(search_analytics, 'page', 'current')
previous_page_rows = self._extract_period_rows(search_analytics, 'page', 'previous')

query_rows_for_scoring = self._build_opportunity_rows(current_query_rows, previous_query_rows, 'query')
page_rows_for_scoring = self._build_opportunity_rows(current_page_rows, previous_page_rows, 'page')
opportunities = categorize_opportunities(query_rows_for_scoring, page_rows_for_scoring)

return {
'connection_status': 'connected',
'connected_sites': 1,
Expand All @@ -382,7 +392,8 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, An
'total_queries': len(top_queries_source) if top_queries_source else 0,
'top_queries': top_queries,
'top_pages': top_pages,
'cannibalization': cannibalization
'cannibalization': cannibalization,
'opportunities': opportunities
}

except Exception as e:
Expand All @@ -397,9 +408,84 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, An
'total_queries': 0,
'top_queries': [],
'top_pages': [],
'opportunities': [],
'error': str(e)
}


def _safe_ctr_percent(self, row: Dict[str, Any]) -> float:
"""Return CTR as percentage from row data."""
clicks_val = row.get('clicks', 0) or 0
impr_val = row.get('impressions', 0) or 0
raw_ctr = row.get('ctr', None)
if raw_ctr is not None:
return round(float(raw_ctr) * 100, 2)
return round(((clicks_val / impr_val) * 100), 2) if impr_val > 0 else 0.0

def _build_period_rows(self, rows: list, dimension: str, prefix: str = "") -> Dict[str, Dict[str, Any]]:
    """Normalize GSC rows into scorer-ready metrics keyed by query/page URL.

    Args:
        rows: Raw GSC API rows for a single period (may be None).
        dimension: 'query' or 'page' — selects the key extractor, id scheme,
            and which of query/page_url is populated.
        prefix: Accepted for backward compatibility but intentionally unused;
            ids must stay un-prefixed so current/previous maps merge on the
            same id scheme in _build_opportunity_rows.

    Returns:
        Mapping of key -> entry with 'id', 'query', 'page_url',
        'current_metrics' (CTR as a percentage) and empty 'previous_metrics'.
    """
    normalized: Dict[str, Dict[str, Any]] = {}
    is_query = dimension == 'query'

    for row in rows or []:
        key = self._extract_query_from_row(row) if is_query else self._extract_page_from_row(row)
        if not key:
            # Skip unusable rows before doing any further work
            # (the original built id/query/page_url first, then discarded).
            continue

        normalized[key] = {
            'id': f"q:{key}" if is_query else f"p:{key}",
            'query': key if is_query else None,
            'page_url': None if is_query else key,
            'current_metrics': {
                'clicks': float(row.get('clicks', 0) or 0),
                'impressions': float(row.get('impressions', 0) or 0),
                'ctr': self._safe_ctr_percent(row),
                'position': round(float(row.get('position', 0) or 0), 2),
            },
            'previous_metrics': {}
        }

    return normalized

def _build_opportunity_rows(self, current_rows: list, previous_rows: list, dimension: str) -> list:
    """Merge current/previous GSC rows into stable scorer rows.

    A key present in only one window receives zeroed metrics for the other,
    so every merged row exposes both 'current_metrics' and
    'previous_metrics' in the shape categorize_opportunities expects.
    """
    zero_metrics = {'clicks': 0.0, 'impressions': 0.0, 'ctr': 0.0, 'position': 0.0}
    id_prefix = 'q' if dimension == 'query' else 'p'

    current_map = self._build_period_rows(current_rows, dimension)
    # NOTE: the former prefix='prev_' argument was a no-op inside
    # _build_period_rows, so it is omitted here to avoid implying the
    # previous-period ids are prefixed.
    previous_map = self._build_period_rows(previous_rows, dimension)

    merged = []
    for key in set(current_map) | set(previous_map):
        current_entry = current_map.get(key, {})
        previous_entry = previous_map.get(key, {})
        merged.append({
            'id': current_entry.get('id') or previous_entry.get('id') or f"{id_prefix}:{key}",
            'query': current_entry.get('query') or previous_entry.get('query'),
            'page_url': current_entry.get('page_url') or previous_entry.get('page_url'),
            'current_metrics': current_entry.get('current_metrics', dict(zero_metrics)),
            'previous_metrics': previous_entry.get('current_metrics', dict(zero_metrics)),
        })

    return merged

def _extract_period_rows(self, search_analytics: Dict[str, Any], dimension: str, period_key: str) -> list:
"""Extract rows for a dimension from either the current or previous period payload."""
source = search_analytics if period_key == 'current' else search_analytics.get('previous_period', {})
if dimension == 'query':
return source.get('query_data', {}).get('rows', [])
return source.get('page_data', {}).get('rows', [])

def _extract_query_from_row(self, row: Dict[str, Any]) -> str:
"""Extract query text from GSC API row data"""
try:
Expand Down
128 changes: 128 additions & 0 deletions backend/services/analytics/opportunity_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Opportunity scoring helpers for search analytics data."""

from typing import Any, Dict, List


Opportunity = Dict[str, Any]
MetricRow = Dict[str, Any]


def high_impression_low_ctr_queries(
    query_rows: List[MetricRow],
    min_impressions: float = 100.0,
    max_ctr: float = 2.5,
) -> List[Opportunity]:
    """Return queries whose impressions are strong but whose CTR is weak.

    A row qualifies when impressions >= min_impressions AND ctr <= max_ctr
    (CTR is a percentage). Score is left at 0.0 for the ranking pass.
    """
    def qualifies(metrics: Dict[str, Any]) -> bool:
        impressions = float(metrics.get("impressions", 0) or 0)
        ctr = float(metrics.get("ctr", 0) or 0)
        return impressions >= min_impressions and ctr <= max_ctr

    results: List[Opportunity] = []
    for entry in query_rows:
        metrics = entry.get("current_metrics", {})
        if not qualifies(metrics):
            continue
        results.append({
            "id": entry.get("id") or f"q:{entry.get('query', 'unknown')}",
            "query": entry.get("query"),
            "page_url": entry.get("page_url"),
            "reason": "high_impression_low_ctr_query",
            "score": 0.0,
            "current_metrics": metrics,
            "previous_metrics": entry.get("previous_metrics", {}),
        })
    return results


def rising_queries(
    query_rows: List[MetricRow],
    min_impression_delta: float = 50.0,
    min_click_delta: float = 5.0,
) -> List[Opportunity]:
    """Return query opportunities with positive window-over-window growth.

    A row qualifies when EITHER its impression delta or its click delta
    (current minus previous) meets its threshold.
    """
    results: List[Opportunity] = []
    for entry in query_rows:
        now = entry.get("current_metrics", {})
        then = entry.get("previous_metrics", {})
        impression_gain = float(now.get("impressions", 0) or 0) - float(then.get("impressions", 0) or 0)
        click_gain = float(now.get("clicks", 0) or 0) - float(then.get("clicks", 0) or 0)
        if impression_gain >= min_impression_delta or click_gain >= min_click_delta:
            results.append({
                "id": entry.get("id") or f"q:{entry.get('query', 'unknown')}",
                "query": entry.get("query"),
                "page_url": entry.get("page_url"),
                "reason": "rising_query",
                "score": 0.0,
                "current_metrics": now,
                "previous_metrics": then,
            })
    return results


def declining_pages(
    page_rows: List[MetricRow],
    min_impression_drop: float = 50.0,
    min_click_drop: float = 5.0,
) -> List[Opportunity]:
    """Return page opportunities with negative window-over-window change.

    A row qualifies when EITHER its impression drop or its click drop
    (previous minus current) meets its threshold.
    """
    results: List[Opportunity] = []
    for entry in page_rows:
        now = entry.get("current_metrics", {})
        then = entry.get("previous_metrics", {})
        lost_impressions = float(then.get("impressions", 0) or 0) - float(now.get("impressions", 0) or 0)
        lost_clicks = float(then.get("clicks", 0) or 0) - float(now.get("clicks", 0) or 0)
        if lost_impressions >= min_impression_drop or lost_clicks >= min_click_drop:
            results.append({
                "id": entry.get("id") or f"p:{entry.get('page_url', 'unknown')}",
                "query": entry.get("query"),
                "page_url": entry.get("page_url"),
                "reason": "declining_page",
                "score": 0.0,
                "current_metrics": now,
                "previous_metrics": then,
            })
    return results


def score_and_rank_opportunities(opportunities: List[Opportunity]) -> List[Opportunity]:
    """Assign a simple priority score to each opportunity and rank them.

    Scoring blends current volume (impressions, clicks), CTR headroom, and
    period-over-period momentum, then weights by opportunity category.
    Input dicts are not mutated; scored copies are returned, sorted by
    descending (score, id) for a deterministic order.
    """
    category_weight = {
        "high_impression_low_ctr_query": 1.2,
        "rising_query": 1.0,
        "declining_page": 1.3,
    }

    def scored_copy(item: Opportunity) -> Opportunity:
        now = item.get("current_metrics", {})
        then = item.get("previous_metrics", {})
        impressions = float(now.get("impressions", 0) or 0)
        clicks = float(now.get("clicks", 0) or 0)
        ctr = float(now.get("ctr", 0) or 0)

        # Momentum rewards movement in either direction; click changes
        # carry 10x the weight of impression changes.
        momentum = (abs(impressions - float(then.get("impressions", 0) or 0))
                    + abs(clicks - float(then.get("clicks", 0) or 0)) * 10)

        base = (impressions * 0.05) + (clicks * 0.8) + ((100.0 - ctr) * 0.1) + (momentum * 0.15)
        weight = category_weight.get(item.get("reason"), 1.0)

        copy = dict(item)
        copy["score"] = round(base * weight, 2)
        return copy

    ranked = [scored_copy(item) for item in opportunities]
    ranked.sort(key=lambda entry: (entry.get("score", 0), str(entry.get("id", ""))), reverse=True)
    return ranked


def categorize_opportunities(
    query_rows: List[MetricRow],
    page_rows: List[MetricRow],
) -> List[Opportunity]:
    """Build every opportunity category and return them ranked.

    Runs the query-based detectors over query_rows and the page-based
    detector over page_rows, then scores and orders the combined list.
    """
    collected: List[Opportunity] = [
        *high_impression_low_ctr_queries(query_rows),
        *rising_queries(query_rows),
        *declining_pages(page_rows),
    ]
    return score_and_rank_opportunities(collected)

Loading