diff --git a/bsmain/management/commands/fetch_event_schema_json.py b/bsmain/management/commands/fetch_event_schema_json.py
index a6fe33f..f4500c5 100644
--- a/bsmain/management/commands/fetch_event_schema_json.py
+++ b/bsmain/management/commands/fetch_event_schema_json.py
@@ -1,7 +1,9 @@
import json
import requests
import fastjsonschema
-from subprocess import run, PIPE
+
+# no_bandit_expl: subprocess.run is used to call black in an interal utility script; it's just fine.
+from subprocess import run, PIPE # nosec B404
from django.conf import settings
@@ -28,7 +30,8 @@ class Command(BaseCommand):
def handle(self, *args, **options):
# Fetch the event schema JSON from the API and save it to disk
json_result = requests.get(
- "https://raw.githubusercontent.com/getsentry/sentry-data-schemas/main/relay/event.schema.json")
+ "https://raw.githubusercontent.com/getsentry/sentry-data-schemas/main/relay/event.schema.json",
+ timeout=10)
json_result.raise_for_status()
with open(settings.BASE_DIR / "api/event.schema.json", "w") as f:
@@ -36,7 +39,7 @@ class Command(BaseCommand):
# Fetch the license from GitHub and save it to disk (with annotation about the source)
license_url = "https://raw.githubusercontent.com/getsentry/sentry-data-schemas/main/LICENSE"
- license_result = requests.get(license_url)
+ license_result = requests.get(license_url, timeout=10)
license_result.raise_for_status()
with open(settings.BASE_DIR / "api/LICENSE", "w") as f:
@@ -67,4 +70,5 @@ The source of this file is: %s
# put it through 'black' to make it fit on screen at least, and perhaps to get useable diffs too.
# (we don't bother putting it in requirements.txt, since it's used so rarely)
- run(["black", settings.BASE_DIR / "bugsink/event_schema.py"], stdout=PIPE, stderr=PIPE)
+ # no_bandit_expl: subprocess.run is used to call black in an interal utility script; it's just fine.
+ run(["black", settings.BASE_DIR / "bugsink/event_schema.py"], stdout=PIPE, stderr=PIPE) # nosec
diff --git a/bsmain/management/commands/send_json.py b/bsmain/management/commands/send_json.py
index 826f4e4..1b671ab 100644
--- a/bsmain/management/commands/send_json.py
+++ b/bsmain/management/commands/send_json.py
@@ -1,7 +1,6 @@
import io
import uuid
import brotli
-import random
import time
import json
@@ -13,6 +12,7 @@ from django.conf import settings
from compat.dsn import get_store_url, get_envelope_url, get_header_value
from bugsink.streams import compress_with_zlib, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE
+from bugsink.utils import nc_rnd
from projects.models import Project
@@ -123,9 +123,9 @@ class Command(BaseCommand):
# https://develop.sentry.dev/sdk/data-model/event-payloads/span/#attributes
# > A random hex string with a length of 16 characters. [which is 8 bytes]
- data["contexts"]["trace"]["span_id"] = random.getrandbits(64).to_bytes(8, byteorder='big').hex()
+ data["contexts"]["trace"]["span_id"] = nc_rnd.getrandbits(64).to_bytes(8, byteorder='big').hex()
# > A random hex string with a length of 32 characters. [which is 16 bytes]
- data["contexts"]["trace"]["trace_id"] = random.getrandbits(128).to_bytes(16, byteorder='big').hex()
+ data["contexts"]["trace"]["trace_id"] = nc_rnd.getrandbits(128).to_bytes(16, byteorder='big').hex()
if options["tag"]:
if "tags" not in data:
@@ -176,6 +176,7 @@ class Command(BaseCommand):
get_envelope_url(dsn) if use_envelope else get_store_url(dsn),
headers=headers,
data=compressed_data,
+ timeout=10,
)
elif compress == "br":
@@ -187,6 +188,7 @@ class Command(BaseCommand):
get_envelope_url(dsn) if use_envelope else get_store_url(dsn),
headers=headers,
data=compressed_data,
+ timeout=10,
)
else:
@@ -197,6 +199,7 @@ class Command(BaseCommand):
get_envelope_url(dsn) if use_envelope else get_store_url(dsn),
headers=headers,
data=data_bytes,
+ timeout=10,
)
response.raise_for_status()
diff --git a/bsmain/management/commands/stress_test.py b/bsmain/management/commands/stress_test.py
index 2380881..fd6f9e1 100644
--- a/bsmain/management/commands/stress_test.py
+++ b/bsmain/management/commands/stress_test.py
@@ -1,4 +1,3 @@
-import random
import io
import uuid
import brotli
@@ -13,16 +12,17 @@ from django.core.management.base import BaseCommand
from compat.dsn import get_envelope_url, get_header_value
from bugsink.streams import compress_with_zlib, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE
+from bugsink.utils import nc_rnd
from issues.utils import get_values
def random_postfix():
# avoids numbers, because when usedd in the type I imagine numbers may at some point be ignored in the grouping.
- random_number = random.random()
+ random_number = nc_rnd.random()
if random_number < 0.1:
# 10% of the time we simply sample from 1M to create a "fat tail".
- unevenly_distributed_number = int(random.random() * 1_000_000)
+ unevenly_distributed_number = int(nc_rnd.random() * 1_000_000)
else:
unevenly_distributed_number = int(1 / random_number)
@@ -105,9 +105,9 @@ class Command(BaseCommand):
# https://develop.sentry.dev/sdk/data-model/event-payloads/span/#attributes
# > A random hex string with a length of 16 characters. [which is 8 bytes]
- data["contexts"]["trace"]["span_id"] = random.getrandbits(64).to_bytes(8, byteorder='big').hex()
+ data["contexts"]["trace"]["span_id"] = nc_rnd.getrandbits(64).to_bytes(8, byteorder='big').hex()
# > A random hex string with a length of 32 characters. [which is 16 bytes]
- data["contexts"]["trace"]["trace_id"] = random.getrandbits(128).to_bytes(16, byteorder='big').hex()
+ data["contexts"]["trace"]["trace_id"] = nc_rnd.getrandbits(128).to_bytes(16, byteorder='big').hex()
if options["tag"]:
if "tags" not in data:
@@ -153,7 +153,7 @@ class Command(BaseCommand):
for compressed_data in compressed_datas.values():
if self.stopping:
return
- dsn = random.choice(dsns)
+ dsn = nc_rnd.choice(dsns)
t0 = time.time()
success = Command.send_to_server(dsn, options, compress, compressed_data)
@@ -179,6 +179,7 @@ class Command(BaseCommand):
get_envelope_url(dsn),
headers=headers,
data=compressed_data,
+ timeout=10,
)
elif compress == "br":
@@ -187,12 +188,14 @@ class Command(BaseCommand):
get_envelope_url(dsn),
headers=headers,
data=compressed_data,
+ timeout=10,
)
response = requests.post(
get_envelope_url(dsn),
headers=headers,
data=compressed_data,
+ timeout=10,
)
response.raise_for_status()
return True
diff --git a/bugsink/app_settings.py b/bugsink/app_settings.py
index e51c97e..fe68237 100644
--- a/bugsink/app_settings.py
+++ b/bugsink/app_settings.py
@@ -5,6 +5,7 @@ import os
from contextlib import contextmanager
from django.conf import settings
+from bugsink.utils import assert_
_KIBIBYTE = 1024
@@ -127,7 +128,7 @@ def override_settings(**new_settings):
_settings = AttrLikeDict()
_settings.update(old_settings)
for k in new_settings:
- assert k in old_settings, "Unknown setting (likely error in tests): %s" % k
+ assert_(k in old_settings, "Unknown setting (likely error in tests): %s" % k)
_settings.update(new_settings)
yield
_settings = old_settings
diff --git a/bugsink/context_processors.py b/bugsink/context_processors.py
index f797596..172560e 100644
--- a/bugsink/context_processors.py
+++ b/bugsink/context_processors.py
@@ -22,10 +22,11 @@ from phonehome.models import Installation
SystemWarning = namedtuple('SystemWarning', ['message', 'ignore_url'])
+# no_bandit_expl: literally a constant string so safe by defenition
EMAIL_BACKEND_WARNING = mark_safe(
"""Email is not set up, emails won't be sent. To get the most out of Bugsink, please
set up email.""")
+ dark:text-slate-100">set up email.""") # nosec
def get_snappea_warnings():
diff --git a/bugsink/debug_views.py b/bugsink/debug_views.py
index 014d656..d3d2661 100644
--- a/bugsink/debug_views.py
+++ b/bugsink/debug_views.py
@@ -180,7 +180,8 @@ def _process_view_steps(middleware, request, wider_context):
try:
middleware._check_token(request)
- result["_check_token"] = "OK"
+ # no_bandit_expl: bandit triggers on the word "token" here but this is not exposure of a secret token
+ result["_check_token"] = "OK" # nosec
except RejectRequest as e:
result["_check_token"] = "FAILS WITH %s" % e.reason
result["process_view"] = "FAILS at _check_token"
@@ -219,7 +220,8 @@ def csrf_debug(request):
"META": {
k: request.META.get(k) for k in request.META.keys() if k.startswith("HTTP_")
},
- "SECURE_PROXY_SSL_HEADER": settings.SECURE_PROXY_SSL_HEADER[0] if settings.SECURE_PROXY_SSL_HEADER else None,
+ "SECURE_PROXY_SSL_HEADER":
+ settings.SECURE_PROXY_SSL_HEADER[0] if settings.SECURE_PROXY_SSL_HEADER else None,
"process_view": _process_view_steps(middleware, request, context),
})
diff --git a/bugsink/settings/development.py b/bugsink/settings/development.py
index 1e1c6c7..504b2e3 100644
--- a/bugsink/settings/development.py
+++ b/bugsink/settings/development.py
@@ -8,7 +8,9 @@ from sentry_sdk_extensions.transport import MoreLoudlyFailingTransport
from bugsink.utils import deduce_allowed_hosts, eat_your_own_dogfood
-SECRET_KEY = 'django-insecure-$@clhhieazwnxnha-_zah&(bieq%yux7#^07&xsvhn58t)8@xw'
+# no_bandit_expl: _development_ settings, we know that this is insecure; would fail to deploy in prod if (as configured)
+# the django checks (with --check --deploy) are run.
+SECRET_KEY = 'django-insecure-$@clhhieazwnxnha-_zah&(bieq%yux7#^07&xsvhn58t)8@xw' # nosec B105
DEBUG = True
diff --git a/bugsink/streams.py b/bugsink/streams.py
index 6472a44..295fa56 100644
--- a/bugsink/streams.py
+++ b/bugsink/streams.py
@@ -3,6 +3,7 @@ import io
import brotli
from bugsink.app_settings import get_settings
+from bugsink.utils import assert_
DEFAULT_CHUNK_SIZE = 8 * 1024
@@ -46,7 +47,7 @@ def brotli_generator(input_stream, chunk_size=DEFAULT_CHUNK_SIZE):
yield decompressor.process(compressed_chunk)
- assert decompressor.is_finished()
+ assert_(decompressor.is_finished())
class GeneratorReader:
diff --git a/bugsink/utils.py b/bugsink/utils.py
index 991e6e9..6a64e05 100644
--- a/bugsink/utils.py
+++ b/bugsink/utils.py
@@ -1,3 +1,4 @@
+import random
import logging
from collections import defaultdict
from urllib.parse import urlparse
@@ -10,6 +11,11 @@ from django.db.models import ForeignKey, F
from .version import version
+# alias for the random module that is explicitly used in "non-cryptographic" contexts; this is a utility to avoid false
+# positives in (bandit) security scans that complain about the use of `random`; by flagging a use as "non-cryptographic"
+# we avoid sprinkling `nosec` (and their explanations) all over the codebase.
+nc_rnd = random
+
logger = logging.getLogger("bugsink.email")
@@ -313,7 +319,7 @@ def delete_deps_with_budget(project_id, referring_model, fk_name, referred_ids,
my_num_deleted, del_d = referring_model.objects.filter(pk__in=[d['pk'] for d in relevant_ids_after_rec]).delete()
num_deleted += my_num_deleted
- assert set(del_d.keys()) == {referring_model._meta.label} # assert no-cascading (we do that ourselves)
+ assert_(set(del_d.keys()) == {referring_model._meta.label}) # assert no-cascading (we do that ourselves)
if is_for_project:
# short-circuit: project-deletion implies "no orphans" because the project kill everything with it.
@@ -325,3 +331,11 @@ def delete_deps_with_budget(project_id, referring_model, fk_name, referred_ids,
prune_orphans(referring_model, relevant_ids_after_rec)
return num_deleted
+
+
+def assert_(condition, message=None):
+ """Replacement for the `assert` statement as a function. Avoids the (possibly optimized-out) assert statement."""
+ if not condition:
+ if message is None:
+ raise AssertionError()
+ raise AssertionError(message)
diff --git a/compat/dsn.py b/compat/dsn.py
index bc7a297..e7a33c1 100644
--- a/compat/dsn.py
+++ b/compat/dsn.py
@@ -1,4 +1,5 @@
import urllib.parse
+from bugsink.utils import assert_
def _colon_port(port):
@@ -8,8 +9,9 @@ def _colon_port(port):
def build_dsn(base_url, project_id, public_key):
parts = urllib.parse.urlsplit(base_url)
- assert parts.scheme in ("http", "https"), "The BASE_URL setting must be a valid URL (starting with http or https)."
- assert parts.hostname, "The BASE_URL setting must be a valid URL. The hostname must be set."
+ assert_(
+ parts.scheme in ("http", "https"), "The BASE_URL setting must be a valid URL (starting with http or https).")
+ assert_(parts.hostname, "The BASE_URL setting must be a valid URL. The hostname must be set.")
return (f"{ parts.scheme }://{ public_key }@{ parts.hostname }{ _colon_port(parts.port) }" +
f"{ parts.path }/{ project_id }")
diff --git a/compat/timestamp.py b/compat/timestamp.py
index d0ee0f5..f9657e3 100644
--- a/compat/timestamp.py
+++ b/compat/timestamp.py
@@ -1,6 +1,7 @@
import datetime
from django.utils.dateparse import parse_datetime
+from bugsink.utils import assert_
def parse_timestamp(value):
@@ -25,7 +26,7 @@ def parse_timestamp(value):
def format_timestamp(value):
"""the reverse of parse_timestamp"""
- assert isinstance(value, datetime.datetime)
- assert value.tzinfo == datetime.timezone.utc
+ assert_(isinstance(value, datetime.datetime))
+ assert_(value.tzinfo == datetime.timezone.utc)
return value.isoformat()
diff --git a/ee/tenants/utils.py b/ee/tenants/utils.py
index 3fcd3f7..69e63f5 100644
--- a/ee/tenants/utils.py
+++ b/ee/tenants/utils.py
@@ -1,11 +1,12 @@
from contextlib import contextmanager
from .base import get_tenant_subdomain, use_tenant_subdomain
+from bugsink.utils import assert_
def add_tenant_subdomain_to_kwargs():
tenant_subdomain = get_tenant_subdomain()
- assert tenant_subdomain is not None, "Must have tenant set to be able to pass this to snappea"
+ assert_(tenant_subdomain is not None, "Must have tenant set to be able to pass this to snappea")
return {"TENANT_SUBDOMAIN": tenant_subdomain}
diff --git a/events/retention.py b/events/retention.py
index 53c1dad..c3162c7 100644
--- a/events/retention.py
+++ b/events/retention.py
@@ -1,10 +1,10 @@
import logging
from django.db.models import Q, Min, Max, Count
-from random import random
from datetime import timezone, datetime
from bugsink.moreiterutils import pairwise, map_N_until
+from bugsink.utils import assert_, nc_rnd
from performance.context_managers import time_and_query_count
from .storage_registry import get_storage
@@ -38,7 +38,7 @@ def get_epoch(datetime_obj):
# in the search for a cut-off value for the total irrelevance, so it doesn't matter in the end.)
# assuming we use model fields this 'just works' because Django's stores its stuff in timezone-aware UTC in the DB.
- assert datetime_obj.tzinfo == timezone.utc
+ assert_(datetime_obj.tzinfo == timezone.utc)
return int(datetime_obj.timestamp() / 3600)
@@ -85,8 +85,8 @@ def get_random_irrelevance(stored_event_count):
"""
# assert as a tripwire to check our assumptions; note that the actual calculation actually "succeeds" for < 1, but
# it becomes non-sensical, so I'd rather have it fail. The present event is part of the count, i.e. always >= 1
- assert stored_event_count >= 1
- return nonzero_leading_bits(round(random() * stored_event_count * 2))
+ assert_(stored_event_count >= 1)
+ return nonzero_leading_bits(round(nc_rnd.random() * stored_event_count * 2))
def should_evict(project, timestamp, stored_event_count):
diff --git a/events/utils.py b/events/utils.py
index 382b2fd..e79ed2c 100644
--- a/events/utils.py
+++ b/events/utils.py
@@ -6,6 +6,7 @@ import sourcemap
from issues.utils import get_values
from bugsink.transaction import delay_on_commit
+from bugsink.utils import assert_
from compat.timestamp import format_timestamp
@@ -73,7 +74,7 @@ def annotate_var_with_meta(var, meta_var):
"""
'var' is a (potentially trimmed) list or dict, 'meta_var' is a dict describing the trimming.
"""
- assert isinstance(var, (list, dict, str))
+ assert_(isinstance(var, (list, dict, str)))
if isinstance(var, list):
Incomplete = IncompleteList
diff --git a/files/tasks.py b/files/tasks.py
index bc52941..a193708 100644
--- a/files/tasks.py
+++ b/files/tasks.py
@@ -46,7 +46,8 @@ def assemble_artifact_bundle(bundle_checksum, chunk_checksums):
for filename, manifest_entry in manifest["files"].items():
file_data = bundle_zip.read(filename)
- checksum = sha1(file_data).hexdigest()
+ # usedforsecurity=false: sha1 is not used cryptographically, it's part of the protocol, so we use it as is.
+ checksum = sha1(file_data, usedforsecurity=False).hexdigest()
filename = basename(manifest_entry.get("url", filename))[:255]
@@ -112,7 +113,8 @@ def assemble_file(checksum, chunk_checksums, filename):
chunks_in_order = [chunks_dicts[checksum] for checksum in chunk_checksums] # implicitly checks chunk availability
data = b"".join([chunk.data for chunk in chunks_in_order])
- if sha1(data).hexdigest() != checksum:
+ # usedforsecurity=false: sha1 is not used cryptographically, and it's part of the protocol, so we use it as is.
+ if sha1(data, usedforsecurity=False).hexdigest() != checksum:
raise Exception("checksum mismatch")
result = File.objects.get_or_create(
diff --git a/files/views.py b/files/views.py
index 0508d71..4aa4be0 100644
--- a/files/views.py
+++ b/files/views.py
@@ -162,7 +162,8 @@ def chunk_upload(request, organization_slug):
for chunk in chunks:
data = chunk.getvalue()
- if sha1(data).hexdigest() != chunk.name:
+ # usedforsecurity=False: sha1 is not used cryptographically, and it's part of the protocol, so we use it as is.
+ if sha1(data, usedforsecurity=False).hexdigest() != chunk.name:
raise Exception("checksum mismatch")
with immediate_atomic(): # a snug fit around the only DB-writing thing we do here to ensure minimal blocking
diff --git a/ingest/event_counter.py b/ingest/event_counter.py
index 4f5c6b0..89da457 100644
--- a/ingest/event_counter.py
+++ b/ingest/event_counter.py
@@ -3,6 +3,7 @@ from datetime import timezone, datetime
from django.db.models import Min
from bugsink.period_utils import add_periods_to_datetime, sub_periods_from_datetime
+from bugsink.utils import assert_
def _filter_for_periods(qs, period_name, nr_of_periods, now):
@@ -28,7 +29,7 @@ def check_for_thresholds(qs, now, thresholds, add_for_current=0):
# The only relevant cost that this mechanism thus adds is the per-project counting of digested events.
# we only allow UTC, and we generally use Django model fields, which are UTC, so this should be good:
- assert now.tzinfo == timezone.utc
+ assert_(now.tzinfo == timezone.utc)
states = []
diff --git a/issues/models.py b/issues/models.py
index 7bf0003..7637521 100644
--- a/issues/models.py
+++ b/issues/models.py
@@ -9,6 +9,7 @@ from django.template.defaultfilters import date as default_date_filter
from django.conf import settings
from django.utils.functional import cached_property
+from bugsink.utils import assert_
from bugsink.volume_based_condition import VolumeBasedCondition
from bugsink.transaction import delay_on_commit
from alerts.tasks import send_unmute_alert
@@ -468,7 +469,7 @@ class IssueQuerysetStateManager(object):
unmute_after=None,
)
- assert triggering_event is None, "this method can only be called from the UI, i.e. user-not-event-triggered"
+ assert_(triggering_event is None, "this method can only be called from the UI, i.e. user-not-event-triggered")
# for the rest of this method there's no fancy queryset based stuff (we don't actually do updates on the DB)
# we resist the temptation to add filter(is_muted=True) in the below because that would actually add a query
# (for this remark to be true triggering_event must be None, which is asserted for in the above)
diff --git a/issues/views.py b/issues/views.py
index f3096c1..8e591a7 100644
--- a/issues/views.py
+++ b/issues/views.py
@@ -22,6 +22,7 @@ from bugsink.decorators import project_membership_required, issue_membership_req
from bugsink.transaction import durable_atomic
from bugsink.period_utils import add_periods_to_datetime
from bugsink.timed_sqlite_backend.base import different_runtime_limit
+from bugsink.utils import assert_
from events.models import Event
from events.ua_stuff import get_contexts_enriched_with_ua
@@ -780,7 +781,7 @@ def history_comment_new(request, issue):
if request.method == "POST":
form = CommentForm(request.POST)
- assert form.is_valid() # we have only a textfield with no validation properties; also: no html-side handling
+ assert_(form.is_valid()) # we have only a textfield with no validation properties; also: no html-side handling
if form.cleaned_data["comment"] != "":
# one special case: we simply ignore newly created comments without any contents as a (presumed) mistake. I
@@ -807,7 +808,7 @@ def history_comment_edit(request, issue, comment_pk):
if request.method == "POST":
form = CommentForm(request.POST, instance=comment)
- assert form.is_valid()
+ assert_(form.is_valid())
form.save()
return redirect(reverse(issue_history, kwargs={'issue_pk': issue.pk}) + f"#comment-{ comment_pk }")
diff --git a/performance/bursty_data.py b/performance/bursty_data.py
index 664142d..eb79e5f 100644
--- a/performance/bursty_data.py
+++ b/performance/bursty_data.py
@@ -1,6 +1,7 @@
import datetime
import math
-import random
+
+from bugsink.utils import nc_rnd
# a way to generate some bursty streams of points-in-time.
@@ -26,8 +27,8 @@ def generate_bursty_data(nr_of_waves=1, base_amplitude=1, expected_nr_of_bursts=
periodic_pattern = (1 + math.sin(i / period * 2 * math.pi)) / 2
# Introduce burst with probability 'burst_prob'
- if random.random() < burst_prob:
- burst = abs(random.gauss(0, burst_amplitude))
+ if nc_rnd.random() < burst_prob:
+ burst = abs(nc_rnd.gauss(0, burst_amplitude))
buckets[i] = periodic_pattern + burst
else:
buckets[i] = periodic_pattern
@@ -61,6 +62,6 @@ def buckets_to_points_in_time(buckets, begin, end, total_points):
bucket_points = round(bucket_points)
for j in range(bucket_points):
- points.append(begin + datetime.timedelta(seconds=bucket_size * (i + random.uniform(0, 1))))
+ points.append(begin + datetime.timedelta(seconds=bucket_size * (i + nc_rnd.uniform(0, 1))))
return sorted(points)
diff --git a/projects/forms.py b/projects/forms.py
index 2acf971..97dc50f 100644
--- a/projects/forms.py
+++ b/projects/forms.py
@@ -3,6 +3,7 @@ from django.contrib.auth import get_user_model
from django.template.defaultfilters import yesno
from django.urls import reverse
+from bugsink.utils import assert_
from teams.models import TeamMembership
from .models import Project, ProjectMembership, ProjectRole
@@ -40,7 +41,7 @@ class MyProjectMembershipForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
edit_role = kwargs.pop("edit_role")
super().__init__(*args, **kwargs)
- assert self.instance is not None, "This form is only implemented for editing"
+ assert_(self.instance is not None, "This form is only implemented for editing")
if not edit_role:
del self.fields['role']
diff --git a/projects/views.py b/projects/views.py
index 92a911c..dd55500 100644
--- a/projects/views.py
+++ b/projects/views.py
@@ -17,6 +17,7 @@ from teams.models import TeamMembership, Team, TeamRole
from bugsink.app_settings import get_settings, CB_ANYBODY, CB_MEMBERS, CB_ADMINS
from bugsink.decorators import login_exempt, atomic_for_request_method
+from bugsink.utils import assert_
from alerts.models import MessagingServiceConfig
from alerts.forms import MessagingServiceConfigForm
@@ -415,7 +416,7 @@ def project_sdk_setup(request, project_pk, platform=""):
# NOTE about lexers:: I have bugsink/pyments_extensions; but the platforms mentioned there don't necessarily map to
# what I will make selectable here. "We'll see" whether yet another lookup dict will be needed.
- assert platform in ["", "python", "javascript", "php"]
+ assert_(platform in ["", "python", "javascript", "php"])
template_name = "projects/project_sdk_setup%s.html" % ("_" + platform if platform else "")
diff --git a/snappea/example_tasks.py b/snappea/example_tasks.py
index d228e09..f3a82a3 100644
--- a/snappea/example_tasks.py
+++ b/snappea/example_tasks.py
@@ -1,7 +1,7 @@
import time
-import random
import logging
+from bugsink.utils import nc_rnd
from .decorators import shared_task
# for the example tasks, we pick a non-snappea logger on purpose, to check that non-snappea logs are written in the
@@ -12,7 +12,7 @@ logger = logging.getLogger("bugsink")
@shared_task
def random_duration():
logger.info("Starting something of a random duration")
- time.sleep(random.random() * 10)
+ time.sleep(nc_rnd.random() * 10)
@shared_task
diff --git a/teams/forms.py b/teams/forms.py
index 3bcf3ec..a4ce73c 100644
--- a/teams/forms.py
+++ b/teams/forms.py
@@ -2,6 +2,7 @@ from django import forms
from django.contrib.auth import get_user_model
from django.template.defaultfilters import yesno
+from bugsink.utils import assert_
from .models import TeamRole, TeamMembership, Team
User = get_user_model()
@@ -37,7 +38,7 @@ class MyTeamMembershipForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
edit_role = kwargs.pop("edit_role")
super().__init__(*args, **kwargs)
- assert self.instance is not None, "This form is only implemented for editing"
+ assert_(self.instance is not None, "This form is only implemented for editing")
if not edit_role:
del self.fields['role']
diff --git a/theme/templatetags/code.py b/theme/templatetags/code.py
index e222675..2f42a93 100644
--- a/theme/templatetags/code.py
+++ b/theme/templatetags/code.py
@@ -1,6 +1,7 @@
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter
+from bugsink.utils import assert_
from django import template
@@ -28,8 +29,8 @@ class CodeNode(template.Node):
content = "\n".join([line.rstrip() for line in content.split("\n")])
lang_identifier, code = content.split("\n", 1)
- assert lang_identifier.startswith(":::") or lang_identifier.startswith("#!"), \
- "Expected code block identifier ':::' or '#!' not " + lang_identifier
+ assert_(lang_identifier.startswith(":::") or lang_identifier.startswith("#!"),
+ "Expected code block identifier ':::' or '#!' not " + lang_identifier)
lang = lang_identifier[3:].strip() if lang_identifier.startswith(":::") else lang_identifier[2:].strip()
is_shebang = lang_identifier.startswith("#!")
@@ -37,4 +38,5 @@ class CodeNode(template.Node):
lexer = get_lexer_by_name(lang, stripall=True)
- return highlight(code, lexer, formatter).replace("highlight", "p-4 mt-4 bg-slate-50 dark:bg-slate-800 syntax-coloring")
+ return highlight(code, lexer, formatter).replace(
+ "highlight", "p-4 mt-4 bg-slate-50 dark:bg-slate-800 syntax-coloring")
diff --git a/theme/templatetags/issues.py b/theme/templatetags/issues.py
index 9f0032a..898d839 100644
--- a/theme/templatetags/issues.py
+++ b/theme/templatetags/issues.py
@@ -5,12 +5,12 @@ from pygments import highlight
from pygments.formatters import HtmlFormatter
from django.utils.html import escape
-from django.utils.safestring import mark_safe
+from django.utils.safestring import SafeData, mark_safe
from django.template.defaultfilters import date
from compat.timestamp import parse_timestamp
-
+from bugsink.utils import assert_
from bugsink.pygments_extensions import guess_lexer_for_filename, lexer_for_platform
register = template.Library()
@@ -23,7 +23,7 @@ def _split(joined, lengths):
result.append(joined[start:start + length])
start += length
- assert [len(r) for r in result] == lengths
+ assert_([len(r) for r in result] == lengths)
return result
@@ -52,7 +52,7 @@ def _core_pygments(code, filename=None, platform=None):
# a line is" is not properly defined. (i.e.: is the thing after the final newline a line or not, both for the input
# and the output?). At the level of _pygmentize_lines the idea of a line is properly defined, so we only have to
# deal with pygments' funnyness.
- # assert len(code.split("\n")) == result.count("\n"), "%s != %s" % (len(code.split("\n")), result.count("\n"))
+ # assert_(len(code.split("\n")) == result.count("\n"), "%s != %s" % (len(code.split("\n")), result.count("\n")))
return result
@@ -71,7 +71,7 @@ def _pygmentize_lines(lines, filename=None, platform=None):
# [:-1] to remove the last empty line, a result of split()
result = _core_pygments(code, filename=filename, platform=platform).split('\n')[:-1]
- assert len(lines) == len(result), "%s != %s" % (len(lines), len(result))
+ assert_(len(lines) == len(result), "%s != %s" % (len(lines), len(result)))
return result
@@ -103,9 +103,10 @@ def pygmentize(value, platform):
pre_context, context_lines, post_context = _split(lines, lengths)
- value['pre_context'] = [mark_safe(s) for s in pre_context]
- value['context_line'] = mark_safe(context_lines[0])
- value['post_context'] = [mark_safe(s) for s in post_context]
+ # no_bandit_expl: see tests.TestPygmentizeEscape
+ value['pre_context'] = [mark_safe(s) for s in pre_context] # nosec B703, B308
+ value['context_line'] = mark_safe(context_lines[0]) # nosec B703, B308
+ value['post_context'] = [mark_safe(s) for s in post_context] # nosec B703, B308
return value
@@ -141,6 +142,18 @@ def shortsha(value):
return value[:12]
+def safe_join(sep, items, strict=False):
+ """join() that takes safe strings into account; strict=True means: I expect all inputs to be safe"""
+
+ text = sep.join(items)
+ if isinstance(sep, SafeData) and all(isinstance(i, SafeData) for i in items):
+ # no_bandit_expl: as per the check right above
+ return mark_safe(text) # nosec B703, B308
+ if strict:
+ raise ValueError("Cannot join non-safe in strict mode")
+ return text
+
+
@register.filter()
def format_var(value):
"""Formats a variable for display in the template; deals with 'marked as incomplete'."""
@@ -170,23 +183,27 @@ def format_var(value):
def gen_list(lst):
for value in lst:
- yield "", storevalue(value)
+ yield escape(""), storevalue(value)
if hasattr(lst, "incomplete"):
- yield f"<{lst.incomplete} items trimmed…>", None
+ # no_bandit_expl: constant string w/ substitution of an int (asserted)
+ assert_(isinstance(lst.incomplete, int))
+ yield mark_safe(f"<{lst.incomplete} items trimmed…>"), None # nosec B703, B308
def gen_dict(d):
for (k, v) in d.items():
- yield escape(repr(k)) + ": ", storevalue(v)
+ yield escape(repr(k)) + escape(": "), storevalue(v)
if hasattr(d, "incomplete"):
- yield f"<{d.incomplete} items trimmed…>", None
+ # no_bandit_expl: constant string w/ substitution of an int (asserted)
+ assert_(isinstance(d.incomplete, int))
+ yield mark_safe(f"<{d.incomplete} items trimmed…>"), None # nosec B703, B308
def gen_switch(obj):
if isinstance(obj, list):
- return bracket_wrap(gen_list(obj), "[", ", ", "]")
+ return bracket_wrap(gen_list(obj), escape("["), escape(", "), escape("]"))
if isinstance(obj, dict):
- return bracket_wrap(gen_dict(obj), "{", ", ", "}")
+ return bracket_wrap(gen_dict(obj), escape("{"), escape(", "), escape("}"))
return gen_base(obj)
result = []
@@ -209,8 +226,7 @@ def format_var(value):
stack.append(todo)
todo = gen_switch(recurse())
- # mark_safe is OK because the only non-escaped characters are the brackets, commas, and colons.
- return mark_safe("".join(result))
+ return safe_join(escape(""), result, strict=True)
# recursive equivalent:
@@ -218,19 +234,17 @@ def format_var(value):
# def format_var(value):
# """Formats a variable for display in the template; deals with 'marked as incomplete'.
# """
-# # mark_safe is OK because the only non-escaped characters are the brackets, commas, and colons.
-#
# if isinstance(value, dict):
-# parts = [(escape(repr(k)) + ": " + format_var(v)) for (k, v) in value.items()]
+# parts = [(escape(repr(k)) + escape(": ") + format_var(v)) for (k, v) in value.items()]
# if hasattr(value, "incomplete"):
# parts.append(mark_safe(f"<{value.incomplete} items trimmed…>"))
-# return mark_safe("{" + ", ".join(parts) + "}")
+# return escape("{") + safe_join(escape(", "), parts, strict=True) + escape("}")
#
# if isinstance(value, list):
# parts = [format_var(v) for v in value]
# if hasattr(value, "incomplete"):
# parts.append(mark_safe(f"<{value.incomplete} items trimmed…>"))
-# return mark_safe("[" + ", ".join(parts) + "]")
+# return escape("[") + safe_join(escape(", "), parts, strict=True) + escape("]")
#
# return escape(value)
@@ -242,10 +256,12 @@ def incomplete(value):
def _date_with_milis_html(timestamp):
- return mark_safe(
- '' +
- date(timestamp, "j M G:i:s") + "." +
- '' + date(timestamp, "u")[:3] + '')
+ # no_bandit_expl: constant string w/ substitution of an int (asserted)
+ return (
+ mark_safe('') + # nosec
+ escape(date(timestamp, "j M G:i:s")) + mark_safe(".") + # nosec
+ mark_safe('') + escape(date(timestamp, "u")[:3]) + # nosec
+ mark_safe('')) # nosec
@register.filter
diff --git a/theme/tests.py b/theme/tests.py
index cd6e168..45b2c5c 100644
--- a/theme/tests.py
+++ b/theme/tests.py
@@ -1,10 +1,11 @@
from unittest import TestCase as RegularTestCase
+from django.utils.safestring import SafeString
from bugsink.pygments_extensions import choose_lexer_for_pattern, get_all_lexers
from events.utils import IncompleteList, IncompleteDict
-from .templatetags.issues import _pygmentize_lines as actual_pygmentize_lines, format_var
+from .templatetags.issues import _pygmentize_lines as actual_pygmentize_lines, format_var, pygmentize
def _pygmentize_lines(lines):
@@ -109,6 +110,18 @@ class TestFormatVar(RegularTestCase):
self._format_var(var),
)
+ def test_format_var_nested_escaping(self):
+ # like format_nested, but with the focus on "does escaping happen correctly?"
+ var = {
+ "hacker": ["'],
+ 'context_line': '',
+ 'post_context': [''],
+ },
+ platform='python',
+ )
+
+ for line in out['pre_context'] + [out['context_line']] + out['post_context']:
+ self.assertIsInstance(line, SafeString)
+
+ # we just check for the non-existance of here because asserting against "whatever
+ # pygmentize does" is not very useful, as it may change in the future.
+ self.assertFalse("" in line)