Snappea Stats: first version

This commit is contained in:
Klaas van Schelven
2025-04-16 16:40:28 +02:00
parent 67f769d3e5
commit 94338051ef
5 changed files with 202 additions and 12 deletions

View File

@@ -9,6 +9,7 @@ from django.db import transaction as django_db_transaction
from django.db import DEFAULT_DB_ALIAS from django.db import DEFAULT_DB_ALIAS
performance_logger = logging.getLogger("bugsink.performance.db") performance_logger = logging.getLogger("bugsink.performance.db")
local_storage = threading.local()
# as per https://sqlite.org/forum/forumpost/f2427b925c1669b7 (the text below is slightly improved) # as per https://sqlite.org/forum/forumpost/f2427b925c1669b7 (the text below is slightly improved)
# #
@@ -47,10 +48,11 @@ class SemaphoreContext:
# the locking mechanism specifically, not actually caused by the DB being busy. # the locking mechanism specifically, not actually caused by the DB being busy.
raise RuntimeError("Could not acquire immediate_semaphore") raise RuntimeError("Could not acquire immediate_semaphore")
took = (time.time() - t0) * 1_000 took = time.time() - t0
inc_stat(self.using, "get_write_lock", took)
# textually, slightly misleading since it's not literally "BEGIN IMMEDIATE" we're waiting for here (instead: the # textually, slightly misleading since it's not literally "BEGIN IMMEDIATE" we're waiting for here (instead: the
# semaphore) but it's clear enough # semaphore) but it's clear enough
performance_logger.info(f"{took:6.2f}ms BEGIN IMMEDIATE, A.K.A. get-write-lock") performance_logger.info(f"{took * 1000:6.2f}ms BEGIN IMMEDIATE, A.K.A. get-write-lock")
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
immediate_semaphores[self.using].release() immediate_semaphores[self.using].release()
@@ -174,8 +176,9 @@ class ImmediateAtomic(SuperDurableAtomic):
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
super(ImmediateAtomic, self).__exit__(exc_type, exc_value, traceback) super(ImmediateAtomic, self).__exit__(exc_type, exc_value, traceback)
took = (time.time() - self.t0) * 1000 took = time.time() - self.t0
performance_logger.info(f"{took:6.2f}ms IMMEDIATE transaction") inc_stat(self.using, "immediate_transaction", took)
performance_logger.info(f"{took * 1000:6.2f}ms IMMEDIATE transaction")
connection = django_db_transaction.get_connection(self.using) connection = django_db_transaction.get_connection(self.using)
if hasattr(connection, "_start_transaction_under_autocommit"): if hasattr(connection, "_start_transaction_under_autocommit"):
@@ -194,13 +197,13 @@ def immediate_atomic(using=None, savepoint=True, durable=True):
# but rather, "are savepoints allowed inside the current context". (The former would imply that it could never be # but rather, "are savepoints allowed inside the current context". (The former would imply that it could never be
# combined with durable=True, which is not the case.) # combined with durable=True, which is not the case.)
assert durable, "immediate_atomic should always be used with durable=True" assert durable, "immediate_atomic should always be used with durable=True"
using = DEFAULT_DB_ALIAS if using is None else using # harmonize to "default" at the top for downstream lookups
if callable(using): if callable(using):
immediate_atomic = ImmediateAtomic(DEFAULT_DB_ALIAS, savepoint, durable)(using) immediate_atomic = ImmediateAtomic(DEFAULT_DB_ALIAS, savepoint, durable)(using)
else: else:
immediate_atomic = ImmediateAtomic(using, savepoint, durable) immediate_atomic = ImmediateAtomic(using, savepoint, durable)
using = DEFAULT_DB_ALIAS if using is None else using
# https://stackoverflow.com/a/45681273/339144 provides some context on nesting context managers; and how to proceed # https://stackoverflow.com/a/45681273/339144 provides some context on nesting context managers; and how to proceed
# if you want to do this with an arbitrary number of context managers. # if you want to do this with an arbitrary number of context managers.
with SemaphoreContext(using), immediate_atomic: with SemaphoreContext(using), immediate_atomic:
@@ -209,3 +212,24 @@ def immediate_atomic(using=None, savepoint=True, durable=True):
def delay_on_commit(function, *args, **kwargs): def delay_on_commit(function, *args, **kwargs):
django_db_transaction.on_commit(partial(function.delay, *args, **kwargs)) django_db_transaction.on_commit(partial(function.delay, *args, **kwargs))
def inc_stat(using, stat, took):
if using != "default":
return # function signature ready for such stats; not actually collected though
if not hasattr(local_storage, "stats"):
# In practice, I've found that this needs to be done lazily (module-level doesn't work)
local_storage.stats = {}
# the rest of the lazyness is because (using, stat) pairs are not always used.
if stat not in local_storage.stats:
# a single set-to-0 is good for our architecture (no "reset" operation needed); we only care about these stats
# in the context of snappea anyway, and in snappea we get a fresh thread (with 0-stat) for each task.
local_storage.stats[stat] = 0
local_storage.stats[stat] += took
def get_stat(stat):
return getattr(local_storage, "stats", {}).get(stat, 0)

