Skip to content

Commit

Permalink
H8 and AI HAT support (#59)
Browse files Browse the repository at this point in the history
* Refactor applications and added pipeline helper functions.

* Added development guide.

* Adding tests to rpi example and the tests runner

* Adding install.sh file

* Adding hailo8 support

* pose h8 is set to yolov5s as default

* Post processes are included in this repo 

* moved args to rpi_common

* Added barcode video. All pipeline uses demo video as default input

* Updated H8 hefs. Added support for uint16 for pose postprocess

* updated doc for AI HAT.
---------

Co-authored-by: OmAz-AI <[email protected]>
Co-authored-by: OmriA <[email protected]>
Co-authored-by: ronithailo <[email protected]>
  • Loading branch information
4 people authored Oct 27, 2024
1 parent 123e675 commit c19d4ba
Show file tree
Hide file tree
Showing 31 changed files with 2,162 additions and 147 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Hailo Raspberry Pi 5 Examples

Welcome to the Hailo Raspberry Pi 5 Examples repository. This project showcases various examples demonstrating the capabilities of the Hailo AI processor on a Raspberry Pi 5. These examples will help you get started with AI on embedded devices.
The examples in this repository are designed to work with the Raspberry Pi AI Kit and AI HAT, supporting both the Hailo8 (26 TOPS) and Hailo8L (13 TOPS) AI processors. The examples can also be run on an x86_64 Ubuntu machine with the Hailo8/8L AI processor.
Visit the [Hailo Official Website](https://hailo.ai/) and [Hailo Community Forum](https://community.hailo.ai/) for more information.

## Table of Contents
Expand All @@ -29,11 +30,11 @@ Visit the [Hailo Official Website](https://hailo.ai/) and [Hailo Community Forum
- [License](#license)
- [Disclaimer](#disclaimer)

![Raspberry Pi 5 with Hailo M.2](doc/images/Raspberry_Pi_5_Hailo-8.png)
![Raspberry Pi 5 with Hailo AI HAT](doc/images/ai-hat-plus.jpg)

## Hailo Packages Installation

For installation instructions, see the [Hailo Raspberry Pi 5 installation guide](doc/install-raspberry-pi5.md#how-to-set-up-raspberry-pi-5-and-hailo-8l).
For installation instructions, see the [Hailo Raspberry Pi 5 installation guide](doc/install-raspberry-pi5.md#how-to-set-up-raspberry-pi-5-and-hailo).

### Hailo Version Upgrade Instructions

Expand Down
51 changes: 29 additions & 22 deletions basic_pipelines/detection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
get_default_parser,
QUEUE,
SOURCE_PIPELINE,
DETECTION_PIPELINE,
INFERENCE_PIPELINE,
INFERENCE_PIPELINE_WRAPPER,
USER_CALLBACK_PIPELINE,
DISPLAY_PIPELINE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
dummy_callback,
detect_hailo_arch,
)



# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
Expand All @@ -32,18 +33,6 @@
class GStreamerDetectionApp(GStreamerApp):
def __init__(self, app_callback, user_data):
parser = get_default_parser()
# Add additional arguments here
parser.add_argument(
"--network",
default="yolov6n",
choices=['yolov6n', 'yolov8s'],
help="Which Network to use, default is yolov6n",
)
parser.add_argument(
"--hef-path",
default=None,
help="Path to HEF file",
)
parser.add_argument(
"--labels-json",
default=None,
Expand All @@ -61,15 +50,28 @@ def __init__(self, app_callback, user_data):
nms_score_threshold = 0.3
nms_iou_threshold = 0.45


# Determine the architecture if not specified
if args.arch is None:
detected_arch = detect_hailo_arch()
if detected_arch is None:
raise ValueError("Could not auto-detect Hailo architecture. Please specify --arch manually.")
self.arch = detected_arch
print(f"Auto-detected Hailo architecture: {self.arch}")
else:
self.arch = args.arch


if args.hef_path is not None:
self.hef_path = args.hef_path
# Set the HEF file path based on the network
elif args.network == "yolov6n":
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
elif args.network == "yolov8s":
# Set the HEF file path based on the arch
elif self.arch == "hailo8":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8m.hef')
else: # hailo8l
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
else:
assert False, "Invalid network type"

# Set the post-processing shared object file
self.post_process_so = os.path.join(self.current_path, '../resources/libyolo_hailortpp_postprocess.so')

# User-defined label JSON file
self.labels_json = args.labels_json
Expand All @@ -89,7 +91,12 @@ def __init__(self, app_callback, user_data):

def get_pipeline_string(self):
source_pipeline = SOURCE_PIPELINE(self.video_source)
detection_pipeline = DETECTION_PIPELINE(hef_path=self.hef_path, batch_size=self.batch_size, labels_json=self.labels_json, additional_params=self.thresholds_str)
detection_pipeline = INFERENCE_PIPELINE(
hef_path=self.hef_path,
post_process_so=self.post_process_so,
batch_size=self.batch_size,
config_json=self.labels_json,
additional_params=self.thresholds_str)
user_callback_pipeline = USER_CALLBACK_PIPELINE()
display_pipeline = DISPLAY_PIPELINE(video_sink=self.video_sink, sync=self.sync, show_fps=self.show_fps)
pipeline_string = (
Expand Down
36 changes: 36 additions & 0 deletions basic_pipelines/get_usb_camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import subprocess

# if udevadm is not installed, install it using the following command:
# sudo apt-get install udev


def get_usb_video_devices():
"""
Get a list of video devices that are connected via USB and have video capture capability.
"""
video_devices = [f'/dev/{device}' for device in os.listdir('/dev') if device.startswith('video')]
usb_video_devices = []

for device in video_devices:
try:
# Use udevadm to get detailed information about the device
udevadm_cmd = ["udevadm", "info", "--query=all", "--name=" + device]
result = subprocess.run(udevadm_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output = result.stdout.decode('utf-8')

# Check if the device is connected via USB and has video capture capabilities
if "ID_BUS=usb" in output and ":capture:" in output:
usb_video_devices.append(device)
except Exception as e:
print(f"Error checking device {device}: {e}")

return usb_video_devices

if __name__ == "__main__":
usb_video_devices = get_usb_video_devices()

if usb_video_devices:
print(f"USB cameras found on: {', '.join(usb_video_devices)}")
else:
print("No available USB cameras found.")
61 changes: 40 additions & 21 deletions basic_pipelines/hailo_rpi_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import cv2
import time
import signal
import subprocess

# Try to import hailo python module
try:
Expand Down Expand Up @@ -66,6 +67,30 @@ def dummy_callback(pad, info, user_data):
# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def detect_hailo_arch():
try:
# Run the hailortcli command to get device information
result = subprocess.run(['hailortcli', 'fw-control', 'identify'], capture_output=True, text=True)

# Check if the command was successful
if result.returncode != 0:
print(f"Error running hailortcli: {result.stderr}")
return None

# Search for the "Device Architecture" line in the output
for line in result.stdout.split('\n'):
if "Device Architecture" in line:
if "HAILO8L" in line:
return "hailo8l"
elif "HAILO8" in line:
return "hailo8"

print("Could not determine Hailo architecture from device information.")
return None
except Exception as e:
print(f"An error occurred while detecting Hailo architecture: {e}")
return None

def get_caps_from_pad(pad: Gst.Pad):
caps = pad.get_current_caps()
if caps:
Expand All @@ -91,14 +116,27 @@ def display_user_data_frame(user_data: app_callback_class):

def get_default_parser():
parser = argparse.ArgumentParser(description="Hailo App Help")
current_path = os.path.dirname(os.path.abspath(__file__))
default_video_source = os.path.join(current_path, '../resources/detection0.mp4')
parser.add_argument(
"--input", "-i", type=str, default="/dev/video0",
"--input", "-i", type=str, default=default_video_source,
help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
For RPi camera use '-i rpi' (Still in Beta). \
Defaults to /dev/video0"
Defaults to example video resources/detection0.mp4"
)
parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
parser.add_argument(
"--arch",
default=None,
choices=['hailo8', 'hailo8l'],
help="Specify the Hailo architecture (hailo8 or hailo8l). Default is None , app will run check.",
)
parser.add_argument(
"--hef-path",
default=None,
help="Path to HEF file",
)
parser.add_argument(
"--disable-sync", action="store_true",
help="Disables display sink sync, will run as fast as possible. Relevant when using file source."
Expand Down Expand Up @@ -227,25 +265,6 @@ def INFERENCE_PIPELINE(hef_path, post_process_so, batch_size=1, config_json=None

return inference_pipeline

def DETECTION_PIPELINE(hef_path, batch_size=1, labels_json=None, additional_params='', name='detection'):
"""
Creates a GStreamer pipeline string for detection inference, using HailoRT post-processing.
This pipeline is compatible with detection models which is compiled with HailoRT post-processing.
Args:
hef_path (str): The path to the HEF file.
batch_size (int, optional): The batch size for the hailonet element. Defaults to 1.
labels_json (str, optional): The path to the labels JSON file. If None, no labels are added. Defaults to None.
additional_params (str, optional): Additional parameters for the hailonet element. Defaults to ''.
name (str, optional): The prefix name for the pipeline elements. Defaults to 'detection'.
Returns:
str: A string representing the GStreamer pipeline for detection.
"""
post_process_so = os.path.join(os.environ.get('TAPPAS_POST_PROC_DIR', ''), 'libyolo_hailortpp_post.so')
detection_pipeline = INFERENCE_PIPELINE(hef_path=hef_path, post_process_so=post_process_so, batch_size=batch_size, config_json=labels_json, additional_params=additional_params, name=name)
return detection_pipeline

def INFERENCE_PIPELINE_WRAPPER(inner_pipeline, bypass_max_size_buffers=20, name='inference_wrapper'):
"""
Creates a GStreamer pipeline string that wraps an inner pipeline with a hailocropper and hailoaggregator.
Expand Down
36 changes: 31 additions & 5 deletions basic_pipelines/instance_segmentation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
INFERENCE_PIPELINE,
USER_CALLBACK_PIPELINE,
DISPLAY_PIPELINE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
dummy_callback,
detect_hailo_arch,
)

#-----------------------------------------------------------------------------------------------
Expand All @@ -41,9 +40,34 @@ def __init__(self, app_callback, user_data):
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
self.default_post_process_so = os.path.join(self.postprocess_dir, 'libyolov5seg_post.so')

# Determine the architecture if not specified
if args.arch is None:
detected_arch = detect_hailo_arch()
if detected_arch is None:
raise ValueError("Could not auto-detect Hailo architecture. Please specify --arch manually.")
self.arch = detected_arch
print(f"Auto-detected Hailo architecture: {self.arch}")
else:
self.arch = args.arch

# Set the HEF file path based on the architecture
if args.hef_path:
self.hef_path = args.hef_path
elif self.arch == "hailo8":
self.hef_path = os.path.join(self.current_path, '../resources/yolov5m_seg.hef')
else: # hailo8l
self.hef_path = os.path.join(self.current_path, '../resources/yolov5n_seg_h8l_mz.hef')

# self.default_post_process_so = os.path.join(self.postprocess_dir, 'libyolov5seg_post.so')
if 'yolov5m_seg' in self.hef_path:
self.config_file = os.path.join(self.current_path, '../resources/yolov5m_seg.json')
elif 'yolov5n_seg' in self.hef_path:
self.config_file = os.path.join(self.current_path, '../resources/yolov5n_seg.json')
else:
raise ValueError("HEF version not supported, you will need to provide a config file")
self.default_post_process_so = os.path.join(self.current_path, '../resources/libyolov5seg_postprocess.so')
self.post_function_name = "yolov5seg"
self.hef_path = os.path.join(self.current_path, '../resources/yolov5n_seg_h8l_mz.hef')
self.app_callback = app_callback

# Set the process title
Expand All @@ -56,7 +80,9 @@ def get_pipeline_string(self):
infer_pipeline = INFERENCE_PIPELINE(
hef_path=self.hef_path,
post_process_so=self.default_post_process_so,
post_function_name=self.post_function_name
post_function_name=self.post_function_name,
batch_size=self.batch_size,
config_json=self.config_file,
)
user_callback_pipeline = USER_CALLBACK_PIPELINE()
display_pipeline = DISPLAY_PIPELINE(video_sink=self.video_sink, sync=self.sync, show_fps=self.show_fps)
Expand Down
38 changes: 31 additions & 7 deletions basic_pipelines/pose_estimation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
INFERENCE_PIPELINE,
USER_CALLBACK_PIPELINE,
DISPLAY_PIPELINE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
dummy_callback,
detect_hailo_arch,
)

#-----------------------------------------------------------------------------------------------
Expand All @@ -41,11 +40,35 @@ def __init__(self, app_callback, user_data):
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
self.default_post_process_so = os.path.join(self.postprocess_dir, 'libyolov8pose_post.so')
self.post_function_name = "filter"
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_pose_h8l_pi.hef')


# Determine the architecture if not specified
if args.arch is None:
detected_arch = detect_hailo_arch()
if detected_arch is None:
raise ValueError("Could not auto-detect Hailo architecture. Please specify --arch manually.")
self.arch = detected_arch
print(f"Auto-detected Hailo architecture: {self.arch}")
else:
self.arch = args.arch



# Set the HEF file path based on the architecture
if args.hef_path:
self.hef_path = args.hef_path
elif self.arch == "hailo8":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8m_pose.hef')
else: # hailo8l
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_pose_h8l.hef')

self.app_callback = app_callback

# Set the post-processing shared object file
self.post_process_so = os.path.join(self.current_path, '../resources/libyolov8pose_postprocess.so')
self.post_process_function = "filter"


# Set the process title
setproctitle.setproctitle("Hailo Pose Estimation App")

Expand All @@ -55,8 +78,9 @@ def get_pipeline_string(self):
source_pipeline = SOURCE_PIPELINE(video_source=self.video_source)
infer_pipeline = INFERENCE_PIPELINE(
hef_path=self.hef_path,
post_process_so=self.default_post_process_so,
post_function_name=self.post_function_name
post_process_so=self.post_process_so,
post_function_name=self.post_process_function,
batch_size=self.batch_size
)
user_callback_pipeline = USER_CALLBACK_PIPELINE()
display_pipeline = DISPLAY_PIPELINE(video_sink=self.video_sink, sync=self.sync, show_fps=self.show_fps)
Expand Down
Loading

0 comments on commit c19d4ba

Please sign in to comment.