-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Dynamic groups #13001
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Dynamic groups #13001
Conversation
🔴 Risk threshold exceeded.This pull request contains multiple security findings, including potential Denial of Service vulnerabilities related to Redis cache rebuilding and in-memory sorting, an authorization bypass issue in dynamic finding group filters, and several sensitive file edits that require review and configuration in
🔴 Configured Codepaths Edit in
|
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding/helper.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding_group/redis.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding_group/urls.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding_group/views_dynamic.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/models.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/base.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list_snippet.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/filters.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding/helper.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/models.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/filters.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding_group/redis.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/finding_group/views_dynamic.py
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list_snippet.html
Vulnerability | Configured Codepaths Edit |
---|---|
Description | Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml . |
Potential Denial of Service (DoS) via Resource Exhaustion in dojo/models.py
Vulnerability | Potential Denial of Service (DoS) via Resource Exhaustion |
---|---|
Description | The patch introduces a critical performance bottleneck and potential Denial of Service (DoS) vulnerability. Every time a finding is created, updated, or deleted, a blocking Redis SET operation is performed to update a timestamp (LAST_FINDING_CHANGE ). This high volume of synchronous writes, especially during bulk operations like scanner imports, can overwhelm the Redis server and/or the application's connection pool. This timestamp also acts as a cache invalidation mechanism for dynamic finding groups. If LAST_FINDING_CHANGE is newer than LAST_FINDING_UPDATE , the entire dynamic finding group cache is rebuilt by iterating over all findings in the database. This resource-intensive full cache rebuild will be triggered frequently, leading to degraded performance and potential Denial of Service for users accessing dynamic finding groups. |
django-DefectDojo/dojo/models.py
Lines 2825 to 2833 in 8ff0705
else: | |
logger.debug("no options selected that require finding post processing") | |
from dojo.finding_group.redis import DynamicFindingGroups | |
DynamicFindingGroups.set_last_finding_change() | |
def get_absolute_url(self): | |
from django.urls import reverse | |
return reverse("view_finding", args=[str(self.id)]) |
Denial of Service via Unbounded Cache Rebuild in dojo/finding_group/redis.py
Vulnerability | Denial of Service via Unbounded Cache Rebuild |
---|---|
Description | The load_or_rebuild_finding_groups function iterates over all Finding objects in the database to rebuild the Redis cache. This rebuild is triggered whenever a finding or vulnerability ID is saved or deleted, causing a timestamp mismatch. In a large system, this operation is extremely resource-intensive (CPU, memory, database, Redis I/O). The absence of a locking mechanism allows multiple concurrent rebuilds, leading to a Denial of Service. |
django-DefectDojo/dojo/finding_group/redis.py
Lines 1 to 226 in 8ff0705
import base64 | |
import json | |
import logging | |
import os | |
from dataclasses import asdict, dataclass, field | |
from datetime import datetime | |
from enum import StrEnum | |
from functools import lru_cache | |
from typing import Self | |
import redis | |
from django.conf import settings | |
from django.utils.functional import cached_property | |
from dojo.models import Finding | |
logger = logging.getLogger(__name__) | |
DD_TEST = os.getenv("DD_TEST", "False").lower() == "true" | |
USER_MODES_KEY = "finding_groups_user_modes" | |
LAST_FINDING_CHANGE = "finding_groups_last_finding_change" | |
LAST_FINDING_UPDATE = "finding_groups_last_update" | |
class GroupMode(StrEnum): | |
VULN_ID_FROM_TOOL = "vuln_id_from_tool" | |
TITLE = "title" | |
CVE = "cve" | |
@dataclass | |
class DynamicFindingGroups: | |
finding_group_id: str | |
name: str = "" | |
severity: str = "Info" | |
main_finding_id: int | None = None | |
finding_ids: set[int] = field(default_factory=set) | |
def to_dict(self) -> dict: | |
data = asdict(self) | |
data["finding_ids"] = list(data["finding_ids"]) | |
return data | |
@staticmethod | |
def from_dict(data: dict) -> Self: | |
data["finding_ids"] = set(data.get("finding_ids", [])) | |
return DynamicFindingGroups(**data) | |
@staticmethod | |
def load_from_id(finding_group_id: str, fg_key: str) -> Self | None: | |
redis_client = get_redis_client() | |
finding_group_data = redis_client.hget(fg_key, finding_group_id) | |
if finding_group_data: | |
return DynamicFindingGroups.from_dict(json.loads(finding_group_data)) | |
return None | |
def update_sev_sla(self, finding: Finding) -> None: | |
if Finding.get_number_severity(finding.severity) > Finding.get_number_severity(self.severity): | |
self.severity = finding.severity | |
self.main_finding_id = finding.id | |
def add(self, finding: Finding) -> None: | |
self.update_sev_sla(finding) | |
self.finding_ids.add(finding.id) | |
# This method is used when we filter findings in a finding group | |
def reconfig_finding_group(self) -> None: | |
self.severity = "Info" | |
findings = Finding.objects.filter(id__in=self.finding_ids) | |
for finding in findings: | |
self.update_sev_sla(finding) | |
@staticmethod | |
def get_group_names(finding: Finding, mode: GroupMode) -> list[str] | None: | |
if mode == GroupMode.VULN_ID_FROM_TOOL: | |
if finding.vuln_id_from_tool: | |
return [finding.vuln_id_from_tool] | |
if mode == GroupMode.TITLE: | |
if finding.title: | |
return [finding.title] | |
if mode == GroupMode.CVE: | |
cves = [ | |
cve for cve in finding.vulnerability_id_set.values_list("vulnerability_id", flat=True) | |
if cve | |
] | |
if cves: | |
return cves | |
return None | |
@staticmethod | |
def get_fg_key(mode: GroupMode) -> str: | |
return f"finding_groups_{mode.value}" | |
@staticmethod | |
def get_id_map_key(mode: GroupMode) -> str: | |
return f"finding_groups_id_to_finding_group_{mode.value}" | |
@staticmethod | |
def set_last_finding_change() -> None: | |
if DD_TEST: | |
logger.info("Redis is not used in test environment, skipping.") | |
return | |
redis_client = get_redis_client() | |
redis_client.set(LAST_FINDING_CHANGE, datetime.now().isoformat()) | |
@staticmethod | |
def set_last_update(mode: GroupMode, timestamp: datetime | None = None) -> None: | |
if timestamp is None: | |
return | |
redis_client = get_redis_client() | |
redis_client.hset(LAST_FINDING_UPDATE, mode.value, timestamp.isoformat()) | |
@staticmethod | |
def add_finding(finding: Finding, mode: GroupMode) -> None: | |
finding_groups = DynamicFindingGroups.get_group_names(finding, mode) | |
if not finding_groups: | |
return | |
redis_client = get_redis_client() | |
for finding_group_name in finding_groups: | |
finding_group_id = base64.b64encode(finding_group_name.encode()).decode() | |
fg_key = DynamicFindingGroups.get_fg_key(mode) | |
id_map_key = DynamicFindingGroups.get_id_map_key(mode) | |
finding_group = DynamicFindingGroups.load_from_id(finding_group_id, fg_key) | |
if not finding_group: | |
finding_group = DynamicFindingGroups( | |
finding_group_id=finding_group_id, | |
name=finding_group_name, | |
) | |
if finding.id not in finding_group.finding_ids: | |
finding_group.add(finding) | |
redis_client.hset(fg_key, finding_group_id, json.dumps(finding_group.to_dict())) | |
group_ids_raw = redis_client.hget(id_map_key, finding.id) | |
group_ids = json.loads(group_ids_raw) if group_ids_raw else [] | |
if finding_group_id not in group_ids: | |
group_ids.append(finding_group_id) | |
redis_client.hset(id_map_key, finding.id, json.dumps(group_ids)) | |
@cached_property | |
def sla_days_remaining_internal(self): | |
findings = Finding.objects.filter(id__in=self.finding_ids, active=True) | |
if not findings: | |
return None | |
return min([find.sla_days_remaining() for find in findings if find.sla_days_remaining()], default=None) | |
@property | |
def sla_days_remaining(self) -> int | None: | |
return self.sla_days_remaining_internal | |
@lru_cache(maxsize=1) | |
def get_redis_client() -> redis.Redis: | |
host = getattr(settings, "REDIS_HOST", "redis") | |
port = getattr(settings, "REDIS_PORT", 6379) | |
return redis.Redis(host=host, port=port, decode_responses=True) | |
def get_user_mode(user_id: int) -> GroupMode | None: | |
redis_client = get_redis_client() | |
value = redis_client.hget(USER_MODES_KEY, str(user_id)) | |
if value and value not in [m.value for m in GroupMode]: | |
logger.warning(f"Invalid group mode '{value}' found in Redis for user {user_id}, resetting to None.") | |
redis_client.hdel(USER_MODES_KEY, str(user_id)) | |
return None | |
return GroupMode(value) if value else None | |
def set_user_mode(user_id: int, mode: GroupMode) -> None: | |
redis_client = get_redis_client() | |
redis_client.hset(USER_MODES_KEY, str(user_id), mode.value) | |
logger.info(f"User {user_id} dynamic finding groups mode set to {mode.value}") | |
def load_or_rebuild_finding_groups(mode: GroupMode) -> dict[str, DynamicFindingGroups]: | |
redis_client = get_redis_client() | |
fg_key = DynamicFindingGroups.get_fg_key(mode) | |
id_map_key = DynamicFindingGroups.get_id_map_key(mode) | |
if not redis_client.exists(LAST_FINDING_CHANGE): | |
DynamicFindingGroups.set_last_finding_change() | |
last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE) | |
try: | |
last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) | |
except ValueError: | |
logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_CHANGE}: {last_finding_change_raw}, resetting last finding change.") | |
DynamicFindingGroups.set_last_finding_change() | |
last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE) | |
last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) if last_finding_change_raw else None | |
try: | |
last_groups_update_time = redis_client.hget(LAST_FINDING_UPDATE, mode.value) | |
last_groups_update_time = datetime.fromisoformat(last_groups_update_time) if last_groups_update_time else None | |
except ValueError: | |
logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_UPDATE}: {last_groups_update_time}") | |
last_groups_update_time = None | |
# Check if finding_groups and id_map exist in Redis | |
# Check if last update is the same as last finding change | |
# If not, rebuild them | |
if ( | |
not redis_client.exists(fg_key) | |
or not redis_client.exists(id_map_key) | |
or last_groups_update_time != last_finding_change_time | |
): | |
if not last_finding_change_time: | |
logger.warning("Last finding change is not set, setting it to now.") | |
elif last_groups_update_time and last_finding_change_time < last_groups_update_time: | |
logger.warning("Last finding change is older than last update, they should be equal or last finding change should be newer.") | |
redis_client.delete(fg_key, id_map_key) | |
for finding in Finding.objects.all(): | |
DynamicFindingGroups.add_finding(finding, mode) | |
DynamicFindingGroups.set_last_update(mode, last_finding_change_time) | |
return _load_finding_groups_from_redis(fg_key, redis_client) | |
def _load_finding_groups_from_redis(fg_key: str, redis_client: redis.Redis) -> dict[str, DynamicFindingGroups]: | |
finding_groups_data = redis_client.hgetall(fg_key) | |
if finding_groups_data: | |
return { | |
key: DynamicFindingGroups.from_dict(json.loads(value)) | |
for key, value in finding_groups_data.items() | |
} | |
return {} |
Authorization Bypass (IDOR) in dojo/filters.py
Vulnerability | Authorization Bypass (IDOR) |
---|---|
Description | The DynamicFindingGroupsFilter and DynamicFindingGroupsFindingsFilter in dojo/filters.py are instantiated with request.GET , allowing a user to supply a pid (product ID) via a GET parameter. When pid is present, the set_related_object_fields method directly uses this pid in Engagement.objects.filter(product_id=self.pid) without performing an authorization check to ensure the user has permission to access the specified product. This bypasses the intended authorization flow, which is only applied when pid is not provided. |
django-DefectDojo/dojo/filters.py
Lines 2043 to 2120 in 8ff0705
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) | |
class DynamicFindingGroupsFilter(FilterSet): | |
name = CharFilter(lookup_expr="icontains", label="Name") | |
severity = ChoiceFilter( | |
choices=[ | |
("Low", "Low"), | |
("Medium", "Medium"), | |
("High", "High"), | |
("Critical", "Critical"), | |
], | |
label="Min Severity", | |
) | |
engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement") | |
product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product") | |
class Meta: | |
model = Finding | |
fields = ["name", "severity", "engagement", "product"] | |
def __init__(self, *args, **kwargs): | |
self.user = kwargs.pop("user", None) | |
self.pid = kwargs.pop("pid", None) | |
super().__init__(*args, **kwargs) | |
self.set_related_object_fields() | |
def set_related_object_fields(self): | |
if self.pid is not None: | |
self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid) | |
if "product" in self.form.fields: | |
del self.form.fields["product"] | |
else: | |
self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View) | |
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) | |
class DynamicFindingGroupsFindingsFilter(FilterSet): | |
name = CharFilter(lookup_expr="icontains", label="Name") | |
severity = MultipleChoiceFilter( | |
choices=[ | |
("Low", "Low"), | |
("Medium", "Medium"), | |
("High", "High"), | |
("Critical", "Critical"), | |
], | |
label="Severity", | |
) | |
vuln_id_from_tool = CharFilter(lookup_expr="icontains", label="Vulnerability Id From Tool") | |
reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter") | |
active = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active") | |
engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement") | |
product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product") | |
class Meta: | |
model = Finding | |
fields = ["name", "severity", "vuln_id_from_tool", "reporter", "active", "engagement", "product"] | |
def __init__(self, *args, **kwargs): | |
self.user = kwargs.pop("user", None) | |
self.pid = kwargs.pop("pid", None) | |
super().__init__(*args, **kwargs) | |
self.set_related_object_fields() | |
def set_related_object_fields(self): | |
if self.pid is not None: | |
self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid) | |
if "product" in self.form.fields: | |
del self.form.fields["product"] | |
else: | |
self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View) | |
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) | |
self.form.fields["reporter"].queryset = get_authorized_users(Permissions.Finding_View) | |
class AcceptedFindingFilter(FindingFilter): | |
risk_acceptance__created__date = DateRangeFilter(label="Acceptance Date") | |
risk_acceptance__owner = ModelMultipleChoiceFilter( |
Denial of Service via In-Memory Sorting in dojo/finding_group/views_dynamic.py
Vulnerability | Denial of Service via In-Memory Sorting |
---|---|
Description | The ListDynamicFindingGroups and its subclasses load all relevant dynamic finding groups into memory, filter them, and then sort the entire collection in memory using Python's sorted() function. Pagination is applied only after this in-memory sorting is complete. For instances with a large number of findings and dynamic groups, especially when accessed by users with broad permissions (e.g., superusers), this can lead to significant memory consumption and high CPU usage, potentially causing a Denial of Service. |
django-DefectDojo/dojo/finding_group/views_dynamic.py
Lines 1 to 282 in 8ff0705
import logging | |
from django.core.paginator import Paginator | |
from django.http import HttpRequest | |
from django.shortcuts import render | |
from django.views import View | |
from dojo.authorization.roles_permissions import Permissions | |
from dojo.filters import DynamicFindingGroupsFilter, DynamicFindingGroupsFindingsFilter | |
from dojo.finding_group.redis import ( | |
GroupMode, | |
get_user_mode, | |
load_or_rebuild_finding_groups, | |
set_user_mode, | |
) | |
from dojo.forms import FindingBulkUpdateForm | |
from dojo.models import Finding, Global_Role | |
from dojo.product.queries import get_authorized_products | |
from dojo.utils import add_breadcrumb | |
logger = logging.getLogger(__name__) | |
def paginate_queryset(queryset, request: HttpRequest): | |
page_size = request.GET.get("page_size", 25) # Default is 25 | |
paginator = Paginator(queryset, page_size) | |
page_number = request.GET.get("page") | |
return paginator.get_page(page_number) | |
class ListDynamicFindingGroups(View): | |
filter_name = "All" | |
def get_template(self): | |
return "dojo/finding_groups_dynamic_list.html" | |
def order_field(self, request: HttpRequest, finding_groups_findings_list): | |
order_field = request.GET.get("o") | |
if order_field: | |
reverse_order = order_field.startswith("-") | |
if reverse_order: | |
order_field = order_field[1:] | |
if order_field == "name": | |
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.name, reverse=reverse_order) | |
elif order_field == "findings_count": | |
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: len(x.finding_ids), reverse=reverse_order) | |
return finding_groups_findings_list | |
def filters(self, request: HttpRequest): | |
name_filter = request.GET.get("name", "").lower() | |
min_severity_filter = request.GET.get("severity") | |
engagement_filter = request.GET.getlist("engagement") | |
product_filter = request.GET.getlist("product") | |
return name_filter, min_severity_filter, engagement_filter, product_filter | |
def filter_finding_group(self, finding_group, request: HttpRequest): | |
name_filter, min_severity_filter, engagement_filter, product_filter = self.filters(request) | |
add_finding_group = True | |
if product_filter: | |
finding_group.finding_ids = set(finding_group.finding_ids) & set( | |
Finding.objects.filter(test__engagement__product__id__in=product_filter).values_list("id", flat=True), | |
) | |
if engagement_filter: | |
finding_group.finding_ids = set(finding_group.finding_ids) & set( | |
Finding.objects.filter(test__engagement__id__in=engagement_filter).values_list("id", flat=True), | |
) | |
finding_group.reconfig_finding_group() | |
if name_filter and name_filter not in finding_group.name.lower(): | |
add_finding_group = False | |
if min_severity_filter and Finding.get_number_severity(finding_group.severity) < Finding.get_number_severity(min_severity_filter): | |
add_finding_group = False | |
if not finding_group.finding_ids: | |
add_finding_group = False | |
return add_finding_group | |
def get_findings(self, products): | |
finding_group_fids = { | |
fid for finding_group in self.finding_groups_map.values() for fid in finding_group.finding_ids | |
} | |
filters = {"id__in": finding_group_fids} | |
if products: | |
filters["test__engagement__product__in"] = products | |
user_findings_qs = Finding.objects.filter(**filters) | |
user_fids = set(user_findings_qs.values_list("id", flat=True)) | |
active_fids = set( | |
user_findings_qs.filter(active=True).values_list("id", flat=True), | |
) | |
return user_fids, active_fids | |
def get_finding_groups(self, request: HttpRequest, products=None): | |
""" | |
Retrieve all dynamic finding groups for the current user. | |
Steps: | |
1. Retrieve finding IDs relevant for the user (optionally filtered by products). | |
2. Iterate over all finding groups in self.finding_groups_map. | |
3. For each group: | |
- Restrict the group's findings to those the user can see. | |
- Apply additional filters based on the request. | |
- No additional filtering for active findings. | |
4. Append groups that pass all filters to the result list. | |
5. Order the resulting list according to the request via order_field and return. | |
""" | |
user_fids, _ = self.get_findings(products) | |
list_finding_group = [] | |
for finding_group in self.finding_groups_map.values(): | |
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids | |
if self.filter_finding_group(finding_group, request): | |
list_finding_group.append(finding_group) | |
return self.order_field(request, list_finding_group) | |
def get(self, request: HttpRequest): | |
global_role = Global_Role.objects.filter(user=request.user).first() | |
products = get_authorized_products(Permissions.Product_View) | |
mode_str = request.GET.get("mode", None) | |
user_id = request.user.id | |
if mode_str: | |
try: | |
mode = GroupMode(mode_str) | |
set_user_mode(user_id, mode) | |
except ValueError: | |
if mode_str is not None: | |
logger.warning(f"Invalid mode: {mode_str}") | |
mode = get_user_mode(user_id) | |
else: | |
mode = get_user_mode(user_id) | |
self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {} | |
if request.user.is_superuser or (global_role and global_role.role): | |
finding_groups = self.get_finding_groups(request) | |
elif products.exists(): | |
finding_groups = self.get_finding_groups(request, products) | |
paginated_finding_groups = paginate_queryset(finding_groups, request) | |
context = { | |
"filter_name": self.filter_name, | |
"mode": mode.value if mode else None, | |
"filtered": DynamicFindingGroupsFilter(request.GET), | |
"finding_groups": paginated_finding_groups, | |
} | |
add_breadcrumb(title="Dynamic Finding Group", top_level=not len(request.GET), request=request) | |
return render(request, self.get_template(), context) | |
class ListOpenDynamicFindingGroups(ListDynamicFindingGroups): | |
filter_name = "Open" | |
def get_finding_groups(self, request: HttpRequest, products=None): | |
""" | |
Retrieve dynamic finding groups containing at least one active finding. | |
Steps: | |
1. Retrieve finding IDs relevant for the user and the active subset. | |
2. Iterate over all finding groups in self.finding_groups_map. | |
3. For each group: | |
- Restrict the group's findings to those the user can see. | |
- Apply additional filters based on the request. | |
- Keep only groups with at least one active finding. | |
4. Append groups that pass all filters to the result list. | |
5. Order the resulting list according to the request via order_field and return. | |
""" | |
user_fids, active_fids = self.get_findings(products) | |
list_finding_group = [] | |
for finding_group in self.finding_groups_map.values(): | |
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids | |
if self.filter_finding_group(finding_group, request): | |
if finding_group.finding_ids & active_fids: | |
list_finding_group.append(finding_group) | |
return self.order_field(request, list_finding_group) | |
class ListClosedDynamicFindingGroups(ListDynamicFindingGroups): | |
filter_name = "Closed" | |
def get_finding_groups(self, request: HttpRequest, products=None): | |
""" | |
Retrieve dynamic finding groups containing no active findings. | |
Steps: | |
1. Retrieve finding IDs relevant for the user and the active subset. | |
2. Iterate over all finding groups in self.finding_groups_map. | |
3. For each group: | |
- Restrict the group's findings to those the user can see. | |
- Apply additional filters based on the request. | |
- Keep only groups with no active findings. | |
4. Append groups that pass all filters to the result list. | |
5. Order the resulting list according to the request via order_field and return. | |
""" | |
user_fids, active_fids = self.get_findings(products) | |
list_finding_group = [] | |
for finding_group in self.finding_groups_map.values(): | |
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids | |
if self.filter_finding_group(finding_group, request): | |
if not (finding_group.finding_ids & active_fids): | |
list_finding_group.append(finding_group) | |
return self.order_field(request, list_finding_group) | |
class DynamicFindingGroupsFindings(View): | |
def get_template(self): | |
return "dojo/finding_group_dynamic_findings.html" | |
def order_field(self, request: HttpRequest, finding_groups_findings_list): | |
order_field = request.GET.get("o") | |
if order_field: | |
reverse_order = order_field.startswith("-") | |
if reverse_order: | |
order_field = order_field[1:] | |
if order_field == "title": | |
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.title, reverse=reverse_order) | |
elif order_field == "found_by": | |
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.found_by.count(), reverse=reverse_order) | |
return finding_groups_findings_list | |
def filters(self, request: HttpRequest): | |
name_filter = request.GET.get("name", "").lower() | |
severity_filter = request.GET.getlist("severity") | |
vuln_id_from_tool_filter = request.GET.get("vuln_id_from_tool") | |
reporter_filter = request.GET.getlist("reporter") | |
active_filter = request.GET.get("active") | |
engagement_filter = request.GET.getlist("engagement") | |
product_filter = request.GET.getlist("product") | |
return name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter | |
def filter_findings(self, findings, request: HttpRequest): | |
name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter = self.filters(request) | |
filter_kwargs = {} | |
if name_filter: | |
filter_kwargs["title__icontains"] = name_filter | |
if severity_filter: | |
filter_kwargs["severity__in"] = severity_filter | |
if vuln_id_from_tool_filter: | |
filter_kwargs["vuln_id_from_tool__icontains"] = vuln_id_from_tool_filter | |
if reporter_filter: | |
filter_kwargs["reporter__id__in"] = reporter_filter | |
if active_filter: | |
filter_kwargs["active"] = (active_filter == "Yes") | |
if engagement_filter: | |
filter_kwargs["test__engagement__id__in"] = engagement_filter | |
if product_filter: | |
filter_kwargs["test__engagement__product__id__in"] = product_filter | |
return findings.filter(**filter_kwargs) | |
def get_findings(self, request: HttpRequest, products=None): | |
finding_group = self.finding_groups_map.get(self.finding_group_id) | |
# When the finding_group not exists | |
if not finding_group: | |
return None, [] | |
list_findings = finding_group.finding_ids | |
if products: | |
findings = Finding.objects.filter(id__in=list_findings, test__engagement__product__in=products) | |
else: | |
findings = Finding.objects.filter(id__in=list_findings) | |
findings = self.filter_findings(findings, request) | |
return finding_group.name, self.order_field(request, findings) | |
def get(self, request: HttpRequest, finding_group_id: int): | |
self.finding_group_id = finding_group_id | |
global_role = Global_Role.objects.filter(user=request.user).first() | |
products = get_authorized_products(Permissions.Product_View) | |
mode = get_user_mode(request.user.id) | |
self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {} | |
if request.user.is_superuser or (global_role and global_role.role): | |
finding_group_name, findings = self.get_findings(request) | |
elif products.exists(): | |
finding_group_name, findings = self.get_findings(request, products) | |
else: | |
finding_group_name = None | |
paginated_findings = paginate_queryset(findings, request) | |
context = { | |
"finding_group": finding_group_name, | |
"filtered": DynamicFindingGroupsFindingsFilter(request.GET), | |
"finding_group_id": self.finding_group_id, | |
"findings": paginated_findings, | |
"bulk_edit_form": FindingBulkUpdateForm(request.GET), | |
} | |
add_breadcrumb(title="Dynamic Finding Group Findings", top_level=not len(request.GET), request=request) | |
return render(request, self.get_template(), context) |
We've notified @mtesauro.
All finding details can be found in the DryRun Security Dashboard.
934806b
to
89cc47a
Compare
* dynamic-group * own mode for each user
89cc47a
to
f8bc92c
Compare
This pull request has conflicts, please resolve those before we can evaluate the pull request. |
Conflicts have been resolved. A maintainer will review the pull request shortly. |
dojo/filters.py
Outdated
], | ||
label="Severity", | ||
) | ||
script_id = CharFilter(lookup_expr="icontains", label="Script ID") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this to vuln_id_from_tool
to avoid confusion as the script_id field doesn't exist in the Finding model?
dojo/filters.py
Outdated
) | ||
script_id = CharFilter(lookup_expr="icontains", label="Script ID") | ||
reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter") | ||
status = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this to active
or is_active
to avoid confusion as the status field doesn't exist in the Finding model?
dojo/finding_group/redis.py
Outdated
"Info": 1, | ||
} | ||
USER_MODES_KEY = "finding_groups_user_modes" | ||
SYSTEM_CHANGE = "finding_groups_last_finding_change" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coudl this have a more clear name like LAST_FINDING_CHANGE or LAST_FINDING_UPDATE
dojo/finding_group/redis.py
Outdated
return None | ||
|
||
def update_sev_sla(self, finding: Finding) -> None: | ||
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[self.severity]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this also work without another mapping by using the numerical_severity as we do elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change to use Finding.get_number_severity
dojo/finding_group/redis.py
Outdated
def get_days_remaining(self) -> str: | ||
if self.sla_finding_id: | ||
finding = Finding.objects.filter(id=self.sla_finding_id).first() | ||
days_remaining = finding.sla_days_remaining() | ||
severity = finding.severity | ||
sla_start_date = finding.get_sla_start_date().strftime("%b %d, %Y") | ||
status = "age-green" | ||
status_text = f"Remediation for {severity.lower()} findings due in {days_remaining} days or less (started {sla_start_date})" | ||
if days_remaining and days_remaining < 0: | ||
status = "age-red" | ||
status_text = f"Overdue: Remediation for {severity.lower()} findings overdue {days_remaining} days (started {sla_start_date})" | ||
days_remaining = abs(days_remaining) | ||
elif any( | ||
Finding.objects.filter( | ||
id__in=self.finding_ids, | ||
active=True, | ||
), | ||
): | ||
status = "severity-Info" | ||
status_text = "No SLA set, but at least one finding is active" | ||
days_remaining = "No SLA" | ||
else: | ||
status = "age-blue" | ||
status_text = "No active finding" | ||
days_remaining = "Concluded" | ||
title = ( | ||
f'<a class="has-popover" data-toggle="tooltip" data-placement="bottom" title="" href="#" data-content="{status_text}">' | ||
f'<span class="label severity {status}">{days_remaining}</span></a>' | ||
) | ||
return mark_safe(title) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this could be made more clean by maybe putting it in a display tag to avoid mixing logic and display too much? I also wonder about the overlap there is with the Finding_Group model which also has logic to determine SLA data (and severity?) for a list of findings.
def get_redis_client() -> redis.Redis: | ||
host = getattr(settings, "REDIS_HOST", "redis") | ||
port = getattr(settings, "REDIS_PORT", 6379) | ||
return redis.Redis(host=host, port=port, decode_responses=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to reuse existing config in settings.dist.py?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn’t find anything definitive in the settings that confirms the default host and port for Redis. Because of that, I decided to handle it directly in the code. This way, if someone wants to change the host or port, they just need to define those two variables in the project settings. It works, but I’m not sure this is the best practice. Would you recommend handling it differently?
"Low": 2, | ||
"Info": 1, | ||
} | ||
USER_MODES_KEY = "finding_groups_user_modes" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was confused by the word "USER" in this name. If I look at the code it seems to be grouping mode of the group. Is there are more clear name for this constant? Or is it that the grouping mode is stored per user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's per user. Once a user selects a mode, the next time they access the tab it will default to their previously chosen mode, without affecting other users who may have a different mode set.
dojo/finding_group/views_dynamic.py
Outdated
def get_finding_groups(self, request: HttpRequest, products=None): | ||
user_fids, active_fids = self.get_findings(products) | ||
list_finding_group = [] | ||
for finding_group in self.finding_groups_map.values(): | ||
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids | ||
if self.filter_finding_group(finding_group, request): | ||
if finding_group.finding_ids & active_fids: | ||
list_finding_group.append(finding_group) | ||
return self.order_field(request, list_finding_group) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
It retrieves two sets of finding IDs:
user_fids
: all findings the user is allowed to see (possibly restricted by products).
active_fids
: the subset of findings that are currently active. -
It iterates through all finding groups in self.finding_groups_map.
-
For each group, it intersects (
&
) the group’s finding IDs with user_fids, so the group only keeps findings the user has access to. -
It applies additional filters through
filter_finding_group
, based on the request. -
If the group still has at least one active finding (
finding_group.finding_ids & active_fids
), it is added to the result list to show inListOpenDynamicFindingGroups
. -
Finally, it orders the groups using
order_field
and returns them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some comments or an overall explanation of how the grouping and caching works?
Are you doing the filtering/grouping/ordering in python code?
Are you caching the results per user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm doing it in Python code, and I'm not caching the results per user. I use the finding_group created for the title mode, for example, stored in Redis, which is used by everyone who chose this mode, and I filter the findings and groups that the user can't see or wants to filter.
|
||
logger = logging.getLogger(__name__) | ||
|
||
DD_TEST = os.getenv("DD_TEST", "False").lower() == "true" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels weird to have this flag/variable. Can we avoid it or at least use the same pattern as our other env variables via settings.dist.py?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created this variable in docker-compose.override.unit_tests_cicd.yml
to avoid using Redis during CI, because Redis is not created and there is no variable to indicate when we are running tests in CI. Do you recommend something to avoid this variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, as said before I like the idea of being able to group findings in the UI.
I am still thinking if this is the best way to do it, especially since there's quite a lot of code just to be able to put the groups into Redis.
Before diving into it in detail, I've left some comments to take a look at.
I also tried out the feature locally, but I run into an error when grouping on vuln_id_from_tool:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/django/core/handlers/exception.py", line 55, in inner
response = get_response(request)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/django/core/handlers/base.py", line 197, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 105, in view
return self.dispatch(request, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 144, in dispatch
return handler(request, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/dojo/finding_group/views_dynamic.py", line 115, in get
self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/dojo/finding_group/redis.py", line 243, in load_or_rebuild_finding_groups
DynamicFindingGroups.add_finding(finding, mode)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/dojo/finding_group/redis.py", line 129, in add_finding
finding_group_id = base64.b64encode(finding_group_name.encode()).decode()
^^^^^^^^^^^^^^^^^^^^^^^^^
Exception Type: AttributeError at /dynamic_finding_group/open
Exception Value: 'NoneType' object has no attribute 'encode'
When grouping on title it just keeps spinning, maybe it's just very slow. This might make sense as title is a free form text field which will be hard to group by efficiently.
Similar for CVE which should be easier to group by, but also this requests doesn't seem to complete just spinning.
To be honest I couldn't follow all of the code and there seems to be alot of processing inside python instead of the database. Maybe that's why it's not performing at the moment?
My instance has ~170K findings.
I understand why this happened: I didn’t handle the case where vuln_id_from_tool is None.
does not filter it out.
i will fix that, sorry. |
When I look at your screenshots, the list of Dynamic Finding Groups looks a lot like pages in Defect Dojo where we display lists of entities with some statistics. For example the View Engagement page lists all the Tests in the Engagement and the number of findings, etc. This basically a way to view at findings that are grouped by a certain field, in this case This PR wants to group findings by other fields such as For example if I do an SQL query to group findings by
This gives me similar results to the screenshots in this PR: ![]() And that is including some joines to list the affected products, which would be a nice feature. I might be forgetting something, but my initial approach would be to implement the dynamic grouping in the same way. What are your thoughts on this @LeoOMaia Also did another test today to look at the static finding group pages, but running into this error:
|
I think your idea would be much better and would make it possible to remove the Redis storage. Regarding the message you received, was it an error or just a warning? I checked, and indeed I didn’t define a default ordering for when the user doesn’t request any sorting. I’m opening a PR to add a default ordering. You’re right that grouping by title and vuln_id_from_tool would work correctly with the query you shared, and I agree this is a good approach for those fields. For CVE, however, the logic would need to be different. A single finding can have multiple CVEs, so the idea was to create groups for each CVE, which means the same finding could belong to multiple groups. Because of that, the query would have to be adapted, but I believe it’s still possible to achieve with a modified version of your example. I’m currently looking into how I can adjust my implementation to better align with the query-based approach you suggested. |
Thank you for your comments! I’ll convert this PR into a draft so I can carefully review my code and remove Redis following your suggestion. It might take a little time, but once I’m done, I’ll reopen it. |
This is acording to #12684 and #12814.
I developed a new tab to group findings in accord in the mode (Vuln ID from tool/Title/CVE) the user wants to group dynamically.
Advantages of using dynamic grouping instead of static groups:
Disadvantages:
Below is how it will be shown for the user to choose the group:

The mode selected by the user (default is empty and displays nothing):

Below is an example with the static and dynamic groups tab:
And another example where a user has 1 and 2 products, and another has only 2 products where all the findings are mitigated:
Below is the findings page in a dynamic group:
