diff --git a/requirements.txt b/requirements.txt index 65badc7..be4a9dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ jsonschema==4.19.* semver==3.0.* django-admin-autocomplete-filter==0.7.* pygments==2.16.* +inotify_simple # testing/development only: requests diff --git a/snappea/datastructures.py b/snappea/datastructures.py index 34173f9..e374b3a 100644 --- a/snappea/datastructures.py +++ b/snappea/datastructures.py @@ -1,24 +1,29 @@ import threading -class SafeDict: +class Workers: """Python's dict is 'probably thread safe', but since this is hard to reason about, and in fact .items() may be unsafe, we just use a lock. See e.g. + * https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary * https://stackoverflow.com/questions/66556511/is-listdict-items-thread-safe + + Furthermore: we need a way to do a Thread-safe update of what we actually have running, so worker_thread.start() is + tied to the dict-update in a lock. """ def __init__(self): self.d = {} self.lock = threading.Lock() - def set(self, k, v): + def start(self, task_id, worker_thread): with self.lock: - self.d[k] = v + self.d[task_id] = worker_thread + worker_thread.start() - def unset(self, k): + def stopped(self, task_id): with self.lock: - del self.d[k] + del self.d[task_id] def list(self): with self.lock: diff --git a/snappea/decorators.py b/snappea/decorators.py index d94a311..ef55564 100644 --- a/snappea/decorators.py +++ b/snappea/decorators.py @@ -1,5 +1,5 @@ +import uuid import os -import signal import json from django.conf import settings @@ -23,11 +23,12 @@ def shared_task(function): # notes on the lack of immediate_atomic go here Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs)) - with open("/tmp/snappea.pid", "r") as f: - # NOTE perhaps we can let systemd take a role here, it seems to do pids - # TODO: handling of [1] no such file [2] no such process - foreman_pid = int(f.read()) - os.kill(foreman_pid, 
signal.SIGUSR1) + wakeup_calls_dir = os.path.join('/tmp', 'snappea') + if not os.path.exists(wakeup_calls_dir): + os.makedirs(wakeup_calls_dir, exist_ok=True) + + with open(os.path.join(wakeup_calls_dir, str(uuid.uuid4())), "w"): + pass name = function.__module__ + "." + function.__name__ function.delay = delayed_function diff --git a/snappea/example_tasks.py b/snappea/example_tasks.py index 0211d4f..026d8a5 100644 --- a/snappea/example_tasks.py +++ b/snappea/example_tasks.py @@ -19,3 +19,8 @@ def example_worker(): @shared_task def example_failing_worker(): raise Exception("I am failing") + + +@shared_task +def fast_task(): + pass diff --git a/snappea/foreman.py b/snappea/foreman.py index 1dbb67a..ab88e3c 100644 --- a/snappea/foreman.py +++ b/snappea/foreman.py @@ -1,15 +1,20 @@ +import os +import glob + +import uuid import sys import json -import os import logging import time import signal import threading +from inotify_simple import INotify, flags from sentry_sdk import capture_exception from . import registry from .models import Task -from .datastructures import SafeDict +from .datastructures import Workers +from .example_tasks import * # noqa FOR NOW UNTIL WE implement task discovery GRACEFUL_TIMEOUT = 10 @@ -45,24 +50,26 @@ class Foreman: and 1 read are needed per task). I do not expect this to be in any way limiting (TODO check) The main idea is this endless loop of checking for new work and doing it. This leaves the question of how we "go to - sleep" when there is no more work and how we wake up from that. This is implemented using [1] OS signals across - processes (SIGUSR1 is sent from the 'client' after a Task object is created) and [2] a semaphore (to implement the - wake-up on the main loop). Note that this idea is somewhat incidental though, I also considered polling in a busy - loop or sending characters over a unix socket. - - Python signal handlers suspend whatever is currenlty going on (even other signal handlers).
I find it hard to reason - about that. This makes reasoning about what happens if they were to be interrupted both more likely and harder if - they contain a lot of code. Solution: release a semaphore, that some other (sequentially looping) piece of code is - waiting for. + sleep" when there is no more work and how we wake up from that. This is implemented using inotify on a directory + created specifically for that purpose (for each Task a file is dropped there) (and a blocking read on the INotify + object). Note that this idea is somewhat incidental though (0MQ or polling the DB in a busy loop are some + alternatives). """ def __init__(self): - self.workers = SafeDict() + self.workers = Workers() self.stopping = False signal.signal(signal.SIGINT, self.handle_sigint) - signal.signal(signal.SIGUSR1, self.handle_sigusr1) + # We use inotify to wake up the Foreman when a new Task is created. + self.wakeup_calls_dir = os.path.join('/tmp', 'snappea') + if not os.path.exists(self.wakeup_calls_dir): + os.makedirs(self.wakeup_calls_dir, exist_ok=True) + self.wakeup_calls = INotify() + self.wakeup_calls.add_watch(self.wakeup_calls_dir, flags.CREATE) + + # Pid stuff pid = os.getpid() logger.info("Foreman created, my pid is %s", pid) with open("/tmp/snappea.pid", "w") as f: # TODO configurable location @@ -72,7 +79,7 @@ class Foreman: # this is 0 self.signal_semaphore = threading.Semaphore(0) - # Counts the number of available worker threads. When this is 0, create_worker will first wait until a worker + # Counts the number of available worker threads. When this is 0, create_workers will first wait until a worker # stops. 
(the value of this semaphore is implicitly NUM_WORKERS - active_workers) self.worker_semaphore = threading.Semaphore(NUM_WORKERS) @@ -88,7 +95,7 @@ class Foreman: capture_exception(e) finally: logger.info("worker done: %s", task_id) - self.workers.unset(task_id) + self.workers.stopped(task_id) self.worker_semaphore.release() worker_thread = threading.Thread(target=non_failing_function, args=args, kwargs=kwargs) @@ -97,79 +104,102 @@ class Foreman: # we can, however, set deamon=True which will ensure that an exit of the main thread is the end of the program # (we have implemented manual waiting for GRACEFUL_TIMEOUT separately). worker_thread.daemon = True - worker_thread.start() - self.workers.set(task_id, worker_thread) + + self.workers.start(task_id, worker_thread) return worker_thread - def handle_sigusr1(self, sig, frame): - logger.debug("Received SIGUSR1") - self.signal_semaphore.release() - def handle_sigint(self, signal, frame): - # We take the same approach as with handle SIGUSR1: we set a flag and release a semaphore. The advantage is that - # we don't have to think about e.g. handle_sigint being called while we're handling a previous call to it. The - # (slight) disadvantage is that we need to sprinkle calls to check_for_stopping() in more locations (at least - # after every semaphore is acquired) + # We set a flag and release a semaphore. The advantage is that we don't have to think about e.g. handle_sigint + # being called while we're handling a previous call to it. The (slight) disadvantage is that we need to sprinkle + # calls to check_for_stopping() in more locations (at least after every semaphore is acquired) logger.debug("Received SIGINT") if not self.stopping: # without this if-statement, repeated signals would extend the deadline self.stopping = True self.stop_deadline = time.time() + GRACEFUL_TIMEOUT - # Ensure that anything we might be waiting for is unblocked. 
A single .release call is enough because after - every acquire call in our codebase check_for_stopping() is the first thing we do, so the release cannot be - inadvertently be "used up" by something else. - self.signal_semaphore.release() + # Ensure that anything we might be waiting for is unblocked. A single notification file and .release call is + # enough because after every wakeup_calls.read() / acquire call in our codebase the first thing we do is + # check_for_stopping(), so the release cannot inadvertently be "used up" by something else. + with open(os.path.join(self.wakeup_calls_dir, str(uuid.uuid4())), "w"): + pass self.worker_semaphore.release() def run_forever(self): - # Before we do our regular sleep-wake-check-do loop, we clear any outstanding work. sigusr1 signals coming in + pre_existing_wakeup_notifications = glob.glob(os.path.join(self.wakeup_calls_dir, "*")) + if len(pre_existing_wakeup_notifications) > 0: + # We clear the wakeup_calls_dir on startup. Not strictly necessary because such files would be cleared in + # the loop anyway, but it's more efficient to do it first. + logger.info("Clearing %s pre-existing wakeup notifications", len(pre_existing_wakeup_notifications)) + for filename in pre_existing_wakeup_notifications: + os.remove(filename) + + # Before we do our regular sleep-wake-check-do loop, we clear any outstanding work. wake up signals coming in # during this time-period will simply "count up" the semaphore even though the work is already being done. This # is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into # deep sleep after.
logger.info("Checking Task backlog") - while self.create_worker(): - self.check_for_stopping() + while self.create_workers() > 0: + pass logger.info("Task backlog empty now, proceeding to main loop") while True: - logger.debug("Checking (potentially waiting) for SIGUSR1") - self.signal_semaphore.acquire() - self.check_for_stopping() - self.create_worker() + for event in self.wakeup_calls.read(): + # I think we can just do os.unlink(), without being afraid of an error either here or on the side where + # we write the file. I don't have a link to the man page to back this up, but when running "many" calls + # (using 2 processes with each simple tight loop, one creating the files and one deleting them, I did + # not get any errors) + os.unlink(os.path.join(self.wakeup_calls_dir, event.name)) - def create_worker(self): - """returns a boolean 'was anything done?'""" + self.check_for_stopping() # always check after .read() + while self.create_workers() > 0: + pass - logger.debug("Checking (potentially waiting) for available worker slots") - self.worker_semaphore.acquire() - self.check_for_stopping() # always check after .acquire() + def create_workers(self): + """returns the number of workers created (AKA tasks done)""" - task = Task.objects.first() - if task is None: - # Seeing this is expected on-bootup (one after all Peas are dealt with, and once for each SIGUSR1 that was - # received while clearing the initial backlog, but before we went into sleeping mode). If you see it later, - # it's odd. (We could even assert for it) + logger.debug("Querying for tasks") + # We get "a lot" of Tasks at once, rather than one by one. We assume (but did not test) this is more efficient + # than getting the Tasks one by one. It also has consequences for the case where many Tasks (and thus + # wakeup notifications) come in at the same time: in such cases, we may deal with more than one Task for a + # single iteration through run_forever's while loop. 
The final loop before sleeping will then have a "No task + # found" (and associated useless READ on the database). Why we do this: the .all() approach works with how we + # deal with wake up notifications, namely: whenever we get some, we .read() (and delete) all of them away in one + # swoop. This means a number of notifications will fold into a single iteration through our main run_forever + # loop and thus we need to do more than a single Task. Also, the wasted READ is precisely when there is nothing + # to do (i.e. it's wasting time when we have a lot of time). + + # (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there + # is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been + # built up in the backlog; we want to at least be resilient for that case.) + tasks = Task.objects.all()[:100] + + task_i = -1 + for task_i, task in enumerate(tasks): + logger.debug("Creating worker for task %s", task.id) + logger.debug("Checking (potentially waiting) for available worker slots") + self.worker_semaphore.acquire() + self.check_for_stopping() # always check after .acquire() + # TODO note on why no transactions are needed (it's just a single call anyway) + # TODO note on the guarantees we provide (not many) + # TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB + # write and the business of looking up tasks by name.
+ task_id = task.id + function = registry[task.task_name] + args = json.loads(task.args) + kwargs = json.loads(task.kwargs) + + self.check_for_stopping() # check_for_stopping() right before taking on the work + task.delete() + self.run_in_thread(task_id, function, *args, **kwargs) + + if task_i == -1: + # Seeing this is expected on-bootup (one after all Peas are dealt with, and once for each notification that + # was received while clearing the initial backlog, but before we went into "sleeping" (wait for + # notification) mode). See also above, starting with 'We get "a lot" of Tasks at once' logger.info("No task found") - # We acquired the worker_semaphore at the start of this method, but we're not using it. Release immediately! - self.worker_semaphore.release() - return False - - # TODO note on why no transactions are needed (it's just a single call anyway) - # TODO note on the guarantees we provide (not many) - # TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB - # write and the business of looking up tasks by name. - task_id = task.id - function = registry[task.task_name] - args = json.loads(task.args) - kwargs = json.loads(task.kwargs) - - self.check_for_stopping() - task.delete() - self.run_in_thread(task_id, function, *args, **kwargs) - - return True + return task_i + 1 def check_for_stopping(self): if not self.stopping: