Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-9798 - ensure machine status is ready #831

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions src/viam/robot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti
)

try:
# make sure the robot status isn't initializing (i.e., that the robot is ready for queries)
# before we try to refresh
while await self._check_still_initializing():
pass
await self.refresh()
except Exception:
LOGGER.error("Unable to establish a connection to the machine. Ensure the machine is online and reachable and try again.")
Expand All @@ -281,6 +285,10 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti
name=f"{viam._TASK_PREFIX}-robot_check_connection",
)

self._check_status_task = asyncio.create_task(
self._check_status(), name=f"{viam._TASK_PREFIX}-robot_check_status"
)

purplenicole730 marked this conversation as resolved.
Show resolved Hide resolved
return self

_channel: Channel
Expand All @@ -294,6 +302,7 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti
_refresh_task: Optional[asyncio.Task] = None
_check_connection_task: Optional[asyncio.Task] = None
_resource_names: List[ResourceName]
_check_status_task: asyncio.Task
_should_close_channel: bool
_closed: bool = False
_sessions_client: SessionsClient
Expand All @@ -308,24 +317,27 @@ async def refresh(self):

For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_.
"""
with self._lock:
await self._refresh_inner()

async def _refresh_inner(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has gone from being an important thing to a flyby. I still think that there's value abstractly in being able to call refresh from within a process that already has the lock so I'm leaving this, but I don't feel strongly and am happy to undo if others do.

response: ResourceNamesResponse = await self._client.ResourceNames(ResourceNamesRequest())
resource_names: List[ResourceName] = list(response.resources)
with self._lock:
if resource_names == self._resource_names:
return
for rname in resource_names:
if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]:
continue
if rname.subtype == "remote":
continue
if resource_names == self._resource_names:
return
for rname in resource_names:
if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]:
continue
if rname.subtype == "remote":
continue

await self._create_or_reset_client(rname)
await self._create_or_reset_client(rname)

for rname in self.resource_names:
if rname not in resource_names:
await self._manager.remove_resource(rname)
for rname in self.resource_names:
if rname not in resource_names:
await self._manager.remove_resource(rname)

self._resource_names = resource_names
self._resource_names = resource_names

async def _create_or_reset_client(self, resourceName: ResourceName):
if resourceName in self._manager.resources:
Expand All @@ -351,6 +363,30 @@ async def _create_or_reset_client(self, resourceName: ResourceName):
except ResourceNotFoundError:
pass

async def _check_still_initializing(self):
await asyncio.sleep(.1)
status = await self.get_machine_status()
return status.state == status.STATE_INITIALIZING

async def _check_status(self):
while True:
# check to see if we're initializing. if not, then we're connected, and so we're good
if not await self._check_still_initializing():
continue

# at this point we know we're still initializing. so we want to take the lock to prevent
# attempts to do anything meaningful until we know the machine is ready for queries
with self._lock:
while await self._check_still_initializing():
pass

# now we know the machine is ready, but it's possible that we detected the need to
# ensure connection in the middle of a refresh call or after a refreh call completed,
# and therefore that the robot got an incorrect response from the `ResourceNames` call.
# So, we should refresh right now to make sure everything is correct, before we release
# the lock.
await self._refresh_inner()

async def _refresh_every(self, interval: int):
while True:
await asyncio.sleep(interval)
Expand Down
7 changes: 7 additions & 0 deletions src/viam/robot/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from viam.errors import ViamGRPCError
from viam.proto.common import ResourceName
from viam.proto.robot import (
GetMachineStatusRequest,
GetMachineStatusResponse,
ResourceNamesRequest,
ResourceNamesResponse,
StopAllRequest,
Expand All @@ -33,6 +35,11 @@ def _generate_metadata(self) -> List[ResourceName]:

return list(md)

async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None:
request = await stream.recv_message()
assert request is not None
await stream.send_message(GetMachineStatusResponse(state=GetMachineStatusResponse.STATE_RUNNING))
Comment on lines +38 to +41
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wanted to flag this, for the purposes of the documentation tests we need the service to have a GetMachineStatus implementation. But, I'm leaving it fairly barebones (it just always returns that we're running) because the issue we're trying to fix is one that exposes itself when using RDK as a server specifically.


async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
Expand Down