mirror of
https://github.com/jlengrand/bugsink.git
synced 2026-03-10 08:01:17 +00:00
Remove the PeriodCounter and the PC registry,
a direct consequence of switching to SQL-based counting.
This commit is contained in:
@@ -1,210 +0,0 @@
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
# these constants are quite arbitrary; it can easily be argued that 90 minutes is an interesting time-frame (because of
# the loss of granularity when stepping up to minutes) but we pick something that's close to the next level of
# granularity. We can start measuring performance (time, memory) from there and can always make it bigger later.

# maximum number of buckets retained per granularity (the rolling-window sizes used for eviction in _inc)
MAX_MINUTES = 60
MAX_HOURS = 24
MAX_DAYS = 30
MAX_MONTHS = 12
MAX_YEARS = 5
MAX_TOTALS = 1


# smallest/largest legal value for each position of a (year, month, day, hour, minute) tuple.
# The day maximum is "?" because it depends on the month; day roll-over is therefore handled with a real
# datetime/timedelta in _prev_tup/_next_tup rather than with this table.
MIN_VALUE_AT_TUP_INDEX = 1000, 1, 1, 0, 0
MAX_VALUE_AT_TUP_INDEX = 3000, 12, "?", 23, 59


# TL for "tuple length", the length of the tuples for a given time-period
TL_TOTAL = 0
TL_YEAR = 1
TL_MONTH = 2
TL_DAY = 3
TL_HOUR = 4
TL_MINUTE = 5
|
||||
|
||||
|
||||
def noop():
    """Do nothing and return None."""
    return None
|
||||
|
||||
|
||||
def _prev_tup(tup, n=1):
    """Return the period-tuple `n` steps before `tup`.

    `tup` is a (year[, month[, day[, hour[, minute]]]]) prefix-tuple; its length determines the granularity
    of the step. For n=1 the empty tuple is returned unchanged (the "total" period has no predecessor).
    """
    # TODO: isn't this a premature optimization? We could just use a datetime/timedelta?
    aslist = list(tup)

    # if n > 1 we try to first remove the largest possible chunk from the last element of the tuple (for performance
    # reasons), so that we can then do the remainder in the loop (always 1) and the recursive call (the rest)
    if n > 1:
        DONE_IN_LOOP = 1
        first_chunk = max(min(  # the minimum of:
            n - DONE_IN_LOOP,  # [A] the work to be done minus 1 in the loop-over-digits below
            tup[-1] - MIN_VALUE_AT_TUP_INDEX[len(tup) - 1],  # [B] jump to roll-over right before entering that loop
        ), 0)  # but never less than 0
        aslist[-1] -= first_chunk
        n_for_recursive_call = n - first_chunk - DONE_IN_LOOP
    else:
        n_for_recursive_call = 0

    # In this loop we just decrease by 1;
    for tup_index, val in reversed(list(enumerate(aslist))):
        # we inspect the parts of the tuple right-to-left and continue to decrease until there is no more roll-over.

        if aslist[tup_index] == MIN_VALUE_AT_TUP_INDEX[tup_index]:
            # we've reached the min value, which is the case that influences more than just the current digit.
            if tup_index == 2:
                # day roll-over: just use a datetime because the max value is one of 28, 29, 30, 31.
                aslist = list((datetime(*aslist, tzinfo=timezone.utc) - timedelta(days=1)).timetuple()[:len(tup)])
                break  # we've used a timedelta, so we don't need to do months/years "by hand" in the loop

            else:
                # roll over to max
                aslist[tup_index] = MAX_VALUE_AT_TUP_INDEX[tup_index]
                # implied because no break: continue with the next more significant digit of the tuple

        else:
            # no min-value reached, just dec at this point and stop.
            aslist[tup_index] -= 1
            break

    # handle whatever is left of `n` after the first_chunk jump and the single decrement above
    if n_for_recursive_call > 0:
        return _prev_tup(aslist, n_for_recursive_call)

    return tuple(aslist)
|
||||
|
||||
|
||||
def _next_tup(tup, n=1):
    """Return the period-tuple `n` steps after `tup` (the successor at `tup`'s granularity)."""
    result = list(tup)

    # no 'first_chunk' implementation here, calls to _next_tup() are not a hot path in our code anyway.

    for _ in range(n):
        # walk the digits right-to-left, incrementing; a roll-over carries into the next digit.
        for idx in range(len(result) - 1, -1, -1):
            at_day_boundary = idx == 2 and result[idx] >= 28
            if at_day_boundary or result[idx] == MAX_VALUE_AT_TUP_INDEX[idx]:
                if idx == 2:
                    # day roll-over (potentially, 28 and up): delegate to a datetime, since the number of
                    # days in the month is one of 28, 29, 30, 31.
                    result = list((datetime(*result, tzinfo=timezone.utc) + timedelta(days=1)).timetuple()[:len(tup)])
                    break  # datetime already carried into month/year, so no manual carrying in this loop

                # roll over to min; no break, so the carry continues with the more significant digit
                result[idx] = MIN_VALUE_AT_TUP_INDEX[idx]
            else:
                # no roll-over needed: increment this digit and stop.
                result[idx] += 1
                break

    return tuple(result)
|
||||
|
||||
|
||||
def _inc(counts_for_tl, tup, n, max_age):
    """Add `n` to the bucket `tup`, creating it if needed and evicting buckets older than `max_age` periods."""
    if tup not in counts_for_tl:
        # a new bucket appears: a good moment to drop everything that has scrolled out of the window
        oldest_relevant = _prev_tup(tup, max_age - 1)
        for stale_key in [k for k in counts_for_tl if k < oldest_relevant]:
            del counts_for_tl[stale_key]

        counts_for_tl[tup] = 0

    counts_for_tl[tup] += n
