Snappea: Document stuff (and factor a few bits out)

Klaas van Schelven
2024-04-19 23:14:19 +02:00
parent b03c2ec088
commit 46f34370f4
3 changed files with 108 additions and 53 deletions

snappea/datastructures.py Normal file

@@ -0,0 +1,25 @@
import threading
class SafeDict:
"""Python's dict is 'probably thread safe', but since this is hard to reason about, and in fact .items() may be
unsafe, we just use a lock. See e.g.
* https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary
* https://stackoverflow.com/questions/66556511/is-listdict-items-thread-safe
"""
def __init__(self):
self.d = {}
self.lock = threading.Lock()
def set(self, k, v):
with self.lock:
self.d[k] = v
def unset(self, k):
with self.lock:
del self.d[k]
def list(self):
with self.lock:
return list(self.d.items())
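A minimal usage sketch (stdlib only, not from this commit) of SafeDict under concurrent access; the registration scenario is illustrative:

import threading

from snappea.datastructures import SafeDict

workers = SafeDict()

def register(task_id):
    # set() takes the lock, so concurrent registrations never interleave
    workers.set(task_id, threading.current_thread())

threads = [threading.Thread(target=register, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# list() copies the items under the lock, so iterating is safe even if other
# threads were still calling set()/unset() concurrently
for task_id, thread in workers.list():
    print(task_id, thread.name)
    workers.unset(task_id)  # unset() takes the lock as well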

snappea/example_tasks.py Normal file

@@ -0,0 +1,21 @@
import time
import random
import uuid
import logging
from .decorators import shared_task
logger = logging.getLogger("snappea.foreman")
@shared_task
def example_worker():
me = str(uuid.uuid4())
logger.info("example worker started %s", me)
time.sleep(random.random() * 10)
logger.info("example worker stopped %s", me)
@shared_task
def example_failing_worker():
raise Exception("I am failing")
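A hedged sketch of how these tasks might be enqueued, assuming the shared_task decorator exposes a Celery-style .delay() that records a snappea.Task instead of calling the function inline; see snappea/decorators.py for the actual interface:

from snappea.example_tasks import example_worker, example_failing_worker

# assumption: .delay() writes a Task row and signals the Foreman, mirroring
# Celery's convention; the function body does not run in this process
example_worker.delay()
example_failing_worker.delay()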

snappea/foreman.py

@@ -9,7 +9,7 @@ from sentry_sdk import capture_exception
from . import registry
from .models import Task
from .decorators import shared_task
from .datastructures import SafeDict
GRACEFUL_TIMEOUT = 10
@@ -19,46 +19,42 @@ NUM_WORKERS = 4
logger = logging.getLogger("snappea.foreman")
@shared_task
def example_worker():
import random
import uuid
me = str(uuid.uuid4())
logger.info("example worker started %s", me)
time.sleep(random.random() * 10)
logger.info("example worker stopped %s", me)
@shared_task
def example_failing_worker():
raise Exception("I am failing")
class SafeDict:
"""Python's dict is 'probably thread safe', but since this is hard to reason about, and in fact .items() may be
unsafe, we just use a lock. See e.g.
* https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary
* https://stackoverflow.com/questions/66556511/is-listdict-items-thread-safe
"""
def __init__(self):
self.d = {}
self.lock = threading.Lock()
def set(self, k, v):
with self.lock:
self.d[k] = v
def unset(self, k):
with self.lock:
del self.d[k]
def list(self):
with self.lock:
return list(self.d.items())
class Foreman:
"""
The Foreman starts workers, as (threading.Thread) threads, based on snappea.Task objects it finds in the sqlite
database for that purpose (we use the DB as a simple MQ; we call this DBasMQ).
The Foreman code itself is single-threaded, so it can be easily reasoned about.
We provide an at-most-once guarantee: Tasks that are picked up are removed from the DB before the work starts.
This fits with our model of background tasks, namely
* things that you want to happen
* but you don't want to wait for them in the request-response loop
* getting them done with certainty matters no more than it does for work in the server itself (which is also subject to os.kill)
* you don't care about the answer as part of your code (of course, the DB can be affected)
The read/writes to the DB-as-MQ are as follows:
* "some other process" (the HTTP server) writes _new_ Tasks
* the Foreman reads Tasks (determines the workload)
* the Foreman deletes (a write operation) Tasks (when done)
Because the Foreman has a single sequential loop, and because it is the only thing that _updates_ tasks, there is no
need for a locking model. sqlite locks the whole DB on-write, of course, but in this case we don't use that as a
feature. The throughput of our MQ is limited by the speed with which we can do writes to sqlite (2 writes and 1 read
are needed per task). I do not expect this to be in any way limiting. (TODO check)
The main idea is this endless loop of checking for new work and doing it. This leaves the question of how we "go to
sleep" when there is no more work, and how we wake up from that. This is implemented using [1] OS signals across
processes (SIGUSR1 is sent from the 'client' after a Task object is created) and [2] a semaphore (to implement the
wake-up on the main loop); a minimal sketch of this pattern follows below. Note that this choice is somewhat
incidental, though; I also considered polling in a busy loop or sending characters over a unix socket.
Python signal handlers suspend whatever is currently going on (even other signal handlers). I find that hard to
reason about: the more code a handler contains, the more likely it is to be interrupted, and the harder it is to
work out what happens when it is. Solution: the handler just releases a semaphore that some other (sequentially
looping) piece of code is waiting for.
"""
def __init__(self):
self.workers = SafeDict()
@@ -99,31 +95,43 @@ class Foreman:
# killing threads seems to be 'hard' (https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thre)
# we can, however, set daemon=True which will ensure that an exit of the main thread is the end of the program
# (and then implement some manual waiting separately).
# (we have implemented manual waiting for GRACEFUL_TIMEOUT separately).
worker_thread.daemon = True
worker_thread.start()
self.workers.set(task_id, worker_thread)
return worker_thread
def handle_sigint(self, signal, frame):
# Note: a more direct approach is available (just do the stopping in the signal handler itself) but I think the
# indirection is easier to think about.
logger.debug("Received SIGINT")
self.stopping = True
# ensure that anything we might be waiting for is unblocked
self.signal_semaphore.release()
self.worker_semaphore.release()
def handle_sigusr1(self, sig, frame):
logger.debug("Received SIGUSR1")
self.signal_semaphore.release()
def handle_sigint(self, signal, frame):
# We take the same approach as with handle_sigusr1: we set a flag and release a semaphore. The advantage is that
# we don't have to think about e.g. handle_sigint being called while we're handling a previous call to it. The
# (slight) disadvantage is that we need to sprinkle calls to check_for_stopping() in more locations (at least
# after every semaphore acquire); a sketch of such a check follows this method.
logger.debug("Received SIGINT")
if not self.stopping: # without this if-statement, repeated signals would extend the deadline
self.stopping = True
self.stop_deadline = time.time() + GRACEFUL_TIMEOUT
# Ensure that anything we might be waiting for is unblocked. A single .release call is enough because after
# every acquire call in our codebase, check_for_stopping() is the first thing we do, so the release cannot
# inadvertently be "used up" by something else.
self.signal_semaphore.release()
self.worker_semaphore.release()
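# Hedged sketch of the check referred to above; the real check_for_stopping is
# not shown in this diff, and stop() is a hypothetical stand-in for whatever
# cleanup actually runs once the flag is seen:
def check_for_stopping(self):
    if self.stopping:
        self.stop()  # e.g. join workers until stop_deadline, then exit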
def run_forever(self):
# Before we do our regular sleep-wake-check-do loop, we clear any outstanding work. sigusr1 signals coming in
# during this time period will simply "count up" the semaphore even though the work is already being done. This
# is not really a problem: we'll just notice "No task found" an equal number of times and then go into deep
# sleep.
logger.info("Checking Task backlog")
while self.create_worker():
self.check_for_stopping()
logger.info("Task backlog empty now, proceeding to main loop")
while True:
logger.debug("Checking (potentially waiting) for SIGUSR1")
self.signal_semaphore.acquire()
@@ -168,10 +176,11 @@ class Foreman:
return
logger.info("Foreman stopping")
deadline = time.time() + GRACEFUL_TIMEOUT
# Loop over all tasks, waiting for them to finish. If they don't finish in time (GRACEFUL_TIMEOUT), we'll kill
# them with a system-exit.
for task_id, worker_thread in self.workers.list():
if worker_thread.is_alive():
time_left = deadline - time.time()
time_left = self.stop_deadline - time.time()
if time_left > 0:
logger.info("Waiting for worker %s", task_id)
worker_thread.join(time_left)
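The loop above shares one absolute deadline across all joins, so the total wait is bounded by GRACEFUL_TIMEOUT no matter how many workers are alive. A standalone sketch of that pattern (illustrative names, stdlib only):

import time

def join_with_deadline(threads, deadline):
    for t in threads:
        remaining = deadline - time.time()
        if remaining <= 0:
            break  # budget spent; daemon threads die when the process exits
        t.join(remaining)  # wait at most 'remaining' seconds for this thread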