Skip to content

Make pull request assignment loading more robust #1918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ postgres-native-tls = "0.5.0"
native-tls = "0.2"
x509-cert = { version = "0.2.5", features = ["pem"] }
serde_path_to_error = "0.1.2"
octocrab = "0.30.1"
octocrab = { version = "0.30.1", features = ["stream"] }
comrak = { version = "0.8.2", default-features = false }
route-recognizer = "0.3.0"
cynic = "3.2.2"
Expand Down
26 changes: 0 additions & 26 deletions github-graphql/PullRequestsOpen.gql

This file was deleted.

41 changes: 0 additions & 41 deletions github-graphql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,44 +387,3 @@ pub mod project_items {
pub date: Option<Date>,
}
}

/// Query types for listing the open pull requests of a repository.
/// GraphQL query: see file github-graphql/PullRequestsOpen.gql
///
/// NOTE(review): the earlier summary said "waiting on review from T-compiler",
/// but no team or review filter is visible in the arguments below; only
/// `states: "OPEN"` is applied. Confirm against the `.gql` file.
pub mod pull_requests_open {
use crate::queries::{LabelConnection, PullRequestConnection, UserConnection};

use super::queries::DateTime;
use super::schema;

/// Variables supplied to the `PullRequestsOpen` query.
#[derive(cynic::QueryVariables, Clone, Debug)]
pub struct PullRequestsOpenVariables<'a> {
/// Passed as the `owner` argument of the `repository` field.
pub repo_owner: &'a str,
/// Passed as the `name` argument of the `repository` field.
pub repo_name: &'a str,
/// Passed as the `after` argument of `pull_requests`; `None` when no cursor is given.
pub after: Option<String>,
}

/// Root fragment of the query (GraphQL type `Query`).
#[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Query", variables = "PullRequestsOpenVariables")]
pub struct PullRequestsOpen {
/// The repository selected by `$repo_owner` / `$repo_name`; optional in the response.
#[arguments(owner: $repo_owner, name: $repo_name)]
pub repository: Option<Repository>,
}

/// Repository fragment selecting only its pull requests.
#[derive(cynic::QueryFragment, Debug)]
#[cynic(variables = "PullRequestsOpenVariables")]
pub struct Repository {
/// Up to 100 pull requests in state `OPEN`, starting after the `$after` cursor.
#[arguments(first: 100, after: $after, states: "OPEN")]
pub pull_requests: PullRequestConnection,
}

