Skip to content

Conversation

LeoOMaia
Copy link
Contributor

@LeoOMaia LeoOMaia commented Aug 17, 2025

This is acording to #12684 and #12814.
I developed a new tab to group findings in accord in the mode (Vuln ID from tool/Title/CVE) the user wants to group dynamically.

Advantages of using dynamic grouping instead of static groups:

  • Flexibility for the user to choose how the groups should be organized.
  • Ability to group all existing findings as well as new ones generated by the system, without requiring imports.
  • Support for grouping findings across multiple products simultaneously. The backend automatically filters which findings should be considered for each user. This means one user may already have all findings in a group analyzed, while for another the group may still be under review and remain open.
  • Independent from the PostgreSQL database, making it easy to extend with new features for findings without affecting the rest of the system.

Disadvantages:

  • Requires Redis to store groupings.
  • Any changes to findings trigger a reconstruction of the groups when a user accesses the tab. (Note: if findings are changed but the user does not open the tab, the groups stored in Redis will not be created or updated. More details can be found in the README located in the directory where the views for dynamic and static groups are defined.)

Below is how it will be shown for the user to choose the group:
Screenshot from 2025-08-17 16-50-40

The mode selected by the user (default is empty and displays nothing):
Screenshot from 2025-08-17 16-53-55

Below is an example with the static and dynamic groups tab:

  • Static (contains more groups):
Screenshot from 2025-08-17 16-31-28
  • Dynamic:
Screenshot from 2025-08-17 16-33-27

And another example where a user has 1 and 2 products, and another has only 2 products where all the findings are mitigated:

  • User1:
Screenshot from 2025-08-17 16-38-07
  • User2:
Screenshot from 2025-08-17 16-45-06

Below is the findings page in a dynamic group:
Screenshot from 2025-08-17 17-01-50

Copy link

dryrunsecurity bot commented Aug 17, 2025

DryRun Security

🔴 Risk threshold exceeded.

This pull request contains multiple security findings, including potential Denial of Service vulnerabilities related to Redis cache rebuilding and in-memory sorting, an authorization bypass issue in dynamic finding group filters, and several sensitive file edits that require review and configuration in .dryrunsecurity.yaml.

🔴 Configured Codepaths Edit in dojo/filters.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding/helper.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding_group/redis.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding_group/urls.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding_group/views_dynamic.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/models.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/base.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list_snippet.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/filters.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding/helper.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/models.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/filters.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding_group/redis.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/finding_group/views_dynamic.py
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
🔴 Configured Codepaths Edit in dojo/templates/dojo/finding_groups_dynamic_list_snippet.html
Vulnerability Configured Codepaths Edit
Description Sensitive edits detected for this file. Sensitive file paths and allowed authors can be configured in .dryrunsecurity.yaml.
Potential Denial of Service (DoS) via Resource Exhaustion in dojo/models.py
Vulnerability Potential Denial of Service (DoS) via Resource Exhaustion
Description The patch introduces a critical performance bottleneck and potential Denial of Service (DoS) vulnerability. Every time a finding is created, updated, or deleted, a blocking Redis SET operation is performed to update a timestamp (LAST_FINDING_CHANGE). This high volume of synchronous writes, especially during bulk operations like scanner imports, can overwhelm the Redis server and/or the application's connection pool. This timestamp also acts as a cache invalidation mechanism for dynamic finding groups. If LAST_FINDING_CHANGE is newer than LAST_FINDING_UPDATE, the entire dynamic finding group cache is rebuilt by iterating over all findings in the database. This resource-intensive full cache rebuild will be triggered frequently, leading to degraded performance and potential Denial of Service for users accessing dynamic finding groups.

else:
logger.debug("no options selected that require finding post processing")
from dojo.finding_group.redis import DynamicFindingGroups
DynamicFindingGroups.set_last_finding_change()
def get_absolute_url(self):
from django.urls import reverse
return reverse("view_finding", args=[str(self.id)])

Denial of Service via Unbounded Cache Rebuild in dojo/finding_group/redis.py
Vulnerability Denial of Service via Unbounded Cache Rebuild
Description The load_or_rebuild_finding_groups function iterates over all Finding objects in the database to rebuild the Redis cache. This rebuild is triggered whenever a finding or vulnerability ID is saved or deleted, causing a timestamp mismatch. In a large system, this operation is extremely resource-intensive (CPU, memory, database, Redis I/O). The absence of a locking mechanism allows multiple concurrent rebuilds, leading to a Denial of Service.