|
||||
|
||||
|
||||
def _reorganize_by_tl(thresholds_by_purpose):
    """Regroup purpose -> [(period_name, nr, threshold, meta)] into tl -> [(nr, threshold, meta, purpose)]."""
    by_tl = {}

    for purpose, threshold_items in thresholds_by_purpose.items():
        for (period_name, nr_of_periods, gte_threshold, metadata) in threshold_items:
            tl = PeriodCounter._tl_for_period(period_name)
            by_tl.setdefault(tl, []).append((nr_of_periods, gte_threshold, metadata, purpose))

    return by_tl
|
||||
|
||||
|
||||
class PeriodCounter(object):
    """Counts events per time-period at 6 granularities: total, year, month, day, hour, minute.

    Counts live in per-granularity dicts keyed by "prefix tuples" of a UTC timetuple, e.g. (2020, 1, 15)
    for a day bucket; the tuple length ("TL") identifies the granularity. Old buckets are evicted on the
    fly by _inc, which bounds memory per granularity to the MAX_* window sizes.
    """

    def __init__(self):
        # one {tup: count} dict per granularity, keyed by tuple-length (0=total .. 5=minute)
        self.counts = {tuple_length: {} for tuple_length in range(TL_MINUTE + 1)}

    def inc(self, datetime_utc, n=1, thresholds=None):
        """Count `n` events at `datetime_utc` (must be UTC) and evaluate `thresholds`.

        thresholds :: purpose -> [(period_name, nr_of_periods, gte_threshold, metadata), ...]

        Returns purpose -> [(state, below_threshold_from, metadata), ...] where `state` says whether the
        threshold is (now) met, and `below_threshold_from` is the first moment in time the condition can no
        longer hold (None when the threshold is not met).
        """
        # was `thresholds={}`: a None-sentinel avoids the shared-mutable-default pitfall while preserving
        # behavior for callers that rely on the default.
        thresholds = {} if thresholds is None else thresholds

        # we only allow UTC, and we generally use Django model fields, which are UTC, so this should be good:
        assert datetime_utc.tzinfo == timezone.utc

        tup = datetime_utc.timetuple()

        thresholds_by_tl = _reorganize_by_tl(thresholds)
        states_with_metadata_by_purpose = {purpose: [] for purpose in thresholds.keys()}

        for tl, mx in enumerate([MAX_TOTALS, MAX_YEARS, MAX_MONTHS, MAX_DAYS, MAX_HOURS, MAX_MINUTES]):
            _inc(self.counts[tl], tup[:tl], n, mx)

            # default was `{}`; `[]` matches the actual value type (a list of threshold tuples) — iterating
            # either empty container is equivalent, so behavior is unchanged.
            thresholds_for_tl = thresholds_by_tl.get(tl, [])
            for (nr_of_periods, gte_threshold, metadata, purpose) in thresholds_for_tl:
                state = self._state_for_threshold(tup[:tl], tl, nr_of_periods, gte_threshold)
                if state:
                    if tl > 0:
                        # `below_threshold_from` is the first moment in time where the condition no longer applies.
                        below_threshold_tup = _next_tup(
                            self._get_first_tup_contributing_to_threshold(tup[:tl], tl, nr_of_periods),

                            # +1 for "next period" (the first where the condition no longer applies), -1 for "first
                            # period counts" hence no +/- correction.
                            n=nr_of_periods,
                        ) + MIN_VALUE_AT_TUP_INDEX[tl:]  # fill with min values for the non-given ones
                        below_threshold_from = datetime(*below_threshold_tup, tzinfo=timezone.utc)
                    else:
                        # when counting the 'total', there will never be a time when the condition becomes false. We
                        # just pick an arbitrarily large date; we'll deal with it by the end of the myria-annum.
                        # unlikely to actually end up in the DB (because it would imply the use of a quota for total).
                        below_threshold_from = datetime(9999, 12, 31, 23, 59, tzinfo=timezone.utc)
                else:
                    below_threshold_from = None

                states_with_metadata_by_purpose[purpose].append((state, below_threshold_from, metadata))

        # we return tuples of (state, below_threshold_from, metadata) where metadata is something arbitrary that can
        # be passed in (it allows us to tie back to "what caused this to be true/false?")
        # TODO: I think that in practice the metadata is always implied by the thresholds, i.e. instead of
        # passing-through we could just return the thresholds that were met.
        return states_with_metadata_by_purpose

    @staticmethod
    def _tl_for_period(period_name):
        """Map a period name ("total" .. "minute") to its tuple-length / granularity index."""
        return {
            "total": 0,
            "year": 1,
            "month": 2,
            "day": 3,
            "hour": 4,
            "minute": 5,
        }[period_name]

    def _state_for_threshold(self, tup, tl, nr_of_periods, gte_threshold):
        """Return True when the summed counts over the trailing `nr_of_periods` buckets reach `gte_threshold`."""
        min_tup = _prev_tup(tup, nr_of_periods - 1) if tup != () else ()  # -1 because the current one also counts
        counts_for_tl = self.counts[tl]
        total = sum([v for k, v in counts_for_tl.items() if k >= min_tup])

        return total >= gte_threshold

    def _get_first_tup_contributing_to_threshold(self, tup, tl, nr_of_periods):
        """Return the oldest bucket inside the window that has a non-zero count."""
        # there's code duplication here with _state_for_threshold which also results in stuff being executed twice
        # when the state is True; however, getting rid of this would be a "premature optimization", because states
        # "flip to true" only very irregularly (for unmute flip-to-true results in removal from the 'watch list', and
        # for quota flip-to-true results in 'no more ingestion for a while')
        min_tup = _prev_tup(tup, nr_of_periods - 1) if tup != () else ()
        counts_for_tl = self.counts[tl]
        return min([k for k, v in counts_for_tl.items() if k >= min_tup and v > 0])
