Commit 1a949de

refactor: extract WorkflowJobCollector module with feature flag support
Move WorkflowJobCollector from web_app_handler.py into its own module with improved documentation, error handling, and feature flag support for hierarchical tracing. Add comprehensive test coverage.
1 parent 0d4b2ee · commit 1a949de

3 files changed: +942 −219 lines changed


src/web_app_handler.py

Lines changed: 60 additions & 219 deletions
@@ -4,243 +4,81 @@
 import hmac
 import logging
 import os
-import time
-import threading
-from typing import NamedTuple, Dict, List, Any
-from collections import defaultdict
+from typing import NamedTuple

 from .github_app import GithubAppToken
 from .github_sdk import GithubClient
-from .workflow_tracer import WorkflowTracer
 from .sentry_config import fetch_dsn_for_github_org
+from .workflow_job_collector import WorkflowJobCollector

 LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", logging.INFO)
 logger = logging.getLogger(__name__)
 logger.setLevel(LOGGING_LEVEL)


-class WorkflowJobCollector:
-    """Collects jobs from a workflow run and sends workflow-level transactions"""
-
-    def __init__(self, dsn: str, token: str, dry_run: bool = False):
-        self.dsn = dsn
-        self.token = token
-        self.dry_run = dry_run
-        self.workflow_jobs = defaultdict(list)  # run_id -> list of jobs
-        self.workflow_tracer = WorkflowTracer(token, dsn, dry_run)
-        self.processed_jobs = set()  # Track processed job IDs to avoid duplicates
-        self.workflow_timers = {}  # run_id -> timer for delayed processing
-        self.processed_workflows = set()  # Track processed workflow runs to avoid duplicates
-        self.job_arrival_times = defaultdict(list)  # run_id -> list of arrival timestamps
-        self._lock = threading.Lock()  # Thread lock for preventing race conditions
-
-    def add_job(self, job_data: Dict[str, Any]):
-        """Add a job to the collector"""
-        job = job_data["workflow_job"]
-        run_id = job["run_id"]
-        job_id = job["id"]
-
-        with self._lock:
-            # Skip if we've already processed this job
-            if job_id in self.processed_jobs:
-                return
-
-            self.processed_jobs.add(job_id)
-            self.workflow_jobs[run_id].append(job)
-
-            # Track job arrival time for smart detection
-            self.job_arrival_times[run_id].append(time.time())
-
-            logger.info(f"Added job {job['name']} (ID: {job_id}) to workflow run {run_id}")
-
-            # Smart workflow completion detection
-            jobs_count = len(self.workflow_jobs[run_id])
-            if run_id not in self.processed_workflows:
-                if self._should_process_workflow(run_id, jobs_count):
-                    logger.info(f"Workflow run {run_id} has {jobs_count} jobs, setting timer to process in 2 seconds")
-                    # Set a short timer to allow all jobs to arrive
-                    timer = threading.Timer(2.0, self._process_workflow_immediately, args=[run_id])
-                    self.workflow_timers[run_id] = timer
-                    timer.start()
-                else:
-                    logger.info(f"Workflow run {run_id} has {jobs_count} jobs, waiting for more")
-
-    def _process_workflow_immediately(self, run_id: int):
-        """Process workflow immediately when we have enough jobs"""
-        try:
-            with self._lock:
-                # Skip if already processed
-                if run_id in self.processed_workflows:
-                    logger.info(f"Workflow run {run_id} already processed, skipping")
-                    return
-
-                jobs = self.workflow_jobs[run_id]
-
-                if not jobs:
-                    logger.warning(f"No jobs found for workflow run {run_id}")
-                    return
-
-                logger.info(f"Processing workflow run {run_id} immediately with {len(jobs)} jobs")
-
-                # Check if all jobs are complete
-                all_completed = all(job.get("conclusion") is not None for job in jobs)
-                if all_completed:
-                    logger.info(f"All jobs complete for workflow run {run_id}, sending trace")
-                    self._send_workflow_trace(run_id)
-                else:
-                    logger.info(f"Not all jobs complete for workflow run {run_id}, skipping")
-        except Exception as e:
-            logger.error(f"Error processing workflow run {run_id} immediately: {e}", exc_info=True)
-            # Ensure cleanup happens even if there's an exception
-            self._cleanup_workflow_run(run_id)
-
-    def _process_workflow_delayed(self, run_id: int):
-        """Process workflow after delay to allow all jobs to arrive"""
-        with self._lock:
-            # Skip if already processed
-            if run_id in self.processed_workflows:
-                logger.info(f"Workflow run {run_id} already processed, skipping")
-                return
-
-            jobs = self.workflow_jobs[run_id]
-
-            if not jobs:
-                logger.warning(f"No jobs found for workflow run {run_id}")
-                return
-
-            logger.info(f"Processing delayed workflow run {run_id} with {len(jobs)} jobs")
-
-            # Check if all jobs are complete
-            all_completed = all(job.get("conclusion") is not None for job in jobs)
-            if all_completed:
-                logger.info(f"All jobs complete for workflow run {run_id}, sending trace")
-                self._send_workflow_trace(run_id)
-            else:
-                logger.info(f"Not all jobs complete for workflow run {run_id}, skipping")
-                # Clean up timer if not all jobs are complete
-                if run_id in self.workflow_timers:
-                    self.workflow_timers[run_id].cancel()
-                    del self.workflow_timers[run_id]
-
-    def _should_process_workflow(self, run_id: int, jobs_count: int) -> bool:
-        """Smart detection of when to process workflow based on job patterns and timing"""
-
-        jobs = self.workflow_jobs[run_id]
-        arrival_times = self.job_arrival_times[run_id]
-
-        # All jobs must be completed
-        all_completed = all(job.get("conclusion") is not None for job in jobs)
-        if not all_completed:
-            return False
-
-        # Smart thresholds based on job count patterns
-        if jobs_count >= 10:
-            # Large workflows (10+ jobs) - process immediately when all complete
-            return True
-        elif jobs_count >= 5:
-            # Medium workflows (5-9 jobs) - process when all complete
-            return True
-        elif jobs_count >= 3:
-            # Small workflows (3-4 jobs) - process when all complete
-            return True
-        elif jobs_count >= 1:
-            # Single or few jobs - check if enough time has passed since last arrival
-            if len(arrival_times) >= 1:
-                time_since_last_job = time.time() - arrival_times[-1]
-                # If no new jobs for 3 seconds, process what we have
-                if time_since_last_job > 3.0:
-                    return True
-
-            # For single jobs, process immediately
-            if jobs_count == 1:
-                return True
-
-        return False
+class WebAppHandler:
+    """
+    Handles GitHub webhook events for workflow job completion.

-    def _is_workflow_complete(self, run_id: int, current_job: Dict[str, Any]) -> bool:
-        """Check if all jobs in the workflow are complete (legacy method)"""
-        jobs_count = len(self.workflow_jobs[run_id])
-        return self._should_process_workflow(run_id, jobs_count)
+    Supports both hierarchical workflow tracing (new) and individual job tracing (legacy).
+    The mode is controlled by the ENABLE_HIERARCHICAL_TRACING environment variable.
+    """

-    def _send_workflow_trace(self, run_id: int):
-        """Send workflow-level trace for all jobs in the run"""
-        # Check if already processed to prevent duplicates
-        if run_id in self.processed_workflows:
-            logger.warning(f"Workflow run {run_id} already processed, skipping to prevent duplicates")
-            return
-
-        jobs = self.workflow_jobs[run_id]
-
-        if not jobs:
-            logger.warning(f"No jobs found for workflow run {run_id}")
-            return
-
-        logger.info(f"Sending workflow trace for run {run_id} with {len(jobs)} jobs")
-
-        try:
-            # Use the first job as the base for workflow metadata
-            base_job = jobs[0]
-
-            # Send workflow trace
-            self.workflow_tracer.send_workflow_trace(base_job, jobs)
-
-            logger.info(f"Successfully sent workflow trace for run {run_id}")
-
-        except Exception as e:
-            logger.error(f"Failed to send workflow trace for run {run_id}: {e}", exc_info=True)
-            # DISABLED FALLBACK: Don't send individual traces to prevent duplicates
-            logger.warning(f"Workflow trace failed, but NOT falling back to individual traces to prevent duplicates")
-        finally:
-            # Mark workflow as processed and clean up IMMEDIATELY
-            self.processed_workflows.add(run_id)
-            if run_id in self.workflow_jobs:
-                del self.workflow_jobs[run_id]
-            if run_id in self.workflow_timers:
-                self.workflow_timers[run_id].cancel()
-                del self.workflow_timers[run_id]
-            if run_id in self.job_arrival_times:
-                del self.job_arrival_times[run_id]
-
-    def _cleanup_workflow_run(self, run_id: int):
-        """Clean up workflow run data to prevent resource leaks"""
-        try:
-            with self._lock:
-                # Mark as processed to prevent reprocessing
-                self.processed_workflows.add(run_id)
-
-                # Clean up workflow data
-                if run_id in self.workflow_jobs:
-                    del self.workflow_jobs[run_id]
-                if run_id in self.workflow_timers:
-                    self.workflow_timers[run_id].cancel()
-                    del self.workflow_timers[run_id]
-                if run_id in self.job_arrival_times:
-                    del self.job_arrival_times[run_id]
-
-                logger.info(f"Cleaned up workflow run {run_id} after exception")
-        except Exception as cleanup_error:
-            logger.error(f"Error during cleanup of workflow run {run_id}: {cleanup_error}", exc_info=True)
-
-    def _send_individual_traces(self, jobs: List[Dict[str, Any]]):
-        """DISABLED: Individual job traces are now handled by WorkflowTracer"""
-        logger.info(f"DISABLED: Individual traces for {len(jobs)} jobs - now handled by WorkflowTracer")
-        return
-
-
-class WebAppHandler:
     def __init__(self, dry_run=False):
+        """
+        Initialize the WebAppHandler.
+
+        Args:
+            dry_run: If True, simulates operations without sending traces
+        """
         self.config = init_config()
         self.dry_run = dry_run
         self.job_collectors = {}  # org -> WorkflowJobCollector

     def _get_job_collector(self, org: str, token: str, dsn: str) -> WorkflowJobCollector:
-        """Get or create a job collector for the organization"""
+        """
+        Get or create a job collector for the organization.
+
+        Args:
+            org: GitHub organization name
+            token: GitHub API token
+            dsn: Sentry DSN for trace submission
+
+        Returns:
+            WorkflowJobCollector instance for the organization
+        """
         if org not in self.job_collectors:
             self.job_collectors[org] = WorkflowJobCollector(dsn, token, self.dry_run)
         return self.job_collectors[org]

+    def _send_legacy_trace(self, data: dict, org: str, token: str, dsn: str) -> None:
+        """
+        Send individual job trace (legacy behavior).
+
+        Args:
+            data: GitHub webhook job payload
+            org: GitHub organization name
+            token: GitHub API token
+            dsn: Sentry DSN for trace submission
+        """
+        logger.info(f"Using legacy individual job tracing for org '{org}'")
+        github_client = GithubClient(token, dsn, self.dry_run)
+        github_client.send_trace(data)
+
     def handle_event(self, data, headers):
-        # We return 200 to make webhook not turn red since everything got processed well
+        """
+        Handle GitHub webhook events.
+
+        Supports both hierarchical workflow tracing (new) and individual job tracing (legacy).
+        The mode is determined by feature flags and organization settings.
+
+        Args:
+            data: GitHub webhook payload
+            headers: HTTP headers from the webhook request
+
+        Returns:
+            Tuple of (reason, http_code)
+        """
         http_code = 200
         reason = "OK"

@@ -249,7 +87,6 @@ def handle_event(self, data, headers):
         elif data["action"] != "completed":
             reason = "We cannot do anything with this workflow state."
         else:
-            # For now, this simplifies testing
             if self.dry_run:
                 return reason, http_code

@@ -264,14 +101,18 @@ def handle_event(self, data, headers):
                 http_code = 500
             else:
                 # For webhook testing, we'll use a mock token and avoid GitHub API calls
-                # The workflow tracer will extract data from the job payload instead
                 token = "webhook_testing_token"

-                # Get job collector for this org
+                # Get job collector and check if hierarchical tracing is enabled
                 collector = self._get_job_collector(org, token, dsn)

-                # Add job to collector (will send workflow trace when complete)
-                collector.add_job(data)
+                if collector.is_hierarchical_tracing_enabled(org):
+                    # Use new hierarchical workflow tracing
+                    logger.debug(f"Using hierarchical workflow tracing for org '{org}'")
+                    collector.add_job(data)
+                else:
+                    # Fall back to legacy individual job tracing
+                    self._send_legacy_trace(data, org, token, dsn)

         return reason, http_code
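The extracted module itself is collapsed in this view (it accounts for most of the +942 added lines), but the call site above pins down part of its contract: is_hierarchical_tracing_enabled(org) decides whether a job is routed to the collector or to the legacy per-job trace. Below is a minimal sketch of that check, assuming it simply reads the ENABLE_HIERARCHICAL_TRACING environment variable named in the new WebAppHandler docstring; the actual implementation in src/workflow_job_collector.py may differ.

import os


class WorkflowJobCollector:
    # Sketch only: the real class was moved to src/workflow_job_collector.py
    # in this commit, and that file is not expanded in this view.

    def is_hierarchical_tracing_enabled(self, org: str) -> bool:
        # Env-var name taken from the new WebAppHandler docstring. Accepting
        # `org` mirrors the call site in handle_event; a per-organization
        # override would be a natural extension but is not confirmed by this diff.
        flag = os.environ.get("ENABLE_HIERARCHICAL_TRACING", "false")
        return flag.strip().lower() in ("1", "true", "yes")

Putting the flag check on the collector rather than in WebAppHandler keeps the rollout decision next to the tracing logic it switches, so the handler only branches on the result.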