import base64
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import StrEnum
from functools import lru_cache
from typing import Self
import redis
from django.conf import settings
from django.utils.functional import cached_property
from dojo.models import Finding
logger = logging.getLogger(__name__)
DD_TEST = os.getenv("DD_TEST", "False").lower() == "true"
USER_MODES_KEY = "finding_groups_user_modes"
LAST_FINDING_CHANGE = "finding_groups_last_finding_change"
LAST_FINDING_UPDATE = "finding_groups_last_update"
class GroupMode(StrEnum):
VULN_ID_FROM_TOOL = "vuln_id_from_tool"
TITLE = "title"
CVE = "cve"
@dataclass
class DynamicFindingGroups:
finding_group_id: str
name: str = ""
severity: str = "Info"
main_finding_id: int | None = None
finding_ids: set[int] = field(default_factory=set)
def to_dict(self) -> dict:
data = asdict(self)
data["finding_ids"] = list(data["finding_ids"])
return data
@staticmethod
def from_dict(data: dict) -> Self:
data["finding_ids"] = set(data.get("finding_ids", []))
return DynamicFindingGroups(**data)
@staticmethod
def load_from_id(finding_group_id: str, fg_key: str) -> Self | None:
redis_client = get_redis_client()
finding_group_data = redis_client.hget(fg_key, finding_group_id)
if finding_group_data:
return DynamicFindingGroups.from_dict(json.loads(finding_group_data))
return None
def update_sev_sla(self, finding: Finding) -> None:
if Finding.get_number_severity(finding.severity) > Finding.get_number_severity(self.severity):
self.severity = finding.severity
self.main_finding_id = finding.id
def add(self, finding: Finding) -> None:
self.update_sev_sla(finding)
self.finding_ids.add(finding.id)
# This method is used when we filter findings in a finding group
def reconfig_finding_group(self) -> None:
self.severity = "Info"
findings = Finding.objects.filter(id__in=self.finding_ids)
for finding in findings:
self.update_sev_sla(finding)
@staticmethod
def get_group_names(finding: Finding, mode: GroupMode) -> list[str] | None:
if mode == GroupMode.VULN_ID_FROM_TOOL:
if finding.vuln_id_from_tool:
return [finding.vuln_id_from_tool]
if mode == GroupMode.TITLE:
if finding.title:
return [finding.title]
if mode == GroupMode.CVE:
cves = [
cve for cve in finding.vulnerability_id_set.values_list("vulnerability_id", flat=True)
if cve
]
if cves:
return cves
return None
@staticmethod
def get_fg_key(mode: GroupMode) -> str:
return f"finding_groups_{mode.value}"
@staticmethod
def get_id_map_key(mode: GroupMode) -> str:
return f"finding_groups_id_to_finding_group_{mode.value}"
@staticmethod
def set_last_finding_change() -> None:
if DD_TEST:
logger.info("Redis is not used in test environment, skipping.")
return
redis_client = get_redis_client()
redis_client.set(LAST_FINDING_CHANGE, datetime.now().isoformat())
@staticmethod
def set_last_update(mode: GroupMode, timestamp: datetime | None = None) -> None:
if timestamp is None:
return
redis_client = get_redis_client()
redis_client.hset(LAST_FINDING_UPDATE, mode.value, timestamp.isoformat())
@staticmethod
def add_finding(finding: Finding, mode: GroupMode) -> None:
finding_groups = DynamicFindingGroups.get_group_names(finding, mode)
if not finding_groups:
return
redis_client = get_redis_client()
for finding_group_name in finding_groups:
finding_group_id = base64.b64encode(finding_group_name.encode()).decode()
fg_key = DynamicFindingGroups.get_fg_key(mode)
id_map_key = DynamicFindingGroups.get_id_map_key(mode)
finding_group = DynamicFindingGroups.load_from_id(finding_group_id, fg_key)
if not finding_group:
finding_group = DynamicFindingGroups(
finding_group_id=finding_group_id,
name=finding_group_name,
)
if finding.id not in finding_group.finding_ids:
finding_group.add(finding)
redis_client.hset(fg_key, finding_group_id, json.dumps(finding_group.to_dict()))
group_ids_raw = redis_client.hget(id_map_key, finding.id)
group_ids = json.loads(group_ids_raw) if group_ids_raw else []
if finding_group_id not in group_ids:
group_ids.append(finding_group_id)
redis_client.hset(id_map_key, finding.id, json.dumps(group_ids))
@cached_property
def sla_days_remaining_internal(self):
findings = Finding.objects.filter(id__in=self.finding_ids, active=True)
if not findings:
return None
return min([find.sla_days_remaining() for find in findings if find.sla_days_remaining()], default=None)
@property
def sla_days_remaining(self) -> int | None:
return self.sla_days_remaining_internal
@lru_cache(maxsize=1)
def get_redis_client() -> redis.Redis:
host = getattr(settings, "REDIS_HOST", "redis")
port = getattr(settings, "REDIS_PORT", 6379)
return redis.Redis(host=host, port=port, decode_responses=True)
def get_user_mode(user_id: int) -> GroupMode | None:
redis_client = get_redis_client()
value = redis_client.hget(USER_MODES_KEY, str(user_id))
if value and value not in [m.value for m in GroupMode]:
logger.warning(f"Invalid group mode '{value}' found in Redis for user {user_id}, resetting to None.")
redis_client.hdel(USER_MODES_KEY, str(user_id))
return None
return GroupMode(value) if value else None
def set_user_mode(user_id: int, mode: GroupMode) -> None:
redis_client = get_redis_client()
redis_client.hset(USER_MODES_KEY, str(user_id), mode.value)
logger.info(f"User {user_id} dynamic finding groups mode set to {mode.value}")
def load_or_rebuild_finding_groups(mode: GroupMode) -> dict[str, DynamicFindingGroups]:
redis_client = get_redis_client()
fg_key = DynamicFindingGroups.get_fg_key(mode)
id_map_key = DynamicFindingGroups.get_id_map_key(mode)
if not redis_client.exists(LAST_FINDING_CHANGE):
DynamicFindingGroups.set_last_finding_change()
last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE)
try:
last_finding_change_time = datetime.fromisoformat(last_finding_change_raw)
except ValueError:
logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_CHANGE}: {last_finding_change_raw}, resetting last finding change.")
DynamicFindingGroups.set_last_finding_change()
last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE)
last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) if last_finding_change_raw else None
try:
last_groups_update_time = redis_client.hget(LAST_FINDING_UPDATE, mode.value)
last_groups_update_time = datetime.fromisoformat(last_groups_update_time) if last_groups_update_time else None
except ValueError:
logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_UPDATE}: {last_groups_update_time}")
last_groups_update_time = None
# Check if finding_groups and id_map exist in Redis
# Check if last update is the same as last finding change
# If not, rebuild them
if (
not redis_client.exists(fg_key)
or not redis_client.exists(id_map_key)
or last_groups_update_time != last_finding_change_time
):
if not last_finding_change_time:
logger.warning("Last finding change is not set, setting it to now.")
elif last_groups_update_time and last_finding_change_time < last_groups_update_time:
logger.warning("Last finding change is older than last update, they should be equal or last finding change should be newer.")
redis_client.delete(fg_key, id_map_key)
for finding in Finding.objects.all():
DynamicFindingGroups.add_finding(finding, mode)
DynamicFindingGroups.set_last_update(mode, last_finding_change_time)
return _load_finding_groups_from_redis(fg_key, redis_client)
def _load_finding_groups_from_redis(fg_key: str, redis_client: redis.Redis) -> dict[str, DynamicFindingGroups]:
finding_groups_data = redis_client.hgetall(fg_key)
if finding_groups_data:
return {
key: DynamicFindingGroups.from_dict(json.loads(value))
for key, value in finding_groups_data.items()
}
return {}

