diff --git a/scripts/setup.bash b/scripts/setup.bash
index bd62213e..bac7cbc6 100644
--- a/scripts/setup.bash
+++ b/scripts/setup.bash
@@ -269,3 +269,5 @@
 torpedo() {
   ros2 service call /torpedo subjugator_msgs/srv/Servo "{angle: '$1'}"
 }
+
+export ROS_DOMAIN_ID=37
diff --git a/src/subjugator/gnc/subjugator_centroids/package.xml b/src/subjugator/gnc/subjugator_centroids/package.xml
new file mode 100644
index 00000000..e34167ce
--- /dev/null
+++ b/src/subjugator/gnc/subjugator_centroids/package.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>subjugator_centroids</name>
+  <version>0.0.0</version>
+  <description>TODO: Package description</description>
+  <maintainer email="nottellingu@gmail.com">jh</maintainer>
+  <license>TODO: License declaration</license>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <depend>subjugator_msgs</depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/src/subjugator/gnc/subjugator_centroids/random_code/green_tracker_cv2_example.py b/src/subjugator/gnc/subjugator_centroids/random_code/green_tracker_cv2_example.py
new file mode 100644
index 00000000..d1737700
--- /dev/null
+++ b/src/subjugator/gnc/subjugator_centroids/random_code/green_tracker_cv2_example.py
@@ -0,0 +1,91 @@
+# This file is not used by the package; it is a standalone example of how to use cv2 to find centroids.
+
+import cv2
+import numpy as np
+
+
+def get_lime_green_centroid(image):
+    """
+    Detect lime green areas in the image and return the centroid (x, y).
+    Returns None if no lime green region is detected.
+    """
+    # Convert BGR to HSV
+    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
+
+    # Define HSV range for lime green
+    lower_green = np.array([40, 100, 100])
+    upper_green = np.array([80, 255, 255])
+
+    # Threshold the HSV image to get only green colors
+    mask = cv2.inRange(hsv, lower_green, upper_green)
+
+    # Find contours in the mask
+    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+
+    if not contours:
+        return None, mask
+
+    # Find the largest contour
+    largest = max(contours, key=cv2.contourArea)
+
+    # Compute centroid
+    M = cv2.moments(largest)
+    if M["m00"] == 0:
+        return None, mask
+
+    cx = int(M["m10"] / M["m00"])
+    cy = int(M["m01"] / M["m00"])
+    return (cx, cy), mask
+
+
+def display_debug_image(image, mask, centroid):
+    """
+    Displays the original image, the binary mask, and overlays the centroid (if found).
+ """ + # Clone the original image + display_img = image.copy() + + # Overlay centroid if it exists + if centroid: + cv2.circle(display_img, centroid, 5, (0, 0, 255), -1) + cv2.putText( + display_img, + f"Centroid: {centroid}", + (centroid[0] + 10, centroid[1]), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 0, 255), + 2, + ) + + # Show images + cv2.imshow("Lime Green Detection", display_img) + cv2.imshow("Mask", mask) + + +def main(): + cam_path = "/dev/v4l/by-id/usb-Chicony_Tech._Inc._Dell_Webcam_WB7022_4962D17A78D6-video-index0" + cap = cv2.VideoCapture(cam_path) + + if not cap.isOpened(): + print("Cannot open camera") + return + + while True: + ret, frame = cap.read() + if not ret: + print("Failed to grab frame") + break + + centroid, mask = get_lime_green_centroid(frame) + display_debug_image(frame, mask, centroid) + + if cv2.waitKey(1) & 0xFF == ord("q"): + break + + cap.release() + cv2.destroyAllWindows() + + +if __name__ == "__main__": + main() diff --git a/src/subjugator/gnc/subjugator_centroids/resource/subjugator_centroids b/src/subjugator/gnc/subjugator_centroids/resource/subjugator_centroids new file mode 100644 index 00000000..e69de29b diff --git a/src/subjugator/gnc/subjugator_centroids/setup.cfg b/src/subjugator/gnc/subjugator_centroids/setup.cfg new file mode 100644 index 00000000..5df330d4 --- /dev/null +++ b/src/subjugator/gnc/subjugator_centroids/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/subjugator_centroids +[install] +install_scripts=$base/lib/subjugator_centroids diff --git a/src/subjugator/gnc/subjugator_centroids/setup.py b/src/subjugator/gnc/subjugator_centroids/setup.py new file mode 100644 index 00000000..11fe96ee --- /dev/null +++ b/src/subjugator/gnc/subjugator_centroids/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_packages, setup + +package_name = 'subjugator_centroids' + +setup( + name=package_name, + version='0.0.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='jh', + maintainer_email='nottellingu@gmail.com', + description='TODO: Package description', + license='TODO: License declaration', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'subjugator_centroids = subjugator_centroids.subjugator_centroids_node:main' + ], + }, +) diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/__init__.py b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/centroid_finder.py b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/centroid_finder.py new file mode 100644 index 00000000..d864cb29 --- /dev/null +++ b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/centroid_finder.py @@ -0,0 +1,19 @@ +from abc import ABC, abstractmethod + +from cv2.typing import MatLike + +""" +If you want to track some object, you need a class that implements this class +""" + + +class CentroidFinder(ABC): + @property + @abstractmethod + def topic_name(self) -> str: + pass + + # returns (x, y) pair of centroid or None if not found + @abstractmethod + def find_centroid(self, frame: MatLike) -> tuple[int, int] | None: + pass diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/green_tracker.py 
diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/green_tracker.py b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/green_tracker.py
new file mode 100644
index 00000000..5b35643d
--- /dev/null
+++ b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/green_tracker.py
@@ -0,0 +1,51 @@
+import cv2
+import numpy as np
+from cv2.typing import MatLike
+
+from subjugator_centroids.centroid_finder import CentroidFinder
+
+
+# Example implementation of the CentroidFinder abstract class; this one tracks green objects.
+class GreenTracker(CentroidFinder):
+    def __init__(self, topic_name: str):
+        self.topic_name_: str = topic_name
+
+    @property
+    def topic_name(self) -> str:
+        return self.topic_name_
+
+    def find_centroid(self, frame: MatLike) -> tuple[int, int] | None:
+        """
+        Detect lime green areas in the image and return the centroid (x, y).
+        Returns None if no lime green region is detected.
+        """
+        # Convert BGR to HSV
+        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+
+        # Define HSV range for lime green
+        lower_green = np.array([40, 100, 100])
+        upper_green = np.array([80, 255, 255])
+
+        # Threshold the HSV image to get only green colors
+        mask = cv2.inRange(hsv, lower_green, upper_green)
+
+        # Find contours in the mask
+        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+
+        if not contours:
+            return None
+
+        # Find the largest contour (ignore small ones)
+        largest = max(contours, key=cv2.contourArea)
+        if cv2.contourArea(largest) < 50:
+            return None
+
+        # Compute centroid
+        M = cv2.moments(largest)
+        if M["m00"] == 0:
+            return None
+
+        cx = int(M["m10"] / M["m00"])
+        cy = int(M["m01"] / M["m00"])
+
+        return (cx, cy)
diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/red_tracker.py b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/red_tracker.py
new file mode 100644
index 00000000..641c8c9b
--- /dev/null
+++ b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/red_tracker.py
@@ -0,0 +1,74 @@
+import cv2
+import numpy as np
+from cv2.typing import MatLike
+
+from subjugator_centroids.centroid_finder import CentroidFinder
+
+
+# Example implementation of the CentroidFinder abstract class; this one tracks red objects.
+class RedTracker(CentroidFinder):
+    def __init__(self, topic_name: str):
+        self.topic_name_: str = topic_name
+        self.debug = False
+
+    @property
+    def topic_name(self) -> str:
+        return self.topic_name_
+
+    def find_centroid(self, frame: MatLike) -> tuple[int, int] | None:
+        """
+        Detect red areas in the image and return the centroid (x, y).
+        Returns None if no red region is detected.
+ """ + # Convert BGR to HSV + hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + + # Define HSV ranges for red (wraps around the 0° hue boundary) + lower_red1 = np.array([0, 140, 140]) + upper_red1 = np.array([8, 255, 255]) + + lower_red2 = np.array([172, 140, 140]) + upper_red2 = np.array([180, 255, 255]) + + # Create two masks and combine them + mask1 = cv2.inRange(hsv, lower_red1, upper_red1) + mask2 = cv2.inRange(hsv, lower_red2, upper_red2) + mask = cv2.bitwise_or(mask1, mask2) + + if self.debug: + cv2.imshow("Red Mask", mask) + cv2.waitKey(1) + + # Find contours in the mask + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + if not contours: + return None + + # Find the largest contour (ignore small ones) + largest = max(contours, key=cv2.contourArea) + if cv2.contourArea(largest) < 50: + return None + + # Compute centroid + M = cv2.moments(largest) + if M["m00"] == 0: + return None + + cx = int(M["m10"] / M["m00"]) + cy = int(M["m01"] / M["m00"]) + + return (cx, cy) + +def test(): + gt = RedTracker("testing/rn/sry") + gt.debug = True + cap =cv2.VideoCapture(0) + while True: + ret, frame = cap.read() + if not ret: + print("i hate you") + gt.find_centroid(frame) + +if __name__ == "__main__": + test() diff --git a/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/subjugator_centroids_node.py b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/subjugator_centroids_node.py new file mode 100644 index 00000000..443d6a05 --- /dev/null +++ b/src/subjugator/gnc/subjugator_centroids/subjugator_centroids/subjugator_centroids_node.py @@ -0,0 +1,128 @@ +# TODO: add the camera rotator to this and make it work w sub + +# node which has a bunch of classes that implement the CentroidFinder abstract class, it feeds them images and publishes results + +from typing import Dict, List + +import cv2 +import rclpy +from cv2.typing import MatLike +from rclpy.node import Node +from rclpy.publisher import Publisher +from subjugator_msgs.msg import Centroid + +from subjugator_centroids.centroid_finder import CentroidFinder +from subjugator_centroids.green_tracker import GreenTracker +from subjugator_centroids.red_tracker import RedTracker + + +def rotate_front_cam(frame: MatLike) -> MatLike: + PADDING = 100 + rotation_angle = 137 + # Add black padding around the image + padded = cv2.copyMakeBorder( + frame, + top=PADDING, + bottom=PADDING, + left=PADDING, + right=PADDING, + borderType=cv2.BORDER_CONSTANT, + value=(0, 0, 0), # black + ) + + # Get new dimensions + (h, w) = padded.shape[:2] + center = (w // 2, h // 2) + + # Get rotation matrix + M = cv2.getRotationMatrix2D(center, rotation_angle, 1.0) + + # Rotate the padded image + rotated = cv2.warpAffine(padded, M, (w, h)) + return rotated + + +class SubjugatorCentroidsNode(Node): + def __init__(self, trackers: List[CentroidFinder]): + super().__init__("subjugator_centroids_node") + self.pubs: Dict[str, Publisher] = {} + self.trackers: List[CentroidFinder] = trackers + + # create publishers for each tracker + for tracker in trackers: + self.pubs[tracker.topic_name] = self.create_publisher( + Centroid, + tracker.topic_name, + 10, + ) + + # start camera + cam_path = "/dev/v4l/by-id/usb-Chicony_Tech._Inc._Dell_Webcam_WB7022_4962D17A78D6-video-index0" + self.cap = cv2.VideoCapture(cam_path) + if not self.cap.isOpened(): + self.get_logger().error("could not open camera") + return + + self.search_for_centroids_forever() + + def search_for_centroids_forever(self): + # literally just run this function forever + 
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.01)
+
+            ret, frame = self.cap.read()
+            if not ret:
+                self.get_logger().warn("no frame from camera")
+                break
+
+            frame = rotate_front_cam(frame)
+
+            for tracker in self.trackers:
+                centroid = tracker.find_centroid(frame)
+                if centroid is None:  # nothing to publish
+                    continue
+
+                height, width, _ = frame.shape
+                topic_name = tracker.topic_name
+                self.publish_centroid(
+                    topic_name,
+                    width,
+                    height,
+                    centroid[0],
+                    centroid[1],
+                )
+
+    # image_width: image width in pixels
+    # image_height: image height in pixels
+    # cx: x position of the centroid in the image
+    # cy: y position of the centroid in the image
+    def publish_centroid(
+        self,
+        topic_name: str,
+        image_width: int,
+        image_height: int,
+        cx: int,
+        cy: int,
+    ):
+        c = Centroid()
+        c.image_height = image_height
+        c.image_width = image_width
+        c.centroid_x = cx
+        c.centroid_y = cy
+        self.pubs[topic_name].publish(c)
+
+
+def main():
+    # This is where you create all of the things you want to track.
+    gt = GreenTracker("centroids/green")
+    rt = RedTracker("centroids/red")
+
+    rclpy.init()
+    n = SubjugatorCentroidsNode([gt, rt])
+    # No rclpy.spin(n) here: the node runs its own loop in the constructor.
+    n.cap.release()  # release the camera on the way out
+    rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/subjugator/subjugator_msgs/CMakeLists.txt b/src/subjugator/subjugator_msgs/CMakeLists.txt
index a4a85a04..2584465f 100644
--- a/src/subjugator/subjugator_msgs/CMakeLists.txt
+++ b/src/subjugator/subjugator_msgs/CMakeLists.txt
@@ -21,6 +21,7 @@ rosidl_generate_interfaces(
   "msg/ThrusterEfforts.msg"
   "msg/Trajectory.msg"
   "msg/Waypoint.msg"
+  "msg/Centroid.msg"
   "srv/BMatrix.srv"
   "srv/OcrRequest.srv"
   "srv/TBDetectionSwitch.srv"
diff --git a/src/subjugator/subjugator_msgs/msg/Centroid.msg b/src/subjugator/subjugator_msgs/msg/Centroid.msg
new file mode 100644
index 00000000..adc03cab
--- /dev/null
+++ b/src/subjugator/subjugator_msgs/msg/Centroid.msg
@@ -0,0 +1,9 @@
+# Centroid.msg
+
+# Image dimensions
+int32 image_width
+int32 image_height
+
+# Detected centroid
+int32 centroid_x
+int32 centroid_y
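+
+# Example use by a consumer (illustrative, not required by this message): a
+# normalized horizontal error in [-1, 1] can be computed as
+#   err_x = (2.0 * centroid_x - image_width) / image_width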
diff --git a/src/subjugator/testing/centroid_yaw_tracker/centroid_yaw_tracker/__init__.py b/src/subjugator/testing/centroid_yaw_tracker/centroid_yaw_tracker/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/centroid_yaw_tracker/centroid_yaw_tracker/centroid_yaw_tracker_node.py b/src/subjugator/testing/centroid_yaw_tracker/centroid_yaw_tracker/centroid_yaw_tracker_node.py
new file mode 100644
index 00000000..6b6be773
--- /dev/null
+++ b/src/subjugator/testing/centroid_yaw_tracker/centroid_yaw_tracker/centroid_yaw_tracker_node.py
@@ -0,0 +1,94 @@
+import rclpy
+from geometry_msgs.msg import Pose
+from nav_msgs.msg import Odometry
+from rclpy.node import Node
+from subjugator_msgs.msg import Centroid
+from tf_transformations import euler_from_quaternion, quaternion_from_euler
+
+
+class CentroidYawTracker(Node):
+    def __init__(self):
+        super().__init__("centroid_yaw_tracker")
+        self.spotted = False
+
+        self.odom_sub_ = self.create_subscription(
+            Odometry,
+            "odometry/filtered",
+            self.odom_cb,
+            10,
+        )
+        self.last_odom: Odometry = Odometry()
+
+        self.centroid_sub_ = self.create_subscription(
+            Centroid,
+            "centroids/green",
+            self.centroid_cb,
+            10,
+        )
+        self.last_centroid: Centroid = Centroid()
+        self.last_centroid.image_width = 640
+
+        self.goal_pose_pub = self.create_publisher(Pose, "goal_pose", 10)
+
+        self.timer = self.create_timer(1, self.control_loop)
+
+    def odom_cb(self, msg: Odometry):
+        self.last_odom = msg
+
+    def centroid_cb(self, msg: Centroid):
+        self.last_centroid = msg
+        self.spotted = True
+
+    def control_loop(self):
+        if not self.spotted:
+            return
+
+        x_error_pixels = (
+            self.last_centroid.image_width / 2 - self.last_centroid.centroid_x
+        )
+        kp = 2.5 / (2 * self.last_centroid.image_width)
+
+        yaw_command = -x_error_pixels * kp
+        self.get_logger().info(
+            f"x_error_pixels={x_error_pixels}, yaw_command={yaw_command}",
+        )
+
+        # Get current orientation from odometry
+        current_quat = self.last_odom.pose.pose.orientation
+
+        # Convert quaternion to Euler angles
+        (roll, pitch, yaw) = euler_from_quaternion(
+            [current_quat.x, current_quat.y, current_quat.z, current_quat.w],
+        )
+
+        # Apply yaw command
+        desired_yaw = yaw + yaw_command
+
+        # Convert back to quaternion
+        quat = quaternion_from_euler(roll, pitch, desired_yaw)
+
+        # Create goal pose
+        goal_pose = Pose()
+        goal_pose.position.x = 0.0
+        goal_pose.position.y = 0.0
+        goal_pose.position.z = -0.1
+        goal_pose.orientation.x = quat[0]
+        goal_pose.orientation.y = quat[1]
+        goal_pose.orientation.z = quat[2]
+        goal_pose.orientation.w = quat[3]
+
+        # Publish the goal pose
+        self.goal_pose_pub.publish(goal_pose)
+        self.spotted = False
+
+
+def main():
+    rclpy.init()
+    node = CentroidYawTracker()
+    rclpy.spin(node)
+    rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/subjugator/testing/centroid_yaw_tracker/package.xml b/src/subjugator/testing/centroid_yaw_tracker/package.xml
new file mode 100644
index 00000000..275b3e83
--- /dev/null
+++ b/src/subjugator/testing/centroid_yaw_tracker/package.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>centroid_yaw_tracker</name>
+  <version>0.0.0</version>
+  <description>TODO: Package description</description>
+  <maintainer email="nottellingu@gmail.com">jh</maintainer>
+  <license>TODO: License declaration</license>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/src/subjugator/testing/centroid_yaw_tracker/resource/centroid_yaw_tracker b/src/subjugator/testing/centroid_yaw_tracker/resource/centroid_yaw_tracker
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/centroid_yaw_tracker/setup.cfg b/src/subjugator/testing/centroid_yaw_tracker/setup.cfg
new file mode 100644
index 00000000..7a6566d7
--- /dev/null
+++ b/src/subjugator/testing/centroid_yaw_tracker/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/centroid_yaw_tracker
+[install]
+install_scripts=$base/lib/centroid_yaw_tracker
diff --git a/src/subjugator/testing/centroid_yaw_tracker/setup.py b/src/subjugator/testing/centroid_yaw_tracker/setup.py
new file mode 100644
index 00000000..af0faf90
--- /dev/null
+++ b/src/subjugator/testing/centroid_yaw_tracker/setup.py
@@ -0,0 +1,25 @@
+from setuptools import find_packages, setup
+
+package_name = "centroid_yaw_tracker"
+
+setup(
+    name=package_name,
+    version="0.0.0",
+    packages=find_packages(exclude=["test"]),
+    data_files=[
+        ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
+        ("share/" + package_name, ["package.xml"]),
+    ],
+    install_requires=["setuptools"],
+    zip_safe=True,
+    maintainer="jh",
+    maintainer_email="nottellingu@gmail.com",
+    description="TODO: Package description",
+    license="TODO: License declaration",
+    tests_require=["pytest"],
+    entry_points={
+        "console_scripts": [
+            "centroid_yaw_tracker = centroid_yaw_tracker.centroid_yaw_tracker_node:main",
+        ],
+    },
+)
diff --git a/src/subjugator/testing/nav_channel/nav_channel/__init__.py b/src/subjugator/testing/nav_channel/nav_channel/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/nav_channel/nav_channel/nav_channel.py b/src/subjugator/testing/nav_channel/nav_channel/nav_channel.py
new file mode 100644
index 00000000..e5742227
--- /dev/null
+++ b/src/subjugator/testing/nav_channel/nav_channel/nav_channel.py
@@ -0,0 +1,221 @@
+# TODO: center on the target as well
+
+import math
+from copy import deepcopy
+
+import rclpy
+from geometry_msgs.msg import Pose, Quaternion
+from nav_msgs.msg import Odometry
+from rclpy.node import Node
+from subjugator_msgs.msg import Centroid
+from tf_transformations import euler_from_quaternion, quaternion_from_euler
+
+
+class NavChannel(Node):
+    def __init__(self):
+        super().__init__('nav_channel')
+
+        self.spotted = False
+
+        # centroid sub
+        self._centroid_sub = self.create_subscription(
+            Centroid, "centroids/red", self.centroid_cb, 10,
+        )
+        self.recent_centroid = Centroid()
+        self.centroid_cb_counter = 0  # for detecting whether red is currently in frame
+
+        # odom sub
+        self._odom_sub = self.create_subscription(
+            Odometry, "odometry/filtered", self.odom_cb, 10,
+        )
+        self.recent_odom = Odometry()
+
+        # goal pub
+        self.goal_pub = self.create_publisher(Pose, "goal_pose", 10)
+
+    def centroid_cb(self, msg: Centroid):
+        self.recent_centroid = msg
+        self.centroid_cb_counter += 1
+        self.spotted = True  # consumed (and cleared) by control_loop
+        # What is in a Centroid message?
+        #   self.recent_centroid.centroid_x
+        #   self.recent_centroid.centroid_y
+        #   self.recent_centroid.image_width
+        #   self.recent_centroid.image_height
+
+    def odom_cb(self, msg: Odometry):
+        self.recent_odom = msg
+
+    def control_loop(self) -> bool:
+        rclpy.spin_once(self, timeout_sec=1)
+        if not self.spotted:
+            return False
+
+        x_error_pixels = (
+            self.recent_centroid.image_width / 2 - self.recent_centroid.centroid_x
+        )
+
+        # close enough to centered (the error can be negative, so compare magnitudes)
+        if abs(x_error_pixels) < 50:
+            return True
+
+        kp = 2.5 / (2 * self.recent_centroid.image_width)
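+        # Worked example (illustrative, assuming a 640 px wide image):
+        # kp = 2.5 / 1280 ≈ 0.00195 rad per pixel, so a half-frame error of
+        # 160 px maps to a yaw step of about 0.31 rad (~18 degrees).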
+        yaw_command = -x_error_pixels * kp
+        self.get_logger().info(
+            f"x_error_pixels={x_error_pixels}, yaw_command={yaw_command}",
+        )
+
+        # Get current orientation from odometry
+        current_quat = self.recent_odom.pose.pose.orientation
+
+        # Convert quaternion to Euler angles
+        (roll, pitch, yaw) = euler_from_quaternion(
+            [current_quat.x, current_quat.y, current_quat.z, current_quat.w],
+        )
+
+        # Apply yaw command
+        desired_yaw = yaw + yaw_command
+
+        # Convert back to quaternion
+        quat = quaternion_from_euler(roll, pitch, desired_yaw)
+
+        # Create goal pose
+        goal_pose = Pose()
+        goal_pose.position.x = 0.0
+        goal_pose.position.y = 0.0
+        goal_pose.position.z = -0.1
+        goal_pose.orientation.x = quat[0]
+        goal_pose.orientation.y = quat[1]
+        goal_pose.orientation.z = quat[2]
+        goal_pose.orientation.w = quat[3]
+
+        # Publish the goal pose
+        self.goal_pub.publish(goal_pose)
+        self.spotted = False
+        return False
+
+    def send_goal_and_wait(self, goal: Pose):
+        self.goal_pub.publish(goal)
+        position_tolerance = 0.2  # in meters
+        orientation_tolerance = 0.1  # in radians
+
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.25)
+            # Calculate position error
+            current_pos = self.recent_odom.pose.pose.position
+            dx = current_pos.x - goal.position.x
+            dy = current_pos.y - goal.position.y
+            dz = current_pos.z - goal.position.z
+            position_error = math.sqrt(dx * dx + dy * dy + dz * dz)
+
+            # Calculate orientation error using Euler angles
+            current_q = self.recent_odom.pose.pose.orientation
+            goal_q = goal.orientation
+            # Convert quaternions to Euler angles (roll, pitch, yaw)
+            current_euler = euler_from_quaternion(
+                [current_q.x, current_q.y, current_q.z, current_q.w],
+            )
+            goal_euler = euler_from_quaternion(
+                [goal_q.x, goal_q.y, goal_q.z, goal_q.w],
+            )
+
+            roll_error = abs(current_euler[0] - goal_euler[0])
+            pitch_error = abs(current_euler[1] - goal_euler[1])
+            yaw_error = abs(current_euler[2] - goal_euler[2])
+
+            # Handle angle wrapping for all angles
+            roll_error = min(roll_error, 2 * math.pi - roll_error)
+            pitch_error = min(pitch_error, 2 * math.pi - pitch_error)
+            yaw_error = min(yaw_error, 2 * math.pi - yaw_error)
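+            # Wrapping example (illustrative): headings of 3.1 and -3.1 rad
+            # differ by 6.2 rad naively, but min(6.2, 2*pi - 6.2) ≈ 0.08 rad
+            # is the true angular separation.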
+            # choose max
+            orientation_error = max(roll_error, pitch_error, yaw_error)
+
+            # Check if close enough to goal
+            if position_error < position_tolerance and orientation_error < orientation_tolerance:
+                self.get_logger().info("Goal reached!")
+                break
+
+    def take_current_odom_and_yaw_by_n_degrees(self, n) -> Pose:
+        # Get current pose from odometry
+        current_pose = self.recent_odom.pose.pose
+        current_q = current_pose.orientation
+
+        # Convert current orientation to Euler angles
+        current_euler = euler_from_quaternion(
+            [current_q.x, current_q.y, current_q.z, current_q.w],
+        )
+
+        # Add n degrees to yaw (left rotation is positive in the ROS standard)
+        rotation_increment = math.radians(n)
+        new_yaw = current_euler[2] + rotation_increment
+
+        # Convert back to quaternion
+        new_quat = quaternion_from_euler(current_euler[0], current_euler[1], new_yaw)
+
+        # Create a new pose with the same position but rotated orientation
+        # (deepcopy so we don't alias the position stored in recent_odom)
+        new_pose = Pose()
+        new_pose.position = deepcopy(current_pose.position)
+        new_pose.orientation = Quaternion(
+            x=new_quat[0],
+            y=new_quat[1],
+            z=new_quat[2],
+            w=new_quat[3],
+        )
+
+        return new_pose
+
+    def take_current_odom_and_move_in_plus_y(self, n) -> Pose:
+        # Get current pose from odometry
+        current_pose = self.recent_odom.pose.pose
+        current_q = current_pose.orientation
+
+        # Create a new pose with the same orientation but shifted position
+        new_pose = Pose()
+        new_pose.position = deepcopy(current_pose.position)
+        new_pose.position.y += n
+        new_pose.orientation = current_q
+
+        return new_pose
+
+    def rotate_left_until_15_centroids(self):
+        self.centroid_cb_counter = 0  # reset counter; it increases while red is in frame
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.1)
+            # rotate left 10 degrees
+            new_pose = self.take_current_odom_and_yaw_by_n_degrees(10)
+            self.send_goal_and_wait(new_pose)
+            if self.centroid_cb_counter > 15:  # 15+ frames of red implies we are seeing the PVC pipe
+                return
+
+    def slow_forward_until_centroid(self):
+        self.centroid_cb_counter = 0  # reset counter; it increases while red is in frame
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.1)
+            # step forward 0.45 m
+            new_pose = self.take_current_odom_and_move_in_plus_y(0.45)
+            self.send_goal_and_wait(new_pose)
+            if self.centroid_cb_counter > 15:  # 15+ frames of red implies we are seeing the PVC pipe
+                return
+
+    def do_nav_channel(self):
+        # turn until red is in frame
+        self.rotate_left_until_15_centroids()
+
+        # center on red
+        while True:
+            if self.control_loop():
+                break
+
+        # store current bearing (kept for a later stage of the mission)
+        looking_at_red_pole: Odometry = deepcopy(self.recent_odom)
+
+        # yaw right by 90 degrees, less a small 13-degree left bias
+        new_pose = self.take_current_odom_and_yaw_by_n_degrees(13 - 90)
+        self.send_goal_and_wait(new_pose)
+
+        # slowly go straight until red is in view
+        self.slow_forward_until_centroid()
+
+
+def main():
+    rclpy.init()
+    nc = NavChannel()
+    # The mission drives its own spinning via spin_once() calls.
+    nc.do_nav_channel()
+    rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/subjugator/testing/nav_channel/package.xml b/src/subjugator/testing/nav_channel/package.xml
new file mode 100644
index 00000000..695890e2
--- /dev/null
+++ b/src/subjugator/testing/nav_channel/package.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>nav_channel</name>
+  <version>0.0.0</version>
+  <description>TODO: Package description</description>
+  <maintainer email="jgoodman1@ufl.edu">sub9</maintainer>
+  <license>TODO: License declaration</license>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/src/subjugator/testing/nav_channel/resource/nav_channel b/src/subjugator/testing/nav_channel/resource/nav_channel
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/nav_channel/setup.cfg b/src/subjugator/testing/nav_channel/setup.cfg
new file mode 100644
index 00000000..9095ea61
--- /dev/null
+++ b/src/subjugator/testing/nav_channel/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/nav_channel
+[install]
+install_scripts=$base/lib/nav_channel
diff --git a/src/subjugator/testing/nav_channel/setup.py b/src/subjugator/testing/nav_channel/setup.py
new file mode 100644
index 00000000..a4918529
--- /dev/null
+++ b/src/subjugator/testing/nav_channel/setup.py
@@ -0,0 +1,23 @@
+from setuptools import find_packages, setup
+
+package_name = "nav_channel"
+
+setup(
+    name=package_name,
+    version="0.0.0",
+    packages=find_packages(exclude=["test"]),
+    data_files=[
+        ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
+        ("share/" + package_name, ["package.xml"]),
+    ],
+    install_requires=["setuptools"],
+    zip_safe=True,
+    maintainer="sub9",
+    maintainer_email="jgoodman1@ufl.edu",
+    description="TODO: Package description",
+    license="TODO: License declaration",
+    tests_require=["pytest"],
+    entry_points={
+        "console_scripts": ["nav_channel = nav_channel.nav_channel:main"],
+    },
+)
diff --git a/src/subjugator/testing/red_triangle/package.xml b/src/subjugator/testing/red_triangle/package.xml
new file mode 100644
index 00000000..7baa801d
--- /dev/null
+++ b/src/subjugator/testing/red_triangle/package.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>red_triangle</name>
+  <version>0.0.0</version>
+  <description>TODO: Package description</description>
+  <maintainer email="nottellingu@gmail.com">jh</maintainer>
+  <license>Apache-2.0</license>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/src/subjugator/testing/red_triangle/red_triangle/__init__.py b/src/subjugator/testing/red_triangle/red_triangle/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/red_triangle/red_triangle/red_triangle.py b/src/subjugator/testing/red_triangle/red_triangle/red_triangle.py
new file mode 100644
index 00000000..1cc8a46b
--- /dev/null
+++ b/src/subjugator/testing/red_triangle/red_triangle/red_triangle.py
@@ -0,0 +1,253 @@
+# TODO: center on the target as well
+
+import math
+from copy import deepcopy
+
+import numpy as np
+import rclpy
+import tf_transformations
+from geometry_msgs.msg import Point, Pose, Quaternion
+from nav_msgs.msg import Odometry
+from rclpy.node import Node
+from subjugator_msgs.msg import Centroid
+from tf_transformations import euler_from_quaternion, quaternion_from_euler
+
+
+class RedTriangle(Node):
+    def __init__(self):
+        super().__init__('red_triangle')
+
+        # centroid sub
+        self._centroid_sub = self.create_subscription(
+            Centroid, "centroids/red", self.centroid_cb, 10,
+        )
+        self.recent_centroid = Centroid()
+        self.centroid_cb_counter = 0  # for detecting whether red is currently in frame
+
+        # odom sub
+        self._odom_sub = self.create_subscription(
+            Odometry, "odometry/filtered", self.odom_cb, 10,
+        )
+        self.recent_odom = Odometry()
+
+        # goal pub
+        self.goal_pub = self.create_publisher(Pose, "goal_pose", 10)
+    def centroid_cb(self, msg: Centroid):
+        self.recent_centroid = msg
+        self.centroid_cb_counter += 1
+        # What is in a Centroid message?
+        #   self.recent_centroid.centroid_x
+        #   self.recent_centroid.centroid_y
+        #   self.recent_centroid.image_width
+        #   self.recent_centroid.image_height
+
+    def odom_cb(self, msg: Odometry):
+        self.recent_odom = msg
+
+    def send_goal_and_wait(self, goal: Pose):
+        self.goal_pub.publish(goal)
+        position_tolerance = 0.2  # in meters
+        orientation_tolerance = 0.1  # in radians
+
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.25)
+            # Calculate position error
+            current_pos = self.recent_odom.pose.pose.position
+            dx = current_pos.x - goal.position.x
+            dy = current_pos.y - goal.position.y
+            dz = current_pos.z - goal.position.z
+            position_error = math.sqrt(dx * dx + dy * dy + dz * dz)
+
+            # Calculate orientation error using Euler angles
+            current_q = self.recent_odom.pose.pose.orientation
+            goal_q = goal.orientation
+            # Convert quaternions to Euler angles (roll, pitch, yaw)
+            current_euler = euler_from_quaternion(
+                [current_q.x, current_q.y, current_q.z, current_q.w],
+            )
+            goal_euler = euler_from_quaternion(
+                [goal_q.x, goal_q.y, goal_q.z, goal_q.w],
+            )
+
+            roll_error = abs(current_euler[0] - goal_euler[0])
+            pitch_error = abs(current_euler[1] - goal_euler[1])
+            yaw_error = abs(current_euler[2] - goal_euler[2])
+
+            # Handle angle wrapping for all angles
+            roll_error = min(roll_error, 2 * math.pi - roll_error)
+            pitch_error = min(pitch_error, 2 * math.pi - pitch_error)
+            yaw_error = min(yaw_error, 2 * math.pi - yaw_error)
+
+            # choose max
+            orientation_error = max(roll_error, pitch_error, yaw_error)
+
+            # Check if close enough to goal
+            if position_error < position_tolerance and orientation_error < orientation_tolerance:
+                self.get_logger().info("Goal reached!")
+                break
+
+    def take_current_odom_and_yaw_by_n_degrees(self, n) -> Pose:
+        # Get current pose from odometry
+        current_pose = self.recent_odom.pose.pose
+        current_q = current_pose.orientation
+
+        # Convert current orientation to Euler angles
+        current_euler = euler_from_quaternion(
+            [current_q.x, current_q.y, current_q.z, current_q.w],
+        )
+
+        # Add n degrees to yaw (left rotation is positive in the ROS standard)
+        rotation_increment = math.radians(n)
+        new_yaw = current_euler[2] + rotation_increment
+
+        # Convert back to quaternion
+        new_quat = quaternion_from_euler(current_euler[0], current_euler[1], new_yaw)
+
+        # Create a new pose with the same position but rotated orientation
+        # (deepcopy so we don't alias the position stored in recent_odom)
+        new_pose = Pose()
+        new_pose.position = deepcopy(current_pose.position)
+        new_pose.orientation = Quaternion(
+            x=new_quat[0],
+            y=new_quat[1],
+            z=new_quat[2],
+            w=new_quat[3],
+        )
+
+        return new_pose
+
+    def rotate_left_until_15_centroids(self):
+        self.centroid_cb_counter = 0  # reset counter; it increases while red is in frame
+        while True:
+            rclpy.spin_once(self, timeout_sec=0.1)
+            # rotate left 10 degrees
+            new_pose = self.take_current_odom_and_yaw_by_n_degrees(10)
+            self.send_goal_and_wait(new_pose)
+            if self.centroid_cb_counter > 15:  # 15+ frames of red implies we are seeing the PVC pipe
+                return
+
+    # this function assumes the object is already in frame
+    def attempt_to_triangulate(self) -> Point:
+        # first, store the current position and centroid
+        pose1: Odometry = deepcopy(self.recent_odom)
+        centroid1: Centroid = deepcopy(self.recent_centroid)
+
+        # next, move right in odom
+        # (deepcopy so we don't mutate the position stored in recent_odom)
+        new_pose = Pose()
+        new_pose.position = deepcopy(self.recent_odom.pose.pose.position)
+        new_pose.orientation = self.recent_odom.pose.pose.orientation
+        new_pose.position.y -= 1.5  # move right by 1.5 meters
+
+        # block until the goal is reached
+        self.send_goal_and_wait(new_pose)
+
+        # rotate left until we see the object again
+        self.rotate_left_until_15_centroids()
+
+        # second pose
+        pose2: Odometry = deepcopy(self.recent_odom)
+        centroid2: Centroid = deepcopy(self.recent_centroid)
+
+        return self.generate_triangulation_guess(pose1, pose2, centroid1, centroid2)
+
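+    # Geometry sketch (illustrative): each observation defines a ray from the
+    # camera position along the direction of the detected centroid. Given two
+    # rays r1(t1) = p1 + t1*d1 and r2(t2) = p2 + t2*d2, the target estimate is
+    # the midpoint between the closest points of the two rays.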
+    # NOTE: this function was largely AI-generated; review before trusting it.
+    def generate_triangulation_guess(self, pose1: Odometry, pose2: Odometry, c1: Centroid, c2: Centroid) -> Point:
+        """
+        Triangulate the 3D position of an object using two different viewpoints.
+
+        Args:
+            pose1: First robot pose when observing the object
+            pose2: Second robot pose when observing the object
+            c1: Centroid observation from first position
+            c2: Centroid observation from second position
+
+        Returns:
+            Point: Estimated 3D position of the object
+        """
+        # Extract camera parameters (these should be adjusted for your camera)
+        # Field of view in radians
+        fov_h = math.radians(60)  # Horizontal field of view
+        fov_v = math.radians(45)  # Vertical field of view
+
+        # Function to convert a centroid to a direction vector in the world frame
+        def centroid_to_direction(centroid, pose):
+            # Normalize centroid coordinates to [-1, 1]
+            norm_x = (centroid.centroid_x / centroid.image_width) * 2 - 1
+            norm_y = (centroid.centroid_y / centroid.image_height) * 2 - 1
+
+            # Calculate angles based on normalized coordinates and FOV
+            angle_h = norm_x * (fov_h / 2)
+            angle_v = norm_y * (fov_v / 2)
+
+            # Create direction vector in camera frame
+            # Assuming the camera points forward along the x-axis
+            # z is up, y is left (standard ROS convention)
+            camera_dir = np.array([
+                math.cos(angle_v) * math.cos(angle_h),  # x - forward
+                math.cos(angle_v) * math.sin(angle_h),  # y - left
+                math.sin(angle_v),  # z - up
+            ])
+
+            # Convert quaternion to rotation matrix to transform from camera to world frame
+            q = pose.pose.pose.orientation
+            quat = [q.x, q.y, q.z, q.w]
+            rotation_matrix = tf_transformations.quaternion_matrix(quat)[:3, :3]
+
+            # Transform direction vector from camera frame to world frame
+            world_dir = np.dot(rotation_matrix, camera_dir)
+
+            # Normalize the direction vector
+            return world_dir / np.linalg.norm(world_dir)
+
+        # Get positions and direction vectors
+        pos1 = np.array([
+            pose1.pose.pose.position.x,
+            pose1.pose.pose.position.y,
+            pose1.pose.pose.position.z,
+        ])
+        pos2 = np.array([
+            pose2.pose.pose.position.x,
+            pose2.pose.pose.position.y,
+            pose2.pose.pose.position.z,
+        ])
+
+        dir1 = centroid_to_direction(c1, pose1)
+        dir2 = centroid_to_direction(c2, pose2)
+
+        # Line 1: pos1 + t1 * dir1
+        # Line 2: pos2 + t2 * dir2
+        # Find the closest points between these two lines
+
+        # Vector connecting the two camera positions
+        v = pos2 - pos1
+
+        # Calculate dot products
+        d1_dot_d2 = np.dot(dir1, dir2)
+        d1_dot_v = np.dot(dir1, v)
+        d2_dot_v = np.dot(dir2, v)
+
+        # Calculate the denominator (both directions are unit vectors)
+        denom = 1 - d1_dot_d2**2
+
+        # Handle parallel lines
+        if abs(denom) < 1e-6:
+            self.get_logger().warning("Triangulation failed: rays are nearly parallel")
+            # Return the midpoint between the cameras as a fallback
+            # (cast to float: ROS message fields reject numpy scalars)
+            midpoint = (pos1 + pos2) / 2
+            result = Point()
+            result.x = float(midpoint[0])
+            result.y = float(midpoint[1])
+            result.z = float(midpoint[2])
+            return result
+
+        # Calculate parameters for the closest points on each line
+        t1 = (d1_dot_v - d2_dot_v * d1_dot_d2) / denom
+        t2 = (d1_dot_v * d1_dot_d2 - d2_dot_v) / denom
+        # Calculate the closest points on each ray
+        point1 = pos1 + dir1 * t1
+        point2 = pos2 + dir2 * t2
+
+        # Calculate squared distance between the closest points
+        distance_squared = np.sum((point1 - point2) ** 2)
+
+        # Check if the rays are too far apart
+        if distance_squared > 1.0:  # 1.0 meter-squared threshold
+            self.get_logger().warning(
+                f"Triangulation may be inaccurate: rays don't intersect closely (dist²={distance_squared})",
+            )
+
+        # Use the midpoint as the triangulated position
+        triangulated = (point1 + point2) / 2
+
+        # Create and return the Point message
+        result = Point()
+        result.x = float(triangulated[0])
+        result.y = float(triangulated[1])
+        result.z = float(triangulated[2])
+
+        self.get_logger().info(f"Triangulated position: ({result.x}, {result.y}, {result.z})")
+        return result
+
+
+def main():
+    rclpy.init()
+    rt = RedTriangle()
+    rclpy.spin(rt)
+    rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/subjugator/testing/red_triangle/resource/red_triangle b/src/subjugator/testing/red_triangle/resource/red_triangle
new file mode 100644
index 00000000..e69de29b
diff --git a/src/subjugator/testing/red_triangle/setup.cfg b/src/subjugator/testing/red_triangle/setup.cfg
new file mode 100644
index 00000000..949098e1
--- /dev/null
+++ b/src/subjugator/testing/red_triangle/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/red_triangle
+[install]
+install_scripts=$base/lib/red_triangle
diff --git a/src/subjugator/testing/red_triangle/setup.py b/src/subjugator/testing/red_triangle/setup.py
new file mode 100644
index 00000000..b3981ea0
--- /dev/null
+++ b/src/subjugator/testing/red_triangle/setup.py
@@ -0,0 +1,26 @@
+from setuptools import find_packages, setup
+
+package_name = 'red_triangle'
+
+setup(
+    name=package_name,
+    version='0.0.0',
+    packages=find_packages(exclude=['test']),
+    data_files=[
+        ('share/ament_index/resource_index/packages',
+            ['resource/' + package_name]),
+        ('share/' + package_name, ['package.xml']),
+    ],
+    install_requires=['setuptools'],
+    zip_safe=True,
+    maintainer='jh',
+    maintainer_email='nottellingu@gmail.com',
+    description='TODO: Package description',
+    license='Apache-2.0',
+    tests_require=['pytest'],
+    entry_points={
+        'console_scripts': [
+            'red_triangle = red_triangle.red_triangle:main'
+        ],
+    },
+)
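To sanity-check the detector without a camera or a running ROS graph, a small offline script like the following exercises GreenTracker against a synthetic frame. This is a minimal sketch, not part of the patch above; it assumes the subjugator_centroids package is importable, and the file name and the square's BGR value are illustrative.

# offline_green_tracker_check.py (hypothetical helper, not in this PR)
import cv2
import numpy as np

from subjugator_centroids.green_tracker import GreenTracker


def make_test_frame() -> np.ndarray:
    # Black 640x480 BGR frame with a lime-green square centered at (320, 160)
    frame = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.rectangle(frame, (290, 130), (350, 190), (0, 200, 50), -1)  # BGR lime green
    return frame


def main() -> None:
    tracker = GreenTracker("centroids/green")
    centroid = tracker.find_centroid(make_test_frame())
    assert centroid is not None, "expected the green square to be detected"
    cx, cy = centroid
    # The moments-based centroid should land at the square's center
    assert abs(cx - 320) <= 2 and abs(cy - 160) <= 2, (cx, cy)
    print(f"OK: centroid at ({cx}, {cy})")


if __name__ == "__main__":
    main()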