|
||||
@@ -1,136 +0,0 @@
|
||||
import contextlib
|
||||
import atexit
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from projects.models import Project
|
||||
from events.models import Event
|
||||
from issues.models import Issue
|
||||
from performance.context_managers import time_to_logger
|
||||
from snappea.settings import get_settings as get_snappea_settings
|
||||
|
||||
from .period_counter import PeriodCounter
|
||||
from .app_settings import get_settings
|
||||
|
||||
|
||||
ingest_logger = logging.getLogger("bugsink.ingest")
performance_logger = logging.getLogger("bugsink.performance.registry")


# process-wide singleton; populated lazily by get_pc_registry(), cleared by reset_pc_registry()
_registry = None
|
||||
|
||||
|
||||
class PeriodCounterRegistry(object):
    """In-memory registry of PeriodCounters: one per project and one per issue, rebuilt from all events.

    A few notes on observed memory consumption:

    >>> (4_000_000_000 *.02) / (1024*1024)
    76MB

    >>> _ / 6954 (nr of issues)
    0.01097123171016681
    i.e. 10K per issue.

    >>> c. 15ms _average_ initialization time per issue

    Ways forward: I could just make it lazy (this is also good for startup times!)
    """

    def __init__(self):
        self.by_project, self.by_issue = self.load_from_scratch(
            projects=Project.objects.all(),
            issues=Issue.objects.all(),
            ordered_events=Event.objects.all().order_by('server_side_timestamp'),
            now=datetime.now(timezone.utc),
        )

    @classmethod
    def load_from_scratch(cls, projects, issues, ordered_events, now):
        """Build (by_project, by_issue) dicts of PeriodCounters by replaying `ordered_events`.

        Fix: this is a @classmethod, so its first parameter is named `cls` (it previously shadowed the
        `self` convention, which misleads readers — the decorator binds the class, not an instance).
        `now` is currently unused here — presumably kept for interface stability; TODO confirm with callers.
        """
        # create period counters for all projects and issues
        by_project = {}
        by_issue = {}

        for project in projects.iterator():
            by_project[project.id] = PeriodCounter()

        for issue in issues.iterator():
            by_issue[issue.id] = PeriodCounter()

        # load all events (one by one, let's measure the slowness of the naive implementation before making it faster)
        for event in ordered_events.iterator():
            project_pc = by_project[event.project_id]
            project_pc.inc(event.timestamp)

            issue_pc = by_issue[event.issue_id]
            issue_pc.inc(event.timestamp)

        return by_project, by_issue
|
||||
|
||||
|
||||
class NotSingleton(Exception):
    """Raised by _ensure_singleton when another live process already owns the pc_registry pid file."""
|
||||
|
||||
|
||||
def _cleanup_pidfile():
    """Delete the pc_registry pid file; a file that is already gone is not an error."""
    pid_filename = os.path.join(get_settings().INGEST_STORE_BASE_DIR, "pc_registry.pid")
    try:
        os.remove(pid_filename)
    except FileNotFoundError:
        pass
|
||||
|
||||
|
||||
def _ensure_singleton():
    """Guard (via a pid file) that only one process builds/owns the in-memory pc_registry.

    We ensure the assumption that there's only one process doing ingestion (this is a requirement of our
    setup because our in-memory pc_registry only works correctly if there's only one that takes all `.inc`
    calls).

    Implementation copied with modifications from snappea/foreman.py. We construct a PID_FILENAME from
    INGEST_STORE_BASE_DIR (that's a good place because it maps nicely to 'one place on the FS where
    ingestion-related stuff happens).

    Raises NotSingleton when another live process holds the pid file.
    """
    pid_filename = os.path.join(get_settings().INGEST_STORE_BASE_DIR, "pc_registry.pid")

    # this implementation is not supposed to be bullet-proof for race conditions (nor is it cross-platform) but it
    # guards against:
    # * misconfigurations, in particular running multiple gunicorn workers when either TASK_ALWAYS_EAGER or
    #   DIGEST_IMMEDIATELY is True (in both cases ingestion happens right in the web process, and if there is more
    #   than one web process, we get multiple pc_registry instances)
    # * programming errors by the bugsink developers (non-ingesting processes calling `get_pc_registry`)

    pid = os.getpid()

    if os.path.exists(pid_filename):
        with open(pid_filename, "r") as f:
            old_pid = int(f.read())
            # NOTE(review): the /proc existence check is Linux-only; on other platforms a stale pid file
            # would look like a live process — confirm deployment targets.
            if os.path.exists(f"/proc/{old_pid}"):
                eager = "eager" if get_snappea_settings().TASK_ALWAYS_EAGER else "not eager"
                digest_immediately = "digest immediately" if get_settings().DIGEST_IMMEDIATELY else "digest later"
                running = settings.I_AM_RUNNING.lower()
                raise NotSingleton("Other pc_registry exists. I am '%s' in mode '%s, %s'" % (
                    running, digest_immediately, eager))
            else:
                ingest_logger.warning("Stale pc_registry pid file found, removing %s", pid_filename)
                os.remove(pid_filename)

    # claim ownership: write our own pid (creating the directory on first use)
    os.makedirs(os.path.dirname(pid_filename), exist_ok=True)
    with open(pid_filename, "w") as f:
        f.write(str(pid))

    # make sure we clean up after ourselves on (normal) interpreter exit
    atexit.register(_cleanup_pidfile)
|
||||
|
||||
|
||||
def get_pc_registry():
    """Return the process-wide PeriodCounterRegistry, creating it on first call (lazy singleton).

    note: must be run inside a transaction to ensure consistency because we use .iterator()
    https://docs.djangoproject.com/en/5.0/ref/databases/#sqlite-isolation
    """
    global _registry
    if _registry is None:
        # first call: enforce single-process ownership, then do the (expensive) from-scratch load while
        # logging how long initialization took.
        with time_to_logger(performance_logger, "period counter registry initialization"):
            _ensure_singleton()
            _registry = PeriodCounterRegistry()
    return _registry
