Skip to content

Feat; mark benchmark requests as complete #2214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions database/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,34 @@ Columns:
execute.
* **is_active** (`boolean NOT NULL`): For controlling whether the collector is
active for use. Useful for adding/removing collectors.

### job_queue

This table stores ephemeral benchmark jobs, which specifically tell the
collector which benchmarks it should execute. The jobs will be kept in the
table for ~30 days after being completed, so that we can quickly figure out
what master parent jobs we need to backfill when handling try builds.

Columns:

* **id** (`bigint` / `serial`): Primary*key identifier for the job row;
auto*increments with each new job.
Comment on lines +326 to +327
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this intentional italics?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not :) We can fix it in some follow-up.

* **request_tag** (`text`): References the parent benchmark request that
spawned this job.
* **target** (`text NOT NULL`): Hardware/ISA the benchmarks must run on
(e.g. AArch64, x86_64).
* **backend** (`text NOT NULL`): Code generation backend the collector should
test (e.g. llvm, cranelift).
* **benchmark_set** (`int NOT NULL`): ID of the predefined benchmark suite to
execute.
* **collector_name** (`text`): Name of the collector that claimed the job
(populated once the job is started).
* **created_at** (`timestamptz NOT NULL`): Datetime when the job was queued.
* **started_at** (`timestamptz`): Datetime when the collector actually began
running the benchmarks; NULL until the job is claimed.
* **completed_at** (`timestampt`): Datetime when the collector finished
(successfully or otherwise); used to purge rows after ~30 days.
* **status** (`text NOT NULL`): Current job state. `queued`, `in_progress`,
`success`, or `failure`.
* **retry** (`int NOT NULL`): Number of times the job has been re*queued after
a failure; 0 on the first attempt.
39 changes: 31 additions & 8 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,13 +846,6 @@ impl BenchmarkRequestStatus {
_ => Err(anyhow!("Unknown BenchmarkRequestStatus `{text}`")),
}
}

pub(crate) fn completed_at(&self) -> Option<DateTime<Utc>> {
match self {
Self::Completed { completed_at } => Some(*completed_at),
_ => None,
}
}
}

impl fmt::Display for BenchmarkRequestStatus {
Expand Down Expand Up @@ -1026,6 +1019,10 @@ impl BenchmarkRequest {
.collect::<Result<Vec<_>, _>>()
.map_err(|e| anyhow::anyhow!("Invalid backend: {e}"))
}

pub fn is_completed(&self) -> bool {
matches!(self.status, BenchmarkRequestStatus::Completed { .. })
}
}

