diff --git a/ingest/management/commands/send_json.py b/ingest/management/commands/send_json.py index 6e98b65..fc0211d 100644 --- a/ingest/management/commands/send_json.py +++ b/ingest/management/commands/send_json.py @@ -130,7 +130,9 @@ class Command(BaseCommand): } data_bytes = json.dumps(data).encode("utf-8") if use_envelope: - data_bytes = b'{}\n{"type": "event"}\n' + data_bytes # the smallest possible envelope. + # the smallest possible envelope: + data_bytes = (b'{}\n{"type": "event", "event_id": "%s"}\n' % (data["event_id"]).encode("utf-8") + + data_bytes) if compress in ["gzip", "deflate"]: if compress == "gzip": diff --git a/ingest/parsers.py b/ingest/parsers.py index 925e661..af3cf42 100644 --- a/ingest/parsers.py +++ b/ingest/parsers.py @@ -151,8 +151,11 @@ class StreamingEnvelopeParser: if "length" in item_headers: # items with an explicit length are terminated by a newline (if at EOF, this is optional as per the set # of examples in the docs) + should_be_empty = io.BytesIO() self.remainder, self.at_eof = readuntil( - self.input_stream, self.remainder, NewlineFinder(), io.BytesIO(), self.chunk_size) + self.input_stream, self.remainder, NewlineFinder(), should_be_empty, self.chunk_size) + if should_be_empty.getvalue() != b"": + raise ParseError("Item with explicit length not terminated by newline/EOF") yield item_headers, item_output_stream diff --git a/ingest/tests.py b/ingest/tests.py index 6ea9a62..fee4657 100644 --- a/ingest/tests.py +++ b/ingest/tests.py @@ -481,6 +481,9 @@ class TestParser(RegularTestCase): def test_missing_content_aka_length_too_long(self): # based on test_envelope_with_2_items_last_newline_omitted, but with length "41" replaced by "42" + + # > If length cannot be consumed, that is, the Envelope is EOF before the number of bytes has been consumed, + # > then the Envelope is malformed. 
parser = StreamingEnvelopeParser(io.BytesIO(b"""{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}\n{"type":"event","length":42,"content_type":"application/json","filename":"application.log"}\n{"message":"hello world","level":"error"}""")) # noqa items = parser.get_items_directly() @@ -489,6 +492,19 @@ class TestParser(RegularTestCase): header, item = next(items) self.assertEquals("EOF while reading item with explicitly specified length", str(e.exception)) + def test_too_much_content_aka_length_too_short(self): + # based on test_envelope_with_2_items_last_newline_omitted, but with length "41" replaced by "40" + + # > Length-prefixed payloads must terminate with \n or EOF. The newline is not considered part of the payload. + # > Any other character, including whitespace, means the Envelope is malformed. + parser = StreamingEnvelopeParser(io.BytesIO(b"""{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}\n{"type":"event","length":40,"content_type":"application/json","filename":"application.log"}\n{"message":"hello world","level":"error"}""")) # noqa + + items = parser.get_items_directly() + + with self.assertRaises(ParseError) as e: + header, item = next(items) + self.assertEquals("Item with explicit length not terminated by newline/EOF", str(e.exception)) + def test_non_json_header(self): parser = StreamingEnvelopeParser(io.BytesIO(b"""{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}\nTHIS IS NOT JSON\n{"message":"hello world","level":"error"}""")) # noqa @@ -501,6 +517,9 @@ class TestParser(RegularTestCase): def test_eof_after_envelope_headers(self): # whether this is valid or not: not entirely clear from the docs. 
It won't matter in practice, of course # (because nothing interesting is contained) + # hints in the documentation that this is correct: + # > Header-only Example: <= this implies such an example might be seen in the wild + # > There can be an arbitrary number of Items in an Envelope <= 0 is an arbitrary number parser = StreamingEnvelopeParser(io.BytesIO(b"""{}""")) items = parser.get_items_directly() diff --git a/ingest/views.py b/ingest/views.py index c3f3fdb..f12119b 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -272,17 +272,27 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): # TODO: use the envelope_header's DSN if it is available (exact order-of-operations will depend on load-shedding # mechanisms) # envelope_headers = parser.get_envelope_headers() + # envelope_headers["event_id"] is required when type=event per the spec (and takes precedence over the payload's + # event_id), so we can rely on it having been set. for item_headers, output_stream in parser.get_items(lambda item_headers: io.BytesIO()): try: item_bytes = output_stream.getvalue() if item_headers.get("type") != "event": logger.info("skipping non-event item: %s", item_headers.get("type")) + + if item_headers.get("type") == "transaction": + # From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items. + # i.e. when we see a transaction, a regular event will not be present and we can stop. + logger.info("discarding the rest of the envelope") + break + continue event_data = json.loads(item_bytes.decode("utf-8")) self.process_event(event_data, project, request) + break # From the spec of type=event: This Item may occur at most once per Envelope. i.e. seen=done finally: output_stream.close()