|
||||
|
||||
|
||||
def reset_pc_registry():
    """Drop the singleton and remove its pid file.

    needed for tests: prevents one test's counts/pid file from leaking into the next.
    """
    global _registry
    _registry = None
    _cleanup_pidfile()
|
||||
185
bugsink/tests.py
185
bugsink/tests.py
@@ -1,19 +1,9 @@
|
||||
import io
|
||||
from datetime import datetime, timezone
|
||||
import brotli
|
||||
|
||||
from unittest import TestCase as RegularTestCase
|
||||
from django.test import TestCase as DjangoTestCase
|
||||
|
||||
from projects.models import Project
|
||||
from issues.models import Issue, IssueStateManager
|
||||
from issues.factories import denormalized_issue_fields
|
||||
from events.models import Event
|
||||
from events.factories import create_event
|
||||
|
||||
from .period_counter import PeriodCounter, _prev_tup, _next_tup
|
||||
from .volume_based_condition import VolumeBasedCondition
|
||||
from .registry import PeriodCounterRegistry
|
||||
from .streams import (
|
||||
compress_with_zlib, GeneratorReader, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE, MaxDataReader,
|
||||
MaxDataWriter, zlib_generator, brotli_generator)
|
||||
@@ -25,144 +15,6 @@ def apply_n(f, n, v):
|
||||
return v
|
||||
|
||||
|
||||
class PeriodCounterTestCase(RegularTestCase):
    """Tests for the tuple helpers (_prev_tup/_next_tup) and PeriodCounter threshold evaluation.

    Fix: the deprecated `assertEquals` alias (removed in Python 3.12) is replaced by `assertEqual`
    throughout; behavior of the assertions is unchanged.
    """

    def test_prev_tup_near_rollover(self):
        self.assertEqual((2020,), _prev_tup((2021,)))

        self.assertEqual((2020, 1), _prev_tup((2020, 2)))
        self.assertEqual((2019, 12), _prev_tup((2020, 1)))

        self.assertEqual((2020, 1, 1), _prev_tup((2020, 1, 2)))
        self.assertEqual((2019, 12, 31), _prev_tup((2020, 1, 1)))
        self.assertEqual((2020, 2, 29), _prev_tup((2020, 3, 1)))
        self.assertEqual((2019, 2, 28), _prev_tup((2019, 3, 1)))

        self.assertEqual((2020, 1, 1, 10), _prev_tup((2020, 1, 1, 11)))
        self.assertEqual((2020, 1, 1, 0), _prev_tup((2020, 1, 1, 1)))
        self.assertEqual((2019, 12, 31, 23), _prev_tup((2020, 1, 1, 0)))
        self.assertEqual((2019, 12, 31, 22), _prev_tup((2019, 12, 31, 23)))

        self.assertEqual((2020, 1, 1, 0, 0), _prev_tup((2020, 1, 1, 0, 1)))
        self.assertEqual((2019, 12, 31, 23, 59), _prev_tup((2020, 1, 1, 0, 0)))

    def test_prev_tup_large_number_of_applications(self):
        self.assertEqual((1920,), apply_n(_prev_tup, 100, (2020,)))
        self.assertEqual((2010, 5), apply_n(_prev_tup, 120, (2020, 5)))
        self.assertEqual((2019, 5, 7,), apply_n(_prev_tup, 366, (2020, 5, 7)))
        self.assertEqual((1899, 5, 7,), apply_n(_prev_tup, 365, (1900, 5, 7)))
        self.assertEqual((2020, 5, 6, 20,), apply_n(_prev_tup, 24, (2020, 5, 7, 20,)))
        self.assertEqual((2020, 5, 6, 20, 12), apply_n(_prev_tup, 1440, (2020, 5, 7, 20, 12)))

    def test_prev_tup_with_explicit_n(self):
        self.assertEqual(_prev_tup((2020,), 100), apply_n(_prev_tup, 100, (2020,)))
        self.assertEqual(_prev_tup((2020, 5), 120), apply_n(_prev_tup, 120, (2020, 5)))
        self.assertEqual(_prev_tup((2020, 5, 7), 366), apply_n(_prev_tup, 366, (2020, 5, 7)))
        self.assertEqual(_prev_tup((2020, 5, 7, 20,), 24), apply_n(_prev_tup, 24, (2020, 5, 7, 20,)))
        self.assertEqual(_prev_tup((2020, 5, 7, 20, 12), 1440), apply_n(_prev_tup, 1440, (2020, 5, 7, 20, 12)))

    def test_prev_tup_works_for_empty_tup(self):
        # in general 'prev' is not defined for empty tuples; but it is convenient to define it as the empty tuple
        # because it makes the implementation of PeriodCounter simpler for the case of "all 1 'total' periods".

        self.assertEqual((), _prev_tup(()))
        # the meaninglessness of prev_tup is not extended to the case of n > 1, because "2 total periods" makes no
        # sense
        # self.assertEqual((), _prev_tup((), 2))

    def test_next_tup_near_rollover(self):
        self.assertEqual((2021,), _next_tup((2020,)))

        self.assertEqual((2020, 2), _next_tup((2020, 1)))
        self.assertEqual((2020, 1), _next_tup((2019, 12)))

        self.assertEqual((2020, 1, 2), _next_tup((2020, 1, 1)))
        self.assertEqual((2020, 1, 1), _next_tup((2019, 12, 31)))
        self.assertEqual((2020, 3, 1), _next_tup((2020, 2, 29)))
        self.assertEqual((2019, 3, 1), _next_tup((2019, 2, 28)))

        self.assertEqual((2020, 1, 1, 11), _next_tup((2020, 1, 1, 10)))
        self.assertEqual((2020, 1, 1, 1), _next_tup((2020, 1, 1, 0)))
        self.assertEqual((2020, 1, 1, 0), _next_tup((2019, 12, 31, 23)))
        self.assertEqual((2019, 12, 31, 23), _next_tup((2019, 12, 31, 22)))

        self.assertEqual((2020, 1, 1, 0, 1), _next_tup((2020, 1, 1, 0, 0)))
        self.assertEqual((2020, 1, 1, 0, 0), _next_tup((2019, 12, 31, 23, 59)))

    def test_next_tup_large_number_of_applications(self):
        self.assertEqual((2020,), apply_n(_next_tup, 100, (1920,)))
        self.assertEqual((2020, 5), apply_n(_next_tup, 120, (2010, 5)))
        self.assertEqual((2020, 5, 7), apply_n(_next_tup, 366, (2019, 5, 7,)))
        self.assertEqual((1900, 5, 7), apply_n(_next_tup, 365, (1899, 5, 7,)))
        self.assertEqual((2020, 5, 7, 20,), apply_n(_next_tup, 24, (2020, 5, 6, 20,)))
        self.assertEqual((2020, 5, 7, 20, 12), apply_n(_next_tup, 1440, (2020, 5, 6, 20, 12)))

    def test_next_tup_with_explicit_n(self):
        self.assertEqual(_next_tup((2020,), 100), apply_n(_next_tup, 100, (2020,)))
        self.assertEqual(_next_tup((2020, 5), 120), apply_n(_next_tup, 120, (2020, 5)))
        self.assertEqual(_next_tup((2020, 5, 7), 366), apply_n(_next_tup, 366, (2020, 5, 7)))
        self.assertEqual(_next_tup((2020, 5, 7, 20,), 24), apply_n(_next_tup, 24, (2020, 5, 7, 20,)))
        self.assertEqual(_next_tup((2020, 5, 7, 20, 12), 1440), apply_n(_next_tup, 1440, (2020, 5, 7, 20, 12)))

    def test_next_tup_works_for_empty_tup(self):
        # TODO check if needed
        # in general 'next' is not defined for empty tuples; but it is convenient to define it as the empty tuple
        # because it makes the implementation of PeriodCounter simpler for the case of "all 1 'total' periods".

        self.assertEqual((), _next_tup(()))
        # the meaninglessness of next_tup is not extended to the case of n > 1, because "2 total periods" makes no
        # sense
        # self.assertEqual((), _next_tup((), 2))

    def test_how_to_create_datetime_utc_objects(self):
        # basically I just want to write this down somewhere
        datetime_utc = datetime.now(timezone.utc)
        pc = PeriodCounter()
        pc.inc(datetime_utc)

    def test_thresholds_for_total(self):
        timepoint = datetime(2020, 1, 1, 10, 15, tzinfo=timezone.utc)

        pc = PeriodCounter()
        thresholds = {"unmute": [("total", 1, 2, "meta")]}

        # first inc: should not yet be True
        states = pc.inc(timepoint, thresholds=thresholds)
        self.assertEqual({"unmute": [(False, None, "meta")]}, states)

        # second inc: should be True (threshold of 2)
        states = pc.inc(timepoint, thresholds=thresholds)
        self.assertEqual({"unmute": [(True, datetime(9999, 12, 31, 23, 59, tzinfo=timezone.utc), "meta")]}, states)

        # third inc: should still be True
        states = pc.inc(timepoint, thresholds=thresholds)
        self.assertEqual({"unmute": [(True, datetime(9999, 12, 31, 23, 59, tzinfo=timezone.utc), "meta")]}, states)

    def test_thresholds_for_year(self):
        tp_2020 = datetime(2020, 1, 1, 10, 15, tzinfo=timezone.utc)
        tp_2021 = datetime(2021, 1, 1, 10, 15, tzinfo=timezone.utc)
        tp_2022 = datetime(2022, 1, 1, 10, 15, tzinfo=timezone.utc)

        pc = PeriodCounter()
        thresholds = {"unmute": [("year", 2, 3, "meta")]}

        states = pc.inc(tp_2020, thresholds=thresholds)
        self.assertEqual({"unmute": [(False, None, "meta")]}, states)

        states = pc.inc(tp_2020, thresholds=thresholds)
        self.assertEqual({"unmute": [(False, None, "meta")]}, states)

        # 3rd in total: become True
        states = pc.inc(tp_2021, thresholds=thresholds)
        self.assertEqual({"unmute": [(True, datetime(2022, 1, 1, 0, 0, tzinfo=timezone.utc), "meta")]}, states)

        # into a new year, total == 2: become false
        states = pc.inc(tp_2022, thresholds=thresholds)
        self.assertEqual({"unmute": [(False, None, "meta")]}, states)

        # 3rd in (new) total: become True again
        states = pc.inc(tp_2022, thresholds=thresholds)
        self.assertEqual({"unmute": [(True, datetime(2023, 1, 1, 0, 0, tzinfo=timezone.utc), "meta")]}, states)
