diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs
index 008d7fdeb9f..b84da3aad17 100644
--- a/nexus/db-queries/src/db/datastore/volume.rs
+++ b/nexus/db-queries/src/db/datastore/volume.rs
@@ -467,9 +467,18 @@ impl DataStore {
     ) -> DeleteResult {
         use db::schema::volume::dsl;
 
+        let conn = self.pool_connection_unauthorized().await?;
+
+        // When running tests, assert that the volume we're about to hard
+        // delete has no volume resource usage records.
+        #[cfg(any(test, feature = "testing"))]
+        Self::validate_volume_has_no_usage_records(&conn, volume_id)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
+
         diesel::delete(dsl::volume)
             .filter(dsl::id.eq(to_db_typed_uuid(volume_id)))
-            .execute_async(&*self.pool_connection_unauthorized().await?)
+            .execute_async(&*conn)
             .await
             .map(|_| ())
             .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
@@ -4061,6 +4070,32 @@ impl DataStore {
 
         Ok(())
     }
+
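+    /// Return an error if any volume resource usage records still reference
+    /// this volume. Called from `volume_hard_delete` when running tests to
+    /// catch hard deletes that would otherwise leave dangling usage records.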
+    async fn validate_volume_has_no_usage_records(
+        conn: &async_bb8_diesel::Connection<DbConnection>,
+        volume_id: VolumeUuid,
+    ) -> Result<(), diesel::result::Error> {
+        use db::schema::volume_resource_usage::dsl;
+
+        let matching_usage_records: Vec<VolumeResourceUsage> =
+            dsl::volume_resource_usage
+                .filter(dsl::volume_id.eq(to_db_typed_uuid(volume_id)))
+                .select(VolumeResourceUsageRecord::as_select())
+                .get_results_async(conn)
+                .await?
+                .into_iter()
+                .map(|r| r.try_into().unwrap())
+                .collect();
+
+        if !matching_usage_records.is_empty() {
+            return Err(Self::volume_invariant_violated(format!(
+                "volume {volume_id} has matching usage records: {:?}",
+                matching_usage_records,
+            )));
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
index d9e8a0349ba..ca2b7a7e8fa 100644
--- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs
+++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
@@ -911,6 +911,15 @@ async fn rsrss_new_region_volume_create_undo(
 
     let new_region_volume_id =
         sagactx.lookup::<VolumeUuid>("new_region_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_region_volume_id).await?.is_some() {
+        // This saga has all the knowledge needed to unwind the resources it
+        // created, but use soft delete anyway to keep volume resource usage
+        // records consistent (they would have been added when the volume was
+        // created). Only call soft delete if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_region_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_region_volume_id).await?;
 
     Ok(())
@@ -1008,6 +1017,15 @@ async fn rsrss_create_fake_volume_undo(
     // Delete the fake volume.
 
     let new_volume_id = sagactx.lookup::<VolumeUuid>("new_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_volume_id).await?.is_some() {
+        // This saga has all the knowledge needed to unwind the resources it
+        // created, but use soft delete anyway to keep volume resource usage
+        // records consistent (they would have been added when the volume was
+        // created). Only call soft delete if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_volume_id).await?;
 
     Ok(())
diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step.rs b/nexus/src/app/sagas/region_snapshot_replacement_step.rs
index 5a3fdaed93e..18a10f26993 100644
--- a/nexus/src/app/sagas/region_snapshot_replacement_step.rs
+++ b/nexus/src/app/sagas/region_snapshot_replacement_step.rs
@@ -311,6 +311,15 @@ async fn rssrs_create_fake_volume_undo(
     // Delete the fake volume.
 
     let new_volume_id = sagactx.lookup::<VolumeUuid>("new_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_volume_id).await?.is_some() {
+        // This saga has all the knowledge needed to unwind the resources it
+        // created, but use soft delete anyway to keep volume resource usage
+        // records consistent (they would have been added when the volume was
+        // created). Only call soft delete if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_volume_id).await?;
 
     Ok(())
@@ -625,3 +634,411 @@ async fn rsrss_update_request_record(
 
     Ok(())
 }
+
+#[cfg(test)]
+pub(crate) mod test {
+    use crate::{
+        app::RegionAllocationStrategy, app::db::DataStore,
+        app::db::datastore::region_snapshot_replacement::InsertStepResult,
+        app::db::lookup::LookupPath, app::saga::create_saga_dag,
+        app::sagas::region_snapshot_replacement_garbage_collect,
+        app::sagas::region_snapshot_replacement_garbage_collect::*,
+        app::sagas::region_snapshot_replacement_start,
+        app::sagas::region_snapshot_replacement_start::*,
+        app::sagas::region_snapshot_replacement_step,
+        app::sagas::region_snapshot_replacement_step::*,
+        app::sagas::test_helpers::test_opctx,
+    };
+    use nexus_db_model::RegionSnapshotReplacement;
+    use nexus_db_model::RegionSnapshotReplacementState;
+    use nexus_db_model::RegionSnapshotReplacementStep;
+    use nexus_db_model::RegionSnapshotReplacementStepState;
+    use nexus_db_model::Volume;
+    use nexus_db_queries::authn::saga::Serialized;
+    use nexus_db_queries::context::OpContext;
+    use nexus_test_utils::resource_helpers::DiskTest;
+    use nexus_test_utils::resource_helpers::create_disk;
+    use nexus_test_utils::resource_helpers::create_disk_from_snapshot;
+    use nexus_test_utils::resource_helpers::create_project;
+    use nexus_test_utils::resource_helpers::create_snapshot;
+    use nexus_test_utils_macros::nexus_test;
+    use nexus_types::identity::Asset;
+    use sled_agent_client::VolumeConstructionRequest;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<crate::Server>;
+
+    const DISK_NAME: &str = "my-disk";
+    const DISK_FROM_SNAPSHOT_NAME: &str = "my-disk-from-snap";
+    const SNAPSHOT_NAME: &str = "my-snap";
+    const PROJECT_NAME: &str = "springfield-squidport";
+
+    /// Create four zpools, a disk, a snapshot of that disk, and a disk from
+    /// that snapshot, then drive a region snapshot replacement request
+    /// through the start and garbage collect sagas and insert a replacement
+    /// step for the disk-from-snapshot's volume.
+    async fn prepare_for_test(
+        cptestctx: &ControlPlaneTestContext,
+    ) -> PrepareResult {
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let mut disk_test = DiskTest::new(cptestctx).await;
+        disk_test.add_zpool_with_dataset(cptestctx.first_sled_id()).await;
+
+        let _project_id =
+            create_project(&client, PROJECT_NAME).await.identity.id;
+
+        // Create a disk
+        let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
+
+        let disk_id = disk.identity.id;
+        let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+            .disk_id(disk_id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id));
+
+        // Create a snapshot
+        let snapshot =
+            create_snapshot(&client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
+                .await;
+
+        let snapshot_id = snapshot.identity.id;
+
+        // Create a disk from that snapshot
+        let disk_from_snapshot = create_disk_from_snapshot(
+            &client,
+            PROJECT_NAME,
+            DISK_FROM_SNAPSHOT_NAME,
+            snapshot_id,
+        )
+        .await;
+
+        let (.., db_disk_from_snapshot) = LookupPath::new(&opctx, &datastore)
+            .disk_id(disk_from_snapshot.identity.id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| {
+                panic!(
+                    "test disk {:?} should exist",
+                    disk_from_snapshot.identity.id
+                )
+            });
+
+        // Replace one of the snapshot's targets
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id()).await.unwrap();
+
+        let region: &nexus_db_model::Region = &disk_allocated_regions[0].1;
+
+        let region_snapshot = datastore
+            .region_snapshot_get(region.dataset_id(), region.id(), snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Manually insert the region snapshot replacement request
+        let request =
+            RegionSnapshotReplacement::for_region_snapshot(&region_snapshot);
+
+        datastore
+            .insert_region_snapshot_replacement_request(&opctx, request.clone())
+            .await
+            .unwrap();
+
+        // Run the region snapshot replacement start saga
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStart>(
+            region_snapshot_replacement_start::Params {
+                serialized_authn: Serialized::for_opctx(&opctx),
+                request: request.clone(),
+                allocation_strategy: RegionAllocationStrategy::Random {
+                    seed: None,
+                },
+            },
+        )
+        .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        // Actually run the saga
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementState::ReplacementDone
+        );
+        assert!(result.new_region_id.is_some());
+        assert!(result.operating_saga_id.is_none());
+
+        // Next step of region snapshot replacement: calling the garbage collect
+        // saga to move the request into the Running state
+
+        let dag =
+            create_saga_dag::<SagaRegionSnapshotReplacementGarbageCollect>(
+                region_snapshot_replacement_garbage_collect::Params {
+                    serialized_authn: Serialized::for_opctx(&opctx),
+                    old_snapshot_volume_id: result
+                        .old_snapshot_volume_id()
+                        .unwrap(),
+                    request: result,
+                },
+            )
+            .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementState::Running
+        );
+
+        // Manually insert the region snapshot replacement step
+
+        let InsertStepResult::Inserted { step_id } = datastore
+            .create_region_snapshot_replacement_step(
+                &opctx,
+                request.id,
+                db_disk_from_snapshot.volume_id(),
+            )
+            .await
+            .unwrap()
+        else {
+            panic!("InsertStepResult::Inserted not returned");
+        };
+
+        let step = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, step_id)
+            .await
+            .unwrap();
+
+        PrepareResult { step, db_disk_from_snapshot }
+    }
+
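+    /// Objects created by `prepare_for_test` that the tests operate on: the
+    /// inserted replacement step and the disk created from the snapshot.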
+    struct PrepareResult {
+        step: RegionSnapshotReplacementStep,
+        db_disk_from_snapshot: nexus_db_model::Disk,
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_region_snapshot_replacement_step_saga(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, .. } = prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        // Run the region snapshot replacement step saga
+
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStep>(
+            region_snapshot_replacement_step::Params {
+                serialized_authn: Serialized::for_opctx(&opctx),
+                request: step.clone(),
+            },
+        )
+        .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, step.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementStepState::Complete
+        );
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let log = &cptestctx.logctx.log;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind::<
+            SagaRegionSnapshotReplacementStep,
+            _,
+            _,
+        >(
+            nexus,
+            || Box::pin(async { new_test_params(&opctx, &step) }),
+            || {
+                Box::pin(async {
+                    verify_clean_slate(
+                        &cptestctx,
+                        &step,
+                        &affected_volume_original,
+                    )
+                    .await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let log = &cptestctx.logctx.log;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::<
+            SagaRegionSnapshotReplacementStep,
+            _,
+            _
+        >(
+            nexus,
+            || Box::pin(async { new_test_params(&opctx, &step) }),
+            || Box::pin(async {
+                verify_clean_slate(
+                    &cptestctx,
+                    &step,
+                    &affected_volume_original,
+                ).await;
+            }),
+            log
+        ).await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        // Build the saga DAG with the provided test parameters
+        let params = new_test_params(&opctx, &step);
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStep>(params)
+            .unwrap();
+        crate::app::sagas::test_helpers::actions_succeed_idempotently(
+            nexus, dag,
+        )
+        .await;
+    }
+
+    // helpers
+
+    fn new_test_params(
+        opctx: &OpContext,
+        request: &RegionSnapshotReplacementStep,
+    ) -> region_snapshot_replacement_step::Params {
+        region_snapshot_replacement_step::Params {
+            serialized_authn: Serialized::for_opctx(opctx),
+            request: request.clone(),
+        }
+    }
+
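+    /// Assert a clean slate: no undo steps have failed, the replacement step
+    /// request is still in the Requested state, and the affected volume's
+    /// construction request is unchanged.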
+    pub(crate) async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        request: &RegionSnapshotReplacementStep,
+        affected_volume_original: &Volume,
+    ) {
+        let datastore = cptestctx.server.server_context().nexus.datastore();
+
+        crate::app::sagas::test_helpers::assert_no_failed_undo_steps(
+            &cptestctx.logctx.log,
+            datastore,
+        )
+        .await;
+
+        assert_region_snapshot_replacement_step_untouched(
+            cptestctx, &datastore, &request,
+        )
+        .await;
+
+        assert_volume_untouched(&datastore, &affected_volume_original).await;
+    }
+
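+    /// Assert that the replacement step request is still in the Requested
+    /// state.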
+    async fn assert_region_snapshot_replacement_step_untouched(
+        cptestctx: &ControlPlaneTestContext,
+        datastore: &DataStore,
+        request: &RegionSnapshotReplacementStep,
+    ) {
+        let opctx = test_opctx(cptestctx);
+        let db_request = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            db_request.replacement_state,
+            RegionSnapshotReplacementStepState::Requested
+        );
+    }
+
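+    /// Assert that the volume's construction request matches the original.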
+    async fn assert_volume_untouched(
+        datastore: &DataStore,
+        affected_volume_original: &Volume,
+    ) {
+        let affected_volume = datastore
+            .volume_get(affected_volume_original.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        let actual: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume.data()).unwrap();
+
+        let expected: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume_original.data()).unwrap();
+
+        assert_eq!(actual, expected);
+    }
+}
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index fd185b97402..7ab61655437 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -629,7 +629,9 @@ async fn ssc_create_destination_volume_record_undo(
     // resources. It's safe here to perform a volume hard delete without
     // decreasing the crucible resource count because the destination volume is
     // guaranteed to never have read only resources that require that
-    // accounting.
+    // accounting. This is the same reason that volume resource usage records
+    // are not created for the destination volume (they're only created for
+    // read-only resources!).
 
     info!(log, "hard deleting volume {}", destination_volume_id);