Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure CDN invalidations occur when crates or crate versions are deleted #10888

Merged
merged 5 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/bin/crates-admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
warn!(%crate_name, "Failed to enqueue background job: {error}");
}

let mut paths = Vec::new();
for version in &opts.versions {
debug!(%crate_name, %version, "Deleting crate file from S3");
if let Err(error) = store.delete_crate_file(crate_name, version).await {
warn!(%crate_name, %version, ?error, "Failed to delete crate file from S3");
} else {
paths.push(store.crate_location(crate_name, version));
}

debug!(%crate_name, %version, "Deleting readme file from S3");
Expand All @@ -117,9 +120,18 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
Err(error) => {
warn!(%crate_name, %version, ?error, "Failed to delete readme file from S3")
}
Ok(_) => {}
Ok(_) => {
paths.push(store.readme_location(crate_name, version));
}
}
}

if let Err(e) = jobs::InvalidateCdns::new(paths.into_iter())
.enqueue(&mut conn)
.await
{
warn!("{crate_name}: Failed to enqueue CDN invalidation background job: {e}");
}

Ok(())
}
25 changes: 18 additions & 7 deletions src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,28 @@ impl CloudFront {
/// Invalidate a single file on CloudFront.
///
/// `path` is the path to the file to invalidate, such as `config.json`, or `re/ge/regex`.
/// A leading slash is optional: `invalidate_many` prepends one to any path that lacks it.
///
/// This is a convenience wrapper that submits a one-element batch through
/// `invalidate_many`, and returns whatever that call returns.
#[instrument(skip(self))]
pub async fn invalidate(&self, path: &str) -> anyhow::Result<()> {
    // Slash normalisation lives in `invalidate_many`, so the path is forwarded untouched
    // rather than being normalised twice.
    self.invalidate_many(vec![path.to_owned()]).await
}

/// Invalidate multiple paths on CloudFront.
#[instrument(skip(self))]
pub async fn invalidate_many(&self, mut paths: Vec<String>) -> anyhow::Result<()> {
let now = chrono::offset::Utc::now().timestamp_micros();

let paths = Paths::builder().quantity(1).items(path).build()?;
// We need to ensure that paths have a starting slash.
for path in paths.iter_mut() {
if !path.starts_with('/') {
*path = format!("/{path}");
}
}

let paths = Paths::builder()
// The builder does not infer `quantity` from the items, so it has to be set explicitly
// even though the full Vec is provided.
.quantity(paths.len() as i32)
.set_items(Some(paths))
.build()?;

let invalidation_batch = InvalidationBatch::builder()
.caller_reference(format!("{now}"))
Expand Down
38 changes: 31 additions & 7 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,16 @@ impl Storage {
apply_cdn_prefix(&self.cdn_prefix, &feed_id.into()).replace('+', "%2B")
}

/// Deletes all crate files for the given crate, returning the paths that were deleted.
#[instrument(skip(self))]
pub async fn delete_all_crate_files(&self, name: &str) -> Result<()> {
pub async fn delete_all_crate_files(&self, name: &str) -> Result<Vec<Path>> {
let prefix = format!("{PREFIX_CRATES}/{name}").into();
self.delete_all_with_prefix(&prefix).await
}

/// Deletes all READMEs for the given crate, returning the paths that were deleted.
#[instrument(skip(self))]
pub async fn delete_all_readmes(&self, name: &str) -> Result<()> {
pub async fn delete_all_readmes(&self, name: &str) -> Result<Vec<Path>> {
let prefix = format!("{PREFIX_READMES}/{name}").into();
self.delete_all_with_prefix(&prefix).await
}
Expand Down Expand Up @@ -333,16 +335,24 @@ impl Storage {
self.store.clone()
}

async fn delete_all_with_prefix(&self, prefix: &Path) -> Result<()> {
async fn delete_all_with_prefix(&self, prefix: &Path) -> Result<Vec<Path>> {
let objects = self.store.list(Some(prefix));
let locations = objects.map(|meta| meta.map(|m| m.location)).boxed();
let mut paths = Vec::new();
let locations = objects
.map(|meta| meta.map(|m| m.location))
.inspect(|r| {
if let Ok(path) = r {
paths.push(path.clone());
}
})
.boxed();

self.store
.delete_stream(locations)
.try_collect::<Vec<_>>()
.await?;

Ok(())
Ok(paths)
}

fn attrs(&self, slice: impl IntoIterator<Item = (Attribute, &'static str)>) -> Attributes {
Expand Down Expand Up @@ -505,7 +515,14 @@ mod tests {
async fn delete_all_crate_files() {
let storage = prepare().await;

storage.delete_all_crate_files("foo").await.unwrap();
let deleted_files = storage.delete_all_crate_files("foo").await.unwrap();
assert_eq!(
deleted_files,
vec![
"crates/foo/foo-1.0.0.crate".into(),
"crates/foo/foo-1.2.3.crate".into(),
]
);

let expected_files = vec![
"crates/bar/bar-2.0.0.crate",
Expand All @@ -520,7 +537,14 @@ mod tests {
async fn delete_all_readmes() {
let storage = prepare().await;

storage.delete_all_readmes("foo").await.unwrap();
let deleted_files = storage.delete_all_readmes("foo").await.unwrap();
assert_eq!(
deleted_files,
vec![
"readmes/foo/foo-1.0.0.html".into(),
"readmes/foo/foo-1.2.3.html".into(),
]
);

let expected_files = vec![
"crates/bar/bar-2.0.0.crate",
Expand Down
20 changes: 18 additions & 2 deletions src/worker/jobs/delete_crate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::storage::FeedId;
use crate::worker::Environment;
use crate::worker::jobs::InvalidateCdns;
use anyhow::Context;
use crates_io_worker::BackgroundJob;
use std::sync::Arc;
Expand All @@ -25,8 +26,9 @@ impl BackgroundJob for DeleteCrateFromStorage {

async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
let name = &self.name;
let feed_id = FeedId::Crate { name };

try_join!(
let (crate_file_paths, readme_paths, _) = try_join!(
async {
info!("{name}: Deleting crate files from S3…");
let result = ctx.storage.delete_all_crate_files(name).await;
Expand All @@ -39,13 +41,27 @@ impl BackgroundJob for DeleteCrateFromStorage {
},
async {
info!("{name}: Deleting RSS feed from S3…");
let feed_id = FeedId::Crate { name };
let result = ctx.storage.delete_feed(&feed_id).await;
result.context("Failed to delete RSS feed from S3")
}
)?;

info!("{name}: Successfully deleted crate from S3");

info!("{name}: Enqueuing CDN invalidations");

let mut conn = ctx.deadpool.get().await?;
InvalidateCdns::new(
crate_file_paths
.into_iter()
.chain(readme_paths.into_iter())
.chain(std::iter::once(object_store::path::Path::from(&feed_id))),
)
.enqueue(&mut conn)
.await?;

info!("{name}: Successfully enqueued CDN invalidations.");

Ok(())
}
}
57 changes: 57 additions & 0 deletions src/worker/jobs/invalidate_cdns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use anyhow::Context;
use crates_io_worker::BackgroundJob;

use crate::worker::Environment;

/// Background job which invalidates a set of paths on all CDNs in use on crates.io.
#[derive(Serialize, Deserialize)]
pub struct InvalidateCdns {
    /// The paths to invalidate.
    paths: Vec<String>,
}

impl InvalidateCdns {
    /// Creates a job that will invalidate each of the given paths.
    ///
    /// `paths` may be anything iterable (an iterator, a `Vec`, an array, a slice
    /// reference, …) whose items implement [`ToString`]; every item is converted into an
    /// owned `String`. An empty input produces a job with no paths.
    pub fn new<I>(paths: I) -> Self
    where
        I: IntoIterator,
        I::Item: ToString,
    {
        Self {
            paths: paths.into_iter().map(|path| path.to_string()).collect(),
        }
    }
}

impl BackgroundJob for InvalidateCdns {
    const JOB_NAME: &'static str = "invalidate_cdns";

    type Context = Arc<Environment>;

    /// Invalidates the job's paths on Fastly and then on CloudFront.
    ///
    /// A CDN is skipped when the environment has no client for it. Fastly is purged one
    /// path at a time; CloudFront receives all paths in a single batched request.
    ///
    /// # Errors
    ///
    /// Returns the first invalidation failure, with context naming the CDN (and, for
    /// Fastly, the path). A Fastly failure returns before CloudFront is attempted.
    async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
        // The job can be enqueued with no paths (for example when every upstream deletion
        // failed). There is nothing to purge then, and sending a zero-item batch to
        // CloudFront would only waste a request.
        if self.paths.is_empty() {
            return Ok(());
        }

        // Fastly doesn't provide an API to purge multiple paths at once, except through the
        // use of surrogate keys. We can't use surrogate keys right now because they require a
        // Fastly-specific header, and not all of our traffic goes through Fastly.
        //
        // For now, we won't parallelise: most crate deletions are for new crates with one (or
        // very few) versions, so the actual number of paths being invalidated is likely to be
        // small, and this is all happening from either a background job or admin command
        // anyway.
        if let Some(fastly) = ctx.fastly() {
            for path in &self.paths {
                fastly
                    .invalidate(path)
                    .await
                    .with_context(|| format!("Failed to invalidate path on Fastly CDN: {path}"))?;
            }
        }

        // CloudFront accepts a batch, so all paths go out in one request. The clone is
        // needed because `invalidate_many` takes ownership while the job is only borrowed.
        if let Some(cloudfront) = ctx.cloudfront() {
            cloudfront
                .invalidate_many(self.paths.clone())
                .await
                .context("Failed to invalidate paths on CloudFront CDN")?;
        }

        Ok(())
    }
}
2 changes: 2 additions & 0 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod dump_db;
mod expiry_notification;
mod index;
mod index_version_downloads_archive;
mod invalidate_cdns;
mod readmes;
pub mod rss;
mod send_publish_notifications;
Expand All @@ -23,6 +24,7 @@ pub use self::dump_db::DumpDb;
pub use self::expiry_notification::SendTokenExpiryNotifications;
pub use self::index::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
pub use self::index_version_downloads_archive::IndexVersionDownloadsArchive;
pub use self::invalidate_cdns::InvalidateCdns;
pub use self::readmes::RenderAndUploadReadme;
pub use self::send_publish_notifications::SendPublishNotificationsJob;
pub use self::sync_admins::SyncAdmins;
Expand Down
1 change: 1 addition & 0 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl RunnerExt for Runner<Arc<Environment>> {
.register_job_type::<jobs::DeleteCrateFromStorage>()
.register_job_type::<jobs::DumpDb>()
.register_job_type::<jobs::IndexVersionDownloadsArchive>()
.register_job_type::<jobs::InvalidateCdns>()
.register_job_type::<jobs::NormalizeIndex>()
.register_job_type::<jobs::ProcessCdnLog>()
.register_job_type::<jobs::ProcessCdnLogQueue>()
Expand Down