Tools for gzip/deflate and max-size reading/writing

This commit is contained in:
Klaas van Schelven
2024-04-24 15:30:12 +02:00
parent 483aca6802
commit 59bc105457
2 changed files with 131 additions and 0 deletions

82
bugsink/streams.py Normal file
View File

@@ -0,0 +1,82 @@
"""Streaming helpers: gzip/deflate (de)compression and max-size reading/writing."""
import zlib
# Default granularity for chunked reads in the (de)compression helpers below.
DEFAULT_CHUNK_SIZE = 8 * 1024
# https://docs.python.org/3/library/zlib.html#zlib.decompress
# > +24 to +31 = 16 + (8 to 15): Uses the low 4 bits of the value as the window size logarithm. The input must include a
# > gzip header and trailer.
WBITS_PARAM_FOR_GZIP = 16 + zlib.MAX_WBITS  # zlib.MAX_WBITS == 15
# "deflate" simply means: the same algorithm as used for "gzip", but without the gzip header.
# https://docs.python.org/3/library/zlib.html#zlib.decompress
# > 8 to 15: Uses the absolute value of wbits as the window size logarithm. The input must be a raw stream with no
# > header or trailer.
# (negative wbits selects the raw/headerless variant)
WBITS_PARAM_FOR_DEFLATE = -zlib.MAX_WBITS
def decompress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    """Decompress `input_stream` into `output_stream` in chunks of `chunk_size`.

    `wbits` selects the container format; use WBITS_PARAM_FOR_GZIP or
    WBITS_PARAM_FOR_DEFLATE from this module.
    """
    decompressor = zlib.decompressobj(wbits=wbits)

    # iterate fixed-size reads until the underlying stream is exhausted (read() returns b"")
    for compressed_chunk in iter(lambda: input_stream.read(chunk_size), b""):
        output_stream.write(decompressor.decompress(compressed_chunk))

    # emit whatever is still buffered inside the decompressor
    output_stream.write(decompressor.flush())
def compress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    """Compress `input_stream` into `output_stream` in chunks of `chunk_size`.

    `wbits` selects the container format (gzip vs. raw deflate); see the
    WBITS_PARAM_* constants. Mostly useful for testing (compress-decompress
    cycles).
    """
    compressor = zlib.compressobj(wbits=wbits)

    # iterate fixed-size reads until the underlying stream is exhausted (read() returns b"")
    for uncompressed_chunk in iter(lambda: input_stream.read(chunk_size), b""):
        output_stream.write(compressor.compress(uncompressed_chunk))

    # flush() finalizes the compressed stream (e.g. writes the gzip trailer)
    output_stream.write(compressor.flush())
class MaxDataReader:
    """Wrap a readable stream; raise ValueError once more than `max_length`
    bytes have been requested from it."""

    def __init__(self, stream, max_length):
        self.stream = stream
        self.max_length = max_length
        self.bytes_read = 0  # running total of *requested* bytes

    def read(self, size=None):
        """Read up to `size` bytes from the wrapped stream.

        Unlike a regular stream, `size` is mandatory here, because the cap is
        accounted against the requested amount.
        """
        if size is None:
            raise ValueError("MaxDataReader.read() - size must be specified")

        # Note: we raise the error when an attempt is made to read too much data. In theory/principle this means that
        # we could be too strict, because we would complain before the actual problem had occurred, and the downstream
        # read may actually return something much smaller than what we request.
        # In practice [1] this is a rounding error [2] max sizes are usually integer multiples of our chunk size.
        # (this tool is meant to be used in some chunked setting)
        self.bytes_read += size
        if self.bytes_read > self.max_length:
            raise ValueError("Max length exceeded")

        return self.stream.read(size)
class MaxDataWriter:
    """Wrap a writable stream; raise ValueError once the cumulative number of
    bytes written would exceed `max_length`."""

    def __init__(self, stream, max_length):
        self.stream = stream
        self.max_length = max_length
        self.bytes_written = 0  # running total of bytes handed to write()

    def write(self, data):
        # account first, then refuse the write that pushes past the budget
        self.bytes_written += len(data)
        if self.bytes_written > self.max_length:
            raise ValueError("Max length exceeded")

        self.stream.write(data)

View File

@@ -1,3 +1,4 @@
import io
from datetime import datetime, timezone
from unittest import TestCase as RegularTestCase
@@ -12,6 +13,9 @@ from events.factories import create_event
from .period_counter import PeriodCounter, _prev_tup, TL_DAY, TL_MONTH, TL_YEAR
from .volume_based_condition import VolumeBasedCondition
from .registry import PeriodCounterRegistry
from .streams import (
compress_with_zlib, decompress_with_zlib, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE, MaxDataReader,
MaxDataWriter)
def apply_n(f, n, v):
@@ -196,3 +200,48 @@ class PCRegistryTestCase(DjangoTestCase):
self.assertEquals({project.id}, by_project.keys())
self.assertEquals({issue.id}, by_issue.keys())
self.assertEquals({(1, 100)}, by_issue[issue.id].event_listeners[TL_DAY].keys())
class StreamsTestCase(RegularTestCase):
    """Tests for the streaming helpers in .streams."""

    def _fixture_data(self):
        # Use this test file itself as a convenient fixture; x10 makes it span
        # multiple chunks. `with` ensures the file handle is closed (the
        # previous bare open().read() leaked it).
        with open(__file__, 'rb') as f:
            return f.read() * 10

    def _assert_roundtrip(self, wbits):
        # Compress-then-decompress with the given wbits must reproduce the
        # input exactly. Shared by the gzip and deflate tests, which were
        # previously copy-paste duplicates.
        data = self._fixture_data()
        compressed_stream = io.BytesIO()
        result_stream = io.BytesIO()

        compress_with_zlib(io.BytesIO(data), compressed_stream, wbits)
        compressed_stream.seek(0)
        decompress_with_zlib(compressed_stream, result_stream, wbits)

        # assertEqual: assertEquals is a deprecated alias
        self.assertEqual(data, result_stream.getvalue())

    def test_compress_decompress_gzip(self):
        self._assert_roundtrip(WBITS_PARAM_FOR_GZIP)

    def test_compress_decompress_deflate(self):
        # note: previously this test read the fixture file twice; the helper
        # reads it once
        self._assert_roundtrip(WBITS_PARAM_FOR_DEFLATE)

    def test_max_data_reader(self):
        stream = io.BytesIO(b"hello" * 100)
        reader = MaxDataReader(stream, 250)

        # exactly max_length bytes can be read in chunks...
        for _ in range(25):
            self.assertEqual(b"hellohello", reader.read(10))

        # ...and the next read pushes past the limit and raises
        with self.assertRaises(ValueError):
            reader.read(10)

    def test_max_data_writer(self):
        stream = io.BytesIO()
        writer = MaxDataWriter(stream, 250)

        # exactly max_length bytes can be written in chunks...
        for _ in range(25):
            writer.write(b"hellohello")

        # ...and the next write pushes past the limit and raises
        with self.assertRaises(ValueError):
            writer.write(b"hellohello")