Merge pull request #266 from bugsink/brotli-bombs-and-1.2

Fixes for brotli bombs (associated with brotli-1.2 upgrade); GeneratorReader fix
This commit is contained in:
Klaas van Schelven
2025-11-08 11:45:08 +01:00
committed by GitHub
3 changed files with 74 additions and 58 deletions

View File

@@ -3,7 +3,6 @@ import io
import brotli
from bugsink.app_settings import get_settings
from bugsink.utils import assert_
DEFAULT_CHUNK_SIZE = 8 * 1024
def brotli_generator(input_stream, chunk_size=DEFAULT_CHUNK_SIZE):
    """Lazily decompress a brotli stream, yielding chunks of plain bytes.

    Uses brotli's `output_buffer_limit` so that a small compressed input
    cannot force an unboundedly large single output ("brotli bomb"): each
    `process` call produces at most ~chunk_size bytes, and we keep draining
    the decompressor before feeding it more input.
    """
    # implementation notes: in principle chunk_size for input and output could be different, we keep them the same here.
    # I've also seen that the actual output data may be quite a bit larger than the output_buffer_limit; a detail that
    # I do not fully understand (but I understand that at least it's not _unboundedly_ larger).
    decompressor = brotli.Decompressor()
    input_is_finished = False

    while not (decompressor.is_finished() and input_is_finished):
        if decompressor.can_accept_more_data():
            compressed_chunk = input_stream.read(chunk_size)
            if not compressed_chunk:
                input_is_finished = True
                data = decompressor.process(b"", output_buffer_limit=chunk_size)  # b"": no input available, "drain"
            else:
                data = decompressor.process(compressed_chunk, output_buffer_limit=chunk_size)
        else:
            data = decompressor.process(b"", output_buffer_limit=chunk_size)  # b"": decompressor cannot accept more input

        if data:
            yield data

    # sanity check; the loop condition already guarantees this on exit.
    # NOTE(review): a truncated input stream would keep the loop spinning
    # rather than trip this assert — confirm upstream limits make that moot.
    assert_(decompressor.is_finished())
class GeneratorReader:
    """Read from a generator (yielding bytes) as from a file-like object."""

    def __init__(self, generator):
        self.generator = generator
        # bytearray gives amortized O(1) extend and cheap front-deletion;
        # the previous bytes-concatenation approach was quadratic when the
        # generator yielded big chunks and read() sizes were small.
        self.buffer = bytearray()

    def read(self, size=None):
        """Return up to `size` bytes; size=None means "read everything".

        Returns b"" once the generator is exhausted, matching file-like
        read() semantics.
        """
        if size is None:
            # drain the generator completely and hand back the whole buffer
            for chunk in self.generator:
                self.buffer.extend(chunk)
            result = bytes(self.buffer)
            self.buffer.clear()
            return result

        # fill the buffer until we can satisfy the request (or run out)
        while len(self.buffer) < size:
            try:
                chunk = next(self.generator)
                if not chunk:
                    break  # an empty yield is treated as end-of-stream
                self.buffer.extend(chunk)
            except StopIteration:
                break

        result = bytes(self.buffer[:size])
        del self.buffer[:size]
        return result

View File

@@ -43,68 +43,43 @@ class StreamsTestCase(RegularTestCase):
def test_compress_decompress_gzip(self):
    # round-trip: gzip-compress this file's own bytes (x10), then decompress
    # through GeneratorReader in one read() and compare with the original.
    with open(__file__, 'rb') as f:
        myself_times_ten = f.read() * 10

    plain_stream = io.BytesIO(myself_times_ten)
    compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_GZIP))

    reader = GeneratorReader(zlib_generator(compressed_stream, WBITS_PARAM_FOR_GZIP))
    self.assertEqual(myself_times_ten, reader.read())
def test_compress_decompress_deflate(self):
    # round-trip: deflate-compress this file's own bytes (x10), then
    # decompress through GeneratorReader in one read() and compare.
    with open(__file__, 'rb') as f:
        myself_times_ten = f.read() * 10

    plain_stream = io.BytesIO(myself_times_ten)
    compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_DEFLATE))

    reader = GeneratorReader(zlib_generator(compressed_stream, WBITS_PARAM_FOR_DEFLATE))
    self.assertEqual(myself_times_ten, reader.read())
def test_compress_decompress_brotli(self):
    # round-trip: brotli-compress this file's own bytes (x10), then
    # decompress through GeneratorReader in one read() and compare.
    with open(__file__, 'rb') as f:
        myself_times_ten = f.read() * 10

    compressed_stream = io.BytesIO(brotli.compress(myself_times_ten))

    reader = GeneratorReader(brotli_generator(compressed_stream))
    self.assertEqual(myself_times_ten, reader.read())
def test_decompress_brotli_tiny_bomb(self):
    # by picking something "sufficiently large" we can ensure all three code paths in brotli_generator are taken,
    # in particular the "cannot accept more input" path. (for it to be taken, we need a "big thing" on the output
    # side)
    compressed_stream = io.BytesIO(brotli.compress(b"\x00" * 15_000_000))

    size = 0
    generator = brotli_generator(compressed_stream)
    for chunk in generator:
        size += len(chunk)

    self.assertEqual(15_000_000, size)
def test_max_data_reader(self):
stream = io.BytesIO(b"hello" * 100)
@@ -143,6 +118,37 @@ class StreamsTestCase(RegularTestCase):
with self.assertRaises(ValueError):
writer.write(b"hellohello")
def test_generator_reader(self):
    # GeneratorReader should slice the generator's output into exactly the
    # requested sizes, regardless of where the source chunks' boundaries lie.
    def pieces():
        yield b"hello "
        yield b"I am "
        yield b"a generator"

    reader = GeneratorReader(pieces())

    for expected in [b"hel", b"lo ", b"I a", b"m a"]:
        self.assertEqual(expected, reader.read(3))
    self.assertEqual(b" generator", reader.read(None))
def test_generator_reader_performance(self):
    # at least one test directly for GeneratorReader; doubles as a regression test for performance issue that showed
    # up when the underlying generator yielded relatively big chunks and the read() size was small. should run
    # easily under a second.
    def big_chunk_source():
        yield b"x" * 500_000

    reader = GeneratorReader(big_chunk_source())
    collected = []
    piece = reader.read(1)
    while piece != b"":
        collected.append(piece)
        piece = reader.read(1)
@override_settings(DEBUG_CSRF=True)
class CSRFViewsTestCase(DjangoTestCase):

View File

@@ -7,7 +7,7 @@ semver==3.0.*
django-admin-autocomplete-filter==0.7.*
pygments==2.19.*
inotify_simple==2.0.*
Brotli==1.2.*
python-dateutil==2.9.*
whitenoise==6.11.*
requests==2.32.*