Show stats command for snappea

This commit is contained in:
Klaas van Schelven
2025-04-16 21:48:41 +02:00
parent 4cedffc1b7
commit fb45a8ccc9

View File

@@ -1,8 +1,11 @@
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
from django.utils import timezone
from django.db.models import Sum, Max
from bugsink.transaction import durable_atomic
from snappea.models import Task
from snappea.models import Task, Stat
from events.models import Event
@@ -14,9 +17,15 @@ class Command(BaseCommand):
choices=[
"snappea-queue-size",
"event_count",
"snappea-stats",
"digestion_speed",
],
)
parser.add_argument(
"--task-name",
help="Task name to filter by (snappea-stats only)",
default=None,
)
def handle(self, *args, **options):
stat = options["stat"]
@@ -28,13 +37,18 @@ class Command(BaseCommand):
with durable_atomic():
print(Event.objects.all().count())
if stat == "snappea-stats":
return self.snappea_stats(options["task_name"])
if stat == "digestion_speed":
# NOTE: is this still a valuable stat? snappea_stat for "digest" task is more useful, I'd say. esp. given
# the warning
print("WARNING: when eviction is enabled, the numbers will be wrong")
for window in [1, 10, 30, 60, 5 * 60, 60 * 60, 24 * 60 * 60]:
now = timezone.now()
with durable_atomic():
qs = Event.objects.filter(digested_at__gte=now - timezone.timedelta(seconds=window))
now = timezone.now()
with durable_atomic():
for window in [1, 10, 30, 60, 5 * 60, 60 * 60, 24 * 60 * 60]:
qs = Event.objects.filter(digested_at__gte=now - timedelta(seconds=window))
digested_in_window = qs.count()
# (safe assumption: digestion is done in order; this is "safe enough" (perhaps with a tiny rounding
# error) because of the snappea queue, which is a FIFO queue); the rounding error is in the
@@ -65,3 +79,54 @@ class Command(BaseCommand):
factor = digestion_window / ingestion_window
print(f" ingestion window: {ingestion_window:.1f}s, factor: {factor:.1f}")
def snappea_stats(self, filter_task_name):
now_floor = datetime(*(datetime.now(timezone.utc).timetuple()[:5]), tzinfo=timezone.utc)
print("""past n minutes task AVG MAX
done/s err/s wall wait write wall wait write backlog""")
with durable_atomic(using="snappea"):
for window in [1, 2, 5, 10, 60, 5 * 60, 24 * 60]:
since = now_floor - timedelta(minutes=window)
seconds_in_window = 60 * window
base_qs = Stat.objects.filter(timestamp__gte=since, timestamp__lt=now_floor)
if filter_task_name:
base_qs = base_qs.filter(task_name__endswith="." + filter_task_name)
stats = base_qs.values(
"task_name",
).annotate(
done=Sum("done"),
errors=Sum("errors"),
wall_time=Sum("wall_time"),
wait_time=Sum("wait_time"),
write_time=Sum("write_time"),
max_wall_time=Max("max_wall_time"),
max_wait_time=Max("max_wait_time"),
max_write_time=Max("max_write_time"),
max_task_count=Max("task_count"),
)
for stat in stats:
for field in ["wall_time", "wait_time", "write_time"]:
stat[field] /= stat["done"]
for field in ["done", "errors"]:
stat[field] /= seconds_in_window
task_name = stat["task_name"].split(".")[-1]
# len("send_project_invite_email_new_user") == 34
print(f"{window:<4} "
f"{task_name:<34} "
f"{stat['done']:5.1f} "
f"{stat['errors']:5.1f} "
f"{stat['wall_time']:6.3f} "
f"{stat['wait_time']:6.3f} "
f"{stat['write_time']:6.3f} "
f"{stat['max_wall_time']:6.3f} "
f"{stat['max_wait_time']:6.3f} "
f"{stat['max_write_time']:6.3f} " +
(f"{stat['max_task_count']:9d}" if stat["max_task_count"] else " v. many")
)