-
Couldn't load subscription status.
- Fork 175
Add TakerSubscription #499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| } | ||
|
|
||
| impl SubscriptionHandle { | ||
| fn create<'a, T: Message>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this logic into SubscriptionHandle to be used by both SubscriptionBase and TakerSubscription
| /// | ||
| /// [1]: crate::NodeState::create_taker_subscription | ||
| /// [2]: crate::Executor::spin | ||
| pub struct TakerSubscription<T: Message> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Named TakerSubscription rather than the discussed SubscriptionTaker to be closer to WorkerSubscription
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I think SubscriptionTaker makes more sense grammatically.
In this case Taker is the main object, and Subscription is describing the taker. Whereas for WorkerSubscription, the Subscription is the main object, and Worker is describing the subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want Take to come first, then maybe TakingSubscription could work grammatically. That way Taking is describing the Subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, TakerSubscription does sound correct, Taker being the adjectival noun of the Subscription, like "worker ant". Won't die on the anthill though.
| } | ||
| } | ||
|
|
||
| impl<T: Message> RclPrimitive for Arc<TakerSubscription<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented for use with WaitSet, see examples in unit test below and in NodeState::create_taker_subscription docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One recommendation I would make for the executor is to include a watch sender and trigger it each time a new message is available. That way a user with a TakerSubscription could use Receiver::changed to .await an update signal so they know that a message is ready to be taken.
This will allow the TakerSubscription to work well with async programs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But also we can't use Arc<TakerSubscription<T>> for this because then the wait set will never know when to drop this from its list.
If you take a look at Subscription::lifecycle you'll see that the struct the user holds onto needs to contain a WaitableLifecycle. That struct keeps the associated RclPrimitive inside the wait set. Once the WaitableLifecycle gets dropped, the RclPrimitive will be removed from the wait set.
In the current state of this PR, this RclPrimitive will be dropped from the wait set immediately because you're dropping the lifecycle right away (and you named it _lifecycle up above to escape the compiler warning). Since this RclPrimitive implementation is doing nothing, I suppose it doesn't matter that it's being dropped from the wait set, but then we should do one of the following:
- Don't bother implementing
RclPrimitivehere and don't bother adding anything to the wait set for aTakerSubscription. - Follow the pattern of the other primitive implementations and create separate structs for the
RclPrimitiveversus the user-facing object. Then have theRclPrimitivejust trigger a signal so users can know when a message is available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type of subscription is not added to the executor's WaitSet at all in the first place, which is why it doesn't hold a lifecycle. I implemented RclPrimitive so that users could leverage their own WaitSets.
you named it _lifecycle up above to escape the compiler warning
Naming it _lifecycle keeps it from being dropped until it leaves scope. Naming it _ causes it to be dropped immediately. I tried to mention this in the docs for create_taker_subscription.
Similar to your second suggestion, I considered adding some more functionality for waiting. In the end I opted to just expose the minimum functionality for using it in a WaitSet and put an example in the docs, since WaitSet is already public. Plus users can add multiple Waitables to the set (vs if we implemented something like TakerSubscription::wait()).
WRT to the async API, is something like this what you have in mind? We'd have to add a tokio dep and possibly gate it with a feature flag.
pub fn watch(self, context: &Context) -> tokio::sync::watch::Receiver<T> {
let (tx, rx) = tokio::sync::watch::channel(T::default());
tokio::spawn(async move {
let subscription = Arc::new(self);
let (waitable, _lifecycle) = Waitable::new(Box::new(Arc::clone(&subscription)), None);
let mut waitset = WaitSet::new(context).unwrap(); // could send Results instead to avoid unwraps
waitset.add([waitable])?;
loop {
waitset.wait(None, |_| Ok(())).unwrap();
tx.send(subscription.take().unwrap().unwrap());
}
});
rx
}ef57cc0 to
8b3d65e
Compare
|
@harrisonmg thanks for your contribution. Is this PR ready for review? If not, could you move this to a draft and let us know when it's ready? Thanks |
Unless anybody has comments on the API itself, I think it's ready for review. Sorry if I spammed CI notifications while I tried to fix those tests. |
I took a shot at implementing the API discussed in #493.
I didn't implement the
take_withmethod that was previously discussed becausercl_takecopies the new message into anRmwMessage, not aMessage. It is not obvious to me how to avoid constructing a newMessagewhen converting fromRmwMessagetoMessageviafrom_rmw_message.