From 391e22bcf0d3fefa69343a1ef1b4350571f93292 Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Tue, 4 Nov 2025 13:55:41 +0100 Subject: [PATCH] parser: event_output_stream closing pushed in in preparation of the minidump handling through the envelope path, which probably requires dealing with the whole envelope as a whole. "theoretically" this might be less efficient, but [a] see the notes at the top of the parser on how we think about streaming parsing and [b] the inefficiencies are in the "immediate" path anyway (which has the assumption that it's not in high-performance envs). "blind commit" (tests not run against this commit). --- bugsink/streams.py | 9 +++++++++ ingest/parsers.py | 23 +++++++++++++---------- ingest/views.py | 36 ++++++++++++++---------------------- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/bugsink/streams.py b/bugsink/streams.py index 295fa56..e3c6120 100644 --- a/bugsink/streams.py +++ b/bugsink/streams.py @@ -170,3 +170,12 @@ class NullWriter: def close(self): pass + + +class UnclosableBytesIO(io.BytesIO): + """Intentionally does nothing on-close: BytesIO normally discards its buffer on .close(), breaking .getvalue(); this + overrides it so that we can use it in code that usually deals with real files (and calls .close()) while still using + the in-memory data afterwards. 
We just rely on the garbage collector for the actual cleanup.""" + + def close(self): + pass diff --git a/ingest/parsers.py b/ingest/parsers.py index 3cd419d..bdc1401 100644 --- a/ingest/parsers.py +++ b/ingest/parsers.py @@ -155,7 +155,6 @@ class StreamingEnvelopeParser: def get_items(self, output_stream_factory): # yields the item_headers and item_output_streams (with the content of the items written into them) - # closing the item_output_stream is the responsibility of the calller self.get_envelope_headers() @@ -175,17 +174,21 @@ class StreamingEnvelopeParser: finder = NewlineFinder() item_output_stream = output_stream_factory(item_headers) - self.remainder, self.at_eof = readuntil( - self.input_stream, self.remainder, finder, item_output_stream, self.chunk_size) - if "length" in item_headers: - # items with an explicit length are terminated by a newline (if at EOF, this is optional as per the set - # of examples in the docs) - should_be_empty = io.BytesIO() + try: self.remainder, self.at_eof = readuntil( - self.input_stream, self.remainder, NewlineFinder(), should_be_empty, self.chunk_size) - if should_be_empty.getvalue() != b"": - raise ParseError("Item with explicit length not terminated by newline/EOF") + self.input_stream, self.remainder, finder, item_output_stream, self.chunk_size) + + if "length" in item_headers: + # items with an explicit length are terminated by a newline (if at EOF, this is optional as per the + # set of examples in the docs) + should_be_empty = io.BytesIO() + self.remainder, self.at_eof = readuntil( + self.input_stream, self.remainder, NewlineFinder(), should_be_empty, self.chunk_size) + if should_be_empty.getvalue() != b"": + raise ParseError("Item with explicit length not terminated by newline/EOF") + finally: + item_output_stream.close() yield item_headers, item_output_stream diff --git a/ingest/views.py b/ingest/views.py index b9d47c9..93b1a49 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -2,7 +2,6 @@ import uuid import 
hashlib import os import logging -import io from datetime import datetime, timezone import json import jsonschema @@ -29,7 +28,8 @@ from issues.regressions import issue_is_regression from bugsink.transaction import immediate_atomic, delay_on_commit from bugsink.exceptions import ViolatedExpectation -from bugsink.streams import content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded +from bugsink.streams import ( + content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded, UnclosableBytesIO) from bugsink.app_settings import get_settings from events.models import Event @@ -162,10 +162,6 @@ class BaseIngestAPIView(View): performance_logger.info("ingested event with %s bytes", len(event_data_bytes)) cls.digest_event(event_metadata, event_data) else: - # In this case the stream will be a file that has been written the event's content to it. - # To ensure that the (possibly EAGER) handling of the digest has the file available, we flush it here: - event_data_stream.flush() - performance_logger.info("ingested event with %s bytes", event_data_stream.bytes_written) digest.delay(event_id, event_metadata) @@ -621,7 +617,7 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): def factory(item_headers): if item_headers.get("type") == "event": if get_settings().DIGEST_IMMEDIATELY: - return MaxDataWriter("MAX_EVENT_SIZE", io.BytesIO()) + return MaxDataWriter("MAX_EVENT_SIZE", UnclosableBytesIO()) # envelope_headers["event_id"] is required when type=event per the spec (and takes precedence over the # payload's event_id), so we can rely on it having been set. 
@@ -643,23 +639,19 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): return NullWriter() for item_headers, event_output_stream in parser.get_items(factory): - try: - if item_headers.get("type") != "event": - logger.info("skipping non-event item: %s", item_headers.get("type")) + if item_headers.get("type") != "event": + logger.info("skipping non-event item: %s", item_headers.get("type")) - if item_headers.get("type") == "transaction": - # From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items. - # i.e. when we see a transaction, a regular event will not be present and we can stop. - logger.info("discarding the rest of the envelope") - break + if item_headers.get("type") == "transaction": + # From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items. + # i.e. when we see a transaction, a regular event will not be present and we can stop. + logger.info("discarding the rest of the envelope") + break - continue + continue - self.process_event(ingested_at, envelope_headers["event_id"], event_output_stream, project, request) - break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done - - finally: - event_output_stream.close() + self.process_event(ingested_at, envelope_headers["event_id"], event_output_stream, project, request) + break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done return HttpResponse() @@ -690,7 +682,7 @@ class MinidumpAPIView(BaseIngestAPIView): # TSTTCPW: just ingest the invent as normally after we've done the minidump-parsing "immediately". 
We make # ready for the expectations of process_event (DIGEST_IMMEDIATELY/event_output_stream) with an if-statement - event_output_stream = MaxDataWriter("MAX_EVENT_SIZE", io.BytesIO()) + event_output_stream = MaxDataWriter("MAX_EVENT_SIZE", UnclosableBytesIO()) if get_settings().DIGEST_IMMEDIATELY: # in this case the stream will be an BytesIO object, so we can actually call .get_value() on it. event_output_stream.write(json.dumps(event_data).encode("utf-8"))