/// Cached information about benchmark requests in the DB
Expand All @@ -1047,6 +1044,11 @@ impl BenchmarkRequestIndex {
pub fn completed_requests(&self) -> &HashSet<String> {
&self.completed
}

pub fn add_tag(&mut self, tag: &str) {
self.all.insert(tag.to_string());
self.completed.insert(tag.to_string());
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -1092,7 +1094,7 @@ impl fmt::Display for BenchmarkJobStatus {
}

#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkSet(u32);
pub struct BenchmarkSet(pub u32);

/// A single unit of work generated from a benchmark request. Split by profiles
/// and backends
Expand All @@ -1102,6 +1104,7 @@ pub struct BenchmarkSet(u32);
/// they are responsible for.
#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkJob {
id: u32,
target: Target,
backend: CodegenBackend,
profile: Profile,
Expand All @@ -1113,6 +1116,10 @@ pub struct BenchmarkJob {
}

impl BenchmarkJob {
pub fn id(&self) -> u32 {
self.id
}

pub fn target(&self) -> &Target {
&self.target
}
Expand Down Expand Up @@ -1142,6 +1149,22 @@ impl BenchmarkJob {
}
}

/// Describes the final state of a job
#[derive(Debug, Clone, PartialEq)]
pub enum BenchmarkJobConclusion {
Failure,
Success,
}

impl BenchmarkJobConclusion {
pub fn as_str(&self) -> &str {
match self {
BenchmarkJobConclusion::Failure => BENCHMARK_JOB_STATUS_FAILURE_STR,
BenchmarkJobConclusion::Success => BENCHMARK_JOB_STATUS_SUCCESS_STR,
}
}
}

/// The configuration for a collector
#[derive(Debug, PartialEq)]
pub struct CollectorConfig {
Expand Down
189 changes: 163 additions & 26 deletions database/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::selector::CompileTestCase;
use crate::{
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest,
BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig,
CompileBenchmark, Target,
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion,
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend,
CollectorConfig, CompileBenchmark, Target,
};
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -246,13 +246,25 @@ pub trait Connection: Send + Sync {
/// Get the confiuguration for a collector by the name of the collector
async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result<CollectorConfig>;

/// Get the confiuguration for a collector by the name of the collector
/// Dequeue benchmark job
async fn dequeue_benchmark_job(
&self,
collector_name: &str,
target: &Target,
benchmark_set: &BenchmarkSet,
) -> anyhow::Result<Option<BenchmarkJob>>;

/// Try and mark the benchmark_request as completed. Will return `true` if
/// it has been marked as completed else `false` meaning there was no change
async fn mark_benchmark_request_as_completed(&self, tag: &str) -> anyhow::Result<bool>;

/// Mark the job as completed. Sets the status to 'failed' or 'success'
/// depending on the enum's completed state being a success
async fn mark_benchmark_job_as_completed(
&self,
id: u32,
benchmark_job_conculsion: &BenchmarkJobConclusion,
) -> anyhow::Result<()>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -388,6 +400,45 @@ mod tests {
}
}

async fn complete_request(
db: &dyn Connection,
request_tag: &str,
collector_name: &str,
benchmark_set: u32,
target: &Target,
) {
/* Create job for the request */
db.enqueue_benchmark_job(
request_tag,
&target,
&CodegenBackend::Llvm,
&Profile::Opt,
benchmark_set,
)
.await
.unwrap();

let job = db
.dequeue_benchmark_job(collector_name, &target, &BenchmarkSet(benchmark_set))
.await
.unwrap()
.unwrap();

assert_eq!(job.request_tag(), request_tag);

/* Mark the job as complete */
db.mark_benchmark_job_as_completed(job.id(), &BenchmarkJobConclusion::Success)
.await
.unwrap();

assert_eq!(
db.mark_benchmark_request_as_completed(request_tag)
.await
.unwrap(),
true
);
}

#[tokio::test]
async fn pstat_returns_empty_vector_when_empty() {
run_db_test(|ctx| async {
Expand Down Expand Up @@ -475,28 +526,31 @@ mod tests {
#[tokio::test]
async fn multiple_non_completed_try_requests() {
run_postgres_test(|ctx| async {
let db = ctx.db_client();
let db = db.connection().await;
let db = ctx.db_client().connection().await;
let target = &Target::X86_64UnknownLinuxGnu;
let collector_name = "collector-1";
let benchmark_set = 1;

// Completed
db.add_collector_config(collector_name, &target, benchmark_set, true)
.await
.unwrap();

// Complete parent
let parent = BenchmarkRequest::create_release("sha-parent-1", Utc::now());
// Complete
let req_a = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", "");
// WaitingForArtifacts
let req_b = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", "");
let req_c = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", "");

db.insert_benchmark_request(&parent).await.unwrap();
db.insert_benchmark_request(&req_a).await.unwrap();
db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1")
.await
.unwrap();

db.update_benchmark_request_status(
"sha1",
BenchmarkRequestStatus::Completed {
completed_at: Utc::now(),
},
)
.await
.unwrap();
complete_request(&*db, "sha-parent-1", collector_name, benchmark_set, &target).await;
complete_request(&*db, "sha1", collector_name, benchmark_set, &target).await;

// This should be fine, req_a was completed
db.insert_benchmark_request(&req_b).await.unwrap();
Expand Down Expand Up @@ -543,8 +597,15 @@ mod tests {
#[tokio::test]
async fn load_pending_benchmark_requests() {
run_postgres_test(|ctx| async {
let db = ctx.db_client();
let db = ctx.db_client().connection().await;
let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
let target = &Target::X86_64UnknownLinuxGnu;
let collector_name = "collector-1";
let benchmark_set = 1;

db.add_collector_config(collector_name, &target, benchmark_set, true)
.await
.unwrap();

// ArtifactsReady
let req_a = BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
Expand All @@ -555,24 +616,17 @@ mod tests {
// InProgress
let req_d = BenchmarkRequest::create_master("sha-2", "parent-sha-2", 51, time);
// Completed
let req_e = BenchmarkRequest::create_master("sha-3", "parent-sha-3", 52, time);
let req_e = BenchmarkRequest::create_release("1.79.0", time);

let db = db.connection().await;
for &req in &[&req_a, &req_b, &req_c, &req_d, &req_e] {
db.insert_benchmark_request(req).await.unwrap();
}

complete_request(&*db, "1.79.0", collector_name, benchmark_set, &target).await;

db.update_benchmark_request_status("sha-2", BenchmarkRequestStatus::InProgress)
.await
.unwrap();
db.update_benchmark_request_status(
"sha-3",
BenchmarkRequestStatus::Completed {
completed_at: Utc::now(),
},
)
.await
.unwrap();

let requests = db.load_pending_benchmark_requests().await.unwrap();

Expand Down Expand Up @@ -837,4 +891,87 @@ mod tests {
})
.await;
}

#[tokio::test]
async fn mark_request_as_complete_empty() {
run_postgres_test(|ctx| async {
let db = ctx.db_client().connection().await;
let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();

let insert_result = db
.add_collector_config("collector-1", &Target::X86_64UnknownLinuxGnu, 1, true)
.await;
assert!(insert_result.is_ok());

let benchmark_request =
BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
db.insert_benchmark_request(&benchmark_request)
.await
.unwrap();
assert_eq!(
db.mark_benchmark_request_as_completed("sha-1")
.await
.unwrap(),
true
);
Ok(ctx)
})
.await;
}

#[tokio::test]
async fn mark_request_as_complete() {
run_postgres_test(|ctx| async {
let db = ctx.db_client().connection().await;
let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
let benchmark_set = BenchmarkSet(0u32);
let tag = "sha-1";
let collector_name = "collector-1";
let target = Target::X86_64UnknownLinuxGnu;

let insert_result = db
.add_collector_config(collector_name, &target, 1, true)
.await;
assert!(insert_result.is_ok());

/* Create the request */
let benchmark_request = BenchmarkRequest::create_release(tag, time);
db.insert_benchmark_request(&benchmark_request)
.await
.unwrap();

/* Create job for the request */
db.enqueue_benchmark_job(
benchmark_request.tag().unwrap(),
&target,
&CodegenBackend::Llvm,
&Profile::Opt,
benchmark_set.0,
)
.await
.unwrap();

let job = db
.dequeue_benchmark_job(collector_name, &target, &benchmark_set)
.await
.unwrap()
.unwrap();

assert_eq!(job.request_tag(), benchmark_request.tag().unwrap());

/* Mark the job as complete */
db.mark_benchmark_job_as_completed(job.id(), &BenchmarkJobConclusion::Success)
.await
.unwrap();

db.mark_benchmark_request_as_completed(tag).await.unwrap();

let completed = db.load_benchmark_request_index().await.unwrap();

assert!(completed.contains_tag("sha-1"));

Ok(ctx)
})
.await;
}
}
Loading
Loading