-
Notifications
You must be signed in to change notification settings - Fork 9
Feat/hierarchical workflow tracing #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
serglom21
wants to merge
6
commits into
getsentry:main
Choose a base branch
from
serglom21:feat/hierarchical-workflow-tracing
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
a1404f9
feat: implement hierarchical workflow tracing with single transaction…
serglom21 6c14dc2
docs: add PR description and update local development guide
serglom21 2f936e6
refactor: remove markdown docs and revert unnecessary Sentry disabling
serglom21 b844639
docs: remove LOCAL_DEVELOPMENT.md and TESTING_SUMMARY.md
serglom21 48a9a69
feat: implement smart workflow completion detection
serglom21 0d4b2ee
fix: add exception handling to prevent resource leaks in timer callbacks
serglom21 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| """ | ||
| Enhanced web app handler that creates workflow-level transactions | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import base64 | ||
| import hmac | ||
| import logging | ||
| import os | ||
| import time | ||
| from typing import NamedTuple, Dict, List, Any | ||
| from collections import defaultdict | ||
|
|
||
| from github_app import GithubAppToken | ||
| from github_sdk import GithubClient | ||
| from workflow_tracer import WorkflowTracer | ||
| from sentry_config import fetch_dsn_for_github_org | ||
|
|
||
| LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
| logger.setLevel(LOGGING_LEVEL) | ||
|
|
||
|
|
||
| class WorkflowJobCollector: | ||
| """Collects jobs from a workflow run and sends workflow-level transactions""" | ||
|
|
||
| def __init__(self, dsn: str, token: str, dry_run: bool = False): | ||
| self.dsn = dsn | ||
| self.token = token | ||
| self.dry_run = dry_run | ||
| self.workflow_jobs = defaultdict(list) # run_id -> list of jobs | ||
| self.workflow_tracer = WorkflowTracer(token, dsn, dry_run) | ||
| self.processed_jobs = set() # Track processed job IDs to avoid duplicates | ||
|
|
||
| def add_job(self, job_data: Dict[str, Any]): | ||
| """Add a job to the collector""" | ||
| job = job_data["workflow_job"] | ||
| run_id = job["run_id"] | ||
| job_id = job["id"] | ||
|
|
||
| # Skip if we've already processed this job | ||
| if job_id in self.processed_jobs: | ||
| return | ||
|
|
||
| self.processed_jobs.add(job_id) | ||
| self.workflow_jobs[run_id].append(job) | ||
|
|
||
| logger.info(f"Added job {job['name']} (ID: {job_id}) to workflow run {run_id}") | ||
|
|
||
| # Check if this is the last job in the workflow | ||
| if self._is_workflow_complete(run_id, job): | ||
| self._send_workflow_trace(run_id) | ||
|
|
||
| def _is_workflow_complete(self, run_id: int, current_job: Dict[str, Any]) -> bool: | ||
| """Check if all jobs in the workflow are complete""" | ||
| jobs = self.workflow_jobs[run_id] | ||
|
|
||
| # If we have jobs, check if they're all completed | ||
| if jobs: | ||
| all_completed = all(job.get("conclusion") is not None for job in jobs) | ||
| if all_completed: | ||
| logger.info(f"Workflow run {run_id} appears complete with {len(jobs)} jobs") | ||
| return True | ||
|
|
||
| return False | ||
|
|
||
| def _send_workflow_trace(self, run_id: int): | ||
| """Send workflow-level trace for all jobs in the run""" | ||
| jobs = self.workflow_jobs[run_id] | ||
|
|
||
| if not jobs: | ||
| return | ||
|
|
||
| logger.info(f"Sending workflow trace for run {run_id} with {len(jobs)} jobs") | ||
|
|
||
| try: | ||
| # Use the first job as the base for workflow metadata | ||
| base_job = jobs[0] | ||
|
|
||
| # Send workflow trace | ||
| self.workflow_tracer.send_workflow_trace(base_job, jobs) | ||
|
|
||
| # Clean up processed jobs | ||
| del self.workflow_jobs[run_id] | ||
|
|
||
| logger.info(f"Successfully sent workflow trace for run {run_id}") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to send workflow trace for run {run_id}: {e}") | ||
| # Fall back to individual job traces | ||
| self._send_individual_traces(jobs) | ||
|
|
||
| def _send_individual_traces(self, jobs: List[Dict[str, Any]]): | ||
| """DISABLED: Individual job traces are now handled by WorkflowTracer""" | ||
| logger.info(f"DISABLED: Individual traces for {len(jobs)} jobs - now handled by WorkflowTracer") | ||
| return | ||
|
|
||
|
|
||
| class EnhancedWebAppHandler: | ||
| """Enhanced web app handler with workflow-level tracing""" | ||
|
|
||
| def __init__(self, dry_run=False): | ||
| self.config = init_config() | ||
| self.dry_run = dry_run | ||
| self.job_collectors = {} # org -> WorkflowJobCollector | ||
|
|
||
| def _get_job_collector(self, org: str, token: str, dsn: str) -> WorkflowJobCollector: | ||
| """Get or create a job collector for the organization""" | ||
| if org not in self.job_collectors: | ||
| self.job_collectors[org] = WorkflowJobCollector(dsn, token, self.dry_run) | ||
| return self.job_collectors[org] | ||
|
|
||
| def handle_event(self, data, headers): | ||
| """Handle GitHub webhook events""" | ||
| # We return 200 to make webhook not turn red since everything got processed well | ||
| http_code = 200 | ||
| reason = "OK" | ||
|
|
||
| if headers["X-GitHub-Event"] != "workflow_job": | ||
| reason = "Event not supported." | ||
| elif data["action"] != "completed": | ||
| reason = "We cannot do anything with this workflow state." | ||
| else: | ||
| # For now, this simplifies testing | ||
| if self.dry_run: | ||
| return reason, http_code | ||
|
|
||
| installation_id = data["installation"]["id"] | ||
| org = data["repository"]["owner"]["login"] | ||
|
|
||
| # We are executing in Github App mode | ||
| if self.config.gh_app: | ||
| with GithubAppToken(**self.config.gh_app._asdict()).get_token( | ||
| installation_id | ||
| ) as token: | ||
| # Once the Sentry org has a .sentry repo we can remove the DSN from the deployment | ||
| dsn = fetch_dsn_for_github_org(org, token) | ||
|
|
||
| # Get job collector for this org | ||
| collector = self._get_job_collector(org, token, dsn) | ||
|
|
||
| # Add job to collector (will send workflow trace when complete) | ||
| collector.add_job(data) | ||
|
|
||
| else: | ||
| # Once the Sentry org has a .sentry repo we can remove the DSN from the deployment | ||
| dsn = fetch_dsn_for_github_org(org, token) | ||
|
|
||
| # Get job collector for this org | ||
| collector = self._get_job_collector(org, self.config.gh.token, dsn) | ||
|
|
||
| # Add job to collector (will send workflow trace when complete) | ||
| collector.add_job(data) | ||
|
|
||
| return reason, http_code | ||
|
|
||
| def valid_signature(self, body, headers): | ||
| """Validate webhook signature""" | ||
| if not self.config.gh.webhook_secret: | ||
| return True | ||
| else: | ||
| signature = headers["X-Hub-Signature-256"].replace("sha256=", "") | ||
| body_signature = hmac.new( | ||
| self.config.gh.webhook_secret.encode(), | ||
| msg=body, | ||
| digestmod="sha256", | ||
| ).hexdigest() | ||
| return hmac.compare_digest(body_signature, signature) | ||
|
|
||
|
|
||
| # Import the config initialization from the original handler | ||
| from web_app_handler import init_config | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential bug: An unhandled exception in the
_process_workflow_immediatelytimer callback will cause the thread to fail silently, preventing cleanup and leading to a resource leak.Description: The
_process_workflow_immediatelymethod, executed by athreading.Timer, lacks exception handling. If an exception occurs within this method or in methods it calls like_send_workflow_trace(e.g., due to network issues or malformed data, as seen in Sentry issue 6767327085), the timer thread will terminate silently. This prevents the execution of critical cleanup logic in thefinallyblock, such as removing the workflow's data fromself.workflow_jobsandself.workflow_timers, and adding therun_idtoself.processed_workflows. This results in a permanent resource leak and leaves the workflow in an inconsistent state.Suggested fix: Wrap the contents of the
_process_workflow_immediatelymethod in atry...exceptblock. In theexceptblock, log the exception to ensure failures are not silent. This will allow the program to handle errors gracefully without causing resource leaks or state corruption.severity: 0.7, confidence: 0.9
Did we get this right? 👍 / 👎 to inform future reviews.