From 58d448350a528938d0961b4734ad1f1b7a964d43 Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Thu, 25 Apr 2024 14:56:11 +0200 Subject: [PATCH] Implement MAX sizes for event & envelope --- bugsink/settings.py | 7 ++++ bugsink/streams.py | 44 +++++++++++++++++++++---- bugsink/tests.py | 8 ++--- ingest/management/commands/send_json.py | 1 - ingest/views.py | 39 ++++++++++++++++------ 5 files changed, 78 insertions(+), 21 deletions(-) diff --git a/bugsink/settings.py b/bugsink/settings.py index 5188358..db7de8e 100644 --- a/bugsink/settings.py +++ b/bugsink/settings.py @@ -257,3 +257,10 @@ EMAIL_PORT = 587 EMAIL_USE_TLS = True SERVER_EMAIL = DEFAULT_FROM_EMAIL = 'Klaas van Schelven ' + +BUGSINK = { + # "MAX_EVENT_SIZE": _MEBIBYTE, + # "MAX_EVENT_COMPRESSED_SIZE": 200 * _KIBIBYTE, + # "MAX_ENVELOPE_SIZE": 100 * _MEBIBYTE, + # "MAX_ENVELOPE_COMPRESSED_SIZE": 20 * _MEBIBYTE, +} diff --git a/bugsink/streams.py b/bugsink/streams.py index 8cdd02b..264d573 100644 --- a/bugsink/streams.py +++ b/bugsink/streams.py @@ -1,6 +1,8 @@ import zlib import io +from bugsink.app_settings import get_settings + DEFAULT_CHUNK_SIZE = 8 * 1024 @@ -16,6 +18,10 @@ WBITS_PARAM_FOR_GZIP = 16 + zlib.MAX_WBITS # zlib.MAX_WBITS == 15 WBITS_PARAM_FOR_DEFLATE = -zlib.MAX_WBITS +class MaxLengthExceeded(ValueError): + pass + + def zlib_generator(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE): z = zlib.decompressobj(wbits=wbits) @@ -91,10 +97,16 @@ def compress_with_zlib(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE): class MaxDataReader: - def __init__(self, stream, max_length): + def __init__(self, max_length, stream): self.bytes_read = 0 self.stream = stream - self.max_length = max_length + + if isinstance(max_length, str): # reusing this is a bit of a hack, but leads to readable code at usage + self.max_length = get_settings()[max_length] + self.reason = "%s: %s" % (max_length, self.max_length) + else: + self.max_length = max_length + self.reason = str(max_length) def read(self, 
size=None): if size is None: @@ -104,22 +116,42 @@ class MaxDataReader: self.bytes_read += len(result) if self.bytes_read > self.max_length: - raise ValueError("Max length (%s) exceeded" % self.max_length) + raise MaxLengthExceeded("Max length (%s) exceeded" % self.reason) return result + def __getattr__(self, attr): + return getattr(self.stream, attr) + class MaxDataWriter: - def __init__(self, stream, max_length): + def __init__(self, max_length, stream): self.bytes_written = 0 self.stream = stream - self.max_length = max_length + + if isinstance(max_length, str): # reusing this is a bit of a hack, but leads to readable code at usage + self.max_length = get_settings()[max_length] + self.reason = "%s: %s" % (max_length, self.max_length) + else: + self.max_length = max_length + self.reason = str(max_length) def write(self, data): self.bytes_written += len(data) if self.bytes_written > self.max_length: - raise ValueError("Max length exceeded") + raise MaxLengthExceeded("Max length (%s) exceeded" % self.reason) self.stream.write(data) + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + +class NullWriter: + def write(self, data): + pass + + def close(self): + pass diff --git a/bugsink/tests.py b/bugsink/tests.py index bd1639c..adecb40 100644 --- a/bugsink/tests.py +++ b/bugsink/tests.py @@ -252,7 +252,7 @@ class StreamsTestCase(RegularTestCase): def test_max_data_reader(self): stream = io.BytesIO(b"hello" * 100) - reader = MaxDataReader(stream, 250) + reader = MaxDataReader(250, stream) for i in range(25): self.assertEquals(b"hellohello", reader.read(10)) @@ -264,13 +264,13 @@ class StreamsTestCase(RegularTestCase): def test_max_data_reader_none_ok(self): stream = io.BytesIO(b"hello" * 10) - reader = MaxDataReader(stream, 250) + reader = MaxDataReader(250, stream) self.assertEquals(b"hello" * 10, reader.read(None)) def test_max_data_reader_none_fail(self): stream = io.BytesIO(b"hello" * 100) - reader = MaxDataReader(stream, 250) + reader = 
MaxDataReader(250, stream) with self.assertRaises(ValueError) as e: reader.read(None) @@ -279,7 +279,7 @@ class StreamsTestCase(RegularTestCase): def test_max_data_writer(self): stream = io.BytesIO() - writer = MaxDataWriter(stream, 250) + writer = MaxDataWriter(250, stream) for i in range(25): writer.write(b"hellohello") diff --git a/ingest/management/commands/send_json.py b/ingest/management/commands/send_json.py index fc0211d..defbf20 100644 --- a/ingest/management/commands/send_json.py +++ b/ingest/management/commands/send_json.py @@ -161,6 +161,5 @@ class Command(BaseCommand): response.raise_for_status() return True except Exception as e: - raise self.stderr.write("Error %s, %s" % (e, getattr(getattr(e, 'response', None), 'content', None))) return False diff --git a/ingest/views.py b/ingest/views.py index f12119b..e712668 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -24,7 +24,7 @@ from bugsink.registry import get_pc_registry from bugsink.period_counter import PeriodCounter from bugsink.transaction import immediate_atomic, delay_on_commit from bugsink.exceptions import ViolatedExpectation -from bugsink.streams import content_encoding_reader +from bugsink.streams import content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded from events.models import Event from releases.models import create_release_if_needed @@ -44,6 +44,14 @@ logger = logging.getLogger("bugsink.ingest") @method_decorator(csrf_exempt, name='dispatch') class BaseIngestAPIView(View): + def post(self, request, project_pk=None): + try: + return self._post(request, project_pk) + except MaxLengthExceeded as e: + return JsonResponse({"message": str(e)}, status=HTTP_400_BAD_REQUEST) # NOTE untested behavior + except exceptions.ValidationError as e: + return JsonResponse({"message": str(e)}, status=HTTP_400_BAD_REQUEST) # NOTE untested behavior + @classmethod def get_sentry_key_for_request(cls, request): # we simply pick the first authentication mechanism that 
matches, rather than raising a SuspiciousOperation as @@ -249,25 +257,29 @@ class BaseIngestAPIView(View): class IngestEventAPIView(BaseIngestAPIView): - def post(self, request, project_pk=None): + def _post(self, request, project_pk=None): project = self.get_project(request, project_pk) - request_data = json.loads(content_encoding_reader(request).read()) + request_data = json.loads( + MaxDataReader("MAX_EVENT_SIZE", content_encoding_reader( + MaxDataReader("MAX_EVENT_COMPRESSED_SIZE", request))).read()) - try: - self.process_event(request_data, project, request) - except exceptions.ValidationError as e: - return JsonResponse({"message": str(e)}, status=HTTP_400_BAD_REQUEST) # NOTE untested behavior + self.process_event(request_data, project, request) return HttpResponse() class IngestEnvelopeAPIView(BaseIngestAPIView): - def post(self, request, project_pk=None): + def _post(self, request, project_pk=None): project = self.get_project(request, project_pk) - parser = StreamingEnvelopeParser(content_encoding_reader(request)) + # Note: wrapping the COMPRESSED_SIZE checks around request makes it so that when clients do not compress their + # requests, they are still subject to the (smaller) maximums that apply pre-uncompress. This is exactly what we + # want. + parser = StreamingEnvelopeParser( + MaxDataReader("MAX_ENVELOPE_SIZE", content_encoding_reader( + MaxDataReader("MAX_ENVELOPE_COMPRESSED_SIZE", request)))) # TODO: use the envelope_header's DSN if it is available (exact order-of-operations will depend on load-shedding # mechanisms) @@ -275,7 +287,14 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): # envelope_headers["event_id"] is required when type=event per the spec (and takes precedence over the payload's # event_id), so we can rely on it having been set.
- for item_headers, output_stream in parser.get_items(lambda item_headers: io.BytesIO()): + def factory(item_headers): + if item_headers.get("type") == "event": + return MaxDataWriter("MAX_EVENT_SIZE", io.BytesIO()) + + # everything else can be discarded; still: we check for size limits + return MaxDataWriter("MAX_EVENT_SIZE", NullWriter()) + + for item_headers, output_stream in parser.get_items(factory): try: item_bytes = output_stream.getvalue() if item_headers.get("type") != "event":