/// Pull request fragment: number, timestamps, assignees and labels.
///
/// NOTE(review): `Repository::pull_requests` above uses
/// `crate::queries::PullRequestConnection`, so this struct is not referenced by
/// the other types in this module. Confirm whether it is used elsewhere.
#[derive(cynic::QueryFragment, Debug)]
pub struct PullRequest {
pub number: i32,
pub updated_at: DateTime,
pub created_at: DateTime,
/// At most the first 10 assignees are requested.
#[arguments(first: 10)]
pub assignees: UserConnection,
/// At most the first 5 labels are requested, ordered by name, descending.
#[arguments(first: 5, orderBy: { direction: "DESC", field: "NAME" })]
pub labels: Option<LabelConnection>,
}
}
129 changes: 50 additions & 79 deletions src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, FixedOffset, Utc};
use futures::{future::BoxFuture, FutureExt};
use futures::{future::BoxFuture, FutureExt, TryStreamExt};
use hyper::header::HeaderValue;
use octocrab::params::pulls::Sort;
use octocrab::params::{Direction, State};
use octocrab::Octocrab;
use once_cell::sync::OnceCell;
use regex::Regex;
use reqwest::header::{AUTHORIZATION, USER_AGENT};
Expand All @@ -16,6 +19,7 @@ use std::{
use tracing as log;

pub type UserId = u64;
pub type PullRequestNumber = u64;

#[derive(Debug, PartialEq, Eq, Hash, serde::Deserialize, Clone)]
pub struct User {
Expand Down Expand Up @@ -3049,88 +3053,55 @@ async fn project_items_by_status(
Ok(all_items)
}

/// Retrieve all pull requests in status OPEN that are not drafts
pub async fn retrieve_open_pull_requests(
repo: &Repository,
client: &GithubClient,
) -> anyhow::Result<Vec<(User, i32)>> {
use cynic::QueryBuilder;
use github_graphql::pull_requests_open::{PullRequestsOpen, PullRequestsOpenVariables};

let repo_owner = repo.owner();
let repo_name = repo.name();

let mut prs = vec![];

let mut vars = PullRequestsOpenVariables {
repo_owner,
repo_name,
after: None,
};
loop {
let query = PullRequestsOpen::build(vars.clone());
let req = client.post(&client.graphql_url);
let req = req.json(&query);

let data: cynic::GraphQlResponse<PullRequestsOpen> = client.json(req).await?;
if let Some(errors) = data.errors {
anyhow::bail!("There were graphql errors. {:?}", errors);
/// Retrieve tuples of (user, PR number) where
/// the given user is assigned as a reviewer for that PR.
/// Only non-draft, non-rollup and open PRs are taken into account.
pub async fn retrieve_pull_request_assignments(
owner: &str,
repository: &str,
client: &Octocrab,
) -> anyhow::Result<Vec<(User, PullRequestNumber)>> {
let mut assignments = vec![];

// We use the REST API to fetch open pull requests, as it is much (~5-10x)
// faster than using GraphQL here.
let stream = client
.pulls(owner, repository)
.list()
.state(State::Open)
.direction(Direction::Ascending)
.sort(Sort::Created)
.per_page(100)
.send()
.await?
.into_stream(client);
let mut stream = std::pin::pin!(stream);
while let Some(pr) = stream.try_next().await? {
if pr.draft == Some(true) {
continue;
}
let repository = data
.data
.ok_or_else(|| anyhow::anyhow!("No data returned."))?
.repository
.ok_or_else(|| anyhow::anyhow!("No repository."))?;
prs.extend(repository.pull_requests.nodes);

let page_info = repository.pull_requests.page_info;
if !page_info.has_next_page || page_info.end_cursor.is_none() {
break;
// exclude rollup PRs
if pr
.labels
.unwrap_or_default()
.iter()
.any(|label| label.name == "rollup")
{
continue;
}
for user in pr.assignees.unwrap_or_default() {
assignments.push((
User {
login: user.login,
id: (*user.id).into(),
},
pr.number,
));
}
vars.after = page_info.end_cursor;
}
assignments.sort_by(|a, b| a.0.id.cmp(&b.0.id));

let mut prs_processed: Vec<_> = vec![];
let _: Vec<_> = prs
.into_iter()
.filter_map(|pr| {
if pr.is_draft {
return None;
}

// exclude rollup PRs
let labels = pr
.labels
.map(|l| l.nodes)
.unwrap_or_default()
.into_iter()
.map(|node| node.name)
.collect::<Vec<_>>();
if labels.iter().any(|label| label == "rollup") {
return None;
}

let _: Vec<_> = pr
.assignees
.nodes
.iter()
.map(|user| {
let user_id = user.database_id.expect("checked") as u64;
prs_processed.push((
User {
login: user.login.clone(),
id: user_id,
},
pr.number,
));
})
.collect();
Some(true)
})
.collect();
prs_processed.sort_by(|a, b| a.0.id.cmp(&b.0.id));

Ok(prs_processed)
Ok(assignments)
}

pub enum DesignMeetingStatus {
Expand Down
8 changes: 3 additions & 5 deletions src/handlers/pr_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! - Adds the PR to the workqueue of one team member (after the PR has been assigned or reopened)
//! - Removes the PR from the workqueue of one team member (after the PR has been unassigned or closed)

use crate::github::PullRequestNumber;
use crate::github::{User, UserId};
use crate::{
config::ReviewPrefsConfig,
Expand All @@ -15,8 +16,6 @@ use crate::{
use std::collections::{HashMap, HashSet};
use tracing as log;

pub type PullRequestNumber = u64;

/// Maps users to a set of currently assigned open non-draft pull requests.
/// We store this map in memory, rather than in the DB, because it can get desynced when webhooks
/// are missed.
Expand Down Expand Up @@ -161,10 +160,9 @@ async fn delete_pr_from_workqueue(ctx: &Context, user_id: UserId, pr: PullReques
#[cfg(test)]
mod tests {
use crate::config::Config;
use crate::github::PullRequestNumber;
use crate::github::{Issue, IssuesAction, IssuesEvent, Repository, User};
use crate::handlers::pr_tracking::{
handle_input, parse_input, upsert_pr_into_workqueue, PullRequestNumber,
};
use crate::handlers::pr_tracking::{handle_input, parse_input, upsert_pr_into_workqueue};
use crate::tests::github::{default_test_user, issue, pull_request, user};
use crate::tests::{run_test, TestContext};

Expand Down
40 changes: 23 additions & 17 deletions src/handlers/pull_requests_assignment_update.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::github::{retrieve_open_pull_requests, UserId};
use crate::handlers::pr_tracking::{PullRequestNumber, ReviewerWorkqueue};
use crate::github::PullRequestNumber;
use crate::github::{retrieve_pull_request_assignments, UserId};
use crate::handlers::pr_tracking::ReviewerWorkqueue;
use crate::jobs::Job;
use async_trait::async_trait;
use octocrab::Octocrab;
use std::collections::{HashMap, HashSet};

pub struct PullRequestAssignmentUpdate;
Expand All @@ -13,23 +15,27 @@ impl Job for PullRequestAssignmentUpdate {
}

async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
let gh = &ctx.github;

tracing::trace!("starting pull_request_assignment_update");

let rust_repo = gh.repository("rust-lang/rust").await?;
let prs = retrieve_open_pull_requests(&rust_repo, &gh).await?;

// Aggregate PRs by user
let aggregated: HashMap<UserId, HashSet<PullRequestNumber>> =
prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
let prs = acc.entry(user.id).or_default();
prs.insert(pr as PullRequestNumber);
acc
});
tracing::info!("PR assignments\n{aggregated:?}");
*ctx.workqueue.write().await = ReviewerWorkqueue::new(aggregated);
let workqueue = load_workqueue(&ctx.octocrab).await?;
*ctx.workqueue.write().await = workqueue;
tracing::trace!("finished pull_request_assignment_update");

Ok(())
}
}

/// Loads the workqueue (mapping of open PRs assigned to users) from GitHub.
///
/// Fetches the `(user, PR number)` assignment pairs for `rust-lang/rust` via
/// `retrieve_pull_request_assignments` and groups the PR numbers by user id.
///
/// # Errors
/// Propagates any error returned while retrieving the assignments.
pub async fn load_workqueue(client: &Octocrab) -> anyhow::Result<ReviewerWorkqueue> {
    tracing::debug!("Loading workqueue for rust-lang/rust");
    // `client` is already a `&Octocrab`, so it is passed through without re-borrowing.
    let prs = retrieve_pull_request_assignments("rust-lang", "rust", client).await?;

    // Aggregate PRs by user
    let mut aggregated: HashMap<UserId, HashSet<PullRequestNumber>> = HashMap::new();
    for (user, pr) in prs {
        aggregated.entry(user.id).or_default().insert(pr);
    }
    tracing::debug!("PR assignments\n{aggregated:?}");
    Ok(ReviewerWorkqueue::new(aggregated))
}
13 changes: 12 additions & 1 deletion src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use std::str::FromStr;
use async_trait::async_trait;
use cron::Schedule;

use crate::handlers::pull_requests_assignment_update::PullRequestAssignmentUpdate;
use crate::{
db::jobs::JobSchedule,
handlers::{docs_update::DocsUpdateJob, rustc_commits::RustcCommitsJob, Context},
Expand All @@ -61,7 +62,11 @@ pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;

// The default jobs list that are currently scheduled to run
pub fn jobs() -> Vec<Box<dyn Job + Send + Sync>> {
vec![Box::new(DocsUpdateJob), Box::new(RustcCommitsJob)]
vec![
Box::new(DocsUpdateJob),
Box::new(RustcCommitsJob),
Box::new(PullRequestAssignmentUpdate),
]
}

// Definition of the schedule repetition for the jobs we want to run.
Expand All @@ -79,6 +84,12 @@ pub fn default_jobs() -> Vec<JobSchedule> {
schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
metadata: serde_json::Value::Null,
},
JobSchedule {
name: PullRequestAssignmentUpdate.name(),
// At minutes 0 and 30 of every hour (the leading `*` is the seconds field)
schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
metadata: serde_json::Value::Null,
},
]
}

Expand Down
Loading
Loading