diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 63bbd480f1f..090770eab9f 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -40,6 +40,7 @@ prefetch, serializers, ) +from dojo.api_v2.prefetch.prefetcher import _Prefetcher from dojo.authorization.roles_permissions import Permissions from dojo.cred.queries import get_authorized_cred_mappings from dojo.endpoint.queries import ( @@ -584,8 +585,6 @@ def files(self, request, pk=None): ) @action(detail=True, methods=["get", "post"]) def complete_checklist(self, request, pk=None): - from dojo.api_v2.prefetch.prefetcher import _Prefetcher - engagement = self.get_object() check_lists = Check_List.objects.filter(engagement=engagement) if request.method == "POST": diff --git a/dojo/apps.py b/dojo/apps.py index c1831b6a06e..0477b3c520d 100644 --- a/dojo/apps.py +++ b/dojo/apps.py @@ -71,21 +71,21 @@ def ready(self): # Load any signals here that will be ready for runtime # Importing the signals file is good enough if using the reciever decorator - import dojo.announcement.signals - import dojo.benchmark.signals - import dojo.cred.signals - import dojo.endpoint.signals - import dojo.engagement.signals - import dojo.file_uploads.signals - import dojo.finding_group.signals - import dojo.notes.signals - import dojo.product.signals - import dojo.product_type.signals - import dojo.risk_acceptance.signals - import dojo.sla_config.helpers - import dojo.tags_signals - import dojo.test.signals - import dojo.tool_product.signals # noqa: F401 + import dojo.announcement.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.benchmark.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.cred.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.endpoint.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.engagement.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.file_uploads.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.finding_group.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.notes.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.product.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.product_type.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.risk_acceptance.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.sla_config.helpers # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.tags_signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.test.signals # noqa: PLC0415 raised: AppRegistryNotReady + import dojo.tool_product.signals # noqa: F401,PLC0415 raised: AppRegistryNotReady def get_model_fields_with_extra(model, extra_fields=()): diff --git a/dojo/celery.py b/dojo/celery.py index d747ee7a4cd..5f2935b4460 100644 --- a/dojo/celery.py +++ b/dojo/celery.py @@ -1,5 +1,6 @@ import logging import os +from logging.config import dictConfig from celery import Celery from celery.signals import setup_logging @@ -26,7 +27,6 @@ def debug_task(self): @setup_logging.connect def config_loggers(*args, **kwags): - from logging.config import dictConfig dictConfig(settings.LOGGING) diff --git a/dojo/context_processors.py b/dojo/context_processors.py index 7a3c84af035..adcf0b9874f 100644 --- a/dojo/context_processors.py +++ b/dojo/context_processors.py @@ -1,8 +1,11 @@ import contextlib +import time # import the settings file from django.conf import settings +from dojo.models import Alerts, System_Settings, UserAnnouncement + def globalize_vars(request): # return the value you want as a dictionnary. you may add multiple values in there. @@ -34,14 +37,11 @@ def globalize_vars(request): def bind_system_settings(request): - from dojo.models import System_Settings - return {"system_settings": System_Settings.objects.get()} def bind_alert_count(request): if not settings.DISABLE_ALERT_COUNTER: - from dojo.models import Alerts if hasattr(request, "user") and request.user.is_authenticated: return {"alert_count": Alerts.objects.filter(user_id=request.user).count()} @@ -49,8 +49,6 @@ def bind_alert_count(request): def bind_announcement(request): - from dojo.models import UserAnnouncement - with contextlib.suppress(Exception): # TODO: this should be replaced with more meaningful exception if request.user.is_authenticated: user_announcement = UserAnnouncement.objects.select_related( @@ -61,8 +59,6 @@ def bind_announcement(request): def session_expiry_notification(request): - import time - try: if request.user.is_authenticated: last_activity = request.session.get("_last_activity", time.time()) diff --git a/dojo/decorators.py b/dojo/decorators.py index 791e76cb8ab..1d1f6aac67c 100644 --- a/dojo/decorators.py +++ b/dojo/decorators.py @@ -55,8 +55,7 @@ def get_tasks(self): def we_want_async(*args, func=None, **kwargs): - from dojo.models import Dojo_User - from dojo.utils import get_current_user + from dojo.utils import get_current_user # noqa: PLC0415 circular import sync = kwargs.get("sync", False) if sync: @@ -83,7 +82,7 @@ def we_want_async(*args, func=None, **kwargs): def dojo_async_task(func): @wraps(func) def __wrapper__(*args, **kwargs): - from dojo.utils import get_current_user + from dojo.utils import get_current_user # noqa: PLC0415 circular import user = get_current_user() kwargs["async_user"] = user diff --git a/dojo/filters.py b/dojo/filters.py index 940d1f970f5..c6a5039db51 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -3465,7 +3465,6 @@ class Meta: class LogEntryFilter(DojoFilter): - from auditlog.models import LogEntry action = MultipleChoiceFilter(choices=LogEntry.Action.choices) actor = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none()) diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 3f6a597a542..ad7ad916b2d 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -27,7 +27,15 @@ Vulnerability_Id_Template, ) from dojo.notes.helper import delete_related_notes -from dojo.utils import get_current_user, mass_model_updater, to_str_typed +from dojo.tools import tool_issue_updater +from dojo.utils import ( + calculate_grade, + do_dedupe_finding, + do_false_positive_history, + get_current_user, + mass_model_updater, + to_str_typed, +) logger = logging.getLogger(__name__) deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication") @@ -366,7 +374,6 @@ def post_process_finding_save(finding, dedupe_option=True, rules_option=True, pr if dedupe_option: if finding.hash_code is not None: if system_settings.enable_deduplication: - from dojo.utils import do_dedupe_finding do_dedupe_finding(finding, *args, **kwargs) else: deduplicationLogger.debug("skipping dedupe because it's disabled in system settings") @@ -378,17 +385,14 @@ def post_process_finding_save(finding, dedupe_option=True, rules_option=True, pr if system_settings.enable_deduplication: deduplicationLogger.warning("skipping false positive history because deduplication is also enabled") else: - from dojo.utils import do_false_positive_history do_false_positive_history(finding, *args, **kwargs) # STEP 2 run all non-status changing tasks as celery tasks in the background if issue_updater_option: - from dojo.tools import tool_issue_updater tool_issue_updater.async_tool_issue_update(finding) if product_grading_option: if system_settings.enable_product_grade: - from dojo.utils import calculate_grade calculate_grade(finding.test.engagement.product) else: deduplicationLogger.debug("skipping product grading because it's disabled in system settings") @@ -396,7 +400,6 @@ def post_process_finding_save(finding, dedupe_option=True, rules_option=True, pr # Adding a snippet here for push to JIRA so that it's in one place if push_to_jira: logger.debug("pushing finding %s to jira from finding.save()", finding.pk) - import dojo.jira_link.helper as jira_helper # current approach is that whenever a finding is in a group, the group will be pushed to JIRA # based on feedback we could introduct another push_group_to_jira boolean everywhere diff --git a/dojo/finding/views.py b/dojo/finding/views.py index fc7fe43284f..9d1ebd547e2 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -101,6 +101,7 @@ ) from dojo.notifications.helper import create_notification from dojo.test.queries import get_authorized_tests +from dojo.tools import tool_issue_updater from dojo.utils import ( FileIterWrapper, Product_Tab, @@ -2918,8 +2919,6 @@ def finding_bulk_update_all(request, pid=None): error_counts = defaultdict(lambda: 0) success_count = 0 for finding in finds: - from dojo.tools import tool_issue_updater - tool_issue_updater.async_tool_issue_update(finding) # not sure yet if we want to support bulk unlink, so leave as commented out for now diff --git a/dojo/forms.py b/dojo/forms.py index 7362cacce95..6cb114c573d 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -18,6 +18,7 @@ from django.contrib.auth.password_validation import validate_password from django.core import validators from django.core.exceptions import ValidationError +from django.core.validators import URLValidator from django.db.models import Count, Q from django.forms import modelformset_factory from django.forms.widgets import Select, Widget @@ -384,8 +385,6 @@ class EditFindingGroupForm(forms.ModelForm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - import dojo.jira_link.helper as jira_helper - self.fields["push_to_jira"] = forms.BooleanField() self.fields["push_to_jira"].required = False self.fields["push_to_jira"].help_text = "Checking this will overwrite content of your JIRA issue, or create one." @@ -2596,7 +2595,6 @@ class BaseJiraForm(forms.ModelForm): password = forms.CharField(widget=forms.PasswordInput, required=True, help_text=JIRA_Instance._meta.get_field("password").help_text, label=JIRA_Instance._meta.get_field("password").verbose_name) def test_jira_connection(self): - import dojo.jira_link.helper as jira_helper try: # Attempt to validate the credentials before moving forward jira_helper.get_jira_connection_raw(self.cleaned_data["url"], @@ -2665,13 +2663,6 @@ class Meta: fields = ["id"] -# class JIRA_ProjectForm(forms.ModelForm): - -# class Meta: -# model = JIRA_Project -# exclude = ['product'] - - class Product_API_Scan_ConfigurationForm(forms.ModelForm): def __init__(self, *args, **kwargs): @@ -2768,7 +2759,6 @@ class Meta: exclude = ["product"] def clean(self): - from django.core.validators import URLValidator form_data = self.cleaned_data try: @@ -2849,7 +2839,6 @@ class Meta: order = ["name"] def clean(self): - from django.core.validators import URLValidator form_data = self.cleaned_data try: @@ -3061,7 +3050,6 @@ class Meta: fields = ["inherit_from_product", "jira_instance", "project_key", "issue_template_dir", "epic_issue_type_name", "component", "custom_fields", "jira_labels", "default_assignee", "enabled", "add_vulnerability_id_to_jira_label", "push_all_issues", "enable_engagement_epic_mapping", "push_notes", "product_jira_sla_notification", "risk_acceptance_expiration_notification"] def __init__(self, *args, **kwargs): - from dojo.jira_link import helper as jira_helper # if the form is shown for an engagement, we set a placeholder text around inherited settings from product self.target = kwargs.pop("target", "product") self.product = kwargs.pop("product", None) diff --git a/dojo/github.py b/dojo/github.py index 011a3feb28e..1f0f33c3313 100644 --- a/dojo/github.py +++ b/dojo/github.py @@ -16,7 +16,7 @@ def reopen_external_issue_github(find, note, prod, eng): - from dojo.utils import get_system_setting + from dojo.utils import get_system_setting # noqa: PLC0415 circular import if not get_system_setting("enable_github"): return @@ -47,7 +47,7 @@ def reopen_external_issue_github(find, note, prod, eng): def close_external_issue_github(find, note, prod, eng): - from dojo.utils import get_system_setting + from dojo.utils import get_system_setting # noqa: PLC0415 circular import if not get_system_setting("enable_github"): return @@ -78,7 +78,7 @@ def close_external_issue_github(find, note, prod, eng): def update_external_issue_github(find, prod, eng): - from dojo.utils import get_system_setting + from dojo.utils import get_system_setting # noqa: PLC0415 circular import if not get_system_setting("enable_github"): return @@ -106,7 +106,7 @@ def update_external_issue_github(find, prod, eng): def add_external_issue_github(find, prod, eng): - from dojo.utils import get_system_setting + from dojo.utils import get_system_setting # noqa: PLC0415 circular import if not get_system_setting("enable_github"): return diff --git a/dojo/jira_link/helper.py b/dojo/jira_link/helper.py index 77977469240..2a4cb0ad205 100644 --- a/dojo/jira_link/helper.py +++ b/dojo/jira_link/helper.py @@ -1,3 +1,4 @@ +import importlib import io import json import logging @@ -442,7 +443,6 @@ def connect_to_jira(jira_server, jira_username, jira_password): def get_jira_connect_method(): if hasattr(settings, "JIRA_CONNECT_METHOD"): try: - import importlib mn, _, fn = settings.JIRA_CONNECT_METHOD.rpartition(".") m = importlib.import_module(mn) return getattr(m, fn) @@ -1770,7 +1770,7 @@ def escape_for_jira(text): def process_resolution_from_jira(finding, resolution_id, resolution_name, assignee_name, jira_now, jira_issue, finding_group: Finding_Group = None) -> bool: """Processes the resolution field in the JIRA issue and updated the finding in Defect Dojo accordingly""" - import dojo.risk_acceptance.helper as ra_helper + import dojo.risk_acceptance.helper as ra_helper # noqa: PLC0415 import error status_changed = False resolved = resolution_id is not None jira_instance = get_jira_instance(finding) diff --git a/dojo/middleware.py b/dojo/middleware.py index c83e076b86c..ffae7dd9432 100644 --- a/dojo/middleware.py +++ b/dojo/middleware.py @@ -13,6 +13,7 @@ from django.urls import reverse from django.utils.functional import SimpleLazyObject +from dojo.models import Dojo_User from dojo.product_announcements import LongRunningRequestProductAnnouncement logger = logging.getLogger(__name__) @@ -65,7 +66,6 @@ def __call__(self, request): # this populates dd_user log var, so can appear in the uwsgi logs uwsgi.set_logvar("dd_user", str(request.user)) path = request.path_info.lstrip("/") - from dojo.models import Dojo_User if Dojo_User.force_password_reset(request.user) and path != "change_password": return HttpResponseRedirect(reverse("change_password")) @@ -77,8 +77,7 @@ class DojoSytemSettingsMiddleware: def __init__(self, get_response): self.get_response = get_response - # avoid circular imports - from dojo.models import System_Settings + from dojo.models import System_Settings # noqa: PLC0415 circular import models.signals.post_save.connect(self.cleanup, sender=System_Settings) def __call__(self, request): @@ -107,7 +106,7 @@ def cleanup(cls, *args, **kwargs): # noqa: ARG003 def load(cls): # cleanup any existing settings first to ensure fresh state cls.cleanup() - from dojo.models import System_Settings + from dojo.models import System_Settings # noqa: PLC0415 circular import system_settings = System_Settings.objects.get(no_cache=True) cls._thread_local.system_settings = system_settings return system_settings @@ -120,7 +119,7 @@ def get_from_db(self, *args, **kwargs): try: from_db = super().get(*args, **kwargs) except: - from dojo.models import System_Settings + from dojo.models import System_Settings # noqa: PLC0415 circular import # this mimics the existing code that was in filters.py and utils.py. # cases I have seen triggering this is for example manage.py collectstatic inside a docker build where mysql is not available # logger.debug('unable to get system_settings from database, constructing (new) default instance. Exception was:', exc_info=True) diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index a92ef11fc0a..c525bb4fb8c 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -696,8 +696,7 @@ def _process_objects(self, **kwargs: dict) -> None: self.product = finding.test.engagement.product logger.debug("Defined product of finding %s", self.product) elif (obj := kwargs.get("obj")) is not None: - from dojo.utils import get_product - + from dojo.utils import get_product # noqa: PLC0415 circular import self.product = get_product(obj) logger.debug("Defined product of obj %s", self.product) diff --git a/dojo/tasks.py b/dojo/tasks.py index 90d4a928bf4..0957ac40194 100644 --- a/dojo/tasks.py +++ b/dojo/tasks.py @@ -11,6 +11,8 @@ from django.utils import timezone from dojo.celery import app +from dojo.finding.helper import fix_loop_duplicates +from dojo.management.commands.jira_status_reconciliation import jira_status_reconciliation from dojo.models import Alerts, Announcement, Endpoint, Engagement, Finding, Product, System_Settings, User from dojo.notifications.helper import create_notification from dojo.utils import calculate_grade, sla_compute_and_notify @@ -182,13 +184,11 @@ def async_sla_compute_and_notify_task(*args, **kwargs): @app.task def jira_status_reconciliation_task(*args, **kwargs): - from dojo.management.commands.jira_status_reconciliation import jira_status_reconciliation return jira_status_reconciliation(*args, **kwargs) @app.task def fix_loop_duplicates_task(*args, **kwargs): - from dojo.finding.helper import fix_loop_duplicates return fix_loop_duplicates() diff --git a/dojo/templatetags/display_tags.py b/dojo/templatetags/display_tags.py index 85b704c3c47..375cae915ef 100644 --- a/dojo/templatetags/display_tags.py +++ b/dojo/templatetags/display_tags.py @@ -6,6 +6,7 @@ import re from ast import literal_eval from itertools import chain +from pathlib import Path import bleach import dateutil.relativedelta @@ -25,8 +26,9 @@ import dojo.jira_link.helper as jira_helper import dojo.utils +from dojo import __docs__, __version__ from dojo.models import Benchmark_Product, Check_List, Dojo_User, FileAccessToken, Finding, Product, System_Settings -from dojo.utils import get_file_images, get_full_url, get_system_setting, prepare_for_view +from dojo.utils import calculate_grade, get_file_images, get_full_url, get_system_setting, prepare_for_view logger = logging.getLogger(__name__) @@ -122,7 +124,6 @@ def checklist_status(value): @register.simple_tag def dojo_version(): - from dojo import __version__ version = __version__ if settings.FOOTER_VERSION: version = settings.FOOTER_VERSION @@ -151,7 +152,6 @@ def display_date_with_secs(obj): @register.simple_tag def dojo_docs_url(): - from dojo import __docs__ return mark_safe(__docs__) @@ -305,7 +305,6 @@ def product_grade(product): prod_numeric_grade = product.prod_numeric_grade if prod_numeric_grade == "" or prod_numeric_grade is None: - from dojo.utils import calculate_grade calculate_grade(product) if prod_numeric_grade: if prod_numeric_grade >= system_settings.product_grade_a: @@ -900,7 +899,6 @@ def jira_severity(findings): @register.filter def get_thumbnail(file): - from pathlib import Path file_format = Path(file.file.url).suffix[1:] return file_format in supported_thumbnail_file_formats diff --git a/dojo/tools/snyk/parser.py b/dojo/tools/snyk/parser.py index 7de453d924e..a3cdd43cb1e 100644 --- a/dojo/tools/snyk/parser.py +++ b/dojo/tools/snyk/parser.py @@ -1,3 +1,4 @@ +import io import json from cvss.cvss3 import CVSS3 @@ -122,7 +123,6 @@ def get_items(self, tree, test): if "runs" in tree and tree["runs"][0].get("results"): # Delegate SARIF handling to Snyk Code parser snyk_code_parser = SnykCodeParser() - import io json_output = io.StringIO(json.dumps(tree)) findings = snyk_code_parser.get_findings(json_output, test) items.extend(findings) diff --git a/dojo/tools/tool_issue_updater.py b/dojo/tools/tool_issue_updater.py index 79c4d0ff108..add5beb54a5 100644 --- a/dojo/tools/tool_issue_updater.py +++ b/dojo/tools/tool_issue_updater.py @@ -1,6 +1,8 @@ from dojo.celery import app from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id from dojo.tools.api_sonarqube.parser import SCAN_SONARQUBE_API +from dojo.tools.api_sonarqube.updater import SonarQubeApiUpdater +from dojo.tools.api_sonarqube.updater_from_source import SonarQubeApiUpdaterFromSource def async_tool_issue_update(finding, *args, **kwargs): @@ -22,14 +24,12 @@ def tool_issue_updater(finding, *args, **kwargs): test_type = finding.test.test_type if test_type.name == SCAN_SONARQUBE_API: - from dojo.tools.api_sonarqube.updater import SonarQubeApiUpdater SonarQubeApiUpdater().update_sonarqube_finding(finding) @dojo_async_task @app.task def update_findings_from_source_issues(**kwargs): - from dojo.tools.api_sonarqube.updater_from_source import SonarQubeApiUpdaterFromSource findings = SonarQubeApiUpdaterFromSource().get_findings_to_update() diff --git a/dojo/utils.py b/dojo/utils.py index 929fef76f0f..a8b6b1aa86b 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -591,8 +591,6 @@ def count_findings(findings: QuerySet) -> tuple[dict["Product", list[int]], dict ) rows = list(agg) - from dojo.models import Product # imported lazily to avoid circulars - products = Product.objects.in_bulk([r["prod_id"] for r in rows]) product_count = { products[r["prod_id"]]: [r["crit"], r["high"], r["med"], r["low"], r["total"]] for r in rows @@ -1603,7 +1601,7 @@ def calculate_grade(product, *args, **kwargs): def get_celery_worker_status(): - from .tasks import celery_status + from .tasks import celery_status # noqa: PLC0415 circular import res = celery_status.apply_async() # Wait 5 seconds for a response from Celery @@ -1850,7 +1848,7 @@ def sla_compute_and_notify(*args, **kwargs): Notifications are managed the usual way, so you'd have to opt-in. Exception is for JIRA issues, which would get a comment anyways. """ - import dojo.jira_link.helper as jira_helper + import dojo.jira_link.helper as jira_helper # noqa: PLC0415 circular import class NotificationEntry: def __init__(self, finding=None, jira_issue=None, *, do_jira_sla_comment=False): diff --git a/ruff.toml b/ruff.toml index 273ff180496..0987acc09ff 100644 --- a/ruff.toml +++ b/ruff.toml @@ -82,7 +82,7 @@ select = [ "D2", "D3", "D402", "D403", "D405", "D406", "D407", "D408", "D409", "D410", "D411", "D412", "D413", "D414", "D416", "F", "PGH", - "PLC01", "PLC02", "PLC0414", "PLC18", "PLC24", "PLC28", "PLC3", + "PLC0", "PLC18", "PLC24", "PLC28", "PLC3", "PLE", "PLR01", "PLR02", "PLR04", "PLR0915", "PLR1711", "PLR1704", "PLR1714", "PLR1716", "PLR172", "PLR173", "PLR2044", "PLR5", "PLR6104", "PLR6201", "PLW", @@ -114,6 +114,9 @@ preview = true "S105", # hardcoded passwords in tests are fine "S108", # tmp paths mentioned in tests are fine ] +"dojo/models.py" = [ + "PLC0415", # fix later +] [lint.flake8-boolean-trap] extend-allowed-calls = ["dojo.utils.get_system_setting"] diff --git a/tests/base_test_class.py b/tests/base_test_class.py index 69cbf140a1b..eae23f891e2 100644 --- a/tests/base_test_class.py +++ b/tests/base_test_class.py @@ -2,6 +2,7 @@ import os import re import unittest +import warnings from pathlib import Path from selenium import webdriver @@ -95,7 +96,6 @@ def setUpClass(cls): ) # TODO: - this filter needs to be removed - import warnings warnings.filterwarnings("ignore", message="executable_path has been deprecated, please pass in a Service object") warnings.filterwarnings("ignore", message="use options instead of chrome_options") warnings.filterwarnings("ignore", message="desired_capabilities has been deprecated, please pass in a Service object") diff --git a/unittests/test_endpoint_model.py b/unittests/test_endpoint_model.py index 847a76d80b6..94c2cfd9953 100644 --- a/unittests/test_endpoint_model.py +++ b/unittests/test_endpoint_model.py @@ -2,6 +2,7 @@ from unittest import skip from django.apps import apps +from django.contrib.auth import get_user_model from django.core.exceptions import ValidationError from django.utils import timezone @@ -221,7 +222,6 @@ def test_endpoint_status_broken(self): target_end=datetime.datetime(2022, 1, 1, tzinfo=timezone.utc), test_type_id=1, ) - from django.contrib.auth import get_user_model user = get_user_model().objects.create().pk self.finding = Finding.objects.create(test=self.test, reporter_id=user).pk self.endpoint = Endpoint.objects.create(protocol="http", host="foo.bar.eps").pk diff --git a/unittests/test_jira_import_and_pushing_api.py b/unittests/test_jira_import_and_pushing_api.py index a56ea11ce85..2073959f9e2 100644 --- a/unittests/test_jira_import_and_pushing_api.py +++ b/unittests/test_jira_import_and_pushing_api.py @@ -3,6 +3,7 @@ from unittest.mock import patch from crum import impersonate +from django.contrib.auth import get_user_model from django.urls import reverse from rest_framework.authtoken.models import Token from rest_framework.test import APIClient @@ -22,6 +23,7 @@ logger = logging.getLogger(__name__) + # these tests are using vcrpy to record traffic to and from JIRA: https://vcrpy.readthedocs.io/en/latest/usage.html # after being recorded, the traffic is used for future runs of the tests # this allows us to locally develop tests, run them, make them work against a real JIRA instance. @@ -1018,7 +1020,6 @@ def test_bulk_edit_mixed_findings_and_groups_jira_push_bug(self, mock_webhooks, self.assertGreater(len(ungrouped_findings), 0, "Should have some ungrouped findings") # Use Django test client instead of RequestFactory for proper auth - from django.contrib.auth import get_user_model # Prepare bulk edit request data # Get the current finding IDs after group modifications diff --git a/unittests/test_migrations.py b/unittests/test_migrations.py index 804739e5cb2..28e7a013f4d 100644 --- a/unittests/test_migrations.py +++ b/unittests/test_migrations.py @@ -1,6 +1,7 @@ import datetime from unittest import skip +from django.contrib.auth import get_user_model from django.utils import timezone from django_test_migrations.contrib.unittest_case import MigratorTestCase @@ -32,7 +33,6 @@ def prepare(self): target_end=datetime.datetime(2022, 1, 1, tzinfo=timezone.utc), test_type_id=1, ) - from django.contrib.auth import get_user_model user = get_user_model().objects.create().pk self.finding = Finding.objects.create(test_id=self.test.pk, reporter_id=user).pk diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 18d1b38632d..afbd4d1ae6f 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -16,6 +16,7 @@ from django.urls import reverse from drf_spectacular.drainage import GENERATOR_STATS from drf_spectacular.settings import spectacular_settings +from drf_spectacular.validation import validate_schema from rest_framework import status from rest_framework.authtoken.models import Token from rest_framework.mixins import ( @@ -161,7 +162,6 @@ def get_open_api3_json_schema(): schema = generator.get_schema(request=None, public=True) GENERATOR_STATS.emit_summary() - from drf_spectacular.validation import validate_schema validate_schema(schema) return schema diff --git a/unittests/tools/test_nuclei_parser.py b/unittests/tools/test_nuclei_parser.py index 1ff970711e2..cfd260dd80b 100644 --- a/unittests/tools/test_nuclei_parser.py +++ b/unittests/tools/test_nuclei_parser.py @@ -1,6 +1,7 @@ from datetime import datetime from dateutil.tz import tzoffset +from django.contrib.auth import get_user_model from dojo.models import Test, Test_Type from dojo.tools.nuclei.parser import NucleiParser @@ -259,7 +260,6 @@ def test_parse_same_template_multiple_matches(self): self.assertEqual("turquoise-estrellita-69.tiiny.site", finding.unsaved_endpoints[0].host) # required to compute hash code, same as in test_endpoint_model - move to some test utils module? - from django.contrib.auth import get_user_model user, _ = get_user_model().objects.get_or_create(username="importer") product_type = self.create_product_type("prod_type") product = self.create_product("test_deduplicate_finding", prod_type=product_type) diff --git a/unittests/tools/test_ptart_parser.py b/unittests/tools/test_ptart_parser.py index bc22e7dda90..78d0ccafb46 100644 --- a/unittests/tools/test_ptart_parser.py +++ b/unittests/tools/test_ptart_parser.py @@ -1,5 +1,18 @@ from dojo.models import Engagement, Product, Test from dojo.tools.ptart.parser import PTARTParser +from dojo.tools.ptart.ptart_parser_tools import ( + generate_test_description_from_report, + parse_attachment_from_hit, + parse_cwe_from_hit, + parse_cwe_id_from_cwe, + parse_endpoints_from_hit, + parse_ptart_fix_effort, + parse_ptart_severity, + parse_references_from_hit, + parse_retest_status, + parse_screenshots_from_hit, + parse_title_from_hit, +) from unittests.dojo_test_case import DojoTestCase, get_unit_tests_scans_path @@ -13,7 +26,6 @@ def setUp(self): self.test = Test(engagement=self.engagement) def test_ptart_parser_tools_parse_ptart_severity(self): - from dojo.tools.ptart.ptart_parser_tools import parse_ptart_severity with self.subTest("Critical"): self.assertEqual("Critical", parse_ptart_severity(1)) with self.subTest("High"): @@ -28,7 +40,6 @@ def test_ptart_parser_tools_parse_ptart_severity(self): self.assertEqual("Info", parse_ptart_severity(6)) def test_ptart_parser_tools_parse_ptart_fix_effort(self): - from dojo.tools.ptart.ptart_parser_tools import parse_ptart_fix_effort with self.subTest("High"): self.assertEqual("High", parse_ptart_fix_effort(1)) with self.subTest("Medium"): @@ -39,7 +50,6 @@ def test_ptart_parser_tools_parse_ptart_fix_effort(self): self.assertEqual(None, parse_ptart_fix_effort(4)) def test_ptart_parser_tools_parse_title_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_title_from_hit with self.subTest("Title and ID"): self.assertEqual("1234: Test Title", parse_title_from_hit({"title": "Test Title", "id": "1234"})) with self.subTest("Title Only"): @@ -60,7 +70,6 @@ def test_ptart_parser_tools_parse_title_from_hit(self): self.assertEqual("Test Title", parse_title_from_hit({"title": "Test Title", "id": ""})) def test_ptart_parser_tools_retest_fix_status_parse(self): - from dojo.tools.ptart.ptart_parser_tools import parse_retest_status with self.subTest("Fixed"): self.assertEqual("Fixed", parse_retest_status("F")) with self.subTest("Not Fixed"): @@ -77,7 +86,6 @@ def test_ptart_parser_tools_retest_fix_status_parse(self): self.assertEqual(None, parse_retest_status("")) def test_ptart_parser_tools_parse_screenshots_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_screenshots_from_hit with self.subTest("No Screenshots"): hit = {} screenshots = parse_screenshots_from_hit(hit) @@ -193,7 +201,6 @@ def test_ptart_parser_tools_parse_screenshots_from_hit(self): "Invalid Screenshot Data") def test_ptart_parser_tools_parse_attachment_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_attachment_from_hit with self.subTest("No Attachments"): hit = {} attachments = parse_attachment_from_hit(hit) @@ -271,7 +278,6 @@ def test_ptart_parser_tools_parse_attachment_from_hit(self): self.assertTrue(attachment["data"] == "TUlUIExpY2Vuc2UKCkNvcHl", "Invalid Attachment Data") def test_ptart_parser_tools_get_description_from_report_base(self): - from dojo.tools.ptart.ptart_parser_tools import generate_test_description_from_report with self.subTest("No Description"): data = {} self.assertEqual(None, generate_test_description_from_report(data)) @@ -335,7 +341,6 @@ def test_ptart_parser_tools_get_description_from_report_base(self): self.assertEqual("This is an overview", generate_test_description_from_report(data)) def test_ptart_parser_tools_parse_references_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_references_from_hit with self.subTest("No References"): hit = {} self.assertEqual(None, parse_references_from_hit(hit)) @@ -392,7 +397,6 @@ def test_ptart_parser_tools_parse_references_from_hit(self): self.assertEqual("Reference1: https://ref.example.com\nReference: https://ref3.example.com", parse_references_from_hit(hit)) def test_ptart_parser_tools_parse_cwe_id_from_cwe(self): - from dojo.tools.ptart.ptart_parser_tools import parse_cwe_id_from_cwe with self.subTest("Valid CWE"): self.assertEqual(862, parse_cwe_id_from_cwe({"cwe_id": 862, "title": "CWE-862 - Missing Authorization"})) with self.subTest("Invalid CWE ID Type"): @@ -405,7 +409,6 @@ def test_ptart_parser_tools_parse_cwe_id_from_cwe(self): self.assertEqual(None, parse_cwe_id_from_cwe(None)) def test_ptart_parser_tools_parse_cwe_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_cwe_from_hit with self.subTest("Valid CWE"): hit = { "cwes": [{ @@ -442,7 +445,6 @@ def test_ptart_parser_tools_parse_cwe_from_hit(self): self.assertEqual(None, parse_cwe_from_hit(hit)) def test_ptart_parser_tools_parse_endpoints_from_hit(self): - from dojo.tools.ptart.ptart_parser_tools import parse_endpoints_from_hit with self.subTest("No Asset"): hit = {} self.assertEqual([], parse_endpoints_from_hit(hit))