Snappea: settings configurable

This commit is contained in:
Klaas van Schelven
2024-04-23 13:32:33 +02:00
parent 1ef7458619
commit 9adf971a14
3 changed files with 19 additions and 19 deletions

View File

@@ -246,8 +246,9 @@ if SENTRY_DSN is not None:
BASE_URL = "http://bugsink:9000" # no trailing slash
SITE_TITLE = "Bugsink" # you can customize this as e.g. "My Bugsink" or "Bugsink for My Company"
SNAPPEA_TASK_ALWAYS_EAGER = True
SNAPPEA = {
"TASK_ALWAYS_EAGER": True,
}
POSTMARK_API_KEY = os.getenv('POSTMARK_API_KEY')

View File

@@ -1,15 +1,15 @@
import os
import json
from django.conf import settings
from . import registry, thread_uuid
from .models import Task
from .settings import get_settings
def shared_task(function):
def delayed_function(*args, **kwargs):
if settings.SNAPPEA_TASK_ALWAYS_EAGER:
if get_settings().TASK_ALWAYS_EAGER:
# To ensure that args and kwargs are json-serializable in automatic tests and when using the single-server
# setup we do these 2 .dumps calls here. The cost should be negligible in general (args/kwargs are small),
# so even when single-server is used as a "production" server this shouldn't be a problem.

View File

@@ -14,11 +14,7 @@ from sentry_sdk import capture_exception
from . import registry
from .models import Task
from .datastructures import Workers
TASK_QS_SIZE = 100
GRACEFUL_TIMEOUT = .1
NUM_WORKERS = 4
from .settings import get_settings
logger = logging.getLogger("snappea.foreman")
@@ -67,6 +63,7 @@ class Foreman:
def __init__(self):
threading.current_thread().name = "FOREMAN"
self.settings = get_settings()
self.workers = Workers()
self.stopping = False
@@ -84,7 +81,7 @@ class Foreman:
pid = os.getpid()
logger.info(" ========= SNAPPEA =========")
logger.info("Startup: my pid is %s", pid)
logger.info("Startup: pid is %s", pid)
with open("/tmp/snappea.pid", "w") as f: # TODO configurable location
f.write(str(pid))
@@ -94,7 +91,7 @@ class Foreman:
# Counts the number of available worker threads. When this is 0, create_workers will first wait until a worker
# stops. (the value of this semaphore is implicitly NUM_WORKERS - active_workers)
self.worker_semaphore = threading.Semaphore(NUM_WORKERS)
self.worker_semaphore = threading.Semaphore(self.settings.NUM_WORKERS)
def run_in_thread(self, task_id, function, *args, **kwargs):
# NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this
@@ -135,7 +132,7 @@ class Foreman:
if not self.stopping: # without this if-statement, repeated signals would extend the deadline
self.stopping = True
self.stop_deadline = time.time() + GRACEFUL_TIMEOUT
self.stop_deadline = time.time() + self.settings.GRACEFUL_TIMEOUT
# Ensure that anything we might be waiting for is unblocked. A single notification file and .release call is
# enough because after every wakeup_calls.read() / acquire call in our codebase the first thing we do is
@@ -158,8 +155,8 @@ class Foreman:
# is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into
# deep sleep after.
logger.info("Startup: Clearing Task backlog")
while self.create_workers() == TASK_QS_SIZE:
# `== TASK_QS_SIZE` means we may have more work to do, because the [:TASK_QS_SIZE] slice may be the reason
while self.create_workers() == self.settings.TASK_QS_LIMIT:
# `== TASK_QS_LIMIT` means we may have more work to do, because the [:TASK_QS_LIMIT] slice may be the reason
# no more work is found. We keep doing this until we're sure there is no more work to do.
pass
@@ -175,8 +172,8 @@ class Foreman:
os.unlink(os.path.join(self.wakeup_calls_dir, event.name))
self.check_for_stopping() # always check after .read()
while self.create_workers() == TASK_QS_SIZE:
pass # `== TASK_QS_SIZE`: as documented above
while self.create_workers() == self.settings.TASK_QS_LIMIT:
pass # `== TASK_QS_LIMIT`: as documented above
def create_workers(self):
"""returns the number of workers created (AKA tasks done)"""
@@ -195,7 +192,7 @@ class Foreman:
# (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there
# is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been
# built up in the backlog; we want to at least be resilient for that case.)
tasks = Task.objects.all()[:TASK_QS_SIZE]
tasks = Task.objects.all()[:self.settings.TASK_QS_LIMIT]
task_i = -1
for task_i, task in enumerate(tasks):
@@ -204,6 +201,7 @@ class Foreman:
self.worker_semaphore.acquire()
logger.debug("Main loop: Worker slot available")
self.check_for_stopping() # always check after .acquire()
# TODO note on why no transactions are needed (it's just a single call anyway)
# TODO note on the guarantees we provide (not many)
# TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB
@@ -239,12 +237,13 @@ class Foreman:
if worker_thread.is_alive():
logger.info(
"Stopping: %s did not die in %.1fs, proceeding to kill",
short_id(task_id), GRACEFUL_TIMEOUT)
short_id(task_id), self.settings.GRACEFUL_TIMEOUT)
else:
# for the logs distinguishing between "explicit join targets" and "not dead yet" is irrelevant, we
# use the same format.
logger.info(
"Stopping: %s did not die in %.1fs, proceeding to kill", short_id(task_id), GRACEFUL_TIMEOUT)
"Stopping: %s did not die in %.1fs, proceeding to kill",
short_id(task_id), self.settings.GRACEFUL_TIMEOUT)
logger.info("Stopping: EXIT")