11"""Handles all communication with the PX4 autopilot using px4_msgs."""
2- from px4_msgs .msg import OffboardControlMode , TrajectorySetpoint , VehicleCommand , VehicleStatus
2+ from px4_msgs .msg import OffboardControlMode , TrajectorySetpoint , VehicleCommand , VehicleOdometry , VehicleStatus
33
44from rclpy .qos import DurabilityPolicy , HistoryPolicy , QoSProfile , ReliabilityPolicy
55
@@ -18,7 +18,9 @@ def __init__(self, node):
1818
1919 # Initialize variables
2020 self .vehicle_status = None
21+ self .vehicle_odometry = None
2122 self ._last_nav_state = None
23+ self ._last_odom_print_time = 0 # For tracking when to print position
2224
2325 px4_qos = QoSProfile (
2426 reliability = ReliabilityPolicy .BEST_EFFORT ,
@@ -27,16 +29,22 @@ def __init__(self, node):
2729 depth = 1
2830 )
2931
30- self .vehicle_command_publisher = self .node .create_publisher (
31- VehicleCommand , 'fmu/in/vehicle_command' , px4_qos )
32- self .offboard_control_mode_publisher = self .node .create_publisher (
33- OffboardControlMode , 'fmu/in/offboard_control_mode' , px4_qos )
34- self .trajectory_setpoint_publisher = self .node .create_publisher (
35- TrajectorySetpoint , 'fmu/in/trajectory_setpoint' , px4_qos )
36-
37- self .vehicle_status_subscriber = self .node .create_subscription (
38- VehicleStatus , 'fmu/out/vehicle_status_v1' , self .vehicle_status_callback ,
39- px4_qos )
32+ # Group communication interfaces
33+ self .publishers = {
34+ 'vehicle_command' : self .node .create_publisher (
35+ VehicleCommand , 'fmu/in/vehicle_command' , px4_qos ),
36+ 'offboard_control_mode' : self .node .create_publisher (
37+ OffboardControlMode , 'fmu/in/offboard_control_mode' , px4_qos ),
38+ 'trajectory_setpoint' : self .node .create_publisher (
39+ TrajectorySetpoint , 'fmu/in/trajectory_setpoint' , px4_qos )
40+ }
41+
42+ self .subscribers = {
43+ 'vehicle_status' : self .node .create_subscription (
44+ VehicleStatus , 'fmu/out/vehicle_status_v1' , self .vehicle_status_callback , px4_qos ),
45+ 'vehicle_odometry' : self .node .create_subscription (
46+ VehicleOdometry , 'fmu/out/vehicle_odometry' , self .vehicle_odometry_callback , px4_qos )
47+ }
4048
4149 def vehicle_status_callback (self , msg ):
4250 """Update vehicle status."""
@@ -58,6 +66,41 @@ def vehicle_status_callback(self, msg):
5866 self .node .get_logger ().info (f'Navigation state changed to: { current_state } ' )
5967 self ._last_nav_state = msg .nav_state
6068
69+ def vehicle_odometry_callback (self , msg ):
70+ """Update vehicle odometry."""
71+ self .vehicle_odometry = msg
72+
73+ def should_print_position (self , current_time_sec , print_interval = 1.0 ):
74+ """
75+ Check if we should print the position based on the time interval.
76+
77+ Args:
78+ current_time_sec: Current time in seconds
79+ print_interval: Interval in seconds between prints (default 1.0)
80+
81+ Returns:
82+ bool: True if position should be printed
83+ """
84+ if current_time_sec - self ._last_odom_print_time >= print_interval :
85+ self ._last_odom_print_time = current_time_sec
86+ return True
87+ return False
88+
89+ def get_current_position (self ):
90+ """
91+ Get the current position from vehicle odometry.
92+
93+ Returns:
94+ tuple: (x, y, z) position if available, otherwise None
95+ """
96+ if self .vehicle_odometry is not None :
97+ # The position is usually in the X, Y, Z fields of the pose
98+ x = self .vehicle_odometry .position [0 ]
99+ y = self .vehicle_odometry .position [1 ]
100+ z = self .vehicle_odometry .position [2 ]
101+ return (x , y , z )
102+ return None
103+
61104 # pylint: disable=unknown-option-value too-many-positional-arguments too-many-arguments
62105 def send_command (self , command ,
63106 param1 = float ('nan' ), param2 = float ('nan' ), param3 = float ('nan' ), param4 = float ('nan' ),
@@ -79,7 +122,7 @@ def send_command(self, command,
79122 vehicle_command .source_component = 1
80123 vehicle_command .from_external = True
81124
82- self .vehicle_command_publisher .publish (vehicle_command )
125+ self .publishers [ 'vehicle_command' ] .publish (vehicle_command )
83126
84127 def arm_vehicle (self ):
85128 """Send arm command to vehicle."""
@@ -98,7 +141,7 @@ def set_offboard_mode(self):
98141 offboard_control_mode .thrust_and_torque = False
99142 offboard_control_mode .direct_actuator = False
100143
101- self .offboard_control_mode_publisher .publish (offboard_control_mode )
144+ self .publishers [ 'offboard_control_mode' ] .publish (offboard_control_mode )
102145
103146 def send_offboard_mode_command (self ):
104147 """Send VEHICLE_CMD_DO_SET_MODE command to switch to offboard mode."""
@@ -142,7 +185,7 @@ def send_zero_velocity_setpoint(self):
142185 trajectory_setpoint .yaw = float ('nan' ) # Don't control yaw
143186 trajectory_setpoint .yawspeed = float ('nan' )
144187
145- self .trajectory_setpoint_publisher .publish (trajectory_setpoint )
188+ self .publishers [ 'trajectory_setpoint' ] .publish (trajectory_setpoint )
146189
147190 def send_velocity_setpoint (self , velocity ):
148191 """Send constant velocity setpoint."""
@@ -166,4 +209,5 @@ def send_velocity_setpoint(self, velocity):
166209 trajectory_setpoint .yaw = float ('nan' ) # Don't control yaw
167210 trajectory_setpoint .yawspeed = float ('nan' )
168211
169- self .trajectory_setpoint_publisher .publish (trajectory_setpoint )
212+ self .publishers ['trajectory_setpoint' ].publish (trajectory_setpoint )
213+
0 commit comments