|
||||
|
||||
|
||||
class VolumeBasedConditionTestCase(RegularTestCase):
|
||||
|
||||
def test_serialization(self):
|
||||
@@ -173,43 +25,6 @@ class VolumeBasedConditionTestCase(RegularTestCase):
|
||||
self.assertEquals(vbc, vbc2)
|
||||
|
||||
|
||||
class PCRegistryTestCase(DjangoTestCase):
    """Tests for PeriodCounterRegistry.load_from_scratch against the test database.

    Fixes: deprecated `assertEquals` replaced by `assertEqual`; `load_from_scratch` (a classmethod) is now
    called on the class rather than on a throwaway instance — instantiating `PeriodCounterRegistry()` runs
    `__init__`, which performs the same DB load a second time for no benefit.
    """

    def test_empty(self):
        result = PeriodCounterRegistry.load_from_scratch(
            Project.objects.all(),
            Issue.objects.all(),
            Event.objects.all(),
            datetime.now(timezone.utc),
        )
        self.assertEqual(({}, {}), result)

    def test_with_muted_issue_and_event(self):
        project = Project.objects.create(name="project")
        issue = Issue.objects.create(
            project=project,
            is_muted=True,
            unmute_on_volume_based_conditions='[{"period": "day", "nr_of_periods": 1, "volume": 100}]',
            **denormalized_issue_fields(),
        )

        create_event(project, issue)

        by_project, by_issue = PeriodCounterRegistry.load_from_scratch(
            Project.objects.all(),
            Issue.objects.all(),
            Event.objects.all(),
            datetime.now(timezone.utc),
        )

        self.assertEqual({project.id}, by_project.keys())
        self.assertEqual({issue.id}, by_issue.keys())

        self.assertEqual("day", IssueStateManager.get_unmute_thresholds(issue)[0][0])
        self.assertEqual(1, IssueStateManager.get_unmute_thresholds(issue)[0][1])
        self.assertEqual(100, IssueStateManager.get_unmute_thresholds(issue)[0][2])
