Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ dmypy.json
# Cython debug symbols
cython_debug/

# debug stuff (Joe-Up GlamSon)
videos/
bag_files/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
Expand Down
19 changes: 19 additions & 0 deletions scripts/setup.bash
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,25 @@ alias start-controller='ros2 service call /pid_controller/enable std_srvs/srv/Se
alias stop-controller='ros2 service call /pid_controller/enable std_srvs/srv/SetBool "{data: false}"'
alias reset-controller="ros2 service call /pid_controller/reset std_srvs/srv/Empty"

# aliases for waypoints
wp-set() {
if [ $# -lt 1 ]; then
echo "missing waypoint name! should be: wp-set <waypoint_name>"
return 1
fi

ros2 service call /wp/set subjugator_msgs/srv/StringTrigger "{wp_name: '$1'}"
}

wp-goto() {
if [ $# -lt 1 ]; then
echo "missing waypoint name! should be: wp-goto <waypoint_name>"
return 1
fi

ros2 service call /wp/goto subjugator_msgs/srv/StringTrigger "{wp_name: '$1'}"
}

#explain
dropper() {
if [ $# -lt 1 ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
from rclpy.node import Node
from robot_localization.srv import SetPose
from std_srvs.srv import Empty
from std_msgs.msg import Empty as TopicEmpty


class ResetLocalizationService(Node):

def __init__(self):
super().__init__("reset_localization_service")

# this topic is Joe Handsome origonal :)
self.reset_pub_ = self.create_publisher(TopicEmpty, "localization/was_reset", 10)

self.srv = self.create_service(
Empty,
"subjugator_localization/reset",
Expand All @@ -24,6 +29,10 @@ def __init__(self):
self.get_logger().info("service not available, waiting again...")

def reset_localization_callback(self, request, response):
# inform the world of the reset
msg = TopicEmpty()
self.reset_pub_.publish(msg)

self.get_logger().info("Incoming request to reset localization")
set_pose_request = SetPose.Request()
set_pose_request.pose = PoseWithCovarianceStamped()
Expand All @@ -38,6 +47,7 @@ def reset_localization_callback(self, request, response):
set_pose_request.pose.pose.covariance = [0.0] * 36
self.set_pose_client.call_async(set_pose_request)
response = Empty.Response()

return response


Expand Down
20 changes: 20 additions & 0 deletions src/subjugator/gnc/subjugator_waypoints/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>subjugator_waypoints</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">jh</maintainer>
<license>TODO: License declaration</license>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<depend>subjugator_msgs</depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions src/subjugator/gnc/subjugator_waypoints/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/subjugator_waypoints
[install]
install_scripts=$base/lib/subjugator_waypoints
26 changes: 26 additions & 0 deletions src/subjugator/gnc/subjugator_waypoints/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from setuptools import find_packages, setup

package_name = 'subjugator_waypoints'

setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='jh',
maintainer_email='[email protected]',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'subjugator_waypoints = subjugator_waypoints.wp_node:main'
],
},
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Pose

class MyPose:
def __init__(self, x: float, y: float, z: float, i: float, j: float, k: float, w: float):
self.x: float = x
self.y: float = y
self.z: float = z
self.i: float = i
self.j: float = j
self.k: float = k
self.w: float = w

def as_geo_msg_pose(self) -> Pose:
p = Pose()
p.position.x = float(self.x)
p.position.y = float(self.y)
p.position.z = float(self.z)
p.orientation.x = float(self.i)
p.orientation.y = float(self.j)
p.orientation.z = float(self.k)
p.orientation.w = float(self.w)

return p

def pose_from_odom(odom: Odometry) -> MyPose:
return MyPose(
odom.pose.pose.position.x,
odom.pose.pose.position.y,
odom.pose.pose.position.z,
odom.pose.pose.orientation.x,
odom.pose.pose.orientation.y,
odom.pose.pose.orientation.z,
odom.pose.pose.orientation.w
)

def quaternion_conjugate(q: MyPose) -> MyPose:
return MyPose(0, 0, 0, -q.i, -q.j, -q.k, q.w)

def quaternion_multiply(q1: MyPose, q2: MyPose) -> MyPose:
i = q1.w * q2.i + q1.i * q2.w + q1.j * q2.k - q1.k * q2.j
j = q1.w * q2.j - q1.i * q2.k + q1.j * q2.w + q1.k * q2.i
k = q1.w * q2.k + q1.i * q2.j - q1.j * q2.i + q1.k * q2.w
w = q1.w * q2.w - q1.i * q2.i - q1.j * q2.j - q1.k * q2.k
return MyPose(0, 0, 0, i, j, k, w)

def rotate_point_by_quaternion(point: MyPose, q: MyPose) -> MyPose:
# Create a quaternion representing just the position (with w=0 for pure vector part)
p = MyPose(point.x, point.y, point.z, 0, 0, 0, 0)

# Perform rotation: q * p * q^-1
rotated = quaternion_multiply(quaternion_multiply(q, p), quaternion_conjugate(q))

# Return the rotated position
return MyPose(rotated.x, rotated.y, rotated.z, 0, 0, 0, 0)

def transform_pose(pose: MyPose, origin: MyPose) -> MyPose:
rel_x = pose.x - origin.x
rel_y = pose.y - origin.y
rel_z = pose.z - origin.z
rel_position = MyPose(rel_x, rel_y, rel_z, 0, 0, 0, 0)

origin_inv = quaternion_conjugate(origin)
rotated_pos = rotate_point_by_quaternion(rel_position, origin_inv)
new_rotation = quaternion_multiply(origin_inv, pose)

return MyPose(rotated_pos.x, rotated_pos.y, rotated_pos.z,
new_rotation.i, new_rotation.j, new_rotation.k, new_rotation.w)

def inverse_transform_pose(local_pose: MyPose, origin: MyPose) -> MyPose:
# First apply rotation to the position part
local_position = MyPose(local_pose.x, local_pose.y, local_pose.z, 0, 0, 0, 0)
rotated_pos = rotate_point_by_quaternion(local_position, origin)

# Then add the origin position
world_x = rotated_pos.x + origin.x
world_y = rotated_pos.y + origin.y
world_z = rotated_pos.z + origin.z

# Calculate world orientation by multiplying quaternions
world_orientation = quaternion_multiply(origin, local_pose)

return MyPose(world_x, world_y, world_z,
world_orientation.i, world_orientation.j, world_orientation.k, world_orientation.w)

def transform_poses(poses, origin):
return [transform_pose(p, origin) for p in poses]

class WaypointManager:
def __init__(self):
self.waypoints: dict[str, MyPose] = {}

# if the waypoint already exists, it is reset
def add_waypoint(self, wp_name: str, wp: MyPose):
self.waypoints[wp_name] = wp

def print(self):
for (key,val) in self.waypoints.items():
print(key + " ", val.x, " ",val.y, " ",val.z, " ",val.i, " ",val.j, " ",val.k, " ",val.w, " ")

# if the waypoint doesn't exist, does nothing
def remove_waypoint(self, wp_name: str):
del self.waypoints[wp_name]

def reset_localization(self, current_pose: MyPose):
for key, wp in self.waypoints.items():
# Calculate vector from current position to waypoint
dx = wp.x - current_pose.x
dy = wp.y - current_pose.y
dz = wp.z - current_pose.z

# For rotation, we need the conjugate of the quaternion
# The conjugate is (−i, −j, −k, w)
q_i = -current_pose.i
q_j = -current_pose.j
q_k = -current_pose.k
q_w = current_pose.w

# Rotate the vector using our new function
new_x, new_y, new_z = rotate_vector(dx, dy, dz, q_i, q_j, q_k, q_w)

# For orientation, multiply quaternions (using your existing function)
# Create orientation quaternion for waypoint
wp_quat = MyPose(0, 0, 0, wp.i, wp.j, wp.k, wp.w)

# Create conjugate quaternion for current pose
conj_quat = MyPose(0, 0, 0, q_i, q_j, q_k, q_w)

# Calculate new orientation
new_orientation = quaternion_multiply(conj_quat, wp_quat)

# Update the waypoint
self.waypoints[key] = MyPose(
new_x, new_y, new_z,
new_orientation.i, new_orientation.j, new_orientation.k, new_orientation.w
)

def rotate_vector(v_x, v_y, v_z, q_i, q_j, q_k, q_w):
"""Rotate a vector [x,y,z] by a quaternion [i,j,k,w]"""
# Formula from: https://en.wikipedia.org/wiki/Quaternions_and_spatial_rotation

# Pre-calculate some common terms
ii = q_i * q_i
jj = q_j * q_j
kk = q_k * q_k
ij = q_i * q_j
ik = q_i * q_k
jk = q_j * q_k
iw = q_i * q_w
jw = q_j * q_w
kw = q_k * q_w

# Calculate rotated vector
x = v_x * (1 - 2*(jj + kk)) + v_y * 2*(ij - kw) + v_z * 2*(ik + jw)
y = v_x * 2*(ij + kw) + v_y * (1 - 2*(ii + kk)) + v_z * 2*(jk - iw)
z = v_x * 2*(ik - jw) + v_y * 2*(jk + iw) + v_z * (1 - 2*(ii + jj))

return x, y, z
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import rclpy
from geometry_msgs.msg import Pose
from nav_msgs.msg import Odometry
from rclpy.node import Node
from std_msgs.msg import Empty
from subjugator_msgs.srv import StringTrigger

from subjugator_waypoints import wp_manager


class WpNode(Node):
def __init__(self):
super().__init__("wp_node")
self.wps = wp_manager.WaypointManager()

self.reset_sub_ = self.create_subscription(
Empty,
"localization/was_reset",
self.reset_cb,
10,
)

self.odom_sub_ = self.create_subscription(
Odometry,
"odometry/filtered",
self.odom_cb,
10,
)
self.current_pose: Odometry

self.goal_pose_pub_ = self.create_publisher(Pose, "/goal_pose", 10)

self.set_srv_ = self.create_service(StringTrigger, "wp/set", self.set_wp)
self.goto_srv_ = self.create_service(StringTrigger, "wp/goto", self.goto_wp)

def set_wp(self, req, res):
self.wps.add_waypoint(req.wp_name, wp_manager.pose_from_odom(self.current_pose))
self.get_logger().info(f"adding new waypoint: {req.wp_name}")
return res

def goto_wp(self, req, res):
wp_name = req.wp_name

wp_exists = wp_name in self.wps.waypoints
if not wp_exists:
self.get_logger().warn(f"waypoint: {wp_name} does not exist...")
return res

goal: Pose = self.wps.waypoints[wp_name].as_geo_msg_pose()
self.get_logger().info(f"going to waypoint: {req.wp_name}")

self.goal_pose_pub_.publish(goal)

return res

def reset_cb(self, _: Empty):
current_pose: wp_manager.MyPose = wp_manager.pose_from_odom(self.current_pose)

self.wps.reset_localization(current_pose)
self.get_logger().info("localization was reset!")

def odom_cb(self, msg: Odometry):
self.current_pose = msg


def main():
rclpy.init()
node = WpNode()
node.current_pose = Odometry()
rclpy.spin(node)
rclpy.shutdown()


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def generate_launch_description():
output="both",
)

waypoints = Node(
package="subjugator_waypoints",
executable="subjugator_waypoints",
name="subjugator_waypoints",
output="both",
)

return LaunchDescription(
[
gui_cmd,
Expand All @@ -128,5 +135,6 @@ def generate_launch_description():
controller,
path_planner,
trajectory_planner,
waypoints,
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,11 @@ def generate_launch_description():
name="start_gate_server",
parameters=[],
),
Node(
package="subjugator_mission_planner",
executable="waypoint_server",
name="waypoint_server",
parameters=[],
),
],
)
Loading
Loading