From 94338051ef011e3e3d661df6764fd431627bc8be Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Wed, 16 Apr 2025 16:40:28 +0200 Subject: [PATCH] Snappea Stats: first version --- bugsink/transaction.py | 34 ++++++++++-- snappea/foreman.py | 19 ++++--- snappea/migrations/0005_stat.py | 41 ++++++++++++++ snappea/models.py | 22 ++++++++ snappea/stats.py | 98 +++++++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 snappea/migrations/0005_stat.py create mode 100644 snappea/stats.py diff --git a/bugsink/transaction.py b/bugsink/transaction.py index 0762ea7..bbf4f42 100644 --- a/bugsink/transaction.py +++ b/bugsink/transaction.py @@ -9,6 +9,7 @@ from django.db import transaction as django_db_transaction from django.db import DEFAULT_DB_ALIAS performance_logger = logging.getLogger("bugsink.performance.db") +local_storage = threading.local() # as per https://sqlite.org/forum/forumpost/f2427b925c1669b7 (the text below is slightly improved) # @@ -47,10 +48,11 @@ class SemaphoreContext: # the locking mechanism specifically, not actually caused by the DB being busy. raise RuntimeError("Could not acquire immediate_semaphore") - took = (time.time() - t0) * 1_000 + took = time.time() - t0 + inc_stat(self.using, "get_write_lock", took) # textually, slightly misleading since it's not literally "BEGIN IMMEDIATE" we're waiting for here (instead: the # semaphore) but it's clear enough - performance_logger.info(f"{took:6.2f}ms BEGIN IMMEDIATE, A.K.A. get-write-lock") + performance_logger.info(f"{took * 1000:6.2f}ms BEGIN IMMEDIATE, A.K.A. get-write-lock") def __exit__(self, exc_type, exc_value, traceback): immediate_semaphores[self.using].release() @@ -174,8 +176,9 @@ class ImmediateAtomic(SuperDurableAtomic): def __exit__(self, exc_type, exc_value, traceback): super(ImmediateAtomic, self).__exit__(exc_type, exc_value, traceback) - took = (time.time() - self.t0) * 1000 - performance_logger.info(f"{took:6.2f}ms IMMEDIATE transaction") + took = time.time() - self.t0 + inc_stat(self.using, "immediate_transaction", took) + performance_logger.info(f"{took * 1000:6.2f}ms IMMEDIATE transaction") connection = django_db_transaction.get_connection(self.using) if hasattr(connection, "_start_transaction_under_autocommit"): @@ -194,13 +197,13 @@ def immediate_atomic(using=None, savepoint=True, durable=True): # but rather, "are savepoints allowed inside the current context". (The former would imply that it could never be # combined with durable=True, which is not the case.) assert durable, "immediate_atomic should always be used with durable=True" + using = DEFAULT_DB_ALIAS if using is None else using # harmonize to "default" at the top for downstream lookups if callable(using): immediate_atomic = ImmediateAtomic(DEFAULT_DB_ALIAS, savepoint, durable)(using) else: immediate_atomic = ImmediateAtomic(using, savepoint, durable) - using = DEFAULT_DB_ALIAS if using is None else using # https://stackoverflow.com/a/45681273/339144 provides some context on nesting context managers; and how to proceed # if you want to do this with an arbitrary number of context managers. with SemaphoreContext(using), immediate_atomic: @@ -209,3 +212,24 @@ def immediate_atomic(using=None, savepoint=True, durable=True): def delay_on_commit(function, *args, **kwargs): django_db_transaction.on_commit(partial(function.delay, *args, **kwargs)) + + +def inc_stat(using, stat, took): + if using != "default": + return # function signature ready for such stats; not actually collected though + + if not hasattr(local_storage, "stats"): + # In practice, I've found that this needs to be done lazily (module-level doesn't work) + local_storage.stats = {} + + # the rest of the lazyness is because (using, stat) pairs are not always used. + if stat not in local_storage.stats: + # a single set-to-0 is good for our architecture (no "reset" operation needed); we only care about these stats + # in the context of snappea anyway, and in snappea we get a fresh thread (with 0-stat) for each task. + local_storage.stats[stat] = 0 + + local_storage.stats[stat] += took + + +def get_stat(stat): + return getattr(local_storage, "stats", {}).get(stat, 0) diff --git a/snappea/foreman.py b/snappea/foreman.py index a21a669..237f55e 100644 --- a/snappea/foreman.py +++ b/snappea/foreman.py @@ -17,13 +17,14 @@ from django.db import connections from sentry_sdk_extensions import capture_or_log_exception from performance.context_managers import time_to_logger -from bugsink.transaction import durable_atomic +from bugsink.transaction import durable_atomic, get_stat from . import registry from .models import Task from .datastructures import Workers from .settings import get_settings from .utils import run_task_context +from .stats import Stats logger = logging.getLogger("snappea.foreman") @@ -76,6 +77,7 @@ class Foreman: self.settings = get_settings() self.workers = Workers() + self.stats = Stats() self.stopping = False # We deal with both of these in the same way: gracefully terminate. SIGINT is sent (at least) when running this @@ -170,9 +172,8 @@ class Foreman: def run_in_thread(self, task_id, function, *args, **kwargs): # NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this # is something to keep an eye on - logger.info( - 'Starting %s for "%s.%s" with %s, %s', - short_id(task_id), function.__module__, function.__name__, args, kwargs) + task_name = "%s.%s" % (function.__module__, function.__name__) + logger.info('Starting %s for "%s" with %s, %s', short_id(task_id), task_name, args, kwargs) def non_failing_function(*inner_args, **inner_kwargs): t0 = time.time() @@ -181,11 +182,14 @@ class Foreman: function(*inner_args, **inner_kwargs) except Exception as e: + errored = True # at the top to avoid error-in-handler leaving us with unset variable if sentry_sdk.is_initialized(): # Only for the case where full error is captured to Dogfooded Bugsink, do we want to draw some # attention to this; in the other case the big error in the logs (full traceback) is clear enough. logger.warning("Snappea caught Exception: %s", str(e)) capture_or_log_exception(e, logger) + else: + errored = False finally: # equivalent to the below, but slightly more general (and thus more future-proof). In both cases nothing # happens with already-closed/never opened connections): @@ -194,9 +198,10 @@ class Foreman: for connection in connections.all(): connection.close() - logger.info( - 'Worker done for "%s.%s" in %.3fs', - function.__module__, function.__name__, time.time() - t0) + runtime = time.time() - t0 + logger.info('Worker done for "%s" in %.3fs', task_name, runtime) + self.stats.done( + task_name, runtime, get_stat("get_write_lock"), get_stat("immediate_transaction"), errored) self.workers.stopped(task_id) self.worker_semaphore.release() diff --git a/snappea/migrations/0005_stat.py b/snappea/migrations/0005_stat.py new file mode 100644 index 0000000..55f0352 --- /dev/null +++ b/snappea/migrations/0005_stat.py @@ -0,0 +1,41 @@ +# Generated by Django 4.2.19 on 2025-04-16 13:05 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("snappea", "0004_task_snappea_tas_created_eb0824_idx"), + ] + + operations = [ + migrations.CreateModel( + name="Stat", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("timestamp", models.DateTimeField()), + ("task_name", models.CharField(max_length=255)), + ("task_count", models.PositiveIntegerField(null=True)), + ("done", models.PositiveIntegerField()), + ("errors", models.PositiveIntegerField()), + ("wall_time", models.FloatField()), + ("wait_time", models.FloatField()), + ("write_time", models.FloatField()), + ("max_wall_time", models.FloatField()), + ("max_wait_time", models.FloatField()), + ("max_write_time", models.FloatField()), + ], + options={ + "unique_together": {("timestamp", "task_name")}, + }, + ), + ] diff --git a/snappea/models.py b/snappea/models.py index 08508e9..df1bebd 100644 --- a/snappea/models.py +++ b/snappea/models.py @@ -21,6 +21,28 @@ class Task(models.Model): ] +class Stat(models.Model): + timestamp = models.DateTimeField(null=False) + task_name = models.CharField(max_length=255) + task_count = models.PositiveIntegerField(null=True) # null signifies "too much to count quickly" + done = models.PositiveIntegerField(null=False) + errors = models.PositiveIntegerField(null=False) + wall_time = models.FloatField(null=False) + wait_time = models.FloatField(null=False) + write_time = models.FloatField(null=False) + max_wall_time = models.FloatField(null=False) + max_wait_time = models.FloatField(null=False) + max_write_time = models.FloatField(null=False) + + class Meta: + unique_together = ( + ('timestamp', 'task_name'), # in this order, for efficient deletions + ) + + def __str__(self): + return f"{self.timestamp.isoformat()[:16]} - {self.task_name}" + + def wakeup_server(): wakeup_file = os.path.join(get_settings().WAKEUP_CALLS_DIR, thread_uuid) diff --git a/snappea/stats.py b/snappea/stats.py new file mode 100644 index 0000000..63cb481 --- /dev/null +++ b/snappea/stats.py @@ -0,0 +1,98 @@ +from datetime import datetime, timezone, timedelta +import threading + +from django.db import OperationalError +from django.db.models import Count + +from bugsink.transaction import immediate_atomic +from bugsink.timed_sqlite_backend.base import different_runtime_limit + +from .models import Task, Stat + + +class Stats: + + def __init__(self): + self.lock = threading.Lock() + self.last_write_at = (datetime.now(timezone.utc) - timedelta(minutes=1)).timetuple()[:5] + self.d = {} + + def _ensure_task(self, task_name): + if task_name not in self.d: + self.d[task_name] = { + # we _could_ monitor starts, but my guess is it's not that interesting, since no timings are available, + # you don't actually gain much insight from the count of started tasks, and you're even in danger of + # setting people on the wrong track because start/done will differ slightly over the per-minute buckets. + # "starts": 0, + "done": 0, + "errors": 0, + "wall_time": 0, + "wait_time": 0, + "write_time": 0, + "max_wall_time": 0, + "max_wait_time": 0, + "max_write_time": 0, + } + + def done(self, task_name, wall_time, wait_time, write_time, error): + # we take "did it error" as a param to enable a single call-side path avoid duplicating taking timings call-side + + with self.lock: + self._possibly_write() + + self._ensure_task(task_name) + self.d[task_name]["done"] += 1 + self.d[task_name]["wall_time"] += wall_time + self.d[task_name]["wait_time"] += wait_time + self.d[task_name]["write_time"] += write_time + self.d[task_name]["max_wall_time"] = max(self.d[task_name]["max_wall_time"], wall_time) + self.d[task_name]["max_wait_time"] = max(self.d[task_name]["max_wait_time"], wait_time) + self.d[task_name]["max_write_time"] = max(self.d[task_name]["max_write_time"], write_time) + if error: + self.d[task_name]["errors"] += 1 + + def _possibly_write(self): + # we only write once-a-minute; this means the cost of writing stats is amortized (at least when it matters, i.e. + # under pressure) by approx 1/(60*30); + # + # "edge" cases, in which nothing is written: + # * snappea-shutdown + # * "no new minute" (only happens when there's almost no load, in which case you don't care) + # but low overhead, robustness and a simple impl are more important than after-the-comma precision. + + # we look at the clock ourselves, rather than pass this in, such that the looking at the clock happens only + # once we've grabbed the lock; this ensures our times are monotonicially increasing (assuming no system + # clock funnyness). + now = datetime.now(timezone.utc) + + tup = now.timetuple()[:5] # len("YMDHM") i.e. cut off at minute + if tup != self.last_write_at: + # the Stat w/ timestamp x is for the one-minute bucket from that point in time forwards: + timestamp = datetime(*(self.last_write_at), tzinfo=timezone.utc) + + with immediate_atomic(using="snappea"): # explicit is better than implicit; and we combine read/write here + # having stats is great, but I don't want to hog task-processing too long (which would happen precisely + # when the backlog grows large) + with different_runtime_limit(0.1): + try: + task_counts = Task.objects.values("task_name").annotate(count=Count("task_name")) + except OperationalError as e: + if e.args[0] != "interrupted": + raise + task_counts = None + + task_counts_d = {d['task_name']: d['count'] for d in task_counts} if task_counts else None + stats = [ + Stat( + timestamp=timestamp, + task_name=task_name, + task_count=task_counts_d.get(task_name, 0) if task_counts is not None else None, + **kwargs, + ) for task_name, kwargs in self.d.items() + ] + + Stat.objects.bulk_create(stats) + + # re-init: + self.last_write_at = tup + self.d = {}