Authorization Bypass (IDOR) in dojo/filters.py
Vulnerability Authorization Bypass (IDOR)
Description The DynamicFindingGroupsFilter and DynamicFindingGroupsFindingsFilter in dojo/filters.py are instantiated with request.GET, allowing a user to supply a pid (product ID) via a GET parameter. When pid is present, the set_related_object_fields method directly uses this pid in Engagement.objects.filter(product_id=self.pid) without performing an authorization check to ensure the user has permission to access the specified product. This bypasses the intended authorization flow, which is only applied when pid is not provided.

self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
class DynamicFindingGroupsFilter(FilterSet):
name = CharFilter(lookup_expr="icontains", label="Name")
severity = ChoiceFilter(
choices=[
("Low", "Low"),
("Medium", "Medium"),
("High", "High"),
("Critical", "Critical"),
],
label="Min Severity",
)
engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement")
product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product")
class Meta:
model = Finding
fields = ["name", "severity", "engagement", "product"]
def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user", None)
self.pid = kwargs.pop("pid", None)
super().__init__(*args, **kwargs)
self.set_related_object_fields()
def set_related_object_fields(self):
if self.pid is not None:
self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid)
if "product" in self.form.fields:
del self.form.fields["product"]
else:
self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View)
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
class DynamicFindingGroupsFindingsFilter(FilterSet):
name = CharFilter(lookup_expr="icontains", label="Name")
severity = MultipleChoiceFilter(
choices=[
("Low", "Low"),
("Medium", "Medium"),
("High", "High"),
("Critical", "Critical"),
],
label="Severity",
)
vuln_id_from_tool = CharFilter(lookup_expr="icontains", label="Vulnerability Id From Tool")
reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter")
active = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active")
engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement")
product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product")
class Meta:
model = Finding
fields = ["name", "severity", "vuln_id_from_tool", "reporter", "active", "engagement", "product"]
def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user", None)
self.pid = kwargs.pop("pid", None)
super().__init__(*args, **kwargs)
self.set_related_object_fields()
def set_related_object_fields(self):
if self.pid is not None:
self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid)
if "product" in self.form.fields:
del self.form.fields["product"]
else:
self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View)
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
self.form.fields["reporter"].queryset = get_authorized_users(Permissions.Finding_View)
class AcceptedFindingFilter(FindingFilter):
risk_acceptance__created__date = DateRangeFilter(label="Acceptance Date")
risk_acceptance__owner = ModelMultipleChoiceFilter(

Denial of Service via In-Memory Sorting in dojo/finding_group/views_dynamic.py
Vulnerability Denial of Service via In-Memory Sorting
Description The ListDynamicFindingGroups and its subclasses load all relevant dynamic finding groups into memory, filter them, and then sort the entire collection in memory using Python's sorted() function. Pagination is applied only after this in-memory sorting is complete. For instances with a large number of findings and dynamic groups, especially when accessed by users with broad permissions (e.g., superusers), this can lead to significant memory consumption and high CPU usage, potentially causing a Denial of Service.

import logging
from django.core.paginator import Paginator
from django.http import HttpRequest
from django.shortcuts import render
from django.views import View
from dojo.authorization.roles_permissions import Permissions
from dojo.filters import DynamicFindingGroupsFilter, DynamicFindingGroupsFindingsFilter
from dojo.finding_group.redis import (
GroupMode,
get_user_mode,
load_or_rebuild_finding_groups,
set_user_mode,
)
from dojo.forms import FindingBulkUpdateForm
from dojo.models import Finding, Global_Role
from dojo.product.queries import get_authorized_products
from dojo.utils import add_breadcrumb
logger = logging.getLogger(__name__)
def paginate_queryset(queryset, request: HttpRequest):
page_size = request.GET.get("page_size", 25) # Default is 25
paginator = Paginator(queryset, page_size)
page_number = request.GET.get("page")
return paginator.get_page(page_number)
class ListDynamicFindingGroups(View):
filter_name = "All"
def get_template(self):
return "dojo/finding_groups_dynamic_list.html"
def order_field(self, request: HttpRequest, finding_groups_findings_list):
order_field = request.GET.get("o")
if order_field:
reverse_order = order_field.startswith("-")
if reverse_order:
order_field = order_field[1:]
if order_field == "name":
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.name, reverse=reverse_order)
elif order_field == "findings_count":
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: len(x.finding_ids), reverse=reverse_order)
return finding_groups_findings_list
def filters(self, request: HttpRequest):
name_filter = request.GET.get("name", "").lower()
min_severity_filter = request.GET.get("severity")
engagement_filter = request.GET.getlist("engagement")
product_filter = request.GET.getlist("product")
return name_filter, min_severity_filter, engagement_filter, product_filter
def filter_finding_group(self, finding_group, request: HttpRequest):
name_filter, min_severity_filter, engagement_filter, product_filter = self.filters(request)
add_finding_group = True
if product_filter:
finding_group.finding_ids = set(finding_group.finding_ids) & set(
Finding.objects.filter(test__engagement__product__id__in=product_filter).values_list("id", flat=True),
)
if engagement_filter:
finding_group.finding_ids = set(finding_group.finding_ids) & set(
Finding.objects.filter(test__engagement__id__in=engagement_filter).values_list("id", flat=True),
)
finding_group.reconfig_finding_group()
if name_filter and name_filter not in finding_group.name.lower():
add_finding_group = False
if min_severity_filter and Finding.get_number_severity(finding_group.severity) < Finding.get_number_severity(min_severity_filter):
add_finding_group = False
if not finding_group.finding_ids:
add_finding_group = False
return add_finding_group
def get_findings(self, products):
finding_group_fids = {
fid for finding_group in self.finding_groups_map.values() for fid in finding_group.finding_ids
}
filters = {"id__in": finding_group_fids}
if products:
filters["test__engagement__product__in"] = products
user_findings_qs = Finding.objects.filter(**filters)
user_fids = set(user_findings_qs.values_list("id", flat=True))
active_fids = set(
user_findings_qs.filter(active=True).values_list("id", flat=True),
)
return user_fids, active_fids
def get_finding_groups(self, request: HttpRequest, products=None):
"""
Retrieve all dynamic finding groups for the current user.
Steps:
1. Retrieve finding IDs relevant for the user (optionally filtered by products).
2. Iterate over all finding groups in self.finding_groups_map.
3. For each group:
- Restrict the group's findings to those the user can see.
- Apply additional filters based on the request.
- No additional filtering for active findings.
4. Append groups that pass all filters to the result list.
5. Order the resulting list according to the request via order_field and return.
"""
user_fids, _ = self.get_findings(products)
list_finding_group = []
for finding_group in self.finding_groups_map.values():
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
if self.filter_finding_group(finding_group, request):
list_finding_group.append(finding_group)
return self.order_field(request, list_finding_group)
def get(self, request: HttpRequest):
global_role = Global_Role.objects.filter(user=request.user).first()
products = get_authorized_products(Permissions.Product_View)
mode_str = request.GET.get("mode", None)
user_id = request.user.id
if mode_str:
try:
mode = GroupMode(mode_str)
set_user_mode(user_id, mode)
except ValueError:
if mode_str is not None:
logger.warning(f"Invalid mode: {mode_str}")
mode = get_user_mode(user_id)
else:
mode = get_user_mode(user_id)
self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
if request.user.is_superuser or (global_role and global_role.role):
finding_groups = self.get_finding_groups(request)
elif products.exists():
finding_groups = self.get_finding_groups(request, products)
paginated_finding_groups = paginate_queryset(finding_groups, request)
context = {
"filter_name": self.filter_name,
"mode": mode.value if mode else None,
"filtered": DynamicFindingGroupsFilter(request.GET),
"finding_groups": paginated_finding_groups,
}
add_breadcrumb(title="Dynamic Finding Group", top_level=not len(request.GET), request=request)
return render(request, self.get_template(), context)
class ListOpenDynamicFindingGroups(ListDynamicFindingGroups):
filter_name = "Open"
def get_finding_groups(self, request: HttpRequest, products=None):
"""
Retrieve dynamic finding groups containing at least one active finding.
Steps:
1. Retrieve finding IDs relevant for the user and the active subset.
2. Iterate over all finding groups in self.finding_groups_map.
3. For each group:
- Restrict the group's findings to those the user can see.
- Apply additional filters based on the request.
- Keep only groups with at least one active finding.
4. Append groups that pass all filters to the result list.
5. Order the resulting list according to the request via order_field and return.
"""
user_fids, active_fids = self.get_findings(products)
list_finding_group = []
for finding_group in self.finding_groups_map.values():
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
if self.filter_finding_group(finding_group, request):
if finding_group.finding_ids & active_fids:
list_finding_group.append(finding_group)
return self.order_field(request, list_finding_group)
class ListClosedDynamicFindingGroups(ListDynamicFindingGroups):
filter_name = "Closed"
def get_finding_groups(self, request: HttpRequest, products=None):
"""
Retrieve dynamic finding groups containing no active findings.
Steps:
1. Retrieve finding IDs relevant for the user and the active subset.
2. Iterate over all finding groups in self.finding_groups_map.
3. For each group:
- Restrict the group's findings to those the user can see.
- Apply additional filters based on the request.
- Keep only groups with no active findings.
4. Append groups that pass all filters to the result list.
5. Order the resulting list according to the request via order_field and return.
"""
user_fids, active_fids = self.get_findings(products)
list_finding_group = []
for finding_group in self.finding_groups_map.values():
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
if self.filter_finding_group(finding_group, request):
if not (finding_group.finding_ids & active_fids):
list_finding_group.append(finding_group)
return self.order_field(request, list_finding_group)
class DynamicFindingGroupsFindings(View):
def get_template(self):
return "dojo/finding_group_dynamic_findings.html"
def order_field(self, request: HttpRequest, finding_groups_findings_list):
order_field = request.GET.get("o")
if order_field:
reverse_order = order_field.startswith("-")
if reverse_order:
order_field = order_field[1:]
if order_field == "title":
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.title, reverse=reverse_order)
elif order_field == "found_by":
finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.found_by.count(), reverse=reverse_order)
return finding_groups_findings_list
def filters(self, request: HttpRequest):
name_filter = request.GET.get("name", "").lower()
severity_filter = request.GET.getlist("severity")
vuln_id_from_tool_filter = request.GET.get("vuln_id_from_tool")
reporter_filter = request.GET.getlist("reporter")
active_filter = request.GET.get("active")
engagement_filter = request.GET.getlist("engagement")
product_filter = request.GET.getlist("product")
return name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter
def filter_findings(self, findings, request: HttpRequest):
name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter = self.filters(request)
filter_kwargs = {}
if name_filter:
filter_kwargs["title__icontains"] = name_filter
if severity_filter:
filter_kwargs["severity__in"] = severity_filter
if vuln_id_from_tool_filter:
filter_kwargs["vuln_id_from_tool__icontains"] = vuln_id_from_tool_filter
if reporter_filter:
filter_kwargs["reporter__id__in"] = reporter_filter
if active_filter:
filter_kwargs["active"] = (active_filter == "Yes")
if engagement_filter:
filter_kwargs["test__engagement__id__in"] = engagement_filter
if product_filter:
filter_kwargs["test__engagement__product__id__in"] = product_filter
return findings.filter(**filter_kwargs)
def get_findings(self, request: HttpRequest, products=None):
finding_group = self.finding_groups_map.get(self.finding_group_id)
# When the finding_group not exists
if not finding_group:
return None, []
list_findings = finding_group.finding_ids
if products:
findings = Finding.objects.filter(id__in=list_findings, test__engagement__product__in=products)
else:
findings = Finding.objects.filter(id__in=list_findings)
findings = self.filter_findings(findings, request)
return finding_group.name, self.order_field(request, findings)
def get(self, request: HttpRequest, finding_group_id: int):
self.finding_group_id = finding_group_id
global_role = Global_Role.objects.filter(user=request.user).first()
products = get_authorized_products(Permissions.Product_View)
mode = get_user_mode(request.user.id)
self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
if request.user.is_superuser or (global_role and global_role.role):
finding_group_name, findings = self.get_findings(request)
elif products.exists():
finding_group_name, findings = self.get_findings(request, products)
else:
finding_group_name = None
paginated_findings = paginate_queryset(findings, request)
context = {
"finding_group": finding_group_name,
"filtered": DynamicFindingGroupsFindingsFilter(request.GET),
"finding_group_id": self.finding_group_id,
"findings": paginated_findings,
"bulk_edit_form": FindingBulkUpdateForm(request.GET),
}
add_breadcrumb(title="Dynamic Finding Group Findings", top_level=not len(request.GET), request=request)
return render(request, self.get_template(), context)