View File

@@ -17,13 +17,14 @@ from django.db import connections
from sentry_sdk_extensions import capture_or_log_exception from sentry_sdk_extensions import capture_or_log_exception
from performance.context_managers import time_to_logger from performance.context_managers import time_to_logger
from bugsink.transaction import durable_atomic from bugsink.transaction import durable_atomic, get_stat
from . import registry from . import registry
from .models import Task from .models import Task
from .datastructures import Workers from .datastructures import Workers
from .settings import get_settings from .settings import get_settings
from .utils import run_task_context from .utils import run_task_context
from .stats import Stats
logger = logging.getLogger("snappea.foreman") logger = logging.getLogger("snappea.foreman")
@@ -76,6 +77,7 @@ class Foreman:
self.settings = get_settings() self.settings = get_settings()
self.workers = Workers() self.workers = Workers()
self.stats = Stats()
self.stopping = False self.stopping = False
# We deal with both of these in the same way: gracefully terminate. SIGINT is sent (at least) when running this # We deal with both of these in the same way: gracefully terminate. SIGINT is sent (at least) when running this
@@ -170,9 +172,8 @@ class Foreman:
def run_in_thread(self, task_id, function, *args, **kwargs): def run_in_thread(self, task_id, function, *args, **kwargs):
# NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this # NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this
# is something to keep an eye on # is something to keep an eye on
logger.info( task_name = "%s.%s" % (function.__module__, function.__name__)
'Starting %s for "%s.%s" with %s, %s', logger.info('Starting %s for "%s" with %s, %s', short_id(task_id), task_name, args, kwargs)
short_id(task_id), function.__module__, function.__name__, args, kwargs)
def non_failing_function(*inner_args, **inner_kwargs): def non_failing_function(*inner_args, **inner_kwargs):
t0 = time.time() t0 = time.time()
@@ -181,11 +182,14 @@ class Foreman:
function(*inner_args, **inner_kwargs) function(*inner_args, **inner_kwargs)
except Exception as e: except Exception as e:
errored = True # at the top to avoid error-in-handler leaving us with unset variable
if sentry_sdk.is_initialized(): if sentry_sdk.is_initialized():
# Only for the case where full error is captured to Dogfooded Bugsink, do we want to draw some # Only for the case where full error is captured to Dogfooded Bugsink, do we want to draw some
# attention to this; in the other case the big error in the logs (full traceback) is clear enough. # attention to this; in the other case the big error in the logs (full traceback) is clear enough.
logger.warning("Snappea caught Exception: %s", str(e)) logger.warning("Snappea caught Exception: %s", str(e))
capture_or_log_exception(e, logger) capture_or_log_exception(e, logger)
else:
errored = False
finally: finally:
# equivalent to the below, but slightly more general (and thus more future-proof). In both cases nothing # equivalent to the below, but slightly more general (and thus more future-proof). In both cases nothing
# happens with already-closed/never opened connections): # happens with already-closed/never opened connections):
@@ -194,9 +198,10 @@ class Foreman:
for connection in connections.all(): for connection in connections.all():
connection.close() connection.close()
logger.info( runtime = time.time() - t0
'Worker done for "%s.%s" in %.3fs', logger.info('Worker done for "%s" in %.3fs', task_name, runtime)
function.__module__, function.__name__, time.time() - t0) self.stats.done(
task_name, runtime, get_stat("get_write_lock"), get_stat("immediate_transaction"), errored)
self.workers.stopped(task_id) self.workers.stopped(task_id)
self.worker_semaphore.release() self.worker_semaphore.release()

View File

@@ -0,0 +1,41 @@
# Generated by Django 4.2.19 on 2025-04-16 13:05
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("snappea", "0004_task_snappea_tas_created_eb0824_idx"),
]
operations = [
migrations.CreateModel(
name="Stat",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("timestamp", models.DateTimeField()),
("task_name", models.CharField(max_length=255)),
("task_count", models.PositiveIntegerField(null=True)),
("done", models.PositiveIntegerField()),
("errors", models.PositiveIntegerField()),
("wall_time", models.FloatField()),
("wait_time", models.FloatField()),
("write_time", models.FloatField()),
("max_wall_time", models.FloatField()),
("max_wait_time", models.FloatField()),
("max_write_time", models.FloatField()),
],
options={
"unique_together": {("timestamp", "task_name")},
},
),
]

