From 0fb15001e38d49dcdd19212efdf280ed873e8f0f Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Tue, 28 Jan 2025 11:49:52 -0500 Subject: [PATCH 1/8] ensure machine status is ready --- src/viam/robot/client.py | 62 +++++++++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index 333cbcd17..caedbaf5c 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -264,6 +264,10 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti ) try: + # make sure the robot status isn't initializing (i.e., that the robot is ready for queries) + # before we try to refresh + while await self._check_still_initializing(): + pass await self.refresh() except Exception: LOGGER.error("Unable to establish a connection to the machine. Ensure the machine is online and reachable and try again.") @@ -281,6 +285,10 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti name=f"{viam._TASK_PREFIX}-robot_check_connection", ) + self._check_status_task = asyncio.create_task( + self._check_status(), name=f"{viam._TASK_PREFIX}-robot_check_status" + ) + return self _channel: Channel @@ -294,6 +302,7 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti _refresh_task: Optional[asyncio.Task] = None _check_connection_task: Optional[asyncio.Task] = None _resource_names: List[ResourceName] + _check_status_task: asyncio.Task _should_close_channel: bool _closed: bool = False _sessions_client: SessionsClient @@ -308,24 +317,27 @@ async def refresh(self): For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_. 
""" + with self._lock: + self._refresh_inner() + + async def _refresh_inner(self): response: ResourceNamesResponse = await self._client.ResourceNames(ResourceNamesRequest()) resource_names: List[ResourceName] = list(response.resources) - with self._lock: - if resource_names == self._resource_names: - return - for rname in resource_names: - if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]: - continue - if rname.subtype == "remote": - continue + if resource_names == self._resource_names: + return + for rname in resource_names: + if rname.type not in [RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE]: + continue + if rname.subtype == "remote": + continue - await self._create_or_reset_client(rname) + await self._create_or_reset_client(rname) - for rname in self.resource_names: - if rname not in resource_names: - await self._manager.remove_resource(rname) + for rname in self.resource_names: + if rname not in resource_names: + await self._manager.remove_resource(rname) - self._resource_names = resource_names + self._resource_names = resource_names async def _create_or_reset_client(self, resourceName: ResourceName): if resourceName in self._manager.resources: @@ -351,6 +363,30 @@ async def _create_or_reset_client(self, resourceName: ResourceName): except ResourceNotFoundError: pass + async def _check_still_initializing(self): + await asyncio.sleep(.1) + status = await self.get_machine_status() + return status.state == status.STATE_INITIALIZING + + async def _check_status(self): + while True: + # check to see if we're initializing. if not, then we're connected, and so we're good + if not await self._check_still_initializing(): + continue + + # at this point we know we're still initializing. 
so we want to take the lock to prevent + # attempts to do anything meaningful until we know the machine is ready for queries + with self._lock: + while await self._check_still_initializing(): + pass + + # now we know the machine is ready, but it's possible that we detected the need to + # ensure connection in the middle of a refresh call or after a refreh call completed, + # and therefore that the robot got an incorrect response from the `ResourceNames` call. + # So, we should refresh right now to make sure everything is correct, before we release + # the lock. + self._refresh_inner() + async def _refresh_every(self, interval: int): while True: await asyncio.sleep(interval) From 206b17d992782b1f49c70479760dd463232adab8 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Tue, 28 Jan 2025 11:55:11 -0500 Subject: [PATCH 2/8] forgot awaits! --- src/viam/robot/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index caedbaf5c..3ca1b9024 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -318,7 +318,7 @@ async def refresh(self): For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_. """ with self._lock: - self._refresh_inner() + await self._refresh_inner() async def _refresh_inner(self): response: ResourceNamesResponse = await self._client.ResourceNames(ResourceNamesRequest()) @@ -385,7 +385,7 @@ async def _check_status(self): # and therefore that the robot got an incorrect response from the `ResourceNames` call. # So, we should refresh right now to make sure everything is correct, before we release # the lock. 
- self._refresh_inner() + await self._refresh_inner() async def _refresh_every(self, interval: int): while True: From dedf35488dafae28c682fd41de38844bdb0fb51f Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Tue, 28 Jan 2025 12:07:46 -0500 Subject: [PATCH 3/8] add mock robot for docs testing --- docs/examples/_server.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index a44084680..a30b630ec 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -6,6 +6,11 @@ from grpclib.server import Server, Stream from grpclib.utils import graceful_exit +from viam.proto.robot import ( + UnimplementedRobotServiceBase, + GetMachineStatusRequest, + GetMachineStatusResponse +) from viam.app.data_client import DataClient from viam.proto.app import ( AddRoleRequest, @@ -595,6 +600,11 @@ async def GetRegistryItem(self, stream: Stream[GetRegistryItemRequest, GetRegist raise NotImplementedError() +class MockRobot(UnimplementedRobotServiceBase): + async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None: + await stream.send_message(GetMachineStatusResponse(state: GetMachineStatusResponse.STATE_RUNNING)) + + async def main(*, host: str = "127.0.0.1", port: int = 9092) -> None: server = Server([MockData(), MockDataSync(), MockApp()]) with graceful_exit([server]): From 2e6f0129c11c68930996b1e8880c3bc45d158537 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Tue, 28 Jan 2025 12:10:14 -0500 Subject: [PATCH 4/8] forgot to add mock robot to server --- docs/examples/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index a30b630ec..396105988 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -606,7 +606,7 @@ async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMach async def main(*, host: str = "127.0.0.1", 
port: int = 9092) -> None: - server = Server([MockData(), MockDataSync(), MockApp()]) + server = Server([MockData(), MockDataSync(), MockApp(), MockRobot()]) with graceful_exit([server]): await server.start(host, port) await server.wait_closed() From 01058e5c9d7df48a2418aa2ad8fb13da5b713ea5 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Tue, 28 Jan 2025 13:28:08 -0500 Subject: [PATCH 5/8] undo uv.lock changes --- docs/examples/_server.py | 12 +----------- src/viam/robot/service.py | 7 +++++++ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/docs/examples/_server.py b/docs/examples/_server.py index 396105988..a44084680 100644 --- a/docs/examples/_server.py +++ b/docs/examples/_server.py @@ -6,11 +6,6 @@ from grpclib.server import Server, Stream from grpclib.utils import graceful_exit -from viam.proto.robot import ( - UnimplementedRobotServiceBase, - GetMachineStatusRequest, - GetMachineStatusResponse -) from viam.app.data_client import DataClient from viam.proto.app import ( AddRoleRequest, @@ -600,13 +595,8 @@ async def GetRegistryItem(self, stream: Stream[GetRegistryItemRequest, GetRegist raise NotImplementedError() -class MockRobot(UnimplementedRobotServiceBase): - async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None: - await stream.send_message(GetMachineStatusResponse(state: GetMachineStatusResponse.STATE_RUNNING)) - - async def main(*, host: str = "127.0.0.1", port: int = 9092) -> None: - server = Server([MockData(), MockDataSync(), MockApp(), MockRobot()]) + server = Server([MockData(), MockDataSync(), MockApp()]) with graceful_exit([server]): await server.start(host, port) await server.wait_closed() diff --git a/src/viam/robot/service.py b/src/viam/robot/service.py index 715cbe11f..74d7cd80b 100644 --- a/src/viam/robot/service.py +++ b/src/viam/robot/service.py @@ -8,6 +8,8 @@ from viam.errors import ViamGRPCError from viam.proto.common import ResourceName from 
viam.proto.robot import ( + GetMachineStatusRequest, + GetMachineStatusResponse, ResourceNamesRequest, ResourceNamesResponse, StopAllRequest, @@ -33,6 +35,11 @@ def _generate_metadata(self) -> List[ResourceName]: return list(md) + async def GetMachineStatus(self, stream: Stream[GetMachineStatusRequest, GetMachineStatusResponse]) -> None: + request = await stream.recv_message() + assert request is not None + await stream.send_message(GetMachineStatusResponse(state=GetMachineStatusResponse.STATE_RUNNING)) + async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None: request = await stream.recv_message() assert request is not None From 793fa3eccc6bc27406033823b0c0180d5c6f730d Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Thu, 30 Jan 2025 13:17:23 -0500 Subject: [PATCH 6/8] add with_synchronous_connect option and wait_until_ready method --- src/viam/robot/client.py | 56 ++++++++++++++++++++-------------------- src/viam/rpc/dial.py | 5 ++++ 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index 3ca1b9024..750637f7b 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -264,10 +264,11 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti ) try: - # make sure the robot status isn't initializing (i.e., that the robot is ready for queries) - # before we try to refresh - while await self._check_still_initializing(): - pass + if options.dial_options.with_synchronous_connect: + # the user has asked for a synchronous connect, so delay returning the robot + # client until it is running. + while await self._check_still_initializing(): + pass await self.refresh() except Exception: LOGGER.error("Unable to establish a connection to the machine. 
Ensure the machine is online and reachable and try again.") @@ -285,10 +286,6 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti name=f"{viam._TASK_PREFIX}-robot_check_connection", ) - self._check_status_task = asyncio.create_task( - self._check_status(), name=f"{viam._TASK_PREFIX}-robot_check_status" - ) - return self _channel: Channel @@ -302,7 +299,6 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti _refresh_task: Optional[asyncio.Task] = None _check_connection_task: Optional[asyncio.Task] = None _resource_names: List[ResourceName] - _check_status_task: asyncio.Task _should_close_channel: bool _closed: bool = False _sessions_client: SessionsClient @@ -368,25 +364,6 @@ async def _check_still_initializing(self): status = await self.get_machine_status() return status.state == status.STATE_INITIALIZING - async def _check_status(self): - while True: - # check to see if we're initializing. if not, then we're connected, and so we're good - if not await self._check_still_initializing(): - continue - - # at this point we know we're still initializing. so we want to take the lock to prevent - # attempts to do anything meaningful until we know the machine is ready for queries - with self._lock: - while await self._check_still_initializing(): - pass - - # now we know the machine is ready, but it's possible that we detected the need to - # ensure connection in the middle of a refresh call or after a refreh call completed, - # and therefore that the robot got an incorrect response from the `ResourceNames` call. - # So, we should refresh right now to make sure everything is correct, before we release - # the lock. 
- await self._refresh_inner() - async def _refresh_every(self, interval: int): while True: await asyncio.sleep(interval) @@ -473,6 +450,12 @@ async def _check_connection(self, check_every: int, reconnect_every: int): # We failed to reconnect, sys.exit() so that this thread doesn't stick around forever. sys.exit() + # we've had to reconnect which means the machine may still be initializing, but + # there isn't a programmatic way for a user to detect this and therefore call + # expect changes in status or call `wait_until_ready`. So, let's make sure the machine + # is ready. + self.wait_until_ready() + def get_component(self, name: ResourceName) -> ComponentBase: """Get a component using its ResourceName. @@ -967,3 +950,20 @@ async def get_machine_status(self) -> GetMachineStatusResponse: request = GetMachineStatusRequest() return await self._client.GetMachineStatus(request) + + ###################### + # Wait Until Ready # + ###################### + + async def wait_until_ready(self): + """ + Waits until robot status is running, then returns. + + :: + + machine = await RobotClient.at_address(...) 
+ await machine.wait_until_ready() + """ + + while await self._check_still_initializing(): + pass diff --git a/src/viam/rpc/dial.py b/src/viam/rpc/dial.py index f3fc5a1cc..71c19ea29 100644 --- a/src/viam/rpc/dial.py +++ b/src/viam/rpc/dial.py @@ -74,6 +74,9 @@ class DialOptions: """Number of seconds before the dial connection times out Set to 20sec to match _defaultOfferDeadline in goutils/rpc/wrtc_call_queue.go""" + with_synchronous_connect: bool = False + """If True, delays return of a RobotClient until the machine status is running""" + def __init__( self, *, @@ -84,6 +87,7 @@ def __init__( allow_insecure_downgrade: bool = False, allow_insecure_with_creds_downgrade: bool = False, max_reconnect_attempts: int = 3, + with_synchronous_connect: bool = False, timeout: float = 20, ) -> None: self.disable_webrtc = disable_webrtc @@ -94,6 +98,7 @@ def __init__( self.allow_insecure_with_creds_downgrade = allow_insecure_with_creds_downgrade self.max_reconnect_attempts = max_reconnect_attempts self.timeout = timeout + self.with_synchronous_connect = with_synchronous_connect @classmethod def with_api_key(cls, api_key: str, api_key_id: str) -> Self: From 2ef1ce9f64e0ba083974ac673ca2a8bd009fd8ff Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Thu, 30 Jan 2025 13:22:57 -0500 Subject: [PATCH 7/8] cleanup --- src/viam/robot/client.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index 750637f7b..1302ea78d 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -267,8 +267,7 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti if options.dial_options.with_synchronous_connect: # the user has asked for a synchronous connect, so delay returning the robot # client until it is running. 
- while await self._check_still_initializing(): - pass + await self.wait_until_ready() await self.refresh() except Exception: LOGGER.error("Unable to establish a connection to the machine. Ensure the machine is online and reachable and try again.") @@ -359,11 +358,6 @@ async def _create_or_reset_client(self, resourceName: ResourceName): except ResourceNotFoundError: pass - async def _check_still_initializing(self): - await asyncio.sleep(.1) - status = await self.get_machine_status() - return status.state == status.STATE_INITIALIZING - async def _refresh_every(self, interval: int): while True: await asyncio.sleep(interval) @@ -957,13 +951,15 @@ async def get_machine_status(self) -> GetMachineStatusResponse: async def wait_until_ready(self): """ - Waits until robot status is running, then returns. + Waits until robot is done initializing, then returns. :: machine = await RobotClient.at_address(...) await machine.wait_until_ready() """ - - while await self._check_still_initializing(): - pass + while True: + await asyncio.sleep(.1) + status = await self.get_machine_status() + if status.state != status.STATE_INITIALIZING: + return From 51dea8b2bd4ad383972871a49ac75d7e62a3e9a7 Mon Sep 17 00:00:00 2001 From: Ethan Rodkin <ethan@viam.com> Date: Thu, 30 Jan 2025 13:53:53 -0500 Subject: [PATCH 8/8] lint fix --- src/viam/robot/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/viam/robot/client.py b/src/viam/robot/client.py index 1302ea78d..32e7f05a0 100644 --- a/src/viam/robot/client.py +++ b/src/viam/robot/client.py @@ -264,7 +264,8 @@ async def _with_channel(cls, channel: Union[Channel, ViamChannel], options: Opti ) try: - if options.dial_options.with_synchronous_connect: + synchronous_connect = options.dial_options.with_synchronous_connect if options.dial_options else False + if synchronous_connect: # the user has asked for a synchronous connect, so delay returning the robot # client until it is running. 
await self.wait_until_ready() @@ -448,7 +449,7 @@ async def _check_connection(self, check_every: int, reconnect_every: int): # there isn't a programmatic way for a user to detect this and therefore call # expect changes in status or call `wait_until_ready`. So, let's make sure the machine # is ready. - self.wait_until_ready() + await self.wait_until_ready() def get_component(self, name: ResourceName) -> ComponentBase: """Get a component using its ResourceName.