diff --git a/bugsink/settings.py b/bugsink/settings.py index 6019fea..17a4b17 100644 --- a/bugsink/settings.py +++ b/bugsink/settings.py @@ -246,8 +246,9 @@ if SENTRY_DSN is not None: BASE_URL = "http://bugsink:9000" # no trailing slash SITE_TITLE = "Bugsink" # you can customize this as e.g. "My Bugsink" or "Bugsink for My Company" - -SNAPPEA_TASK_ALWAYS_EAGER = True +SNAPPEA = { + "TASK_ALWAYS_EAGER": True, +} POSTMARK_API_KEY = os.getenv('POSTMARK_API_KEY') diff --git a/snappea/decorators.py b/snappea/decorators.py index 34a0682..429bfb0 100644 --- a/snappea/decorators.py +++ b/snappea/decorators.py @@ -1,15 +1,15 @@ import os import json -from django.conf import settings from . import registry, thread_uuid from .models import Task +from .settings import get_settings def shared_task(function): def delayed_function(*args, **kwargs): - if settings.SNAPPEA_TASK_ALWAYS_EAGER: + if get_settings().TASK_ALWAYS_EAGER: # To ensure that args and kwargs are json-serializable in automatic tests and when using the single-server # setup we do these 2 .dumps calls here. The cost should be negligible in general (args/kwargs are small), # so even when single-server is used as a "production" server this shouldn't be a problem. diff --git a/snappea/foreman.py b/snappea/foreman.py index 5a3738c..18300d2 100644 --- a/snappea/foreman.py +++ b/snappea/foreman.py @@ -14,11 +14,7 @@ from sentry_sdk import capture_exception from . import registry from .models import Task from .datastructures import Workers - - -TASK_QS_SIZE = 100 -GRACEFUL_TIMEOUT = .1 -NUM_WORKERS = 4 +from .settings import get_settings logger = logging.getLogger("snappea.foreman") @@ -67,6 +63,7 @@ class Foreman: def __init__(self): threading.current_thread().name = "FOREMAN" + self.settings = get_settings() self.workers = Workers() self.stopping = False @@ -84,7 +81,7 @@ class Foreman: pid = os.getpid() logger.info(" ========= SNAPPEA =========") - logger.info("Startup: my pid is %s", pid) + logger.info("Startup: pid is %s", pid) with open("/tmp/snappea.pid", "w") as f: # TODO configurable location f.write(str(pid)) @@ -94,7 +91,7 @@ class Foreman: # Counts the number of available worker threads. When this is 0, create_workers will first wait until a worker # stops. (the value of this semaphore is implicitly NUM_WORKERS - active_workers) - self.worker_semaphore = threading.Semaphore(NUM_WORKERS) + self.worker_semaphore = threading.Semaphore(self.settings.NUM_WORKERS) def run_in_thread(self, task_id, function, *args, **kwargs): # NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this @@ -135,7 +132,7 @@ class Foreman: if not self.stopping: # without this if-statement, repeated signals would extend the deadline self.stopping = True - self.stop_deadline = time.time() + GRACEFUL_TIMEOUT + self.stop_deadline = time.time() + self.settings.GRACEFUL_TIMEOUT # Ensure that anything we might be waiting for is unblocked. A single notification file and .release call is # enough because after every wakeup_calls.read() / acquire call in our codebase the first thing we do is @@ -158,8 +155,8 @@ class Foreman: # is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into # deep sleep after. logger.info("Startup: Clearing Task backlog") - while self.create_workers() == TASK_QS_SIZE: - # `== TASK_QS_SIZE` means we may have more work to do, because the [:TASK_QS_SIZE] slice may be the reason + while self.create_workers() == self.settings.TASK_QS_LIMIT: + # `== TASK_QS_LIMIT` means we may have more work to do, because the [:TASK_QS_LIMIT] slice may be the reason # no more work is found. We keep doing this until we're sure there is no more work to do. pass @@ -175,8 +172,8 @@ class Foreman: os.unlink(os.path.join(self.wakeup_calls_dir, event.name)) self.check_for_stopping() # always check after .read() - while self.create_workers() == TASK_QS_SIZE: - pass # `== TASK_QS_SIZE`: as documented above + while self.create_workers() == self.settings.TASK_QS_LIMIT: + pass # `== TASK_QS_LIMIT`: as documented above def create_workers(self): """returns the number of workers created (AKA tasks done)""" @@ -195,7 +192,7 @@ class Foreman: # (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there # is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been # built up in the backlog; we want to at least be resilient for that case.) - tasks = Task.objects.all()[:TASK_QS_SIZE] + tasks = Task.objects.all()[:self.settings.TASK_QS_LIMIT] task_i = -1 for task_i, task in enumerate(tasks): @@ -204,6 +201,7 @@ class Foreman: self.worker_semaphore.acquire() logger.debug("Main loop: Worker slot available") self.check_for_stopping() # always check after .acquire() + # TODO note on why no transactions are needed (it's just a single call anyway) # TODO note on the guarantees we provide (not many) # TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB @@ -239,12 +237,13 @@ class Foreman: if worker_thread.is_alive(): logger.info( "Stopping: %s did not die in %.1fs, proceeding to kill", - short_id(task_id), GRACEFUL_TIMEOUT) + short_id(task_id), self.settings.GRACEFUL_TIMEOUT) else: # for the logs distinguishing between "explicit join targets" and "not dead yet" is irrelevant, we # use the same format. logger.info( - "Stopping: %s did not die in %.1fs, proceeding to kill", short_id(task_id), GRACEFUL_TIMEOUT) + "Stopping: %s did not die in %.1fs, proceeding to kill", + short_id(task_id), self.settings.GRACEFUL_TIMEOUT) logger.info("Stopping: EXIT")