fix multi-threading issue #1146
base: master
Conversation
Summary by CodeRabbit
Walkthrough
Adds a new event-driven dependency resolver and a cross-process/thread worker pool; integrates the resolver into BaseEngine flows; updates module execution to optionally use the shared pool; and initializes/shuts down the shared pool in app startup/shutdown.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45–90 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
nettacker/core/module.py (1)
184-225: Need to wait for shared-pool tasks before returning
In the shared-pool branch we only enqueue work and immediately proceed; `active_threads` tracks fallback threads only. As soon as the loop finishes, `module.start()` returns while tasks are still running in `CrossProcessThreadPool`. The parent process then calls `shutdown_thread_pool()`, which flips `is_running` and tears down workers after ~10 s, killing any long-running tasks mid-flight. The scan is reported complete before the work actually finishes, and results can be lost.
We need explicit completion tracking—consume `result_queue`, count outstanding submissions, or otherwise block on the shared pool—before returning from `start()`. Without that synchronization the new pool path is functionally incorrect.
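One way to add that synchronization, sketched against the shared `submitted_tasks`/`completed_tasks` counters this PR already keeps (the helper name and the baseline argument are illustrative, not existing API):

```python
import time


def wait_for_own_submissions(pool, baseline_completed, submitted_here, timeout=300.0):
    """Block until the tasks enqueued by this module.start() call have finished.

    `baseline_completed` is pool.completed_tasks.value captured before the first
    submission; `submitted_here` counts this module's submit_task() calls.
    Sketch only: it assumes the multiprocessing.Value counters from this PR and
    polls with a short sleep; a multiprocessing.Condition would avoid even that.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with pool.completed_tasks.get_lock():
            done = pool.completed_tasks.value - baseline_completed
        if done >= submitted_here:
            return True
        time.sleep(0.1)
    return False
```

Note that the baseline delta also counts completions from other modules running concurrently, so a stricter version would track per-module task ids or consume `result_queue` instead.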
nettacker/core/lib/base.py (1)
306-346: Restore `sub_step` before queuing dependent task
When a step has `dependent_on_temp_event`, we delete `sub_step["method"]` and `"response"` and pass the mutated dict to `dependency_resolver.get_dependency_results_efficiently`. If the dependency is still missing, we return `False` with those keys still removed. The resolver stores that stripped `sub_step`, and once the dependency arrives `_execute_task` calls back into `BaseEngine.run(updated_sub_step, …)`, which immediately hits `sub_step["method"]` and raises `KeyError`. The queued work never executes.
Capture a full copy of the original step (including method/response) for the resolver and restore the deleted keys before returning:
```diff
-        backup_method = copy.deepcopy(sub_step["method"])
-        backup_response = copy.deepcopy(sub_step["response"])
-        del sub_step["method"]
-        del sub_step["response"]
+        original_sub_step = copy.deepcopy(sub_step)
+        backup_method = original_sub_step["method"]
+        backup_response = original_sub_step["response"]
+        del sub_step["method"]
+        del sub_step["response"]
         ...
-        temp_event = dependency_resolver.get_dependency_results_efficiently(
+        temp_event = dependency_resolver.get_dependency_results_efficiently(
             target,
             module_name,
             scan_id,
             backup_response["dependent_on_temp_event"],
-            sub_step,
-            self,
-            (
-                sub_step,
+            copy.deepcopy(original_sub_step),
+            self,
+            (
+                copy.deepcopy(original_sub_step),
                 module_name,
                 target,
                 scan_id,
                 options,
                 process_number,
                 module_thread_number,
                 total_module_thread_number,
                 request_number_counter,
                 total_number_of_requests,
             ),
         )
         if temp_event is None:
-            log.verbose_event_info(...)
-            return False
+            sub_step["method"] = backup_method
+            sub_step["response"] = backup_response
+            log.verbose_event_info(...)
+            return False
```
Any equivalent approach that preserves the full step for deferred execution works.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- nettacker/core/app.py (3 hunks)
- nettacker/core/lib/base.py (4 hunks)
- nettacker/core/module.py (5 hunks)
- nettacker/core/queue_manager.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use 4-space indents in Python code
Limit lines to 99 characters (ruff/ruff-format/isort profile=black)
Module and file names should use lower_snake_case
Function and variable names should use lower_snake_case
Class names should use PascalCase
Constants should use UPPER_SNAKE_CASE
Keep functions small and add type hints where practical
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/lib/base.py
nettacker/core/queue_manager.py
nettacker/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Add docstrings for public APIs in the nettacker package
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/lib/base.py
nettacker/core/queue_manager.py
nettacker/core/**
📄 CodeRabbit inference engine (AGENTS.md)
Place core libraries under nettacker/core/
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/lib/base.py
nettacker/core/queue_manager.py
🧬 Code graph analysis (4)
nettacker/core/module.py (3)
- nettacker/core/queue_manager.py (1): submit_task (224-227)
- nettacker/core/lib/base.py (1): run (292-376)
- nettacker/core/utils/common.py (1): wait_for_threads_to_finish (54-71)
nettacker/core/app.py (2)
- nettacker/core/queue_manager.py (2): initialize_thread_pool (320-325), shutdown_thread_pool (328-333)
- nettacker/core/utils/common.py (1): wait_for_threads_to_finish (54-71)
nettacker/core/lib/base.py (2)
- nettacker/core/queue_manager.py (2): get_dependency_results_efficiently (138-198), notify_dependency_available (55-69)
- nettacker/database/db.py (2): find_temp_events (191-223), submit_temp_logs_to_db (161-188)
nettacker/core/queue_manager.py (3)
- nettacker/database/db.py (1): find_temp_events (191-223)
- nettacker/core/lib/base.py (2): replace_dependent_values (289-290), run (292-376)
- nettacker/core/module.py (1): start (161-243)
🪛 Ruff (0.13.3)
nettacker/core/queue_manager.py
- 135-135: Do not catch blind exception: Exception (BLE001)
- 136-136: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 207-207: PEP 484 prohibits implicit Optional; convert to Optional[T] (RUF013)
- 265-265: Do not catch blind exception: Exception (BLE001)
- 266-266: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 292-292: Do not catch blind exception: Exception (BLE001)
- 320-320: PEP 484 prohibits implicit Optional; convert to Optional[T] (RUF013)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test Docker 26.0.0-1 ubuntu.24.04noble image build
- GitHub Check: Test Docker 27.5.0-1 ubuntu.24.04noble image build
- GitHub Check: Test Docker 26.1.4-1 ubuntu.24.04noble image build
- GitHub Check: Test Docker image
- GitHub Check: Build package
b973b70 to dd552cc
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
nettacker/core/module.py (1)
184-227: Shared pool ignores `thread_per_host` cap
When the shared pool is active we enqueue every sub-step without any throttling, so a target can execute up to `max_workers_per_process` concurrent tasks instead of respecting `self.module_inputs["thread_per_host"]`. That regresses the per-host safety limit and can easily overwhelm a target. Please gate pooled submissions by the same cap (e.g., acquire a semaphore keyed per host or block until outstanding pooled tasks for this host drop below the configured maximum) before calling `submit_task`, otherwise fall back to the local-thread path.
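One possible gate, sketched with a hypothetical per-target registry (none of these names exist in the current code):

```python
import threading

# Hypothetical registry: target -> BoundedSemaphore sized by the module's thread_per_host option.
_host_gates = {}
_gates_lock = threading.Lock()


def host_gate(target, thread_per_host):
    """Return (creating on first use) the semaphore that caps in-flight tasks for one target."""
    with _gates_lock:
        return _host_gates.setdefault(target, threading.BoundedSemaphore(thread_per_host))


# In module.start(), before handing a sub-step to the shared pool (sketch only):
#     gate = host_gate(self.target, self.module_inputs["thread_per_host"])
#     gate.acquire()  # blocks once thread_per_host tasks for this target are in flight
#     queue_manager.thread_pool.submit_task(engine.run, ...)
# ...and the completion path (e.g., the pool's result/ack handling) must call gate.release(),
# which is the part that would need cross-process wiring inside CrossProcessThreadPool.
```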
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- nettacker/core/app.py (3 hunks)
- nettacker/core/lib/base.py (4 hunks)
- nettacker/core/module.py (5 hunks)
- nettacker/core/queue_manager.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- nettacker/core/lib/base.py
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use 4-space indents in Python code
Limit lines to 99 characters (ruff/ruff-format/isort profile=black)
Module and file names should use lower_snake_case
Function and variable names should use lower_snake_case
Class names should use PascalCase
Constants should use UPPER_SNAKE_CASE
Keep functions small and add type hints where practical
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/queue_manager.py
nettacker/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Add docstrings for public APIs in the nettacker package
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/queue_manager.py
nettacker/core/**
📄 CodeRabbit inference engine (AGENTS.md)
Place core libraries under nettacker/core/
Files:
nettacker/core/module.py
nettacker/core/app.py
nettacker/core/queue_manager.py
🧬 Code graph analysis (3)
nettacker/core/module.py (3)
- nettacker/core/queue_manager.py (1): submit_task (223-226)
- nettacker/core/lib/base.py (1): run (292-376)
- nettacker/core/utils/common.py (1): wait_for_threads_to_finish (54-71)
nettacker/core/app.py (2)
- nettacker/core/queue_manager.py (2): initialize_thread_pool (305-310), shutdown_thread_pool (313-318)
- nettacker/core/utils/common.py (1): wait_for_threads_to_finish (54-71)
nettacker/core/queue_manager.py (3)
- nettacker/database/db.py (1): find_temp_events (191-223)
- nettacker/core/lib/base.py (2): replace_dependent_values (289-290), run (292-376)
- nettacker/core/module.py (1): start (161-245)
🪛 Ruff (0.13.3)
nettacker/core/queue_manager.py
- 135-135: Do not catch blind exception: Exception (BLE001)
- 136-136: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 264-264: Redundant exception object included in logging.exception call (TRY401)
- 285-285: Redundant exception object included in logging.exception call (TRY401)
- 305-305: PEP 484 prohibits implicit Optional; convert to Optional[T] (RUF013)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test Docker 26.1.4-1 ubuntu.24.04noble image build
- GitHub Check: Test Docker 27.5.0-1 ubuntu.24.04noble image build
- GitHub Check: Build package
- GitHub Check: Test Docker 26.0.0-1 ubuntu.24.04noble image build
- GitHub Check: Test Docker image
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
nettacker/core/module.py (1)
184-201: Bound method cannot be marshaled across processes
`submit_task(engine.run, ...)` puts a bound method into a `multiprocessing` queue. This is not picklable and will fail on put/get. Use a serializable payload and reconstruct the engine in the worker.
Apply this approach:
- Add a specialized submit helper that serializes the work item (see queue_manager diff).
- Call it here instead of passing a callable.
```diff
-            queue_manager.thread_pool.submit_task(
-                engine.run,
-                sub_step,
-                self.module_name,
-                self.target,
-                self.scan_id,
-                self.module_inputs,
-                self.process_number,
-                self.module_thread_number,
-                self.total_module_thread_number,
-                request_number_counter,
-                total_number_of_requests,
-            )
+            queue_manager.thread_pool.submit_engine_run(
+                library,
+                sub_step,
+                self.module_name,
+                self.target,
+                self.scan_id,
+                self.module_inputs,
+                self.process_number,
+                self.module_thread_number,
+                self.total_module_thread_number,
+                request_number_counter,
+                total_number_of_requests,
+            )
```
♻️ Duplicate comments (1)
nettacker/core/module.py (1)
10-10: Frozen import issue resolved correctly
Switched to importing the module and dereferencing at call time. This fixes the stale `thread_pool` reference.
🧹 Nitpick comments (4)
nettacker/core/module.py (2)
122-159: Sorting may misclassify steps by inspecting only the first sub_step
`step_response = step[0]["response"]` assumes all sub_steps share dependency traits. Consider aggregating across sub_steps (e.g., any consumer/any generator) to avoid reordering mistakes when sub_steps differ.
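A minimal sketch of that aggregation (a sorting key only; it assumes the step/response layout used by the existing loop):

```python
def step_sort_key(step):
    """Order steps by dependency traits gathered from every sub_step, not just the first.

    0 = no dependency, 1 = dependency consumer that only saves temp events,
    2 = ordinary dependency consumer, so sorted(steps, key=step_sort_key)
    keeps generators ahead of consumers while preserving relative order.
    """
    responses = [sub_step.get("response", {}) for sub_step in step]
    consumes = any("dependent_on_temp_event" in response for response in responses)
    generates = any("save_to_temp_events_only" in response for response in responses)
    if not consumes:
        return 0
    return 1 if generates else 2
```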
204-229: Per-host concurrency limit bypassed in shared-pool path
When using the shared pool, `thread_per_host` is not enforced; tasks may run far beyond the per-target throttle. Consider keyed throttling in the shared pool (e.g., per-target semaphores) or gate submissions here to respect `thread_per_host`.
nettacker/core/queue_manager.py (2)
135-137: Improve exception logging and avoid blind excepts
Use `log.exception` when catching broad exceptions to preserve tracebacks; narrow exceptions where possible.
```diff
-        except Exception as e:
-            log.error(f"Error executing dependent task: {e}")
+        except Exception:
+            log.exception("Error executing dependent task")
@@
-        except Exception as e:
-            log.exception(f"Worker process {worker_id} error: {e}")
+        except Exception:
+            log.exception("Worker process error")
@@
-        except Exception as e:
-            log.error(f"Worker {worker_id} task execution failed: {e}")
+        except Exception:
+            log.exception("Worker task execution failed")
@@
-        except Exception as e:
-            log.error(f"Error while waiting for tasks to complete: {e}")
+        except Exception:
+            log.exception("Error while waiting for tasks to complete")
```
Also applies to: 298-300, 322-324, 340-341
207-216: Minor typing polish
Annotate optional parameters explicitly for clarity and to satisfy linters.
```diff
-    def __init__(self, max_workers_per_process: Optional[int] = None):
+    def __init__(self, max_workers_per_process: Optional[int] = None):
     ...
-def initialize_thread_pool(num_processes: int, max_workers_per_process: int = None):
+def initialize_thread_pool(
+    num_processes: int, max_workers_per_process: Optional[int] = None
+) -> CrossProcessThreadPool:
```
Also applies to: 362-367
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- nettacker/core/module.py (5 hunks)
- nettacker/core/queue_manager.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use 4-space indents in Python code
Limit lines to 99 characters (ruff/ruff-format/isort profile=black)
Module and file names should use lower_snake_case
Function and variable names should use lower_snake_case
Class names should use PascalCase
Constants should use UPPER_SNAKE_CASE
Keep functions small and add type hints where practical
Files:
nettacker/core/queue_manager.py
nettacker/core/module.py
nettacker/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Add docstrings for public APIs in the nettacker package
Files:
nettacker/core/queue_manager.py
nettacker/core/module.py
nettacker/core/**
📄 CodeRabbit inference engine (AGENTS.md)
Place core libraries under nettacker/core/
Files:
nettacker/core/queue_manager.py
nettacker/core/module.py
🧬 Code graph analysis (2)
nettacker/core/queue_manager.py (3)
- nettacker/database/db.py (1): find_temp_events (191-223)
- nettacker/core/lib/base.py (2): replace_dependent_values (289-290), run (292-376)
- nettacker/core/module.py (1): start (161-258)
nettacker/core/module.py (3)
- nettacker/core/queue_manager.py (2): submit_task (227-232), wait_for_completion (234-259)
- nettacker/core/lib/base.py (1): run (292-376)
- nettacker/core/utils/common.py (1): wait_for_threads_to_finish (54-71)
🪛 Ruff (0.13.3)
nettacker/core/queue_manager.py
- 135-135: Do not catch blind exception: Exception (BLE001)
- 136-136: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 299-299: Redundant exception object included in logging.exception call (TRY401)
- 322-322: Do not catch blind exception: Exception (BLE001)
- 323-323: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 340-340: Do not catch blind exception: Exception (BLE001)
- 341-341: Use logging.exception instead of logging.error; replace with exception (TRY400)
- 362-362: PEP 484 prohibits implicit Optional; convert to Optional[T] (RUF013)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Run tests
- GitHub Check: CodeQL (javascript)
```python
# Wait for completion based on execution path
if used_shared_pool:
    # Wait for shared thread pool tasks to complete
    if queue_manager.thread_pool and hasattr(
        queue_manager.thread_pool, "wait_for_completion"
    ):
        # Wait with a reasonable timeout to prevent hanging
        completed = queue_manager.thread_pool.wait_for_completion(timeout=300)  # 5 minutes
        if not completed:
            log.warn(f"Module {self.module_name} tasks did not complete within timeout")

# Wait for any remaining local threads to finish
if active_threads:
    wait_for_threads_to_finish(active_threads, maximum=None, terminable=True)
```
Waits for global pool completion, not this module's submissions
`wait_for_completion()` tracks global submitted/completed counts. This can block on unrelated work or time out prematurely. Prefer per-batch handles: have `submit_*` return a ticket and add `wait_until(ticket, timeout)` that waits until `completed >= ticket`, or return per-task futures/events and wait on those here.
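A rough sketch of the ticket idea, built on the shared counters this PR already has (both method names here are hypothetical):

```python
import time


class TicketedPoolMixin:
    """Hypothetical additions to CrossProcessThreadPool for per-batch completion."""

    def submit_task_with_ticket(self, task_func, *args, **kwargs):
        """Enqueue a task and return the cumulative submission count as a ticket."""
        self.submit_task(task_func, *args, **kwargs)
        with self.submitted_tasks.get_lock():
            return self.submitted_tasks.value

    def wait_until(self, ticket, timeout=300.0):
        """Block until at least `ticket` tasks have completed or the timeout expires.

        Because the ticket is a global submission count, this waits for everything
        submitted up to that point, which is still tighter than wait_for_completion().
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with self.completed_tasks.get_lock():
                if self.completed_tasks.value >= ticket:
                    return True
            time.sleep(0.05)
        return False
```

The caller in `module.start()` would keep the ticket returned by its last submission and wait on that instead of the global `wait_for_completion()`.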
```python
with self._lock:
    # Cache the result
    self._cache_dependency_result(dependency_key, result)

    # Check for pending tasks that can now be executed
    self._process_pending_tasks(dependency_key)
```
Don't execute tasks while holding the resolver lock; also remove ready tasks from all wait lists
`notify_dependency_available()` calls `_process_pending_tasks()` inside `with self._lock`, and `_process_pending_tasks()` invokes `_execute_task()`. This holds the lock during `engine.run`, blocking other threads and risking reentrancy. Also, a task is queued under multiple dependency keys but only removed from the current key, leading to duplicate executions once other keys trigger.
Refactor to (a) collect ready tasks under lock, then execute them after releasing it, and (b) remove ready tasks from all dependency-key lists before executing.
Apply this diff:
@@
- def notify_dependency_available(
+ def notify_dependency_available(
self, target: str, module_name: str, scan_id: str, event_name: str, result: Any
):
@@
- with self._lock:
- # Cache the result
- self._cache_dependency_result(dependency_key, result)
-
- # Check for pending tasks that can now be executed
- self._process_pending_tasks(dependency_key)
+ # Cache and collect ready tasks under lock
+ with self._lock:
+ self._cache_dependency_result(dependency_key, result)
+ ready_tasks = self._process_pending_tasks(dependency_key)
+
+ # Execute outside the lock
+ for task in ready_tasks:
+ self._execute_task(task)
@@
- def _process_pending_tasks(self, dependency_key: str):
- """Process tasks that were waiting for the given dependency."""
- if dependency_key not in self._pending_tasks:
- return
+ def _process_pending_tasks(self, dependency_key: str) -> List[DependentTask]:
+ """Collect tasks that became ready due to the given dependency.
+ Must be called under self._lock.
+ """
+ if dependency_key not in self._pending_tasks:
+ return []
@@
- # Update pending tasks list
- if remaining_tasks:
- self._pending_tasks[dependency_key] = remaining_tasks
- else:
- del self._pending_tasks[dependency_key]
-
- # Execute ready tasks
- for task in ready_tasks:
- self._execute_task(task)
+ # Update pending list for the triggering key
+ if remaining_tasks:
+ self._pending_tasks[dependency_key] = remaining_tasks
+ else:
+ del self._pending_tasks[dependency_key]
+
+ # Remove ready tasks from all other dependency-key lists
+ if ready_tasks:
+ for key in list(self._pending_tasks.keys()):
+ new_list = [t for t in self._pending_tasks[key] if t not in ready_tasks]
+ if new_list:
+ self._pending_tasks[key] = new_list
+ else:
+ del self._pending_tasks[key]
+
+ # Return ready tasks to be executed outside the lock
+ return ready_tasks
Also applies to: 75-105
🤖 Prompt for AI Agents
In nettacker/core/queue_manager.py around lines 64-70 and 75-105, avoid
executing tasks while holding self._lock and ensure tasks queued under multiple
dependency keys are removed from all wait lists: inside the lock, collect the
set/list of ready tasks for the given dependency_key and remove those task IDs
from every dependency wait-list and from any per-task tracking structures, then
release the lock and iterate over the collected tasks to call
_execute_task/engine.run; ensure no engine.run or other long-running calls
happen while the lock is held and that removal is atomic so tasks won't be
executed more than once when other dependencies become available.
```python
def submit_task(self, task_func, *args, **kwargs):
    """Submit a task to the shared pool."""
    task = {"func": task_func, "args": args, "kwargs": kwargs, "timestamp": time.time()}
    self.task_queue.put(task)
    with self.submitted_tasks.get_lock():
        self.submitted_tasks.value += 1
```
Multiprocessing task payload must be serializable (callables/bound methods won't pickle)
Current design enqueues a callable. With `multiprocessing`, this will not pickle (e.g., bound methods like `engine.run`). Serialize the work item instead and reconstruct the engine in workers.
Apply this addition (backward-compatible helper and worker handling):
@@
- def submit_task(self, task_func, *args, **kwargs):
+ def submit_task(self, task_func, *args, **kwargs):
"""Submit a task to the shared pool."""
task = {"func": task_func, "args": args, "kwargs": kwargs, "timestamp": time.time()}
self.task_queue.put(task)
with self.submitted_tasks.get_lock():
self.submitted_tasks.value += 1
+
+ def submit_engine_run(
+ self,
+ library: str,
+ sub_step: Dict[str, Any],
+ module_name: str,
+ target: str,
+ scan_id: str,
+ options: Dict[str, Any],
+ process_number: int,
+ module_thread_number: int,
+ total_module_thread_number: int,
+ request_number_counter: int,
+ total_number_of_requests: int,
+ ):
+ """Submit a serialized Engine.run task (safe for multiprocessing)."""
+ task = {
+ "kind": "engine_run",
+ "library": library,
+ "payload": {
+ "sub_step": sub_step,
+ "module_name": module_name,
+ "target": target,
+ "scan_id": scan_id,
+ "options": options,
+ "process_number": process_number,
+ "module_thread_number": module_thread_number,
+ "total_module_thread_number": total_module_thread_number,
+ "request_number_counter": request_number_counter,
+ "total_number_of_requests": total_number_of_requests,
+ },
+ "timestamp": time.time(),
+ }
+ self.task_queue.put(task)
+ with self.submitted_tasks.get_lock():
+ self.submitted_tasks.value += 1
@@
- def _execute_task(
- self, task: Dict, worker_id: int, completed_tasks: multiprocessing.Value, task_queue
- ):
+ def _execute_task(
+ self, task: Dict, worker_id: int, completed_tasks: multiprocessing.Value, task_queue
+ ):
"""Execute a single task."""
try:
- func = task["func"]
- args = task["args"]
- kwargs = task["kwargs"]
-
- # Execute the task - engine.run() handles its own results/logging
- func(*args, **kwargs)
+ if task.get("kind") == "engine_run":
+ lib = task["library"]
+ payload = task["payload"]
+ # Reconstruct engine and call run
+ mod = __import__(f"nettacker.core.lib.{lib.lower()}", fromlist=["*"])
+ engine_cls = getattr(mod, f"{lib.capitalize()}Engine")
+ engine = engine_cls()
+ engine.run(
+ payload["sub_step"],
+ payload["module_name"],
+ payload["target"],
+ payload["scan_id"],
+ payload["options"],
+ payload["process_number"],
+ payload["module_thread_number"],
+ payload["total_module_thread_number"],
+ payload["request_number_counter"],
+ payload["total_number_of_requests"],
+ )
+ else:
+ # Backward-compat path (may fail if callable isn't picklable)
+ func = task["func"]
+ args = task["args"]
+ kwargs = task["kwargs"]
+ func(*args, **kwargs)
@@
- except Exception as e:
- log.error(f"Worker {worker_id} task execution failed: {e}")
+ except Exception:
+ log.exception(f"Worker {worker_id} task execution failed")
finally:
# Always increment completed count, even on failure
with completed_tasks.get_lock():
completed_tasks.value += 1
# Mark task as done for JoinableQueue
task_queue.task_done()
Use `submit_engine_run()` from callers (see paired change in module.py).
Also applies to: 308-329
```python
has_dependency = "dependent_on_temp_event" in step_response
generates_dependency = "save_to_temp_events_only" in step_response

if not has_dependency and not generates_dependency:
```
This is equally hard to read, maybe something like this would be better?
```python
no_dep = []
dep_temp_only = []
dep_normal = []
for step in copy.deepcopy(self.module_content["payloads"][index]["steps"]):
    resp = step[0]["response"]
    if "dependent_on_temp_event" not in resp:
        no_dep.append(step)
    elif "save_to_temp_events_only" in resp:
        dep_temp_only.append(step)
    else:
        dep_normal.append(step)
payload["steps"] = no_dep + dep_temp_only + dep_normal
```
I think this change would be beneficial apart from this PR as well. Maybe you can make a new one with this, because the current one is O(n) but running it three times with three deepcopies is bad.
Okay, will fix it. Is it okay if we add these changes in a new commit in this PR?
You can make a separate one with this because it will probably be a while before this one is tested nicely, and this change is easier to verify and test.
```python
        Uses event-driven approach to avoid CPU consumption.
        """
        # Try to get results efficiently using the new dependency resolver
        results = dependency_resolver.get_dependency_results_efficiently(
```
Can you try to quantify how effective this is as well? I understand that, theoretically, polling every 100 ms is not the best approach, but this function is only called when `dependent_on_temp_event` is present in the response (that is, for a few vuln and scan modules), for the HTTP implementation.
So I am assuming that this polling doesn't really have to run for long periods. Given that, if you can justify adding this complexity, it would be cool.
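For reference, a tiny stand-alone harness along these lines (purely illustrative, not nettacker code) can show what the 100 ms poll costs: each wake-up in `polling_wait` corresponds to roughly one `find_temp_events` database query in the current implementation, while the event-driven wait performs none:

```python
import threading
import time


def polling_wait(ready, interval=0.1):
    """Old approach: wake up every `interval` seconds and re-check (a DB query in nettacker)."""
    checks = 0
    while not ready.is_set():
        checks += 1
        time.sleep(interval)
    return checks


def event_wait(ready):
    """Event-driven approach: sleep until notified, no intermediate checks."""
    ready.wait()
    return 0


if __name__ == "__main__":
    for waiter in (polling_wait, event_wait):
        ready = threading.Event()
        threading.Timer(3.0, ready.set).start()  # dependency becomes available after ~3 s
        start = time.perf_counter()
        checks = waiter(ready)
        elapsed = time.perf_counter() - start
        print(f"{waiter.__name__}: {checks} re-checks, returned after {elapsed:.2f}s")
```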
Please also look into CodeRabbit's suggestions. Also, the CI/CD tests aren't passing.
I think there is some issue in the way you have defined your worker subprocess. Do look into that.
Proposed change
Replaces busy-wait loops with event-driven dependency resolution and adds cross-process thread sharing for better resource utilization.
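The core pattern, reduced to a minimal illustration (this mirrors the idea, not the actual `queue_manager` code):

```python
import threading
from typing import Any, Callable, Dict, List


class EventDrivenResolver:
    """Toy resolver: work is registered against a dependency key instead of polling for it."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results: Dict[str, Any] = {}
        self._waiters: Dict[str, List[Callable[[Any], None]]] = {}

    def when_available(self, key: str, callback: Callable[[Any], None]) -> None:
        """Run callback(result) now if the dependency is cached, otherwise defer it."""
        with self._lock:
            if key in self._results:
                result = self._results[key]
            else:
                self._waiters.setdefault(key, []).append(callback)
                return
        callback(result)

    def publish(self, key: str, result: Any) -> None:
        """Called by the producing step; wakes every deferred callback outside the lock."""
        with self._lock:
            self._results[key] = result
            callbacks = self._waiters.pop(key, [])
        for callback in callbacks:
            callback(result)
```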
Changes and approach I took
Closes issue: #595
Files changed
Type of change
Checklist
- make pre-commit: it didn't generate any changes
- make test: all tests passed locally