diff --git a/database/schema.md b/database/schema.md index 3d8ab3b5b..309e5b74f 100644 --- a/database/schema.md +++ b/database/schema.md @@ -313,3 +313,34 @@ Columns: execute. * **is_active** (`boolean NOT NULL`): For controlling whether the collector is active for use. Useful for adding/removing collectors. + +### job_queue + +This table stores ephemeral benchmark jobs, which specifically tell the +collector which benchmarks it should execute. The jobs will be kept in the +table for ~30 days after being completed, so that we can quickly figure out +what master parent jobs we need to backfill when handling try builds. + +Columns: + +* **id** (`bigint` / `serial`): Primary*key identifier for the job row; + auto*increments with each new job. +* **request_tag** (`text`): References the parent benchmark request that + spawned this job. +* **target** (`text NOT NULL`): Hardware/ISA the benchmarks must run on + (e.g. AArch64, x86_64). +* **backend** (`text NOT NULL`): Code generation backend the collector should + test (e.g. llvm, cranelift). +* **benchmark_set** (`int NOT NULL`): ID of the predefined benchmark suite to + execute. +* **collector_name** (`text`): Name of the collector that claimed the job + (populated once the job is started). +* **created_at** (`timestamptz NOT NULL`): Datetime when the job was queued. +* **started_at** (`timestamptz`): Datetime when the collector actually began + running the benchmarks; NULL until the job is claimed. +* **completed_at** (`timestampt`): Datetime when the collector finished + (successfully or otherwise); used to purge rows after ~30 days. +* **status** (`text NOT NULL`): Current job state. `queued`, `in_progress`, + `success`, or `failure`. +* **retry** (`int NOT NULL`): Number of times the job has been re*queued after + a failure; 0 on the first attempt. diff --git a/database/src/lib.rs b/database/src/lib.rs index d37d606ea..a8f39c28f 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -846,13 +846,6 @@ impl BenchmarkRequestStatus { _ => Err(anyhow!("Unknown BenchmarkRequestStatus `{text}`")), } } - - pub(crate) fn completed_at(&self) -> Option> { - match self { - Self::Completed { completed_at } => Some(*completed_at), - _ => None, - } - } } impl fmt::Display for BenchmarkRequestStatus { @@ -1026,6 +1019,10 @@ impl BenchmarkRequest { .collect::, _>>() .map_err(|e| anyhow::anyhow!("Invalid backend: {e}")) } + + pub fn is_completed(&self) -> bool { + matches!(self.status, BenchmarkRequestStatus::Completed { .. }) + } } /// Cached information about benchmark requests in the DB @@ -1047,6 +1044,11 @@ impl BenchmarkRequestIndex { pub fn completed_requests(&self) -> &HashSet { &self.completed } + + pub fn add_tag(&mut self, tag: &str) { + self.all.insert(tag.to_string()); + self.completed.insert(tag.to_string()); + } } #[derive(Debug, Clone, PartialEq)] @@ -1092,7 +1094,7 @@ impl fmt::Display for BenchmarkJobStatus { } #[derive(Debug, Clone, PartialEq)] -pub struct BenchmarkSet(u32); +pub struct BenchmarkSet(pub u32); /// A single unit of work generated from a benchmark request. Split by profiles /// and backends @@ -1102,6 +1104,7 @@ pub struct BenchmarkSet(u32); /// they are responsible for. #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkJob { + id: u32, target: Target, backend: CodegenBackend, profile: Profile, @@ -1113,6 +1116,10 @@ pub struct BenchmarkJob { } impl BenchmarkJob { + pub fn id(&self) -> u32 { + self.id + } + pub fn target(&self) -> &Target { &self.target } @@ -1142,6 +1149,22 @@ impl BenchmarkJob { } } +/// Describes the final state of a job +#[derive(Debug, Clone, PartialEq)] +pub enum BenchmarkJobConclusion { + Failure, + Success, +} + +impl BenchmarkJobConclusion { + pub fn as_str(&self) -> &str { + match self { + BenchmarkJobConclusion::Failure => BENCHMARK_JOB_STATUS_FAILURE_STR, + BenchmarkJobConclusion::Success => BENCHMARK_JOB_STATUS_SUCCESS_STR, + } + } +} + /// The configuration for a collector #[derive(Debug, PartialEq)] pub struct CollectorConfig { diff --git a/database/src/pool.rs b/database/src/pool.rs index 563e4083c..dd0571045 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,8 +1,8 @@ use crate::selector::CompileTestCase; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest, - BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig, - CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, + BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, + CollectorConfig, CompileBenchmark, Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -246,13 +246,25 @@ pub trait Connection: Send + Sync { /// Get the confiuguration for a collector by the name of the collector async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result; - /// Get the confiuguration for a collector by the name of the collector + /// Dequeue benchmark job async fn dequeue_benchmark_job( &self, collector_name: &str, target: &Target, benchmark_set: &BenchmarkSet, ) -> anyhow::Result>; + + /// Try and mark the benchmark_request as completed. Will return `true` if + /// it has been marked as completed else `false` meaning there was no change + async fn mark_benchmark_request_as_completed(&self, tag: &str) -> anyhow::Result; + + /// Mark the job as completed. Sets the status to 'failed' or 'success' + /// depending on the enum's completed state being a success + async fn mark_benchmark_job_as_completed( + &self, + id: u32, + benchmark_job_conculsion: &BenchmarkJobConclusion, + ) -> anyhow::Result<()>; } #[async_trait::async_trait] @@ -388,6 +400,45 @@ mod tests { } } + async fn complete_request( + db: &dyn Connection, + request_tag: &str, + collector_name: &str, + benchmark_set: u32, + target: &Target, + ) { + /* Create job for the request */ + db.enqueue_benchmark_job( + request_tag, + &target, + &CodegenBackend::Llvm, + &Profile::Opt, + benchmark_set, + ) + .await + .unwrap(); + + let job = db + .dequeue_benchmark_job(collector_name, &target, &BenchmarkSet(benchmark_set)) + .await + .unwrap() + .unwrap(); + + assert_eq!(job.request_tag(), request_tag); + + /* Mark the job as complete */ + db.mark_benchmark_job_as_completed(job.id(), &BenchmarkJobConclusion::Success) + .await + .unwrap(); + + assert_eq!( + db.mark_benchmark_request_as_completed(request_tag) + .await + .unwrap(), + true + ); + } + #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -475,28 +526,31 @@ mod tests { #[tokio::test] async fn multiple_non_completed_try_requests() { run_postgres_test(|ctx| async { - let db = ctx.db_client(); - let db = db.connection().await; + let db = ctx.db_client().connection().await; + let target = &Target::X86_64UnknownLinuxGnu; + let collector_name = "collector-1"; + let benchmark_set = 1; - // Completed + db.add_collector_config(collector_name, &target, benchmark_set, true) + .await + .unwrap(); + + // Complete parent + let parent = BenchmarkRequest::create_release("sha-parent-1", Utc::now()); + // Complete let req_a = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); // WaitingForArtifacts let req_b = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); let req_c = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); + db.insert_benchmark_request(&parent).await.unwrap(); db.insert_benchmark_request(&req_a).await.unwrap(); db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1") .await .unwrap(); - db.update_benchmark_request_status( - "sha1", - BenchmarkRequestStatus::Completed { - completed_at: Utc::now(), - }, - ) - .await - .unwrap(); + complete_request(&*db, "sha-parent-1", collector_name, benchmark_set, &target).await; + complete_request(&*db, "sha1", collector_name, benchmark_set, &target).await; // This should be fine, req_a was completed db.insert_benchmark_request(&req_b).await.unwrap(); @@ -543,8 +597,15 @@ mod tests { #[tokio::test] async fn load_pending_benchmark_requests() { run_postgres_test(|ctx| async { - let db = ctx.db_client(); + let db = ctx.db_client().connection().await; let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); + let target = &Target::X86_64UnknownLinuxGnu; + let collector_name = "collector-1"; + let benchmark_set = 1; + + db.add_collector_config(collector_name, &target, benchmark_set, true) + .await + .unwrap(); // ArtifactsReady let req_a = BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time); @@ -555,24 +616,17 @@ mod tests { // InProgress let req_d = BenchmarkRequest::create_master("sha-2", "parent-sha-2", 51, time); // Completed - let req_e = BenchmarkRequest::create_master("sha-3", "parent-sha-3", 52, time); + let req_e = BenchmarkRequest::create_release("1.79.0", time); - let db = db.connection().await; for &req in &[&req_a, &req_b, &req_c, &req_d, &req_e] { db.insert_benchmark_request(req).await.unwrap(); } + complete_request(&*db, "1.79.0", collector_name, benchmark_set, &target).await; + db.update_benchmark_request_status("sha-2", BenchmarkRequestStatus::InProgress) .await .unwrap(); - db.update_benchmark_request_status( - "sha-3", - BenchmarkRequestStatus::Completed { - completed_at: Utc::now(), - }, - ) - .await - .unwrap(); let requests = db.load_pending_benchmark_requests().await.unwrap(); @@ -837,4 +891,87 @@ mod tests { }) .await; } + + #[tokio::test] + async fn mark_request_as_complete_empty() { + run_postgres_test(|ctx| async { + let db = ctx.db_client().connection().await; + let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); + + let insert_result = db + .add_collector_config("collector-1", &Target::X86_64UnknownLinuxGnu, 1, true) + .await; + assert!(insert_result.is_ok()); + + let benchmark_request = + BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time); + db.insert_benchmark_request(&benchmark_request) + .await + .unwrap(); + assert_eq!( + db.mark_benchmark_request_as_completed("sha-1") + .await + .unwrap(), + true + ); + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_as_complete() { + run_postgres_test(|ctx| async { + let db = ctx.db_client().connection().await; + let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); + let benchmark_set = BenchmarkSet(0u32); + let tag = "sha-1"; + let collector_name = "collector-1"; + let target = Target::X86_64UnknownLinuxGnu; + + let insert_result = db + .add_collector_config(collector_name, &target, 1, true) + .await; + assert!(insert_result.is_ok()); + + /* Create the request */ + let benchmark_request = BenchmarkRequest::create_release(tag, time); + db.insert_benchmark_request(&benchmark_request) + .await + .unwrap(); + + /* Create job for the request */ + db.enqueue_benchmark_job( + benchmark_request.tag().unwrap(), + &target, + &CodegenBackend::Llvm, + &Profile::Opt, + benchmark_set.0, + ) + .await + .unwrap(); + + let job = db + .dequeue_benchmark_job(collector_name, &target, &benchmark_set) + .await + .unwrap() + .unwrap(); + + assert_eq!(job.request_tag(), benchmark_request.tag().unwrap()); + + /* Mark the job as complete */ + db.mark_benchmark_job_as_completed(job.id(), &BenchmarkJobConclusion::Success) + .await + .unwrap(); + + db.mark_benchmark_request_as_completed(tag).await.unwrap(); + + let completed = db.load_benchmark_request_index().await.unwrap(); + + assert!(completed.contains_tag("sha-1")); + + Ok(ctx) + }) + .await; + } } diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 1d30a942a..9483a6239 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,11 +1,12 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::selector::CompileTestCase; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob, BenchmarkJobStatus, - BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, - BenchmarkSet, CodegenBackend, CollectionId, CollectorConfig, Commit, CommitType, - CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, - BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, BENCHMARK_JOB_STATUS_QUEUED_STR, + ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob, + BenchmarkJobConclusion, BenchmarkJobStatus, BenchmarkRequest, BenchmarkRequestIndex, + BenchmarkRequestStatus, BenchmarkRequestType, BenchmarkSet, CodegenBackend, CollectionId, + CollectorConfig, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, + Scenario, Target, BENCHMARK_JOB_STATUS_FAILURE_STR, BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, + BENCHMARK_JOB_STATUS_QUEUED_STR, BENCHMARK_JOB_STATUS_SUCCESS_STR, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR, BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR, @@ -1530,16 +1531,21 @@ where tag: &str, status: BenchmarkRequestStatus, ) -> anyhow::Result<()> { + // We cannot use this function to mark requests as complete, as + // we need to know if all jobs are complete first. + if matches!(status, BenchmarkRequestStatus::Completed { .. }) { + panic!("Please use `mark_benchmark_request_as_completed(...)` to complete benchmark_requests"); + } + let status_str = status.as_str(); - let completed_at = status.completed_at(); let modified_rows = self .conn() .execute( r#" UPDATE benchmark_request - SET status = $1, completed_at = $2 - WHERE tag = $3;"#, - &[&status_str, &completed_at, &tag], + SET status = $1 + WHERE tag = $2;"#, + &[&status_str, &tag], ) .await .context("failed to update benchmark request status")?; @@ -1843,6 +1849,7 @@ where WHERE job_queue.id = picked.id RETURNING + job_queue.id, job_queue.backend, job_queue.profile, job_queue.request_tag, @@ -1868,25 +1875,101 @@ where None => Ok(None), Some(row) => { let job = BenchmarkJob { + id: row.get::<_, i32>(0) as u32, target: *target, - backend: CodegenBackend::from_str(&row.get::<_, String>(0)) + backend: CodegenBackend::from_str(&row.get::<_, String>(1)) .map_err(|e| anyhow::anyhow!(e))?, - profile: Profile::from_str(&row.get::<_, String>(1)) + profile: Profile::from_str(&row.get::<_, String>(2)) .map_err(|e| anyhow::anyhow!(e))?, - request_tag: row.get::<_, String>(2), + request_tag: row.get::<_, String>(3), benchmark_set: benchmark_set.clone(), - created_at: row.get::<_, DateTime>(3), + created_at: row.get::<_, DateTime>(4), // The job is now in an in_progress state status: BenchmarkJobStatus::InProgress { - started_at: row.get::<_, DateTime>(4), + started_at: row.get::<_, DateTime>(5), collector_name: collector_name.into(), }, - retry: row.get::<_, i32>(5) as u32, + retry: row.get::<_, i32>(6) as u32, }; Ok(Some(job)) } } } + + async fn mark_benchmark_request_as_completed(&self, tag: &str) -> anyhow::Result { + // Find if the benchmark is completed and update it's status to completed + // in one SQL block + let row = self + .conn() + .query_opt( + " + UPDATE + benchmark_request + SET + status = $1, + completed_at = NOW() + WHERE + benchmark_request.tag = $2 + AND benchmark_request.status != $1 + AND NOT EXISTS ( + SELECT + 1 + FROM + job_queue + WHERE + job_queue.request_tag = benchmark_request.tag + AND job_queue.status NOT IN ($3, $4) + ) + AND ( + benchmark_request.parent_sha IS NULL + OR NOT EXISTS ( + SELECT + 1 + FROM + job_queue + WHERE + job_queue.request_tag = benchmark_request.parent_sha + AND job_queue.status NOT IN ($3, $4) + ) + ) + RETURNING + benchmark_request.tag; + ", + &[ + &BENCHMARK_REQUEST_STATUS_COMPLETED_STR, + &tag, + &BENCHMARK_JOB_STATUS_SUCCESS_STR, + &BENCHMARK_JOB_STATUS_FAILURE_STR, + ], + ) + .await + .context("Failed to mark benchmark_request as completed")?; + // The affected tag is returned by the query thus we can use the row's + // presence to determine if the request was marked as completed + Ok(row.is_some()) + } + + async fn mark_benchmark_job_as_completed( + &self, + id: u32, + benchmark_job_conclusion: &BenchmarkJobConclusion, + ) -> anyhow::Result<()> { + self.conn() + .execute( + " + UPDATE + job_queue + SET + status = $1, + completed_at = NOW() + WHERE + id = $2", + &[&benchmark_job_conclusion.as_str(), &(id as i32)], + ) + .await + .context("Failed to mark benchmark job as completed")?; + Ok(()) + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index e1efbddc0..c4391ba07 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,9 +1,9 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::selector::CompileTestCase; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkRequest, - BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectionId, - CollectorConfig, Commit, CommitType, CompileBenchmark, Date, Profile, Target, + ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkJobConclusion, + BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, + CollectionId, CollectorConfig, Commit, CommitType, CompileBenchmark, Date, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; @@ -1353,6 +1353,18 @@ impl Connection for SqliteConnection { ) -> anyhow::Result { no_queue_implementation_abort!() } + + async fn mark_benchmark_request_as_completed(&self, _tag: &str) -> anyhow::Result { + no_queue_implementation_abort!() + } + + async fn mark_benchmark_job_as_completed( + &self, + _id: u32, + _benchmark_job_conculsion: &BenchmarkJobConclusion, + ) -> anyhow::Result<()> { + no_queue_implementation_abort!() + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 0f2f35587..5131f82d0 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -253,7 +253,13 @@ async fn try_enqueue_next_benchmark_request( break; } BenchmarkRequestStatus::InProgress => { - // TODO: Try to mark as completed + if conn + .mark_benchmark_request_as_completed(request.tag().unwrap()) + .await? + { + index.add_tag(request.tag().unwrap()); + continue; + } break; } BenchmarkRequestStatus::WaitingForArtifacts @@ -301,7 +307,9 @@ pub async fn cron_main(site_ctxt: Arc>>>, seconds: u mod tests { use super::*; use chrono::{Datelike, Duration, TimeZone, Utc}; - use database::tests::run_postgres_test; + use database::{ + tests::run_postgres_test, BenchmarkJobConclusion, BenchmarkSet, CodegenBackend, Profile, + }; fn days_ago(day_str: &str) -> chrono::DateTime { // Walk backwards until the first non-digit, then slice @@ -345,15 +353,54 @@ mod tests { } } - async fn mark_as_completed(conn: &dyn database::pool::Connection, shas: &[&str]) { - let completed_at = Utc::now(); - for sha in shas { - conn.update_benchmark_request_status( - sha, - BenchmarkRequestStatus::Completed { completed_at }, - ) + async fn complete_request( + db: &dyn database::pool::Connection, + request_tag: &str, + collector_name: &str, + benchmark_set: u32, + target: &Target, + ) { + /* Create job for the request */ + db.enqueue_benchmark_job( + request_tag, + &target, + &CodegenBackend::Llvm, + &Profile::Opt, + benchmark_set, + ) + .await + .unwrap(); + + let job = db + .dequeue_benchmark_job(collector_name, &target, &BenchmarkSet(benchmark_set)) + .await + .unwrap() + .unwrap(); + + assert_eq!(job.request_tag(), request_tag); + + /* Mark the job as complete */ + db.mark_benchmark_job_as_completed(job.id(), &BenchmarkJobConclusion::Success) .await .unwrap(); + + assert_eq!( + db.mark_benchmark_request_as_completed(request_tag) + .await + .unwrap(), + true + ); + } + + async fn mark_as_completed( + conn: &dyn database::pool::Connection, + shas: &[&str], + collector_name: &str, + benchmark_set: u32, + target: &database::Target, + ) { + for sha in shas { + complete_request(conn, sha, collector_name, benchmark_set, target).await; } } @@ -368,6 +415,14 @@ mod tests { #[tokio::test] async fn queue_ordering() { run_postgres_test(|ctx| async { + let db = ctx.db_client().connection().await; + let target = &Target::X86_64UnknownLinuxGnu; + let collector_name = "collector-1"; + let benchmark_set = 1; + + db.add_collector_config(collector_name, &target, benchmark_set, true) + .await + .unwrap(); /* Key: * +---------------------+ * | m - master | @@ -426,8 +481,6 @@ mod tests { * | t "baz" R pr17 | 6: a try with a low PR, blocked by parent * +----------------+ **/ - - let db = ctx.db_client().connection().await; let requests = vec![ create_master("bar", "parent1", 10, "days2"), create_master("345", "parent2", 11, "days2"), @@ -452,7 +505,14 @@ mod tests { .await .unwrap(); - mark_as_completed(&*db, &["bar", "345", "aaa", "rrr"]).await; + mark_as_completed( + &*db, + &["bar", "345", "aaa", "rrr"], + collector_name, + benchmark_set, + &target, + ) + .await; let index = db.load_benchmark_request_index().await.unwrap();