diff --git a/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py b/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py
index 15efc4e7..2c48f6f1 100644
--- a/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py
+++ b/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py
@@ -1,5 +1,6 @@
 import asyncio
 import time
+from collections import Counter
 from datetime import datetime, timedelta
 from typing import Any, Dict, List, Optional, Set
 from urllib.parse import urljoin, urlparse
@@ -32,6 +33,7 @@ def __init__(self):
 
         self.logger = logger.bind(component="OnboardingFullWebsiteAnalysisExecutor")
         self.max_urls_default = 500
+        self.persist_batch_size_default = 50
         self.http_timeout_seconds = 25
         self.http_concurrency = 10
 
@@ -71,13 +73,23 @@ async def execute_task(self, task: Any, db: Session) -> TaskExecutionResult:
 
         payload = task.payload or {}
         max_urls = int(payload.get('max_urls') or self.max_urls_default)
+        persist_batch_size = int(payload.get('persist_batch_size') or self.persist_batch_size_default)
 
         try:
             urls = await self._discover_urls(website_url, max_urls=max_urls)
             if not urls:
                 raise ValueError("No URLs discovered for full-site analysis")
 
-            results = await self._audit_urls(user_id, website_url, urls, db)
+            results = await self._audit_urls(
+                user_id=user_id,
+                website_url=website_url,
+                urls=urls,
+                db=db,
+                task=task,
+                task_log=task_log,
+                start_time=start_time,
+                persist_batch_size=max(1, persist_batch_size)
+            )
 
             task.last_executed = datetime.utcnow()
             task.last_success = datetime.utcnow()
@@ -240,21 +252,87 @@ async def _fetch_text(self, url: str) -> Optional[str]:
         except Exception:
             return None
 
-    async def _audit_urls(self, user_id: str, website_url: str, urls: List[str], db: Session) -> Dict[str, Any]:
+    async def _audit_urls(
+        self,
+        user_id: str,
+        website_url: str,
+        urls: List[str],
+        db: Session,
+        task: OnboardingFullWebsiteAnalysisTask,
+        task_log: OnboardingFullWebsiteAnalysisExecutionLog,
+        start_time: float,
+        persist_batch_size: int,
+    ) -> Dict[str, Any]:
         timeout = aiohttp.ClientTimeout(total=self.http_timeout_seconds)
         connector = aiohttp.TCPConnector(limit=self.http_concurrency)
         semaphore = asyncio.Semaphore(self.http_concurrency)
+        pages_discovered = len(urls)
+
+        existing_urls = {
+            row[0]
+            for row in db.query(SEOPageAudit.page_url).filter(
+                SEOPageAudit.user_id == user_id,
+                SEOPageAudit.page_url.in_(urls)
+            ).all()
+        }
+
+        processed = 0
+        success = 0
+        failed = 0
+        audit_batch: List[Dict[str, Any]] = []
+        all_results: List[Dict[str, Any]] = []
+
+        self._update_progress(task, task_log, pages_discovered, processed, success, failed, start_time, db)
 
         async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
 
             async def audit_one(url: str) -> Dict[str, Any]:
                 async with semaphore:
-                    return await self._audit_single_url(user_id, website_url, url, session, db)
-
-            audited = await asyncio.gather(*[audit_one(u) for u in urls], return_exceptions=True)
-
-        successes = [r for r in audited if isinstance(r, dict) and r.get('success')]
-        failures = [r for r in audited if not (isinstance(r, dict) and r.get('success'))]
+                    return await self._audit_single_url(user_id, website_url, url, session)
+
+            tasks = [asyncio.create_task(audit_one(u)) for u in urls]
+            for completed_task in asyncio.as_completed(tasks):
+                try:
+                    result = await completed_task
+                except Exception as e:
+                    result = {
+                        'success': False,
+                        'page_url': 'unknown',
+                        'error': str(e),
+                        'failure_reason': 'audit_runtime_error',
+                        'audit_record': None,
+                    }
+
+                processed += 1
+                if result.get('success'):
+                    success += 1
+                else:
+                    failed += 1
+
+                all_results.append(result)
+
+                if result.get('audit_record'):
+                    audit_batch.append(result['audit_record'])
+
+                should_flush_batch = len(audit_batch) >= persist_batch_size or processed == pages_discovered
+                if should_flush_batch and audit_batch:
+                    self._bulk_upsert_page_audits(db=db, audit_records=audit_batch)
+                    audit_batch = []
+
+                if processed % persist_batch_size == 0 or processed == pages_discovered:
+                    self._update_progress(
+                        task,
+                        task_log,
+                        pages_discovered,
+                        processed,
+                        success,
+                        failed,
+                        start_time,
+                        db,
+                    )
+
+        successes = [r for r in all_results if isinstance(r, dict) and r.get('success')]
+        failures = [r for r in all_results if not (isinstance(r, dict) and r.get('success'))]
 
         avg_score = round(sum(r['overall_score'] for r in successes) / len(successes)) if successes else 0
         fix_scheduled = len([r for r in successes if r.get('status') == 'fix_scheduled'])
@@ -264,14 +342,57 @@ async def audit_one(url: str) -> Dict[str, Any]:
             key=lambda x: x['overall_score']
         )[:10]
 
+        fail_reason_counts = Counter([
+            f.get('failure_reason') or 'unknown' for f in failures if isinstance(f, dict)
+        ])
+        top_fail_reasons = [
+            {'reason': reason, 'count': count}
+            for reason, count in fail_reason_counts.most_common(5)
+        ]
+
+        rerun_expected_count = len(existing_urls.union(set(urls)))
+        persisted_count = db.query(SEOPageAudit).filter(
+            SEOPageAudit.user_id == user_id,
+            SEOPageAudit.page_url.in_(urls)
+        ).count()
+
+        duration_ms = int((time.time() - start_time) * 1000)
+        success_rate = round((len(successes) / pages_discovered) * 100, 2) if pages_discovered else 0.0
+
         return {
             'website_url': website_url,
-            'pages_discovered': len(urls),
+            'pages_discovered': pages_discovered,
             'pages_audited': len(successes),
             'pages_failed': len(failures),
             'avg_score': avg_score,
             'fix_scheduled_pages': fix_scheduled,
             'worst_pages': worst_pages,
+            'progress': {
+                'pages_discovered': pages_discovered,
+                'processed': processed,
+                'success': len(successes),
+                'failed': len(failures),
+                'elapsed_ms': duration_ms,
+            },
+            'failure_details': [
+                {
+                    'page_url': f.get('page_url'),
+                    'error': f.get('error'),
+                    'failure_reason': f.get('failure_reason')
+                }
+                for f in failures if isinstance(f, dict)
+            ],
+            'execution_summary': {
+                'success_rate': success_rate,
+                'top_fail_reasons': top_fail_reasons,
+                'duration_ms': duration_ms,
+            },
+            'idempotency_check': {
+                'existing_rows_for_run_urls': len(existing_urls),
+                'expected_rows_after_upsert': rerun_expected_count,
+                'actual_rows_after_upsert': persisted_count,
+                'duplicate_growth': max(0, persisted_count - rerun_expected_count),
+            }
         }
 
     async def _audit_single_url(
@@ -280,7 +401,6 @@ async def _audit_single_url(
         website_url: str,
         page_url: str,
         session: aiohttp.ClientSession,
-        db: Session
     ) -> Dict[str, Any]:
         fetch_start = time.time()
         try:
@@ -290,30 +410,38 @@ async def _audit_single_url(
                 text = await resp.text(errors="ignore")
                 headers = dict(resp.headers)
         except Exception as e:
-            self._upsert_page_audit(
-                db=db,
-                user_id=user_id,
-                website_url=website_url,
-                page_url=page_url,
-                overall_score=0,
-                status='error',
-                audit_data={'error': str(e)}
-            )
-            return {'success': False, 'page_url': page_url, 'error': str(e)}
+            return {
+                'success': False,
+                'page_url': page_url,
+                'error': str(e),
+                'failure_reason': 'request_exception',
+                'audit_record': self._build_audit_record(
+                    user_id=user_id,
+                    website_url=website_url,
+                    page_url=page_url,
+                    overall_score=0,
+                    status='error',
+                    audit_data={'error': str(e)},
+                )
+            }
 
         load_time = time.time() - fetch_start
 
         if status >= 400 or "text/html" not in content_type.lower():
-            self._upsert_page_audit(
-                db=db,
-                user_id=user_id,
-                website_url=website_url,
-                page_url=page_url,
-                overall_score=0,
-                status='error',
-                audit_data={'http_status': status, 'content_type': content_type}
-            )
-            return {'success': False, 'page_url': page_url, 'error': f'HTTP {status} / {content_type}'}
+            return {
+                'success': False,
+                'page_url': page_url,
+                'error': f'HTTP {status} / {content_type}',
+                'failure_reason': 'non_html_or_http_error',
+                'audit_record': self._build_audit_record(
+                    user_id=user_id,
+                    website_url=website_url,
+                    page_url=page_url,
+                    overall_score=0,
+                    status='error',
+                    audit_data={'http_status': status, 'content_type': content_type},
+                )
+            }
 
         soup = BeautifulSoup(text, 'html.parser')
 
@@ -363,25 +491,23 @@ async def _audit_single_url(
         warnings = self._collect_findings(audit_data, key='warnings')
         recommendations = self._collect_findings(audit_data, key='recommendations')
 
-        self._upsert_page_audit(
-            db=db,
-            user_id=user_id,
-            website_url=website_url,
-            page_url=page_url,
-            overall_score=overall_score,
-            status=page_status,
-            category_scores=category_scores,
-            issues=issues,
-            warnings=warnings,
-            recommendations=recommendations,
-            audit_data=audit_data
-        )
-
         return {
             'success': True,
             'page_url': page_url,
             'overall_score': overall_score,
-            'status': page_status
+            'status': page_status,
+            'audit_record': self._build_audit_record(
+                user_id=user_id,
+                website_url=website_url,
+                page_url=page_url,
+                overall_score=overall_score,
+                status=page_status,
+                category_scores=category_scores,
+                issues=issues,
+                warnings=warnings,
+                recommendations=recommendations,
+                audit_data=audit_data,
+            )
         }
 
     def _weighted_score(self, category_scores: Dict[str, int]) -> int:
@@ -517,9 +643,8 @@ def _security_from_headers(self, headers: Dict[str, str]) -> Dict[str, Any]:
             'recommendations': recommendations
         }
 
-    def _upsert_page_audit(
+    def _build_audit_record(
         self,
-        db: Session,
         user_id: str,
         website_url: str,
         page_url: str,
@@ -530,38 +655,80 @@ def _upsert_page_audit(
         warnings: Optional[List[Dict[str, Any]]] = None,
         recommendations: Optional[List[Dict[str, Any]]] = None,
         audit_data: Optional[Dict[str, Any]] = None,
-    ) -> None:
-        existing = db.query(SEOPageAudit).filter(
+    ) -> Dict[str, Any]:
+        return {
+            'user_id': user_id,
+            'website_url': website_url,
+            'page_url': page_url,
+            'overall_score': overall_score,
+            'status': status,
+            'category_scores': category_scores,
+            'issues': issues,
+            'warnings': warnings,
+            'recommendations': recommendations,
+            'audit_data': audit_data,
+            'last_analyzed_at': datetime.utcnow(),
+        }
+
+    def _bulk_upsert_page_audits(self, db: Session, audit_records: List[Dict[str, Any]]) -> None:
+        if not audit_records:
+            return
+
+        page_urls = [record['page_url'] for record in audit_records]
+        user_id = audit_records[0]['user_id']
+        existing_rows = db.query(SEOPageAudit).filter(
             SEOPageAudit.user_id == user_id,
-            SEOPageAudit.page_url == page_url
-        ).first()
-
-        if existing:
-            existing.website_url = website_url
-            existing.overall_score = overall_score
-            existing.status = status
-            existing.category_scores = category_scores
-            existing.issues = issues
-            existing.warnings = warnings
-            existing.recommendations = recommendations
-            existing.audit_data = audit_data
-            existing.last_analyzed_at = datetime.utcnow()
-            db.add(existing)
-        else:
-            db.add(SEOPageAudit(
-                user_id=user_id,
-                website_url=website_url,
-                page_url=page_url,
-                overall_score=overall_score,
-                status=status,
-                category_scores=category_scores,
-                issues=issues,
-                warnings=warnings,
-                recommendations=recommendations,
-                audit_data=audit_data,
-                last_analyzed_at=datetime.utcnow()
-            ))
+            SEOPageAudit.page_url.in_(page_urls)
+        ).all()
+        existing_by_page_url = {row.page_url: row for row in existing_rows}
+
+        for record in audit_records:
+            existing = existing_by_page_url.get(record['page_url'])
+            if existing:
+                existing.website_url = record['website_url']
+                existing.overall_score = record['overall_score']
+                existing.status = record['status']
+                existing.category_scores = record['category_scores']
+                existing.issues = record['issues']
+                existing.warnings = record['warnings']
+                existing.recommendations = record['recommendations']
+                existing.audit_data = record['audit_data']
+                existing.last_analyzed_at = record['last_analyzed_at']
+                db.add(existing)
+                continue
+
+            db.add(SEOPageAudit(**record))
+
+        db.commit()
+    def _update_progress(
+        self,
+        task: OnboardingFullWebsiteAnalysisTask,
+        task_log: OnboardingFullWebsiteAnalysisExecutionLog,
+        pages_discovered: int,
+        processed: int,
+        success: int,
+        failed: int,
+        start_time: float,
+        db: Session,
+    ) -> None:
+        progress = {
+            'pages_discovered': pages_discovered,
+            'processed': processed,
+            'success': success,
+            'failed': failed,
+            'elapsed_ms': int((time.time() - start_time) * 1000),
+        }
+        payload = dict(task.payload or {})
+        payload['progress'] = progress
+        task.payload = payload
+
+        result_data = dict(task_log.result_data or {})
+        result_data['progress'] = progress
+        task_log.result_data = result_data
+
+        db.add(task)
+        db.add(task_log)
 
         db.commit()
 
     def _normalize_url(self, url: str) -> str:
@@ -581,4 +748,3 @@ def _same_site(self, root: str, url: str) -> bool:
             return a.netloc == b.netloc
         except Exception:
             return False
-
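Usage sketch (not part of the patch): a minimal, self-contained illustration of the batching pattern the new _audit_urls loop relies on, i.e. consuming results with asyncio.as_completed and flushing every persist_batch_size completions plus a final flush for the remainder. The fake_audit coroutine, the example URLs, and the print-based flush are assumptions standing in for _audit_single_url and _bulk_upsert_page_audits.

    import asyncio
    from typing import Any, Dict, List

    async def fake_audit(url: str) -> Dict[str, Any]:
        # Stand-in for _audit_single_url: every page "audits" successfully.
        await asyncio.sleep(0)
        return {'success': True, 'page_url': url, 'audit_record': {'page_url': url}}

    async def run(urls: List[str], persist_batch_size: int = 3) -> None:
        batch: List[Dict[str, Any]] = []
        processed = 0
        tasks = [asyncio.create_task(fake_audit(u)) for u in urls]
        for completed in asyncio.as_completed(tasks):
            result = await completed
            processed += 1
            if result.get('audit_record'):
                batch.append(result['audit_record'])
            # Same flush rule as the executor: batch is full, or the last page finished.
            if batch and (len(batch) >= persist_batch_size or processed == len(urls)):
                print(f"flush {len(batch)} records after {processed}/{len(urls)} pages")
                batch = []

    asyncio.run(run([f"https://example.com/page-{i}" for i in range(7)]))

With seven pages and a batch size of three this flushes at 3/7, 6/7, and 7/7, which mirrors why the executor also forces a flush when processed == pages_discovered: a partial final batch is never left unpersisted.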