diff --git a/backend/api/content_planning/services/content_strategy/autofill/normalizers/analytics_normalizer.py b/backend/api/content_planning/services/content_strategy/autofill/normalizers/analytics_normalizer.py
index 9989b85f..952c1087 100644
--- a/backend/api/content_planning/services/content_strategy/autofill/normalizers/analytics_normalizer.py
+++ b/backend/api/content_planning/services/content_strategy/autofill/normalizers/analytics_normalizer.py
@@ -15,9 +15,14 @@ async def normalize_gsc_analytics(gsc_data: Dict[str, Any]) -> Dict[str, Any]:
     if not gsc_data:
         logger.warning("⚠️ normalize_gsc_analytics: Empty gsc_data received")
         return {}
-
+
+    status = gsc_data.get('status', 'success')
+    if status not in ('success', 'partial_success', 'no_data'):
+        logger.warning(f"⚠️ normalize_gsc_analytics: Skipping due to status={status}, error={gsc_data.get('error')}")
+        return {}
+
     logger.warning(f"🔍 normalize_gsc_analytics received keys: {list(gsc_data.keys())}")
-
+
     # Extract metrics from GSC data
     metrics = gsc_data.get('metrics', {})
     data = gsc_data.get('data', {})
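
Reviewer note: the guard above means normalization is attempted only for `success`, `partial_success`, and `no_data` payloads; anything else short-circuits to `{}`. A minimal standalone sketch of that accept/reject behavior — `should_normalize` and the sample payloads are invented for illustration, not part of the patch:

```python
from typing import Any, Dict

# Statuses the normalizer accepts, mirroring the tuple in the diff above.
ACCEPTED_STATUSES = ('success', 'partial_success', 'no_data')

def should_normalize(gsc_data: Dict[str, Any]) -> bool:
    """Mirror the new guard: empty payloads and error statuses are skipped."""
    if not gsc_data:
        return False
    # A missing 'status' key defaults to 'success' for backward compatibility.
    return gsc_data.get('status', 'success') in ACCEPTED_STATUSES

assert should_normalize({'status': 'partial_success', 'query_data': {}})
assert should_normalize({'metrics': {}})  # legacy payload without a status field
assert not should_normalize({'status': 'auth_failed', 'error': 'Authentication failed'})
assert not should_normalize({})
```

The `success` default keeps older cached payloads (which never carried `status`) flowing through unchanged.
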
diff --git a/backend/services/analytics/handlers/gsc_handler.py b/backend/services/analytics/handlers/gsc_handler.py
index 8c7c1f0f..085a6891 100644
--- a/backend/services/analytics/handlers/gsc_handler.py
+++ b/backend/services/analytics/handlers/gsc_handler.py
@@ -149,66 +149,35 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, An
         logger.info(f"GSC Raw search analytics structure: {search_analytics}")
         logger.info(f"GSC Raw search analytics keys: {list(search_analytics.keys())}")
 
-        # Handle new data structure with overall_metrics and query_data
-        if 'overall_metrics' in search_analytics:
-            # New structure from updated GSC service
-            overall_rows = search_analytics.get('overall_metrics', {}).get('rows', [])
-            query_rows = search_analytics.get('query_data', {}).get('rows', [])
-
-            # Calculate totals from overall_rows (most accurate as it includes anonymized queries)
-            total_clicks = 0
-            total_impressions = 0
-            total_position = 0
-            valid_position_rows = 0
-
-            # Use overall_rows for totals if available, otherwise fallback to query_rows
-            calc_rows = overall_rows if overall_rows else query_rows
-
-            for row in calc_rows:
-                clicks = row.get('clicks', 0)
-                impressions = row.get('impressions', 0)
-                position = row.get('position', 0)
-
-                total_clicks += clicks
-                total_impressions += impressions
-
-                if position and position > 0:
-                    total_position += position * impressions  # Weighted average
-
-            # Calculate weighted average position
-            avg_position = total_position / total_impressions if total_impressions > 0 else 0
-            avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
-
-            # Use query_rows for top queries list
-            top_queries_source = query_rows
-
-        else:
-            # Legacy structure
-            rows = search_analytics.get('rows', [])
-            # ... existing legacy logic ...
-            calc_rows = rows
-            top_queries_source = rows
-
-            total_clicks = 0
-            total_impressions = 0
-            total_position = 0
-            valid_position_rows = 0
-
-            for row in calc_rows:
-                clicks = row.get('clicks', 0)
-                impressions = row.get('impressions', 0)
-                position = row.get('position', 0)
-
-                total_clicks += clicks
-                total_impressions += impressions
-
-                if position and position > 0:
-                    # Simple average for legacy/unknown structure if we can't do weighted
-                    total_position += position
-                    valid_position_rows += 1
-
-            avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
-            avg_position = total_position / valid_position_rows if valid_position_rows > 0 else 0
+        status = search_analytics.get('status', 'success')
+        overall_rows = search_analytics.get('overall_metrics', {}).get('rows', [])
+        query_rows = search_analytics.get('query_data', {}).get('rows', [])
+
+        # Calculate totals from overall_rows (most accurate as it includes anonymized queries)
+        total_clicks = 0
+        total_impressions = 0
+        total_position = 0
+
+        # Use overall_rows for totals if available, otherwise fallback to query_rows
+        calc_rows = overall_rows if overall_rows else query_rows
+
+        for row in calc_rows:
+            clicks = row.get('clicks', 0)
+            impressions = row.get('impressions', 0)
+            position = row.get('position', 0)
+
+            total_clicks += clicks
+            total_impressions += impressions
+
+            if position and position > 0:
+                total_position += position * impressions  # Weighted average
+
+        # Calculate weighted average position
+        avg_position = total_position / total_impressions if total_impressions > 0 else 0
+        avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0
+
+        # Use query_rows for top queries list
+        top_queries_source = query_rows
 
         # Get top performing queries
 
@@ -373,7 +342,7 @@ def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, An
                 logger.warning(f"Failed computing cannibalization: {e}")
 
         return {
-            'connection_status': 'connected',
+            'connection_status': 'connected' if status in ('success', 'partial_success', 'no_data') else 'error',
             'connected_sites': 1,
             'total_clicks': total_clicks,
             'total_impressions': total_impressions,
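
With the legacy branch gone, the handler always computes an impressions-weighted average position instead of branching on payload shape. A self-contained sketch of the arithmetic — the `rows` sample is invented:

```python
# Two illustrative GSC rows: one high-impression page ranking well,
# one low-impression page ranking poorly.
rows = [
    {'clicks': 10, 'impressions': 1000, 'position': 4.0},
    {'clicks': 2, 'impressions': 100, 'position': 20.0},
]

total_clicks = sum(r.get('clicks', 0) for r in rows)
total_impressions = sum(r.get('impressions', 0) for r in rows)
# Weight each position by its impressions, as in the diff above.
weighted_position = sum(
    r['position'] * r['impressions'] for r in rows if r.get('position', 0) > 0
)

avg_position = weighted_position / total_impressions if total_impressions else 0
avg_ctr = (total_clicks / total_impressions * 100) if total_impressions else 0

print(round(avg_position, 2))  # 5.45 -- dominated by the high-impression row
print(round(avg_ctr, 2))       # 1.09
```

One behavior worth flagging in review: rows without a position still contribute impressions to the denominator but nothing to the numerator, so the weighted average skews numerically low when position data is sparse. That matches the handler code above, where only the numerator is guarded by `position > 0`.
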
diff --git a/backend/services/gsc_service.py b/backend/services/gsc_service.py
index 95dfabde..c9c85966 100644
--- a/backend/services/gsc_service.py
+++ b/backend/services/gsc_service.py
@@ -396,6 +396,68 @@ def get_site_list(self, user_id: str) -> List[Dict[str, Any]]:
     def get_search_analytics(self, user_id: str, site_url: str,
                              start_date: str = None, end_date: str = None) -> Dict[str, Any]:
         """Get search analytics data from GSC."""
+        def _base_payload(status: str, error: Optional[str] = None) -> Dict[str, Any]:
+            payload = {
+                'overall_metrics': {'rows': [], 'rowCount': 0},
+                'query_data': {'rows': [], 'rowCount': 0},
+                'page_data': {'rows': [], 'rowCount': 0},
+                'query_page_data': {'rows': [], 'rowCount': 0},
+                'verification_data': {'rows': [], 'rowCount': 0},
+                'date_range': {
+                    'start': start_date,
+                    'end': end_date
+                },
+                'site_url': site_url,
+                # Backward-compatible aliases for existing consumers
+                'startDate': start_date,
+                'endDate': end_date,
+                'siteUrl': site_url,
+                'status': status
+            }
+            if error:
+                payload['error'] = error
+            return payload
+
+        def _normalize_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
+            """Normalize cached/legacy payloads into the unified contract."""
+            if not isinstance(payload, dict):
+                return _base_payload(status='error', error='Invalid cached analytics payload')
+
+            date_range = payload.get('date_range') if isinstance(payload.get('date_range'), dict) else {
+                'start': payload.get('startDate', start_date),
+                'end': payload.get('endDate', end_date)
+            }
+            normalized = {
+                'overall_metrics': payload.get('overall_metrics') if isinstance(payload.get('overall_metrics'), dict) else {'rows': payload.get('rows', []), 'rowCount': payload.get('rowCount', 0)},
+                'query_data': payload.get('query_data') if isinstance(payload.get('query_data'), dict) else {'rows': [], 'rowCount': 0},
+                'page_data': payload.get('page_data') if isinstance(payload.get('page_data'), dict) else {'rows': [], 'rowCount': 0},
+                'query_page_data': payload.get('query_page_data') if isinstance(payload.get('query_page_data'), dict) else {'rows': [], 'rowCount': 0},
+                'verification_data': payload.get('verification_data') if isinstance(payload.get('verification_data'), dict) else {'rows': [], 'rowCount': 0},
+                'date_range': {
+                    'start': date_range.get('start', start_date),
+                    'end': date_range.get('end', end_date)
+                },
+                'site_url': payload.get('site_url') or payload.get('siteUrl') or site_url,
+                'status': payload.get('status') or ('error' if payload.get('error') else 'success')
+            }
+
+            # Keep aliases for compatibility
+            normalized['startDate'] = normalized['date_range']['start']
+            normalized['endDate'] = normalized['date_range']['end']
+            normalized['siteUrl'] = normalized['site_url']
+
+            if payload.get('error'):
+                normalized['error'] = payload.get('error')
+            if payload.get('warning'):
+                normalized['warning'] = payload.get('warning')
+
+            for section in ('overall_metrics', 'query_data', 'page_data', 'query_page_data', 'verification_data'):
+                rows = normalized[section].get('rows', [])
+                normalized[section]['rows'] = rows if isinstance(rows, list) else []
+                normalized[section]['rowCount'] = normalized[section].get('rowCount', len(normalized[section]['rows']))
+
+            return normalized
+
         try:
             # Set default date range (last 30 days)
             if not end_date:
@@ -407,21 +469,18 @@ def get_search_analytics(self, user_id: str, site_url: str,
             cache_key = f"{user_id}_{site_url}_{start_date}_{end_date}"
             cached_data = self._get_cached_data(user_id, site_url, 'analytics', cache_key)
             if cached_data and isinstance(cached_data, dict):
-                has_pages = 'page_data' in cached_data and isinstance(cached_data.get('page_data'), dict)
-                has_queries = 'query_data' in cached_data and isinstance(cached_data.get('query_data'), dict)
-                if has_pages and has_queries:
-                    logger.info(f"Returning cached analytics data for user: {user_id} (includes page_data)")
-                    return cached_data
+                logger.info(f"Returning cached analytics data for user: {user_id}")
+                return _normalize_payload(cached_data)
 
             try:
                 service = self.get_authenticated_service(user_id)
             except ValueError:
                 logger.warning(f"User {user_id} not connected to GSC. Returning empty analytics.")
-                return {'error': 'User not connected to GSC', 'rows': [], 'rowCount': 0}
+                return _base_payload(status='not_connected', error='User not connected to GSC')
 
             if not service:
                 logger.error(f"Failed to get authenticated GSC service for user: {user_id}")
-                return {'error': 'Authentication failed', 'rows': [], 'rowCount': 0}
+                return _base_payload(status='auth_failed', error='Authentication failed')
 
             # Step 1: Verify data presence first (as per GSC API documentation)
             verification_request = {
@@ -444,13 +503,13 @@ def get_search_analytics(self, user_id: str, site_url: str,
                 verification_rows = verification_response.get('rows', [])
                 if not verification_rows:
                     logger.warning(f"No GSC data available for user {user_id} in date range {start_date} to {end_date}")
-                    return {'error': 'No data available for this date range', 'rows': [], 'rowCount': 0}
+                    return _base_payload(status='no_data', error='No data available for this date range')
 
                 logger.info(f"GSC Data verification successful - found {len(verification_rows)} days with data")
 
             except Exception as verification_error:
                 logger.error(f"GSC Data verification failed for user {user_id}: {verification_error}")
-                return {'error': f'Data verification failed: {str(verification_error)}', 'rows': [], 'rowCount': 0}
+                return _base_payload(status='verification_failed', error=f'Data verification failed: {str(verification_error)}')
 
             # Step 2: Get daily metrics for charting (ensure we have rows)
             request = {
@@ -471,7 +530,7 @@ def get_search_analytics(self, user_id: str, site_url: str,
                 logger.info(f"GSC API response for user {user_id}: {response}")
             except Exception as api_error:
                 logger.error(f"GSC API call failed for user {user_id}: {api_error}")
-                return {'error': str(api_error), 'rows': [], 'rowCount': 0}
+                return _base_payload(status='api_error', error=str(api_error))
 
             # Step 3: Get query-level data for insights (as per documentation)
             query_request = {
@@ -561,13 +620,19 @@ def get_search_analytics(self, user_id: str, site_url: str,
                 },
                 'startDate': start_date,
                 'endDate': end_date,
-                'siteUrl': site_url
+                'siteUrl': site_url,
+                'date_range': {
+                    'start': start_date,
+                    'end': end_date
+                },
+                'site_url': site_url,
+                'status': 'success'
             }
 
             self._cache_data(user_id, site_url, 'analytics', analytics_data, cache_key)
             logger.info(f"Retrieved comprehensive analytics data for user: {user_id}, site: {site_url}")
 
-            return analytics_data
+            return _normalize_payload(analytics_data)
 
         except Exception as query_error:
             logger.error(f"GSC Query-level request failed for user {user_id}: {query_error}")
@@ -587,15 +652,21 @@ def get_search_analytics(self, user_id: str, site_url: str,
                 'startDate': start_date,
                 'endDate': end_date,
                 'siteUrl': site_url,
+                'date_range': {
+                    'start': start_date,
+                    'end': end_date
+                },
+                'site_url': site_url,
+                'status': 'partial_success',
                 'warning': f'Query-level data unavailable: {str(query_error)}'
             }
 
             self._cache_data(user_id, site_url, 'analytics', analytics_data, cache_key)
-            return analytics_data
+            return _normalize_payload(analytics_data)
 
         except Exception as e:
             logger.error(f"Error getting search analytics for user {user_id}: {e}")
-            raise
+            return _base_payload(status='error', error=str(e))
 
     def get_sitemaps(self, user_id: str, site_url: str) -> List[Dict[str, Any]]:
         """Get sitemaps from GSC."""
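
Every exit path in `get_search_analytics` now funnels through `_base_payload` or `_normalize_payload`, so callers always see the same keys regardless of whether data came from cache, the API, or an error branch. A hedged sketch of how a legacy flat cache entry folds into the sectioned contract — `normalize_legacy` and the sample payload are an invented, simplified condensation of `_normalize_payload`, not part of the diff:

```python
# A pre-migration cache entry: flat rows/rowCount plus camelCase metadata.
legacy_cached = {
    'rows': [{'clicks': 5, 'impressions': 50, 'position': 7.1}],
    'rowCount': 1,
    'startDate': '2024-01-01',
    'endDate': '2024-01-31',
    'siteUrl': 'https://example.com/',
}

def normalize_legacy(payload, start_date=None, end_date=None, site_url=None):
    """Fold a legacy flat payload into the sectioned contract."""
    return {
        # Flat rows become the overall_metrics section.
        'overall_metrics': {'rows': payload.get('rows', []),
                            'rowCount': payload.get('rowCount', 0)},
        'query_data': payload.get('query_data', {'rows': [], 'rowCount': 0}),
        'date_range': {'start': payload.get('startDate', start_date),
                       'end': payload.get('endDate', end_date)},
        'site_url': payload.get('siteUrl', site_url),
        # No explicit status and no error key -> treat as a successful fetch.
        'status': payload.get('status') or ('error' if payload.get('error') else 'success'),
    }

normalized = normalize_legacy(legacy_cached)
assert normalized['status'] == 'success'
assert normalized['overall_metrics']['rowCount'] == 1
assert normalized['date_range'] == {'start': '2024-01-01', 'end': '2024-01-31'}
```

Routing even the final `except` through `_base_payload(status='error', ...)` instead of `raise` is the key contract change: consumers no longer need try/except around the call, only a status check.
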
diff --git a/backend/services/scheduler/executors/gsc_insights_executor.py b/backend/services/scheduler/executors/gsc_insights_executor.py
index 3ae1e875..9f89c3f0 100644
--- a/backend/services/scheduler/executors/gsc_insights_executor.py
+++ b/backend/services/scheduler/executors/gsc_insights_executor.py
@@ -299,20 +299,20 @@ async def _fetch_fresh_data(self, user_id: str, site_url: Optional[str]) -> Task
                 end_date=end_date
             )
 
-            if 'error' in search_analytics:
+            if search_analytics.get('status') not in ('success', 'partial_success'):
                 return TaskExecutionResult(
                     success=False,
-                    error_message=search_analytics.get('error', 'Unknown error'),
+                    error_message=search_analytics.get('error', f"GSC analytics unavailable: {search_analytics.get('status', 'unknown')}"),
                     result_data=search_analytics
                 )
 
             # Format insights data
             insights_data = {
-                'site_url': site_url,
-                'date_range': {
+                'site_url': search_analytics.get('site_url', site_url),
+                'date_range': search_analytics.get('date_range', {
                     'start': start_date,
                     'end': end_date
-                },
+                }),
                 'overall_metrics': search_analytics.get('overall_metrics', {}),
                 'query_data': search_analytics.get('query_data', {}),
                 'fetched_at': datetime.utcnow().isoformat()
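
With the executor branching on `status` rather than probing for an `error` key, downstream consumers can share one decision table. A hypothetical consumer-side sketch — `classify`, its policy, and the retry/terminal groupings are illustrative and not part of the diff; the status strings are the ones `_base_payload` emits above:

```python
# Transient failures that a scheduler might reasonably retry.
RETRYABLE = {'api_error', 'verification_failed', 'error'}
# Failures that need user action; retrying will not help.
TERMINAL = {'not_connected', 'auth_failed'}

def classify(search_analytics: dict) -> str:
    """Map a GSC analytics payload onto a coarse handling decision."""
    status = search_analytics.get('status', 'success')
    if status in ('success', 'partial_success'):
        return 'process'
    if status == 'no_data':
        return 'skip'         # valid empty window, nothing to retry
    if status in TERMINAL:
        return 'notify_user'  # needs (re-)authentication
    return 'retry' if status in RETRYABLE else 'skip'

assert classify({'status': 'partial_success'}) == 'process'
assert classify({'status': 'no_data'}) == 'skip'
assert classify({'status': 'auth_failed', 'error': 'Authentication failed'}) == 'notify_user'
assert classify({'status': 'api_error', 'error': 'quota exceeded'}) == 'retry'
```

Note that `partial_success` (query-level data missing but daily metrics present) is still processed; the `warning` key carried through `_normalize_payload` preserves the reason for observability.
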