@@ -15,9 +15,14 @@ async def normalize_gsc_analytics(gsc_data: Dict[str, Any]) -> Dict[str, Any]:
if not gsc_data:
logger.warning("⚠️ normalize_gsc_analytics: Empty gsc_data received")
return {}


status = gsc_data.get('status', 'success')
if status not in ('success', 'partial_success', 'no_data'):
logger.warning(f"⚠️ normalize_gsc_analytics: Skipping due to status={status}, error={gsc_data.get('error')}")
return {}

logger.warning(f"🔍 normalize_gsc_analytics received keys: {list(gsc_data.keys())}")

# Extract metrics from GSC data
metrics = gsc_data.get('metrics', {})
data = gsc_data.get('data', {})
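Roughly, the status gate added above behaves like the sketch below; the helper name and sample payloads are invented for illustration, while the tuple of accepted statuses is taken verbatim from the hunk.

from typing import Any, Dict

def should_normalize(gsc_data: Dict[str, Any]) -> bool:
    """Mirror the guard in normalize_gsc_analytics: proceed only for
    statuses that carry usable (possibly empty) data."""
    if not gsc_data:
        return False
    return gsc_data.get('status', 'success') in ('success', 'partial_success', 'no_data')

assert should_normalize({'metrics': {}})                # legacy payloads default to 'success'
assert should_normalize({'status': 'no_data'})          # empty-but-valid ranges still normalize
assert not should_normalize({'status': 'auth_failed'})  # error statuses short-circuit to {}
assert not should_normalize({})                         # empty input is rejected up front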
91 changes: 30 additions & 61 deletions backend/services/analytics/handlers/gsc_handler.py
@@ -149,66 +149,35 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, Any]:
logger.info(f"GSC Raw search analytics structure: {search_analytics}")
logger.info(f"GSC Raw search analytics keys: {list(search_analytics.keys())}")

# Handle new data structure with overall_metrics and query_data
if 'overall_metrics' in search_analytics:
# New structure from updated GSC service
overall_rows = search_analytics.get('overall_metrics', {}).get('rows', [])
query_rows = search_analytics.get('query_data', {}).get('rows', [])

# Calculate totals from overall_rows (most accurate as it includes anonymized queries)
total_clicks = 0
total_impressions = 0
total_position = 0
valid_position_rows = 0

# Use overall_rows for totals if available, otherwise fallback to query_rows
calc_rows = overall_rows if overall_rows else query_rows

for row in calc_rows:
clicks = row.get('clicks', 0)
impressions = row.get('impressions', 0)
position = row.get('position', 0)

total_clicks += clicks
total_impressions += impressions

if position and position > 0:
total_position += position * impressions # Weighted average

# Calculate weighted average position
avg_position = total_position / total_impressions if total_impressions > 0 else 0
avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0

# Use query_rows for top queries list
top_queries_source = query_rows

else:
# Legacy structure
rows = search_analytics.get('rows', [])
# ... existing legacy logic ...
calc_rows = rows
top_queries_source = rows

total_clicks = 0
total_impressions = 0
total_position = 0
valid_position_rows = 0

for row in calc_rows:
clicks = row.get('clicks', 0)
impressions = row.get('impressions', 0)
position = row.get('position', 0)

total_clicks += clicks
total_impressions += impressions

if position and position > 0:
# Simple average for legacy/unknown structure if we can't do weighted
total_position += position
valid_position_rows += 1

avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
avg_position = total_position / valid_position_rows if valid_position_rows > 0 else 0
status = search_analytics.get('status', 'success')
overall_rows = search_analytics.get('overall_metrics', {}).get('rows', [])
query_rows = search_analytics.get('query_data', {}).get('rows', [])

# Calculate totals from overall_rows (most accurate as it includes anonymized queries)
total_clicks = 0
total_impressions = 0
total_position = 0

# Use overall_rows for totals if available, otherwise fallback to query_rows
calc_rows = overall_rows if overall_rows else query_rows

for row in calc_rows:
clicks = row.get('clicks', 0)
impressions = row.get('impressions', 0)
position = row.get('position', 0)

total_clicks += clicks
total_impressions += impressions

if position and position > 0:
total_position += position * impressions # Weighted average

# Calculate weighted average position
avg_position = total_position / total_impressions if total_impressions > 0 else 0
avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0

# Use query_rows for top queries list
top_queries_source = query_rows


# Get top performing queries
@@ -373,7 +342,7 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, Any]:
logger.warning(f"Failed computing cannibalization: {e}")

return {
'connection_status': 'connected',
'connection_status': 'connected' if status in ('success', 'partial_success', 'no_data') else 'error',
'connected_sites': 1,
'total_clicks': total_clicks,
'total_impressions': total_impressions,
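The rewritten handler above computes an impression-weighted average position; a standalone sketch of that calculation follows (the function name and sample rows are invented for illustration).

from typing import Any, Dict, List

def weighted_avg_position(rows: List[Dict[str, Any]]) -> float:
    """Impression-weighted average position, following the math in
    _process_gsc_metrics above."""
    total_impressions = 0
    total_position = 0.0
    for row in rows:
        impressions = row.get('impressions', 0)
        total_impressions += impressions
        position = row.get('position', 0)
        if position and position > 0:
            total_position += position * impressions
    return total_position / total_impressions if total_impressions > 0 else 0.0

rows = [{'impressions': 900, 'position': 2.0}, {'impressions': 100, 'position': 12.0}]
print(weighted_avg_position(rows))  # 3.0 (dominated by the high-impression row)

Note that rows with a missing or zero position still add impressions to the denominator, which slightly deflates the average; the sketch preserves that behavior from the handler rather than correcting it.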
99 changes: 85 additions & 14 deletions backend/services/gsc_service.py
@@ -396,6 +396,68 @@ def get_site_list(self, user_id: str) -> List[Dict[str, Any]]:
def get_search_analytics(self, user_id: str, site_url: str,
start_date: str = None, end_date: str = None) -> Dict[str, Any]:
"""Get search analytics data from GSC."""
def _base_payload(status: str, error: Optional[str] = None) -> Dict[str, Any]:
payload = {
'overall_metrics': {'rows': [], 'rowCount': 0},
'query_data': {'rows': [], 'rowCount': 0},
'page_data': {'rows': [], 'rowCount': 0},
'query_page_data': {'rows': [], 'rowCount': 0},
'verification_data': {'rows': [], 'rowCount': 0},
'date_range': {
'start': start_date,
'end': end_date
},
'site_url': site_url,
# Backward-compatible aliases for existing consumers
'startDate': start_date,
'endDate': end_date,
'siteUrl': site_url,
'status': status
}
if error:
payload['error'] = error
return payload

def _normalize_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize cached/legacy payloads into the unified contract."""
if not isinstance(payload, dict):
return _base_payload(status='error', error='Invalid cached analytics payload')

date_range = payload.get('date_range') if isinstance(payload.get('date_range'), dict) else {
'start': payload.get('startDate', start_date),
'end': payload.get('endDate', end_date)
}
normalized = {
'overall_metrics': payload.get('overall_metrics') if isinstance(payload.get('overall_metrics'), dict) else {'rows': payload.get('rows', []), 'rowCount': payload.get('rowCount', 0)},
'query_data': payload.get('query_data') if isinstance(payload.get('query_data'), dict) else {'rows': [], 'rowCount': 0},
'page_data': payload.get('page_data') if isinstance(payload.get('page_data'), dict) else {'rows': [], 'rowCount': 0},
'query_page_data': payload.get('query_page_data') if isinstance(payload.get('query_page_data'), dict) else {'rows': [], 'rowCount': 0},
'verification_data': payload.get('verification_data') if isinstance(payload.get('verification_data'), dict) else {'rows': [], 'rowCount': 0},
'date_range': {
'start': date_range.get('start', start_date),
'end': date_range.get('end', end_date)
},
'site_url': payload.get('site_url') or payload.get('siteUrl') or site_url,
'status': payload.get('status') or ('error' if payload.get('error') else 'success')
}

# Keep aliases for compatibility
normalized['startDate'] = normalized['date_range']['start']
normalized['endDate'] = normalized['date_range']['end']
normalized['siteUrl'] = normalized['site_url']

if payload.get('error'):
normalized['error'] = payload.get('error')
if payload.get('warning'):
normalized['warning'] = payload.get('warning')

for section in ('overall_metrics', 'query_data', 'page_data', 'query_page_data', 'verification_data'):
rows = normalized[section].get('rows', [])
normalized[section]['rows'] = rows if isinstance(rows, list) else []
normalized[section]['rowCount'] = normalized[section].get('rowCount', len(normalized[section]['rows']))

return normalized

try:
# Set default date range (last 30 days)
if not end_date:
@@ -407,21 +469,18 @@ def get_search_analytics(self, user_id: str, site_url: str,
cache_key = f"{user_id}_{site_url}_{start_date}_{end_date}"
cached_data = self._get_cached_data(user_id, site_url, 'analytics', cache_key)
if cached_data and isinstance(cached_data, dict):
has_pages = 'page_data' in cached_data and isinstance(cached_data.get('page_data'), dict)
has_queries = 'query_data' in cached_data and isinstance(cached_data.get('query_data'), dict)
if has_pages and has_queries:
logger.info(f"Returning cached analytics data for user: {user_id} (includes page_data)")
return cached_data
logger.info(f"Returning cached analytics data for user: {user_id}")
return _normalize_payload(cached_data)

try:
service = self.get_authenticated_service(user_id)
except ValueError:
logger.warning(f"User {user_id} not connected to GSC. Returning empty analytics.")
return {'error': 'User not connected to GSC', 'rows': [], 'rowCount': 0}
return _base_payload(status='not_connected', error='User not connected to GSC')

if not service:
logger.error(f"Failed to get authenticated GSC service for user: {user_id}")
return {'error': 'Authentication failed', 'rows': [], 'rowCount': 0}
return _base_payload(status='auth_failed', error='Authentication failed')

# Step 1: Verify data presence first (as per GSC API documentation)
verification_request = {
@@ -444,13 +503,13 @@ def get_search_analytics(self, user_id: str, site_url: str,
verification_rows = verification_response.get('rows', [])
if not verification_rows:
logger.warning(f"No GSC data available for user {user_id} in date range {start_date} to {end_date}")
return {'error': 'No data available for this date range', 'rows': [], 'rowCount': 0}
return _base_payload(status='no_data', error='No data available for this date range')

logger.info(f"GSC Data verification successful - found {len(verification_rows)} days with data")

except Exception as verification_error:
logger.error(f"GSC Data verification failed for user {user_id}: {verification_error}")
return {'error': f'Data verification failed: {str(verification_error)}', 'rows': [], 'rowCount': 0}
return _base_payload(status='verification_failed', error=f'Data verification failed: {str(verification_error)}')

# Step 2: Get daily metrics for charting (ensure we have rows)
request = {
@@ -471,7 +530,7 @@ def get_search_analytics(self, user_id: str, site_url: str,
logger.info(f"GSC API response for user {user_id}: {response}")
except Exception as api_error:
logger.error(f"GSC API call failed for user {user_id}: {api_error}")
return {'error': str(api_error), 'rows': [], 'rowCount': 0}
return _base_payload(status='api_error', error=str(api_error))

# Step 3: Get query-level data for insights (as per documentation)
query_request = {
@@ -561,13 +620,19 @@ def get_search_analytics(self, user_id: str, site_url: str,
},
'startDate': start_date,
'endDate': end_date,
'siteUrl': site_url
'siteUrl': site_url,
'date_range': {
'start': start_date,
'end': end_date
},
'site_url': site_url,
'status': 'success'
}

self._cache_data(user_id, site_url, 'analytics', analytics_data, cache_key)

logger.info(f"Retrieved comprehensive analytics data for user: {user_id}, site: {site_url}")
return analytics_data
return _normalize_payload(analytics_data)

except Exception as query_error:
logger.error(f"GSC Query-level request failed for user {user_id}: {query_error}")
@@ -587,15 +652,21 @@ def get_search_analytics(self, user_id: str, site_url: str,
'startDate': start_date,
'endDate': end_date,
'siteUrl': site_url,
'date_range': {
'start': start_date,
'end': end_date
},
'site_url': site_url,
'status': 'partial_success',
'warning': f'Query-level data unavailable: {str(query_error)}'
}

self._cache_data(user_id, site_url, 'analytics', analytics_data, cache_key)
return analytics_data
return _normalize_payload(analytics_data)

except Exception as e:
logger.error(f"Error getting search analytics for user {user_id}: {e}")
raise
return _base_payload(status='error', error=str(e))

def get_sitemaps(self, user_id: str, site_url: str) -> List[Dict[str, Any]]:
"""Get sitemaps from GSC."""
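For orientation, here is a worked example of what _normalize_payload makes of a pre-existing flat cache entry. The sample payload is invented; the output is traced through the fallbacks in the helper above.

# Illustrative legacy cache entry (invented for this example):
legacy = {
    'rows': [{'clicks': 3, 'impressions': 40, 'position': 7.1}],
    'rowCount': 1,
    'startDate': '2024-01-01',
    'endDate': '2024-01-30',
    'siteUrl': 'sc-domain:example.com',
}

# What _normalize_payload returns for it:
normalized = {
    'overall_metrics': {'rows': legacy['rows'], 'rowCount': 1},  # flat rows folded in
    'query_data': {'rows': [], 'rowCount': 0},                   # missing sections default to empty
    'page_data': {'rows': [], 'rowCount': 0},
    'query_page_data': {'rows': [], 'rowCount': 0},
    'verification_data': {'rows': [], 'rowCount': 0},
    'date_range': {'start': '2024-01-01', 'end': '2024-01-30'},  # rebuilt from the camelCase keys
    'site_url': 'sc-domain:example.com',
    'startDate': '2024-01-01',                                   # aliases kept for old consumers
    'endDate': '2024-01-30',
    'siteUrl': 'sc-domain:example.com',
    'status': 'success',                                         # no 'error' key, so defaults to success
}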
10 changes: 5 additions & 5 deletions backend/services/scheduler/executors/gsc_insights_executor.py
@@ -299,20 +299,20 @@ async def _fetch_fresh_data(self, user_id: str, site_url: Optional[str]) -> TaskExecutionResult:
end_date=end_date
)

if 'error' in search_analytics:
if search_analytics.get('status') not in ('success', 'partial_success'):
return TaskExecutionResult(
success=False,
error_message=search_analytics.get('error', 'Unknown error'),
error_message=search_analytics.get('error', f"GSC analytics unavailable: {search_analytics.get('status', 'unknown')}"),
result_data=search_analytics
)

# Format insights data
insights_data = {
'site_url': site_url,
'date_range': {
'site_url': search_analytics.get('site_url', site_url),
'date_range': search_analytics.get('date_range', {
'start': start_date,
'end': end_date
},
}),
'overall_metrics': search_analytics.get('overall_metrics', {}),
'query_data': search_analytics.get('query_data', {}),
'fetched_at': datetime.utcnow().isoformat()
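Taken together, the status vocabulary this PR threads through the three files reduces to a short sketch; the constant and function names are invented, but the tuples mirror the hunks above.

DATA_BEARING = ('success', 'partial_success', 'no_data')
ERROR_STATUSES = ('not_connected', 'auth_failed', 'verification_failed', 'api_error', 'error')

def connection_status(status: str) -> str:
    """Mirrors the gsc_handler mapping: data-bearing statuses report a live
    connection, everything else surfaces as an error."""
    return 'connected' if status in DATA_BEARING else 'error'

def executor_accepts(status: str) -> bool:
    """Mirrors gsc_insights_executor: 'no_data' means a healthy connection,
    but the task still fails because there is nothing to build insights from."""
    return status in ('success', 'partial_success')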