|
||||
|
||||
|
||||
class StreamsTestCase(RegularTestCase):
|
||||
|
||||
def test_compress_decompress_gzip(self):
|
||||
|
||||
@@ -131,7 +131,7 @@ The recommended way to run Bugsink is using Gunicorn, a WSGI server.
|
||||
You can start the Bugsink server by running:
|
||||
|
||||
```bash
|
||||
PYTHONUNBUFFERED=1 gunicorn --bind="127.0.0.1:9000" --workers=1 --access-logfile - --capture-output --error-logfile - bugsink.wsgi
|
||||
PYTHONUNBUFFERED=1 gunicorn --bind="127.0.0.1:9000" --workers=2 --access-logfile - --capture-output --error-logfile - bugsink.wsgi
|
||||
```
|
||||
|
||||
You should see output indicating that the server is running. You can now access Bugsink by visiting
|
||||
|
||||
@@ -17,7 +17,6 @@ from projects.models import Project
|
||||
from events.factories import create_event_data
|
||||
from issues.factories import get_or_create_issue
|
||||
from issues.models import IssueStateManager, Issue, TurningPoint, TurningPointKind
|
||||
from bugsink.registry import reset_pc_registry
|
||||
from bugsink.app_settings import override_settings
|
||||
from compat.timestamp import format_timestamp
|
||||
from compat.dsn import get_header_value
|
||||
@@ -30,8 +29,7 @@ from .parsers import readuntil, NewlineFinder, ParseError, LengthFinder, Streami
|
||||
def _digest_params(event_data, project, request, now=None):
|
||||
if now is None:
|
||||
# because we want to count events before having created event objects (quota may block the latter) we cannot
|
||||
# depend on event.timestamp; instead, we look on the clock once here, and then use that for both the project
|
||||
# and issue period counters.
|
||||
# depend on event.timestamp; instead, we look on the clock once here, and then use that everywhere
|
||||
now = datetime.datetime.now(timezone.utc)
|
||||
|
||||
# adapter to quickly reuse existing tests on refactored code. let's see where the code ends up before spending
|
||||
@@ -71,10 +69,6 @@ class IngestViewTestCase(TransactionTestCase):
|
||||
alert_on_regression=False,
|
||||
alert_on_unmute=False,
|
||||
)
|
||||
reset_pc_registry() # see notes in issues/tests.py for possible improvement; needed because we test unmuting.
|
||||
|
||||
def tearDown(self):
|
||||
reset_pc_registry()
|
||||
|
||||
@patch("ingest.views.send_new_issue_alert")
|
||||
@patch("ingest.views.send_regression_alert")
|
||||
|
||||
@@ -96,8 +96,7 @@ class BaseIngestAPIView(View):
|
||||
@classmethod
|
||||
def process_event(cls, event_id, event_data_stream, project, request):
|
||||
# because we want to count events before having created event objects (quota may block the latter) we cannot
|
||||
# depend on event.timestamp; instead, we look on the clock once here, and then use that for both the project
|
||||
# and issue period counters.
|
||||
# depend on event.timestamp; instead, we look on the clock once here, and then use that everywhere.
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
event_metadata = cls.get_event_meta(now, request, project)
|
||||
@@ -135,11 +134,6 @@ class BaseIngestAPIView(View):
|
||||
|
||||
timestamp = parse_timestamp(event_metadata["timestamp"])
|
||||
|
||||
# Leave counting at the top to ensure that get_pc_registry() is called so that when load_from_scratch is
|
||||
# triggered the pre-digest counts are correct. (if load_from_scratch would be triggered after Event-creation,
|
||||
# but before the call to `.inc` the first event to be digested would be double-counted). A note on locking:
|
||||
# period_counter accesses are serialized "automatically" because they are inside an immediate transaction, so
|
||||
# threading will "just work".
|
||||
cls.count_project_periods_and_act_on_it(project, timestamp)
|
||||
|
||||
# I resisted the temptation to put `get_denormalized_fields_for_data` in an if-statement: you basically "always"
|
||||
|
||||
@@ -13,7 +13,6 @@ from django.test import tag
|
||||
|
||||
from projects.models import Project, ProjectMembership
|
||||
from releases.models import create_release_if_needed
|
||||
from bugsink.registry import reset_pc_registry
|
||||
from events.factories import create_event
|
||||
from ingest.management.commands.send_json import Command as SendJsonCommand
|
||||
from compat.dsn import get_header_value
|
||||
@@ -301,33 +300,11 @@ seen as an undo rather than anything else.
|
||||
class MuteUnmuteTestCase(TransactionTestCase):
|
||||
"""
|
||||
Somewhat of an integration test. The unit-under-test here is the whole of
|
||||
* the pc_registry
|
||||
* BaseIngestAPIView.count_issue_periods_and_act_on_it
|
||||
* PeriodCounter (counting, thresholds)
|
||||
* threshold-counting
|
||||
* IssueStateManager.unmute
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
(TODO The below is not implemented yet; we may need it at some point)
|
||||
|
||||
To avoid unintentional modifications to the pc_registry in tests without a recent reset, consider the following
|
||||
|
||||
1. In each setup method, explicitly set a testing flag to indicate that it is safe to interact
|
||||
with the pc_registry. For example, include the line:
|
||||
`set_testing_flag(True)` (as well as resetting the pc_registry)
|
||||
|
||||
2. In each tearDown method, reset the testing flag to indicate that further modifications
|
||||
to the pc_registry should be avoided. For example, include the line:
|
||||
`set_testing_flag(False)`
|
||||
|
||||
This approach helps prevent accidental influences of one test on another via the pc_registry.
|
||||
"""
|
||||
reset_pc_registry()
|
||||
|
||||
def tearDown(self):
|
||||
reset_pc_registry()
|
||||
|
||||
def test_mute_no_vbc_for_unmute(self):
|
||||
project = Project.objects.create()
|
||||
|
||||
|
||||
@@ -1,165 +0,0 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
import random
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from bugsink.period_counter import _prev_tup, PeriodCounter
|
||||
from performance.bursty_data import generate_bursty_data, buckets_to_points_in_time
|
||||
from bugsink.registry import get_pc_registry
|
||||
|
||||
from projects.models import Project
|
||||
from issues.models import Issue
|
||||
from events.models import Event
|
||||
|
||||
|
||||
# this file is the beginning of an approach to getting a handle on performance.
|
||||
|
||||
|
||||
class Command(BaseCommand):
    # Entry point for the performance-measurement management command: runs the
    # print_thoughts_about_* benchmarks in sequence.
    help = "..."

    def handle(self, *args, **options):
        # safety guard: refuse to run against anything but the dedicated performance-test database
        if "performance" not in str(settings.DATABASES["default"]["NAME"]):
            raise ValueError("This command should only be run on the performance-test database")

        print_thoughts_about_prev_tup()
        print_thoughts_about_inc()
        print_thoughts_about_event_evaluation()
        print_thoughts_about_pc_registry()