We've notified @mtesauro.


All finding details can be found in the DryRun Security Dashboard.

@LeoOMaia LeoOMaia force-pushed the dynamic-group branch 7 times, most recently from 934806b to 89cc47a Compare August 18, 2025 03:56
* dynamic-group

* own mode for each user
Copy link
Contributor

This pull request has conflicts, please resolve those before we can evaluate the pull request.

Copy link
Contributor

Conflicts have been resolved. A maintainer will review the pull request shortly.

dojo/filters.py Outdated
],
label="Severity",
)
script_id = CharFilter(lookup_expr="icontains", label="Script ID")
Copy link
Member

@valentijnscholten valentijnscholten Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to vuln_id_from_tool to avoid confusion as the script_id field doesn't exist in the Finding model?

dojo/filters.py Outdated
)
script_id = CharFilter(lookup_expr="icontains", label="Script ID")
reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter")
status = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to active or is_active to avoid confusion as the status field doesn't exist in the Finding model?

"Info": 1,
}
USER_MODES_KEY = "finding_groups_user_modes"
SYSTEM_CHANGE = "finding_groups_last_finding_change"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coudl this have a more clear name like LAST_FINDING_CHANGE or LAST_FINDING_UPDATE

return None

def update_sev_sla(self, finding: Finding) -> None:
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[self.severity]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this also work without another mapping by using the numerical_severity as we do elsewhere?

