| 
24 | 24 | from nav2_msgs.action import DockRobot, NavigateToPose, UndockRobot  | 
25 | 25 | import pytest  | 
26 | 26 | import rclpy  | 
27 |  | -from rclpy.action import ActionClient, ActionServer  | 
 | 27 | +import rclpy.time  | 
 | 28 | +import rclpy.duration  | 
 | 29 | +from rclpy.action.client import ActionClient  | 
 | 30 | +from rclpy.action.server import ActionServer  | 
28 | 31 | from sensor_msgs.msg import BatteryState  | 
29 |  | -from tf2_ros import TransformBroadcaster  | 
 | 32 | +from tf2_ros import TransformBroadcaster, Buffer, TransformListener  | 
30 | 33 | 
 
  | 
31 | 34 | 
 
  | 
32 | 35 | # This test can be run standalone with:  | 
@@ -142,9 +145,40 @@ def nav_execute_callback(self, goal_handle):  | 
142 | 145 |         result.error_code = 0  | 
143 | 146 |         return result  | 
144 | 147 | 
 
  | 
 | 148 | +    def check_transform(self, timeout_sec, source_frame='odom', target_frame='base_link'):  | 
 | 149 | +        """Try to look up the transform we're publishing"""  | 
 | 150 | +        if self.transform_available:  | 
 | 151 | +            return True  | 
 | 152 | + | 
 | 153 | +        try:  | 
 | 154 | +            # Wait for transform to become available with timeout  | 
 | 155 | +            when = rclpy.time.Time()  | 
 | 156 | +            transform = self.tf_buffer.lookup_transform(  | 
 | 157 | +                target_frame,  | 
 | 158 | +                source_frame,  | 
 | 159 | +                when,  | 
 | 160 | +                timeout=rclpy.duration.Duration(seconds=timeout_sec)  | 
 | 161 | +            )  | 
 | 162 | + | 
 | 163 | +            self.node.get_logger().info('Successfully found transform:')  | 
 | 164 | +            self.node.get_logger().info(f'Translation: [{transform.transform.translation.x}, '  | 
 | 165 | +                                 f'{transform.transform.translation.y}, '  | 
 | 166 | +                                 f'{transform.transform.translation.z}]')  | 
 | 167 | +            self.transform_available = True  | 
 | 168 | +        except Exception as e:  | 
 | 169 | +            self.node.get_logger().error(f'Error looking up transform: {str(e)}')  | 
 | 170 | +            self.transform_available = False  | 
 | 171 | + | 
 | 172 | +        return self.transform_available  | 
 | 173 | + | 
145 | 174 |     def test_docking_server(self):  | 
146 | 175 |         # Publish TF for odometry  | 
147 | 176 |         self.tf_broadcaster = TransformBroadcaster(self.node)  | 
 | 177 | +        # Create tf buffer and listener  | 
 | 178 | +        self.tf_buffer = Buffer()  | 
 | 179 | +        self.tf_listener = TransformListener(self.tf_buffer, self.node)  | 
 | 180 | +        self.transform_available = False  | 
 | 181 | + | 
148 | 182 | 
 
  | 
149 | 183 |         # Create a timer to run "control loop" at 20hz  | 
150 | 184 |         self.timer = self.node.create_timer(0.05, self.timer_callback)  | 
@@ -175,20 +209,30 @@ def test_docking_server(self):  | 
175 | 209 |             'navigate_to_pose',  | 
176 | 210 |             self.nav_execute_callback)  | 
177 | 211 | 
 
  | 
178 |  | -        # Spin once so that TF is published  | 
179 |  | -        rclpy.spin_once(self.node, timeout_sec=0.1)  | 
 | 212 | +        # Publish transform  | 
 | 213 | +        self.publish()  | 
 | 214 | + | 
 | 215 | +        # Run for 2 seconds to allow tf to propogate  | 
 | 216 | +        for i in range(20):  | 
 | 217 | +            rclpy.spin_once(self.node, timeout_sec=0.1)  | 
 | 218 | +            time.sleep(0.1)  | 
 | 219 | + | 
 | 220 | +        assert self.check_transform(timeout_sec=1.0), 'transform not available'  | 
180 | 221 | 
 
  | 
181 | 222 |         # Test docking action  | 
182 | 223 |         self.action_result = []  | 
183 |  | -        self.dock_action_client.wait_for_server(timeout_sec=5.0)  | 
 | 224 | +        assert self.dock_action_client.wait_for_server(timeout_sec=5.0), \  | 
 | 225 | +          'dock_robot service not available'  | 
 | 226 | + | 
184 | 227 |         goal = DockRobot.Goal()  | 
185 | 228 |         goal.use_dock_id = True  | 
186 | 229 |         goal.dock_id = 'test_dock'  | 
187 | 230 |         future = self.dock_action_client.send_goal_async(  | 
188 | 231 |             goal, feedback_callback=self.action_feedback_callback)  | 
189 | 232 |         rclpy.spin_until_future_complete(self.node, future)  | 
190 | 233 |         self.goal_handle = future.result()  | 
191 |  | -        assert self.goal_handle.accepted  | 
 | 234 | +        assert self.goal_handle is not None, 'goal_handle should not be None'  | 
 | 235 | +        assert self.goal_handle.accepted, 'goal not accepted'  | 
192 | 236 |         result_future_original = self.goal_handle.get_result_async()  | 
193 | 237 | 
 
  | 
194 | 238 |         # Run for 2 seconds  | 
 | 
0 commit comments