View File

@@ -21,6 +21,28 @@ class Task(models.Model):
] ]
class Stat(models.Model):
timestamp = models.DateTimeField(null=False)
task_name = models.CharField(max_length=255)
task_count = models.PositiveIntegerField(null=True) # null signifies "too much to count quickly"
done = models.PositiveIntegerField(null=False)
errors = models.PositiveIntegerField(null=False)
wall_time = models.FloatField(null=False)
wait_time = models.FloatField(null=False)
write_time = models.FloatField(null=False)
max_wall_time = models.FloatField(null=False)
max_wait_time = models.FloatField(null=False)
max_write_time = models.FloatField(null=False)
class Meta:
unique_together = (
('timestamp', 'task_name'), # in this order, for efficient deletions
)
def __str__(self):
return f"{self.timestamp.isoformat()[:16]} - {self.task_name}"
def wakeup_server(): def wakeup_server():
wakeup_file = os.path.join(get_settings().WAKEUP_CALLS_DIR, thread_uuid) wakeup_file = os.path.join(get_settings().WAKEUP_CALLS_DIR, thread_uuid)

98
snappea/stats.py Normal file
View File

@@ -0,0 +1,98 @@
from datetime import datetime, timezone, timedelta
import threading
from django.db import OperationalError
from django.db.models import Count
from bugsink.transaction import immediate_atomic
from bugsink.timed_sqlite_backend.base import different_runtime_limit
from .models import Task, Stat
class Stats:
def __init__(self):
self.lock = threading.Lock()
self.last_write_at = (datetime.now(timezone.utc) - timedelta(minutes=1)).timetuple()[:5]
self.d = {}
def _ensure_task(self, task_name):
if task_name not in self.d:
self.d[task_name] = {
# we _could_ monitor starts, but my guess is it's not that interesting, since no timings are available,
# you don't actually gain much insight from the count of started tasks, and you're even in danger of
# setting people on the wrong track because start/done will differ slightly over the per-minute buckets.
# "starts": 0,
"done": 0,
"errors": 0,
"wall_time": 0,
"wait_time": 0,
"write_time": 0,
"max_wall_time": 0,
"max_wait_time": 0,
"max_write_time": 0,
}
def done(self, task_name, wall_time, wait_time, write_time, error):
# we take "did it error" as a param to enable a single call-side path avoid duplicating taking timings call-side
with self.lock:
self._possibly_write()
self._ensure_task(task_name)
self.d[task_name]["done"] += 1
self.d[task_name]["wall_time"] += wall_time
self.d[task_name]["wait_time"] += wait_time
self.d[task_name]["write_time"] += write_time
self.d[task_name]["max_wall_time"] = max(self.d[task_name]["max_wall_time"], wall_time)
self.d[task_name]["max_wait_time"] = max(self.d[task_name]["max_wait_time"], wait_time)
self.d[task_name]["max_write_time"] = max(self.d[task_name]["max_write_time"], write_time)
if error:
self.d[task_name]["errors"] += 1
def _possibly_write(self):
# we only write once-a-minute; this means the cost of writing stats is amortized (at least when it matters, i.e.
# under pressure) by approx 1/(60*30);
#
# "edge" cases, in which nothing is written:
# * snappea-shutdown
# * "no new minute" (only happens when there's almost no load, in which case you don't care)
# but low overhead, robustness and a simple impl are more important than after-the-comma precision.
# we look at the clock ourselves, rather than pass this in, such that the looking at the clock happens only
# once we've grabbed the lock; this ensures our times are monotonicially increasing (assuming no system
# clock funnyness).
now = datetime.now(timezone.utc)
tup = now.timetuple()[:5] # len("YMDHM") i.e. cut off at minute
if tup != self.last_write_at:
# the Stat w/ timestamp x is for the one-minute bucket from that point in time forwards:
timestamp = datetime(*(self.last_write_at), tzinfo=timezone.utc)
with immediate_atomic(using="snappea"): # explicit is better than implicit; and we combine read/write here
# having stats is great, but I don't want to hog task-processing too long (which would happen precisely
# when the backlog grows large)
with different_runtime_limit(0.1):
try:
task_counts = Task.objects.values("task_name").annotate(count=Count("task_name"))
except OperationalError as e:
if e.args[0] != "interrupted":
raise
task_counts = None
task_counts_d = {d['task_name']: d['count'] for d in task_counts} if task_counts else None
stats = [
Stat(
timestamp=timestamp,
task_name=task_name,
task_count=task_counts_d.get(task_name, 0) if task_counts is not None else None,
**kwargs,
) for task_name, kwargs in self.d.items()
]
Stat.objects.bulk_create(stats)
# re-init:
self.last_write_at = tup
self.d = {}