Copy link
Contributor Author

@LeoOMaia LeoOMaia Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change to use Finding.get_number_severity

Comment on lines 151 to 180
def get_days_remaining(self) -> str:
if self.sla_finding_id:
finding = Finding.objects.filter(id=self.sla_finding_id).first()
days_remaining = finding.sla_days_remaining()
severity = finding.severity
sla_start_date = finding.get_sla_start_date().strftime("%b %d, %Y")
status = "age-green"
status_text = f"Remediation for {severity.lower()} findings due in {days_remaining} days or less (started {sla_start_date})"
if days_remaining and days_remaining < 0:
status = "age-red"
status_text = f"Overdue: Remediation for {severity.lower()} findings overdue {days_remaining} days (started {sla_start_date})"
days_remaining = abs(days_remaining)
elif any(
Finding.objects.filter(
id__in=self.finding_ids,
active=True,
),
):
status = "severity-Info"
status_text = "No SLA set, but at least one finding is active"
days_remaining = "No SLA"
else:
status = "age-blue"
status_text = "No active finding"
days_remaining = "Concluded"
title = (
f'<a class="has-popover" data-toggle="tooltip" data-placement="bottom" title="" href="#" data-content="{status_text}">'
f'<span class="label severity {status}">{days_remaining}</span></a>'
)
return mark_safe(title)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this could be made more clean by maybe putting it in a display tag to avoid mixing logic and display too much? I also wonder about the overlap there is with the Finding_Group model which also has logic to determine SLA data (and severity?) for a list of findings.

Comment on lines +184 to +187
def get_redis_client() -> redis.Redis:
host = getattr(settings, "REDIS_HOST", "redis")
port = getattr(settings, "REDIS_PORT", 6379)
return redis.Redis(host=host, port=port, decode_responses=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to reuse existing config in settings.dist.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn’t find anything definitive in the settings that confirms the default host and port for Redis. Because of that, I decided to handle it directly in the code. This way, if someone wants to change the host or port, they just need to define those two variables in the project settings. It works, but I’m not sure this is the best practice. Would you recommend handling it differently?

"Low": 2,
"Info": 1,
}
USER_MODES_KEY = "finding_groups_user_modes"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused by the word "USER" in this name. If I look at the code it seems to be grouping mode of the group. Is there are more clear name for this constant? Or is it that the grouping mode is stored per user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's per user. Once a user selects a mode, the next time they access the tab it will default to their previously chosen mode, without affecting other users who may have a different mode set.

Comment on lines 136 to 145
def get_finding_groups(self, request: HttpRequest, products=None):
user_fids, active_fids = self.get_findings(products)
list_finding_group = []
for finding_group in self.finding_groups_map.values():
finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
if self.filter_finding_group(finding_group, request):
if finding_group.finding_ids & active_fids:
list_finding_group.append(finding_group)
return self.order_field(request, list_finding_group)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what's happening here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It retrieves two sets of finding IDs:
    user_fids: all findings the user is allowed to see (possibly restricted by products).
    active_fids: the subset of findings that are currently active.

  2. It iterates through all finding groups in self.finding_groups_map.

  3. For each group, it intersects (&) the group’s finding IDs with user_fids, so the group only keeps findings the user has access to.

  4. It applies additional filters through filter_finding_group, based on the request.

  5. If the group still has at least one active finding (finding_group.finding_ids & active_fids), it is added to the result list to show in ListOpenDynamicFindingGroups.

  6. Finally, it orders the groups using order_field and returns them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments or an overall explanation of how the grouping and caching works?
Are you doing the filtering/grouping/ordering in python code?
Are you caching the results per user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doing it in Python code, and I'm not caching the results per user. I use the finding_group created for the title mode, for example, stored in Redis, which is used by everyone who chose this mode, and I filter the findings and groups that the user can't see or wants to filter.


logger = logging.getLogger(__name__)

DD_TEST = os.getenv("DD_TEST", "False").lower() == "true"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels weird to have this flag/variable. Can we avoid it or at least use the same pattern as our other env variables via settings.dist.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this variable in docker-compose.override.unit_tests_cicd.yml to avoid using Redis during CI, because Redis is not created and there is no variable to indicate when we are running tests in CI. Do you recommend something to avoid this variable?

@valentijnscholten valentijnscholten changed the title Dynamic group Tab Dynamic groups Aug 28, 2025
Copy link
Member

@valentijnscholten valentijnscholten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, as said before I like the idea of being able to group findings in the UI.
I am still thinking if this is the best way to do it, especially since there's quite a lot of code just to be able to put the groups into Redis.

Before diving into it in detail, I've left some comments to take a look at.

I also tried out the feature locally, but I run into an error when grouping on vuln_id_from_tool:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 105, in view
    return self.dispatch(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 144, in dispatch
    return handler(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dojo/finding_group/views_dynamic.py", line 115, in get
    self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dojo/finding_group/redis.py", line 243, in load_or_rebuild_finding_groups
    DynamicFindingGroups.add_finding(finding, mode)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dojo/finding_group/redis.py", line 129, in add_finding
    finding_group_id = base64.b64encode(finding_group_name.encode()).decode()
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^

Exception Type: AttributeError at /dynamic_finding_group/open
Exception Value: 'NoneType' object has no attribute 'encode'

When grouping on title it just keeps spinning, maybe it's just very slow. This might make sense as title is a free form text field which will be hard to group by efficiently.
Similar for CVE which should be easier to group by, but also this requests doesn't seem to complete just spinning.

To be honest I couldn't follow all of the code and there seems to be alot of processing inside python instead of the database. Maybe that's why it's not performing at the moment?

My instance has ~170K findings.

@LeoOMaia
Copy link
Contributor Author

I understand why this happened: I didn’t handle the case where vuln_id_from_tool is None.
Since [None] is still a valid list, the check

if not finding_groups:
    return

does not filter it out.
In the function:

def get_group_names(finding: Finding, mode: GroupMode) -> list[str] | None:
    if mode == GroupMode.VULN_ID_FROM_TOOL:
        return [finding.vuln_id_from_tool]
    if mode == GroupMode.TITLE:
        return [finding.title]
    if mode == GroupMode.CVE:
        cves = list(
            finding.vulnerability_id_set.values_list("vulnerability_id", flat=True),
        )
        if cves:
            return cves
    return None

i will fix that, sorry.

@valentijnscholten
Copy link
Member

valentijnscholten commented Aug 30, 2025

When I look at your screenshots, the list of Dynamic Finding Groups looks a lot like pages in Defect Dojo where we display lists of entities with some statistics. For example the View Engagement page lists all the Tests in the Engagement and the number of findings, etc.

This basically a way to view at findings that are grouped by a certain field, in this case test_id. This is all done by the database using Django ORM / SQL queries. This is quite performant, at least up to 100K findings.

This PR wants to group findings by other fields such as vuln_id_from_tool, cve, title, etc.
Could you explain why a different implementation approach is needed here?

For example if I do an SQL query to group findings by title, this is very fast in my 170K instance (0.6s):

SELECT 
    f.title,
    COUNT(*) as total_findings,
    COUNT(CASE WHEN f.active = true THEN 1 END) as active_findings,
    MAX(f.numerical_severity) as max_numerical_severity,
    COUNT(DISTINCT t.engagement_id) as affected_engagements,
    COUNT(DISTINCT e.product_id) as affected_products,
    STRING_AGG(DISTINCT p.name, ', ') as product_names
FROM 
    dojo_finding f
    JOIN dojo_test t ON f.test_id = t.id
    JOIN dojo_engagement e ON t.engagement_id = e.id
    JOIN dojo_product p ON e.product_id = p.id
GROUP BY 
    f.title
ORDER BY 
    total_findings DESC;

This gives me similar results to the screenshots in this PR:

image

And that is including some joines to list the affected products, which would be a nice feature.

I might be forgetting something, but my initial approach would be to implement the dynamic grouping in the same way.

What are your thoughts on this @LeoOMaia

Also did another test today to look at the static finding group pages, but running into this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 105, in view
    return self.dispatch(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/views/generic/base.py", line 144, in dispatch
    return handler(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dojo/finding_group/views.py", line 302, in get
    paginated_finding_groups = self.paginate_queryset(finding_groups, request)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/dojo/finding_group/views.py", line 288, in paginate_queryset
    paginator = Paginator(queryset, page_size)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/core/paginator.py", line 46, in __init__
    self._check_object_list_is_ordered()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/django/core/paginator.py", line 142, in _check_object_list_is_ordered
    warnings.warn(
    ^

Exception Type: UnorderedObjectListWarning at /finding_group/open
Exception Value: Pagination may yield inconsistent results with an unordered object_list: <class 'dojo.models.Finding_Group'> QuerySet.

@LeoOMaia
Copy link
Contributor Author

I think your idea would be much better and would make it possible to remove the Redis storage. Regarding the message you received, was it an error or just a warning? I checked, and indeed I didn’t define a default ordering for when the user doesn’t request any sorting. I’m opening a PR to add a default ordering.

You’re right that grouping by title and vuln_id_from_tool would work correctly with the query you shared, and I agree this is a good approach for those fields. For CVE, however, the logic would need to be different. A single finding can have multiple CVEs, so the idea was to create groups for each CVE, which means the same finding could belong to multiple groups. Because of that, the query would have to be adapted, but I believe it’s still possible to achieve with a modified version of your example.

I’m currently looking into how I can adjust my implementation to better align with the query-based approach you suggested.

@LeoOMaia
Copy link
Contributor Author

Thank you for your comments! I’ll convert this PR into a draft so I can carefully review my code and remove Redis following your suggestion. It might take a little time, but once I’m done, I’ll reopen it.

@LeoOMaia LeoOMaia marked this pull request as draft August 30, 2025 23:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants