parser: event_output_stream closing pushed in

in preparation of the minidump handling through the envelope path,
which probably requires dealing with the whole envelope as a whole.

"theoretically" this might be less efficient, but [a] see the notes
at the top of the parser on how we think about streaming parsing and
[b] the inefficiencies are in the "immediate" path anyway (which has
the assumption that it's not in high-performance envs).

"blind commit" (tests not run against this commit).
This commit is contained in:
Klaas van Schelven
2025-11-04 13:55:41 +01:00
parent e7aad45db2
commit 391e22bcf0
3 changed files with 36 additions and 32 deletions

View File

@@ -170,3 +170,12 @@ class NullWriter:
def close(self):
pass
class UnclosableBytesIO(io.BytesIO):
    """An in-memory byte buffer whose .close() is a deliberate no-op.

    A plain BytesIO throws away its buffer when closed, which makes a later
    .getvalue() fail. This subclass keeps the buffer alive past .close() so it
    can be handed to code written for real file objects (which dutifully close
    what they are given) while the caller still reads the bytes afterwards.
    Actual cleanup is left to garbage collection.
    """

    def close(self):
        # Intentionally skip BytesIO.close() so the buffer survives.
        return None

View File

@@ -155,7 +155,6 @@ class StreamingEnvelopeParser:
def get_items(self, output_stream_factory):
# yields the item_headers and item_output_streams (with the content of the items written into them)
# closing the item_output_stream is the responsibility of the caller
self.get_envelope_headers()
@@ -175,17 +174,21 @@ class StreamingEnvelopeParser:
finder = NewlineFinder()
item_output_stream = output_stream_factory(item_headers)
self.remainder, self.at_eof = readuntil(
self.input_stream, self.remainder, finder, item_output_stream, self.chunk_size)
if "length" in item_headers:
# items with an explicit length are terminated by a newline (if at EOF, this is optional as per the set
# of examples in the docs)
should_be_empty = io.BytesIO()
try:
self.remainder, self.at_eof = readuntil(
self.input_stream, self.remainder, NewlineFinder(), should_be_empty, self.chunk_size)
if should_be_empty.getvalue() != b"":
raise ParseError("Item with explicit length not terminated by newline/EOF")
self.input_stream, self.remainder, finder, item_output_stream, self.chunk_size)
if "length" in item_headers:
# items with an explicit length are terminated by a newline (if at EOF, this is optional as per the
# set of examples in the docs)
should_be_empty = io.BytesIO()
self.remainder, self.at_eof = readuntil(
self.input_stream, self.remainder, NewlineFinder(), should_be_empty, self.chunk_size)
if should_be_empty.getvalue() != b"":
raise ParseError("Item with explicit length not terminated by newline/EOF")
finally:
item_output_stream.close()
yield item_headers, item_output_stream

View File

@@ -2,7 +2,6 @@ import uuid
import hashlib
import os
import logging
import io
from datetime import datetime, timezone
import json
import jsonschema
@@ -29,7 +28,8 @@ from issues.regressions import issue_is_regression
from bugsink.transaction import immediate_atomic, delay_on_commit
from bugsink.exceptions import ViolatedExpectation
from bugsink.streams import content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded
from bugsink.streams import (
content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded, UnclosableBytesIO)
from bugsink.app_settings import get_settings
from events.models import Event
@@ -162,10 +162,6 @@ class BaseIngestAPIView(View):
performance_logger.info("ingested event with %s bytes", len(event_data_bytes))
cls.digest_event(event_metadata, event_data)
else:
# In this case the stream will be a file that has been written the event's content to it.
# To ensure that the (possibly EAGER) handling of the digest has the file available, we flush it here:
event_data_stream.flush()
performance_logger.info("ingested event with %s bytes", event_data_stream.bytes_written)
digest.delay(event_id, event_metadata)
@@ -621,7 +617,7 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
def factory(item_headers):
if item_headers.get("type") == "event":
if get_settings().DIGEST_IMMEDIATELY:
return MaxDataWriter("MAX_EVENT_SIZE", io.BytesIO())
return MaxDataWriter("MAX_EVENT_SIZE", UnclosableBytesIO())
# envelope_headers["event_id"] is required when type=event per the spec (and takes precedence over the
# payload's event_id), so we can rely on it having been set.
@@ -643,23 +639,19 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
return NullWriter()
for item_headers, event_output_stream in parser.get_items(factory):
try:
if item_headers.get("type") != "event":
logger.info("skipping non-event item: %s", item_headers.get("type"))
if item_headers.get("type") != "event":
logger.info("skipping non-event item: %s", item_headers.get("type"))
if item_headers.get("type") == "transaction":
# From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items.
# i.e. when we see a transaction, a regular event will not be present and we can stop.
logger.info("discarding the rest of the envelope")
break
if item_headers.get("type") == "transaction":
# From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items.
# i.e. when we see a transaction, a regular event will not be present and we can stop.
logger.info("discarding the rest of the envelope")
break
continue
continue
self.process_event(ingested_at, envelope_headers["event_id"], event_output_stream, project, request)
break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done
finally:
event_output_stream.close()
self.process_event(ingested_at, envelope_headers["event_id"], event_output_stream, project, request)
break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done
return HttpResponse()
@@ -690,7 +682,7 @@ class MinidumpAPIView(BaseIngestAPIView):
# TSTTCPW: just ingest the event as normally after we've done the minidump-parsing "immediately". We make
# ready for the expectations of process_event (DIGEST_IMMEDIATELY/event_output_stream) with an if-statement
event_output_stream = MaxDataWriter("MAX_EVENT_SIZE", io.BytesIO())
event_output_stream = MaxDataWriter("MAX_EVENT_SIZE", UnclosableBytesIO())
if get_settings().DIGEST_IMMEDIATELY:
# in this case the stream will be a BytesIO object, so we can actually call .getvalue() on it.
event_output_stream.write(json.dumps(event_data).encode("utf-8"))