Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-9798 - ensure machine status is ready #831

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions src/viam/robot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti
)

try:
synchronous_connect = options.dial_options.with_synchronous_connect if options.dial_options else False
if synchronous_connect:
# the user has asked for a synchronous connect, so delay returning the robot
# client until it is running.
await self.wait_until_ready()
await self.refresh()
except Exception:
LOGGER.error("Unable to establish a connection to the machine. Ensure the machine is online and reachable and try again.")
Expand Down Expand Up @@ -308,24 +313,27 @@ async def refresh(self):

For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_.
"""
with self._lock:
await self._refresh_inner()

async def _refresh_inner(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has gone from being an important thing to a flyby. I still think that there's value abstractly in being able to call refresh from within a process that already has the lock so I'm leaving this, but I don't feel strongly and am happy to undo if others do.

response: ResourceNamesResponse = await self._client.ResourceNames(ResourceNamesRequest())
resource_names: List[ResourceName] = list(response.resources)
with self._lock:
if resource_names == self._resource_names:
return
for rname in resource_names:
if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]:
continue
if rname.subtype == "remote":
continue
if resource_names == self._resource_names:
return
for rname in resource_names:
if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]:
continue
if rname.subtype == "remote":
continue

await self._create_or_reset_client(rname)
await self._create_or_reset_client(rname)

for rname in self.resource_names:
if rname not in resource_names:
await self._manager.remove_resource(rname)
for rname in self.resource_names:
if rname not in resource_names:
await self._manager.remove_resource(rname)

self._resource_names = resource_names
self._resource_names = resource_names

async def _create_or_reset_client(self, resourceName: ResourceName):
if resourceName in self._manager.resources:
Expand Down Expand Up @@ -437,6 +445,12 @@ async def _check_connection(self, check_every: int, reconnect_every: int):
# We failed to reconnect, sys.exit() so that this thread doesn't stick around forever.
sys.exit()

# we've hade to reconnect which means the machine may still be initializing, but
# there isn't a programmatic way for a user to detect this and therefore call
# expect changes in status or call `wait_until_ready`. So, let's make sure the machine
# is ready.
await self.wait_until_ready()

def get_component(self, name: ResourceName) -> ComponentBase:
"""Get a component using its ResourceName.

Expand Down Expand Up @@ -931,3 +945,22 @@ async def get_machine_status(self) -> GetMachineStatusResponse:

request = GetMachineStatusRequest()
return await self._client.GetMachineStatus(request)

######################
# Wait Until Ready #
######################

async def wait_until_ready(self):
"""
Waits until robot is done initializing, then returns.

::

machine = await RobotClient.at_address(...)
await machine.wait_until_ready()
"""
while True:
await asyncio.sleep(.1)
status = await self.get_machine_status()
if status.state != status.STATE_INITIALIZING:
return
7 changes: 7 additions & 0 deletions src/viam/robot/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from viam.errors import ViamGRPCError
from viam.proto.common import ResourceName
from viam.proto.robot import (
GetMachineStatusRequest,
GetMachineStatusResponse,
ResourceNamesRequest,
ResourceNamesResponse,
StopAllRequest,
Expand All @@ -33,6 +35,11 @@ def _generate_metadata(self) -> List[ResourceName]:

return list(md)

async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None:
request = await stream.recv_message()
assert request is not None
await stream.send_message(GetMachineStatusResponse(state=GetMachineStatusResponse.STATE_RUNNING))
Comment on lines +38 to +41
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wanted to flag this, for the purposes of the documentation tests we need the service to have a GetMachineStatus implementation. But, I'm leaving it fairly barebones (it just always returns that we're running) because the issue we're trying to fix is one that exposes itself when using RDK as a server specifically.


async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
Expand Down
5 changes: 5 additions & 0 deletions src/viam/rpc/dial.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class DialOptions:
"""Number of seconds before the dial connection times out
Set to 20sec to match _defaultOfferDeadline in goutils/rpc/wrtc_call_queue.go"""

with_synchronous_connect: bool = False
"""If detected, delays return of a RobotClient until the machine status is running"""

def __init__(
self,
*,
Expand All @@ -84,6 +87,7 @@ def __init__(
allow_insecure_downgrade: bool = False,
allow_insecure_with_creds_downgrade: bool = False,
max_reconnect_attempts: int = 3,
with_synchronous_connect: bool = False,
timeout: float = 20,
) -> None:
self.disable_webrtc = disable_webrtc
Expand All @@ -94,6 +98,7 @@ def __init__(
self.allow_insecure_with_creds_downgrade = allow_insecure_with_creds_downgrade
self.max_reconnect_attempts = max_reconnect_attempts
self.timeout = timeout
self.with_synchronous_connect = with_synchronous_connect

@classmethod
def with_api_key(cls, api_key: str, api_key_id: str) -> Self:
Expand Down