Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,565 changes: 3,565 additions & 0 deletions .komment/00000.json

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions .komment/komment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"meta": {
"version": "1",
"updated_at": "2024-07-24T11:48:24.616Z",
"created_at": "2024-07-03T12:07:43.789Z",
"pipelines": [
"1e4ffa95-bdaf-4704-9e22-a4e72cbd5dbe",
"e329fad3-152c-4b69-b3f3-adec4460ee20",
"e13d59cf-aac3-4640-92c5-48fea5fa852c",
"7013db2f-829c-4c3a-938f-e6dcdc3ab191",
"4491c5fd-b71b-49e0-8541-46bb94e775d8",
"de7a47c1-ece4-4bf1-9c4b-6028e2cda2fc",
"9814c399-3b35-44fc-9710-2c2ec682082e",
"2f1a3f35-66e0-44e0-b2ef-66474aa9dc9b",
"5212b28c-e549-4008-af08-a2ba530a1ffb",
"6c45dd8b-72bd-4c2b-b45d-3098c6c49c86",
"7da7f7f4-27ac-4c85-9b54-acf249641c8f",
"3d298415-35f6-4628-aeb7-d8fb2f055a9c",
"b0ca715b-40b5-4025-a5cf-99989a3fdd89"
]
},
"lookup": [
[
"components/camera_kinova/src/specificworker.py",
"pybullet_controller/src/specificworker.py",
"components/kinova_controller/src/specificworker.py",
"components/kinova_controller/src/kinovaarmI.py",
"components/kinova_controller/src/test.py",
"components/kinova_controller/src/genericworker.py",
"components/kinova_controller/src/interfaces.py"
]
]
}
234 changes: 208 additions & 26 deletions components/camera_kinova/src/specificworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from PySide2.QtCore import QTimer
from PySide2.QtWidgets import QApplication
from orca.debug import pidOf
from rich.console import Console
from genericworker import *
import interfaces as ifaces
Expand All @@ -37,36 +38,58 @@
console = Console(highlight=False)

class SpecificWorker(GenericWorker):
"""
Captures and processes RGB-D video streams from a camera, storing frames in
queues for further processing or retrieval by other components. It manages
threads for color and depth stream capture and provides methods to retrieve
images and depth data.

Attributes:
Period (int): 1000, indicating a period or interval (in milliseconds) for
various operations to be performed by the worker, such as connecting
to streams and processing frames.
hide (bool): Set to `self.hide()` in the `__init__` method, but its exact
purpose or effect is not clear without additional context.
depth_queue (queueQueue[int]): Initialized with a maximum size of 1, which
allows it to store one frame from the depth stream at a time.
color_queue (Queue[Any]): Used to hold color frames from a video stream,
with a maximum size of one frame at a time. It follows first-in-first-out
(FIFO) order.
init_timestamp (int): Initialized with the current time (in milliseconds)
at the instance creation, obtained using `int(time.time()*1000)`.
startup_check (bool): Used to determine whether a startup check should be
performed when initializing the worker. The check tests various data
structures from the RoboComp library.
timer (QTimer): Connected to the compute method, which it calls every
Period milliseconds.
compute (None|Callable[[],None]): Annotated with @QtCore.Slot(). It
represents a slot that gets called when a timer event occurs. The
function does not contain any operation, it only passes.

"""
def __init__(self, proxy_map, startup_check=False):
"""
Initializes object properties, such as time period and queues for depth
and color data, sets up event handling, and starts the worker process upon
initialization or startup check.

Args:
proxy_map (Dict[str, Any]): Passed to the parent class's constructor
using `super(SpecificWorker, self).__init__(proxy_map)`, indicating
it serves as a configuration or setup map.
startup_check (bool): Optional, with a default value of False. It
determines whether to run startup checks or not when the worker
is initialized.

"""
super(SpecificWorker, self).__init__(proxy_map)
self.Period = 100
self.Period = 1000
self.hide()

#set queue size
self.depth_queue = queue.Queue(maxsize=1)
self.color_queue = queue.Queue(maxsize=1)

self.color_stream = cv2.VideoCapture(
"gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/color latency=30 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert n-threads=2 ! video/x-raw,format=BGR ! queue ! appsink drop=true",cv2.CAP_GSTREAMER)

self.depth_stream = cv2.VideoCapture(
"gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/depth latency=30 ! rtpgstdepay ! videoconvert n-threads=2 ! video/x-raw,format=GRAY16_LE ! queue ! appsink drop=true",cv2.CAP_GSTREAMER)
#print(self.depth_stream.isOpened())

# create a thread to capture the stream and start it
self.color_thread = threading.Thread(target=self.video_color_thread, args=(self.color_stream, self.color_queue))
if(not self.color_stream.isOpened()):
print("color stream not opened")
sys.exit()
self.color_thread.start()

self.depth_thread = threading.Thread(target=self.video_depth_thread, args=(self.depth_stream, self.depth_queue))
if(not self.depth_stream.isOpened()):
print("depth stream not opened")
sys.exit()
self.depth_thread.start()

print("Reading threads started")
self.init_timestamp = int(time.time()*1000)

if startup_check:
self.startup_check()
Expand All @@ -76,15 +99,74 @@ def __init__(self, proxy_map, startup_check=False):

def __del__(self):
    """Tear down the worker: stop the capture threads, then free both video streams."""
    # Stop the reader threads first so nothing is consuming frames while the
    # captures are being released.
    self.killThreads()
    for stream in (self.color_stream, self.depth_stream):
        stream.release()
    print("Destructor")


def setParams(self, params):
    """
    Reconfigure the component with a new camera IP and (re)start the color
    capture thread.

    Only the color stream is opened here; the depth pipeline is currently
    disabled. Any color stream opened earlier (e.g. in __init__) is released
    first so the old capture resource is not leaked.

    Args:
        params (dict): Component parameters; must contain the key "ip" with
            the camera address used to build the RTSP URL. A missing key
            raises KeyError (fail-fast, as before).

    Returns:
        bool: Always True. On stream-open failure the process exits via
            sys.exit() instead of returning.
    """
    self.ip = params["ip"]
    self.run = True
    print(f"Connecting to {self.ip}")

    # Release any previously opened color stream before reconnecting.
    # Releasing the capture makes the old reader loop's isOpened() check
    # fail, so its thread winds down on its own instead of reading from an
    # orphaned stream (fixes a VideoCapture leak on reconfiguration).
    old_stream = getattr(self, "color_stream", None)
    if old_stream is not None and old_stream.isOpened():
        old_stream.release()

    # Hardware-decoded (nvh264dec) GStreamer pipeline for the color RTSP stream.
    self.color_stream = cv2.VideoCapture(
        f"gst-launch-1.0 rtspsrc location=rtsp://{self.ip}/color latency=30 ! rtph264depay ! h264parse ! nvh264dec ! videoconvert n-threads=2 ! video/x-raw,format=BGR ! queue ! appsink drop=true",
        cv2.CAP_GSTREAMER)

    # NOTE(review): the depth stream/thread is intentionally not started here;
    # re-enable a GRAY16_LE rtpgstdepay pipeline analogous to the color one
    # if depth capture over RTSP is needed again.

    # Create the reader thread, but bail out before starting it if the
    # stream failed to open.
    self.color_thread = threading.Thread(target=self.video_color_thread,
                                         args=(self.color_stream, self.color_queue))
    if not self.color_stream.isOpened():
        print("color stream not opened")
        sys.exit()
    self.color_thread.start()

    print("Reading threads started")

    return True


@QtCore.Slot()
def compute(self):
"""
Computes color and depth pixel values, but these lines are currently
commented out, rendering them inactive. The method also checks if the
worker is hidden and returns True immediately if so. This could be used
to implement lazy computation.

"""
# print('SpecificWorker.compute...')
#
# color_frame = self.color_queue.get()
Expand All @@ -106,8 +188,22 @@ def compute(self):

################################################################################################################
def video_color_thread(self, cap, inqueue):
"""
Captures video frames from an OpenCV camera, puts them into a queue, and
releases the camera when it finishes or is interrupted by a keyboard signal.

Args:
cap (cv2.VideoCapture): Referenced as an object, likely representing
a video capture device or a video file. It provides functionality
for reading frames from the captured video.
inqueue (Queue[Any]): Used to store frames from the camera for further
processing, allowing for efficient handling of video data and
preventing buffer overflows due to full queue condition.

"""
try:
while cap.isOpened():
while cap.isOpened() and self.run:
# print("color", int(time.time()*1000)-self.init_timestamp)
ret, frame = cap.read()
if ret:
# inqueue.put_nowait(frame)
Expand All @@ -118,15 +214,32 @@ def video_color_thread(self, cap, inqueue):
# Si la cola está llena, descarta la imagen más antigua y agrega la nueva
inqueue.get_nowait()
inqueue.put_nowait(frame)
print("color finish")
except KeyboardInterrupt:
print("hilo finish")
cap.release()


def video_depth_thread(self, cap, inqueue):
"""
Reads frames from an opened camera capture object (cap), converts them
into depth data, and pushes them into a shared queue for processing. It
runs until the camera is closed or the thread is stopped.

Args:
cap (cv2.VideoCapture | None): A video capture object that provides
read access to video frames from the device's camera or other video
source.
inqueue (Queue): Used to store frames from the camera capture (`cap`)
as they are read, enabling thread-safe input queuing.

Used for handling full queue conditions by removing an item before
adding the new one.

"""
#print("inicio bucle")
try:
while cap.isOpened():
while cap.isOpened() and self.run:
print("depth", int(time.time()*1000)-self.init_timestamp)
ret, frame = cap.read()
if ret:
# inqueue.put_nowait(frame)
Expand All @@ -137,12 +250,31 @@ def video_depth_thread(self, cap, inqueue):
# Si la cola está llena, descarta la imagen más antigua y agrega la nueva
inqueue.get_nowait()
inqueue.put_nowait(frame)
print("depth finish")
except KeyboardInterrupt:
print("hilo finish")
cap.release()

def killThreads(self):
"""
Terminates two threads, color_thread and depth_thread, when its run variable
is set to False, allowing the program to exit safely by joining these
threads before proceeding.

"""
self.run = False
self.color_thread.join()
self.depth_thread.join()
print("threads killed")


def startup_check(self):
"""
Tests three types of images from the ifaces module, printing a message for
each. After testing, it schedules a single shot to quit the application
after 200 milliseconds using QTimer and QApplication's instance.

"""
print(f"Testing RoboCompCameraRGBDSimple.TImage from ifaces.RoboCompCameraRGBDSimple")
test = ifaces.RoboCompCameraRGBDSimple.TImage()
print(f"Testing RoboCompCameraRGBDSimple.TDepth from ifaces.RoboCompCameraRGBDSimple")
Expand All @@ -160,6 +292,22 @@ def startup_check(self):
# IMPLEMENTATION of getAll method from CameraRGBDSimple interface
#
def CameraRGBDSimple_getAll(self, camera):
"""
Retrieves data from two queues and returns it in a structured format,
specifically an instance of TRGBD from the RoboComp library. It handles
exceptions where either queue is empty.

Args:
camera (CameraRGBDSimple): Not used within the function itself. It
seems to be an unused parameter possibly intended for future use
or method overriding.

Returns:
TRGBD|None: An object containing depth and image data along with their
corresponding timestamps, unless a queue is empty. In that case it
returns None.

"""
try:
ret = ifaces.RoboCompCameraRGBDSimple.TRGBD()
#ret.depth.depth = cv2.resize(self.depth_queue.get(), (480, 270))
Expand All @@ -170,13 +318,31 @@ def CameraRGBDSimple_getAll(self, camera):
ret.image.image = self.color_queue.get_nowait()
ret.image.height, ret.image.width, ret.image.depth = ret.image.image.shape
ret.image.alivetime = int(time.time()*1000)
print("get all")
return ret
except queue.Empty:
print("Empty queue")
return None
#
# IMPLEMENTATION of getDepth method from CameraRGBDSimple interface
#
def CameraRGBDSimple_getDepth(self, camera):
"""
Dequeues an image from a queue, extracts its dimensions and pixel data,
and returns them as a TDepth object representing depth information.

Args:
camera (RoboCompCameraRGBDSimple): Passed to the method as an argument,
however its usage within the method is unclear and appears redundant
since it's not used.

Returns:
ifacesRoboCompCameraRGBDSimpleTDepth: A structured data object containing
three elements: ret.height, ret.width and ret.depth where ret.height
and ret.width represent the image dimensions and ret.depth represents
the depth image itself.

"""
try:
img = self.depth_queue.get()
# img = cv2.resize(img, (480, 270))
Expand All @@ -191,12 +357,28 @@ def CameraRGBDSimple_getDepth(self, camera):
# IMPLEMENTATION of getImage method from CameraRGBDSimple interface
#
def CameraRGBDSimple_getImage(self, camera):
"""
Retrieves an image from a color queue, packages it into a TImage object,
and returns the packaged image along with its height, width, depth, and
alivetime stamp. It handles empty queues by returning None.

Args:
camera (RoboCompCameraRGBDSimple.Camera): Used to pass a camera object
to the function.

Returns:
RoboCompCameraRGBDSimpleTImage: An object containing image data along
with its dimensions, alive time and the image itself represented as a
numpy array.

"""
try:
img = self.color_queue.get()
# img = cv2.resize(img, (480, 270))
ret = ifaces.RoboCompCameraRGBDSimple.TImage()
ret.height, ret.width, ret.depth = img.shape
ret.image = img
ret.alivetime = int(time.time()*1000)

return ret
except queue.Empty:
Expand Down
Loading