diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index 333cbcd17..32e7f05a0 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -264,6 +264,11 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti ) try: + synchronous_connect = options.dial_options.with_synchronous_connect if options.dial_options else False + if synchronous_connect: + # the user has asked for a synchronous connect, so delay returning the robot + # client until it is running. + await self.wait_until_ready() await self.refresh() except Exception: LOGGER.error("Unable to establish a connection to the machine. Ensure the machine is online and reachable and try again.") @@ -308,24 +313,27 @@ async def refresh(self): For more information, see `Machine Management API `_. """ + with self._lock: + await self._refresh_inner() + + async def _refresh_inner(self): response: ResourceNamesResponse = await self._client.ResourceNames(ResourceNamesRequest()) resource_names: List[ResourceName] = list(response.resources) - with self._lock: - if resource_names == self._resource_names: - return - for rname in resource_names: - if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]: - continue - if rname.subtype == "remote": - continue + if resource_names == self._resource_names: + return + for rname in resource_names: + if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]: + continue + if rname.subtype == "remote": + continue - await self._create_or_reset_client(rname) + await self._create_or_reset_client(rname) - for rname in self.resource_names: - if rname not in resource_names: - await self._manager.remove_resource(rname) + for rname in self.resource_names: + if rname not in resource_names: + await self._manager.remove_resource(rname) - self._resource_names = resource_names + self._resource_names = resource_names async def _create_or_reset_client(self, resourceName: ResourceName): if resourceName in self._manager.resources: @@ -437,6 +445,12 @@ async def _check_connection(self, check_every: int, reconnect_every: int): # We failed to reconnect, sys.exit() so that this thread doesn't stick around forever. sys.exit() + # we've hade to reconnect which means the machine may still be initializing, but + # there isn't a programmatic way for a user to detect this and therefore call + # expect changes in status or call `wait_until_ready`. So, let's make sure the machine + # is ready. + await self.wait_until_ready() + def get_component(self, name: ResourceName) -> ComponentBase: """Get a component using its ResourceName. @@ -931,3 +945,22 @@ async def get_machine_status(self) -> GetMachineStatusResponse: request = GetMachineStatusRequest() return await self._client.GetMachineStatus(request) + + ###################### + # Wait Until Ready # + ###################### + + async def wait_until_ready(self): + """ + Waits until robot is done initializing, then returns. + + :: + + machine = await RobotClient.at_address(...) + await machine.wait_until_ready() + """ + while True: + await asyncio.sleep(.1) + status = await self.get_machine_status() + if status.state != status.STATE_INITIALIZING: + return diff --git a/src/viam/robot/service.py b/src/viam/robot/service.py index 715cbe11f..74d7cd80b 100644 --- a/src/viam/robot/service.py +++ b/src/viam/robot/service.py @@ -8,6 +8,8 @@ from viam.errors import ViamGRPCError from viam.proto.common import ResourceName from viam.proto.robot import ( + GetMachineStatusRequest, + GetMachineStatusResponse, ResourceNamesRequest, ResourceNamesResponse, StopAllRequest, @@ -33,6 +35,11 @@ def _generate_metadata(self) -> List[ResourceName]: return list(md) + async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None: + request = await stream.recv_message() + assert request is not None + await stream.send_message(GetMachineStatusResponse(state=GetMachineStatusResponse.STATE_RUNNING)) + async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None: request = await stream.recv_message() assert request is not None diff --git a/src/viam/rpc/dial.py b/src/viam/rpc/dial.py index f3fc5a1cc..71c19ea29 100644 --- a/src/viam/rpc/dial.py +++ b/src/viam/rpc/dial.py @@ -74,6 +74,9 @@ class DialOptions: """Number of seconds before the dial connection times out Set to 20sec to match _defaultOfferDeadline in goutils/rpc/wrtc_call_queue.go""" + with_synchronous_connect: bool = False + """If detected, delays return of a RobotClient until the machine status is running""" + def __init__( self, *, @@ -84,6 +87,7 @@ def __init__( allow_insecure_downgrade: bool = False, allow_insecure_with_creds_downgrade: bool = False, max_reconnect_attempts: int = 3, + with_synchronous_connect: bool = False, timeout: float = 20, ) -> None: self.disable_webrtc = disable_webrtc @@ -94,6 +98,7 @@ def __init__( self.allow_insecure_with_creds_downgrade = allow_insecure_with_creds_downgrade self.max_reconnect_attempts = max_reconnect_attempts self.timeout = timeout + self.with_synchronous_connect = with_synchronous_connect @classmethod def with_api_key(cls, api_key: str, api_key_id: str) -> Self: