-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_manager.py
More file actions
45 lines (34 loc) · 1.25 KB
/
thread_manager.py
File metadata and controls
45 lines (34 loc) · 1.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from PySide6.QtCore import QThread, QMutex, QObject
class WorkerThread(QThread):
"""
Worker thread class to execute a function in a separate thread.
Optionally supports mutex locking to prevent race conditions.
"""
def __init__(self, parent: QObject, service_name: str, function: callable = None, args: list = None, lock_required: bool = False):
super().__init__()
self.service_name = service_name
self.function = function
self.args = args or []
self.lock_required = lock_required
self.mutex = QMutex()
self.setParent(parent)
def run(self):
try:
if self.lock_required:
self.mutex.lock()
if self.function:
self.function(*self.args)
except Exception as e:
print(f"[{self.service_name}] Error:", e)
finally:
if self.lock_required:
self.mutex.unlock()
self.cleanup()
def cleanup(self):
"""Handles cleanup after the thread finishes."""
print(f"[{self.service_name}] Thread finished, cleaning up.")
def stop(self):
"""Stop the thread safely."""
if self.isRunning():
self.quit()
self.wait()