|
||||
|
||||
|
||||
class passed_time(object):
    """Context manager that measures the wall-clock duration of its `with` block.

    After the block exits, the duration is available as `self.elapsed`, in milliseconds
    (a convenient unit for timing things).
    """

    def __enter__(self):
        self.t0 = time.time()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.elapsed = (time.time() - self.t0) * 1_000
|
||||
|
||||
|
||||
def print_thoughts_about_prev_tup():
    """Time 1_000 successive applications of _prev_tup() and print the result."""
    tup = (2020, 1, 1, 10, 10)
    with passed_time() as t:
        for _ in range(1_000):
            tup = _prev_tup(tup)

    print(f"""## _prev_tup()

1_000 iterations of _prev_tup in {t.elapsed:.3f}ms. The main thing we care about is not this little
private helper though, but PeriodCounter.inc(). Let's test that next.

""")
|
||||
|
||||
|
||||
def print_thoughts_about_inc():
|
||||
random.seed(42)
|
||||
|
||||
pc = PeriodCounter()
|
||||
|
||||
# make sure the pc has some data before we start
|
||||
for point in buckets_to_points_in_time(
|
||||
generate_bursty_data(num_buckets=350, expected_nr_of_bursts=10),
|
||||
datetime(2020, 10, 15, tzinfo=timezone.utc),
|
||||
datetime(2021, 10, 15, 10, 5, tzinfo=timezone.utc),
|
||||
10_000,
|
||||
):
|
||||
|
||||
pc.inc(point)
|
||||
|
||||
points = buckets_to_points_in_time(
|
||||
generate_bursty_data(num_buckets=25, expected_nr_of_bursts=5),
|
||||
datetime(2021, 10, 15, 10, 5, tzinfo=timezone.utc),
|
||||
datetime(2021, 10, 16, 10, 5, tzinfo=timezone.utc),
|
||||
1000)
|
||||
|
||||
with passed_time() as t:
|
||||
for point in points:
|
||||
pc.inc(point)
|
||||
|
||||
print(f"""## PeriodCounter.inc()
|
||||
|
||||
1_000 iterations of PeriodCounter.inc() in {t.elapsed:.3f}ms. We care about evaluation of some event more though. Let's
|
||||
test that next.
|
||||
""")
|
||||
|
||||
|
||||
def print_thoughts_about_event_evaluation():
|
||||
random.seed(42)
|
||||
|
||||
pc = PeriodCounter()
|
||||
|
||||
def noop():
|
||||
pass
|
||||
|
||||
# Now, let's add some event-listeners. These are chosen to match a typical setup of quota for a given Issue or
|
||||
# Project. In this setup, the monthly maximum is spread out in a way that the smaller parts are a bit more than just
|
||||
# splitting things equally. Why? We want some flexibility for bursts of activity without using up the entire budget
|
||||
# for a longer time all at once.
|
||||
pc.add_event_listener("day", 30, 10_000, noop, noop, initial_event_state=False) # 1 month rolling window
|
||||
pc.add_event_listener("hour", 24, 1_000, noop, noop, initial_event_state=False) # 1 day rolling window
|
||||
pc.add_event_listener("minute", 60, 200, noop, noop, initial_event_state=False) # 1 hour rolling window
|
||||
|
||||
# make sure the pc has some data before we start. we pick a 1-month period to match the listeners in the above.
|
||||
for point in buckets_to_points_in_time(
|
||||
generate_bursty_data(num_buckets=350, expected_nr_of_bursts=10),
|
||||
datetime(2021, 10, 15, tzinfo=timezone.utc),
|
||||
datetime(2021, 11, 15, 10, 5, tzinfo=timezone.utc),
|
||||
10_000,
|
||||
):
|
||||
|
||||
pc.inc(point)
|
||||
|
||||
# now we start the test: we generate a bursty data-set for a 1-day period, and see how long it takes to evaluate
|
||||
points = buckets_to_points_in_time(
|
||||
generate_bursty_data(num_buckets=25, expected_nr_of_bursts=5),
|
||||
datetime(2021, 11, 15, 10, 5, tzinfo=timezone.utc),
|
||||
datetime(2021, 11, 16, 10, 5, tzinfo=timezone.utc),
|
||||
1000)
|
||||
|
||||
with passed_time() as t:
|
||||
for point in points:
|
||||
pc.inc(point)
|
||||
|
||||
print(f"""## PeriodCounter.inc()
|
||||
|
||||
1_000 iterations of PeriodCounter.inc() in {t.elapsed:.3f}ms. (when 3 event-listeners are active). I'm not sure exactly
|
||||
what a good performance would be here, but I can say the following: this means when a 1,000 events happen in a second,
|
||||
the period-counter uses up 3% of the budget. A first guess would be: this is good enough.""")
|
||||
|
||||
|
||||
def print_thoughts_about_pc_registry():
|
||||
# note: in load_performance_insights we use minimal (non-data-containing) events here. this may not be
|
||||
# representative of real world performance. having said that: this immediately triggers the thought that for real
|
||||
# initialization only timestamps and issue_ids are needed, and that we should adjust the code accordingly
|
||||
|
||||
with passed_time() as t:
|
||||
get_pc_registry()
|
||||
|
||||
print(f"""## get_pc_registry()
|
||||
|
||||
getting the pc-registry takes {t.elapsed:.3f}ms. (with the default fixtures, which contain
|
||||
|
||||
* { Project.objects.count() } projects,
|
||||
* { Issue.objects.count() } issues,
|
||||
* { Event.objects.count() } events
|
||||
|
||||
This means (surprisingly) we can take our eye off optimizing this particular part of code (for now), because:
|
||||
|
||||
* in the (expected) production setup where we we cut ingestion and handling in 2 parts, 6s delay on the handling server
|
||||
boot is fine.
|
||||
* in the debugserver (integrated ingestion/handling) we don't expect 100k events; and even if we did a 6s delay on the
|
||||
first event/request is fine.
|
||||
|
||||
Counterpoint: on playground.bugsink.com I just observed 42s to initalize 150k events, which is ~5 times more slow than
|
||||
the above. It's also a "real hiccup". Anyway, there's too many questions about period counter (e.g. how to share
|
||||
across processes, or the consequences of quota) to focus on this particular point first.
|
||||
|
||||
Ways forward once we do decide to improve:
|
||||
|
||||
* regular saving of state (savepoint in time, with "unhandled after") (the regularity of saving is left as an exercise
|
||||
to the reader)
|
||||
* more granular caching/loading, e.g. load per project/issue on demand
|
||||
""")
|
||||
@@ -1,41 +0,0 @@
|
||||
## _prev_tup()
|
||||
|
||||
1_000 iterations of _prev_tup in 0.832ms. The main thing we care about is not this little
|
||||
private helper though, but PeriodCounter.inc(). Let's test that next.
|
||||
|
||||
|
||||
## PeriodCounter.inc()
|
||||
|
||||
1_000 iterations of PeriodCounter.inc() in 7.885ms. We care about evaluation of some event more though. Let's
|
||||
test that next.
|
||||
|
||||
## PeriodCounter.inc()
|
||||
|
||||
1_000 iterations of PeriodCounter.inc() in 29.567ms. (when 3 event-listeners are active). I'm not sure exactly
|
||||
what a good performance would be here, but I can say the following: this means when a 1,000 events happen in a second,
|
||||
the period-counter uses up 3% of the budget. A first guess would be: this is good enough.
|
||||
## get_pc_registry()
|
||||
|
||||
getting the pc-registry takes 6615.371ms. (with the default fixtures, which contain
|
||||
|
||||
* 10 projects,
|
||||
* 1000 issues,
|
||||
* 100000 events
|
||||
|
||||
This means (surprisingly) we can take our eye off optimizing this particular part of code (for now), because:
|
||||
|
||||
* in the (expected) production setup where we we cut ingestion and handling in 2 parts, 6s delay on the handling server
|
||||
boot is fine.
|
||||
* in the debugserver (integrated ingestion/handling) we don't expect 100k events; and even if we did a 6s delay on the
|
||||
first event/request is fine.
|
||||
|
||||
Counterpoint: on playground.bugsink.com I just observed 42s to initalize 150k events, which is ~5 times more slow than
|
||||
the above. It's also a "real hiccup". Anyway, there's too many questions about period counter (e.g. how to share
|
||||
across processes, or the consequences of quota) to focus on this particular point first.
|
||||
|
||||
Ways forward once we do decide to improve:
|
||||
|
||||
* regular saving of state (savepoint in time, with "unhandled after") (the regularity of saving is left as an exercise
|
||||
to the reader)
|
||||
* more granular caching/loading, e.g. load per project/issue on demand
|
||||
|
||||
@@ -120,14 +120,6 @@ class Foreman:
|
||||
# stops. (the value of this semaphore is implicitly NUM_WORKERS - active_workers)
|
||||
self.worker_semaphore = threading.Semaphore(self.settings.NUM_WORKERS)
|
||||
|
||||
# "initialize" the application.
|
||||
# this might be in the wrong place, but I want to have it "somewhere" (before the workers start).
|
||||
from bugsink.registry import get_pc_registry
|
||||
with durable_atomic():
|
||||
get_pc_registry()
|
||||
|
||||
self.connection_close() # close the connection that was created as part of `get_pc_registry()`.
|
||||
|
||||
def connection_close(self, using="default"):
|
||||
# (as a method to allow for a single point of documentation)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user