Skip to content

Commit 42d6e74

Browse files
committed
Revert "Simplify subscription creation to remove a write lock"
This reverts commit dfd9e89.
1 parent 99ef8a6 commit 42d6e74

File tree

1 file changed

+27
-14
lines changed

1 file changed

+27
-14
lines changed

crates/corro-agent/src/api/public/pubsub.rs

+27-14
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,34 @@ async fn sub_by_id(
6060
let (matcher, rx) = match matcher_rx {
6161
Some(matcher_rx) => matcher_rx,
6262
None => {
63-
if let Some(handle) = subs.remove(&id) {
64-
info!(sub_id = %id, "Removed subscription from sub_by_id");
65-
tokio::spawn(handle.cleanup());
66-
}
63+
// ensure this goes!
64+
let mut bcast_cache_write = bcast_cache.write().await;
65+
66+
if let Some(matcher_rx) = bcast_cache_write.get(&id).and_then(|tx| {
67+
subs.get(&id).map(|matcher| {
68+
debug!("found matcher by id {id}");
69+
(matcher, tx.subscribe())
70+
})
71+
}) {
72+
matcher_rx
73+
} else {
74+
bcast_cache_write.remove(&id);
75+
if let Some(handle) = subs.remove(&id) {
76+
info!(sub_id = %id, "Removed subscription from sub_by_id");
77+
tokio::spawn(handle.cleanup());
78+
}
6779

68-
return hyper::Response::builder()
69-
.status(StatusCode::NOT_FOUND)
70-
.body(
71-
serde_json::to_vec(&QueryEvent::Error(format_compact!(
72-
"could not find subscription with id {id}"
73-
)))
74-
.expect("could not serialize queries stream error")
75-
.into(),
76-
)
77-
.expect("could not build error response");
80+
return hyper::Response::builder()
81+
.status(StatusCode::NOT_FOUND)
82+
.body(
83+
serde_json::to_vec(&QueryEvent::Error(format_compact!(
84+
"could not find subscription with id {id}"
85+
)))
86+
.expect("could not serialize queries stream error")
87+
.into(),
88+
)
89+
.expect("could not build error response");
90+
}
7891
}
7992
};
8093

0 commit comments

Comments
 (0)